spring-cloud-stream学习
1. 简单使用
安装
curl -o 'cashcard.zip' 'https://start.spring.io/starter.zip?type=gradle-project&language=java&dependencies=cloud-stream%2Ckafka&name=CashCard&groupId=example&artifactId=cashcard&description=CashCard+service+for+Family+Cash+Cards&packaging=jar&packageName=example.cashcard&javaVersion=17' && unzip -d 'cashcard' 'cashcard.zip'
[~] $ cd cashcard
[~/cashcard] $
Next, run the ./gradlew build command:
[~/cashcard] $ ./gradlew build
[~/cashcard] $ cd build/libs/
[~/cashcard/build/libs] $ java -jar cashcard-0.0.1-SNAPSHOT.jar
测试例子
build.gradle
// Gradle build for the CashCard sample (Spring Boot + Spring Cloud Stream, Kafka binder).
plugins {
    id 'java'
    id 'org.springframework.boot' version '3.2.5'
    id 'io.spring.dependency-management' version '1.1.4'
}

group = 'example'
version = '0.0.1-SNAPSHOT'

java {
    sourceCompatibility = '17'
}

repositories {
    mavenCentral()
}

// Spring Cloud release whose BOM is imported in dependencyManagement below.
ext {
    set('springCloudVersion', "2023.0.1")
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}

