一、mq的作用和使用场景
MQ的基本作用
MQ(Message Queue,消息队列)是一种应用程序对应用程序的通信方法,主要作用包括:
异步处理:解耦生产者和消费者,允许生产者发送消息后立即返回,消费者异步处理
应用解耦:降低系统间的直接依赖,通过消息进行间接通信
流量削峰:缓冲突发流量,避免系统被压垮
消息通信:实现系统间的可靠消息传递
最终一致性:支持分布式事务的最终一致性方案
主要使用场景
1. 异步处理
用户注册后发送邮件/短信:注册流程快速完成,通知类操作异步处理
日志收集:应用将日志发送到MQ,由专门服务异步处理
2. 应用解耦
电商订单系统:订单服务生成订单后通过MQ通知库存、物流、支付等系统
微服务架构:服务间通过MQ通信而非直接调用
3. 流量削峰
秒杀系统:将大量请求先放入MQ,按系统能力逐步处理
突发流量处理:应对促销活动等流量高峰
4. 日志处理
大数据分析:收集各系统日志到MQ,由大数据平台统一处理
实时监控:系统指标通过MQ传输到监控平台
5. 消息通信
聊天系统:用户消息通过MQ传递
通知系统:系统间的事件通知
适用场景总结表
场景 | 关键技术 | 优势 |
---|---|---|
应用解耦 | 消息队列 | 减少系统间直接依赖 |
异步处理 | 生产者-消费者模型 | 提升响应速度 |
流量削峰 | 队列积压+限速消费 | 保护后端系统 |
跨语言通信 | AMQP 多语言支持 | 统一通信协议 |
发布/订阅 | Exchange(fanout/topic) | 一对多消息广播 |
延迟队列 | TTL + 死信队列 | 实现定时任务 |
二、mq的优点
1. 解耦系统组件
生产者和消费者无需相互感知对方的存在
系统间通过消息通信而非直接调用,降低耦合度
新增消费者不会影响生产者代码
2. 异步处理提升性能
生产者发送消息后无需等待消费者处理完成
非关键路径操作可异步执行(如发送通知、记录日志)
显著减少系统响应时间,提高吞吐量
3. 流量削峰与过载保护
缓冲突发流量,避免系统被瞬间高峰压垮
消费者可按自身处理能力从队列获取消息
特别适合秒杀、促销等瞬时高并发场景
4. 提高系统可靠性
消息持久化确保重要数据不丢失
重试机制和死信队列处理失败消息
网络波动时仍能保证消息最终送达
5. 扩展性强
可轻松增加消费者实例提高处理能力
天然支持分布式系统架构
各组件可独立扩展(生产者、MQ本身、消费者)
6. 顺序保证
某些MQ(如Kafka)可保证消息顺序性
对需要严格顺序的业务场景非常重要
7. 最终一致性支持
实现分布式事务的最终一致性方案
通过消息驱动的方式同步系统状态
比强一致性方案性能更高
8. 灵活的通信模式
支持点对点、发布/订阅等多种模式
可实现广播、组播等不同消息分发方式
适应各种业务场景需求
9. 系统恢复能力
消费者宕机恢复后可从断点继续消费
避免数据丢失或重复处理
支持消息回溯重新消费
10. 平衡资源利用率
平滑系统负载,避免资源闲置或过载
提高整体资源使用效率
降低系统建设成本(无需按峰值配置资源)
三、mq的缺点
1. 系统复杂度增加
引入MQ后,系统架构变得更加复杂,需要额外维护MQ集群
需要处理消息的发送、接收、确认、重试等逻辑
增加了调试和问题排查的难度(如消息丢失、重复消费等)
2. 消息一致性问题
消息丢失:生产者发送失败、MQ宕机、消费者处理失败都可能导致消息丢失
消息重复:网络问题或消费者超时可能导致消息被重复消费(需业务层做幂等处理)
顺序问题:某些MQ(如Kafka)只能保证分区内有序,全局有序需要额外设计
3. 延迟问题
异步处理导致延迟:消息队列的消费通常是异步的,不适合实时性要求极高的场景(如支付交易)
堆积时延迟加剧:如果消费者处理速度跟不上,消息堆积会导致延迟越来越高
4. 运维成本高
集群管理:MQ本身需要高可用部署(如Kafka的ZooKeeper依赖、RabbitMQ的镜像队列)
监控与告警:需监控消息积压、消费延迟、错误率等指标
资源占用:MQ集群可能占用较多CPU、内存和磁盘IO
5. 数据一致性与事务问题
分布式事务挑战:如果业务涉及数据库和MQ的协同(如扣库存+发消息),需要引入事务消息或本地消息表等方案
最终一致性:MQ通常只保证最终一致性,不适合强一致性要求的场景
6. 依赖风险
MQ成为单点故障:如果MQ集群崩溃,可能导致整个系统不可用
版本兼容性问题:MQ升级可能影响生产者和消费者的兼容性
7. 消息积压风险
消费者处理能力不足:如果消费者宕机或处理缓慢,消息会堆积,可能导致MQ存储爆满
影响新消息处理:积压严重时,新消息可能被阻塞或丢弃
8. 不适合所有场景
低延迟场景:如高频交易、实时游戏,MQ的异步机制可能引入不可接受的延迟
小规模系统:如果系统简单,直接调用可能比引入MQ更高效
四、mq相关产品,每种产品的特点
1. RabbitMQ
特点:
基于AMQP协议,支持多种客户端语言
轻量级,易于部署和管理
提供灵活的路由机制(直连/主题/扇出/头交换)
支持消息确认、持久化、优先级队列
集群部署相对简单
社区活跃,文档完善
适用场景:
中小规模消息处理
需要复杂路由规则的场景
企业级应用集成
对延迟要求不高的异步任务
2. Kafka
特点:
超高吞吐量(百万级TPS)
分布式、高可用设计
基于发布/订阅模式
消息持久化存储(可配置保留时间)
支持消息回溯和批量消费
水平扩展能力强
支持流式处理(Kafka Streams)
适用场景:
大数据日志收集与分析
实时流处理
高吞吐量消息系统
事件溯源
监控数据聚合
3. RocketMQ
特点:
阿里开源,经受双11考验
支持事务消息
严格的顺序消息
支持消息轨迹查询
分布式架构,高可用
支持定时/延迟消息
支持消息过滤
适用场景:
电商交易系统
金融支付场景
需要严格顺序的消息处理
分布式事务场景
4. ActiveMQ
特点:
支持JMS规范
支持多种协议(STOMP、AMQP、MQTT等)
提供消息持久化和事务支持
支持集群部署
相对轻量级
适用场景:
传统企业应用集成
需要JMS支持的场景
中小型消息系统
IoT设备通信
5. Pulsar
特点:
云原生设计,计算存储分离架构
支持多租户
低延迟和高吞吐并存
支持多种消费模式(独占/共享/故障转移)
支持分层存储(热数据+冷数据)
内置函数计算能力
适用场景:
云原生应用
多租户SaaS平台
需要统一消息和流处理的场景
混合云部署
6. ZeroMQ
特点:
无中间件,基于库的方式
极高性能(纳秒级延迟)
支持多种通信模式(请求-响应/发布-订阅等)
轻量级,无消息持久化
无broker架构
适用场景:
高性能计算
低延迟通信
进程间通信
不需要持久化的场景
7. NATS
特点:
极简设计,性能优异
无持久化(NATS Streaming提供持久化扩展)
支持请求-响应模式
轻量级,适合云环境
低资源消耗
适用场景:
IoT设备通信
云原生微服务
不需要持久化的实时消息
服务发现和配置分发
选型建议对比表
特性 \ MQ | RabbitMQ | Kafka | RocketMQ | Pulsar | ActiveMQ |
---|---|---|---|---|---|
吞吐量 | 中 | 极高 | 高 | 高 | 中 |
延迟 | 低 | 中 | 低 | 低 | 中 |
顺序保证 | 有限 | 分区有序 | 严格有序 | 分区有序 | 有限 |
持久化 | 支持 | 支持 | 支持 | 支持 | 支持 |
事务支持 | 有限 | 支持 | 支持 | 支持 | 支持 |
集群扩展 | 中等 | 容易 | 中等 | 容易 | 中等 |
运维复杂度 | 低 | 高 | 中 | 中 | 低 |
适用规模 | 中小 | 超大 | 中大 | 中大 | 中小 |
五、rabbitmq的搭建过程
Docker安装方式:
# 拉取镜像
docker pull rabbitmq:management
# 运行容器
docker run -d --name rabbitmq \
-p 5672:5672 -p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=password \
rabbitmq:management
Linux安装方式:
# 1. 安装Erlang
sudo apt-get install erlang
# 2. 下载RabbitMQ
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.9/rabbitmq-server_3.8.9-1_all.deb
# 3. 安装
sudo dpkg -i rabbitmq-server_3.8.9-1_all.deb
# 4. 启动服务
sudo systemctl start rabbitmq-server
# 5. 启用管理插件
sudo rabbitmq-plugins enable rabbitmq_management
# 6. 创建用户
sudo rabbitmqctl add_user admin password
sudo rabbitmqctl set_user_tags admin administrator
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
六、rabbitmq相关角色
1. 生产者 (Publisher/Producer)
职责:创建并发送消息到 RabbitMQ 服务器
特点:
不直接将消息发送到队列,而是发送到交换器 (Exchange)
可以设置消息属性(如持久化、优先级等)
通常不知道消息最终会被哪些消费者接收
2. 消费者 (Consumer)
职责:接收并处理来自队列的消息
特点:
可以订阅一个或多个队列
可以手动或自动确认消息 (ack/nack)
可以设置 QoS(服务质量)控制预取数量
3. 代理服务器 (Broker)
职责:RabbitMQ 服务本身,负责接收、路由和传递消息
组成:
Exchange(交换器)
Queue(队列)
Binding(绑定)
4. 交换器 (Exchange)
类型:
类型 路由规则 典型用途 Direct 精确匹配 Routing Key 点对点精确路由 Fanout 忽略 Routing Key,广播到所有绑定队列 广播通知 Topic 模糊匹配 Routing Key(支持通配符) 多条件路由 Headers 根据消息头属性匹配 复杂路由条件 特性:
接收生产者发送的消息
根据类型和绑定规则将消息路由到队列
可以持久化(在服务器重启后仍然存在)
5. 队列 (Queue)
特性:
消息存储的缓冲区
可以有多个消费者(竞争消费模式)
可配置属性:
持久化(Durable)
自动删除(Auto-delete)
排他性(Exclusive)
消息 TTL(存活时间)
最大长度等
6. 绑定 (Binding)
作用:连接 Exchange 和 Queue 的规则
组成要素:
Exchange 名称
Queue 名称
Routing Key(或用于 Headers Exchange 的匹配参数)
7. 通道 (Channel)
特点:
在 TCP 连接上建立的虚拟连接
轻量级,减少 TCP 连接开销
每个 Channel 有独立 ID
建议每个线程使用独立的 Channel
8. 虚拟主机 (Virtual Host)
作用:提供逻辑隔离环境
特点:
类似于命名空间
每个 vhost 有独立的 Exchange、Queue 和绑定
需要单独配置权限
默认 vhost 为 "/"
9. 管理员角色 (Administrator)
权限:
管理用户权限
创建/删除 vhost
查看所有资源
通常通过 rabbitmqctl 工具或管理界面操作
10. 插件系统 (Plugins)
常见插件:
rabbitmq_management
:提供 Web 管理界面rabbitmq_shovel
:跨集群消息转移rabbitmq_federation
:分布式部署支持rabbitmq_delayed_message_exchange
:延迟消息
角色交互示意图
+------------+ +---------+ +-------+ +--------+
| Publisher | ----> | Exchange| ====> | Queue | <---- | Consumer|
+------------+ +---------+ +-------+ +--------+
(Binding)
七、rabbitmq内部组件
1、ConnectionFactory(连接管理器):应用程序与Rabbit之间建立连接的管理器,程序代码中使用。
2、Channel(信道):消息推送使用的通道。
3、Exchange(交换器):用于接受、分配消息。
4、Queue(队列):用于存储生产者的消息。
5、RoutingKey(路由键):用于把生成者的数据分配到交换器上。
6、BindingKey(绑定键):用于把交换器的消息绑定到队列上。
八、生产者发送消息的过程?
一、建立连接阶段
TCP连接建立
生产者应用通过AMQP客户端库发起TCP连接
默认端口5672(带管理插件时为5672/15672)
三次握手完成后建立物理连接
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("rabbitmq-host"); factory.setPort(5672); Connection connection = factory.newConnection();
认证与vhost选择
发送START/START-OK协议帧进行认证
选择虚拟主机(vhost),默认"/"
认证失败会收到CONNECTION-CLOSE帧
二、通道创建阶段
通道(Channel)初始化
在TCP连接上创建虚拟通道(Channel)
每个Channel有唯一ID(从1开始递增)
通道参数协商:
Frame Max Size(默认128KB)
Channel Max(默认2047)
Channel channel = connection.createChannel();
交换器声明(可选)
检查目标Exchange是否存在
不存在时根据参数自动创建
关键参数:
type:exchange类型(direct/fanout/topic/headers)
durable:是否持久化
autoDelete:无绑定时是否自动删除
channel.exchangeDeclare("order.exchange", "direct", true);
三、消息发布阶段
消息构造
组成结构:
{ "body": "消息内容(二进制)", "properties": { "delivery_mode": 2, # 1-非持久化 2-持久化 "priority": 0, # 0-9优先级 "headers": {}, # 自定义头 "timestamp": 1620000000 } }
发布消息到Exchange
通过Basic.Publish命令发送
关键参数:
exchange:目标交换器名称
routingKey:路由键
mandatory:是否触发Return回调
immediate:已废弃参数
channel.basicPublish( "order.exchange", "order.create", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes );
四、消息路由阶段
Exchange路由决策
根据Exchange类型处理:
Direct:精确匹配routingKey
Fanout:忽略routingKey,广播到所有绑定队列
Topic:通配符匹配(*匹配一个词,#匹配零或多个词)
Headers:匹配header键值对
队列投递
成功匹配:消息进入队列内存缓冲区
无匹配时处理:
设置了alternate-exchange:转到备用交换器
未设置备用交换器且mandatory=true:触发Return回调
否则丢弃消息
五、确认阶段
Confirm模式(可选)
开启方式:
channel.confirmSelect(); // 开启Confirm模式
确认机制:
单条确认:
waitForConfirms()
批量确认:
waitForConfirmsOrDie()
异步回调:
channel.addConfirmListener((sequenceNumber, multiple) -> { // 处理ack }, (sequenceNumber, multiple) -> { // 处理nack });
事务模式(可选)
事务操作流程:
channel.txSelect(); // 开启事务 try { channel.basicPublish(...); channel.txCommit(); // 提交事务 } catch (Exception e) { channel.txRollback(); // 回滚事务 }
六、资源释放阶段
通道关闭
发送Channel.Close命令
处理未确认消息:
事务模式:回滚未提交消息
Confirm模式:未确认消息会触发nack
连接关闭
发送Connection.Close命令
服务端释放相关资源
客户端等待TCP连接正常关闭
九、消费者接收消息过程?
一、连接建立阶段
TCP连接初始化
消费者客户端与RabbitMQ服务器建立TCP连接(默认端口5672)
完成AMQP协议握手:
通道创建
在TCP连接上创建虚拟通道(Channel)
每个Channel独立维护消息流状态
关键参数设置:
Channel channel = connection.createChannel(); channel.basicQos(10); // 设置prefetch count
二、队列订阅阶段
队列声明与检查
检查目标队列是否存在
自动创建队列(如果不存在且允许):
channel.queueDeclare("order.queue", true, false, false, null);
队列参数解析:
durable:是否持久化
exclusive:是否排他队列
autoDelete:无消费者时是否自动删除
arguments:扩展参数(TTL、死信等)
消费者注册
向Broker注册消费者标签(consumer tag)
选择消费模式:
推模式(Push API):服务端主动推送
拉模式(Basic.Get):客户端主动拉取
三、消息接收阶段
消息推送机制
Broker按照QoS设置推送消息:
while (unacked_count < prefetch_count) and (queue.has_messages): message = queue.next_message() send_to_consumer(message) unacked_count += 1
消息帧结构:
Basic.Deliver( consumer-tag, delivery-tag, redelivered, exchange, routing-key ) Message Body
消息处理流程
消费者接收消息后的处理步骤:
反序列化消息体
验证消息完整性
执行业务逻辑
发送ack/nack
处理异常情况
四、确认与反馈阶段
消息确认机制
自动确认(autoAck=true):
消息发出即视为成功
高风险(消息可能处理失败但已确认)
手动确认(autoAck=false):
// 成功处理 channel.basicAck(deliveryTag, false); // 处理失败(requeue=true重新入队) channel.basicNack(deliveryTag, false, true);
关键参数:
deliveryTag:消息唯一标识
multiple:是否批量操作
requeue:是否重新入队
拒绝消息处理
三种拒绝方式对比:
方法 是否批量 是否重入队列 适用场景 basicReject 否 可配置 单条消息处理失败 basicNack 是 可配置 批量消息处理异常 basicRecover - 是 重新投递未ack消息
五、流量控制机制
QoS预取设置
作用:限制未确认消息数量
全局 vs 通道级:
// 单通道限制 channel.basicQos(10); // 全局限制(所有通道总和) channel.basicQos(10, true);
最佳实践值:
高吞吐场景:100-300
高延迟任务:5-10
流控(Flow Control)
当消费者处理能力不足时:
Broker暂停发送新消息
触发Channel.Flow命令
消费者处理积压后恢复流动
六、异常处理阶段
连接中断处理
自动恢复机制:
factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(5000);
恢复过程:
重建TCP连接
恢复所有Channel
重新注册消费者
恢复未ack消息(根据redelivered标记)
死信处理
触发条件:
消息被拒绝且requeue=false
消息TTL过期
队列达到长度限制
死信队列配置:
Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", "dlx.exchange"); channel.queueDeclare("order.queue", true, false, false, args);
消费者最佳实践
幂等性设计
// 使用消息ID实现幂等 if (processedMessageIds.contains(messageId)) { channel.basicAck(tag, false); return; }
批量确认优化
// 每处理100条消息批量确认一次 if (messageCount % 100 == 0) { channel.basicAck(lastTag, true); }
死信监控
// 监听死信队列 channel.basicConsume("dlx.queue", false, (tag, msg) -> { log.error("死信消息: {}", msg.getBody()); channel.basicAck(tag, false); });
消费者标签管理
// 优雅关闭消费者 void shutdown() { channel.basicCancel(consumerTag); // 等待处理中的消息完成 while (inProgressCount > 0) { Thread.sleep(100); } }
十、springboot项目中如何使用mq?
十一、如何保障消息不丢失?
1、发送阶段:发送阶段保障消息到达交换机 事务机制|confirm确认机制
2、存储阶段:持久化机制 交换机持久化、队列的持久化、消息内容的持久化
3、消费阶段:消息的确认机制 自动ack|手动ack
接收方消息确认机制
自动ack|手动ack
spring:
rabbitmq:
host: 1.94.230.82
port: 5672
username: admin
password: 123456
virtual-host: /yan3
listener:
simple:
acknowledge-mode: manual
direct:
acknowledge-mode: manual
package com.hl.rabbitmq01.web;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
@RestController
@RequestMapping("/c")
public class ConsumerController {
@RabbitListener(queues = {"topicQueue01"})
public void receive(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
System.out.println(msg);
//业务逻辑 比如传入订单id,根据订单id,减少库存、支付等,
// 如果操作成功,确认消息(从队列移除),如果操作失败,手动拒绝消息
if(msg.length() >= 5){
//确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else{
//拒绝消息 not ack
// 第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
// channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
}
}
}
消息的持久化机制
交换机的持久化
队列的持久化
消息内容的持久化
package com.hl.rabbitmq01.direct;
import com.hl.rabbitmq01.util.MQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*
生产者 javaSE方式简单测试
发布订阅-------direct模型
生产者----消息队列----消费者
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接
Connection connection = MQUtil.getConnection();
//2、基于连接,创建信道
Channel channel = connection.createChannel();
//3、基于信道,创建队列
/*
参数:
1. queue:队列名称,如果没有一个名字叫simpleQueue01的队列,则会创建该队列,如果有则不会创建
2. durable:是否持久化,当mq重启之后,消息还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列
4。当Connection关闭时,是否删除队列
autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:参数。
*/
channel.queueDeclare("directQueue01", true, false, false, null);
channel.queueDeclare("directQueue02", false, false, false, null);
/*
声明交换机
参数1:交换机名称
参数2:交换机类型
*/
channel.exchangeDeclare("directExchange01", BuiltinExchangeType.DIRECT,true);
/*
绑定交换机和队列
参数1:队列名
参数2:交换机名称
参数3:路由key 广播模型 不支持路由key ""
*/
channel.queueBind("directQueue01","directExchange01","error");
channel.queueBind("directQueue02","directExchange01","error");
channel.queueBind("directQueue02","directExchange01","info");
channel.queueBind("directQueue02","directExchange01","trace");
//发送消息到消息队列
/*
参数:
1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
2. routingKey:路由名称,简单模式下路由名称使用消息队列名称
3. props:配置信息
4. body:发送消息数据
*/
channel.basicPublish("directExchange01","user", MessageProperties.PERSISTENT_TEXT_PLAIN,("Hello World ").getBytes());
//4、关闭信道,断开连接
channel.close();
connection.close();
}
}
package com.hl.rabbitmq01.web;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@RestController
@RequestMapping("/p")
public class ProducerController {
@Autowired
private AmqpTemplate amqpTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/send")
public void send(@RequestParam(defaultValue = "user") String key,
@RequestParam(defaultValue = "hello") String msg) throws IOException {
//amqpTemplate.convertAndSend("topicExchange", key, msg);
// rabbitTemplate.convertAndSend("topicExchange",key,msg);
Channel channel = rabbitTemplate
.getConnectionFactory().
createConnection()
.createChannel(false); //false 非事务模式运行 无需手动提交
channel.basicPublish(
"topicExchange", key,
MessageProperties.PERSISTENT_TEXT_PLAIN,
msg.getBytes());
}
}
/*
创建交换机
*/
@Bean
public TopicExchange topicExchange(){
return ExchangeBuilder
.topicExchange("topicExchange")
.durable(true) //是否支持持久化机制
.build();
}
/*
创建队列
*/
@Bean
public Queue queue(){
return QueueBuilder.durable("topicQueue01").build();
}
发送方的消息确认机制
1、事务机制
消耗资源
RabbitMQ中与事务有关的主要有三个方法:
txSelect() 开始事务
txCommit() 提交事务
txRollback() 回滚事务
txSelect主要用于将当前channel设置成transaction模式,txCommit用于提交事务,txRollback用于回滚事务。
当我们使用txSelect提交开始事务之后,我们就可以发布消息给Broke代理服务器,如果txCommit提交成功了,则消息一定到达了Broke了,如果在txCommit执行之前Broker出现异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback方法进行回滚事务了。
示例
@RestController
public class RabbitMQController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/send")
public String sendMessage(String message){
rabbitTemplate.setChannelTransacted(true); //开启事务操作
rabbitTemplate.execute(channel -> {
try {
channel.txSelect();//开启事务
channel.basicPublish("Fanout_Exchange","",null,message.getBytes());
int i = 5/0;
channel.txCommit();//没有问题提交事务
}catch (Exception e){
e.printStackTrace();
channel.txRollback();//有问题回滚事务
}
return null;
});
return "success";
}
}
消费者没有任何变化。
通过测试会发现,发送消息时只要Broker出现异常崩溃或者由于其他原因抛出异常,就会捕获异常通过txRollback方法进行回滚事务了,则消息不会发送,消费者就获取不到消息。
2、confirm确认机制
推荐
同步通知
channel.confirmSelect(); //开始confirm操作
channel.basicPublish("Fanout_Exchange","",null,message.getBytes());
if (channel.waitForConfirms()){
System.out.println("发送成功");
}else{
//进行消息重发
System.out.println("消息发送失败,进行消息重发");
}
异步通知
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
//消息正确到达broker,就会发送一条ack消息
@Override
public void handleAck(long l, boolean b) throws IOException {
System.out.println("发送消息成功");
}
//RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息
@Override
public void handleNack(long l, boolean b) throws IOException {
System.out.println("发送消息失败,重新发送消息");
}
});
channel.basicPublish("Fanout_Exchange","",null,message.getBytes());
十二、死信交换机和死信队列
在实际开发项目时,在较为重要的业务场景中,要确保未被消费的消息不被丢弃(例如:订单业务),那为了保证消息数据的不丢失,可以使用RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入到死信队列中进行处理。
死信队列:RabbitMQ中并不是直接声明一个公共的死信队列,然后死信消息就会跑到死信队列中。而是为每个需要使用死信的消息队列配置一个死信交换机,当消息成为死信后,可以被重新发送到死信交换机,然后再发送给使用死信的消息队列。
死信交换机:英文缩写:DLX 。Dead Letter Exchange(死信交换机),死信交换机其实就是普通的交换机,通过给队列设置参数: x-dead-letter-exchange 和x-dead-letter-routing-key,来指向死信交换机
RabbitMQ规定消息符合以下某种情况时,将会成为死信
队列消息长度到达限制(队列消息个数限制);
消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
原队列存在消息过期设置,消息到达超时时间未被消费;
死信消息会被RabbitMQ特殊处理,如果配置了死信队列,则消息会被丢到死信队列中,如果没有配置死信队列,则消息会被丢弃。
Map<String,Object> map = new HashMap<>();
map.put("x-dead-letter-exchange","deadExchange");//当前队列和死信交换机绑定
map.put("x-dead-letter-routing-key","user.#");//当前队列和死信交换机绑定的路由规则
// map.put("x-max-length",2);//队列长度
map.put("x-message-ttl",10000);//队列消息过期时间,时间ms
// return QueueBuilder.durable("topicQueue01").build();
return QueueBuilder.durable("topicQueue").withArguments(map).build();
十三、延迟队列简介
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
RabbitMQ中没有延迟队列,但是可以用ttl+死信队列方式和延迟插件两种方式来实现
ttl+死信队列
ttl+死信队列代码在讲死信队列时已经实现,这个不再阐述。
延迟插件
人们一直在寻找用RabbitMQ实现延迟消息的传递方法,到目前为止,公认的解决方案是混合使用TTL和DLX。rabbitmq_delayed_message_exchange插件就是基于此来实现的,RabbitMQ延迟消息插件新增了一种新的交换器类型,消息通过这种交换器路由就可以实现延迟发送。
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
十四、RabbitMQ消息重复消费
RabbitMQ消息重复消费问题_rabbitmq重复消费的问题解决-CSDN博客
业务背景 消息队列在数据传输的过程中,为了保证消息传递的可靠性,一般会对消息采用ack确认机制,如果消息传递失败,消息队列会进行重试,此时便可能存在消息重复消费的问题。
比如,用户到银行取钱后会收到扣款通知短信,如果用户收到多条扣款信息通知则会有困惑。
解决方法一:send if not exist 首先将 RabbitMQ 的消息自动确认机制改为手动确认,然后每当有一条消息消费成功了,就把该消息的唯一ID记录在Redis 上,然后每次发送消息时,都先去 Redis 上查看是否有该消息的 ID,如果有,表示该消息已经消费过了,不再处理,否则再去处理。
2.1 利用数据库唯一约束实现幂等
解决方法二:insert if not exist 可以通过给消息的某一些属性设置唯一约束,比如增加唯一uuid,添加的时候查询是否存对应的uuid,存在不操作,不存在则添加,那样对于相同的uuid只会存在一条数据
解决方法三:sql的乐观锁 比如给用户发送短信,变成如果该用户未发送过短信,则给用户发送短信,此时的操作则是幂等性操作。但在实际上,对于一个问题如何获取前置条件往往比较复杂,此时可以通过设置版本号version,每修改一次则版本号+1,在更新时则通过判断两个数据的版本号是否一致。
十五、RabbitMQ消息积压
RabbitMq——消息积压分析和解决思路_rabbitmq消息积压-CSDN博客
消息积压产生的原因 正常而言,一般的消息从消息产生到消息消费需要经过以下几种阶段。
以Direct模式为例:
消息由生产者产生,比如新订单的创建等,经过交换机,将消息发送至指定的队列中,然后提供给对应的消费者进行消费。
在这个链路中,存在消息积压的原因大致分为以下几种:
1、消费者宕机,导致消息队列中的消息无法及时被消费,出现积压。 2、消费者没有宕机,但因为本身逻辑处理数据耗时,导致消费者消费能力不足,引起队列消息积压。 3、消息生产方单位时间内产生消息过多,比如“双11大促活动”,导致消费者处理不过来。 消息积压问题解决 针对上面消息积压问题的出现,大致进行了分析,那么根据分析则能制定相关的应对方法。如下所示:
1、大促活动等,导致生产者流量过大,引起积压问题。
提前增加服务器的数量,增加消费者数目,提升消费者针对指定队列消息处理的效率。
2、上线更多的消费者,处理消息队列中的数据。(和1中的大致类似)
3、如果成本有限,则可以专门针对这个队列,编写一个另类的消费者。
当前另类消费者,不进行复杂逻辑处理,只将消息从队列中取出,存放至数据库中,然后basicAck反馈给消息队列。
十六、消息入库(消息补偿)
如果RabbitMQ收到消息还没来得及将消息持久化到硬盘时,RabbitMQ挂了,这样消息还是丢失了,或者RabbitMQ在发送确认消息给生产端的过程中,由于网络故障而导致生产端没有收到确认消息,这样生产端就不知道RabbitMQ到底有没有收到消息,这样也不太好进行处理。所以为了避免RabbitMQ持久化失败而导致数据丢失,我们自己也要做一些消息补偿机制,以应对一些极端情况。
在使用消息队列(Message Queue)时,消息的补偿机制是一种处理消息处理失败或异常情况的方法。当消息消费者无法成功处理消息时,补偿机制允许系统将消息重新发送或执行其他操作,以确保消息的可靠传递和处理。
补偿机制通常涉及以下几个方面:
重试机制:当消息处理失败时,补偿机制会尝试重新发送消息给消费者,以便重新处理。重试间隔和重试次数可以根据具体情况进行配置,以避免重复投递导致的消息处理失败。
延时队列:补偿机制还可以使用延时队列来处理无法立即处理的消息。当某个消息处理失败时,可以将该消息放入到延时队列中,在一定的延时之后再次尝试发送给消费者进行处理。
死信队列:当消息无法被成功处理时,可以将这些无法处理的消息发送到死信队列(Dead Letter Queue)。死信队列通常用于存储无法被消费者处理的消息,以便后续进行排查和处理。
可视化监控和报警:补偿机制还可以包括对消息队列的监控和报警功能,以便及时发现和处理异常情况。通过可视化监控工具可以实时查看消息队列的状态和处理情况,及时发现问题并采取相应的补救措施。
补偿机制的设计和实现密切依赖于具体的消息中间件和使用场景,不同的消息队列系统可能提供不同的补偿机制。因此,在选择和使用消息队列时,需要根据自身的需求和系统特点来选择适合的消息补偿机制。