1. 消息确认机制
生产者发送消息后,到达消费端之后,可能有以下两种情况:
- 消息处理成功
- 消息处理异常
如何保证消费端成功接收了消息并正确进行处理了?
消息确认机制
消费者在订阅队列时,可以指定autoAck参数,根据这个参数设置,消息确认机制分为以下两种:
自动确认 当autoAck为true时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而且不管消费者是否真正地消费到了这些消息. 适合对于消息可靠性要求不高的场景.
手动确认 当autoAck为false时,RabbitMQ会等待消费者显式地调用Basic.Ack命令,回复确认信号后才从内存(或者磁盘)中移去消息. 适合对消息可靠性要求比较高的场景.
当autoAck参数置为false,对于RabbitMQ服务端, 队列中的消息分成了两个部分:
- 等待投递给消费者的消息.
- 已经投递给消费者,但是还没有收到消费者确认信号的消息
1.2 手动确认方法
消费者在收到消息之后,可以选择确认,也可以选择直接拒绝或者跳过,RabbitMQ也提供了不同的确认 应答的方式,消费者客户端可以调用与其对应的channel的相关方法,共有以下三种
肯定确认:Channel.basicAck(long deliveryTag, boolean multiple) RabbitMQ已知道该消息并且成功的处理消息.可以将其丢弃了.
参数说明:
deliveryTag: 消息的唯⼀标识,它是⼀个单调递增的64位的长整型值. deliveryTag 是每个通道 (Channel)独立维护的,所以在每个通道上都是唯一的.当消费者确认(ack)⼀条消息时,必须使用对应 的通道上进行确认.
multiple: 是否批量确认.在某些情况下,为了减少网络流量,可以对一系列连续的 deliveryTag 进行批量确认.
值为true则会⼀次性ack所有小于或等于指定deliveryTag的消息.
值为false,则只确认当前指认deliveryTag的消息.
deliveryTag 是RabbitMQ中消息确认机制的⼀个重要组成部分,它确保了消息传递的可靠性和顺 序性。
不同的是可能会重复的,在当前chanel是唯一的
否定确认: Channel.basicReject(long deliveryTag, boolean requeue) RabbitMQ在2.0.0版本开始引入了 Basic.Reject 这个命令,消费者客户端可以调用channel.basicReject方法来告诉RabbitMQ拒绝这个消息.
参数说明:
- deliveryTag:参考channel.basicAck
- requeue:表示拒绝后,这条消息如何处理. requeue参数设置为true,则RabbitMQ会重新将这条消息存入队列,以便可以发送给下⼀个订阅的消费者. requeue参数设置为false,则RabbitMQ会把 消息从队列中移除,而不会把它发送给新的消费者.
否定确认: Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
Basic.Reject命令一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用Basic.Nack这个命令.
消费者客户端可以调用channel.basicNack法来方实现. multiple参数设置为true则表示拒绝deliveryTag编号之前所有未被当前消费者确认的消息.
1.2三种策略
pring-AMQP对消息确认机制提供了三种策略.
public enum AcknowledgeMode {
NONE,
MANUAL,
AUTO;
}
AcknowledgeMode.NONE
这种模式下,消息⼀旦投递给消费者,不管消费者是否成功处理了消息,RabbitMQ就会自动确认消息,从RabbitMQ队列中移除消息.如果消费者处理消息失败,消息可能会丢失.
AcknowledgeMode.AUTO(默认)
这种模式下,消费者在消息处理成功时会自动确认消息,但如果处理过程中抛出了异常,则不会确认消息.
AcknowledgeMode.MANUAL
◦手动确认模式下,消费者必须在成功处理消息后显式调用basicAck 方法来确认消息.如果消息未被确认,RabbitMQ会认为消息尚未被成功处理,并且会在消费者可用时重新投递该消息,这 种模式提高了消息处理的可靠性,因为即使消费者处理消息后失败,消息也不会丢失,而是可以重新处理.
2.持久性
2.1 交换机持久化
如何保证当RabbitMQ服务停掉以后,生产者 发送的消息不丢失呢?
RabbitMQ的持久化分为三个部分:交换器的持久化、队列的持久化和消息的持久化.
对于一个长期使用的交换机,建议持久化,如果交换器不设置持久化,那么在 RabbitMQ 服务重启之后,相关的交换机元数据会丢失.
ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build()
durable参数设置为true(默认为true)
2.2 队列的持久化
队列的持久化是通过在声明队列时将 durable 参数置为true实现的. 如果队列不设置持久化,那么在RabbitMQ服务重启之后,该队列就会被删掉,此时数据也会丢失.(队列没有了,消息也没了)
队列的持久化能保证该队列本身的元数据不会因异常情况而丢失,但是并不能保证内部所存储的消息不会丢失.要确保消息不会丢失,需要将消息设置为持久化.
持久化
QueueBuilder.durable(Constant.ACK_QUEUE).build();
非持久化
QueueBuilder.nonDurable(Constant.ACK_QUEUE).build();
2.3 消息实现持久化
消息实现持久化,需要把消息的投递模式( MessageProperties 中的 deliveryMode )设置为2, 也就是 MessageDeliveryMode.PERSISTENT
public enum MessageDeliveryMode {
NON_PERSISTENT,//⾮持久化
PERSISTENT;//持久化
}
设置了队列和消息的持久化,当RabbitMQ 服务重启之后,消息依旧存在.如果只设置队列持久化,重启之后消息会丢失.如果只设置消息的持久化,重启之后队列消失,消息也丢失.所以单单设置消息 持久化而不设置队列的持久化显得毫无意义.
注意:将所有的消息都设置为持久化,会严重影响RabbitMQ的性能(随机).写入磁盘的速度比写入内存的速度慢很多. 对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量.在选择是否要将消息持久化时,需要在可靠性和吐吞量之间做一个权衡.
2.4 持久化问题
将交换器、队列、消息都设置了持久化之后就能百分之百保证数据不丢失了吗?
否
- 从消费者来说,如果在订阅消费队列时将autoAck参数设置为true,那么当消费者接收到相关消息之后,还没来得及处理就宕机了,这样也算数据居丢失.这种情况很好解决,将autoAck参数设置为 false,并进行手动确认
- 在持久化的消息正确存⼊RabbitMQ之后,还需要有一段时间才能存入磁盘 中.RabbitMQ并不会为每条消息都进行同步存盘(调用内核的fsync方法)的处理,可能仅仅保存到操作系统缓存之中而不是物理磁盘之中.如果在这段时间内RabbitMQ服务节点发生了宕机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将会丢失.
怎么解决?
- 引入RabbitMQ的仲裁队列, 如果主节点(master)在此特殊时间内挂掉,可以自动切换到从节点(slave),这样有效地保证了高可用性,除非整个集群都挂掉(此方法也不能保证100%可靠,但是配置了仲裁队列要比没有配置仲裁队列的可靠性要高很多,实际生产环境中的关键业务队列⼀般都会设置仲裁队列).
- 在发送端引入事务机制或者发送方确认机制来保证消息已经正确地发送并存储到RabbitMQ 中
3.发送方确认
在使用RabbitMQ的时候,可以通过消息持久化来解决因为服务器的异常崩溃⽽导致的消息丢失,但是还 有⼀个问题,
当消息的生产者将消息发送出去之后,如何确保消息正确到达服务器?
如果在消息到 达服务器之前已经丢失(比如RabbitMQ重启,那么RabbitMQ重启期间生产者消息投递失败),持久化操 作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化?
RabbitMQ为我们提供了两种解决方案:
通过事务机制实现
通过发送方确认(publisherconfirm)机制实现
事务机制性能比较低,这里不多介绍,主要介绍confirm机制来实现发送方的确认.
RabbitMQ为我们提供了两个方式来控制消息的可靠性投递
confirm模式
return模式
3.1 confirm模式
Producer在发送消息的时候,对发送端设置⼀个ConfirmCallback的监听,无论消息是否到达 Exchange,这个监听都会被执行,如果Exchange成功收到ACK为true,如果没收到消息,ACK就为false.
配置yml
spring:
application:
name: springboot-extension
rabbitmq:
host: 121.36.62.67 #主机
port: 5672 #默认为5672
username: study
password: study
virtual-host: extension #默认值为 /
listener:
simple:
acknowledge-mode: manual # 消息确认机制
publisher-confirm-type: correlated #消息发送确认
配置交换机和队列
public static final String CONFIRM_QUEUE = "confirm.queue";
public static final String CONFIRM_EXCHANGE = "confirm.exchange";
//发送方确认
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(Constants.CONFIRM_QUEUE).build();
}
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
return ExchangeBuilder.directExchange(Constants.CONFIRM_EXCHANGE).build();
}
@Bean("confirmBinding")
public Binding confirmBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("confirm").noargs();
}
设置确认回调逻辑并发送信息
无论消息确认成功还是失败,都会调用ConfirmCallback的confirm方法.
@Bean
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate confirmRabbitTemplate=new RabbitTemplate(connectionFactory);
//调用回调方法
confirmRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("执行confirm方法");
if(ack){
System.out.printf("接收到消息 消息ID :%s\n",correlationData==null?null:correlationData.getId());
}else {
System.out.printf("未接收到消息 消息ID :%s cause: %s \n",correlationData==null?null:correlationData.getId(),cause);
}
}
});
@RequestMapping("/confirm")
public String confirm() {
CorrelationData correlationData = new CorrelationData("1");
confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm111", "confirm test...", correlationData);
return "消息发送成功";
}
RabbitTemplate.ConfirmCallback和ConfirmListener区别
在RabbitMQ中,ConfirmListener和ConfirmCallback都是用来处理消息确认的机制,但它们属于不同 的客户端库,并且使用的场景和方式有所不同.
- ConfirmListener是RabbitMQJavaClient库中的接口.这个库是RabbitMQ官方提供的⼀个直 接与RabbitMQ服务器交互的客户端库.ConfirmListener接口提供了两个方法:handleAck和 handleNack,用于处理消息确认和否定确认的事件.
- ConfirmCallback是SpringAMQP框架中的一个接口.专 门为Spring环境设计.用于简化与 RabbitMQ交互的过程.它只包含⼀个confirm方法,用于处理消息确认的回调.
3.2 return模式
消息到达Exchange之后,会根据路由规则匹配,把消息放入Queue中.
Exchange到Queue的过程,如果一条消息无法被任何队列消费(即没有队列与消息的路由键匹配或队列不存在等),可以选择把消息退回给发送者.
消息退回给发送者时,我们可以设置一个返回回调方法,对消息进行处理.
@Bean
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate confirmRabbitTemplate=new RabbitTemplate(connectionFactory);
//回退回调方法
confirmRabbitTemplate.setMandatory(true);
confirmRabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println("消息退回....."+returned);
}
});
设置消息的mandatory属性为true(默认为false).
作用是告诉RabbitMQ,如果一条消息无法被任何队列消费,RabbitMQ应该将消息返回给发送者,此时ReturnCallback 就会被触发.
ReturnedMessage包含以下属性:
message:返回的消息对象,包含了消息体和消息属性
replyCode:由Broker提供的回复码, 表示消息示法路由的原因. 通常是一个数字代码,每个数字代表不同的含义.
replyText:一个文本字符串, 提供了无法路由消息的额外信息或错误描述.
exchange:消息被发送到的交换机名称
routingKey:消息的路由键,即发送消息时指定的键
面试题
如何保证RabbitMQ消息的可靠传输?
RabbitMQ消息传递图
从这个图中,可以看出,消息可能丢失的场景以及解决方案:
生产者将消息发送到RabbitMQ失败
可能原因:网络问题等
解决办法:发送方确认-confirm确认模式
消息在交换机中无法路由到指定队列
可能原因:代码(routingKey不一致)或者配置层面错误,导致消息路由失败
解决办法: 发送方确认-return模式
队列溢出:死信队列等
消息队列自身数据丢失
可能原因: 消息到达RabbitMQ之后,RabbitMQServer宕机导致消息丢失.
解决办法: 开启RabbitMQ持久化,就是消息写入之后会持久化到磁盘,如果 RabbitMQ挂了,恢复之后会自动读取之前存储的数据.(极端情况下,RabbitMQ还未持久化就挂 了,可能导致少量数据丢失,这个概率极低,也可以通过集群的方式提高可靠性)
消费者异常,导致消息丢失
可能原因: 消息到达消费者,还没来得及消费,消费者宕机.消费者逻辑有问题. 解决办法: 消息确认. RabbitMQ提供了消费者应答机制来使RabbitMQ能够感知到消费者是否消费成功消息.默 认情况下消费者应答机制是自动应答的,可以开启手动确认,当消费者确认消费成功后才会删除消息,从而避免消 息丢失. 除此之外,也可以配置重试机制,当消息消费异常时,通过消息重试确保消息的可靠性
4.重试机制
在消息传递过程中,可能会遇到各种问题,如网络故障,服务不可用,资源不足等,这些问题可能导致消息处理失败.
为了解决这些问题,RabbitMQ提供了重试机制,允许消息在处理失败后重新发送. 但如果是程序逻辑引起的错误,那么多次重试也是没有用的,可以设置重试次数
4.1自动确认
配置application.yml
spring:
application:
name: springboot-extension
rabbitmq:
host: 121.36.62.67
port: 5672 #默认为5672
username: study
password: study
virtual-host: extension #默认值为 /
listener:
simple:
acknowledge-mode: auto # 消息确认机制
retry:
enabled: true # 开启消费者失败重试
initial-interval: 5000ms # 初始失败等待时⻓为5秒
max-attempts: 5 # 最大重试次数(包括自⾝消费的⼀次)
配置交换机、队列
public static final String RETRY_QUEUE = "retry.queue";
public static final String RETRY_EXCHANGE = "retry.exchange";
@Bean("retryQueue")
public Queue retryQueue(){
return QueueBuilder.durable(Constants.RETRY_QUEUE).build();
}
@Bean("retryExchange")
public FanoutExchange retryExchange(){
return ExchangeBuilder.fanoutExchange(Constants.RETRY_EXCHANGE).build();
}
@Bean("retryBinding")
public Binding retryBinding(@Qualifier("retryQueue")Queue queue,@Qualifier("retryExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("retry").noargs();
}
生产者代码
@RequestMapping("/retry")
public String retry() {
System.out.println("retry...");
rabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE, "retry", "retry test...");
return "消息发送成功";
}
消费者代码
@Component
public class RetryListener {
@RabbitListener(queues = Constants.RETRY_QUEUE)
// auto
public void handleMessage(Message message) throws IOException {
System.out.printf("["+Constants.RETRY_QUEUE+"]"+"收到信息 %s deliveryTag %s \n",new String(message.getBody(),"utf-8"),message.getMessageProperties().getDeliveryTag());
int sum=3/0;
System.out.println("业务处理完成");
}
如果对异常进行捕获,那么就不会进行重试
4.2手动确认
acknowledge-mode: manual#
消息确认机制`
public void handleMessage(Message message,Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("["+Constants.RETRY_QUEUE+"]"+"收到信息 %s deliveryTag %s \n",new String(message.getBody(),"utf-8"),deliveryTag);
try {
int sum=3/0;
System.out.println("业务处理完成");
channel.basicAck(deliveryTag,false);
} catch (Exception e) {
channel.basicNack(deliveryTag,false,true);
}
}
重试次数的限制不会像在自动确认模式直接生效,因为是否重试以及何时重试更多地取决于应用程序的逻辑和消费者的实现.
自动确认模式下
,RabbitMQ会在消息被投递给消费者后自动确认消息.如果消费者处理消息时抛出异 常,RabbitMQ根据配置的重试参数自动将消息重新入队,从而实现重试.重试次数和重试间隔等参数可 以直接在RabbitMQ的配置中设定,并且RabbitMQ会负责执行这些重试策略.
手动确认模式下
,消费者需要显式地对消息进行确认.如果消费者在处理消息时遇到异常,可以选择不确认消息使消息可以重新入队.重试的控制权在于应用程序本身,而不是RabbitMQ的内部机制.应用程序 可以通过自己的逻辑和利用RabbitMQ的高级特性来实现有效的重试策略
注意:
- 自动确认模式下:程序逻辑异常,多次重试还是失败,消息就会被自动确认,那么消息就丢失 了
- 手动确认模式下:程序逻辑异常,多次重试消息依然处理失败,无法被确认,就一直是 unacked的状态,导致消息积压
5.TTL
过期时间,RabbitMQ可以对消息和队列设置TTL
当消息到达存活时间之后,还没有被消费,就会被自动清除
举例:
网上购物,经常会遇到⼀个场景,当下单超过24小时还未付款,订单会被自动取消 还有类似的,申请退款之后,超过7天未被处理,则自动退款
5.1设置消息的TTL
在发送消息的方法中加入expiration的属性参数
@RequestMapping("/ttl")
public String ttl() {
System.out.println("ttl...");
MessagePostProcessor messagePostProcessor= message -> {
message.getMessageProperties().setExpiration("10000");
return message;
};
rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test...",messagePostProcessor);
return "消息发送成功";
}
如果不设置TTL,则表示此消息不会过期;如果将TTL设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃.
5.2设置队列的TTL
设置队列TTL的方法是在创建队列时,加入 x-message-ttl 参数实现的,单位是毫秒.
方式一
@Bean("ttlQueue")
public Queue ttlQueue2(){
return QueueBuilder.durable(Constants.TTL_QUEUE).ttl(20000).build(); //设置队列的ttl为20s
}
方式二
@Bean("ttlQueue")
public Queue ttlQueue3(){
Map<String, Object> map = new HashMap<>();
map.put("x-message-ttl", 20000);
return QueueBuilder.durable(Constants.TTL_QUEUE).withArguments(map).build(); //设置队列的ttl为20s
}
5.3区别
设置队列TTL属性的方法,⼀旦消息过期,就会从队列中删除
设置消息TTL的方法,即使消息过期,也不会马上从队列中删除,而是在即将投递到消费者之前进行判定 的.
为什么这两种方法处理的方式不一样?
因为设置队列过期时间,队列中已过期的消息肯定在队列头部,RabbitMQ只要定期从队头开始扫描是否有过期的消息即可.
设置消息TTL的方式,每条消息的过期时间不同,如果要删除所有过期消息需要扫描整个队列,所以不如等到此消息即将被消费时再判定是否过期,如果过期再进行删除即可.
6.死信队列
6.1 死信的概念
死信: 因为种种原因,无法被消费的信息,就是死信.
死信队列: 当消息在⼀个队列中变成死信之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX( Dead Letter Exchange ),绑定DLX的队列,就称为死信队列
6.2 死信的来源
消息变成死信⼀般是由于以下几种情况:
- 消息被拒绝( Basic.Reject/Basic.Nack ),并且设置 requeue 参数为false.
- 消息过期.
- 队列达到最大长度.
6.3 死信队列的应用场景
对于RabbitMQ来说,死信队列是⼀个非常有用的特性.它可以处理异常情况下,消息不能够被消费者正确消费而被置入死信队列中的情况,应用程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统.
- 保证消息不丢失,并对数据进行处理
- 消息重试
- 消息丢弃
- 日志收集
7.延迟队列
7.1 概念
延迟队列(Delayed Queue),即消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费.
7.2 应用场景
- 智能家居:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作.这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到智能设备.
- 日常管理: 预定会议后,需要在会议开始前十五分钟提醒参会人参加会议
- 用户注册成功后,7天后发送短信,提高用户活跃度等
- …
RabbitMQ本身没有直接支持延迟队列的的功能,但是可以通过前面所介绍的TTL+死信队列的方式组合模拟出延迟队列的功能.
7.3 TTL+死信队列实现
声明队列:
@Bean("normalQueue")
public Queue normalQueue(){
return QueueBuilder.durable(Constants.NORMAL_QUEUE)
.deadLetterExchange(Constants.DL_EXCHANGE)
.deadLetterRoutingKey("dlx")
.build();
}
@Bean("normalExchange")
public DirectExchange normalExchange(){
return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();
}
@Bean("normalBinding")
public Binding normalBinding(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();
}
//死信交换机和队列
@Bean("dlQueue")
public Queue dlQueue(){
return QueueBuilder.durable(Constants.DL_QUEUE).build();
}
@Bean("dlExchange")
public Exchange dlExchange(){
return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();
}
@Bean("dlBinding")
public Binding dlBinding(@Qualifier("dlQueue") Queue queue, @Qualifier("dlExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs();
}
生产者代码
发送两条消息,⼀条消息30s后过期,第二条10s后过期
@RequestMapping("/delay")
public String delay() {
//发送带ttl的信息
System.out.println("delay....");
rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "delay30 test..."+new Date(), messagePostProcessor-> {
messagePostProcessor.getMessageProperties().setExpiration("30000");
return messagePostProcessor;
});
rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "delay10 test..."+new Date(), messagePostProcessor-> {
messagePostProcessor.getMessageProperties().setExpiration("10000");
return messagePostProcessor;
});
System.out.printf("%tc 消息发送成功 \n", new Date());
return "消息发送成功";
}
10s过期的消息,也是在30s后才进入到死信队列.
消息过期之后,不⼀定会被马上丢弃.因为RabbitMQ只会检查队首消息是否过期,如果过期则丢到死信队列.此时就会造成⼀个问题,如果第⼀个消息的延时时间很长,第二个消息的延时时间很短,那第二个消息并不会优先得到执行。
所以在考虑使用TTL+死信队列实现延迟任务队列的时候,需要确认业务上每个任务的延迟时间是⼀致的,如果遇到不同的任务类型需要不同的延迟的话,需要为每⼀种不同延迟时间的消息建立单独的消息
队列
7.4 延迟队列插件
RabbitMQ官方也提供了⼀个延迟的插件来实现延迟的功能
延迟队列插件官方文档
根据自己的RabbitMQ版本选择相应版本的延迟插件,下载后上传到服务器
/usr/lib/rabbitmq/plugins 是⼀个附加目录,RabbitMQ包本身不会在此安装任何内容,如果没有这个路径,可以自己创建
启动插件
#查看插件列表
rabbitmq-plugins list
#启动插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
#重启服务
service rabbitmq-server restart
验证是否成功
7.5基于插件延迟队列实现
- 声明交换机,队列,绑定关系
@Configuration
public class DelayConfig {
@Bean("delayedQueue")
public Queue delayedQueue(){
return QueueBuilder.durable(Constants.DELAY_QUEUE).build();
}
@Bean("delayedExchange")
public Exchange delayedExchange(){
return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed().build();
}
@Bean("delayBinding")
public Binding delayBinding(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("delay").noargs();
}
}
- 生产者
发送两条消息,并设置延迟时间
@RequestMapping("/delay2")
public String delay2() {
//发送带ttl的信息
rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "delay20 test..."+new Date(), messagePostProcessor-> {
messagePostProcessor.getMessageProperties().setDelayLong(20000L);
return messagePostProcessor;
});
rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "delay10 test..."+new Date(), messagePostProcessor-> {
messagePostProcessor.getMessageProperties().setDelayLong(10000L);
return messagePostProcessor;
});
System.out.printf("%tc 消息发送成功 \n", new Date());
return "消息发送成功";
}
- 消费者
@Component
public class DelayListener {
//指定监听队列的名称
@RabbitListener(queues = Constants.DELAY_QUEUE)
public void ListenerDLXQueue(Message message, Channel channel) throws
Exception {
System.out.printf("%tc 死信队列接收到消息: %s%n", new Date(), new
String(message.getBody(),"UTF-8"));
}
}
使用延迟队列,可以保证消息按照延迟时间到达消费者.
基于插件和基于死信实现的区别
- 基于死信实现的延迟队列
a. 优点:灵活不需要额外的插件支持
b. 缺点: 存在消息顺序问题、需要额外的逻辑来处理死信队列的消息,增加了系统的复杂性 - 基于插件实现的延迟队列
a. 优点: 通过插件可以直接创建延迟队列,简化延迟消息的实现.避免了DLX的时序问题
b. 缺点:需要依赖特定的插件,有运维工作,只适用特定版本
8.事务机制
RabbitMQ是基于AMQP协议实现的,该协议实现了事务机制,因此RabbitMQ也支持事务机制.Spring AMQP也提供了对事务相关的操作.
RabbitMQ事务允许开发者确保消息的发送和接收是原子性的,要么全部成功,要么全部失败.
开启事务管理器
@Bean
public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
@Bean
public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory){
return new RabbitTransactionManager(connectionFactory);
}
@Transactional注解
@Transactional
@RequestMapping("/trans")
public String trans(){
transRabbitTemplate.convertAndSend("",Constants.TRANS_QUEUE,"trans1");
int sum=3/0;
transRabbitTemplate.convertAndSend("",Constants.TRANS_QUEUE,"trans2");
return "消息发送成功";
}
发送信息
原因:事务机制和发布订阅模式起冲突
解决:注释掉#消息发送确认的配置
9.消息分发
RabbitMQ队列拥有多个消费者时,队列会把收到的消息分派给不同的消费者.
默认情况下,RabbitMQ是以轮询的方法进行分发的,而不管消费者是否已经消费并已经确认了消息.
这种方式是不太合理的,试想⼀下,如果某些消费者消费速度慢,而某些消费者消费速度快,就可能会导致某些消费者消息积压,某些消费者空闲,进而应用整体的吞吐量下降.
消息分发的应用场景:
- 限流
- 负载均衡
9.1 限流
使用场景:
在秒杀、电商大促等场景中,流量会在短时间内爆发式增长,产生大量消息(如下单、支付消息),若直接将峰值流量传递给下游,可能导致系统瞬间过载。
订单处理系统(需调用库存、支付接口)每秒最多处理 5000 条消息,正常情况下,订单系统可以正常满足需求 但是在秒杀时间点,请求瞬间增多,每秒1万个请求,如果这些请求全部通过MQ发送到订单系统,无疑会把订单系统压垮.
RabbitMQ提供了限流机制
通过设置prefetchCount(控制消费者从队列中预取(prefetch)消息的数量)参数,同时也必须要设置消息应答方式设置为手动应答
使用的是spring amqp 中SDK,直接添加配置就可使用
- 配置yml
listener:
simple:
acknowledge-mode: manual #手动确认
prefetch: 5
声明队列交换机
@Configuration public class QosConfig { @Bean("qosQueue") public Queue QosQueue(){ return QueueBuilder.durable(Constants.QOS_QUEUE).build(); } @Bean("qosExchange") public Exchange QosExchange(){ return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).build(); } @Bean("qosBinding") public Binding QosBinding(@Qualifier("qosQueue") Queue queue, @Qualifier("qosExchange") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("qos").noargs(); } }
发送消息
@RequestMapping("/qos") public String qos(){ System.out.println("qos test...."); for (int i = 0; i < 20; i++) { rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE,"qos","qos......"); } return "消息发送成功"; }
消费者监听消息
public class QosListener { //指定监听队列的名称 @RabbitListener(queues = Constants.QOS_QUEUE) public void handleMessage(Message message, Channel channel) throws IOException { try { System.out.printf("接收到信息 %s ,deliveryTag :%d \n",new String(message.getBody(), "UTF-8"),message.getMessageProperties().getDeliveryTag()); System.out.println("业务处理完成"); //肯定确认 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { //否认确认 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); } } }
发送消息时,需要先把手动确认注掉,不然会直接消费掉
注释掉
9.2负载均衡
我们也可以用此配置,来实现"负载均衡"
在有两个消费者的情况下,⼀个消费者处理任务非常快,另⼀个非常慢,就会造成⼀个消费 者会⼀直很忙,而另⼀个消费者很闲.这是因为RabbitMQ只是在消息进入队列时分派消息.它不考虑消费者未确认消息的数量.
我们可以设置prefetch=1的⽅式,告诉RabbitMQ一次只给一个消费者⼀条消息,也就是说,在处理 并确认前⼀条消息之前,不要向该消费者发送新消息.相反,它会将它分派给下一个不忙的消费者.
启动两个消费者
使用Thread.sleep()方法模拟消息接收的快慢
@RabbitListener(queues = Constants.QOS_QUEUE)
public void handleMessage1(Message message, Channel channel) throws IOException {
try {
System.out.printf("111接收到信息 %s ,deliveryTag :%d \n",new String(message.getBody(), "UTF-8"),message.getMessageProperties().getDeliveryTag());
Thread.sleep(2000);
//肯定确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (Exception e) {
//否认确认
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
}
}
@RabbitListener(queues = Constants.QOS_QUEUE)
public void handleMessage2(Message message, Channel channel) throws IOException {
try {
System.out.printf("222接收到信息 %s ,deliveryTag :%d \n",new String(message.getBody(), "UTF-8"),message.getMessageProperties().getDeliveryTag());
Thread.sleep(1000);
//肯定确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (Exception e) {
//否认确认
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
}
}
deliveryTag有重复是因为两个消费者使用的是不同的Channel,每个Channel上的 deliveryTag 是独立计数的.