RabbitMQ-高级
文章目录
前言:消息可靠性问题
如图,在实现业务中可能会出现以下几个问题:
- 支付服务向MQ发送消息时网络故障,导致消息消息丢失
- MQ还没有来得及发送至交易服务,自己宕机导致消息丢失
- 交易服务执行时抛出异常或者宕机
这三个任意一部分出现问题都会导致业务的执行失败,因此我们来一起学习MQ如何保证消息可靠性。
1.生产者可靠性
1.生产者重连
有的时候由于网络波动,可能会出现客户端连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制:
spring:
rabbitmq:
connection-timeout: ls # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: l # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
max-atempts: 3 # 最大重试次数
注意:(客户端连接的重试,不是消息发送的重试)
当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQ提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。
2.生产者确认机制
RabbitMq实现了Publisher Confirm和Publisher Return两种确认机制。开启确认机制后,在MQ成功收到消息后会返回确认消息给生产者。返回的结果有以下几种:
- 消息投递到了MQ,但是路由失败(一般会是代码或者路由配置的问题)。此时会通过Publisher Return返回路由异常原因,然后返回ACK,告知投递成功。
- 临时消息投递到了MQ,并且成功进入队列,返回ACK,告知投递成功
- 持久消息投递到了MQ,并且成功进入队列和持久化,返回ACK,告知投递成功
- 其他情况都会返回NACK,告知投递失败
3.生产者代码实现原理
使用SpringAMQP实现生产者确认
在publisher这个微服务的application.yml中添加配置:
spring: rabbitmq: publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型 publisher-returns: true # 开启publisher return机制 #配置说明: #这里publisher-confirm-type有三种模式可选: #none:关闭confirm机制 #simple:同步阻塞等待MQ的回执消息 #correlated:MQ异步回调方式返回回执消息
每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:
@S1f4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext)
throws BeansException
{
// 获取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 设置ReturnCallback
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.info("收到消息的ReturnBack: exchange:{}, route:{}, replyCode:{}, replyText:{}",
returnedMessage.getExchange(),
returnedMessage.getRoutingKey(),
returnedMessage.getReplyCode(),
returnedMessage.getReplyText());
}
});
}}
关于ApplicationContextAware :
定义
ApplicationContextAware
是 Spring 框架提供的一个接口,允许 Bean 感知并获取 Spring 容器的引用(即ApplicationContext
)。当一个类实现该接口后,Spring 会在初始化该 Bean 时自动调用setApplicationContext()
方法,并传入当前的ApplicationContext
对象。作用
获取 Spring 容器(
ApplicationContext
),用于动态管理 Bean、读取配置、发布事件等。在非依赖注入场景下访问 Spring 功能,例如在工具类或配置类中手动获取 Bean。
使用场景
动态获取 Bean(如运行时根据条件加载不同的组件)。
配置全局回调(如示例中的
RabbitTemplate
设置ReturnCallback
)。框架扩展(如自定义 Starter 需要与 Spring 容器交互)
3.发送消息,指定消息Id,消息ConfirmCallBack(每次发消息时指定)
@Test
void testPublisherConfirm() throws InterruptedException {
// 1. 创建CorrelationData
CorrelationData cd = new CorrelationData();
// 2. 给Future添加ConfirmCallback
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
// 2.1. Future发生异常时的处理逻辑,基本不会触发
log.error("Handle message ack fail", ex);
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
// 2.2. Future接收到回执的处理逻辑,参数中的result就是回执内容
if(result.isAck()){ // result.IsAck(), boolean类型, true代表ack回执, false 代表 nack回执
log.debug("发送消息成功,收到 ack!");
}else{ // result.getReason(), String类型, 返回nack时的异常描述
log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
}
}
});
// 3. 发送消息
rabbitTemplate.convertAndSend("ExchangeName", "RoutingKey", "hello", cd);
}
关于CorrelationData:
- 唯一标识消息
CorrelationData
通常包含一个唯一的ID
(如correlationId
),用于标识某条消息,以便在收到 ACK/NACK 回执时能正确匹配到对应的消息。- 如果不手动设置
ID
,RabbitMQ 会自动生成一个。
- 接收消息确认结果
- 通过
cd.getFuture()
可以获取一个ListenableFuture
,用于异步监听该消息的 ACK(成功) 或 NACK(失败) 回执。 - 示例代码中通过
addCallback()
方法注册回调,处理消息的确认结果。
- 通过
- 与
rabbitTemplate
配合使用- 在发送消息时(
convertAndSend
),将CorrelationData
传入,RabbitMQ 会在 Broker 确认消息后,通过该对象返回结果
- 在发送消息时(
2.MQ的可靠性
在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题:
- 一旦MQ宕机,内存中的消息会丢失
- 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞
如图当MQ的消息堆积时,MQ会执行PageOut,将老的消息落到磁盘上,给内存腾出空间,而这个过程MQ是阻塞执行的,此时来新的消息就可能无法执行,严重影响效率。
1.数据持久化
RabbitMQ实现数据持久化的3个方面:
交换机持久化(默认临时,在java代码中实现则是默认持久化,队列一样)
队列持久化
消息持久化
Delivery mode为2时则为持久模式
这里给出消息持久和非持久代码,大家可以尝试测试MQ的运行过程:
@Test
void testPageOut(){
Message message = MessageBuilder
.withBody(("Hello,world!").getBytes())
//持久化
.setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
//非持久化
//.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();
for (int i=0;i<1000000;i++){
rabbitTemplate.convertAndSend("study.direct","blue",message);
}
}
如图为开启了生产者确认机制下消息非持久化,会发现大约三十秒左右只发送了十分之一。(看In memory)
此时我们关闭后再次执行会发现消息处理快很多,此时出现Paged Out,图中折线降为0处即执行Paged Out出现阻塞。
下图为消息持久化,会发现并未执行Paged Out,Persistent表示持久化到磁盘中,每当In memory到达阈值会删除一些信息,此时折线图的暂时下降一些(效率暂时降低)。
2.LazyQueue
从RabbitMQ的3.6.0版本开始,就增加了Lazy Queue的概念,也就是惰性队列。
惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条)
- 消费者要消费消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储
在3.12版本后,所有队列都是Lazy Queue模式,无法更改。
要设置一个队列为惰性队列,只需要在声明队列时,制定x-queue-mode属性为lazy即可:(3.12后的默认为lazy)
在java代码中
基于注解
如图为我们向lazy队列发送一百万条数据:
会发现所有消息直接写进Paged Out,且几乎一直在峰值,执行时间也只有19s
相比消息持久化快了很多(虽然是写入磁盘,但底层对IO有特殊处理)。
3.消费者可靠性
1.消费者确认机制
1.确认机制
为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。
回执有三种可选值:
- ack:
成功处理
消息,RabbitMQ从队列中删除该消息
- nack: 消息
处理失败
,RabbitMQ需要再次投递消息
- reject:
消息处理失败并拒绝该消息
,RabbitMQ**从队列中删除
**该消息
2.确认功能
SpringAMQP实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式。
有以下三种方式:
none:**不处理。**即消息发送给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
manual:**手动模式。**需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack。
业务异常时,根据异常判断返回不同结果:
如果是业务异常,会自动返回nack
如果是消息处理或检验异常,会自动返回reject
spring: rabbitmq: listener: simple: prefetch: 1 acknowledge-mode: none #关闭ack;manual,手动ack;auto,自动ack
大家可以在编写代码测试效果,这里不做演示。
2.失败重试机制
1.开启失败重试机制
当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理频升,带来不必要的压力。
我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列:
spring:
rabbitmq:
listener:
simple:
prefetch: 1
acknowledge-mode: auto
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初始的失败等待时长为1秒
multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态; false有状态。如果业务中包含事务,这里改为false
#如果消息处理逻辑不涉及事务(如纯计算、查询等),用 stateless: true(默认值)。
#如果消息处理包含事务(如订单支付等),需设为 stateless: false,否则重试时事务可能失效
2.多次失败处理
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有 MessageRecoverer
接口来处理,它包含三种不同的实现:
RejectAndDontRequeueRecoverer:重试耗尽后,直接
reject
,丢弃消息。默认就是这种方式。ImmediateRequeueMessageRecoverer:重试耗尽后,返回
nack
,消息重新入队。RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机,后续人工介入。
这里我们演示一下第三种方式:
将失败处理策略改为RepublishMessageRecoverer
a. 首先,定义接收失败消息的交换机,队列及其绑定关系
b. 然后,定义RepublishMessageRecoverer@Configuration public class DirectConfiguration { @Bean public DirectExchange directExchange() { return new DirectExchange("error.exchange"); } @Bean public Queue errorQueue() { return new Queue("error.queue"); } @Bean public Binding errorBinding(Queue errorQueue, DirectExchange directExchange) { return BindingBuilder.bind(errorQueue). to(directExchange).with("error"); } @Bean public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) { return new RepublishMessageRecoverer( rabbitTemplate, "error.exchange", "error"); } }
启动消费者并使用生产者发送消息
//消费者 @RabbitListener(queues = "simple.queue") public void listener2(String msg){ log.info("我是超人收到消息:{}",msg); throw new RuntimeException("故意的"); } //----------------------------------------------------------------- //生产者 @Test void testSendMessage2Queue(){ String queueName="simple.queue"; String msg="Hello,world!"; rabbitTemplate.convertAndSend(queueName,msg); }
运行结果
error.queue收到的消息
3.业务幂等性
幂等是一个数学概念,用函数表达式来描述是这样的:f(x)=f(f(x))
。在程序开发中,则是只同一个业务,执行一次或多次对业务状态的影响是一致的。
因此为了保证业务幂等性我们应该怎么做呢?
这里给出个解决方案:
唯一消息Id
- 每一条消息都生成一个唯一的Id,与消息一起投递给消费者。
- 消费者接收到消息后处理自己的业务,业务处理成功后将Id保存到数据库。
- 如果下次又收到相同的消息,去数据库查询是都存在,存在则为重复消息并放弃处理。
@Bean public MessageConverter jackaonMessageConvert(){ Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter(); jjmc.setCreateMessageIds(true); return jjmc; } // jjmc.setCreateMessageIds(true);这个方法会给转化的消息加入一个UUID确保Id唯一
业务判断
结合业务逻辑,基于业务本身做判断,以修改订单状态为例:我们需要在支付后修改订单状态为已支付,应该在修改订单状态前先查询订单状态,判断状态是否为未支付。只有未支付订单才需要修改,其他状态不做处理:
根据本节学习内容回答问题:
- 如何保证支付服务与交易服务之间的订单状态一致性?
- 首先,支付服务会正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。
- 其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递和处理的可靠性。同时也开启了MQ的持久化,避免因服务有机导致消息丢失。
- 最后,我们还在交易服务更新订单状态时做了业务量等判断,避免因消息重复消费导致订单状态异常。
- 如果交易服务消息处理失败,有没有什么兜底方案?
- 我们可以在交易服务设置定时任务(主动方案,可以想象为高考结束学校(MQ)一直不通知查分(不发送订单信息,可能发送失败),我们自己查(定时任务)),定期查询订单支付状态。这样即便MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。
4.延迟消息
1.理解延迟消息
延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。
延时任务:设置在一定时间之后才执行的任务。
2.延迟消息的实现
1.死信交换机
什么是死信交换机
?
当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):
- 消费者使用
basic.reject
或basic.nack
声明消费失败,并且消息的requeue
参数设置为false
- 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
- 要投递的队列消息堆积满了,最早的消息可能成为死信
如果队列通过dead-letter-exchange
属性制定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机成为死信交换机(Dead Letter Exchange,简称DLX)。
如图给simple.queue
队列通过属性绑定了dlx.direct交换机
(称作死信交换机,与普通交换机无异),当发布一个过期时间为30s的消息进入队列后,没有消费者消费导致消息过期,此时消息被被转发至死信交换机,最终被消费者消费,利用死信机制实现了延迟消息发送。
我们来简单实现一下:
创建两组交换机和队列(图中的两组)并进行关系和属性绑定(这里不做演示自行操作)
编写生产者与消费者代码,并设置超时时间
//生产者 @Test void testSendTTLMessage(){ String exchangeName="simple.direct"; String msg="死信"; rabbitTemplate.convertAndSend(exchangeName, "dlx", msg, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("10000");//10s return message; } }); log.info("发送成功!"); }
//消费者 @RabbitListener(queues = "dlx.queue") public void listenerDlx(String msg) { log.info("我是超人,收到死信消息:{}",msg); }
消费者查看消息到达时间
发送时间
消费者收到时间
2.延迟消息插件
RabbitMQ官方推出的一个原生支持延迟消息功能的插件。该插件的原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。
插件需要自己安装,具体安装方式自行学习。
在java代码中实现发送并监听延迟队列的消息:
//消费者
@RabbitListener(bindings = @QueueBinding(
value=@Queue(name="delay.queue",durable ="true"),
exchange = @Exchange(name ="delay.direct",delayed = "true"),
key ="delay"))
public void listenerDelay(String msg)
{
log.info("收到delay.queue的延迟消息:{}"+msg);
}
//--------------------------------------------
//生产者
@Test
void testSendDelayMessage()
{
String exchangeName="delay.direct";
String msg="延迟999999ms+";
rabbitTemplate.convertAndSend(exchangeName, "delay", msg, new MessagePostProcessor()
{
@Override
public Message postProcessMessage(Message message) throws AmqpException
{
message.getMessageProperties().setDelay(5000);//延迟5s
return message;
}});
log.info("延迟消息发送成功!");
}
发送结果:
生产者发送消息
消费者5s后收到消息
如上我们推荐实现延迟消息时使用插件实现,当然所有的定时功能都是有性能损耗的(redis除外),MQ和Spring内部在程序内部维护一个时钟,始终每隔一秒会向前跳一次(精度高的可能毫秒甚至纳秒级跳一次),每定一个定时任务,都需要维护自己的时钟,时钟的运行就需要cpu不断计算,因此定时任务是一种cpu密集型任务,定时任务越多,cpu消耗越大,导致cpu压力增大,所以什么时候去使用它取决于业务的具体情况。
本文系统讲解了RabbitMQ高可靠方案,涵盖生产者确认、持久化机制、消费者重试策略及幂等性保障,同时解析了延迟消息的两种实现方式(死信交换机和插件)。通过配置重试、ACK机制和Lazy Queue优化性能与可靠性,结合业务场景设计兜底方案,确保消息最终一致性。学习后深刻体会到消息队列在分布式系统中保障数据安全与高效传输的核心价值。