一、概念
1. 什么是RabbitMQ
使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
AMQP,即Advanced Message Queuing Protocol(高级消息队列协议),一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件传递消息,不受客户端/中间件不同产品、不同开发语言等条件的限制
RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用,比如ELK日志收集。
2. 核心特点
RabbitMQ 是基于 Erlang 语言开发、遵循 AMQP(高级消息队列协议)的开源消息中间件,具备以下显著特性:
- 高可靠性保障:通过消息持久化、传输确认及发布确认等机制,确保消息在生产、传输与消费过程中的可靠性,避免数据丢失风险。
- 灵活的消息分发策略:依托 Exchange(交换机)实现多样化消息路由,支持简单模式、工作队列模式、发布订阅模式、路由模式及通配符模式,可灵活适配不同业务场景需求。
- 集群化部署能力:支持多台 RabbitMQ 服务器组成集群,构建逻辑统一的 Broker,提升系统可用性与负载均衡能力。
- 多协议兼容性:除 AMQP 外,还支持 STOMP、MQTT 等多种消息队列协议,便于与不同技术栈的系统集成。
- 广泛的客户端支持:提供对 Java、.NET、Ruby 等几乎所有常用编程语言的客户端支持,降低开发接入门槛。
- 可视化管理界面:内置直观易用的管理界面,方便用户实时监控消息队列状态、管理 Broker 资源及配置参数。
- 插件化扩展体系:具备丰富的官方插件,并支持自定义开发,可根据业务需求灵活扩展功能,如权限控制、消息追踪等。
3. RabbitMQ能做什么
RabbitMQ可以帮助开发者构建可靠、高效、可扩展的分布式系统,实现异步通信、任务分发和事件驱动等功能。它被广泛应用于各种场景,包括微服务架构、消息驱动的架构、实时数据处理等。
4. RabbitMQ的工作流程
RabbitMQ的工作流程主要基于生产者-消费者模型和AMQP协议。以下是RabbitMQ消息传递的基本流程:
- **生产者发送消息:**生产者通过RabbitMQ的客户端库创建消息,并指定交换机的名称和路由键。然后,生产者将消息发送到RabbitMQ服务器上的指定交换机。
- 交换机接收并路由消息:交换机接收到生产者的消息后,根据配置的路由规则和路由键将消息分发到相应的队列中。如果消息没有匹配到任何队列,则可能会被丢弃或返回给生产者。
- **消费者消费消息:**消费者连接到RabbitMQ服务器,并监听指定的队列。当队列中有新消息时,消费者从队列中获取并处理消息。处理完成后,消费者可以选择发送确认消息给RabbitMQ服务器,以表示消息已被成功处理。
- **消息确认机制:**RabbitMQ使用消息确认机制来确保消息的可靠传递。生产者在发送消息后会收到一个确认,表示消息已成功发送到交换机。消费者在处理完
5. 消息队列优缺点
消息队列在特殊场景下具备显著优势,主要体现在解耦、异步处理以及削峰等方面。然而,引入消息队列也伴随着一些不可忽视的缺点:
- 系统可用性降低:随着外部依赖的增加,系统的稳定性面临挑战。例如,原本 A 系统只需调用 B、C、D 三个系统的接口,系统运行正常。但引入消息队列(MQ)后,若 MQ 出现故障,可能导致整个系统崩溃。因此,如何保障消息队列的高可用性成为关键,可点击 [具体链接] 查看相关解决方案。
- 系统复杂度提升:消息队列的加入使得系统逻辑变得复杂。例如,需要处理消息的重复消费问题,即确保每条消息仅被处理一次;处理消息丢失的情况,通过可靠的消息传递机制保证消息不丢失;以及保证消息传递的顺序性,尤其是在对消息顺序有严格要求的场景中。这些问题增加了系统开发和维护的难度。
- 数据一致性问题:当 A 系统处理请求并返回成功后,可能出现数据不一致的情况。例如,B、D 系统写库成功,而 C 系统写库失败,这会导致数据状态的不一致。在使用消息队列时,需要额外的技术方案来确保数据的一致性,例如采用分布式事务、消息补偿机制等。
消息队列虽然功能强大,能够带来诸多好处,但同时也引入了系统复杂性。在使用消息队列时,需要针对其缺点制定相应的技术架构和解决方案,以平衡系统的性能、可用性和一致性。
6. AMQP和JMS
MQ是消息通信的模型,并发具体实现。现在实现MQ的有两种主流方式:AMQP、JMS。
二者的区别与联系:
- 接口与协议:JMS(Java Message Service)定义了统一的接口,用于规范消息操作;而 AMQP(Advanced Message Queuing Protocol)则是通过制定协议,来统一数据交互的格式。
- 语言限制:JMS 主要限定在 Java 语言环境中使用,专为 Java 平台提供消息服务;AMQP 作为一种协议,不限制实现语言,具备跨语言的特性,可在多种编程语言的项目中使用。
- 消息模型:JMS 规定了特定的两种消息模型,满足 Java 平台下的消息通信需求;相比之下,AMQP 的消息模型更为丰富多样,能够适应更多不同场景下的消息通信需求。
7. 常见MQ产品
消息队列(MQ)作为重要的中间件,在消息通信中起着关键作用。目前主流的 MQ 产品有 Kafka、ActiveMQ、RocketMQ 和 RabbitMQ,它们各自具有不同的特点和适用场景。
- ActiveMQ:基于 JMS(Java Message Service),在早期项目中广泛使用。但随着技术发展,其官网活跃度较低,更新频率低。单机吞吐量为万级,对于高并发的互联网项目,性能上无法满足需求。高可用方面采用主从架构实现,不过消息可靠性较低,存在数据丢失的可能。由于其性能和可靠性的限制,在现代项目中逐渐被替代,可被 RabbitMQ 取代。
- RabbitMQ:基于 AMQP 协议,使用 Erlang 语言开发。社区活跃,单机吞吐量同样为万级,无法应对超高并发场景。高可用上采用镜像集群模式,能保障系统的高可用性。消息可靠性较高,可保证数据不丢失,还支持消息重试、死信队列等高级功能。然而,由于其使用 Erlang 语言开发,国内精通该语言的人较少,导致源码阅读困难。适合中小型公司,在不需要面对复杂技术挑战的场景下使用。
- RocketMQ:阿里巴巴开源的消息中间件,基于 JMS。单机吞吐量可达 10 万级,能应对互联网项目的高并发挑战。高可用方面采用分布式架构,可搭建大规模集群,性能表现出色。消息可靠性高,通过配置能确保数据不丢失,同时支持延迟消息、事务消息、消息回溯、死信队列等丰富的高级功能。使用 Java 语言开发,便于阅读源码理解底层原理。在技术选型中是一个优秀的选择,适合大型互联网公司和中小型公司使用,商业版存在收费情况。
- Kafka:分布式消息系统,单机吞吐量极高,可达十几万的并发量。高可用上支持分布式集群部署。消息可靠性方面,由于消息先存储在磁盘缓冲区,机器故障时可能导致缓冲区数据丢失,即异步性能和数据可靠性存在一定矛盾。功能相对单一,主要用于消息的接收与发送。在行业内常用于大数据领域,如采集用户行为日志并进行计算,实现 “猜你喜欢” 等功能。如果项目没有大数据相关需求,一般不会选择 Kafka。
7.1. 选择 RabbitMQ 的原因
- 对比 ActiveMQ:ActiveMQ 性能不佳,在高并发场景下无法满足需求,虽然其 API 完善,但在高并发场景下不适用,而 RabbitMQ 的稳定性和可靠性更好,能满足更多场景需求。
- 对比 Kafka:Kafka 强调高性能,但在业务对消息可靠性要求高时,其可能丢失消息的特性无法满足要求。而 RabbitMQ 能保证消息不丢失,更适合对消息可靠性有要求的业务场景。
- 对比 RocketMQ:RocketMQ 具备高性能、高可靠性、支持分布式事务、水平扩展和大量消息堆积等优点,但商业版收费,某些功能不对外提供。RabbitMQ 社区活跃,功能也较为丰富,对于没有 RocketMQ 高级功能需求且不想承担商业版费用的项目来说,是一个不错的选择。
8. 核心组件
Publisher(生产者):消息的生产者,也是一个向交换器发布消息的客户端应用程序,负责将业务数据封装为消息,并推送至 RabbitMQ 消息服务器。
Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序
Exchange(交换机):消息分发枢纽,接收来自生产者的消息,并根据消息携带的 RoutingKey(路由键)及绑定规则,将消息分配至对应的 Queue(队列)。
**Binding:**绑定,用于将消息队列和交换器之间建立关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将它理解成一个由绑定构成的路由表。
Queue(队列):消息存储容器,用于暂存待消费的消息,直到被消费者获取处理。
**Connection:**网络连接,比如一个 TCP 连接
消息服务器(RabbitMQ):作为消息中转站,本身不生产或消费消息,仅负责接收生产者的消息,并根据规则路由至对应的消费者。
ConnectionFactory:连接管理器,负责创建并管理应用程序(生产者或消费者)与 RabbitMQ 之间的网络连接,确保通信链路的稳定建立与维护。
Channel(信道):消息传输通道,所有消息的发布、接收操作均通过信道完成。多个信道可复用同一物理连接,提升资源利用率与并发性能。
RoutingKey:消息携带的路由标识,作为交换机路由消息的依据,决定消息最终投递到哪个队列。
BindingKey:队列与交换机的绑定标识,通过 BindingKey 建立 Queue 与 Exchange 之间的关联关系,使交换机能够按规则将消息路由至目标队列 。
Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 /
Broker:表示消息队列服务器实体
Message:消息实体,它由消息头和消息体组成。消息头主要由路由键、交换器、队列、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等属性组成,而消息体就是指具体的业务对象。
相比传统的 JMS 模型,AMQP 主要多了 Exchange、Binding 这个新概念。
在 AMQP 模型中,消息的生产者不是直接将消息发送到Queue队列,而是将消息发送到Exchange交换机,其中还新加了一个中间层Binding绑定,作用就是通过路由键Key将交换机和队列建立绑定关系。
就好比类似用户表和角色表,中间通过用户角色表来将用户和角色建立关系,从而实现关系绑定,在 RabbitMQ 中,消息生产者不直接跟队列建立关系,而是将消息发送到交换器之后,由交换器通过已经建立好的绑定关系,将消息发送到对应的队列。
RabbitMQ 最终的架构模型,核心部分就变成如下图所示:
从图中很容易看出,与 JMS 模型最明显的差别就是消息的生产者不直接将消息发送给队列,而是由Binding绑定决定交换器的消息应该发送到哪个队列,进一步实现了在消息的推送方面,更加灵活!
8.1. 生产者发送消息流程:
1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)
8.2. 消费者接收消息流程:
1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。
6、ack回复
二、安装使用
1. Windows安装
1.1. 安装erlang
进入erlang的官方下载页面进行下载:Downloads - Erlang/OTP
在下载过程中一定要对应匹配RabbitMQ的版本
双击安装并配置环境变量
1.2. 下载RabbitMQ
RabbitMQ下载地址:Installing RabbitMQ | RabbitMQ
双击安装,安装完成后,开始安装RabbitMQ-Plugins插件
先cd D:softwareRabbitMQ abbitmq_server-3.8.8sbin
然后运行命令:rabbitmq-plugins enable rabbitmq_management
执行rabbitmqctl status,出现以下内容,说明成功
然后双击运行rabbitmq-server.bat
进入登录页面,发现启动成功
然后我们可以将RabbitMQ做成Windows服务
以管理员身份运行cmd
cd D:softwareRabbitMQ abbitmq_server-3.8.8sbin
执行rabbitmq-service.bat install
可以通过任务管理器去查看RabbitMQ服务
以上就是Windows安装RabbitMQ的全部过程,页面设置跟上面的Linux一样。
2. 管理界面介绍
2.1. 概览
connections:无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费,在这里可以查看连接情况
channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道。
Exchanges:交换机,用来实现消息的路由
Queues:队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列。
端口:
2.2. 连接
2.3. 通道
2.4. 交换机
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列
Type
解释
direct
它会把消息路由到那些 binding key 与 routing key 完全匹配的 Queue 中
fanout
它会把所有发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中
headers
headers 类型的 Exchange 不依赖于 routing key 与 binding key 的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。(headers 类型的交换器性能差,不实用,基本不会使用。)
topic
与 direct 模型相比,多了个可以使用通配符!,这种模型 Routing key 一般都是由一个或多个单词组成,多个单词之间以 “.” 分割,例如:item.insert 。星号 匹配一个词,例 audit.* ;# 号匹配一个或多个词 audit.#
x-delayed-message
延迟交换机,可以延迟接收消息
Features
解释
D
d
是 durable
的缩写,代表这个队列中的消息支持持久化
AD
ad
是 autoDelete
的缩写,代表当前队列的最后一个消费者退订时被自动删除。注意:此时不管队列中是否还存在消息,队列都会删除
excl
是 exclusive
的缩写,代表这是一个排他队列。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。需注意:
1. 排他队列基于连接可见,同一连接的不同信道可访问同一连接创建的排他队列;
2. 首次声明后,其他连接不允许创建同名排他队列;
3. 即使队列持久化,连接关闭或客户端退出,排他队列仍会自动删除。适用于单个客户端发送读取消息的场景
Args
是 arguments
的缩写,代表该队列配置了 arguments
参数
TTL
是 x-message-ttl
的缩写,用于设置队列中所有消息的生存周期(统一配置),也可在发布消息时单独为某个消息指定剩余生存时间,单位为毫秒
Exp
Auto Expire
,是 x-expires
配置的缩写。当队列在指定时间内未被访问(如 consume
、basicGet
、queueDeclare
等操作),将会被删除
Lim
说明该队列配置了 x-max-length
,用于限定队列中消息的最大数量,超过指定长度将删除最早的消息
Lim B
说明队列配置了 x-max-length-bytes
,用于限定队列最大占用空间大小,通常受内存、磁盘空间限制
DLX
说明该队列配置了 x-dead-letter-exchange
。当队列消息长度超过限制、过期等情况发生时,从队列中删除的消息将被推送到指定的交换机,而非直接丢弃
DLK
x-dead-letter-routing-key
的缩写,用于将删除的消息推送到指定交换机的指定路由键对应的队列中
Pri
x-max-priority
的缩写,标识该队列为优先级队列。需先定义最大优先级值(通常不宜过大),发布消息时可指定消息优先级,数值更大(优先级更高)的消息优先被消费
Ovfl
x-overflow
的缩写,用于配置队列消息溢出时的处理方式。可选择 drop-head
(默认,丢弃队列头部消息)或 reject-publish
(拒绝接收生产者后续发送的所有消息)
ha-all
镜像队列。all
表示将队列镜像到集群上的所有节点,ha-params
参数将被忽略
2.5. 队列
点击名称进去,可以看到队列的详细信息
get Message可以看到消息的内容
arguments具体参数如下:
参数名
作用
x-message-ttl
发送到队列的消息在被丢弃之前可存活的时间(单位:毫秒)
x-max-length
限定队列可容纳的最大消息数量
x-expires
队列在自动删除前可使用的时长(单位:毫秒)
x-max-length-bytes
以字节为单位限制队列的容量大小,通过控制队列占用的存储空间来达到容量限制目的,需设置为非负整数
x-dead-letter-exchange
配置队列溢出行为(有效值为 drop-head
或 reject-publish
);同时指定当消息被拒绝或过期时,将消息重新发布到的交换机名称(可选)
x-dead-letter-routing-key
当消息变为死信时,可选的替换路由键;若未设置,则使用消息的原始路由键
x-max-priority
定义队列支持的最大优先级数值;若未设置,队列不支持消息优先级功能
x-queue-mode
将队列设置为延迟模式,使队列尽可能将消息存储在磁盘以降低内存占用;若未设置,队列将保留内存缓存以实现消息快速传递
x-queue-master-locator
设置队列的主位置模式,确定在集群节点上声明队列时,队列主节点所在位置的规则
2.6. Admin用户和虚拟主机管理
2.6.1. 添加用户
上面的Tags选项,其实是指定用户的角色,可选的有以下几个:
- 超级管理员 (administrator):能够登录管理控制台,可查看所有信息,还能对用户和策略(policy)进行操作。
- 监控者 (monitoring):可以登录管理控制台,同时能查看 RabbitMQ 节点的相关信息,如进程数、内存使用情况、磁盘使用情况等。
- 策略制定者 (policymaker):可登录管理控制台,对策略(policy)进行管理,但无法查看节点的相关信息(如进程数、内存使用情况等,即上图红框标识的部分) 。
- 普通管理者 (management):仅能登录管理控制台,既无法看到节点信息,也不能对策略进行管理。
- 其他:不具备登录管理控制台的权限,通常作为普通的生产者和消费者,专注于消息的生产和消费操作。
2.6.2. 创建虚拟主机
为了让各个用户可以互不干扰的工作,RabbitMQ添加了虚拟主机(Virtual Hosts)的概念。其实就是一个独立的访问路径,不同用户使用不同路径,各自有自己的队列、交换机,互相不会影响。
创建好虚拟主机,我们还要给用户添加访问权限:
点击添加好的虚拟主机:
进入虚拟主机设置界面:
2.7. 开启日志
进入容器,输入
rabbitmq-plugins enable rabbitmq_tracing
此时会多一个tracing标签,输入信息添加日志。
**Name:**自定义,建议标准点容易区分
**Format:**表示输出的消息日志格式,有Text和JSON两种,Text格式的日志方便人类阅读,JSON的方便程序解析。
JSON格式的payload(消息体)默认会采用Base64进行编码,如上面的“trace test payload.”会被编码成“dHJhY2Ug dGVzdCBwYXlsb2FkLg==”。
Max payload bytes: 表示每条消息的最大限制,单位为B。比如设置了了此值为10,那么当有超过10B的消息经过Rabbit MQ流转时,在记录到trace文件的时候会被截断。如上text日志格式中“trace test payload.”会被截断成“trace tes
Pattern:
#:追踪所有进入和离开MQ的消息
publish.#:追踪所有进入MQ的消息
publish.myExchage:追踪所有进入到myExchange的消息
deliver.#:跟踪所有离开MQ的消息
deliver.myQueue:追踪所有从myQueue离开的消息
#.myQueue:实测效果等同于deliver.myQueue
添加后,点击即可查看日志
如果出现错误,是因为插件默认是使用 guest 用户,是因为把 guest 用户删除了,或者在配置文件里面使用其他用户
解决: 配置/etc/rabbitmq/rabbitmq.config,添加配置
{rabbitmq_tracing,
[
{directory, "/var/vcap/sys/log/rabbitmq-server/tracing"},
{username, <<"admin">>},
{password, <<"password">>}
]
},
三、详细说明
1. RabbitMQ 的 5 种核心消息模式
RabbitMQ 提供五种核心工作模式,适用于不同业务场景,满足多样化消息传递需求:
1.1. 简单模式(HelloWorld)
架构:单生产者、单消费者、单队列,无需手动配置交换机,默认使用 RabbitMQ 内置的 Direct 交换机。
原理:生产者发送消息直接进入队列,消费者从队列获取并处理消息。
应用场景:适用于快速验证消息收发功能,如基础程序间通信测试、简单任务处理,可实现高效的消息生产与消费。
1.2. 工作队列模式(Work Queue)
架构:一个生产者对应多个消费者,依赖默认交换机,消费者间形成竞争关系。
原理:生产者将任务消息发送至队列,多个消费者从队列拉取任务,同一消息仅被一个消费者处理,实现任务并行处理。若消费者宕机,未确认的消息会在一段时间后重新分配,保障任务可靠性。
应用场景:适用于异步处理耗时任务(如文件处理、数据分析)、批量执行后台任务(发送邮件、生成报表)等场景,提升系统处理能力。
1.3. 发布订阅模式(Publish/Subscribe)
架构:需配置fanout
类型交换机,交换机绑定多个队列,队列与消费者解耦。
原理:生产者将消息发送至fanout
交换机,交换机无视路由键,直接将消息广播至所有绑定队列,实现一对多消息分发。
应用场景:常用于实时通知(如系统告警、新闻推送)、日志广播等场景,确保消息快速扩散至多个接收端。
1.4. 路由模式(Routing)
架构:采用direct
类型交换机,交换机与队列绑定需指定精确的routing key
(路由键)。
原理:生产者发送消息时携带routing key
,交换机根据该键值将消息精准投递至匹配的队列,实现按条件筛选消息的定向分发。
应用场景:适用于根据业务规则分类处理消息的场景,如订单系统中按订单类型(普通订单、加急订单)分发至不同处理队列。
1.5. 主题模式(Topic)
架构:使用topic
类型交换机,支持通配符规则(*
匹配单个单词,#
匹配零个或多个单词)。
原理:生产者发送消息时携带routing key
,交换机依据通配符规则将消息路由至多个符合条件的队列,实现灵活的消息过滤与动态分发。
应用场景:适用于复杂消息分类场景,如日志系统按级别(info.*
、error.#
)和模块(user.log
、order.log
)归档消息,或物联网设备状态监控中的多维度数据分发。
通过灵活选择不同工作模式,RabbitMQ 能够高效支持从基础通信到复杂业务场景的消息处理需求,保障系统的可靠性与扩展性。
2. 交换机分发策略
当消息的生产者将消息发送到交换器之后,是不会存储消息的,而是通过中间层绑定关系将消息分发到不同的队列上,其中交换器的分发策略分为四种:Direct、Topic、Headers、Fanout!
- Direct:直连类型,即在绑定时设定一个 routing_key, 消息的 routing_key 匹配时, 才会被交换器投送到绑定的队列中去,原则是先匹配、后投送;
- Topic:按照规则转发类型,支持通配符匹配,和 Direct 功能一样,但是在匹配 routing_key的时候,更加灵活,支持通配符匹配,原则也是先匹配、后投送;
- Headers:头部信息匹配转发类型,根据消息头部中的 header attribute 参数类型,将消息转发到对应的队列,原则也是先匹配、后投送;
- Fanout:广播类型,将消息转发到所有与该交互机绑定的队列上,不关心 routing_key;
2.1. Direct(直接交换机)
Direct 交换机是 RabbitMQ 的默认交换机类型,遵循精确匹配路由规则:只有当消息携带的路由键(routing key)与队列绑定到交换机时配置的绑定键(binding key)完全一致,消息才会被转发至该队列。这种模式属于单播机制,确保消息精准投递。
RabbitMQ 默认提供了一个名称为空字符串的 Direct 交换机(在 Web 管理界面显示为(AMQP default)
),该交换机自动绑定到所有队列,且每个队列的绑定键为自身名称。因此,在未显式指定交换机的场景下,生产者发送的消息实际通过此默认交换机完成路由,这也是部分开发者误以为 “不配置交换机也能收发消息” 的原因。
例如,若交换机绑定队列时设置绑定键为black
,仅当消息路由键也为black
时,消息才会被转发至该队列;若消息路由键为black.green
,则不会被投递。Direct 交换机以严格的一对一匹配逻辑,实现消息的确定性路由。
示例1:
如图,假设生产发送消息到直接交换机,消息路由键为:green,那么消息将被投放入Queue1队列中,因为Queue1的绑定建和消息路由键精确匹配。
示例2:
如图,假设生产发送消息到直接交换机,消息路由键为:green,那么消息将被投放入Queue1和Queue2队列中,因为Queue1,Queue2的绑定建和消息路由键精确匹配。
2.2. Fanout(扇出交换机)
扇出(fanout)类型的交换机与其他类型不同,它采用消息广播模式。其消息转发规则是:会将所有发送到该交换机的消息转发到与之绑定的所有消息队列中,完全不考虑路由键和绑定键,即便配置了路由键也会被忽略。
这就类似于子网广播,子网内的每台主机都会获得一份复制的消息。在 RabbitMQ 的四种交换机类型(Direct、Topic、Headers、Fanout)中,扇出交换机转发消息的速度是最快的,能够高效地将消息同时分发给多个绑定队列。
示例:
假设生产发送消息到直接交换机,消息路由键为:quick,那么最后消息将被投放入所有队列中(和路由键,绑定键无关)
2.3. Topic(主题交换机)
Topic 类型交换机基于通配符匹配规则实现消息路由,相比 Direct 交换机更加灵活:仅当消息携带的路由键(routing key)符合队列绑定的通配符规则时,才会将消息转发至对应队列。该模式支持两种特殊通配符:
*****
(星号):匹配单个单词。例如绑定键为logs.*
,仅接收路由键中包含两个单词且第一个单词为logs
的消息(如logs.error
)。**#**
(井号):匹配零个或多个单词。若绑定键设置为logs.#
,可接收logs
开头的任意长度单词消息(如logs
、logs.error
、logs.error.server
);当绑定键为#
时,该队列将接收交换机的所有消息,效果等同于 Fanout 交换机。
当绑定键中不使用通配符时,Topic 交换机将退化为 Direct 交换机的精确匹配模式。例如,若绑定键为black
,仅当消息路由键也为black
时才会投递;而使用通配符(如black.#
),则可匹配black
、black.green
等包含black
前缀的多种路由键,实现动态、灵活的消息过滤与分发,适用于日志分级处理、多维度消息分类等场景。
示例:
假设
- 生产发送消息到主题交换机,消息路由键为:
quick.orange.rabbit
、lazy.orange.element
,那么消息将被投放入所有队列中。 - 生产发送消息到主题交换机,消息路由键为:
quick.orange.fox
,那么消息将被投放入Queue1队列中。 - 生产发送消息到主题交换机,消息路由键为:
lazy.pink.rabbit"
,那么消息将被投放入Queue2队列中,且只会放入一次,虽然匹配两个绑定键。 - 生产发送消息到主题交换机,消息路由键为:
quick.brown.fox
、orange
、quick.orange.new.rabbit
,那么消息将不会被投放入任何队列中。 - 生产发送消息到主题交换机,消息路由键为:
lazy.orange.new.rabbit
,那么消息将被投放入Queue2队列中。
2.4. Headers(消息头交换机)
头部交换机(Headers)依据消息的自定义消息头属性(headers)进行消息路由匹配,与路由键和绑定键无关,具体规则如下:
2.4.1. 绑定指定键值对且包含 x - match
当在绑定队列和交换机时指定了一组键值对,并且其中包含一个键为 x - match
,其值为 any
或 all
:
**all**
:消息携带的键值对(即消息头)必须全部匹配消息和队列绑定时配置的全部键值对(消息携带的键值对可以包含绑定键值中没有的键值对),消息才会被转发到该队列。**any**
:消息携带的键值对只要匹配消息和队列绑定时配置的任一键值对,消息就会被转发到该队列。
2.4.2. 绑定指定键值对但未指定 x - match
如果在绑定队列和交换机时指定了一组键值对,但没有指定 x - match
键值对,那么默认 x - match
为 all
。
2.4.3. 绑定未指定键值对
若在绑定队列和交换机时未指定键值对,交换机也会把消息发送到该队列。
2.4.4. 特殊匹配说明
对于 any
和 all
模式,以字符串 x -
开头的消息头不会用于路由匹配。若将 x - match
设置为 any - with - x
或 all - with - x
,则会使用以字符串 x -
开头的请求头进行路由匹配。
2.4.5. 与其他交换机对比
Headers 交换机同样是基于规则匹配来路由消息,与 Direct 和 Topic 交换机固定使用路由键不同,它通过自定义的消息头部键值对规则进行匹配。在队列与交换机绑定时设定一组键值对规则,消息中也包含一组键值对(即 headers
属性),当这些键值对部分或全部匹配时,消息就会被投送到对应队列。不过,Headers 交换机和 Direct 交换机功能类似,但性能远不如 Direct 交换机,目前在实际应用中几乎很少使用。
2.5. 不同交换机使用场景
不同类型的交换机性能排序:fanout > direct > topic > headers。
不同类型的 RabbitMQ 交换机适用于不同的使用场景,以下是它们的一些常见用例:
- Direct Exchange(直连交换机):适用于一对一的消息分发,如订单处理系统,每个订单只发送到特定的处理队列。
- Fanout Exchange(扇出交换机):适用于广播消息给多个消费者,不关心消息的具体内容。如:实时日志处理,将日志广播给多个监控系统。
- Topic Exchange(主题交换机):根据消息的特定属性选择性地将消息路由到不同的队列。如:新闻发布系统,根据主题和地区将新闻分发给不同的订阅者。
- Headers Exchange(头交换机)尽量不要用,性能太低。根据消息的头部属性来选择性地将消息路由到队列。
- Default Exchange(默认交换机):用于简单的队列名称与绑定键相同的情况,不需要显式声明交换机。如:简单的消息发布和订阅,不需要复杂的路由规则。
选择适当的交换机类型取决于您的应用程序需求。通常情况下,您可以根据以下考虑来选择:
- 如果消息需要精确路由到特定队列,使用直连交换机。
- 如果需要将消息广播给多个队列,使用扇出交换机。
- 如果消息需要根据多个属性进行选择性路由,使用主题交换机
- 如果只是简单的队列名称与绑定键相同,可以使用默认交换机。
3. 队列 – Queue
RabbitMQ中的队列是一个缓冲区,用于为消费者应用程序存储消息。这些应用程序可以创建、使用和删除队列。RabbitMQ中的队列可以是持久的、临时的或自动删除的。持久队列一直保留,直到它们被删除。临时队列一直存在,直到服务器关闭。自动删除队列在不再使用时被删除。
3.1. Queue的属性
Type:指定Queue类型。RabbitMQ提供了三种Type:
- Classic Queues:经典队列。这是RabbitMQ最为经典的队列类型。在单机环境中,拥有比较高的消息可靠性
- Quorum Queues:仲裁队列。仲裁队列相比Classic经典队列,在分布式环境下对消息的可靠性保障更高。
- Stream Queues:流式队列。这种队列类型的消息是持久化到磁盘并且具备分布式备份的,更适合于消费者多,读消息非常频繁的场景
Durability:声明队列是否持久化,即服务器重启后是否还存在
**Auto delete:**是否自动删除,如果为true,当没有消费者连接到这个队列的时候,队列会自动删除
**Exclusive:**exclusive属性的队列只对首次声明它的连接可见,并且在连接断开时自动删除;基本上不设置它,设置成false
**Arguments:**队列的其他属性,例如指定DLX(死信交换机等);
通过设置不同的属性,可以实现不同功能的Queue,如优先级队列,死信队列,临时队列等
3.2. 3.2 优先级队列
3.2.1 什么是优先级队列
优先级队列是一种特殊类型的队列,它根据消息的优先级进行排序和发送,在这种队列中,高优先级的消息将先被消费。
优先级可以设置为1255,但建议设置为15,如果最大值5满足不了需求建议1~10。更高的优先级需要更多的CPU和内存资源,因为RabbitMq需要在内部为每个优先级维护一个子队列,从1到你配置最大值。
优先级队列与普通队列有相同特性,如支持持久化,分页和镜像等功能,但需要注意的是消息的过期机制,过期的消息是从队列的头部开始过期的,即使你设置了队列级别的TTL,低优先级的过期消息仍然会被高优先级的未过期消息阻塞,导致无法传递,但它会出现在队列统计信息中;
另外,如果你队列设置了最大长度限制,RabbitMq会按正常流程从队列中删除消息,无论是高优先级还是低优先级。
3.2.2 如何配置优先级队列
- 创建Queue时,需要添加 x-max-priority 参数。此参数应为介于 1 和 255 之间的正整数,指示队列应支持的最大优先级。
- 创建Exchange及Binding
- 发布者发布消息时,在**Properties**中添加 priority 属性,指定消息的优先级。值越大,优先级越高。
没有 priority 属性的消息其优先级被视为 0。优先级高于队列最大值的消息将被视为以最大优先级发布。
3.2.3 优先级队列的应用
- 订单催付,客户下单后,商家希望在客户设定的时间内完成付款,如果为付款,我们需要推送催付短信。对于大客户的订单,我们就可以使用优先队列去处理。
- 任务调度,后台调度任务中,有些任务可以稍后执行,有些需要立即执行,所以我们可以用优先队列发送一些紧急任务。
3.3. 死信队列
3.3.1 什么是死信
死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
死信的来源主要有三类:
- 消息 TTL 过期:通过配置 x-message-ttl 参数指定消息过期实践
- 队列达到最大长度(队列满了,无法再添加数据到 mq 中):通过配置 x-max-length参数指定队列最大长度
- 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.
3.3.2 如何配置死信队列
上图是死信队列的一个例子。我们通过交换机 tempreture.ex 接收消息,消息路由到 队列 tempreture.queue。在队列中队列 tempreture.queue中,配置超时时间(如3秒)和死信交换机tempreture.dlx.ex。 消息超时后,消息会被转移到死信交换机 tempreture.dlx.ex 中,该死信交换机绑定队列tempreture.dlx.queue。
如上例子中,队列tempreture.queue的 arguments配置了:
x-dead-letter-exchange:
tempreture.dlx.ex
x-message-ttl:
5000
3.4. 自动删除队列 – Auto delete
临时队列是一种自动删除队列。当这个队列被创建后,如果没有消费者监听,则会一直存在,还可以不断向其发布消息。但是一旦的消费者开始监听,然后断开监听后,它就会被自动删除。
PS:有些地方将自动删除队列称为 临时队列。不过临时队列有时还指 非持久化队列 (与持久化队列相对应,对应参数Durability)
自动删除队列创建时,将Auto delete配置成Yes 即可。
3.5. 惰性队列
在RabbitMQ 3.12之前,Classic的队列可以配置为在惰性模式下运行,这意味着它们会将所有消息写入磁盘,而不会将消息保存在内存中。 3.12之后,该配置被忽略。当前的行为是:
- 通常情况下,消息被短暂的存储在内存中,知道缓存的消息写入磁盘
- 一小部分消息仍保留在内存中,以满足消息的快速传递(存储在内存中的消息数量,取决于消费者的消费速度)
- 如果发布的消息可以立即传递给消费者,并且消费者在消息被写入磁盘之前确认了消息,那么消息不会被写入磁盘(此时消息已经传递并得到确认,因此不需要写入)
3.6. 延迟队列
RabbitMQ本身不直接支持延迟消息队列**,**但是可以通过一些高级特性配合使用来实现延迟消息的功能。一种常见的方法是使用RabbitMQ的"死信队列"(Dead Letter Queues)特性配合消息的TTL(Time-To-Live)。
- 首先,设置一个交换机(Exchange)和队列(Queue),并将它们绑定,用于存放需要延迟处理的消息。
- 将消息的TTL设置为所需的延迟时间。
- 当消息过期,它们会被自动发送到一个死信交换机(Dead Letter Exchange)。
- 创建一个死信队列,并将其绑定到死信交换机。
- 消费者从死信队列中获取消息进行处理。
3.6.1. 延迟队列使用场景
- 订单在十分钟之内未支付则自动取消
- 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
- 用户注册成功后,如果三天内没有登陆则进行短信提醒。
- 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
- 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。
4. 其它工作机制
4.1. 消息确认机制
RabbitMQ 提供两种消息确认模式,用于解决消息处理失败、消费者宕机等场景下的消息生命周期管理问题:
- 自动确认模式:消费者接收消息后,RabbitMQ 自动认为消息已被处理,无需额外确认操作。此模式虽高效,但存在消息丢失风险,适用于允许少量数据丢失的场景。
- 显式确认模式:消费者需调用
basic.ack
方法手动确认消息。确认时机灵活,可在消息接收后、持久化存储前或处理完成后执行。若消费者未发送确认即消亡,RabbitMQ 会将消息重新分配给其他消费者或等待新消费者接入,确保消息不丢失。但需注意,若消费者遗漏确认,可能导致队列消息堆积。
4.2. 生产者确认机制
为确保消息成功抵达 RabbitMQ,生产者可采用以下两种方式:
- 事务机制:通过
Tx()
、TxCommit()
和TxRollback()
方法实现事务控制。开启事务后,仅当消息成功提交至 RabbitMQ,事务才会生效;若提交前发生异常,可回滚事务并重发消息。但事务机制会使生产者处于阻塞状态,严重影响性能。 - 发送者确认机制:将信道设置为确认(confirm)模式后,每条消息会被分配唯一 ID。消息抵达匹配队列或持久化到磁盘后,RabbitMQ 会发送确认(Ack)或否定确认(Nack)。生产者可通过回调方法异步处理确认结果,支持批量确认(
multiple
参数)。该机制虽提升性能,但无法回滚,服务器崩溃时可能导致消息重复,需业务层实现去重逻辑。
4.3. 消息持久化
为防止 RabbitMQ 重启导致消息丢失,需将队列和消息均标记为持久化:
- 队列持久化:声明队列时设置
durable=True
,确保队列元数据存储在磁盘,如channel.queue_declare(queue='hello', durable=True)
。 - 消息持久化:发送消息时设置
delivery_mode = pika.DeliveryMode.Persistent
,使消息写入磁盘。但需注意,持久化存在短暂未写入磁盘的窗口风险,结合发布确认机制可增强可靠性。
4.4. 工作队列消息分发
工作队列通过多个消费者处理同一队列消息,提升处理效率,支持两种分发策略:
- 轮询分发(Round-robin):RabbitMQ 按顺序将消息发送给消费者,实现负载均衡。但可能出现消息丢失(未处理即确认)、任务分配不均(复杂任务积压)等问题。
- 公平分发(Fair dispatch):消费者设置
basic_qos(prefetch_count=1)
,每次仅接收一条消息,处理完成并手动确认后再获取下一条,避免忙闲不均。适用于任务处理耗时差异大的场景。
4.5. 备用交换机(Alternate Exchange, AE)
用于处理无法路由的消息(无匹配队列或绑定规则):当消息无法路由至目标队列时,交换机将其转发至配置的备用交换机。若 AE 仍无法路由,消息将继续沿 AE 链传递,直至成功路由或到达链尾。此机制可捕获异常消息、实现分级处理逻辑。
4.6. 队列长度限制
可通过策略或客户端参数设置队列最大长度(消息数或字节数):
- 限制规则:仅计算就绪态消息(未确认消息不计入)。达到限制后,默认丢弃队首消息,可通过
overflow
参数调整行为(如reject-publish
拒绝新消息、reject-publish-dlx
将拒绝消息转为死信)。 - 监控指标:通过
messages_ready
和message_bytes_ready
查看就绪态消息数量及占用空间。
4.7. 死信交换机(Dead Letter Exchange, DLX)
当消息出现以下情况时,会被重新发布到死信交换机:
- 消费者使用
basic.reject
或basic.nack
且requeue=false
; - 消息因 TTL 过期;
- 队列超过长度限制;
- 仲裁队列中消息重试次数超过
delivery-limit
。
可通过客户端参数(x-dead-letter-exchange
)或策略配置 DLX,死信消息将使用原路由键或指定路由键重新路由,需避免死信循环导致消息无限流转。
4.8. 优先级队列
RabbitMQ 支持为经典队列设置优先级(1-255,建议 1-5):
- 声明方式:通过
x-max-priority
参数声明队列支持的最大优先级,如args.put("x-max-priority", 10)
;发布消息时设置priority
字段指定优先级。 - 行为特性:高优先级消息优先处理,但需注意内存、CPU 开销;未设置优先级的消息默认为 0;结合
basic.qos
可避免优先级失效(如消费者预取过多导致高优先级消息等待)。
4.9. 延迟消息
通过rabbitmq_delayed_message_exchange
插件实现:发送消息时设置x-delay
头(单位毫秒)指定延迟时间,到期后消息将路由至匹配队列。若无法路由,消息将被丢弃。
4.10. 生存时间(Time-To-Live, TTL)
可分别为消息和队列设置过期时间:
- 消息 TTL:通过策略(
message-ttl
)或消息属性(expiration
)设置,取队列级和消息级 TTL 的最小值。过期消息将被丢弃或转为死信(取决于队列类型)。 - 队列 TTL:通过策略(
expires
)或参数(x-expires
)设置,仅适用于非持久化经典队列,未使用(无消费者、未重声明、无basic.get
调用)的队列将在到期后自动删除。
四、SpringBoot中使用RabbitMQ
1. 简单模式
基础配置
首先创建两个SpringBoot项目,一个作为消费者,一个作为生产者,并导入依赖项。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在两个项目的yaml文件中写入rabbitmq的配置信息
spring:
rabbitmq:
host: localhost # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: guest # 用户名
password: guest # 密码
在rabbitmq中创建一个队列叫做queue1
生产者
在生产者中编写测试类,发送消息
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class RabbitPublisherApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void SimpleQueue(){
//队列名字
String name = "queue1";
//具体的消息
String message = "你好!!!rabbit";
//发送消息
rabbitTemplate.convertAndSend(name,message);
}
}
执行测试类后在rabbitmq的相应队列中就会有消息了。
消费者
随后编写消费者来消费消息。
创建一个监听类,来监听具体的队列
@Component
public class SpringRabbitListener {
/**
* 从队列queue1中取消息
* @param msg
*/
@RabbitListener(queues = "queue1")
public void listenSimpleQueueMessage(String msg) {
System.out.println("spring 消费者接收到消息:" + msg);
}
}
随后启动消费者项目,就可以从队列中成功获取到消息了。
2. 工作队列模式
编写方法发送20条消息
@Test
public void WorkQueue(){
String name = "work.queue";
for(int i=0;i<20;i++){
String message = "第"+i+"消息";
rabbitTemplate.convertAndSend(name,message);
}
}
用两个消费者来接收
@RabbitListener(queues = "work.queue")
public void listenWorkQueueMessage1(String msg) {
System.out.println("第一位消费者接收到消息:" + msg);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueueMessage2(String msg) {
System.out.println("第二位消费者接收到消息:" + msg);
}
先启动消费者再发送消息:
此时可以发现两个消费者将消息均匀分配,每位各接收10条消息,这里是因为mq有预分配,会先预取,每个消费者和队列中有一个通道,存放预取的消息,所以会平均分消息,然后各自独立消费。
但是假如我们的消费者消费能力不一样呢,rabbitmq也会将其平均分配,这样的机制显然是不合理的,因此面对这种情况时,可以通过控制预取数量来改变这种情况,我们可以在yml文件中编写配置。这样就实现了取一个消费一个,不会先预取很多个,如果消费能力强取得就快,消费能力弱,取得就慢。
spring:
rabbitmq:
host: localhost # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: guest # 用户名
password: guest # 密码
listener:
simple:
prefetch: 1
3. 发布/订阅模式(广播模式)
广播模式使用fanout交换机
首先编写一个配置类,配置交换机和队列
@Configuration
public class FanoutConfig {
//声明交换机,设置名称
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("exchange1");
}
//队列1
@Bean
public Queue fanoutQueue1() {
return new Queue("fanout1",true);
}
//绑定队列1和交换机
@Bean
public Binding binding1() {
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
//队列2
@Bean
public Queue fanoutQueue2() {
return new Queue("fanout2",true);
}
//绑定交换机和队列2
@Bean
public Binding bindingQueue2() {
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}
}
发送消息
@Test
public void fanout(){
String name = "exchange1";
String meg = "这里时fanout";
rabbitTemplate.convertAndSend(name,"",meg);
}
接收消息
@RabbitListener(queues = "fanout1")
public void listenWorkQueueMessage1(String msg) {
System.out.println("第一位消费者接收到消息:" + msg);
}
@RabbitListener(queues = "fanout2")
public void listenWorkQueueMessage2(String msg) {
System.out.println("第二位消费者接收到消息:" + msg);
}
结果我们可以看到两个消费者都接收到了消息。
4. 路由模式
路由模式使用direct交换机,除了使用配置类的方式进行队列和交换机的绑定,还可以在消费者中使用注解的方式,例如
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name="direct",type = ExchangeTypes.DIRECT),
key = {"one","two"}
))
public void directListener1(String msg){
System.out.println("第一位消费者接收到来自direct.queue1的消息:"+ msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name="direct",type = ExchangeTypes.DIRECT),
key = {"three"}
))
public void directListener2(String msg){
System.out.println("第二位消费者接收到来自direct.queue2的消息:"+ msg);
}
发送消息
@Test
public void directTest(){
// 交换机名称
String exchangeName = "direct";
// 消息
String message = "第 one 消息";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "one", message);
// 消息
String message2 = "第 two 消息";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "two", message2);
// 消息
String message3 = "第 three 消息";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "three", message3);
}
最后可以看到,根据key路由接收到了消息。
5. 主题模式
主题模式和路由模式类似,使用topic交换机,只是可以在key中使用通配符,
主题模式中RoutingKey一般由一个或多个单词组成,用“.”分割。
通配符规则
# 匹配一个或多个词
* 匹配一个词
修改消费者配置
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name="topic",type = ExchangeTypes.TOPIC),
key = {"a.*"}
))
public void topicListener1(String msg){
System.out.println("第一位消费者接收到来自topic.queue1的消息:"+ msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name="topic",type = ExchangeTypes.TOPIC),
key = {"a.#"}
))
public void topocListener2(String msg){
System.out.println("第二位消费者接收到来自topic.queue2的消息:"+ msg);
}
发送消息
@Test
public void topicTest(){
// 交换机名称
String exchangeName = "topic";
String message = "消息:a.b";
rabbitTemplate.convertAndSend(exchangeName, "a.b", message);
String message2 = "消息:a.b.c";
rabbitTemplate.convertAndSend(exchangeName, "a.b.c", message2);
String message3 = "消息:a.b.c.d";
rabbitTemplate.convertAndSend(exchangeName, "a.b.c.d", message3);
}
最后结果可以看到,第二个消费者可以拿到所有消息,第一个消费者只能拿到a.b的消息
6. 消息转换器
代码里直接发送对象
,虽然接收的到消息,但是rabbitmq的界面上看到的消息会是乱码
依赖
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
配置
@Configuration
public class rabbitmqConfig {
// 消息转换配置
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
}
再次发送就会是转换好的消息
7. 进阶
7.1. 消费者并发消费
让消费者可以开启多个线程并发
去消费消息,可以配合上方工作队列
只需要加配置
spring:
rabbitmq:
addresses: 127.0.0.1:5672
# host: 127.0.0.1
# port: 5672
username: 你的账号
password: 你的密码
virtual-host: /
# 消费者配置
listener:
simple:
concurrency: 2 # 并发数
max-concurrency: 10 #最大并发数
7.1.1. 生产者
正常发送消息,发送10个
// 并发消费
@GetMapping("/test")
public void test(){
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("concurrency","测试并发消费消息");
}
}
7.1.2. 消费者
@RabbitListener(queuesToDeclare = @Queue(value = "concurrency"))
public void concurrency(String msg) throws InterruptedException {
long l = System.currentTimeMillis();
// 打印线程名
String name = Thread.currentThread().getName();
System.out.println("name = " + name);
Thread.sleep(1000); // 休眠一秒,好看效果
long l2 = System.currentTimeMillis();
System.out.println("time" + (l2-l)/1000);
}
打印日志发现,并发消费生效
配合工作队列,消费速度就非常快了
7.1.3. 多个消费者
@RabbitListener(queuesToDeclare = @Queue(value = "concurrency"))
public void concurrency(String msg) throws InterruptedException {
// 打印线程名
String name = Thread.currentThread().getName();
System.out.println("name = "+new Date() + name);
Thread.sleep(1000); // 休眠一秒,好看效果
}
@RabbitListener(queuesToDeclare = @Queue(value = "concurrency"))
public void concurrency1(String msg) throws InterruptedException {
// 打印线程名
String name = Thread.currentThread().getName();
System.out.println("name1 = "+new Date() + name);
Thread.sleep(1000); // 休眠一秒,好看效果
}
7.2. 批量发送
实现多个消息批量发送,可配置每次发送几个,不足发送数时,等待超时后也继续发送
7.2.1. 批量发送配置类
// 批量发送配置
@Configuration
public class RabbitBatchSendConfig {
@Resource
ConnectionFactory connectionFactory;
/**
* 注入一个批量 template
* Spring-AMQP 通过 BatchingRabbitTemplate 提供批量发送消息的功能。如下是三个条件,满足任一即会批量发送:
*
* 【数量】batchSize :超过收集的消息数量的最大条数。
* 【空间】bufferLimit :超过收集的消息占用的最大内存。
* 【时间】timeout :超过收集的时间的最大等待时长,单位:毫秒。
* 不过要注意,这里的超时开始计时的时间,是以最后一次发送时间为起点。也就说,每调用一次发送消息,都以当前时刻开始计时,重新到达 timeout 毫秒才算超时。
*
* @return BatchingRabbitTemplate
*/
@Bean
public BatchingRabbitTemplate batchRabbitTemplate() {
// 创建 BatchingStrategy 对象,代表批量策略
// 超过收集的消息数量的最大条数。
int batchSize = 10; // 例:每次发送10条
// 每次批量发送消息的最大内存 b
int bufferLimit = 1024 * 1024;
// 超过收集的时间的最大等待时长,单位:毫秒
int timeout = 10 * 1000; // 例:不足10条时,等待10秒继续发送
BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(batchSize, bufferLimit, timeout);
// 创建 TaskScheduler 对象,用于实现超时发送的定时器
TaskScheduler taskScheduler = new ConcurrentTaskScheduler();
// 创建 BatchingRabbitTemplate 对象
BatchingRabbitTemplate batchTemplate = new BatchingRabbitTemplate(batchingStrategy, taskScheduler);
batchTemplate.setConnectionFactory(connectionFactory);
return batchTemplate;
}
}
7.2.2. 生产者
@Resource
private BatchingRabbitTemplate batchingRabbitTemplate;
// 批量发送
@GetMapping("/test14")
public void test14(){
for (int i = 0; i < 15; i++) {
batchingRabbitTemplate.convertAndSend("batchSend","批量发送");
}
}
7.2.3. 消费者
@RabbitListener(queuesToDeclare = @Queue(value = "batchSend"))
public void batchSend(String msg){
System.out.println("msg = " +new Date() + msg);
}
可以看到消息批量发送已实现,不足10条的按配置等待10秒后发送
7.3. 批量消费
实现多个消息批量消费,可配置每次消费几个,不足消费数时,等待超时后也继续消费
7.3.1. 批量消费配置类
/**
* 批量消费
*/
@Configuration
public class RabbitBatchConsumerConfig {
@Resource
ConnectionFactory connectionFactory;
@Resource
SimpleRabbitListenerContainerFactoryConfigurer configurer;
/**
* 配置一个批量消费的 SimpleRabbitListenerContainerFactory
*/
@Bean(name = "consumer10BatchContainerFactory")
public SimpleRabbitListenerContainerFactory consumer10BatchContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
// 这里是重点 配置消费者的监听器是批量消费消息的类型
factory.setBatchListener(true);
// 一批十个
factory.setBatchSize(10);
// 等待时间 毫秒 , 这里其实是单个消息的等待时间 指的是单个消息的等待时间
// 也就是说极端情况下,你会等待 BatchSize * ReceiveTimeout 的时间才会收到消息
factory.setReceiveTimeout(10 * 1000L);
factory.setConsumerBatchEnabled(true);
return factory;
}
}
7.3.2. 生产者
@GetMapping("/test13")
public void test13(){
for (int i = 0; i < 16; i++) {
rabbitTemplate.convertAndSend("batchConsume","测试批量消费消息");
}
}
7.3.3. 消费者
// 指定containerFactory
@RabbitListener(queuesToDeclare = @Queue(value = "batchConsume"),containerFactory = "consumer10BatchContainerFactory")
public void batchSend(List<Message> msg){ //接收换成List
for (int i = 0; i < msg.size(); i++) {
System.out.println("msg = " +new Date() + msg.get(i).getBody());
}
}
可以看到消息批量消费已实现,不足10条的按配置等待10秒后消费
7.4. 基于插件延迟队列
延迟队列可以将消息发送后使消费者延迟接收。
7.4.1. RabbitAdmin配置
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitAdminConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtualhost}")
private String virtualhost;
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(host);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualhost);
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}
7.4.2. 封装发送延迟队列工具类
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
@Component
public class DelayedQueue {
// routingKey
private static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
// 延迟队列交换机
private static final String DELAYED_EXCHANGE = "delayed.exchange";
@Autowired
RabbitTemplate rabbitTemplate;
@Resource
RabbitAdmin rabbitAdmin;
/**
* 发送延迟队列
* @param queueName 队列名称
* @param params 消息内容
* @param expiration 延迟时间 毫秒
*/
public void sendDelayedQueue(String queueName, Object params, Integer expiration) {
// 先创建一个队列
Queue queue = new Queue(queueName);
rabbitAdmin.declareQueue(queue);
// 创建延迟队列交换机
CustomExchange customExchange = createCustomExchange();
rabbitAdmin.declareExchange(customExchange);
// 将队列和交换机绑定
Binding binding = BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();
rabbitAdmin.declareBinding(binding);
// 发送延迟消息
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE, DELAYED_ROUTING_KEY, params, msg -> {
// 发送消息的时候 延迟时长
msg.getMessageProperties().setDelay(expiration);
return msg;
});
}
public CustomExchange createCustomExchange() {
Map<String, Object> arguments = new HashMap<>();
/**
* 参数说明:
* 1.交换机的名称
* 2.交换机的类型
* 3.是否需要持久化
* 4.是否自动删除
* 5.其它参数
*/
arguments.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message", true, false, arguments);
}
}
7.4.3. 生产者
@Autowired
private DelayedQueue delayedQueue;
/**
* 发送延迟队列
* @param queueName 队列名称
* @param params 消息内容
* @param expiration 延迟时间 毫秒
*/
@GetMapping("/test9")
public void topicTest8() {
delayedQueue.sendDelayedQueue("delayTest2","这是消息",5000);
}
7.4.4. 消费者
@RabbitListener(queuesToDeclare = @Queue(value = "delayTest2",durable = "true"))
public void declareExchange2(String message){
System.out.println("delayTest2 = " + message);
}
7.5. TTL队列
TTL是time to live的缩写,生存时间,RabbitMQ支持消息的过期时间,消息发送时可以指定,从消息入队列开始计算,只要超过队列的超时时间配置,消息没被接收,消息就会自动清除
7.5.1. 封装发送TTL队列工具类
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
@Component
public class TtlQueue {
// routingKey
private static final String TTL_KEY = "ttl.routingkey";
private static final String TTL_EXCHANGE = "ttl.exchange";
@Autowired
RabbitTemplate rabbitTemplate;
@Resource
RabbitAdmin rabbitAdmin;
/**
* 发送TTL队列
* @param queueName 队列名称
* @param params 消息内容
* @param expiration 过期时间 毫秒
*/
public void sendTtlQueue(String queueName, Object params, Integer expiration) {
/**
* ----------------------------------先创建一个ttl队列--------------------------------------------
*/
Map<String, Object> map = new HashMap<>();
// 队列设置存活时间,单位ms,必须是整形数据。
map.put("x-message-ttl",expiration);
/*参数1:队列名称 参数2:持久化 参数3:是否排他 参数4:自动删除队列 参数5:队列参数*/
Queue queue = new Queue(queueName,true,false,false,map);
rabbitAdmin.declareQueue(queue);
/**
* ---------------------------------创建交换机---------------------------------------------
*/
DirectExchange directExchange = new DirectExchange(TTL_EXCHANGE, true, false);
rabbitAdmin.declareExchange(directExchange);
/**
* ---------------------------------队列绑定交换机---------------------------------------------
*/
// 将队列和交换机绑定
Binding binding = BindingBuilder.bind(queue).to(directExchange).with(TTL_KEY);
rabbitAdmin.declareBinding(binding);
// 发送消息
rabbitTemplate.convertAndSend(TTL_EXCHANGE,TTL_KEY,params);
}
}
7.5.2. 生产者
@Autowired
private TtlQueue ttlQueue;
/**
* 发送TTL队列
* @param queueName 队列名称
* @param params 消息内容
* @param expiration 过期时间 毫秒
*/
@GetMapping("/test10")
public void topicTest10() {
ttlQueue.sendTtlQueue("ttlQueue","这是消息内容",5000);
}
7.5.3. 消费者
@RabbitListener(queues = "ttlQueue" )
public void ttlQueue(String message){
System.out.println("message = " + message);
}
7.6. 死信队列
DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器。队列消息变成死信(deadmessage)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列。
消息变成死信的几种情况:
1.消息被拒绝(basic.reject/ basic.nack)并且requeue=false
2. 消息TTL过期
3. 队列达到最大长度
流程:发送消息,消息过期后进入到另一个队列(这个队列设置持久化,不过期)的过程。
7.6.1. 封装发送死信队列工具类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
@Component
public class DLXQueue {
// routingKey
private static final String DEAD_ROUTING_KEY = "dead.routingkey";
private static final String ROUTING_KEY = "routingkey";
private static final String DEAD_EXCHANGE = "dead.exchange";
private static final String EXCHANGE = "common.exchange";
@Autowired
RabbitTemplate rabbitTemplate;
@Resource
RabbitAdmin rabbitAdmin;
/**
* 发送死信队列,过期后进入死信交换机,进入死信队列
* @param queueName 队列名称
* @param deadQueueName 死信队列名称
* @param params 消息内容
* @param expiration 过期时间 毫秒
*/
public void sendDLXQueue(String queueName, String deadQueueName,Object params, Integer expiration){
/**
* ----------------------------------先创建一个ttl队列和死信队列--------------------------------------------
*/
Map<String, Object> map = new HashMap<>();
// 队列设置存活时间,单位ms,必须是整形数据。
map.put("x-message-ttl",expiration);
// 设置死信交换机
map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
// 设置死信交换器路由键
map.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
/*参数1:队列名称 参数2:持久化 参数3:是否排他 参数4:自动删除队列 参数5:队列参数*/
Queue queue = new Queue(queueName,true,false,false,map);
rabbitAdmin.declareQueue(queue);
/**
* ---------------------------------创建交换机---------------------------------------------
*/
DirectExchange directExchange = new DirectExchange(EXCHANGE, true, false);
rabbitAdmin.declareExchange(directExchange);
/**
* ---------------------------------队列绑定交换机---------------------------------------------
*/
Binding binding = BindingBuilder.bind(queue).to(directExchange).with(ROUTING_KEY);
rabbitAdmin.declareBinding(binding);
/**
* ---------------------------------在创建一个死信交换机和队列,接收死信队列---------------------------------------------
*/
DirectExchange deadExchange = new DirectExchange(DEAD_EXCHANGE, true, false);
rabbitAdmin.declareExchange(deadExchange);
Queue deadQueue = new Queue(deadQueueName,true,false,false);
rabbitAdmin.declareQueue(deadQueue);
/**
* ---------------------------------队列绑定死信交换机---------------------------------------------
*/
// 将队列和交换机绑定
Binding deadbinding = BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY);
rabbitAdmin.declareBinding(deadbinding);
// 发送消息
rabbitTemplate.convertAndSend(EXCHANGE,ROUTING_KEY,params);
}
}
7.6.2. 生产者
@Autowired
private DLXQueue dlxQueue;
/**
* 发送死信队列,过期后进入死信交换机,进入死信队列
* @param queueName 队列名称
* @param deadQueueName 死信队列名称
* @param params 消息内容
* @param expiration 过期时间 毫秒
*/
@GetMapping("/test11")
public void topicTest11() {
dlxQueue.sendDLXQueue("queue","deadQueue","这是消息内容",5000);
}
7.6.3. 消费者
// 接收转移后的队列消息
@RabbitListener(queuesToDeclare = @Queue(value = "deadQueue",durable = "true"))
public void ttlQueue(String message){
System.out.println("message = " + message);
}
7.7. 消息确认
7.7.1. 发送消息确认机制
为确保消息发送有真的发送出去,设置发布时确认,确认消息是否到达 Broker 服务器
spring:
rabbitmq:
host: 47.99.110.29
port: 5672
username: guest
password: guest
virtual-host: /
listener:
simple:
prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条
publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
publisher-returns: true #确认消息已发送到队列(Queue)
如果有使用rabbitAdmin配置的话,那里也需要加配置
7.7.1.1. 修改RabbitAdmin配置
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitAdminConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtualhost}")
private String virtualhost;
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(host);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualhost);
// 配置发送确认回调时,次配置必须配置,否则即使在RabbitTemplate配置了ConfirmCallback也不会生效
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}
7.7.1.2. 实现发送消息确认接口
消息只要被 rabbitmq broker 接收到就会触发 confirmCallback 回调 。
/**
* 消息发送确认配置
*/
@Component
public class ConfirmCallbackConfig implements RabbitTemplate.ConfirmCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct // @PostContruct是spring框架的注解,在?法上加该注解会在项?启动的时候执?该?法,也可以理解为在spring容器初始化的时候执
public void init(){
rabbitTemplate.setConfirmCallback(this);
}
/**
* 交换机不管是否收到消息的一个回调方法
* @param correlationData 消息相关数据
* @param ack 交换机是否收到消息
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack){ // 消息投递到broker 的状态,true表示成功
System.out.println("消息发送成功!");
}else { // 发送异常
System.out.println("发送异常原因 = " + cause);
}
}
}
7.7.1.3. 实现发送消息回调接口
如果消息未能投递到目标queue里将触发回调 returnCallback ,一旦向 queue 投递消息未成功,这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。
@Component
public class ReturnCallbackConfig implements RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct // @PostContruct是spring框架的注解,在?法上加该注解会在项?启动的时候执?该?法,也可以理解为在spring容器初始化的时候执
public void init(){
rabbitTemplate.setReturnsCallback(this);
}
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("消息"+returnedMessage.getMessage().toString()+"被交换机"+returnedMessage.getExchange()+"回退!"
+"退回原因为:"+returnedMessage.getReplyText());
// 回退了所有的信息,可做补偿机制
}
}
7.7.2. 消费者消息确认机制
为确保消息消费成功,需设置消费者消息确认机制,如果消费失败或异常了,可做补偿机制。
7.7.2.1. 配置
spring:
rabbitmq:
host: 47.99.110.29
port: 5672
username: guest
password: guest
virtual-host: /
# 消费者配置
listener:
simple:
prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条
acknowledge-mode: manual # 设置消费端手动ack确认
retry:
enabled: true # 是否支持重试
# 生产者配置
publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
publisher-returns: true #确认消息已发送到队列(Queue)
7.7.2.2. channel.basicAck消息确认
消费者修改,利用消费者参数Channel 进行消息确认操作
@RabbitListener(queuesToDeclare = @Queue(value = "simple.queue",durable = "true")) // queuesToDeclare 自动声明队列
public void holloWordListener(String msg, Channel channel, Message message) throws IOException {
// 消息
System.out.println("msg = " + msg);
/**
* 确认
* deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加
* multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
*/
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
7.7.2.3. channel.basicNack消息回退
将消息重返队列
@RabbitListener(queuesToDeclare = @Queue(value = "simple.queue",durable = "true")) // queuesToDeclare 自动声明队列
public void holloWordListener(String msg, Channel channel, Message message) throws IOException {
try {
// 消息
System.out.println("msg = " + msg);
throw new RuntimeException("来个异常");
} catch (Exception e) {
e.printStackTrace();
System.out.println("消息消费异常,重回队列");
/**
* deliveryTag:表示消息投递序号。
* multiple:是否批量确认。
* requeue:值为 true 消息将重新入队列。
*/
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
}
// 确认
/**
* deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加
* multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
*/
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
7.7.2.4. channel.basicReject消息拒绝
拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。
/**
* 消息拒绝
* deliveryTag:表示消息投递序号。
* requeue:值为 true 消息将重新入队列。
*/
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
7.7.3. 封装消息确认处理类
我们在消费mq的时候都会做消息确认机制,消息要是消费过程中发送异常,将消息回退。
7.7.3.1. 正常代码
@RabbitListener(queuesToDeclare = @Queue(value = "simple.queue",durable = "true")) // queuesToDeclare 自动声明队列
public void holloWordListener(String msg, Channel channel, Message message) throws IOException {
try {
// 消息消费
System.out.println("msg = " + msg);
// 消息消费。。。。。。
} catch (Exception e) {
e.printStackTrace();
System.out.println("消息消费异常,重回队列");
// 回退
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
}
// 确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
但是这样会有一个缺点,要是我们有很多个消费者,重复的代码太多。
7.7.3.2. 封装后的代码
封装后我们只需要处理消息内容
@RabbitListener(queues = "rabbitListener")
public void nonTicketUploadHandler(String json, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
super.onMessage(json, deliveryTag, channel, new MqListener<String>() {
@Override
public void handler(String msg, Channel channel) throws IOException {
// 消息消费
System.out.println("msg = " + msg);
// 消息消费。。。。。。
}
});
}
封装消息确认
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.boot.starter.rabbitmq.listenter.MqListener;
import java.io.IOException;
@Slf4j
public class BaseRabbiMqHandler<T> {
public void onMessage(T t, Long deliveryTag, Channel channel, MqListener mqListener) {
try {
// 通过实现这个接口,处理我们的业务代码
mqListener.handler(t, channel);
/**
* 手动确认消息
* deliveryTag:该消息的index
* false只确认当前一个消息收到,true确认所有consumer获得的消息(成功消费,消息从队列中删除 )
*/
channel.basicAck(deliveryTag, false);
}
catch (Exception e) { log.error("接收消息失败,重新放回队列,异常原因:{}",e.getMessage());
try {
/**
* 重回队列
* deliveryTag:该消息的index
* multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
* requeue:被拒绝的是否重新入队列
*/
// channel.basicAck(deliveryTag, false);
channel.basicNack(deliveryTag, false, true);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}
mqListener接口
业务通过实现这个接口处理
import com.rabbitmq.client.Channel;
import java.io.IOException;
public interface MqListener<T> {
// 业务通过实现这个接口处理
default void handler(T map, Channel channel) throws IOException {
}
}
mq消费者
消费者代码示例
import com.rabbitmq.client.Channel;
import org.jeecg.boot.starter.rabbitmq.core.BaseRabbiMqHandler;
import org.jeecg.boot.starter.rabbitmq.listenter.MqListener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import java.io.IOException;
public class TestListener extends BaseRabbiMqHandler<String> { // 继承封装的消息确认类
@RabbitListener(queues = "rabbitListener")
public void nonTicketUploadHandler(String json, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
super.onMessage(json, deliveryTag, channel, new MqListener<String>() { // 实现父类的消息方法里的接口
@Override
public void handler(String msg, Channel channel) throws IOException {
// 业务代码写这里,try/catch不用再写了
System.out.println("msg = " + msg);
// 消息消费。。。。。。
}
});
}
}
8. rabbitmq集群搭建
8.1. 普通集群
1、新建三个docker容器
docker run -d --hostname rabbit1 --name myrabbit1 -v /home/rabbitmq/data:/var/lib/rabbitmq -p 15671:15672 -p 5671:5672 rabbitmq
docker run -d --hostname rabbit2 --name myrabbit2 -v /home/rabbitmq/data:/var/lib/rabbitmq -p 15672:15672 -p 5672:5672 --link myrabbit1:rabbit1 rabbitmq
docker run -d --hostname rabbit3 --name myrabbit3 -v /home/rabbitmq/data:/var/lib/rabbitmq -p 15673:15672 -p 5673:5672 --link myrabbit1:rabbit1 --link myrabbit2:rabbit2 rabbitmq
2、三个都进入容器下载可视化工具
3、进入第一个mq容器重启
docker exec -it ef4a1f0fade7 /bin/bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
exit
4、进入第二个 和 第三个 mq容器执行
docker exec -it e36d94d40008 /bin/bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@rabbit1 //如遇到报错再执行上句、再继续执行
rabbitmqctl start_app
exit
5、进去mq可视化界面,overview面板中的Nodes可查看到节点信息。
6、测试,在mq上新建交换机、其余两个也出现新建的交换机
此时普通集群已构建完成
1、此种集群主节点down掉后,消费者也无法消费从节点的消息,不能做故障转移,只能当作备份。
2、主节点正常,从节点则可以消费消息
8.2. 镜像集群(高可用)
这种集群弥补第一种的缺陷,需在普通集群的基础下搭建(确保第一种集群可用)
镜像队列机制就是将队列在三个节点之间设置主从关系,消息会在三个节点之间进行自动同步,且如果其中一个节点不可用,并不会导致消息丢失或服务不可用的情况,提升mq集群的高可用性。
1、配置集群架构
2、进入任意节点配置策略
docker exec -it ef4a1f0fade7 /bin/bash
rabbitmqctl set_policy ha-all "^rabbitmq" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
3、测试,新建一个rabbitmq开头的队列
此时某个节点down掉(包括主节点),其余节点也能消费
将主节点down掉,节点自动切换
4、清除策略
rabbitmqctl clear_policy ha-all