官方文档:https://www.rabbitmq.com/docs
什么是RabbitMQ?
RabbitMQ 是一个消息代理。它接收来自发布者的消息,路由它们,并且(如果有可路由到的队列)存储它们以供使用,或者立即发送给消费者(如果有)。
发布者根据协议的不同将信息发布到不同的目的地。
协议
RabbitMQ 支持的每个协议中,发布消息的过程都非常相似。所有四种协议都允许用户发布包含有效负载(消息体)和一个或多个消息属性(消息头)的消息。
所有四种协议还支持发布者的确认机制,该机制允许发布应用程序跟踪代理已成功或未成功接受的消息,并继续发布下一批消息或重试发布当前消息。
- AMQP协议(高级消息队列协议):RabbitMQ的核心协议
- STOMP(简单文本导向的消息协议)
- MQTT(消息队列遥测传输)
- HTTP API(REST API)
AMQP协议
是 RabbitMQ 的核心协议,提供可靠、异步的消息传递。
- 支持事务、确认机制(publisher confirms)和持久化。
- 提供丰富的消息路由模型(交换机类型)。
- 广泛支持各种编程语言的客户端库。
主要组件:
Broker(消息代理服务器):RabbitMQ 服务器实例,负责接收客户端连接、管理消息路由和队列
Exchange(交换机):消息路由的核心组件,根据规则将消息分发到对应队列。
- Direct:按消息路由键(Routing Key)精确匹配队列。
- Topic:按通配符模式(如
*.order.*
)匹配队列。 - Fanout:广播消息到所有绑定的队列。
- Headers:按消息头(Headers)属性匹配队列。
Queue(队列):消息的存储容器,保存等待消费的消息,支持持久化(保存在磁盘)、消息过期(TTL)、死信队列(DLQ)等机制。
Binding(绑定):建立交换机与队列的关联关系,定义消息路由规则。
- 对 Direct 交换机:绑定规则为具体的 Routing Key。
- 对 Topic 交换机:绑定规则为通配符模式。
- 对 Fanout 交换机:绑定规则无意义(广播所有消息)。
Connection(连接):客户端与 Broker 之间的 TCP 长连接,维护通信通道。
Channel(信道):基于 Connection 的逻辑会话通道,用于执行具体的消息操作(发布、消费、确认等)
Virtual Host(虚拟主机):逻辑隔离的资源分组,类似数据库中的 “Schema”。
Message(消息):由消息体(Payload)和元数据(路由键、消息头、属性等)组成
- Routing Key:用于 Direct/Topic 交换机的路由。
- Headers:用于 Headers 交换机的匹配条件。
- Delivery Mode:标记消息是否持久化(1 = 非持久,2 = 持久)。
队列(Queue)
- 标准队列(Standard Queue):
- 持久化队列(Durable Queue)
- 惰性队列(Lazy Queue)
- 镜像队列(Mirrored Queue)
- 死信队列(Dead Letter Queue, DLQ)
- 延迟队列(Delayed Queue)
- 临时队列(Temporary Queue)
- 优先级队列(Priority Queue)
1. 标准队列(Standard Queue)
- 特点:
- 最基础的队列类型,遵循 “生产者 - 消费者” 模型,消息被消费后从队列中移除。
- 支持基本的队列属性配置(如持久化、排他性、自动删除等)。
- 关键属性:
durable
:是否持久化(消息和队列元数据保存到磁盘,重启后恢复)。exclusive
:是否排他(仅创建它的连接可访问,连接关闭后队列自动删除)。auto-delete
:是否自动删除(最后一个消费者取消订阅后自动删除)。
- 适用场景:
- 常规的异步任务处理、消息通信。
2. 持久化队列(Durable Queue)
- 特点:
- 通过设置
durable=true
实现,队列元数据和消息会持久化到磁盘。 - 即使 RabbitMQ 服务重启,队列和未消费的消息仍可恢复。
- 通过设置
- 技术实现:
- 队列声明时指定
durable
参数为true
。 - 消息需配合
deliveryMode=2
(持久化标记)才能同时持久化消息体。
- 队列声明时指定
- 适用场景:
- 对消息可靠性要求高的场景(如订单处理、金融交易)。
3. 惰性队列(Lazy Queue)
- 特点:
- 基于
lazy
参数(RabbitMQ 3.6.0 + 引入),将消息优先存储在磁盘而非内存。 - 减少内存占用,适合处理海量消息(百万级以上)。
- 基于
- 技术实现:
- 声明队列时设置
"x-queue-type": "lazy"
(或通过插件配置)。 - 消费时按需将消息加载到内存,降低峰值内存压力。
- 声明队列时设置
- 适用场景:
- 日志收集、大数据流处理、消息积压缓冲。
4. 镜像队列(Mirrored Queue)
- 特点:
- 通过集群镜像策略,将队列数据同步到多个节点,实现高可用性。
- 主节点故障时,从节点自动接管,避免单点故障。
- 配置方式:
- 通过 RabbitMQ 管理界面或策略(Policy)设置
ha-mode
参数(如all
、exactly-n
)。 - 同步方式:异步复制,可能存在短暂数据不一致。
- 通过 RabbitMQ 管理界面或策略(Policy)设置
- 适用场景:
- 对服务可用性要求极高的核心业务(如实时通信、交易系统)。
5. 死信队列(Dead Letter Queue, DLQ)
- 特点:
- 用于存储无法被正常消费的 “死信” 消息,通常因以下原因进入死信队列:
- 消息被消费者拒绝(
basic.reject
/basic.nack
且未重新入队)。 - 消息超时未被消费(设置
x-message-ttl
)。 - 队列达到最大长度(设置
x-max-length
或x-max-length-bytes
)。
- 消息被消费者拒绝(
- 用于存储无法被正常消费的 “死信” 消息,通常因以下原因进入死信队列:
- 技术实现:
- 为队列设置
x-dead-letter-exchange
参数,指定死信交换机。 - 死信交换机再将消息路由到死信队列。
- 为队列设置
- 适用场景:
- 消息重试机制、异常消息监控和处理。
6. 延迟队列(Delayed Queue)
- 特点:
- 消息在指定延迟时间后才允许被消费,非 RabbitMQ 原生功能,需插件或间接实现。
- 实现方式:
- 插件方式:使用
rabbitmq-delayed-message-exchange
插件,通过交换机实现延迟。 - TTL + 死信队列方式:给消息设置
x-message-ttl
,超时后通过死信交换机路由到目标队列。
- 插件方式:使用
- 适用场景:
- 订单超时取消、任务定时执行、消息重试延迟。
7. 临时队列(Temporary Queue)
- 特点:
- 通常设置为
exclusive=true
和auto-delete=true
,连接关闭后自动删除。 - 无需显式删除,适用于临时任务或一次性消费场景。
- 通常设置为
- 典型用法:
- 客户端创建临时队列用于响应式通信(如 RPC 模式中的回调队列)。
8. 优先级队列(Priority Queue)
- 特点:
- 通过
x-max-priority
参数设置队列优先级,高优先级消息优先被消费。
- 通过
- 技术实现:
- 声明队列时指定
x-max-priority
(如 10),消息发送时设置priority
属性。
- 声明队列时指定
- 适用场景:
- 任务调度(如紧急订单优先处理)、资源抢占场景。
交换机(Exchange)
- 直连交换机(Direct Exchange)
- 主题交换机(Topic Exchange)
- 扇形交换机(Fanout Exchange)
- 头交换机(Headers Exchange)
1. 直连交换机(Direct Exchange)
- 路由规则:根据消息的路由键(Routing Key)和绑定键(Binding Key)的精确匹配进行路由。
- 绑定方式:
- 队列通过指定 Binding Key 绑定到直连交换机。
- 当消息的 Routing Key 与某个队列的 Binding Key 完全匹配时,消息被路由到该队列。
- 适用场景:
- 基于类型的任务分发:例如,将用户注册、订单支付等不同类型的消息路由到不同队列。
- 优先级路由:将高优先级订单路由到专门的队列处理。
2. 主题交换机(Topic Exchange)
路由规则:
根据消息的路由键和绑定键的模式匹配进行路由。绑定键支持两种通配符:
*
(星号):匹配一个单词(例如order.*
匹配order.create
但不匹配order.create.paid
)。#
(井号):匹配零个或多个单词(例如order.#
匹配order
、order.create
、order.create.paid
)。
绑定方式:
- 队列通过带通配符的 Binding Key 绑定到主题交换机。
- 当消息的 Routing Key 满足某个队列的 Binding Key 模式时,消息被路由到该队列。
适用场景:
- 系统事件广播:例如,
user.created
、order.paid
等事件按主题分类。 - 日志分类:将不同级别的日志(如
system.error
、application.warning
)路由到不同队列。
- 系统事件广播:例如,
3. 扇形交换机(Fanout Exchange)
- 路由规则:将接收到的所有消息广播到所有与之绑定的队列,完全忽略消息的路由键。
- 绑定方式:
- 队列直接绑定到扇形交换机,无需指定 Binding Key(或 Binding Key 为空)。
- 交换机接收到的所有消息都会被复制到所有绑定的队列。
- 适用场景:
- 配置更新:当系统配置变更时,同时通知所有服务。
- 事件广播:如系统通知、用户登录事件,需要多个服务同时处理。
4. 头交换机(Headers Exchange)
路由规则:
根据消息的头部信息(Headers)而非路由键进行匹配。匹配方式有两种:
x-match=all
:所有指定的头键值对必须匹配。x-match=any
:至少一个头键值对匹配。
绑定方式:
- 队列通过指定 Headers 键值对和匹配方式(
x-match
)绑定到头交换机。 - 当消息的 Headers 满足某个队列的绑定条件时,消息被路由到该队列。
适用场景:
- 多条件过滤:当路由规则复杂且不适合用路由键表达时。
- 元数据路由:根据消息的元数据(如 content-type、version)路由。
Spring AMQP
官方文档:https://springdoc.cn/spring-amqp/
介绍
Spring AMQP
是一个基于AMQP协议的消息中间件框架,它提供了一个简单的API来发送和接收异步、可靠的消息。
来源:https://springdoc.cn/spring-amqp/
AMQP的关键组件
Queue
在 Spring AMQP 中,Queue
对象代表消息队列,是消息的存储和处理单元
参数有:
参数 | 类型 | 描述 |
---|---|---|
name | String | 队列名称(必须唯一)。 |
durable | boolean | 是否持久化(默认true )。持久化队列会在 Broker 重启后保留。 |
exclusive | boolean | 是否排他(默认false )。排他队列仅对首次声明它的连接可见,并在连接断开时自动删除。 |
autoDelete | boolean | 是否自动删除(默认false )。自动删除队列在最后一个消费者断开连接后自动删除。 |
arguments | Map<String, Object> | 自定义参数(如死信交换机、消息 TTL、优先级等)。 |
Binding
将消息从 Exchange 路由到 Queue,通过 绑定键(Binding Key)或消息头(Headers)** 实现匹配。
Spring AMQP 提供了Binding
类及其构建器BindingBuilder
,用于声明不同类型的绑定:
import org.springframework.amqp.core.*;
// 声明Binding的示例(以Direct类型为例)
Binding directBinding = BindingBuilder.bind(queue()) // 目标队列
.to(directExchange()) // 源交换机
.with("order.create"); // 绑定键
Exchange
在 Spring AMQP 中,Exchange
对象是消息路由的核心组件,负责接收生产者发送的消息,并根据规则将消息路由到一个或多个队列
**Exchange
**的实现类:
DirectExchange |
实现 Direct 类型交换机,路由键精确匹配。 |
---|---|
TopicExchange |
实现 Topic 类型交换机,支持通配符匹配。 |
FanoutExchange |
实现 Fanout 类型交换机,广播消息到所有绑定队列。 |
HeadersExchange |
实现 Headers 类型交换机,根据消息头匹配。 |
CustomExchange |
自定义交换机类型(如 RabbitMQ 的延迟交换机x-delayed-message )。 |
Message
在 Spring AMQP 框架中,Message
对象是消息传递的核心载体,它封装了从 AMQP broker(如 RabbitMQ)接收或发送的消息内容和元数据
主要部分:
public class Message {
private final byte[] body; // 消息内容(字节数组)
private final MessageProperties messageProperties; // 消息属性
}
- 消息体(Body):实际传输的数据(字节数组)
- 消息属性(MessageProperties):包含路由信息、优先级、过期时间等元数据
Spring AMQP案例学习
jdk版本:1.8
springboot版本: 2.1.6.RELEASE
rabbitmq版本: rabbitmq:3.12-management(docker部署)
声明队列
@Bean
public Queue queue() {
return new Queue("queue", true);
}
声明交换机
/**
* 直连交换机
* @return
*/
@Bean
public DirectExchange directExchange() {
// 持久化,非自动删除
return new DirectExchange("directExchange", true, false);
}
/**
* 扇形交换机(广播)
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange", true, false);
}
/**
* 主题交换机
* @return
*/
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topicExchange", true, false);
}
/**
* 头交换机
* @return
*/
@Bean
public HeadersExchange headersExchange() {
return new HeadersExchange("headersExchange", true, false);
}
声明绑定
直连交换机:
@Bean
public Binding binding() {
return BindingBuilder
.bind(queue())
.to(directExchange())
.with("red");
}
扇形交换机
@Bean
public Binding binding() {
return BindingBuilder
.bind(queue())
.to(fanoutExchange());
}
主题交换机
@Bean
public Binding binding() {
return BindingBuilder
.bind(queue())
.to(topicExchange())
.with("topic.#");
}
头交换机
@Bean
public Binding binding() {
HashMap<String, Object> headers = new HashMap<>();
headers.put("key1", "value1");
headers.put("key2", "value2");
headers.put("key3", "value3");
return BindingBuilder
.bind(queue())
.to(headersExchange())
.whereAny(headers)
.match();
}
声明消息转换器及确认机制
/**
* 消息转换器
* 转换成json格式
* @return
*/
@Bean
public Jackson2JsonMessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(messageConverter()); // 设置 JSON 转换器
// 启用发布确认(确保消息到达Exchange)
template.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
System.out.println("消息成功发送到Exchange: " + correlationData);
} else {
System.err.println("消息发送失败: " + cause);
// 可在此处实现重试逻辑
}
});
// 启用失败退回(当Exchange无法路由到队列时触发)
template.setMandatory(true);
template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
System.err.println("消息被退回: " + new String(message.getBody()) +
", replyCode: " + replyCode +
", replyText: " + replyText +
", exchange: " + exchange +
", routingKey: " + routingKey);
});
return template;
}
确认机制容器工厂
@Autowired
private ConnectionFactory connectionFactory;
// 手动确认模式的容器工厂
@Bean
public SimpleRabbitListenerContainerFactory manualAckContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动确认
factory.setPrefetchCount(1); // 每次只处理一条消息
factory.setMessageConverter(jsonMessageConverter()); // 必须设置
return factory;
}
// 自动确认模式的容器工厂
@Bean
public SimpleRabbitListenerContainerFactory autoAckContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.AUTO); // 自动确认
factory.setMessageConverter(jsonMessageConverter()); // 必须设置
factory.setPrefetchCount(250); // 默认预取数量
return factory;
}
在消费者监听时指定工厂:
@RabbitListener(queues = "queue10", containerFactory = "autoAckContainerFactory") // 自动确认
@RabbitListener(queues = "queue9", containerFactory = "manualAckContainerFactory") // 手动确认
手动确认:
/**
* 测试话题交换机
* 手动确认消息已消费
* @param message 消息内容
* @param channel RabbitMQ Channel
* @param tag deliveryTag
*/
@RabbitListener(queues = "queue7", containerFactory = "manualAckContainerFactory")
public void queue7(String message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag,
@Header(AmqpHeaders.CONSUMER_QUEUE) String queueName,
Message amqpMessage) {
try {
System.out.println("处理消息: " + message + ", 来自队列:" + queueName);
// 手动确认
channel.basicAck(tag, false); // 正确调用 basicAck
} catch (IOException e) {
try {
// 拒绝消息并重新入队
channel.basicNack(tag, false, true);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
测试: