目录
10.6:重试2次(默认16次)依旧消费不了的消息进入死信队列
1:案例介绍
mq的架构
我们使用springboot整合RocketMq,学习使用案例,在这里我们的项目架构是一个生产者、两个消费者。
2:开始整合
整合须知:
springboot版本:3.3.2
JDK版本:17
RocketMq版本:5.3.1
2.1:生产者整合
1:导入依赖
<!-- RocketMQ Spring Boot 启动器 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
2:配置yml
3:生产者发送不同类型的消息
4:生产者发送普通消息
4.1:什么是普通消息
普通消息为 Apache RocketMQ 中最基础的消息,区别于有特性的顺序消息、定时/延时消息和事务消息。本文为您介绍普通消息的应用场景、功能原理、使用方法和使用建议。
普通消息一般应用于微服务解耦、事件驱动、数据集成等场景,这些场景大多数要求数据传输通道具有可靠传输的能力,且对消息的处理时机、处理顺序没有特别要求。
4.2:发送者代码如下
/**
* 普通消息发送
* 特点:
* 1:无返回值
* 2:接收方消息无序
* 3:同一个订单的下单、支付、积分 会被不同的消费者消费到消息
*/
@RequestMapping(value = "putong")
public String send_普通消息1(){
// 普通消息的主题 生产者组是my-producer-group0
String topic = "PuTong11_Topic";
for (int i = 0; i < 5; i++) {
//发送消息无返回值s
System.out.println("发送消息:"+i);
//convertAndSend 调用了同步发行消息 syncSend
rocketMQTemplate.convertAndSend(topic,"顺序消息发送_下单:"+i);
rocketMQTemplate.convertAndSend(topic,"普通消息发送_支付:"+i);
rocketMQTemplate.convertAndSend(topic,"普通消息发送_积分:"+i);
}
return "普通消息发送";
}
可以知道一共发送了15条消息到PuTong11_Topic,根据前边的主题中默认有4个队列,我们可以观察到这15条消息被随机的存放到4个对列中
4.3:发送数据对列截图
4.4:消费者代码如下
消费者一共两个分别是9092,9091。代码一样这里只会截图一个
#------rocketMQ消费者配置
#NameServer 地址,集群使用';'隔开
rocketmq.name-server=localhost:9876
代码如下,他们处于同一个consumerGroup
/**
* 设置消费者监听
* topic 主题名字
* consumerGroup 消费者分组
*/
@Service
@RocketMQMessageListener(topic = "PuTong11_Topic",consumerGroup = "my-consumer-PuTong-group1")
public class 普通消息接受 implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("消费者92获取到的消息:"+message);
}
}
@Service
@RocketMQMessageListener(topic = "PuTong11_Topic",consumerGroup = "my-consumer-PuTong-group1")
public class 普通消息接受 implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("消费者93获取到的消息:"+message);
}
}
4.6:运行结果截图
4.7:问题总结
在以上的普通消息中,我们知道了生产者发送消息到主题的4个队列,消费者消费队列的消息。
1:消费者消息重试:
消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端(MQ服务器)会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理(推送)。
2:消息提交
消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
5:顺序消息(重点)
5.1:什么是顺序消息
我们知道,我们发送的消息,存储到了mq的topic的队列里边,默认的topic是4个队列
根据消息的key将消息轮训的插入队列中,队列的消息能保证FIFO,但是我们并不知道实际具体那条消息在那个队列,无法保证比如订单号是01的所有操作在同一个队列。如下图
消费者再消费的时候,无法保证业务的一致性。所有才有了顺序发送,我们传入指定的订单号,只要订单号相同,根据订单号hash,就一定会存到相同的队列。
5.2:发送者代码如下
/**
* 顺序同步 消息发送
* 特点:
* 1:有返回值 SendResult [sendStatus=SEND_OK, msgId
* 2:接收方消息有序,原理是根据发送的id进行hash,将消息发送到指定指定的队列
* 3:同一个订单的下单、支付、积分 被同一个消费者消费
* 但是下单支付积分在同一个消费者中不一定顺序
* 4:消息是同步发送的,每次发送一条消息,等待返回值ok,才会发送下一条 sendStatus=SEND_OK
*/
@RequestMapping(value = "shunxu")
public String send_顺序消息(){
//PuTong11_Topic ShunXu11_Topic
String topic = "ShunXu11_Topic";
for (int i = 0; i < 5; i++) {
//根据hashKey 进行hash到相同的队列
SendResult sendResult1 = rocketMQTemplate.syncSendOrderly(topic, "顺序消息发送_下单:" + i, String.valueOf(i));
System.out.println("顺序消息_下单"+i+"返回值:"+sendResult1);
SendResult sendResult2 = rocketMQTemplate.syncSendOrderly(topic,"顺序消息发送_支付:"+i,String.valueOf(i));
System.out.println("顺序消息_支付"+i+"返回值:"+sendResult2);
SendResult sendResult3 = rocketMQTemplate.syncSendOrderly(topic,"顺序消息发送_积分:"+i,String.valueOf(i));
System.out.println("顺序消息_积分"+i+"返回值:"+sendResult3);
}
return "顺序消息发送";
}
5.3:接受者代码
/**
* 设置消费者监听
* topic 主题名字
* consumerGroup 消费者分组
*/
@Component
@RocketMQMessageListener(
topic = "ShunXu11_Topic",
consumerGroup = "my-consumer-ShunXu-group2",
consumeMode = ConsumeMode.ORDERLY)
public class 顺序消息接受 implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("消费者92获取到的顺序消息:"+message);
}
}
5.4:运行结果截图
6:同步消息(重点)
6.1:什么是同步消息
同步发送:每次发送一条,等待mq的返回值成功,然后发送下一条,适合可靠的消息传递,适用范围最广泛。如重要的通知消息、短消息通知等。
6.2:发送者代码如下
/**
* 同步消息发送(syncSend)
* 特点:
* 1:有返回值 SendResult [sendStatus=SEND_OK, msgId
* 2:消息是同步发送的,每次发送一条消息,等待返回值ok,才会发送下一条 sendStatus=SEND_OK
*
*/
@RequestMapping(value = "tongbu")
public String send_同步消息(){
String topic = "TongBu11_Topic";
for (int i = 0; i < 5; i++) {
SendResult sendResult = rocketMQTemplate.syncSend(topic, "同步消息:" + i);
System.out.println("同步消息发送结果:"+sendResult);
}
return "同步消息发送";
}
6.3:接受者代码如下
@Component
@RocketMQMessageListener(topic = "TongBu11_Topic",consumerGroup = "my-consumer-TongBu-group3")
public class 同步消息接受 implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("消费者93获取到的同步消息:"+message);
}
}
6.4:运行结果如下:
7:延时消息
7.1:什么是延时消息
定时消息发送:将消息发送到mq,mq根据定时将消息发送给消费者。切记不要搞反了,消息是发送到mq之后,定时发送个消费者。
7.2:发送者代码如下
/**
* 延时消息发送
* 特点:
* 1:有返回值,只管发,亲妈都不管,有会回调函数,成功或者异常
* 2:创建了一个线程 单独发送数据 new Runnable
* 3: 消息无序
* RocketMQ 异步消息的方法形如 syncXx()。
*
*/
@RequestMapping(value = "yanshi")
public String send_延时消息(Integer a){
String topic = "YanShi11_Topic";
// delayTimeLevel代表延迟级别
// messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
for (int i = a; i < a+5; i++) {
Message<String> message = MessageBuilder
.withPayload("延时消息发送" + i)
//.setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL,3)
.build();
SendResult sendResult = rocketMQTemplate.syncSend(topic,
message,
3000,
3);//这里的延时消息设置生效 3级就是10S
System.out.println("延时消息发送"+i+":"+new Date()+sendResult);
}
return "延时消息发送";
}
7.3:接受者代码
/**
* 设置消费者监听
* topic 主题名字
* consumerGroup 消费者分组
*/
@Component
@RocketMQMessageListener(topic = "YanShi11_Topic",consumerGroup = "my-consumer-YanShi-group3")
public class 延时消息接受 implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("消费者92获取到的延时消息:"+message+":"+new Date());
}
}
7.4:运行结果如下
8:单向消息
8.1:什么是单向消息
单向发送:生产者向mq发送消息,不等待mq的返回值,是否消息接收成功或者失败。生产者只管发送,适用于日志等可靠性不高的场景。发送速度很快,微秒级的速度
8.2:发送者代码如下
/**
* 单项消息发送
* 特点:
* 1:无返回值,不管成功不成功,一直发送消息,速度很快
* 2:生产者向mq发送消息,不等待mq的返回值,是否消息接收成功或者失败。
* 生产者只管发送,适用于日志等可靠性不高的场景。发送速度很快,微秒级的速度
*
*/
@RequestMapping(value = "danxiang")
public String send_单向消息(){
String topic = "DanXiang11_Topic";
for (int i = 0; i < 5; i++) {
rocketMQTemplate.sendOneWay(topic,"单项消息无序:"+i);
System.out.println("单项消息发送"+i+":"+new Date());
}
return "单项消息发送";
}
8.3:接受者代码
/**
* 设置消费者监听
* topic 主题名字
* consumerGroup 消费者分组
*/
@Component
@RocketMQMessageListener(topic = "DanXiang11_Topic",consumerGroup = "my-consumer-DanXiang")
public class 单项消息接受 implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("消费者93获取到的单项消息:"+message);
}
}
8.4:运行结果如下
9:事务消息
9.1:什么是事务消息
一定要先创建事务性的topic
事务消息仅支持在 MessageType 为 Transaction 的主题内使用,即事务消息只能发送至类型为事务消息的主题中,发送的消息的类型必须和主题的类型一致。
事务消息就是两阶段提交
9.2:发送者代码如下
/**
* 事务消息发送
*
*
*/
@RequestMapping(value = "shiwu")
public String send_事务消息(String name){
//发送事务消息 前端接收数据
String topic = "ShiWu11_Topic";
Message<String> message=MessageBuilder
.withPayload(name)
.setHeader("key","key1")
.build();
//将参数传递给事务监听器的 执行本地事务方法(executeLocalTransaction)
TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(topic, message, "事务参数1");
System.out.println("事务消息返回值:"+transactionSendResult);
return "事务消息发送";
}
@Component
@RocketMQTransactionListener
public class MyTranscationRocketListener implements RocketMQLocalTransactionListener {
@Resource
StudentsService service;
// 事务消息共有三种状态,提交状态、回滚状态、中间状态:
// RocketMQLocalTransactionState.COMMIT: 提交事务,它允许消费者消费此消息。
// RocketMQLocalTransactionState.ROLLBACK: 回滚事务,它代表该消息将被删除,不允许被消费。
// RocketMQLocalTransactionState.UNKNOWN: 中间状态,它代表需要检查消息队列来确定状态。
/**
* 执行本地事务(在发送消息成功时执行)
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println("1执行事务方法msg:" + msg + ":" + new Date());
System.out.println("1执行事务方法arg:" + arg + ":" + new Date());
byte[] payload = (byte[]) msg.getPayload();
String name = new String(payload);
System.out.println("1执行事务方法Payload:" + name);
try {
int i = service.saveStudent(name);
if(i==1){
System.out.println("1执行事务提交成功" + ":" + new Date());
return RocketMQLocalTransactionState.COMMIT;//事务方法没有异常 提交消息
}
// int a=10/0;
} catch (Exception e) {
e.printStackTrace();
System.out.println("1执行事务回滚成功" + ":" + new Date());
return RocketMQLocalTransactionState.ROLLBACK;//事务方法有异常 回滚消息
}
//模拟 broke中断,没有收到消息状态回执,设置UNKNOWN
return RocketMQLocalTransactionState.UNKNOWN;
}
/**
* 如果生产者宕机,mq没有收到本地事务的提交或者回滚,这里就会检查本地事务的状态
*
* 回查间隔时间:系统默认每隔60秒发起一次定时任务,对未提交的半事务消息进行回查,共持续12小时。
* 第一次消息回查最快
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
System.out.println("2检查事务方法:" + msg + ":" + new Date());
Object o = msg.getHeaders().get("key");
//回查方法,根据name查询数据库,存在数据,就是commit
if ("key1".equals(o)) {
System.out.println("2检查事务方法提交" + ":" + new Date());
return RocketMQLocalTransactionState.COMMIT;
} else {
System.out.println("2检查事务方法回滚" + ":" + new Date());
return RocketMQLocalTransactionState.ROLLBACK;
}
}
}
//数据插入数据库代码
@Transactional
@Override
public int saveStudent(String name) {
//1:数据库操作
Date date = new Date();
Students student1 = new Students();
student1.setSname(name+"11");
student1.setSdatetime(date);
int a=studentsMapper.insert(student1);
//int a=10/0;
return a;
}
9.3:接受者代码
/**
* 设置消费者监听
* topic 主题名字
* consumerGroup 消费者分组
*/
@Component
@RocketMQMessageListener(topic = "ShiWu11_Topic",consumerGroup = "my-consumer-ShiWu")
public class 事务消息接受 implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("消费者92获取到的事务消息:"+message);
}
}
9.4:运行结果如下
10:消息重试和死信
10.1:消息重复发送
发送时消息重复
当一条消息已被成功发送到服务端并完成持久化
此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败
如果此时生产者意识到消息发送失败并尝试再次发送消息
消费者后续会收到两条内容相同并且 Message ID 也相同的消息
#rocketMQ配置
rocketmq:
name-server: localhost:9876 # NameServer 地址,集群使用';'隔开
producer:
group: my-producer-group0
#发送消息超时 默认3秒
send-message-timeout: 3000
#发送异步消息失败的重试次数
retry-times-when-send-async-failed: 2
#发送同步消息失败重试次数
retry-times-when-send-failed: 2
#消息最大 默认4M
max-message-size: 4096
(这个无解,可以设置为0次)
10.2:消费者重复消费
消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断,
为了保证消息至少被消费一次。mq的服务端将在网络恢复后再次尝试投递之前已被处理过的消息消费者后续会收到两条内容相同并且 Message ID 也相同的消息
默认重试16次,还不行 进入死信队列
- 默认的重试间隔:
10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
- 默认
多线程
模式下,重试16
次,设置超过 16 次的重试时间间隔均为每次 2 小时 - 某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递
- 在单线程的
顺序
模式下,重试Integer.MAX_VALUE次,间隔1秒
1:当生产者重复发送时候为了避免重复消费。
解决方法:消费者可以根据唯一标识,做业务过滤,比如存在修改,不存在添加
2:当消费者的消费了没有返回给mq处理结果,为避免重复消费
10.3:发送者代码如下
@RequestMapping(value = "chongshi")
public String send_消息重试(){
//2:发送重试消息
String topic = "ChongShi11_Topic";
Date date = new Date();
rocketMQTemplate.convertAndSend(topic,"重试消息:"+date);
System.out.println("发送重试消息:"+date);
return "重试消息发送:"+date;
}
#rocketMQ配置
rocketmq:
name-server: localhost:9876 # NameServer 地址,集群使用';'隔开
producer:
group: my-producer-group0
#发送消息超时 默认3秒
send-message-timeout: 3000
#发送异步消息失败的重试次数
retry-times-when-send-async-failed: 2
#发送同步消息失败重试次数
retry-times-when-send-failed: 2
#消息最大 默认4M
max-message-size: 4096
10.4:接收者代码如下
92,93服务相同
/**
* 设置消费者监听
* topic 主题名字
* consumerGroup 消费者分组
* 实现RocketMQPushConsumerLifecycleListener接口,从prepareStart方法中获取消费者并设置它
* 消息最大重试次数的设置对相同GroupID下的所有Consumer实例有效
*/
@Component
@RocketMQMessageListener(topic = "ChongShi11_Topic",consumerGroup = "my-consumer-ChongShi")
public class 重试消息接受 implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {
@Override
public void onMessage(MessageExt message) {
byte[] body = message.getBody();
String s=new String(body);
System.out.println("消费者92获取到的重试消息:"+s+":当前时间:"+new Date());
//模拟消费者异常
throw new RuntimeException("测试重试次数");
}
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
//设置重试次数是2
consumer.setMaxReconsumeTimes(2);
// 实例名称-控制面板可以看到
consumer.setInstanceName("消费者92");
}
}
10.5:重试案例截图分析
10.6:重试2次(默认16次)依旧消费不了的消息进入死信队列
消费重试到达阈值以后,消息不会被投递给消费者了,而是进入了死信队列,一个单独的topic
死信队列是死信Topic下分区数唯一的单独队列
死信Topic名称为%DLQ%原消费者组名
,死信队列的消息将不会再被消费
10.7:死信队列单独处理(监听死信队列处理消息)
//方案一
@Component
@RocketMQMessageListener(
topic = "%DLQ%my-consumer-ChongShi",
consumerGroup = "retry-dead-consumer-group"
)
public class 死信队列人工处理 implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
// 处理消息 签收了
System.out.println("记录到特别的位置 文件 mysql 通知人工处理:"+message);
}
}
//方案二
@Component
@RocketMQMessageListener(
topic = "%DLQ%my-consumer-ChongShi",
consumerGroup = "retry-dead-consumer-group"
)
public class RetryDeadConsumer2 implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
// 业务处理
try {
int i = 1 / 0;
} catch (Exception e) {
// 重试
int reconsumeTimes = messageExt.getReconsumeTimes();
if (reconsumeTimes >= 3) {
// 不要重试了
System.out.println("记录到特别的位置 文件 mysql 通知人工处理");
}else {
throw new RuntimeException("异常");
}
}
}
}
截图如下:
11:消息堆积
消息堆积顾名思义就是消息队列中堆积了大量未被处理的消息,主要发生在高并发的场景下,生产者发送消息的速率远大于消费者组消息的速度。在物联网的AIOT场景中比较常见。
在RocketMQ的Console上可以查看某个Topic上消息堆积的情况。