分布式
内容管理
SpringBoot 整合 RabbitMQ,消息模型,确认消费
RabbitMQ在模块解耦,接口限流、异步通信等方面发挥重要作用,在成熟的RabbitMQ之前,项目一般采用Spring的事件驱动模型来进行异步通信
缓存穿透 除了缓存null之外 还可以在之前加上布隆过滤器,也就是hash到一个大bitmap,查询先过滤看是否存在key,存在再查询缓存
并且之前的Cache只是表面的知识,虽然加上分布式🔒可以抗住高并发,但是一旦缓存失效,需要准备高可用的解决方案, 并且必须使用mq限流,因为应用层容器Tomcagt的吞吐量有限,必须保证同时到达的流量不查过Tomcat集群的上限
SpringBoot 整合RabbitMQ
RabbitMQ是一款用于接收、存储和转发消息的开源中间件,其核心在于消息、消息模型、生产和消费者,在整合之前,先介绍生产者、消费者、消息、队列、交换机、路由等基本组件
RabbitMQ可以用邮局来类比【或者STOMP的聊天】, 邮局的核心包括邮件、邮寄箱、寄邮件用户、收邮件用户、邮递员
投递用户A --投递---> 邮件 ---> 邮件箱
|_取---> 邮递员M --> 邮件 -交--> 收件人B
STOMP聊天服务(私聊频道)
用户A ---发送---> 消息 ----> chat消息频道
|__ broker代理--> 用户频道 --> 用户B
投递用户A就是RabbitMQ产生消息的生产者,邮件就相当于消息,接收邮件用户B相当于RabbitMQ接收消息的消费者,邮件箱 就是RabbitMQ的交换机, 邮递员就是 模型中的队列
生产者 Producer
:- 用于产生、发送消息的程序, 向消息队列发布消息
消费者 Consumer
:- 用于监听、接收、消费和处理消息的程序, 从消息队列获取消息
消息 Message
:- 消息由消息头和消息体组成。消息体是不透明的,消息头由一系列可选属性【routing-key,priority优先权、delivery-mode 是否持久性存储】
- 可以是实际的数据,比如文字、图片等,在RabbitMQ的底层架构中,消息通过二进制的数据流进行传输
Channel 信道(频道)
:- 多路复用连接中的一条独立的双向数据流通道,信道建立在真实的TCP内的虚拟连接,复用TCP连接的通道
队列 Queue
:- 存储消息的一种数据结构,保存消息,是消息的容器,也是消息的终点
- 消息的暂存区或存储区,可以看作中转站,消息经过队列传输到消费者手里
- 一个消息可以投入一个或者多个队列,一直在Queue中,等待Consumer连接Queue将消息取走
- 多个消费者同时订阅一个Queue,Queue中的Message平均分摊给多个消费者,不是每个消费者都收到所有的消息并处理
- 每一个消息只能被一个订阅者接收
交换机 Exchange 路由器
:- 消息的中转站、路由器,提供Producer到Queue之间的路由匹配,接收Procuder的消息,将消息按照路由规则转发到消息队列,路由器只是转发消息,不会存储消息,如果没有Queue半岛到Exchange,会丢弃Producer的Message,用于接收和分发消息
- 其有四种消息调度策略: Headers,Fanout(广播)、Direct、Topic
路由键 Routing Key
:- 消息头的一个属性,标记消息的路由规则,,一般不单独使用,而是和交换机绑定在一起,决定消息路由到指定的队列,最大长度255字节
Binding 绑定
:- 用于建立Exchange和Queue之间的关联, 一个绑定就是基于Binding Key将Exchange和Queue连接起来的路由规则, 交换器理解就是一个由Binding构成的路由表
Binding Key 绑定键
:- Exchange和Queue的绑定关系,匹配Routing Key, 最大长度255字节
Broker就是RabbitMQ的部署的服务器,实现消息的通信代理
RabbitMQ的消息模型主要有队列、交换机、路由组成
——-->交换机-|
生产者 --产生---> 消息 ---| ---消息-->队列 ---监听接收---> 消费者
——-> 路由 --|
整合过程
- 要在项目中使用RabbitMQ,首先就是要引入相关的Starter,在server模块加入, 因为RabbitMQ是基于AMQP协议的,所以springboot是直接抽象的amqp起步依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 配置RabbitMQ 【yml】,分布式架构下,RabbitMQ也是单独部署的,可以搭建集群,所以需要在项目中像redis一样指定RabbitMQ服务器所在的Host、端口号、用户名、密码
#rabbit配置
rabbitmq:
virtual-host: /
host: 192.168.204.100
port: 5672
username: cfeng
password: a123456sgssdhhhs7890bfagds
publisher-confirm-type: correlated #发送消息之后进行确认 发送到交换机触发回调 publisher-confirm 废弃
publisher-returns: true #发送消息后返回确认信息
因为SpirngBoot的自动配置【可以参见之前的blog和封装minio-starter】,快速使用rabbitMQ【 springBoot、Docker】让使用一个服务变得非常简单
- 自定义配置Bean , 配置文件
上面的简易配置只能够使用默认的简易功能,为了自定义使用Bean,创建RabbitmqConfig配置文件
除了@Value(${})可以调取yml的配置项之外,还可以直接使用Environment对象的getProperty获取yml中的所有的配置项 非常方便,注入Environment对象即可
* rabbitmq自定义配置文件,需要配置单一消费者工厂,多消费者工厂、消息发送Template对象
*/
@Slf4j
@Configuration
@RequiredArgsConstructor
@EnableRabbit //开启rabbitmq的注解方式
public class RabbitmqConfig {
//自动装配的连接工厂,连接到rabbitMQ服务器
private final CachingConnectionFactory connectionFactory;
//自动装配的简单Rabbit消息监听器所在容器工厂的配置对象
private final SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
//方便快捷代替配置文件,Environment可以方便获取yml文件中的配置项
private final Environment environment;
/**
* 单一消费者实例监听器容器工厂
*/
@Bean(name = "singleListenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainerFactory() {
//消费者监听器所在容器工厂
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//设置连接的实例
factory.setConnectionFactory(connectionFactory);
//设置消息的阐述格式,JSON
factory.setMessageConverter(new Jackson2JsonMessageConverter());
//设置并发消费者实例最大数量(Security的并发最大)
factory.setMaxConcurrentConsumers(1);
//设置并发消费者实例每个实例拉取的消息数量 1
factory.setPrefetchCount(1);
return factory;
}
/**
* 多消费者实例工厂,针对高并发的业务场景
*/
@Bean(name = "multiListenerContainer")
public SimpleRabbitListenerContainerFactory multiListenerContainer() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//设置连接工厂,可以使用configure对象配置
factoryConfigurer.configure(factory,connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
//消息的确认消费模式,先设置为NONE,表示不需要确认消费
factory.setAcknowledgeMode(AcknowledgeMode.NONE);
//高并发并发消费者实例最大数量 初始状态
factory.setConcurrentConsumers(10);
//并发的最大数量
factory.setMaxConcurrentConsumers(15);
//并发消费者实例每个实例拉取的消息数量
factory.setPrefetchCount(10);
return factory;
}
/**
* 配置RabbitMQ发送消息组件RabbitTemplate
*/
@Bean
public RabbitTemplate rabbitTemplate() {
//设置发送消息后进行确认, connectionFactory的配置都直接在配置文件配置
// connectionFactory.setPublisherConfirms(true);
// connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
RabbitTemplate rabbitTemplate = new RabbitTemplate();
//生产者通过调用channel.addReturnListener()方法来添加ReturnListener监听器,实现获取没有被正确路由到合适队列的消息
rabbitTemplate.setMandatory(true);
//发消息如果成功,给出反馈信息
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,b,s);
}
});
//发送消息失败,给出反馈
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.warn("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message({})",returnedMessage.getExchange(),returnedMessage.getRoutingKey(), returnedMessage.getReplyCode(),returnedMessage.getReplyText(),returnedMessage.getMessage());
}
});
return rabbitTemplate;
}
当rabbitMQ处理高并发业务场景,使用多消费者实例,正常情况即不需要并发监听消费处理时,只需要配置单一消费者实例的容器工厂
配置完成就可以开始使用,使用rabbitMQ完成生产者发送一串简单的字符串信息到基本的消息模型,由消费者进行监听消费处理
继续在RabbitmqConfig中配置创建队列、交换机、路由及其绑定,也就是创建Queue、基础的DirectExchange、Binding对象
//方便快捷代替配置文件,Environment可以方便获取yml文件中的配置项
private final Environment environment;
/**
* 配置简单的消息模型 :队列、路由、交换机
*/
@Bean(name = "basicQueue")
public Queue basicQueue() {
return new Queue(Objects.requireNonNull(environment.getProperty("spring.rabbitmq.queue-name")),true);
}
@Bean
public DirectExchange basicExchange() {
return new DirectExchange(environment.getProperty("spring.rabbitmq.exchange-name"),true,false);
}
@Bean
public Binding basicBinding() {
return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(environment.getProperty("spring.rabbitmq.route-name"));
}
之后编写消息的生产者BasicPulisher,生产者发送消息
* 基础模型的生产者
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class BasicPublisher {
private final ObjectMapper objectMapper;
private final RabbitTemplate rabbitTemplate;
//配置项获取
private final Environment environment;
/**
* 发送消息
*/
public void sendMsg(String message) {
//首先要判断消息是否为空
if(!Strings.isNullOrEmpty(message)) {
try {
//传输数据格式JSON
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//指定消息模型的交换机
rabbitTemplate.setExchange(Objects.requireNonNull(environment.getProperty("spring.rabbitmq.exchange-name")));
//指定消息模型的路由
rabbitTemplate.setRoutingKey(Objects.requireNonNull(environment.getProperty("spring.rabbitmq.route-name")));
//消息以二进制字符流传输 , 可以使用writeValueAsBytes,不然转换失败
Message msg = MessageBuilder.withBody(message.getBytes("utf-8")).build();
//转化发送消息
rabbitTemplate.convertAndSend(msg);
log.info("基本模型: 生产者发送消息: {}",message);
} catch (Exception e) {
log.error("基本消息模型 生产者 发送消息发生异常: {}", e.fillInStackTrace());
}
}
}
}
这里的消息发送依赖的就是RabbitTemplate, 之前STOMP服务消息发送依赖的是SimpleMessageOpreations, 也就是一个messageTemplate
之后再简单创建一个基础的消费者实例BasicConsumer
* rabbitMQ基础模型 消费者,监听接收消息
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class BasicConsumer {
private final ObjectMapper objectMapper;
/**
* 监听接收消费队列中的消息, 这里采用单一容器工厂
* @param msg 传输的二进制消息,这里的@PayLoad和之前的STOMP的是类似的就是将消息转化为给定的类型
* 这里使用@RabbitListener 类似之前STOMP的@MessageMapping类似
*/
@RabbitListener(queues = "${spring.rabbitmq.queue-name}",containerFactory = "singleListenerContainer")
public void consumeMsg(@Payload byte[] msg) {
try {
String message = new String(msg,"utf-8");
log.info("基本消费模型 消费者:监听到消息: {}", message);
} catch (Exception e) {
log.error("基本消息模型-消费者 发生异常 : {}",e.fillInStackTrace());
}
}
}
这样也就创建好了基本的生产消费模型,和之前的SpringBoot的事件驱动模型类似,这里编写测试类调用生产者发送消息【主要依靠的是RabbitmqTemplate的convertAndSend】,消费者位置监听器需要指定队列和监听器的容器工厂
@Test
public void testProduceMessage() {
basicPublisher.sendMsg("kiss");
}
之后测试发送,容易出现的问题就是convert失败,所以这里需要注意生产者写入消息的方式,可以直接采用ObjectMapper的writeValueAsBytes
2022-09-16 09:53:04.392 INFO 4428 --- [ main] c.s.rabbitmq.producer.BasicPublisher : 基本模型: 生产者发送消息: kiss
2022-09-16 09:53:04.397 INFO 4428 --- [nectionFactory1] c.server.config.RabbitmqConfig : 消息发送成功:correlationData(null),ack(true),cause(null)
2022-09-16 09:53:04.421 INFO 4428 --- [ntContainer#0-1] c.s.rabbitmq.consumer.BasicConsumer : 基本消费模型 消费者:监听到消息: �+,
2022-09-16 09:53:04.427 INFO 4428 --- [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2022-09-16 09:53:05.431 INFO 4428 --- [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.
可以生产者生产消息之后,消费者成功监听,监听线程也是单独的【main线程发送消息】
BasicProducer ---Msg--> 交换机Exchange
|__ Binding -- 队列BasicQueue <---listen--BasicConsumer
RabbitMQ除了可以简单发送字节型(通过getBytes方法或者序列化方法)的消息和采用@RabbitListener监听字节数组类型消息之外,还可以通过发送、接收 对象类型的方式实现消息的发送和接收,下面也简单演示一下
建立一个测试的对象类型用于传输,因为要网络传输,所以需要序列化
* 测试rabbitMQ以对象作为消息类型,不简单为String
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class Student {
private Integer id;
private String realName;
private String username;
}
再单独配置对象类型的队列、交换机和路由
##自定义配置对象类型队列、交换机、路由名称
queue-object-name: middleware.mq.object.info.queue
exchange-object-name: middleware.mq.object.info.exchange
route-object-name: middleware.mq.object.info.route
/**
* 配置简单类型的消息模型: 对象类型 : 队列、路由、交换机
*/
@Bean(name = "objectQueue")
public Queue objectQueue() {
return new Queue(environment.getProperty("spring.rabbitmq.queue-object-name"));
}
@Bean
public DirectExchange objectExchange() {
return new DirectExchange(environment.getProperty("spring.rabbitmq.exchange-object-name"),true,false);
}
@Bean
public Binding objectBinding() {
return BindingBuilder.bind(objectQueue()).to(objectExchange()).with(environment.getProperty("route-object-name"));
}
之后就是再同样建立对象之间的消息传播的基本模型: 消费者、生产者、消息
public void sendObjectMsg(Student student) {
//首先选哟判断是否不为null
if(!Objects.isNull(student)) {
try {
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//消息的交换机和binding
rabbitTemplate.setExchange(environment.getProperty("spring.rabbitmq.exchange-object-name"));
rabbitTemplate.setRoutingKey(Objects.requireNonNull(environment.getProperty("spring.rabbitmq.route-object-name")));
//发送消息
rabbitTemplate.convertAndSend(student, message -> {
//获取消息属性
MessageProperties properties = message.getMessageProperties();
//持久化模式,PERSISTENT
properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
//设置消息的类型,消息头设置AbstractJavaTypeMapper的默认的类型
properties.setHeader(AbstractJavaTypeMapper.DEFAULT_KEY_CLASSID_FIELD_NAME,Student.class);
//返回配置后的消息
return message;
});
log.info("基本消息模型-生产者-发送对象类型消息: {}",student.toString());
} catch (Exception e) {
log.error("基本消费模型-生产者, 发送消息失败: {}", student.toString(),e.fillInStackTrace());
}
}
}
对比简单的消息发送,这里主要就是再发送消息时,通过Process指定消息的属性: 消息的持久化模式、消息头【类型】
@RabbitListener(queues = "${spring.rabbitmq.queue-object-name}",containerFactory = "singleListenerContainer")
public void consumeObjMsg(@Payload Student student) {
try {
log.info("基本消费模型 --- 消费者: 监听到消息: {}",student);
} catch (Exception e) {
log.error("基本消费模型 --- 消费者: 监听消息异常 {}", e.fillInStackTrace());
}
}
之后简单调用Producer的API发送消息即可
2022-09-16 19:46:48.860 INFO 17068 --- [ main] c.s.rabbitmq.producer.BasicPublisher : 基本消息模型-生产者-发送对象类型消息: Student(id=1, realName=zs, username=cfeng)
2022-09-16 19:46:48.872 INFO 17068 --- [nectionFactory1] c.server.config.RabbitmqConfig : 消息发送成功:correlationData(null),ack(true),cause(null)
2022-09-16 19:46:48.890 INFO 17068 --- [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2022-09-16 19:46:48.892 INFO 17068 --- [ntContainer#1-2] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2022-09-16 19:46:48.905 INFO 17068 --- [ntContainer#0-1] c.s.rabbitmq.consumer.BasicConsumer : 基本消费模型 --- 消费者: 监听到消息: Student(id=1, realName=zs, username=cfeng)
这里演示采用的都是Direct交换机模型,一共四种交换机模型
多种消息模型 — Exchange调度策略
调度策略就是Exchange交换机收到生产者发送的消息后按照什么规则转发消息,调度策略的三个因素:Exchange Type、 Binding Key 和 消息的标记信息(Routing Key 和headers, Exchagne根据消息的Routing Key和Exchange 绑定Queue 的Binding Key分配消息, 生产者发送消息时,一般会指定Routing Key,指定路由规则, Exchange Type和 binding key确定时,
Producer指定routing key即可决定消息的流向, Exhange路由有多种不同的模式, 也就决定了几种不同的消息模型
Fanout 订阅 、广播模式 ---- 适用于 业务数据需要广播场景: 用户操作写日志
Fanout Exhange,Fanout交换机具有广播消息的作用,当消息进入Exchange之后,交换机会检查 binding了哪些Queue,找到之后,将消息发送到相关的Queue中【每一个Queue】, 由队列对应的消费者监听使用
订阅模式 与 Binding Key和Routing Key无关, 同时,Fanout交换机转发消息最快,交换器会将消息分发给所有有绑定关系的消息Queue中, 就像子网广播一样,都获得一份复制的消息; 就算绑定了路由也是无用的; 队列数N >= 1
其实该订阅模式就类似STOMP协议下的服务,订阅相关的频道,就可以介绍到发送到该频道的消息,所以进入的用户都订阅public频道就可以接收消息
这里简单的演示消息的发送,封装消息实体ChatMessage,建立多个队列
###自定义配置广播、订阅模型的多个Queue、交换机,不需要路由,因为绑定了也无用,会子网官博,不是独享
queue-one-name: middleware.mq.fanout.one.queue
queue-two-name: middleware.mq.fanout.two.queue
exchange-fanout-name: middleware.mq.fanout.info.route
/**
* 广播模式交换机、 不需要配置路由信息,因为路由无用,子网广播
*/
@Bean(name = "fanoutQueueOne")
public Queue fanoutQueueOne() {
return new Queue(Objects.requireNonNull(environment.getProperty("spring.rabbitmq.queue-one-name")));
}
@Bean(name = "fanoutQueueTwo")
public Queue fanoutQueueTwo() {
return new Queue(Objects.requireNonNull(environment.getProperty("queue-two-name")));
}
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(Objects.requireNonNull(environment.getProperty("spring.rabbitmq.exchange-fanout-name")),true,false);
}
/**
* 只需要将交换机和队列绑定再一起即可,不需要指定Bindinging Key
*/
@Bean
public Binding fanoutBindingOne() {
return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange());
}
@Bean
public Binding fanoutBindingTwo() {
return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange());
}
之后简单封装消息
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class MessageEntity implements Serializable {
private static final long serialVersionUID = -6995434700637844150L;
private Integer id;
private String message;
private Student sender;
}
消息生产者ModelPubilsher
@Component
@RequiredArgsConstructor
@Slf4j
public class FanoutPublisher {
private final RabbitTemplate rabbitTemplate;
private final ObjectMapper objectMapper;
private final Environment environment;
public void sendMsg(MessageEntity messageEntity) throws Exception {
if(!Objects.isNull(messageEntity)) {
try {
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//这里只需要指定交换机,不需要给出Routing key
rabbitTemplate.setExchange(Objects.requireNonNull(environment.getProperty("spring.rabbitmq.exchange-fanout-name")));
//创建消息Message,转换
Message message = MessageBuilder.withBody(objectMapper.writeValueAsBytes(messageEntity)).build();
log.info("消息模型Fanout 生产者发送消息: {}",messageEntity);
} catch (Exception e) {
log.error("消息模型Fanout 生产者发送消息异常: {}", messageEntity,e.fillInStackTrace());
}
}
}
}
监听队列的消费者
@Component
@Slf4j
@RequiredArgsConstructor
public class ModelConsumer {
private final ObjectMapper objectMapper;
@RabbitListener(queues = "${spring.rabbitmq.queue-one-name}",containerFactory = "singleListenerContainer")
public void consumeFanoutMsgOne(@Payload byte[] msg) {
try {
MessageEntity messageEntity = objectMapper.readValue(msg,MessageEntity.class);
log.info("消息模型fanoutOne 监听到消息:{}", messageEntity);
} catch (Exception e) {
log.info("消息模型——消费者one 发生异常: {}",e.fillInStackTrace());
}
}
@RabbitListener(queues = "${spring.rabbitmq.queue-two-name}",containerFactory = "singleListenerContainer")
public void consumeFanoutMsgTwo(@Payload byte[] msg) {
try {
MessageEntity messageEntity = objectMapper.readValue(msg,MessageEntity.class);
log.info("消息模型fanoutTWO 监听到消息:{}", messageEntity);
} catch (Exception e) {
log.info("消息模型——消费者two 发生异常: {}",e.fillInStackTrace());
}
}
}
之后调用生产者发送消息即可
2022-09-16 21:15:27.775 INFO 15684 --- [ main] c.s.rabbitmq.producer.FanoutPublisher : 消息模型Fanout 生产者发送消息: MessageEntity(id=1, message=你好,春风, sender=Student(id=1, realName=zs, username=cfeng))
2022-09-16 21:15:27.783 INFO 15684 --- [nectionFactory1] c.server.config.RabbitmqConfig : 消息发送成功:correlationData(null),ack(true),cause(null)
2022-09-16 21:15:27.795 INFO 15684 --- [ntContainer#2-1] c.s.rabbitmq.consumer.FanoutConsumer : 消息模型fanoutTWO 监听到消息:(Body:'[B@2855a650(byte[87])'
2022-09-16 21:15:27.795 INFO 15684 --- [ntContainer#3-1] c.s.rabbitmq.consumer.FanoutConsumer : 消息模型fanoutOne 监听到消息:
使用Fanout交换机可以让一个交换机绑定多个队列,从而对应多个消费者, 不需要指定binding Key, 所以发送消息也不需要Routing Key
广播式、订阅式消息模型适用于 业务数据需要广播式传播, 说白了,也就是多个地方需要使用该数据,比如用户操作日志 — 需要将操作信息发送给数据库,同时也需要将其发送给专门的日志系统进行存储, 将其操作日志封装为消息实体,进行传播即可
Direct 路由模式 — 业务数据直接传输消费
Direct交换器也就是直接交换器,就是最直接的意思,需要Routing key和Binding key, 识别消息头中的Routing Key,查找Exchange和Queue之间的Binding key,如果匹配,将消息分发到该队列,Direct 式Exchange的默认模式
默认提供了一个Exchange,名称为空, 类型为Direct,绑定到所有的Queue, 每一个Queue和该Exchange的Binding Key为Queue的名称, 所以不交换器的情况下是可以借助默认交换器发送消息
该模式可以参见上面的整合过程的Basic Queue
此模型需要严格意义的绑定,也就是在Binding配置时,在配置Exchange和Queue之间的关系时,同时需要指定Binding Key; 这样Producer发送数据时,会传递一个Routing Key检验
rabbitTemplate.setExchange(environment.getProperty("spring.rabbitmq.exchange-object-name"));
rabbitTemplate.setRoutingKey(Objects.requireNonNull(environment.getProperty("spring.rabbitmq.route-object-name")));
这里设置的Routing-key和配置conifg的Binding Key相同
@Bean
public Binding basicBinding() {
return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(environment.getProperty("spring.rabbitmq.route-name"));
}
这个时候就不会像Fanout一样广播了,消息只会投递到绑定的且满足路由Routing Key的Queue上,不会投递到所有的绑定Queue上
该模式适合业务数据直接传输消费,比如业务服务模块之间的信息交互,一般业务服务模块之间的消息通信时直接、实时的可以借助DirectExchange, 也是最常见的消息通信模式 == 路由模式
Topic 通配符模式
TopicExchange 是一种 发布-主题–订阅 的消息模型,Topic模式也要求交换机、路由、队列严格绑定构成, 和之前的Direct不同,其支持通配方式的路由,可以通过为路由的名称指定特定的 通配符 * 或者 #, 从而绑定到不同的队列中; Routing Key 是一个以 句号. 为分隔的字符串
通配符 * :表示一个特定的单词
通配符 #: 表示任意的单词,可以0到多个 # > *
如果direct是正规军,那么Topic就是王牌军,其可以统治direct, 当路由名称包含* 时,*代表一个单词,所以就会降级为基于DirectExchange的消息模型
而路由名称包含# 时,由于匹配多个单词所以绑定的路由不再起作用,相当于绑定基于FanoutExchange的消息模型, 哪怕有Routing Key,也会匹配到所有的,不再起作用
可以看出Topic和Direct的最主要的不同就是绑定规则Binding Key 是通配的
这里Cfeng 就只是大概阐述一下代码了,提一下和Direct不同的部分
###首先就是定义的route-key 是包含通配符的
route-one-name: middleware.mq.object.*.route 【*匹配一个单词】
route-two-name: middleware.mq.#.info.route 【这里#可以匹配任意长度route字符串】
之后就是config中对上面两个路由的绑定,这里创建的是Topic交换机实例
@Bean
public TopicExchange objectExchange() {
return new DirectExchange(environment.getProperty("spring.rabbitmq.exchange-topic-name"),true,false);
}
@Bean
public Binding fanoutBindingOne() {
return BindingBuilder.bind(topicQueueOne()).to(topicExchange()).with(Objects.requireNonNull(environment.getProperties("spirng.rabbitmq.route-one-name")));
}
@Bean
public Binding fanoutBindingTwo() {
return BindingBuilder.bind(topicQueueTwo()).to(topicExchange()).with(Objects.requireNonNull(environment.getProperties("spirng.rabbitmq.route-two-name")));
}
建立交换机和Queue的Binding关系之后,就直接使用Producer向Exchagne发送数据,这里可以测试多个Routing Key验证通配
......
public void sendTopicMsg(MessageEntity messageEntity, String route) throws Exception {
try {
rabbitTemplate.setMessageConverter(XXX);
rabbitTemplatesetExchange(environment.getProperties("XXX"));
//路由绑定
rabbitTemplate.setRoutingKey(route);
//创建消息
Message msg = ....
rabbitTemplate.sendAndConvert(msg);
} catch(Exception e) {
....
}
}
之后消费者就是@RabbitListener中指定queues即可,消费容器工厂还是单个消费者即可
测试的时候,可以测试输入不同的Routing Key,可以发现只要符合匹配规则,对应的消费者都能够消费消息
确认消费机制 RPC
RabbitMQ 是高性能、高可用的消息分布式中间件,不只是因为消息异步通信和业务模块解耦、 接口限流、消息延迟处理等功能, 重要的是其在消息发送过程中,可以保证消息成功发送、不会丢失、确认消费
- 保证消息成功发送: 生产者的生产确认机制( Publisher的Confirm机制)
- 消息不丢失、确认消费: 面向消费者确认消费而言
消息高可用 – 确认消费
分布式消息中间件RabbitMq本身是基于异步的消息处理, 前面的实例就可以知道: 生产者P将消息发送到RabbitMQ后不会知道消费者处理成功或者失败, 甚至都不知道是否有消费者来处理消息
- 消息是否真正发送成功,P自认为发送成功,可是采用template发送时,绑定模型如果不存在,实际上是发送失败的 【之前的一个demo Cfeng甚至没有bind,但是P端仍然认为投递成功】
- 由于特殊原因,RabbitMQ服务挂掉了,导致需要重启,但是这时队列中还有大量消息没有消费,可能重启过程中丢失
- 消费者监听消息时,可能出现监听失败的问题,导致消息所在队列找不到消费者而不断重新入队,重复消费
但是在实际场景中,可能需要同步处理,同步等待服务端将消息处理完成之后再进行下一步的操作,相当于RPC(remote procedure Call 远程过程调用)
RabbitMQ实现RPC机制:
- P 发送消息,在消息头属性MessageProperties中设置属性 ReplyTo — Queue名称 【消费者处理完成后的消息发送到该Queue】,和correlationId: 此次请求的标识,消费者处理完成之后将此属性返还,P由此确认消息是否成功执行
- 消费者收到消息进行处理
- 处理完成生成应答消息replyTo指定的Queue,同时带上correlationId
- P 订阅了replyTo指定的Queue,从中收到应答消息,根据correlationId确定执行情况
消息生产确认
为了确认消息是否真的发送成功,RabbitMQ要求生产者在发送消息后进行发送确认,代表消息成功发出,配置的规则很easy,
就是配置Template时设置PublisherConfirms,和其反馈信息Returns都为True,同时给出ConfirmCallBack作为发送成功的处理
publisher-confirm-type: correlated #发送消息之后进行确认 发送到交换机触发回调 publisher-confirm 废弃
publisher-returns: true #发送消息后返回确认信息
rabbitTemplate.setConnectionFactory(connectionFactory);
//发消息如果成功,给出反馈信息
rabbitTemplate.setConfirmCallback((correlationData, b, s) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,b,s));
//发送消息失败,给出反馈
rabbitTemplate.setReturnsCallback(returnedMessage -> log.warn("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message({})",returnedMessage.getExchange(),returnedMessage.getRoutingKey(), returnedMessage.getReplyCode(),returnedMessage.getReplyText(),returnedMessage.getMessage()));
confirm为成功,returns为失败,这样P就可以知道消息是成功发送成功
rabbitMQ宕机、消息积压 — 持久化
rabbitMQ可能出现消息积压的情况 — Producer生产能力强,Consumer宕机、Consumer消费能力弱
消息积压最简单的方法就是增加rabbitMQ机器的数量,如果不行,那就手工增加消费能力 — 并发消费【多个消费者消费,同时设置消费最大数量】 ,还可以再开一个消费者,将MQ的消息全部录入数据库,后续处理
而RabbitMQ本身为保证消息不丢失,建议创建队列、交换机时设置持久化参数durable为true, 也就是durable参数取值为true
@Bean(name = "basicQueue")
public Queue basicQueue() {
return new Queue(Objects.requireNonNull(environment.getProperty("spring.rabbitmq.queue-name")),true);
}
@Bean
public DirectExchange basicExchange() {
return new DirectExchange(environment.getProperty("spring.rabbitmq.exchange-name"),true,false);
}
properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
持久化之后,RabbitMQ服务器崩溃重启操作,队列、交换机依然存在并且消息不会丢失
但是可能发生RabbitMQ收到消息还未持久化,就宕机了【和数据库一样】,那么这里也可以采用相似的方式 : 事务
AMQP的重要特点 – 支持事务, 如果生产者将持久化消息发送给服务器,consume命令本身无Response返回,为了应对发出消息服务器崩溃,可以预先通过
- txSelect()开启事务
- txCommit() 提交事务
- txRollBack() 回滚事务
消息不重复消费 — ACK模式
为了保证消息不重复消费、能够准备被消费,提供了消息确认机制,ACK模式
Ack — acknowledge key确认字符,接收方在收到报文后,若未发现错误,就给方法放确认回答ACK,表明信息正确接收
TCP报文的控制位就有一个ACK,当然AMQP协议中基于的信道Channel,不是TCP
为了提高信息的可用性、防止消息重复消费, 一般都会使用消息确认机制 — 当消息确认消费后,消息才会从队列中移除
RabbitMQ提供的取人模式: AcnowledgeMode : NONE、MANUAL手动,AUTO自动,电商平台支付的支付金额、游戏充值的消息提示是必须进行确认消费的,不然就不够严谨可靠
NONE – 无需确认
生产者发送消息到队列,消费者消费之后不给出任何反馈消息, 一股脑发送即可,不需要考虑丢失等异常,就像UDP的感觉, 这样处理流程更短,更快一点,但是实际生产中很少使用
类似用户禁用APP更新提醒,P发送消息提示更新,用户消费之后并不会提示已消费
AUTO — 自动确认
生产者发送消息到队列,消费者监听消息之后,需要发送一个AUTO ACK的反馈给服务器, 之后消息出队
RabbitMQ自动触发, 依靠的是RabbitMQ的内置组件
Producer ---msg--> Exchange---bind-->Queue <--listen--- Consumer
|___RabbitMQ服务器 --- AUTO ACK <---- rabbitMQ 内置组件---|
而确认消费的模式,是在容器工厂配置中配置setAcckownledgeMode即可, 之前为了流程简单,模式设置的为NONE
@Bean(name = "multiListenerContainer")
public SimpleRabbitListenerContainerFactory multiListenerContainer() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//设置连接工厂,可以使用configure对象配置
factoryConfigurer.configure(factory,connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
//消息的确认消费模式,先设置为NONE,表示不需要确认消费
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
而这里的交换机模式选择Direct,也就是最初是演示使用的,路由模式
生产者还是BasicProducer, 确认消费模式,主要的变化是在消费者端, 但是这里是自动消费, 消费者的@RabbitListener中的container就会指定容器工厂,设置了Auto
当然,也可以直接在配置文件中指定:
spring:
rabbitmq:
listener:
direct:
acknowledge-mode: auto
和之前的不同就是增加了ACK确认,重新执行代码
[nectionFactory1] c.server.config.RabbitmqConfig : 消息发送成功:correlationData(null),ack(true),cause(null)
可以看到ack为true,也就是确认消费
MANUAL 手动确认
人工手动确认消息消费,生产者发送消息到队列后,消费者监听到该消息时需要 手动以代码方式发送一个ACK 给服务器,之后消息出队,告知P 成功消费
Producer ---msg--> Exchange---bind-->Queue <--listen--- Consumer
|___RabbitMQ服务器 --- ACK 信息 <---- 人为手动ACK ---|
手动确认 需要在消费者 消费 消息逻辑 之后 编写确认消费的逻辑, 消息出队,避免消息重复投入队列而重复消费, 还是使用Direct模式交换机
和AUTO不同, MANUAL需要指定容器Container,并且当容器监听的队列和监听的Consumer
/**
* 配置监听容器
* 消费者监听实例
*/
@Resource
private BasicConsumer basicConsumer;
@Bean(name = "simpleContainerManual")
public SimpleMessageListenerContainer simpleMessageListenerContainer(@Qualifier("basicQueue") Queue basicQueue) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//消费实例配置
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(1);
container.setPrefetchCount(1);
//消息确认模式
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setQueues(basicQueue);
//手动确认消费 消费者需要实现ChannelAwareMessageListener接口
container.setMessageListener(basicConsumer);
return container;
}
这里的消费者需要实现ChannelAwareMessageListener接口
@Component
@Slf4j
@RequiredArgsConstructor
public class BasicConsumer implements ChannelAwareMessageListener {
private final ObjectMapper objectMapper;
/**
* @param message
* @param channel 通道实例
* @throws Exception
*/
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//这里就是rabbit的一个Message监听器,可以监听到消息
MessageProperties properties = message.getMessageProperties();
//获取消息分发的全局标识
long deliveryTag = properties.getDeliveryTag();
try {
byte[] msg = message.getBody();
//解析消息
String entity = objectMapper.readValue(msg,String.class);
//log
log.info("确认消费模式 - 人为手动确认消息: {}", entity);
//执行业务逻辑后,手动ACK, deliverTag 为全局分发标识(唯一) 是否允许批量消费: 这里设置为true
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
log.info("人为手动确认消费- 监听到消息: {}", e.fillInStackTrace());
//发生异常,拒绝ACK
channel.basicReject(deliveryTag,false);
}
}
基于MANUAL确认消费模式 需要在执行完实际的业务逻辑业务之后,手动调用相关方法比如BasicAck,Reject,当处理过程发生异常,需要执行确认消费, 避免消息一直留在队列中,导致重复消费
INFO 2268 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.204.100:5672]
2022-09-17 16:56:38.394 INFO 2268 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#28fc1132:0/SimpleConnection@3c82bac3 [delegate=amqp://cfeng@192.168.204.100:5672/, localPort= 54514]
2022-09-17 16:56:38.477 INFO 2268 --- [ main] c.server.RabbitmqBasicMessageTest : Started RabbitmqBasicMessageTest in 5.256 seconds (JVM running for 6.762)
2022-09-17 16:56:39.102 INFO 2268 --- [ main] c.s.rabbitmq.producer.BasicPublisher : 基本模型: 生产者发送消息: kiss
2022-09-17 16:56:39.116 INFO 2268 --- [nectionFactory1] c.server.config.RabbitmqConfig : 消息发送成功:correlationData(null),ack(true),cause(null)
2022-09-17 16:56:39.121 INFO 2268 --- [ntainerManual-1] c.s.rabbitmq.consumer.BasicConsumer : 确认消费模式 - 人为手动确认消息: kiss
用户登录成功 写入日志
用户登录操作,作为常规系统的基础操作,一些应用需要跟踪用户的登录、操作轨迹,需要记录用户的登录的日志,将相关的记录录入数据库
用户登录是主流程, 记录用户登录日志是辅助流程, 所以实际过程中,记录用户登录日志操作不应该影响用户登录,也就是不应该是同步的,因为再次数据库操作耗时; 这个时候就可以RabbitMQ进行解耦
常规操作也可以采用之前抢红包操作录入数据库的@Async; 整体业历程:包括 登录模块 和 日志记录模块, 这两个模块不应该相互影响,异步操作
这里为了模拟整个流程,就大概先分析一下思路,首先网站的用户登录成功进入后台的判断逻辑,判断之后就发送一个消息,借助RabbitTemplate即可,发送之后,日志监听消费者监听到消息获取相关的登录信息录入数据库, 这里采用Direct交换机, 手动ACK
这里也就不完整写代码了,没什么特别的地方,这里Cfeng就简单写一些代码说明整个流程【 结合之前的Cfeng.net 使用的Security】
//首先就是用户信息, 就是之前的ChatUser -- 网站用户
//之后封装一个登录日志实体类 SysLog 包括id、用户id、用户操作所属模块、操作的数据、备注、操作时间等
之前Security登录成功后设置的SuccessfulURL, 现在因为需要登录成功后进行后处理操作,所以这里需要自定义SuccessfulHandler, 在其中调用 LogPublisher 将当前用户的登录信息发布到Exchange ; 之后再跳转进入成功的欢迎页面
LogPbulisher的操作就是封装对象类型的消息, MessageProcessor就可以完成,指定消息头的java type为SysLog
在rabbitMQ中配置Direct交换机, 配置日志系统的消费的队列LogQueue, 并且指定日志系统的消费者LogConsumer,配置相关的监听容器container,指定队列和消费者, LogConsumer实现Listener接口 【确认消费】, 在message方法中进行数据库的写入操纵, 当然需要try catch, 如果出现异常手动Direct, 正常就ACK,消息出队避免重复消费,导致数据库消息异常
如果有相关friend好奇,我也可以详细说一下具体的代码🌳