SpringCloud Stream消息驱动
1、SpringCloud Stream概述
官方地址:https://spring.io/projects/spring-cloud-stream#overview
中文指导手册地址:https://m.wang1314.com/doc/webapp/topic/20971999.html
什么是SpringCloudStream官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。
应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。通过我们配置binding(绑定) ,而 Spring Cloud Stream 的 binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。
通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
目前仅支持RabbitMQ、Kafka。
1.1、设计思想
1、标注的MQ流程
生产者/消费者之间靠消息媒介传递信息内容【massage】
消息必须走特定的通道【消息通道MessageChannel】
消息通道里的消息如何被消费呢,谁负责收发处理
消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器所订阅
2、Cloud Stream的作用
比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic和Partitions分区。
这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。
3、什么是Binder
在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。
通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。
Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于生产者。
4、Stream中的消息通信方式遵循了发布-订阅模式
使用Topic主题进行广播
- 在RabbitMQ就是Exchange
- 在Kakfa中就是Topic
1.2、标准的流程套路
1、Binder:很方便的连接中间件,屏蔽不同的差异
2、Channel
通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
3、Source和Sink
简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。
1.3、编码API和常用注解
组成和注解 | 描述 |
---|---|
Middleware | 中间件,目前只支持RabbitM和Kafka |
Binder | Binder是应用与消息中间的封装,目前实现了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现 |
@Input | 注解标识输入通道,通过该输入通道接收到的消息进入应用程序 |
@Output | 注解标识输出通道,发布的消息将通过通道离开应用程序 |
@StreamListener | 监听队列,用户消费者的队列的消息接收 |
@EnableBinding | 指通道channel和exchange绑定在一起 |
RabbitMQ环境必须已经OK,可正常连接使用,否则后面的步骤都实现不了
2、消息驱动之生产者(output)
2.1、新建模块cloud-stream-rabbitmq-provider8801
2.2、引入pom.xml配置文件
如果是需要Stream整合的就将依赖改为
spring-cloud-starter-stream-kafka
<dependencies>
<!--stream整合rabbit依赖-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--基础配置-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
2.3、YAML配置文件
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
output: # 这个名字是一个通道的名称,消息生产者
destination: studyExchange # 表示要使用的Exchange名称定义【自定义】
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置【上面的配置】
eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://localhost:7001/eureka
2.4、生产者启动类
package com.zcl.springcloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* 描述:消息生产者启动类
*
* @author zhong
* @date 2022-09-22 12:19
*/
@SpringBootApplication
public class StreamMQMain8801 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8801.class, args);
}
}
2.5、业务实现
2.5.1、服务接口实现类
自己创建一个实现的接口以及里面的方法
注意:在这个服务实现类里面不是使用
@Service
注解了,因为不是web应用,而是Stream消息驱动,是与中间件进行打交道的不是与数据库
package com.zcl.springcloud.service.Impl;
import com.zcl.springcloud.service.IMessageProvider;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;
import java.util.UUID;
/**
* 描述:发送接口实现类
* 必须使用@EnableBinding(Source.class)注解开启消息推送管道
*
* @author zhong
* @date 2022-09-22 12:24
*/
@Slf4j
@EnableBinding(Source.class)
public class IMessageProviderImpl implements IMessageProvider {
/**
* 消息发送管道
*/
@Resource
private MessageChannel output;
/**
* 发送消息
* @return
*/
@Override
public String send() {
// 定义消息
String serial = UUID.randomUUID().toString();
// 构建并发送消息
this.output.send(MessageBuilder.withPayload(serial).build());
log.info("-------------- " + serial + " ----------------");
return serial;
}
}
2.5.2、控制器实现
package com.zcl.springcloud.controller;
import com.zcl.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* 描述:消息发送控制器
*
* @author zhong
* @date 2022-09-22 12:37
*/
@RestController
public class SendMessageController {
/**
* 注入消息发送管道接口
*/
@Resource
private IMessageProvider messageProvider;
/**
* 每调用一次接口发送一次消息
* @return
*/
@GetMapping(value = "/sendMessage")
public String sendMessage()
{
return messageProvider.send();
}
}
2.6、启动测试
启动7001Eureka访问中心
启动8801消息发送者,启动成功以及观察RabbitMQ的管理界面
访问接口发送消息,查看MQ的管理页面波峰情况
3、消息驱动之消费者(input)
同样的参考如下流程图
3.1、新建cloud-stream-rabbitmq-consumer8802
模块
3.2、引入pom.xml依赖
与8801一样
3.3、添加YAML配置文件
配置文件与消息生产的区别在于:
output: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://localhost:7001/eureka
3.4、添加启动类StreamMQMain8802
与消息生产者一样
3.5、业务实现
必须要有
@Component
注解注入到Spring容器中
package com.zcl.springcloud.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
/**
* 描述:消息消费者控制器
*
* @author zhong
* @date 2022-09-22 13:18
*/
@Slf4j
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {
/**
* 注入消费者的端口号
*/
@Value("${server.port}")
private String port;
/**
* 监听消息
* @param message
* @return
*/
@StreamListener(Sink.INPUT)
public void input(Message<String> message){
log.info("消费者1号接收到的消息 ----- " + message.getPayload() + " -----,port: " + port);
}
}
3.6、启动项目测试
启动7001
启动8801,消息发送者
启动8802,消息消费者
8801发送消息,8802消费消息,并查看具体的MQ波峰图
控制器输出
4、分组消费与持久化
4.1、完整参考cloud-stream-rabbitmq-consumer8802,创建8803项目
除了启动的端口号不一样之外其他的配置都一样
4.2、启动项目发现问题
- 启动7001(Eureka服务中心)
- 启动8801(生产)、8802(消费)、8803(消费)
- 测试发送消失是否两个消费者都可以接收到
4.2.1、重复消费
目前是8802/8803同时都收到了,存在重复消费问题
解决方案:分组和持久化属性group
常见案例
比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。这时我们就可以使用Stream中的消息分组来解决
注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。
不同组是可以全面消费的(重复消费),同一组内会发生竞争关系,只有其中一个可以消费。
4.2.2、分组
自定义配置分组,自定义分为同一个组,解决重复消费问题
配置文件分组
分别给8801、8802进行分组【orderA】
重启项目查看MQ管理
orderB是历史记录,上面的配置以及都分为了
ordeerA
组,进入orderA组可以查看实际的消费者数量同一组内会发生竞争关系,只有其中一个可以消费,启动项目测试是否为真
4.2.3、持久化
通过上述,解决了重复消费问题,再看看持久化
停止8802/8803并去除掉8802的分组group: atguiguA,8803保留
8801先发送7条消息到rabbitmq
先启动8802,无分组属性配置,后台没有打出来消息
8802因为取消了
groupA
的分组所以获取不到持久化的数据(如果重启mq也会消失)再启动8803,有分组属性配置,后台打出来了MQ上的消息
8803保存
groupA
的分组所以在启动的时候就会将持久化的数据消费