一、手动确认消息模式
模式 | 成功消费 | 消费失败 | 连接中断 | 性能 |
---|---|---|---|---|
NONE | 自动删除 | 消息丢失 | 消息丢失 | 最高 |
AUTO | 自动删除 | 重试/死信 | 消息重回队列 | 中等 |
MANUAL | 手动删除 | 自定义处理 | 消息重回队列 | 最低 |
手动确认: 消息到达消费者,不会自动确认,会等待消费者调用Basic.Ack命令,才会从内存/磁盘 移除这条消息。
自动确认: 消息只要到达消费者就会自动确认,不会考虑消费者是否正确消费了这些消息,直接从 内存/磁盘 中删除消息;
ack:成功处理消息,RabbitMO从队列中删除该消息
nack:消息处理失败,RabbitMO需要再次投递消息
reject:消息处理失败并拒绝该消息,RabbitMO从队列中删除该消息
手动确认消息basicAck、basicNack、basicReject的使用
1、basicAck
// 确认消息处理成功,basicAck 是 RabbitMQ 的手动确认机制核心方法
// 第一个参数 tag:消息的交付标签,唯一标识这条消息 第二个参数 false:表示不批量确认,仅确认当前这条消息
channel.basicAck(deliveryTag,false);
2、basicNack
//拒绝确认消息,即告诉 RabbitMQ 我们无法处理这条消息。这会导致该消息被重新放回队列中等待再次投递。
//1 第一个参数 表示消息的交付标签,是一个单调递增的标识符,用于唯一标识队列中的一条消息。通过这个标签,RabbitMQ 能够知道你正在处理哪一条消息。
//2 第二个参数 表示是否批量确认 true 表示否定确认当前 tag 以及之前所有未确认的消息;如果为 false,则只否定确认当前 tag 对应的消息。 此处为 false,代表只处理当前这一条消息
//3 第三个参数 如果设置为 true,消息会被重新放回队列,等待再次投递给其他消费者或同一个消费者。如果设置为 false,消息不会被重新入队,
//而是根据队列的配置可能会被丢弃或者路由到死信队列(如果配置了的话)。此处为 true,表示在发生异常时将消息重新入队以便重试。
channel.basicNack(tag, false, true);
3、basicReject
// 拒绝消息,
// 1、第一个参数是消息的交付标签,
// 2、第二个参数表示是否将消息重新入队;false 表示拒绝后不重新入队,消息会被丢弃(如果没有配置死信队列,则可能被直接删除)
channel.basicReject(tag, false);
二、准备基本环境
1、pom.xml引入的java包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${springboot-version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>${springboot-version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${springboot-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.57</version>
</dependency>
</dependencies>
2、yaml配置文件
# 8004是zookeeper服务器的支付服务提供者端口号
server:
port: 8004
spring:
application:
name: cloud-mq
rabbitmq:
addresses: 192.168.96.133
port: 5672
username: guest
password: guest
virtual-host: /
#消费者配置
listener:
#todo 切记,设置了重拾机制,要抛出异常,不可try catch 后不抛异常,否则重试机制失效
simple:
#开启ack 手动确认消息是否被消费成功
acknowledge-mode: manual
retry:
enabled: true
# 消费失败后,继续消费,然后最多消费5次就不再消费。
max-attempts: 5
# 消费失败后 ,重试初始间隔时间 2秒
initial-interval: 2000
# 重试最大间隔时间5秒
max-interval: 5000
# 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间
multiplier: 2
direct:
#开启ack 手动确认消息是否被消费成功
acknowledge-mode: manual
#todo 切记,设置了重拾机制,要抛出异常,不可try catch 后不抛异常,否则重试机制失效
retry:
enabled: true
# 消费失败后,继续消费,然后最多消费3次就不再消费。
max-attempts: 3
# 消费失败后 ,重试初始间隔时间 3秒
initial-interval: 3000
# 重试最大间隔时间
max-interval: 7000
# 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间
multiplier: 2
# 生产者配置
template:
retry:
# 开启消息发送失败重试机制
enabled: true
# 生产者 true-开启消息抵达队列的确认
publisher-returns: false
#simple 配置用于设置 RabbitMQ 消息生产者的消息确认类型为“简单确认”。这意味着当消息被发送到 RabbitMQ 之后,只有在消息成功投递到队列中后,RabbitMQ 才会向生产者发送一个确认(ack)通知。如果消息未能成功投递,则不会收到确认。
#该配置通常与 publisher-returns: true 一起使用以启用消息返回机制,但在此配置中 publisher-returns 被设置为 false,表示不启用消息返回功能
publisher-confirm-type: simple
3、主启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author 10564
*/
@SpringBootApplication
public class ApplicationRabbitmq {
public static void main(String[] args) {
SpringApplication.run(ApplicationRabbitmq.class, args);
}
}
三、手动确认消息(以普通消息为例)
1、定义消息队列Queue名称
package org.xwb.springcloud.constant;
/**
* @author 10564
*/
public class MqConstant {
/**
* 手动确认消息
*/
public static final String ACK_MQ_NAME = "ackQueue";
}
2、配置类Configuration
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.xwb.springcloud.constant.MqConstant;
/**
* 创建RabbitMQ的配置类
* @author 10564
*/
@Configuration
public class RabbitmqAckConfig {
/**
* 简单消息队列
*/
@Bean
public Queue ackQueue() {
//名字(name):队列的名字,用来区分不同的队列。
//是否持久化(durable):如果设置为 true,表示即使服务器重启了,这个队列依然存在。
//是否独占(exclusive):如果设置为 true,表示只有创建它的连接才能使用这个队列。
//是否自动删除(autoDelete):如果设置为 true,表示当不再有消费者使用这个队列时,服务器会自动删除它。
return new Queue(MqConstant.ACK_MQ_NAME,true,false,false);
}
}
3、生产者Producer
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;
import javax.annotation.Resource;
/**
* @author 10564
*/
@Component
public class AckProducer {
private static final Logger log = LoggerFactory.getLogger(AckProducer.class);
@Resource
private RabbitTemplate rabbitTemplate;
public void senderAckMessage(String message) {
log.info("\nack生产者发送消息:{}\n", message);
rabbitTemplate.convertAndSend(MqConstant.ACK_MQ_NAME, message);
}
}
4、消费者Consumer
package org.xwb.springcloud.messagetype.ack;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;
import java.util.Date;
/**
* @author 10564
*/
@Component
public class AckConsumer {
private static final Logger log = LoggerFactory.getLogger(AckConsumer.class);
@RabbitListener(queues = MqConstant.ACK_MQ_NAME)
public void receiveAckQueueMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
try {
log.info("\nack消费者接收消息:{},tag:{} \n", message, tag);
if("basicAck".equals( message)){
//todo 确认消息处理成功,
// 第一个参数 tag:消息的交付标签,唯一标识这条消息
// 第二个参数 false:表示不批量确认,仅确认当前这条消息
log.info("\n手动确认 处理成功 basicAck :{} \n", new Date());
channel.basicAck(tag, false);
}else if("basicNack".equals( message)){
//todo 拒绝确认消息,即告诉 RabbitMQ 我们无法处理这条消息。这会导致该消息被重新放回队列中等待再次投递。
//todo 1 第一个参数 表示消息的交付标签,是一个单调递增的标识符,用于唯一标识队列中的一条消息。通过这个标签,RabbitMQ 能够知道你正在处理哪一条消息。
//todo 2 第二个参数 表示是否批量确认 true 表示否定确认当前 tag 以及之前所有未确认的消息;如果为 false,则只否定确认当前 tag 对应的消息。 此处为 false,代表只处理当前这一条消息
//todo 3 第三个参数 如果设置为 true,消息会被重新放回队列,等待再次投递给其他消费者或同一个消费者。如果设置为 false,消息不会被重新入队,
//todo 而是根据队列的配置可能会被丢弃或者路由到死信队列(如果配置了的话)。此处为 true,表示在发生异常时将消息重新入队以便重试。
log.info("\n手动拒绝确认消息 basicNack :{} \n", new Date());
channel.basicNack(tag, false, false);
}else if("basicReject".equals( message)){
//todo 拒绝消息,
//todo 1、第一个参数是消息的交付标签,
//todo 2、第二个参数表示是否将消息重新入队;false 表示拒绝后不重新入队,消息会被丢弃(如果没有配置死信队列,则可能被直接删除)
log.info("\n手动确认 拒绝消息 basicReject :{} \n", new Date());
channel.basicReject(tag, false);
}else{
throw new RuntimeException("模拟消费失败");
}
} catch (Exception e) {
log.error("\n消费消息异常,抛出异常{} message:{}\n",tag, e.getMessage());
//todo 抛出异常,触发 spring retry
throw e;
}
}
}
5、测试Test
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.xwb.springcloud.messagetype.ack.AckProducer;
import javax.annotation.Resource;
/**
* @author 10564
*/
@RestController
@RequestMapping("/mq")
public class MqMessageController {
@Resource
private AckProducer ackProducer;
@GetMapping("/ack")
public void ack(String message) {
ackProducer.senderAckMessage(message);
}
6、测试结果
### ack
GET http://localhost:8004/mq/ack?message=basicAck
2025-06-21 23:33:21.758 INFO 19824 --- [nio-8004-exec-1] o.x.s.messagetype.ack.AckProducer :
ack生产者发送消息:basicAck
2025-06-21 23:33:21.771 INFO 19824 --- [ntContainer#0-1] o.x.s.messagetype.ack.AckConsumer :
ack消费者接收消息:basicAck,tag:1
2025-06-21 23:33:21.772 INFO 19824 --- [ntContainer#0-1] o.x.s.messagetype.ack.AckConsumer :
手动确认 处理成功 basicAck :Sat Jun 21 23:33:21 CST 2025
### ack
GET http://localhost:8004/mq/ack?message=basicNack
2025-06-21 23:33:52.687 INFO 19824 --- [nio-8004-exec-2] o.x.s.messagetype.ack.AckProducer :
ack生产者发送消息:basicNack
2025-06-21 23:33:52.690 INFO 19824 --- [ntContainer#0-1] o.x.s.messagetype.ack.AckConsumer :
ack消费者接收消息:basicNack,tag:2
2025-06-21 23:33:52.690 INFO 19824 --- [ntContainer#0-1] o.x.s.messagetype.ack.AckConsumer :
手动拒绝确认消息 basicNack :Sat Jun 21 23:33:52 CST 2025
### ack
GET http://localhost:8004/mq/ack?message=basicReject
2025-06-21 23:34:14.653 INFO 19824 --- [nio-8004-exec-3] o.x.s.messagetype.ack.AckProducer :
ack生产者发送消息:basicReject
2025-06-21 23:34:14.656 INFO 19824 --- [ntContainer#0-1] o.x.s.messagetype.ack.AckConsumer :
ack消费者接收消息:basicReject,tag:3
2025-06-21 23:34:14.656 INFO 19824 --- [ntContainer#0-1] o.x.s.messagetype.ack.AckConsumer :
手动确认 拒绝消息 basicReject :Sat Jun 21 23:34:14 CST 2025