文章目录
一、消息确认
1.1、消息确认机制
生产者发送消息之后, 到达消费端之后, 可能会有以下情况:
a. 消息处理成功
b. 消息处理异常
RabbitMQ向消费者发送消息之后, 就会把这条消息删掉, 那么第两种情况, 就会造成消息丢失. 那么如何确保消费端已经成功接收了, 并正确处理了呢?
为了保证消息从队列可靠地到达消费者, RabbitMQ提供了消息确认机制(message acknowledgement)。 消费者在订阅队列时,可以指定 autoAck 参数, 根据这个参数设置, 消息确认机制分为以下两种:
• 自动确认: 当autoAck 等于true时, RabbitMQ 会⾃动把发送出去的消息置为确认, 然后从内存(或者磁盘)中删除, 而不管消费者是否真正地消费到了这些消息. 自动确认模式适合对于消息可靠性要求不高的场景.
• 手动确认: 当autoAck等于false时,RabbitMQ会等待消费者显式地调用Basic.Ack命令, 回复确认信号后才从内存(或者磁盘) 中移去消息. 这种模式适合对消息可靠性要求比较高的场景
代码示例:
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息: " + new String(body));
}
};
channel.basicConsume(Constants.TOPIC_QUEUE_NAME1,true,consumer);
当autoAck参数置为false, 对于RabbitMQ服务端额而言, 队列中的消息分成了两个部分: ⼀是等待投递给消费者的消息. ⼆是已经投递给消费者, 但是还没有收到消费者确认信号的消息. 如果RabbitMQ一直没有收到消费者的确认信号, 并且消费此消息的消费者已经断开连接, 则RabbitMQ会安排该消息重新进入队列,等待投递给下⼀个消费者,当然也有可能还是原来的那个消费者
从RabbitMQ的Web管理平台上, 也可以看到当前队列中Ready状态和Unacked状态的消息数:
Ready: 等待投递给消费者的消息数
Unacked: 已经投递给消费者, 但是未收到消费者确认信号的消息数
1.2、手动确认方法
消费者在收到消息之后, 可以选择确认, 也可以选择直接拒绝或者跳过, RabbitMQ也提供了不同的确认应答的方式, 消费者客户端可以调⽤与其对应的channel的相关⽅法, 共有以下三种:
1. 肯定确认: Channel.basicAck(long deliveryTag, boolean multiple)
RabbitMQ 已知道该消息并且成功的处理消息. 可以将其丢弃了.
参数说明:
- deliveryTag: 消息的唯⼀标识,它是⼀个单调递增的64 位的⻓整型值. deliveryTag 是每个通道(Channel)独⽴维护的, 所以在每个通道上都是唯⼀的. 当消费者确认(ack)⼀条消息时, 必须使⽤对应 的通道上进⾏确认.
2 ) multiple: 是否批量确认. 在某些情况下, 为了减少⽹络流量, 可以对⼀系列连续的 deliveryTag 进行批量确认. 值为 true 则会⼀次性 ack所有⼩于或等于指定 deliveryTag 的消息. 值为false, 则只确认当 前指定deliveryTag的消息
若deliverTag = 8时,如果此时multiple 为true ,那么此时8以前的消息都会被确认
反之,如果multiple为false ,那么此时只确认消息8。
deliveryTag 是RabbitMQ中消息确认机制的⼀个重要组成部分, 它确保了消息传递的可靠性和顺序性。
2. 否定确认: Channel.basicReject(long deliveryTag, boolean requeue)
RabbitMQ在2.0.0版本开始引⼊了 Basic.Reject 这个命令, 消费者客⼾端可以调用channel.basicReject方法来告诉RabbitMQ拒绝这个消息.
参数说明:
- deliveryTag: 参考channel.basicAck
- requeue: 表⽰拒绝后, 这条消息如何处理. 如果requeue 参数设置为true, 则RabbitMQ会重新将这条 消息存⼊队列,以便可以发送给下⼀个订阅的消费者.如果requeue参数设置为false, 则RabbitMQ会把 消息从队列中移除, ⽽不会把它发送给新的消费者
3. 否定确认: Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
Basic.Reject命令⼀次只能拒绝⼀条消息,如果想要批量拒绝消息,则可以使⽤Basic.Nack这个命令. 消费者客户端可以调用channel.basicNack方法来实现.
参数介绍参考上面两个方法.
multiple参数设置为true则表示拒绝deliveryTag编号之前所有未被当前消费者确认的消息.
代码示例:
Spring-AMQP 对消息确认机制提供了三种策略
public enum AcknowledgeMode {
NONE,
MANUAL,
AUTO;
}
- AcknowledgeMode.NONE
◦ 这种模式下, 消息⼀旦投递给消费者, 不管消费者是否成功处理了消息, RabbitMQ 就会自动确认 消息, 从RabbitMQ队列中移除消息. 如果消费者处理消息失败, 消息可能会丢失. - AcknowledgeMode.AUTO(默认)
◦ 这种模式下, 消费者在消息处理成功时会自动确认消息, 但如果处理过程中抛出了异常, 则不会确认消息. - AcknowledgeMode.MANUAL
◦ ⼿动确认模式下, 消费者必须在成功处理消息后显式调⽤ basicAck ⽅法来确认消息. 如果消息未被确认, RabbitMQ 会认为消息尚未被成功处理, 并且会在消费者可用时重新投递该消息, 这 种模式提⾼了消息处理的可靠性, 因为即使消费者处理消息后失败, 消息也不会丢失, 而是可以被重新处理.
主要流程:
- 配置确认机制(自动确认/手动机制)
- 生产者发送消息
- 消费端逻辑
- 测试
1.2.1、AcknowledgeMode.NONE
- 配置确认机制
spring:
rabbitmq:
addresses: amqp://study:study@110.41.51.65:15673/bite
listener:
simple:
acknowledge-mode: none
- 发送消息:
队列,交换机配置
public class Constant {
public static final String ACK_EXCHANGE_NAME = "ack_exchange";
public static final String ACK_QUEUE = "ack_queue";
}
/*
以下为消费端⼿动应答代码⽰例配置
*/
@Bean("ackExchange")
public Exchange ackExchange() {
return
ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build()
;
}
//2. 队列
@Bean("ackQueue")
public Queue ackQueue() {
return QueueBuilder.durable(Constant.ACK_QUEUE).build();
}
//3. 队列和交换机绑定 Binding
@Bean("ackBinding")
public Binding ackBinding(@Qualifier("ackExchange") Exchange exchange,
@Qualifier("ackQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("ack").noargs();
}
通过接口发送消息:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/producer")
public class ProductController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/ack")
public String ack() {
rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack",
"consumer ack test...");
return "发送成功!";
}
}
- 写消费端逻辑
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class AckQueueListener {
//指定监听队列的名称
@RabbitListener(queues = Constant.ACK_QUEUE)
public void ListenerQueue(Message message, Channel channel) throws
Exception {
System.out.printf("接收到消息: %s, deliveryTag: %d%n", new
String(message.getBody(), "UTF-8"),
message.getMessageProperties().getDeliveryTag());
//模拟处理失败
//int num = 3 / 0;
System.out.println("处理完成");
}
这个代码运行的结果是正常的, 运行后消息会被签收: Ready为0, unacked为0
- 运行程序
调用接口, 发送消息 可以看到队列中有⼀条消息, unacked的为0(需要先把消费者注掉)
开启消费者, 控制台输出:
接收到消息: consumer ack test..., deliveryTag: 1
2 2024-04-29T17:03:57.797+08:00 WARN 16952 --- [ntContainer#0-1]
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
Listener method 'public void
com.bite.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception
//....
管理界面:
1.2.2、AcknowledgeMode.AUTO
1、配置确认机制
spring:
rabbitmq:
addresses: amqp://study:study@110.41.51.65:15673/bite
listener:
simple:
acknowledge-mode: auto
- 重新运行程序
调用接口, 发送消息 可以看到队列中有⼀条消息, unacked的为0(需要先把消费者注掉)
开启消费者, 控制台不断输出错误信息:
接收到消息: consumer ack test..., deliveryTag: 1
2024-04-29T17:07:06.114+08:00 WARN 16488 --- [ntContainer#0-1]
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
Listener method 'public void
com.bite.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception
接收到消息: consumer ack test..., deliveryTag: 2
2024-04-29T17:07:07.161+08:00 WARN 16488 --- [ntContainer#0-1]
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
Listener method 'public void
com.bite.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception
接收到消息: consumer ack test..., deliveryTag: 3
2024-04-29T17:07:08.208+08:00 WARN 16488 --- [ntContainer#0-1]
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
Listener method 'public void
com.bite.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception
从日志上可以看出, 当消费者出现异常时, RabbitMQ会不断的重发. 由于异常,多次重试还是失败,消息没被确认,也无法nack,就⼀直是unacked状态,导致消息积压
1.3.3、AcknowledgeMode.MANUAL
- 配置确认机制
spring:
rabbitmq:
addresses: amqp://study:study@110.41.51.65:15673/bite
listener:
simple:
acknowledge-mode: manual
- 消费端⼿动确认逻辑
import com.bite.rabbitmq.constant.Constant;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class AckQueueListener {
//指定监听队列的名称
@RabbitListener(queues = Constant.ACK_QUEUE)
public void ListenerQueue(Message message, Channel channel) throws
Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//1. 接收消息 System.out.printf("接收到消息: %s, deliveryTag: %d%n", new
String(message.getBody(), "UTF-8"),
message.getMessageProperties().getDeliveryTag());
//2. 处理业务逻辑
System.out.println("处理业务逻辑");
//⼿动设置⼀个异常, 来测试异常拒绝机制
// int num = 3/0;
//3. ⼿动签收
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
//4. 异常了就拒绝签收
//第三个参数requeue, 是否重新发送, 如果为true, 则会重新发送,,若为false,
则直接丢弃
channel.basicNack(deliveryTag, true, true);
}
}
}
这个代码运行的结果是正常的, 运行后消息会被签收: Ready为0, unacked为0
控制台输出:
接收到消息: consumer ack test..., deliveryTag: 1
处理业务逻辑.....
管理界面:
- 异常时拒绝签收
@Component
public class AckQueueListener {
//指定监听队列的名称
@RabbitListener(queues = Constant.ACK_QUEUE)
public void ListenerQueue(Message message, Channel channel) throws
Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//1. 接收消息 System.out.printf("接收到消息: %s, deliveryTag: %d%n", new
String(message.getBody(), "UTF-8"),
message.getMessageProperties().getDeliveryTag());
//2. 处理业务逻辑
System.out.println("处理业务逻辑");
//⼿动设置⼀个异常, 来测试异常拒绝机制
int num = 3/0;
//3. ⼿动签收
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
//4. 异常了就拒绝签收
//第三个参数requeue, 是否重新发送, 如果为true, 则会重新发送,,若为false,
则直接丢弃
channel.basicNack(deliveryTag, true, true);
}
}
}
运行结果: 消费异常时不断重试, deliveryTag 从1递增控制台日志:
接收到消息: consumer ack test..., deliveryTag: 1
处理业务逻辑 接收到消息: consumer ack test..., deliveryTag: 2
处理业务逻辑 接收到消息: consumer ack test..., deliveryTag: 3
处理业务逻辑 接收到消息: consumer ack test..., deliveryTag: 4
处理业务逻辑 接收到消息: consumer ack test..., deliveryTag: 5
处理业务逻辑 接收到消息: consumer ack test..., deliveryTag: 6
处理业务逻辑.....
管理界⾯上unacked也变成了1
Unacked的状态变化很快, 为方便观察, 消费消息前增加⼀下休眠时间Thread.sleep(10000);
二、持久性
在前⾯讲了消费端处理消息时, 消息如何不丢失, 但是如何保证当RabbitMQ服务停掉以后, 生产者发送的消息不丢失呢. 默认情况下, RabbitMQ 退出或者由于某种原因崩溃时, 会忽视队列和消息, 除非告知他不要这么做.
RabbitMQ的持久化分为三个部分:交换器的持久化、队列的持久化和消息的持久化.
2.1、 交换机持久化
交换器的持久化是通过在声明交换机时是将durable参数置为true实现的.相当于将交换机的属性在服务器内部保存,当MQ的服务器发生意外或关闭之后,重启 RabbitMQ 时不需要重新去建立交换机, 交换 机会自动建立,相当于⼀直存在. 如果交换器不设置持久化, 那么在 RabbitMQ 服务重启之后, 相关的交换机元数据会丢失, 对⼀个⻓期 使用的交换器来说,建议将其置为持久化的.
ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build();
2.2、队列持久化
队列的持久化是通过在声明队列时将 durable 参数置为 true实现的. 如果队列不设置持久化, 那么在RabbitMQ服务重启之后,该队列就会被删掉, 此时数据也会丢失. (队列没 有了, 消息也无处可存了)。 队列的持久化能保证该队列本身的元数据不会因异常情况而丢失, 但是并不能保证内部所存储的消息不会丢失. 要确保消息不会丢失, 需要将消息设置为持久化. 咱们前面用的创建队列的方式都是持久化的
QueueBuilder.durable(Constant.ACK_QUEUE).build();
点进去看源码会发现,该⽅法默认durable 是true
public static QueueBuilder durable(String name) {
return (new QueueBuilder(name)).setDurable();
}
private QueueBuilder setDurable() {
this.durable = true;
return this;
}
通过下⾯代码,可以创建非持久化的队列
QueueBuilder.nonDurable(Constant.ACK_QUEUE).build();
2.3、消息持久化
消息实现持久化, 需要把消息的投递模式( MessageProperties 中的 deliveryMode )设置为2,也就是MessageDeliveryMode.PERSISTENT
NON_PERSISTENT,//⾮持久化
PERSISTENT;//持久化
设置了队列和消息的持久化, 当 RabbitMQ 服务重启之后, 消息依旧存在. 如果只设置队列持久化, 重启之后消息会丢失. 如果只设置消息的持久化, 重启之后队列消失, 继而消息也丢失. 所以单单设置消息 持久化而不设置队列的持久化显得毫无意义
//⾮持久化信息
channel.basicPublish(“”,QUEUE_NAME,null,msg.getBytes());
//持久化信息
channel.basicPublish(“”,QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
MessageProperties.PERSISTENT_TEXT_PLAIN 实际就是封装了这个属性
public static final BasicProperties PERSISTENT_TEXT_PLAIN =
new BasicProperties("text/plain",
null,
null,
2, //deliveryMode
0, null, null, null,
null, null, null, null,
null, null
);
如果使⽤RabbitTemplate 发送持久化消息, 代码如下:
// 要发送的消息内容
String message = "This is a persistent message";
// 创建⼀个Message对象,设置为持久化
Message messageObject = new Message(message.getBytes(), newMessageProperties());
messageObject.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 使⽤RabbitTemplate发送消息
rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack", messageObject);
注意:RabbitMQ默认情况下会将消息视为持久化的,除⾮队列被声明为非持久化,或者消息在发送时被标记为非持久化
将所有的消息都设置为持久化, 会严重影响RabbitMQ的性能(随机). 写入磁盘的速度比写入内 存的速度慢得不只一点点. 对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量. 在选择是否要将消息持久化时, 需要在可靠性和吐吞量之间做⼀个权衡
将交换器、队列、消息都设置了持久化之后就能百分之百保证数据不丢失了吗? 答案是否定的
- 从消费者来说, 如果在订阅消费队列时将autoAck参数设置为true, 那么当消费者接收到相关消息之 后, 还没来得及处理就宕机了, 这样也算数据居丢失. 这种情况很好解决, 将autoAck参数设置为false, 并进行手动确认。
2 . 在持久化的消息正确存⼊RabbitMQ之后,还需要有⼀段时间(虽然很短,但是不可忽视)才能存入磁盘 中.RabbitMQ并不会为每条消息都进行同步存盘(调用内核的fsync方法)的处理, 可能仅仅保存到操 作系统缓存之中而不是物理磁盘之中. 如果在这段时间内RabbitMQ服务节点发生了宕机、重启等异 常情况, 消息保存还没来得及落盘, 那么这些消息将会丢失
这个问题怎么解决呢?
可以在发送端引入事务机制或者发送⽅确认机制来保证消息已经正确地发送并存储至RabbitMQ中,即"发送方确认"
三、发送方确认
在使用 RabbitMQ的时候, 可以通过消息持久化来解决因为服务器的异常崩溃⽽导致的消息丢失, 但是还 有⼀个问题, 当消息的⽣产者将消息发送出去之后, 消息到底有没有正确地到达服务器呢? 如果在消息到 达服务器之前已经丢失(比如RabbitMQ重启, 那么RabbitMQ重启期间生产者消息投递失败), 持久化操作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化?
RabbitMQ为我们提供了两种解决⽅案:
a. 通过事务机制实现
b. 通过发送方确认(publisher confirm) 机制实现
事务机制⽐较消耗性能, 在实际工作中使用也不多, 咱们主要介绍confirm机制来实现发送方的确认.RabbitMQ为我们提供了两个方式来控制消息的可靠性投递
- confirm确认模式
- return退回模式
3.1、confirm确认模式
Producer 在发送消息的时候, 对发送端设置⼀个ConfirmCallback的监听, 无论消息是否到达Exchange, 这个监听都会被执行, 如果Exchange成功收到, ACK( Acknowledge character , 确认字符)为true, 如果没收到消息, ACK就为false.
步骤如下:
- 配置RabbitMQ
- 设置确认回调逻辑并发送消息
接下来看实现步骤
- 配置RabbitMQ
spring:
rabbitmq:
addresses: amqp://study:study@110.41.51.65:15673/nums
listener:
simple:
acknowledge-mode: manual #消息接收确认
publisher-confirm-type: correlated #消息发送确认
- 设置确认回调逻辑并发送消息
⽆论消息确认成功还是失败, 都会调⽤ConfirmCallback的confirm方法. 如果消息成功发送到Broker,ack为true.如果消息发送失败, ack为false, 并且cause提供失败的原因
@Bean("confirmRabbitTemplate")
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory
connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack,
String cause) {
System.out.printf("");
if (ack) {
System.out.printf("消息接收成功, id:%s \n",
correlationData.getId());
} else {
System.out.printf("消息接收失败, id:%s, cause: %s",
correlationData.getId(), cause);
}
}
});
return rabbitTemplate;
}
@Resource(name = "confirmRabbitTemplate")
private RabbitTemplate confirmRabbitTemplate;
@RequestMapping("/confirm")
public String confirm() throws InterruptedException
CorrelationData correlationData1 = new CorrelationData("1");
confirmRabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE_NAME,
"confirm","confirm test...",correlationData1);
return"确认成功";
}
⽅法说明:
public interface ConfirmCallback {
/**
* 确认回调
* @param correlationData: 发送消息时的附加信息, 通常⽤于在确认回调中识别特定的消 息
* @param ack: 交换机是否收到消息, 收到为true, 未收到为false
* @param cause: 当消息确认失败时,这个字符串参数将提供失败的原因.这个原因可以⽤于调 试和错误处理.
* 成功时, cause为null
*/
void confirm(@Nullable CorrelationData correlationData, boolean ack,
@Nullable String cause);
}
RabbitTemplate.ConfirmCallback 和 ConfirmListener 区别
在RabbitMQ中, ConfirmListener和ConfirmCallback都是用来处理消息确认的机制, 但它们属于不同的客户端库, 并且使用的场景和方式有所不同.
1 . ConfirmListener 是 RabbitMQ Java Client 库中的接口. 这个库是 RabbitMQ 官方提供的一个直接与RabbitMQ服务器交互的客户端库. ConfirmListener 接口提供了两个方法: handleAck 和handleNack, 用于处理消息确认和否定确认的事件.
2 . ConfirmCallback 是 Spring AMQP 框架中的⼀个接口. 专门为Spring环境设计. 用于简化与RabbitMQ交互的过程. 它只包含⼀个 confirm方法,⽤于处理消息确认的回调.
在 Spring Boot 应用中, 通常会使用 ConfirmCallback, 因为它与 Spring 框架的其他部分更加整合, 可 以利用 Spring 的配置和依赖注入功能. 而在使用 RabbitMQ Java Client 库时, 则可能会直接实现ConfirmListener 接口, 更直接的RabbitMQChannel交互
3.2、return退回模式
消息到达Exchange之后, 会根据路由规则匹配, 把消息放⼊Queue中. Exchange到Queue的过程, 如果一条消息无法被任何队列消费(即没有队列与消息的路由键匹配或队列不存在等), 可以选择把消息退回 给发送者. 消息退回给发送者时, 我们可以设置⼀个返回回调方法, 对消息进行处理.
步骤如下:
- 配置RabbitMQ
- 设置返回回调逻辑并发送消息
1 . 配置RabbitMQ
spring:
rabbitmq:
addresses: amqp://study:study@110.41.51.65:15673/nums
listener:
simple:
acknowledge-mode: manual #消息接收确认
publisher-confirm-type: correlated #消息发送确认
- 设置返回回调逻辑并发送消息
这里是引用消息无法被路由到任何队列, 它将返回给发送者,这时setReturnCallback设置的回调将被触发
@Bean("confirmRabbitTemplate")
public RabbitTemplate confirmRabbitTemplate(CachingConnectionFactory
connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.printf("消息被退回: %s", returned);
}
});
return rabbitTemplate;
}
@RequestMapping("/msgReturn")
public String msgReturn() {
CorrelationData correlationData = new CorrelationData("2");
confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,
"confirm11", "message return test...", correlationData);
return "消息发送成功";
}
使⽤RabbitTemplate的setMandatory方法设置消息的mandatory属性为true(默认为false). 这个属性的作用是告诉RabbitMQ, 如果⼀条消息无法被任何队列消费, RabbitMQ应该将消息返回给发送者, 此 时 ReturnCallback 就会被触发.
回调函数中有⼀个参数: ReturnedMessage, 包含以下属性:
public class ReturnedMessage {
//返回的消息对象,包含了消息体和消息属性
private final Message message;
//由Broker提供的回复码, 表⽰消息⽆法路由的原因. 通常是⼀个数字代码,每个数字代表不同 的含义.
private final int replyCode;
//⼀个⽂本字符串, 提供了⽆法路由消息的额外信息或错误描述.
private final String replyText;
//消息被发送到的交换机名称
private final String exchange;
//消息的路由键,即发送消息时指定的键
private final String routingKey;
}
3.3、常见面试题
如何保证RabbitMQ消息的可靠传输?
先放⼀张RabbitMQ消息传递图:
从这个图中, 可以看出, 消息可能丢失的场景以及解决⽅案:
- ⽣产者将消息发送到 RabbitMQ失败
a. 可能原因: 网络问题等
b. 解决办法: 参考本章节[发送方确认-confirm确认模式] - 消息在交换机中无法路由到指定队列:
a. 可能原因: 代码或者配置层面错误, 导致消息路由失败
b. 解决办法: 参考本章节[发送方确认-return模式] - 消息队列自身数据丢失
a. 可能原因: 消息到达RabbitMQ之后, RabbitMQ Server 宕机导致消息丢失.
b. 解决办法: 参考本章节[持久性]. 开启 RabbitMQ持久化, 就是消息写入之后会持久化到磁盘, 如果RabbitMQ 挂了, 恢复之后会自动读取之前存储的数据. (极端情况下, RabbitMQ还未持久化就挂了, 可能导致少量数据丢失, 这个概率极低, 也可以通过集群的方式提高可靠性) - 消费者异常, 导致消息丢失
a. 可能原因: 消息到达消费者, 还没来得及消费, 消费者宕机. 消费者逻辑有问题.
b. 解决办法: RabbitMQ 提供了 消费者应答机制来使 RabbitMQ 能够感知 到消费者是否消费成功消息. 默认情况下消费者应答机制是自动应答的, 可以开启手动确认, 当消费者确认消费成功后才会删除消息, 从而避免消息丢失. 除此之外, 也可以配置重试机制(下篇文章会为大家介绍), 当消息消费异常时, 通过消息重试确保消息的可靠性
结语
本篇文章主要介绍了RAbbitMQ中的部分高级特性,主要从消息确认,持久化,发送方确认三个方面展开
以上就是本文全部内容,感谢各位能够看到最后,如有问题,欢迎各位大佬在评论区指正,希望大家可以有所收获!创作不易,希望大家多多支持!
最后,大家再见!祝好!我们下期见!