// No versions are given here; they are left to the dependency-management plugin.
dependencies {
    implementation 'org.springframework.cloud:spring-cloud-stream'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
    implementation 'org.springframework.kafka:spring-kafka'

    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.cloud:spring-cloud-stream-test-binder'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}

tasks.named('test') {
    useJUnitPlatform()
}
test CashCardApplicationTests.java
package example.cashcard.stream;
import java.io.IOException;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.annotation.Import;
import example.cashcard.service.DataSourceService;
import static org.mockito.BDDMockito.given;
import example.cashcard.domain.CashCard;
import example.cashcard.domain.Transaction;
import static org.assertj.core.api.Assertions.assertThat;
import org.springframework.messaging.Message;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
 * Starts the application with the test binder imported and checks the message
 * that arrives on the {@code approvalRequest-out-0} output binding.
 */
@SpringBootTest
@Import(TestChannelBinderConfiguration.class)
class CashCardApplicationTests {

    /** Mock replacing the real {@link DataSourceService} so the emitted payload is known. */
    @MockBean
    private DataSourceService dataSourceService;

    /**
     * Stubs the data source with a fixed transaction, then verifies that the
     * message read from the output destination deserializes to that transaction.
     */
    @Test
    void basicCashCardSupplier1(@Autowired OutputDestination outputDestination) throws IOException {
        // Fixed transaction returned by the mocked data source.
        CashCard stubCard = new CashCard(123L, "sarah1", 1.00);
        Transaction stubTransaction = new Transaction(1L, stubCard);
        given(dataSourceService.getData()).willReturn(stubTransaction);

        // Read one message from the output binding (timeout argument: 5000).
        Message<byte[]> received = outputDestination.receive(5000, "approvalRequest-out-0");
        assertThat(received).isNotNull();

        // Decode the JSON payload and compare it with the stubbed transaction.
        byte[] payload = received.getPayload();
        Transaction decoded = new ObjectMapper().readValue(payload, Transaction.class);
        assertThat(decoded.id()).isEqualTo(1L);
        assertThat(decoded.cashCard()).isEqualTo(stubTransaction.cashCard());
    }
}
程序 CashCardStream.java
package example.cashcard.stream;
import example.cashcard.domain.Transaction;
import example.cashcard.service.DataSourceService;
import java.util.function.Supplier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Stream configuration: declares the {@code approvalRequest} supplier bean and
 * the {@link DataSourceService} bean it reads from.
 */
@Configuration
public class CashCardStream {

    /** Creates the {@link DataSourceService} bean. */
    @Bean
    public DataSourceService dataSourceService() {
        return new DataSourceService();
    }

    /**
     * Supplier bean; each {@code get()} call returns {@code dataSource.getData()}.
     *
     * @param dataSource service that provides the transactions
     * @return supplier delegating to the data source
     */
    @Bean
    public Supplier<Transaction> approvalRequest(DataSourceService dataSource) {
        return () -> dataSource.getData();
    }
}
DataSourceService.java
package example.cashcard.service;
import example.cashcard.domain.CashCard;
import example.cashcard.domain.Transaction;
import java.util.Random;
/** Produces randomly generated {@link Transaction} instances. */
public class DataSourceService {

    /** Shared generator, reused so that no new {@link Random} is allocated per call. */
    private static final Random RANDOM = new Random();

    /**
     * Builds a new transaction with a random id for a card owned by "sarah1"
     * that has a random id and a random amount drawn with bound 100.00.
     *
     * @return a freshly generated transaction
     */
    public Transaction getData() {
        // Draw order (card id, amount, transaction id) matches the original code.
        CashCard cashCard = new CashCard(
            RANDOM.nextLong(),          // random card id
            "sarah1",
            RANDOM.nextDouble(100.00)   // random amount
        );
        return new Transaction(RANDOM.nextLong(), cashCard);
    }
}
Transaction.java(record 是 Java 14 引入的预览特性,自 Java 16 起成为正式特性;字段只能通过构造函数赋值,编译器会自动生成 equals、hashCode、toString 和规范构造函数;record 是不可变的,字段值在构造后不能修改。)
package example.cashcard.domain;
/**
 * Record holding a transaction id together with a {@link CashCard}.
 *
 * @param id       transaction identifier
 * @param cashCard card carried by this transaction
 */
public record Transaction(
    Long id,
    CashCard cashCard
) {
}
CashCard.java
package example.cashcard.domain;
/**
 * Record describing a cash card.
 *
 * @param id                     card identifier
 * @param owner                  owner name
 * @param amountRequestedForAuth amount value carried with the card
 */
public record CashCard(Long id, String owner, Double amountRequestedForAuth) {
}
运行
[~/exercises] $ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
0034eb13e35b apache/kafka:latest "/__cacert_entrypoin…" 15 minutes ago Up 15 minutes 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp kafka
[~/exercises] $ ./gradlew bootRun
...
20XX-XX-XXTXX:XX:XX.XXXZ INFO 2627 --- [CashCard] [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.6.2
20XX-XX-XXTXX:XX:XX.XXXZ INFO 2627 --- [CashCard] [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: c4deed513057c94e
20XX-XX-XXTXX:XX:XX.XXXZ INFO 2627 --- [CashCard] [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1715638156031
20XX-XX-XXTXX:XX:XX.XXXZ INFO 2627 --- [CashCard] [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: Some(5L6g3nShT-eMCtK--X86sw)
20XX-XX-XXTXX:XX:XX.XXXZ INFO 2627 --- [CashCard] [ main] o.s.c.s.m.DirectWithAttributesChannel : Channel 'CashCard.approvalRequest-out-0' has 1 subscriber(s).
20XX-XX-XXTXX:XX:XX.XXXZ INFO 2627 --- [CashCard] [ main] o.s.i.e.SourcePollingChannelAdapter : started bean 'approvalRequest-out-0_spca'
20XX-XX-XXTXX:XX:XX.XXXZ INFO 2627 --- [CashCard] [ main] example.cashcard.CashCardApplication : Started CashCardApplication in 1.624 seconds (process running for 1.812)
<==========---<==========---> 80% EXECUTING [59s]
> :bootRun
默认情况下每 1 秒生成并发送一条消息。在 Kafka 容器中消费该主题即可看到输出:
[~/exercises] $ docker exec -it kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic approvalRequest-out-0
...
{"id":-66561165137247368,"cashCard":{"id":6094323678418692169,"owner":"sarah1","amountRequestedForAuth":50.7668781314909}}
{"id":-5634650775918976902,"cashCard":{"id":3090043269501770643,"owner":"sarah1","amountRequestedForAuth":63.79583001467617}}
{"id":-14741824561737749,"cashCard":{"id":-4335146560811993412,"owner":"sarah1","amountRequestedForAuth":12.783311898916805}}
{"id":5558799879234294766,"cashCard":{"id":-4613724913650180843,"owner":"sarah1","amountRequestedForAuth":58.932051104126955}}
{"id":6868476944436589763,"cashCard":{"id":8526417364307544245,"owner":"sarah1","amountRequestedForAuth":46.38569473593444}}
...
停止程序,然后改为每 5 秒输出一次并重新运行:
[~/exercises] $ ./gradlew bootRun --args="--spring.integration.poller.fixed-delay=5000"
2. 使用其他中间件
[~/exercises] $ docker ps --format 'table {{.Names}}\t{{.ID}}\t{{.Image}}\t{{.Status}}'
NAMES CONTAINER ID IMAGE STATUS
kafka fde1261e8eab apache/kafka:latest Up 8 minutes
rabbitmq 27bce655e84d rabbitmq:management Up 8 minutes
[~/exercises] $ docker stop kafka
程序等会儿自动停止
...
Generating Transaction: Transaction[id=2175805216802058711, cashCard=CashCard[id=7436539443625164261, owner=sarah1, amountRequestedForAuth=2.112304484447358]]
Generating Transaction: Transaction[id=7686026790369954447, cashCard=CashCard[id=-9056617961033348083, owner=sarah1, amountRequestedForAuth=41.2613014355566]]
20XX-XX-XXTXX:XX:XX.XXXZ INFO 18618 --- [CashCard] [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Node 1 disconnected.
20XX-XX-XXTXX:XX:XX.XXXZ INFO 18618 --- [CashCard] [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Node -1 disconnected.
20XX-XX-XXTXX:XX:XX.XXXZ INFO 18618 --- [CashCard] [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Node 1 disconnected.
20
[~/exercises] $ docker ps --format 'table {{.Names}}\t{{.ID}}\t{{.Image}}\t{{.Status}}'
NAMES CONTAINER ID IMAGE STATUS
rabbitmq 27bce655e84d rabbitmq:management Up 10 minutes
替换为rabbit
// RabbitMQ binder in place of the Kafka binder; the Kafka entries stay commented out for reference.
dependencies {
    implementation 'org.springframework.cloud:spring-cloud-stream'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-rabbit'
    // implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
    // implementation 'org.springframework.kafka:spring-kafka'

    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.cloud:spring-cloud-stream-test-binder'
    // testImplementation 'org.springframework.kafka:spring-kafka-test'
}
重启,发现每秒10条消息
[~/exercises] $ ./gradlew bootRun --args="--spring.integration.poller.fixed-delay=100"
...
Generating Transaction: Transaction[id=-2719852727681158000, cashCard=CashCard[id=-6079371198316955842, owner=sarah1, amountRequestedForAuth=36.18919281848337]]
Generating Transaction: Transaction[id=-5607247743473931121, cashCard=CashCard[id=6889019574148834540, owner=sarah1, amountRequestedForAuth=0.7966132708258855]]
Generating Transaction: Transaction[id=8789939425625482805, cashCard=CashCard[id=6270568951233307661, owner=sarah1, amountRequestedForAuth=33.58135053399174]]
<==========---> 80% EXECUTING [8s]
> :bootRun
创建个队列
[~/exercises] $ docker exec -it rabbitmq sh -c "rabbitmqadmin declare queue name=test-queue && rabbitmqadmin declare binding source=approvalRequest-out-0 destination=test-queue routing_key=#"
使用web
// Excerpt: only the added web starter is shown; "..." stands for the existing entries.
dependencies {
...
// Spring Boot web starter, added for the HTTP endpoint used below.
implementation 'org.springframework.boot:spring-boot-starter-web'
...
}
/**
 * Handles {@code POST /publish/txn}: prints the transaction from the request body to stdout.
 *
 * @param transaction transaction bound from the request body
 */
@PostMapping(path = "/publish/txn")
public void publishTxn(@RequestBody Transaction transaction) {
    String line = "POST for Transaction: " + transaction;
    System.out.println(line);
}
// Excerpt of the controller test; "..." marks parts of the class that are not shown.
// The test runs with webEnvironment = RANDOM_PORT.
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
public class CashCardControllerTests {
...
// HTTP client used to call the endpoint under test.
@Autowired
private TestRestTemplate restTemplate;
// Posts a transaction to /publish/txn and asserts that the response status is 200 OK.
@Test
void cashCardStreamBridge() throws IOException {
Transaction transaction = new Transaction(1L, new CashCard(123L, "kumar2", 1.00));
// NOTE(review): `port` is declared in the elided part of the class, presumably via @LocalServerPort; confirm.
ResponseEntity<Transaction> response = this.restTemplate.postForEntity(
"http://localhost:" + port + "/publish/txn",
transaction, Transaction.class);
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
}
...
}
[~/exercises] $ ./gradlew test
...
BUILD SUCCESSFUL in 10s
4 actionable tasks: 4 executed
运行
[~/exercises] $ ./gradlew bootRun
...
20XX-XX-XXTXX:XX:XX.XXXZ INFO 14453 --- [CashCard] [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port 8080 (http) with context path ''
20XX-XX-XXTXX:XX:XX.XXXZ INFO 14453 --- [CashCard] [ main] example.cashcard.CashCardApplication : Started CashCardApplication in 2.651 seconds (process running for 2.934)
<==========---<==========---> 80% EXECUTING [59s]
> :bootRun
测试
[~/exercises] $ curl -d '{
"id" : 100,
"cashCard" : {
"id" : 209,
"owner" : "kumar2",
"amountRequestedForAuth" : 200.0
}
}' -H "Content-Type: application/json" -X POST http://localhost:8080/publish/txn
CashCardStream.java添加
package example.cashcard.stream;
import example.cashcard.service.DataSourceService;
import example.cashcard.domain.Transaction;
import java.util.function.Supplier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.cloud.stream.function.StreamBridge;
/**
 * Stream configuration that exposes the {@code approvalRequest} supplier and
 * also allows publishing a transaction on demand through {@link StreamBridge}.
 */
@Configuration
public class CashCardStream {

    /** Bridge used to send messages to a binding by name. */
    private final StreamBridge streamBridge;

    /**
     * @param streamBridge bridge injected by the container
     */
    public CashCardStream(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    /** Creates the {@link DataSourceService} bean. */
    @Bean
    public DataSourceService dataSourceService() {
        return new DataSourceService();
    }

    /**
     * Supplier bean whose {@code get()} returns {@code dataSource.getData()}.
     *
     * @param dataSource service that provides the transactions
     * @return supplier delegating to the data source
     */
    @Bean
    public Supplier<Transaction> approvalRequest(DataSourceService dataSource) {
        Supplier<Transaction> fromDataSource = () -> dataSource.getData();
        return fromDataSource;
    }

    /**
     * Sends the given transaction to the {@code approvalRequest-out-0} binding.
     *
     * @param transaction transaction to publish
     */
    public void publishOnDemand(Transaction transaction) {
        streamBridge.send("approvalRequest-out-0", transaction);
    }
}
CashCardController.java
package example.cashcard.controller;
import example.cashcard.domain.Transaction;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import example.cashcard.stream.CashCardStream;
/**
 * REST controller that publishes posted transactions through {@link CashCardStream}.
 */
@RestController
public class CashCardController {

    /** Stream component used for publishing; assigned once in the constructor. */
    private final CashCardStream cashCardStream;

    /**
     * @param cashCardStream stream component injected by the container
     */
    public CashCardController(CashCardStream cashCardStream) {
        this.cashCardStream = cashCardStream;
    }

    /**
     * Handles {@code POST /publish/txn}: publishes the transaction, then prints it to stdout.
     *
     * @param transaction transaction bound from the request body
     */
    @PostMapping(path = "/publish/txn")
    public void publishTxn(@RequestBody Transaction transaction) {
        this.cashCardStream.publishOnDemand(transaction);
        System.out.println("POST for Transaction: " + transaction);
    }
}
测试类改为
package example.cashcard.controller;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.test.web.server.LocalServerPort;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.annotation.Import;
import org.springframework.messaging.Message;
import com.fasterxml.jackson.databind.ObjectMapper;
import example.cashcard.domain.CashCard;
import example.cashcard.domain.Transaction;
import example.cashcard.stream.CashCardStream;
//add
/**
 * Posts a transaction over HTTP and checks the message read from the
 * {@code approvalRequest-out-0} output binding of the test binder.
 */
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
@Import({ TestChannelBinderConfiguration.class, CashCardStream.class })
public class CashCardControllerTests {

    /** Port the embedded server was started on. */
    @LocalServerPort
    private int port;

    @Autowired
    private TestRestTemplate restTemplate;

    /**
     * Sends a POST to {@code /publish/txn} and asserts that the id of the
     * transaction read from the output binding equals the posted id.
     *
     * NOTE(review): CashCardStream is imported as well and declares a supplier bean;
     * if that supplier publishes to the same binding, the first message received here
     * may not be the posted one.
     */
    @Test
    void postShouldSendTransactionAsAMessage(@Autowired OutputDestination outputDestination) throws IOException {
        Transaction posted = new Transaction(123L, new CashCard(1L, "Foo Bar", 1.00));
        String url = "http://localhost:" + port + "/publish/txn";
        restTemplate.postForEntity(url, posted, Transaction.class);

        Message<byte[]> received = outputDestination.receive(5000, "approvalRequest-out-0");
        assertThat(received).isNotNull();

        byte[] payload = received.getPayload();
        Transaction fromMessage = new ObjectMapper().readValue(payload, Transaction.class);
        assertThat(fromMessage.id()).isEqualTo(posted.id());
    }

    /** Minimal application class nested in the test. */
    @SpringBootApplication
    public static class App {
    }
}
测试失败了
[~/exercises] $ ./gradlew test --info
... lots and lots of output ...
CashCardControllerTests > postShouldSendTransactionAsAMessage(OutputDestination) FAILED
org.opentest4j.AssertionFailedError:
expected: 123L
but was: -69970613332339299L
因为 DataSourceService.java 的 getData() 会生成随机数据,而这些随机事务和 POST 的事务都发送到了同一个交换机(approvalRequest-out-0),测试读取时先读到了随机生成的那一条。
重构下,创建CashCardTransactionOnDemand.java
package example.cashcard.ondemand;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Configuration;
import example.cashcard.domain.Transaction;
/**
 * Publishes transactions on demand through {@link StreamBridge}.
 */
@Configuration
public class CashCardTransactionOnDemand {

    /** Bridge used to send messages to a binding by name. */
    private final StreamBridge streamBridge;

    /**
     * @param streamBridge bridge injected by the container
     */
    public CashCardTransactionOnDemand(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    /**
     * Sends the given transaction to the {@code approvalRequest-out-0} binding.
     *
     * @param transaction transaction to publish
     */
    public void publishOnDemand(Transaction transaction) {
        streamBridge.send("approvalRequest-out-0", transaction);
    }
}
改写CashCardStream.java
package example.cashcard.stream;
import example.cashcard.service.DataSourceService;
import example.cashcard.domain.Transaction;
import java.util.function.Supplier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Stream configuration containing only the {@code approvalRequest} supplier
 * and the {@link DataSourceService} bean it depends on.
 */
@Configuration
public class CashCardStream {

    /** Creates the {@link DataSourceService} bean. */
    @Bean
    public DataSourceService dataSourceService() {
        return new DataSourceService();
    }

    /**
     * Supplier bean; every invocation fetches a transaction from the data source.
     *
     * @param dataSource service that provides the transactions
     * @return supplier delegating to {@code dataSource.getData()}
     */
    @Bean
    public Supplier<Transaction> approvalRequest(DataSourceService dataSource) {
        return () -> {
            Transaction next = dataSource.getData();
            return next;
        };
    }
}
修改CashCardController.java
package example.cashcard.controller;
...
import example.cashcard.ondemand.CashCardTransactionOnDemand;
...
/**
 * REST controller that hands posted transactions to {@link CashCardTransactionOnDemand}.
 */
@RestController
public class CashCardController {

    /** Publisher the controller delegates to. */
    private final CashCardTransactionOnDemand cashCardTransactionOnDemand;

    /**
     * @param cashCardTransactionOnDemand publisher injected by the container
     */
    public CashCardController(@Autowired CashCardTransactionOnDemand cashCardTransactionOnDemand) {
        this.cashCardTransactionOnDemand = cashCardTransactionOnDemand;
    }

    /**
     * Handles {@code POST /publish/txn} by publishing the request body.
     *
     * @param transaction transaction bound from the request body
     */
    @PostMapping(path = "/publish/txn")
    public void publishTxn(@RequestBody Transaction transaction) {
        cashCardTransactionOnDemand.publishOnDemand(transaction);
    }
}
修改CashCardControllerTests.java
// update the two references (the import and the @Import entry) from CashCardStream to CashCardTransactionOnDemand
...
import example.cashcard.ondemand.CashCardTransactionOnDemand;
...
// Excerpt: only the class-level annotations are shown; "..." stands for the unchanged body.
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
// The test context imports the test binder configuration and CashCardTransactionOnDemand.
@Import({TestChannelBinderConfiguration.class, CashCardTransactionOnDemand.class})
class CashCardControllerTests {
...
}
测试
[~/exercises] $ ./gradlew test
重新运行
./gradlew bootRun --args="--spring.integration.poller.fixed-delay=5000"
使用kafka验证
[~/exercises] $ docker exec -it kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic approvalRequest-out-0
...
{"id":-66561165137247368,"cashCard":{"id":6094323678418692169,"owner":"sarah1","amountRequestedForAuth":50.7668781314909}}
{"id":-5634650775918976902,"cashCard":{"id":3090043269501770643,"owner":"sarah1","amountRequestedForAuth":63.79583001467617}}
...
使用post测试
[~/exercises] $ curl -d '{
"id" : 100,
"cashCard" : {
"id" : 209,
"owner" : "kumar2",
"amountRequestedForAuth" : 200.0
}
}' -H "Content-Type: application/json" -X POST http://localhost:8080/publish/txn
你会看到post事务和自动生成的事务一起被立即推送到Kafka中。
...
{"id":-5884276558030210833,"cashCard":{"id":-2255250790195745652,"owner":"sarah1","amountRequestedForAuth":90.69339964569193}}
{"id":5266952642528070044,"cashCard":{"id":-5780440342191504864,"owner":"sarah1","amountRequestedForAuth":21.80623845666293}}
{"id":100,"cashCard":{"id":209,"owner":"kumar2","amountRequestedForAuth":200.0}}
{"id":100,"cashCard":{"id":209,"owner":"kumar2","amountRequestedForAuth":200.0}}
{"id":100,"cashCard":{"id":209,"owner":"kumar2","amountRequestedForAuth":200.0}}
{"id":-4042360335249930640,"cashCard":{"id":-8391359726931016778,"owner":"sarah1","amountRequestedForAuth":85.5669171882778}}
测试例子
1. Source发送数据
<!-- Spring Cloud Stream starter with the RabbitMQ binder -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
配置 application.yml
# Source application: the "output" binding publishes JSON messages to raw-data.
spring:
  application:
    name: source-app
  cloud:
    stream:
      bindings:
        output:
          destination: raw-data
          content-type: application/json

server:
  port: 8081
创建 Source 应用
SourceApplication.java
/** Entry point of the source application. */
@SpringBootApplication
public class SourceApplication {

    /**
     * Launches the Spring application.
     *
     * @param args command-line arguments, passed through to Spring
     */
    public static void main(String... args) {
        SpringApplication.run(SourceApplication.class, args);
    }
}
MessageSource.java
/**
 * Periodically publishes a sample order message through the {@code output} binding.
 *
 * <p>The previous version relied on {@code @StreamEmitter} and a static call to
 * {@code Source.output()}. Both belong to the legacy annotation-based programming
 * model, which Spring Cloud Stream 4.x no longer ships, and {@code output()} is an
 * instance method of the {@code Source} interface, so the static call could not compile.
 * Messages are now sent with {@code StreamBridge}, which resolves the binding by name
 * and therefore works with the existing {@code spring.cloud.stream.bindings.output.*}
 * settings.
 *
 * <p>Required imports: {@code org.springframework.cloud.stream.function.StreamBridge},
 * {@code org.springframework.scheduling.annotation.EnableScheduling},
 * {@code org.springframework.scheduling.annotation.Scheduled},
 * {@code org.springframework.stereotype.Component}, {@code java.util.HashMap},
 * {@code java.util.Map}.
 */
@Component
@EnableScheduling // turns on @Scheduled processing; may instead be declared on the application class
public class MessageSource {

    /** Bridge used to send messages to a binding by name. */
    private final StreamBridge streamBridge;

    /**
     * @param streamBridge bridge injected by the container
     */
    public MessageSource(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    /**
     * Builds one sample order message and sends it to the {@code output} binding.
     * Runs repeatedly with a fixed delay of 1000 ms between runs.
     */
    @Scheduled(fixedDelay = 1000)
    public void emit() {
        Map<String, Object> message = new HashMap<>();
        message.put("orderId", "12345");
        message.put("amount", 100);
        // Send the message to the "output" binding (destination raw-data in application.yml).
        streamBridge.send("output", message);
    }
}
2. 实现 Processor(处理数据)
添加依赖
<!-- Spring Cloud Stream starter with the RabbitMQ binder -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
配置 application.yml
# Processor application.
# Bindings of a functional bean are named <functionName>-in-<index> and
# <functionName>-out-<index>; the Function bean here is called "enrich".
spring:
  application:
    name: processor-app
  cloud:
    stream:
      bindings:
        enrich-in-0:
          destination: raw-data
          content-type: application/json
        enrich-out-0:
          destination: enriched-data
          content-type: application/json

server:
  port: 8082
创建 Processor 应用
ProcessorApplication.java
/** Entry point of the processor application. */
@SpringBootApplication
public class ProcessorApplication {

    /**
     * Launches the Spring application.
     *
     * @param args command-line arguments, passed through to Spring
     */
    public static void main(String... args) {
        SpringApplication.run(ProcessorApplication.class, args);
    }
}
DataProcessor.java
/** Declares the {@code enrich} function bean. */
@Component
public class DataProcessor {

    /**
     * Function that adds two entries to the incoming map and returns that same map:
     * {@code processedTime} (current instant as a string) and {@code enriched} ({@code true}).
     *
     * @return the enriching function
     */
    @Bean
    public Function<Map<String, Object>, Map<String, Object>> enrich() {
        return payload -> {
            String processedAt = Instant.now().toString();
            payload.put("processedTime", processedAt);
            payload.put("enriched", true);
            return payload;
        };
    }
}
3.实现 Sink(消费数据)
添加依赖
<!-- Spring Cloud Stream starter with the RabbitMQ binder -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
配置 application.yml
# Sink application.
# The input binding of a functional bean is named <functionName>-in-<index>;
# the Consumer bean here is called "logSink".
spring:
  application:
    name: sink-app
  cloud:
    stream:
      bindings:
        logSink-in-0:
          destination: enriched-data
          content-type: application/json

server:
  port: 8083
创建 Sink 应用
SinkApplication.java
/** Entry point of the sink application. */
@SpringBootApplication
public class SinkApplication {

    /**
     * Launches the Spring application.
     *
     * @param args command-line arguments, passed through to Spring
     */
    public static void main(String... args) {
        SpringApplication.run(SinkApplication.class, args);
    }
}
DataSink.java
/** Declares the {@code logSink} consumer bean. */
@Component
public class DataSink {

    /**
     * Consumer that prints every received map to stdout behind a fixed prefix.
     *
     * @return the printing consumer
     */
    @Bean
    public Consumer<Map<String, Object>> logSink() {
        return data -> System.out.println("接收到的数据已落地处理:" + data);
    }
}
启动步骤
- 启动 Source 应用:运行 SourceApplication,它会定时向 raw-data 队列发送消息。
- 启动 Processor 应用:运行 ProcessorApplication,它会从 raw-data 队列接收消息,进行处理,并将增强后的数据发送到 enriched-data 队列。
- 启动 Sink 应用:运行 SinkApplication,它会从 enriched-data 队列接收消息,并打印或处理这些消息。
配置与依赖注入
- Source 组件:负责生成消息并发送到 RabbitMQ 队列。
- Processor 组件:接收消息并进行数据处理(如添加字段、数据转换等)。
- Sink 组件:消费处理后的数据并落地(如打印、数据库存储等)。
每个应用都会使用 RabbitMQ 队列作为通信桥梁:raw-data 队列连接 Source 和 Processor,enriched-data 队列连接 Processor 和 Sink。
总结
- 你已经成功将 Source、Processor 和 Sink 拆分成三个独立的 Spring Boot 应用,它们通过 RabbitMQ 队列进行通信。
- 每个应用专注于单一功能(生产、处理、消费),符合微服务架构的设计原则。
- 你可以根据业务需求扩展各个服务并独立部署、测试和维护。