Sping AMQP 三种确认机制
NONE:自动确认,当队列将消息发送给消费者之后,不管消费者消费成功与否,都会自动确认该消息并将该消息移除
MANUAL:手动确认,需要消费者发送 ack(肯定确认,说明消息成功被消费掉) 或者 nack(否定确认,表示消息没有成功被消费掉)
AUTO:默认模式,当我们不进行设置的时候,Spring 会默认 rabbitmq 的确认机制为 NONE,当消费者在消息处理成功之后会自动确认消息,如果处理过程抛出了异常,则不会确认消息
SpringBoot 对应的配置信息:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual # 设置确认模式
通过 acknowledge-mode 这个参数设置确认机制的模式
前期准备
队列名称:
public class MQConstants {
//确认机制
public static final String AXK_QUEUE = "ACK_QUEUE";
}
队列声明:
@Configuration
public class MQConfig {
//声明队列
@Bean("ackQueue")
public Queue ackQueue() {
return QueueBuilder.durable(MQConstants.AXK_QUEUE).build();
}
}
生产者:
@RequestMapping("/proc")
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/ack")
public String ack() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("", MQConstants.AXK_QUEUE, "ack: " + i);
}
return "消息发送成功";
}
}
NONE(自动确认)
消费者:
@Component
@RabbitListener(queues = MQConstants.AXK_QUEUE)
public class ACKListener {
@RabbitHandler
public void handle(String messageContent, Channel channel, Message message) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println("接收到的消息为:" + messageContent);
int n = 3 / 0;
}
}
这里的消费者我们设置了一个算术异常,并且没有进行 catch,便于我们观察
开启 NONE 模式
acknowledge-mode: none # 设置确认模式
从上面的信息可以看出,即使我们的消费者消费消息失败,由于我们采用自动确认机制,因此我们的队列消息全部被消费掉了
并且在自动确认模式下,我们不需要通过 channel 发送 ack 或者 nack ,这部分确认代码可以省略
AUTO (默认确认)
设置为 AUTO 模式
acknowledge-mode: auto # 设置确认模式
@Component
@RabbitListener(queues = MQConstants.AXK_QUEUE)
public class ACKListener {
@RabbitHandler
public void handle(String messageContent, Channel channel, Message message) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println("接收到的消息为:" + messageContent);
int n = 3 / 0;
}
}
启动程序的时候会发生一直死循环消费消息,因为在 AUTO 模式下,如果我们的消息没有设置如果消费失败或者消费中抛出异常的处理方法,这里是默认将失败的消息重新放入队列中的,我们可以看到 这 10 条消息都没有被确认
MANUAL (手动确认)
设置手动确认:
acknowledge-mode: manual # 设置确认模式
确认消息的方法有三种:这些方法通过 channel 进行调用
basicAck: deliveryTag 消息的唯一标识,multiple (true 表示批量确认,false 表示不进行批量确认)
批量确认介绍:当确认 序列号为deliveryTag 的消息的时候,小于 deliveryTag 的消息也会被一同确认掉
basicNack: requeue 表示是否重新入队,也就是说消费失败的消息要不要重新入队,true 表示重新入队,false 表示不重新入队
- basicReject 这个方法不能设置批量否定确认
消费者:
@Component
@RabbitListener(queues = MQConstants.AXK_QUEUE)
public class ACKListener {
@RabbitHandler
public void handle(String messageContent, Channel channel, Message message) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("接收到的消息为:" + messageContent);
int n = 3 / 0;
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
channel.basicNack(deliveryTag, false, false);
}
}
}
由于我们捕获到异常之后设置 requeue 为 false (不重新入队),所以我们不会重复消费,队列现在也为空,如果我们设置 true(重新入队),我们会在控制台看到消费者不断进行消费。
@Component
@RabbitListener(queues = MQConstants.AXK_QUEUE)
public class ACKListener {
@RabbitHandler
public void handle(String messageContent, Channel channel, Message message) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("接收到的消息为:" + messageContent);
int n = 3 / 0;
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
channel.basicNack(deliveryTag, false, false);
}
}
}
如果我们没有捕获到异常:
停掉程序之后,我们依旧能看到消息还在队列中,也就是说如果消息没有被确认(肯定/否定确认)的话,消息是不会从队列中删除的。