RabbitMq详解

发布于:2025-06-11 ⋅ 阅读:(41) ⋅ 点赞:(0)

一、概念

1. 什么是RabbitMQ

使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

AMQP,即Advanced Message Queuing Protocol(高级消息队列协议),一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件传递消息,不受客户端/中间件不同产品、不同开发语言等条件的限制

RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用,比如ELK日志收集。

2. 核心特点

RabbitMQ 是基于 Erlang 语言开发、遵循 AMQP(高级消息队列协议)的开源消息中间件,具备以下显著特性:

  1. 高可靠性保障:通过消息持久化、传输确认及发布确认等机制,确保消息在生产、传输与消费过程中的可靠性,避免数据丢失风险。
  2. 灵活的消息分发策略:依托 Exchange(交换机)实现多样化消息路由,支持简单模式、工作队列模式、发布订阅模式、路由模式及通配符模式,可灵活适配不同业务场景需求。
  3. 集群化部署能力:支持多台 RabbitMQ 服务器组成集群,构建逻辑统一的 Broker,提升系统可用性与负载均衡能力。
  4. 多协议兼容性:除 AMQP 外,还支持 STOMP、MQTT 等多种消息队列协议,便于与不同技术栈的系统集成。
  5. 广泛的客户端支持:提供对 Java、.NET、Ruby 等几乎所有常用编程语言的客户端支持,降低开发接入门槛。
  6. 可视化管理界面:内置直观易用的管理界面,方便用户实时监控消息队列状态、管理 Broker 资源及配置参数。
  7. 插件化扩展体系:具备丰富的官方插件,并支持自定义开发,可根据业务需求灵活扩展功能,如权限控制、消息追踪等。

3. RabbitMQ能做什么

RabbitMQ可以帮助开发者构建可靠、高效、可扩展的分布式系统,实现异步通信、任务分发和事件驱动等功能。它被广泛应用于各种场景,包括微服务架构、消息驱动的架构、实时数据处理等。

4. RabbitMQ的工作流程

RabbitMQ的工作流程主要基于生产者-消费者模型和AMQP协议。以下是RabbitMQ消息传递的基本流程:

  • **生产者发送消息:**生产者通过RabbitMQ的客户端库创建消息,并指定交换机的名称和路由键。然后,生产者将消息发送到RabbitMQ服务器上的指定交换机。
  • 交换机接收并路由消息:交换机接收到生产者的消息后,根据配置的路由规则和路由键将消息分发到相应的队列中。如果消息没有匹配到任何队列,则可能会被丢弃或返回给生产者。
  • **消费者消费消息:**消费者连接到RabbitMQ服务器,并监听指定的队列。当队列中有新消息时,消费者从队列中获取并处理消息。处理完成后,消费者可以选择发送确认消息给RabbitMQ服务器,以表示消息已被成功处理。
  • **消息确认机制:**RabbitMQ使用消息确认机制来确保消息的可靠传递。生产者在发送消息后会收到一个确认,表示消息已成功发送到交换机。消费者在处理完

5. 消息队列优缺点

消息队列在特殊场景下具备显著优势,主要体现在解耦、异步处理以及削峰等方面。然而,引入消息队列也伴随着一些不可忽视的缺点:

  1. 系统可用性降低:随着外部依赖的增加,系统的稳定性面临挑战。例如,原本 A 系统只需调用 B、C、D 三个系统的接口,系统运行正常。但引入消息队列(MQ)后,若 MQ 出现故障,可能导致整个系统崩溃。因此,如何保障消息队列的高可用性成为关键,可点击 [具体链接] 查看相关解决方案。
  2. 系统复杂度提升:消息队列的加入使得系统逻辑变得复杂。例如,需要处理消息的重复消费问题,即确保每条消息仅被处理一次;处理消息丢失的情况,通过可靠的消息传递机制保证消息不丢失;以及保证消息传递的顺序性,尤其是在对消息顺序有严格要求的场景中。这些问题增加了系统开发和维护的难度。
  3. 数据一致性问题:当 A 系统处理请求并返回成功后,可能出现数据不一致的情况。例如,B、D 系统写库成功,而 C 系统写库失败,这会导致数据状态的不一致。在使用消息队列时,需要额外的技术方案来确保数据的一致性,例如采用分布式事务、消息补偿机制等。

消息队列虽然功能强大,能够带来诸多好处,但同时也引入了系统复杂性。在使用消息队列时,需要针对其缺点制定相应的技术架构和解决方案,以平衡系统的性能、可用性和一致性。

6. AMQP和JMS

MQ是消息通信的模型,并发具体实现。现在实现MQ的有两种主流方式:AMQP、JMS。

二者的区别与联系:

  1. 接口与协议:JMS(Java Message Service)定义了统一的接口,用于规范消息操作;而 AMQP(Advanced Message Queuing Protocol)则是通过制定协议,来统一数据交互的格式。
  2. 语言限制:JMS 主要限定在 Java 语言环境中使用,专为 Java 平台提供消息服务;AMQP 作为一种协议,不限制实现语言,具备跨语言的特性,可在多种编程语言的项目中使用。
  3. 消息模型:JMS 规定了特定的两种消息模型,满足 Java 平台下的消息通信需求;相比之下,AMQP 的消息模型更为丰富多样,能够适应更多不同场景下的消息通信需求。

7. 常见MQ产品

消息队列(MQ)作为重要的中间件,在消息通信中起着关键作用。目前主流的 MQ 产品有 Kafka、ActiveMQ、RocketMQ 和 RabbitMQ,它们各自具有不同的特点和适用场景。

  1. ActiveMQ:基于 JMS(Java Message Service),在早期项目中广泛使用。但随着技术发展,其官网活跃度较低,更新频率低。单机吞吐量为万级,对于高并发的互联网项目,性能上无法满足需求。高可用方面采用主从架构实现,不过消息可靠性较低,存在数据丢失的可能。由于其性能和可靠性的限制,在现代项目中逐渐被替代,可被 RabbitMQ 取代。
  2. RabbitMQ:基于 AMQP 协议,使用 Erlang 语言开发。社区活跃,单机吞吐量同样为万级,无法应对超高并发场景。高可用上采用镜像集群模式,能保障系统的高可用性。消息可靠性较高,可保证数据不丢失,还支持消息重试、死信队列等高级功能。然而,由于其使用 Erlang 语言开发,国内精通该语言的人较少,导致源码阅读困难。适合中小型公司,在不需要面对复杂技术挑战的场景下使用。
  3. RocketMQ:阿里巴巴开源的消息中间件,基于 JMS。单机吞吐量可达 10 万级,能应对互联网项目的高并发挑战。高可用方面采用分布式架构,可搭建大规模集群,性能表现出色。消息可靠性高,通过配置能确保数据不丢失,同时支持延迟消息、事务消息、消息回溯、死信队列等丰富的高级功能。使用 Java 语言开发,便于阅读源码理解底层原理。在技术选型中是一个优秀的选择,适合大型互联网公司和中小型公司使用,商业版存在收费情况。
  4. Kafka:分布式消息系统,单机吞吐量极高,可达十几万的并发量。高可用上支持分布式集群部署。消息可靠性方面,由于消息先存储在磁盘缓冲区,机器故障时可能导致缓冲区数据丢失,即异步性能和数据可靠性存在一定矛盾。功能相对单一,主要用于消息的接收与发送。在行业内常用于大数据领域,如采集用户行为日志并进行计算,实现 “猜你喜欢” 等功能。如果项目没有大数据相关需求,一般不会选择 Kafka。

7.1. 选择 RabbitMQ 的原因
  1. 对比 ActiveMQ:ActiveMQ 性能不佳,在高并发场景下无法满足需求,虽然其 API 完善,但在高并发场景下不适用,而 RabbitMQ 的稳定性和可靠性更好,能满足更多场景需求。
  2. 对比 Kafka:Kafka 强调高性能,但在业务对消息可靠性要求高时,其可能丢失消息的特性无法满足要求。而 RabbitMQ 能保证消息不丢失,更适合对消息可靠性有要求的业务场景。
  3. 对比 RocketMQ:RocketMQ 具备高性能、高可靠性、支持分布式事务、水平扩展和大量消息堆积等优点,但商业版收费,某些功能不对外提供。RabbitMQ 社区活跃,功能也较为丰富,对于没有 RocketMQ 高级功能需求且不想承担商业版费用的项目来说,是一个不错的选择。

8. 核心组件

  • Publisher(生产者:消息的生产者,也是一个向交换器发布消息的客户端应用程序,负责将业务数据封装为消息,并推送至 RabbitMQ 消息服务器。

  • Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序

  • Exchange(交换机):消息分发枢纽,接收来自生产者的消息,并根据消息携带的 RoutingKey(路由键)及绑定规则,将消息分配至对应的 Queue(队列)。

  • **Binding:**绑定,用于将消息队列和交换器之间建立关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将它理解成一个由绑定构成的路由表。

  • Queue(队列):消息存储容器,用于暂存待消费的消息,直到被消费者获取处理。

  • **Connection:**网络连接,比如一个 TCP 连接

  • 消息服务器(RabbitMQ):作为消息中转站,本身不生产或消费消息,仅负责接收生产者的消息,并根据规则路由至对应的消费者。

  • ConnectionFactory:连接管理器,负责创建并管理应用程序(生产者或消费者)与 RabbitMQ 之间的网络连接,确保通信链路的稳定建立与维护。

  • Channel(信道):消息传输通道,所有消息的发布、接收操作均通过信道完成。多个信道可复用同一物理连接,提升资源利用率与并发性能。

  • RoutingKey:消息携带的路由标识,作为交换机路由消息的依据,决定消息最终投递到哪个队列。

  • BindingKey:队列与交换机的绑定标识,通过 BindingKey 建立 Queue 与 Exchange 之间的关联关系,使交换机能够按规则将消息路由至目标队列 。

  • Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 /

  • Broker:表示消息队列服务器实体

  • Message:消息实体,它由消息头和消息体组成。消息头主要由路由键、交换器、队列、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等属性组成,而消息体就是指具体的业务对象。

相比传统的 JMS 模型,AMQP 主要多了 Exchange、Binding 这个新概念。

在 AMQP 模型中,消息的生产者不是直接将消息发送到Queue队列,而是将消息发送到Exchange交换机,其中还新加了一个中间层Binding绑定,作用就是通过路由键Key将交换机和队列建立绑定关系。

就好比类似用户表和角色表,中间通过用户角色表来将用户和角色建立关系,从而实现关系绑定,在 RabbitMQ 中,消息生产者不直接跟队列建立关系,而是将消息发送到交换器之后,由交换器通过已经建立好的绑定关系,将消息发送到对应的队列。

RabbitMQ 最终的架构模型,核心部分就变成如下图所示:

从图中很容易看出,与 JMS 模型最明显的差别就是消息的生产者不直接将消息发送给队列,而是由Binding绑定决定交换器的消息应该发送到哪个队列,进一步实现了在消息的推送方面,更加灵活!

8.1. 生产者发送消息流程:

1、生产者和Broker建立TCP连接。

2、生产者和Broker建立通道。

3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。

4、Exchange将消息转发到指定的Queue(队列)

8.2. 消费者接收消息流程:

1、消费者和Broker建立TCP连接

2、消费者和Broker建立通道

3、消费者监听指定的Queue(队列)

4、当有消息到达Queue时Broker默认将消息推送给消费者。

5、消费者接收到消息。

6、ack回复

二、安装使用

1. Windows安装

1.1. 安装erlang

进入erlang的官方下载页面进行下载:Downloads - Erlang/OTP

在下载过程中一定要对应匹配RabbitMQ的版本

双击安装并配置环境变量

1.2. 下载RabbitMQ

RabbitMQ下载地址:Installing RabbitMQ | RabbitMQ

双击安装,安装完成后,开始安装RabbitMQ-Plugins插件

先cd D:softwareRabbitMQ abbitmq_server-3.8.8sbin

然后运行命令:rabbitmq-plugins enable rabbitmq_management

执行rabbitmqctl status,出现以下内容,说明成功

然后双击运行rabbitmq-server.bat

进入登录页面,发现启动成功

然后我们可以将RabbitMQ做成Windows服务

以管理员身份运行cmd

cd D:softwareRabbitMQ abbitmq_server-3.8.8sbin

执行rabbitmq-service.bat install

可以通过任务管理器去查看RabbitMQ服务

以上就是Windows安装RabbitMQ的全部过程,页面设置跟上面的Linux一样。

2. 管理界面介绍

2.1. 概览

connections:无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费,在这里可以查看连接情况

channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道。

Exchanges:交换机,用来实现消息的路由

Queues:队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列。

端口:

2.2. 连接

2.3. 通道

2.4. 交换机

交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列

Type

解释

direct

它会把消息路由到那些 binding key 与 routing key 完全匹配的 Queue 中

fanout

它会把所有发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中

headers

headers 类型的 Exchange 不依赖于 routing key 与 binding key 的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。(headers 类型的交换器性能差,不实用,基本不会使用。)

topic

与 direct 模型相比,多了个可以使用通配符!,这种模型 Routing key 一般都是由一个或多个单词组成,多个单词之间以 “.” 分割,例如:item.insert 。星号 匹配一个词,例 audit.* ;# 号匹配一个或多个词 audit.#

x-delayed-message

延迟交换机,可以延迟接收消息

Features

解释

D

ddurable的缩写,代表这个队列中的消息支持持久化

AD

adautoDelete的缩写,代表当前队列的最后一个消费者退订时被自动删除。注意:此时不管队列中是否还存在消息,队列都会删除

excl

exclusive

的缩写,代表这是一个排他队列。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。需注意:
1. 排他队列基于连接可见,同一连接的不同信道可访问同一连接创建的排他队列;
2. 首次声明后,其他连接不允许创建同名排他队列;
3. 即使队列持久化,连接关闭或客户端退出,排他队列仍会自动删除。适用于单个客户端发送读取消息的场景

Args

arguments的缩写,代表该队列配置了 arguments参数

TTL

x-message-ttl的缩写,用于设置队列中所有消息的生存周期(统一配置),也可在发布消息时单独为某个消息指定剩余生存时间,单位为毫秒

Exp

Auto Expire,是 x-expires 配置的缩写。当队列在指定时间内未被访问(如 consumebasicGetqueueDeclare 等操作),将会被删除

Lim

说明该队列配置了 x-max-length,用于限定队列中消息的最大数量,超过指定长度将删除最早的消息

Lim B

说明队列配置了 x-max-length-bytes,用于限定队列最大占用空间大小,通常受内存、磁盘空间限制

DLX

说明该队列配置了 x-dead-letter-exchange。当队列消息长度超过限制、过期等情况发生时,从队列中删除的消息将被推送到指定的交换机,而非直接丢弃

DLK

x-dead-letter-routing-key的缩写,用于将删除的消息推送到指定交换机的指定路由键对应的队列中

Pri

x-max-priority的缩写,标识该队列为优先级队列。需先定义最大优先级值(通常不宜过大),发布消息时可指定消息优先级,数值更大(优先级更高)的消息优先被消费

Ovfl

x-overflow的缩写,用于配置队列消息溢出时的处理方式。可选择 drop-head(默认,丢弃队列头部消息)或 reject-publish(拒绝接收生产者后续发送的所有消息)

ha-all

镜像队列。all表示将队列镜像到集群上的所有节点,ha-params参数将被忽略

2.5. 队列

点击名称进去,可以看到队列的详细信息

get Message可以看到消息的内容

arguments具体参数如下:

参数名

作用

x-message-ttl

发送到队列的消息在被丢弃之前可存活的时间(单位:毫秒)

x-max-length

限定队列可容纳的最大消息数量

x-expires

队列在自动删除前可使用的时长(单位:毫秒)

x-max-length-bytes

以字节为单位限制队列的容量大小,通过控制队列占用的存储空间来达到容量限制目的,需设置为非负整数

x-dead-letter-exchange

配置队列溢出行为(有效值为 drop-head

reject-publish

);同时指定当消息被拒绝或过期时,将消息重新发布到的交换机名称(可选)

x-dead-letter-routing-key

当消息变为死信时,可选的替换路由键;若未设置,则使用消息的原始路由键

x-max-priority

定义队列支持的最大优先级数值;若未设置,队列不支持消息优先级功能

x-queue-mode

将队列设置为延迟模式,使队列尽可能将消息存储在磁盘以降低内存占用;若未设置,队列将保留内存缓存以实现消息快速传递

x-queue-master-locator

设置队列的主位置模式,确定在集群节点上声明队列时,队列主节点所在位置的规则

2.6. Admin用户和虚拟主机管理
2.6.1. 添加用户

上面的Tags选项,其实是指定用户的角色,可选的有以下几个:

  • 超级管理员 (administrator):能够登录管理控制台,可查看所有信息,还能对用户和策略(policy)进行操作。
  • 监控者 (monitoring):可以登录管理控制台,同时能查看 RabbitMQ 节点的相关信息,如进程数、内存使用情况、磁盘使用情况等。
  • 策略制定者 (policymaker):可登录管理控制台,对策略(policy)进行管理,但无法查看节点的相关信息(如进程数、内存使用情况等,即上图红框标识的部分) 。
  • 普通管理者 (management):仅能登录管理控制台,既无法看到节点信息,也不能对策略进行管理。
  • 其他:不具备登录管理控制台的权限,通常作为普通的生产者和消费者,专注于消息的生产和消费操作。
2.6.2. 创建虚拟主机

为了让各个用户可以互不干扰的工作,RabbitMQ添加了虚拟主机(Virtual Hosts)的概念。其实就是一个独立的访问路径,不同用户使用不同路径,各自有自己的队列、交换机,互相不会影响。

创建好虚拟主机,我们还要给用户添加访问权限:

点击添加好的虚拟主机:

进入虚拟主机设置界面:

2.7. 开启日志

进入容器,输入

rabbitmq-plugins enable rabbitmq_tracing

此时会多一个tracing标签,输入信息添加日志。

**Name:**自定义,建议标准点容易区分

**Format:**表示输出的消息日志格式,有Text和JSON两种,Text格式的日志方便人类阅读,JSON的方便程序解析。

JSON格式的payload(消息体)默认会采用Base64进行编码,如上面的“trace test payload.”会被编码成“dHJhY2Ug dGVzdCBwYXlsb2FkLg==”。

Max payload bytes: 表示每条消息的最大限制,单位为B。比如设置了了此值为10,那么当有超过10B的消息经过Rabbit MQ流转时,在记录到trace文件的时候会被截断。如上text日志格式中“trace test payload.”会被截断成“trace tes

Pattern:

#:追踪所有进入和离开MQ的消息

publish.#:追踪所有进入MQ的消息

publish.myExchage:追踪所有进入到myExchange的消息

deliver.#:跟踪所有离开MQ的消息

deliver.myQueue:追踪所有从myQueue离开的消息

#.myQueue:实测效果等同于deliver.myQueue

添加后,点击即可查看日志

如果出现错误,是因为插件默认是使用 guest 用户,是因为把 guest 用户删除了,或者在配置文件里面使用其他用户

解决: 配置/etc/rabbitmq/rabbitmq.config,添加配置

{rabbitmq_tracing,
   [
       {directory, "/var/vcap/sys/log/rabbitmq-server/tracing"},
       {username, <<"admin">>},
       {password, <<"password">>}
   ]
},

三、详细说明

1. RabbitMQ 的 5 种核心消息模式

RabbitMQ 提供五种核心工作模式,适用于不同业务场景,满足多样化消息传递需求:

1.1. 简单模式(HelloWorld)

架构:单生产者、单消费者、单队列,无需手动配置交换机,默认使用 RabbitMQ 内置的 Direct 交换机。
原理:生产者发送消息直接进入队列,消费者从队列获取并处理消息。
应用场景:适用于快速验证消息收发功能,如基础程序间通信测试、简单任务处理,可实现高效的消息生产与消费。

1.2. 工作队列模式(Work Queue)

架构:一个生产者对应多个消费者,依赖默认交换机,消费者间形成竞争关系。
原理:生产者将任务消息发送至队列,多个消费者从队列拉取任务,同一消息仅被一个消费者处理,实现任务并行处理。若消费者宕机,未确认的消息会在一段时间后重新分配,保障任务可靠性。
应用场景:适用于异步处理耗时任务(如文件处理、数据分析)、批量执行后台任务(发送邮件、生成报表)等场景,提升系统处理能力。

1.3. 发布订阅模式(Publish/Subscribe)

架构:需配置fanout类型交换机,交换机绑定多个队列,队列与消费者解耦。
原理:生产者将消息发送至fanout交换机,交换机无视路由键,直接将消息广播至所有绑定队列,实现一对多消息分发。
应用场景:常用于实时通知(如系统告警、新闻推送)、日志广播等场景,确保消息快速扩散至多个接收端。

1.4. 路由模式(Routing)

架构:采用direct类型交换机,交换机与队列绑定需指定精确的routing key(路由键)。
原理:生产者发送消息时携带routing key,交换机根据该键值将消息精准投递至匹配的队列,实现按条件筛选消息的定向分发。
应用场景:适用于根据业务规则分类处理消息的场景,如订单系统中按订单类型(普通订单、加急订单)分发至不同处理队列。

1.5. 主题模式(Topic)

架构:使用topic类型交换机,支持通配符规则(*匹配单个单词,#匹配零个或多个单词)。
原理:生产者发送消息时携带routing key,交换机依据通配符规则将消息路由至多个符合条件的队列,实现灵活的消息过滤与动态分发。
应用场景:适用于复杂消息分类场景,如日志系统按级别(info.*error.#)和模块(user.logorder.log)归档消息,或物联网设备状态监控中的多维度数据分发。

通过灵活选择不同工作模式,RabbitMQ 能够高效支持从基础通信到复杂业务场景的消息处理需求,保障系统的可靠性与扩展性。

2. 交换机分发策略

当消息的生产者将消息发送到交换器之后,是不会存储消息的,而是通过中间层绑定关系将消息分发到不同的队列上,其中交换器的分发策略分为四种:Direct、Topic、Headers、Fanout!

  • Direct:直连类型,即在绑定时设定一个 routing_key, 消息的 routing_key 匹配时, 才会被交换器投送到绑定的队列中去,原则是先匹配、后投送;
  • Topic:按照规则转发类型,支持通配符匹配,和 Direct 功能一样,但是在匹配 routing_key的时候,更加灵活,支持通配符匹配,原则也是先匹配、后投送;
  • Headers:头部信息匹配转发类型,根据消息头部中的 header attribute 参数类型,将消息转发到对应的队列,原则也是先匹配、后投送;
  • Fanout:广播类型,将消息转发到所有与该交互机绑定的队列上,不关心 routing_key;
2.1. Direct(直接交换机)

Direct 交换机是 RabbitMQ 的默认交换机类型,遵循精确匹配路由规则:只有当消息携带的路由键(routing key)与队列绑定到交换机时配置的绑定键(binding key)完全一致,消息才会被转发至该队列。这种模式属于单播机制,确保消息精准投递。

RabbitMQ 默认提供了一个名称为空字符串的 Direct 交换机(在 Web 管理界面显示为(AMQP default)),该交换机自动绑定到所有队列,且每个队列的绑定键为自身名称。因此,在未显式指定交换机的场景下,生产者发送的消息实际通过此默认交换机完成路由,这也是部分开发者误以为 “不配置交换机也能收发消息” 的原因。

例如,若交换机绑定队列时设置绑定键为black,仅当消息路由键也为black时,消息才会被转发至该队列;若消息路由键为black.green,则不会被投递。Direct 交换机以严格的一对一匹配逻辑,实现消息的确定性路由。

示例1:

如图,假设生产发送消息到直接交换机,消息路由键为:green,那么消息将被投放入Queue1队列中,因为Queue1的绑定建和消息路由键精确匹配。

示例2:

如图,假设生产发送消息到直接交换机,消息路由键为:green,那么消息将被投放入Queue1和Queue2队列中,因为Queue1,Queue2的绑定建和消息路由键精确匹配。

2.2. Fanout(扇出交换机)

扇出(fanout)类型的交换机与其他类型不同,它采用消息广播模式。其消息转发规则是:会将所有发送到该交换机的消息转发到与之绑定的所有消息队列中,完全不考虑路由键和绑定键,即便配置了路由键也会被忽略。

这就类似于子网广播,子网内的每台主机都会获得一份复制的消息。在 RabbitMQ 的四种交换机类型(Direct、Topic、Headers、Fanout)中,扇出交换机转发消息的速度是最快的,能够高效地将消息同时分发给多个绑定队列。

示例

假设生产发送消息到直接交换机,消息路由键为:quick,那么最后消息将被投放入所有队列中(和路由键,绑定键无关)

2.3. Topic(主题交换机)

Topic 类型交换机基于通配符匹配规则实现消息路由,相比 Direct 交换机更加灵活:仅当消息携带的路由键(routing key)符合队列绑定的通配符规则时,才会将消息转发至对应队列。该模式支持两种特殊通配符:

  • *****(星号):匹配单个单词。例如绑定键为logs.*,仅接收路由键中包含两个单词且第一个单词为logs的消息(如logs.error)。
  • **#**(井号):匹配零个或多个单词。若绑定键设置为logs.#,可接收logs开头的任意长度单词消息(如logslogs.errorlogs.error.server);当绑定键为#时,该队列将接收交换机的所有消息,效果等同于 Fanout 交换机。

当绑定键中不使用通配符时,Topic 交换机将退化为 Direct 交换机的精确匹配模式。例如,若绑定键为black,仅当消息路由键也为black时才会投递;而使用通配符(如black.#),则可匹配blackblack.green等包含black前缀的多种路由键,实现动态、灵活的消息过滤与分发,适用于日志分级处理、多维度消息分类等场景。

示例:

假设

  • 生产发送消息到主题交换机,消息路由键为:quick.orange.rabbitlazy.orange.element,那么消息将被投放入所有队列中。
  • 生产发送消息到主题交换机,消息路由键为:quick.orange.fox,那么消息将被投放入Queue1队列中。
  • 生产发送消息到主题交换机,消息路由键为: lazy.pink.rabbit",那么消息将被投放入Queue2队列中,且只会放入一次,虽然匹配两个绑定键。
  • 生产发送消息到主题交换机,消息路由键为:quick.brown.foxorangequick.orange.new.rabbit,那么消息将不会被投放入任何队列中。
  • 生产发送消息到主题交换机,消息路由键为:lazy.orange.new.rabbit,那么消息将被投放入Queue2队列中。

2.4. Headers(消息头交换机)

头部交换机(Headers)依据消息的自定义消息头属性(headers)进行消息路由匹配,与路由键和绑定键无关,具体规则如下:

2.4.1. 绑定指定键值对且包含 x - match

当在绑定队列和交换机时指定了一组键值对,并且其中包含一个键为 x - match,其值为 anyall

  • **all**:消息携带的键值对(即消息头)必须全部匹配消息和队列绑定时配置的全部键值对(消息携带的键值对可以包含绑定键值中没有的键值对),消息才会被转发到该队列。
  • **any**:消息携带的键值对只要匹配消息和队列绑定时配置的任一键值对,消息就会被转发到该队列。
2.4.2. 绑定指定键值对但未指定 x - match

如果在绑定队列和交换机时指定了一组键值对,但没有指定 x - match 键值对,那么默认 x - matchall

2.4.3. 绑定未指定键值对

若在绑定队列和交换机时未指定键值对,交换机也会把消息发送到该队列。

2.4.4. 特殊匹配说明

对于 anyall 模式,以字符串 x - 开头的消息头不会用于路由匹配。若将 x - match 设置为 any - with - xall - with - x,则会使用以字符串 x - 开头的请求头进行路由匹配。

2.4.5. 与其他交换机对比

Headers 交换机同样是基于规则匹配来路由消息,与 Direct 和 Topic 交换机固定使用路由键不同,它通过自定义的消息头部键值对规则进行匹配。在队列与交换机绑定时设定一组键值对规则,消息中也包含一组键值对(即 headers 属性),当这些键值对部分或全部匹配时,消息就会被投送到对应队列。不过,Headers 交换机和 Direct 交换机功能类似,但性能远不如 Direct 交换机,目前在实际应用中几乎很少使用。

2.5. 不同交换机使用场景

不同类型的交换机性能排序:fanout > direct > topic > headers。

不同类型的 RabbitMQ 交换机适用于不同的使用场景,以下是它们的一些常见用例:

  • Direct Exchange(直连交换机):适用于一对一的消息分发,如订单处理系统,每个订单只发送到特定的处理队列。
  • Fanout Exchange(扇出交换机):适用于广播消息给多个消费者,不关心消息的具体内容。如:实时日志处理,将日志广播给多个监控系统。
  • Topic Exchange(主题交换机):根据消息的特定属性选择性地将消息路由到不同的队列。如:新闻发布系统,根据主题和地区将新闻分发给不同的订阅者。
  • Headers Exchange(头交换机)尽量不要用,性能太低。根据消息的头部属性来选择性地将消息路由到队列。
  • Default Exchange(默认交换机):用于简单的队列名称与绑定键相同的情况,不需要显式声明交换机。如:简单的消息发布和订阅,不需要复杂的路由规则。

选择适当的交换机类型取决于您的应用程序需求。通常情况下,您可以根据以下考虑来选择:

  • 如果消息需要精确路由到特定队列,使用直连交换机。
  • 如果需要将消息广播给多个队列,使用扇出交换机。
  • 如果消息需要根据多个属性进行选择性路由,使用主题交换机
  • 如果只是简单的队列名称与绑定键相同,可以使用默认交换机。

3. 队列 – Queue

RabbitMQ中的队列是一个缓冲区,用于为消费者应用程序存储消息。这些应用程序可以创建、使用和删除队列。RabbitMQ中的队列可以是持久的、临时的或自动删除的。持久队列一直保留,直到它们被删除。临时队列一直存在,直到服务器关闭。自动删除队列在不再使用时被删除。

3.1. Queue的属性

Type:指定Queue类型。RabbitMQ提供了三种Type:

  • Classic Queues:经典队列。这是RabbitMQ最为经典的队列类型。在单机环境中,拥有比较高的消息可靠性
  • Quorum Queues:仲裁队列。仲裁队列相比Classic经典队列,在分布式环境下对消息的可靠性保障更高。
  • Stream Queues:流式队列。这种队列类型的消息是持久化到磁盘并且具备分布式备份的,更适合于消费者多,读消息非常频繁的场景

Durability:声明队列是否持久化,即服务器重启后是否还存在

**Auto delete:**是否自动删除,如果为true,当没有消费者连接到这个队列的时候,队列会自动删除

**Exclusive:**exclusive属性的队列只对首次声明它的连接可见,并且在连接断开时自动删除;基本上不设置它,设置成false

**Arguments:**队列的其他属性,例如指定DLX(死信交换机等);

通过设置不同的属性,可以实现不同功能的Queue,如优先级队列,死信队列,临时队列等

3.2. 3.2 优先级队列

3.2.1 什么是优先级队列

优先级队列是一种特殊类型的队列,它根据消息的优先级进行排序和发送,在这种队列中,高优先级的消息将先被消费。

优先级可以设置为1255,但建议设置为15,如果最大值5满足不了需求建议1~10。更高的优先级需要更多的CPU和内存资源,因为RabbitMq需要在内部为每个优先级维护一个子队列,从1到你配置最大值。

优先级队列与普通队列有相同特性,如支持持久化,分页和镜像等功能,但需要注意的是消息的过期机制,过期的消息是从队列的头部开始过期的,即使你设置了队列级别的TTL,低优先级的过期消息仍然会被高优先级的未过期消息阻塞,导致无法传递,但它会出现在队列统计信息中;

另外,如果你队列设置了最大长度限制,RabbitMq会按正常流程从队列中删除消息,无论是高优先级还是低优先级。

3.2.2 如何配置优先级队列

  • 创建Queue时,需要添加 x-max-priority 参数。此参数应为介于 1 和 255 之间的正整数,指示队列应支持的最大优先级。
  • 创建Exchange及Binding
  • 发布者发布消息时,在**Properties**中添加 priority 属性,指定消息的优先级。值越大,优先级越高。

没有 priority 属性的消息其优先级被视为 0。优先级高于队列最大值的消息将被视为以最大优先级发布。

3.2.3 优先级队列的应用

  • 订单催付,客户下单后,商家希望在客户设定的时间内完成付款,如果为付款,我们需要推送催付短信。对于大客户的订单,我们就可以使用优先队列去处理。
  • 任务调度,后台调度任务中,有些任务可以稍后执行,有些需要立即执行,所以我们可以用优先队列发送一些紧急任务。
3.3. 死信队列

3.3.1 什么是死信

死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

死信的来源主要有三类:

  • 消息 TTL 过期:通过配置 x-message-ttl 参数指定消息过期实践
  • 队列达到最大长度(队列满了,无法再添加数据到 mq 中):通过配置 x-max-length参数指定队列最大长度
  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

3.3.2 如何配置死信队列

上图是死信队列的一个例子。我们通过交换机 tempreture.ex 接收消息,消息路由到 队列 tempreture.queue。在队列中队列 tempreture.queue中,配置超时时间(如3秒)和死信交换机tempreture.dlx.ex。 消息超时后,消息会被转移到死信交换机 tempreture.dlx.ex 中,该死信交换机绑定队列tempreture.dlx.queue。

如上例子中,队列tempreture.queuearguments配置了:

x-dead-letter-exchange:

tempreture.dlx.ex

x-message-ttl:

5000

3.4. 自动删除队列 – Auto delete

临时队列是一种自动删除队列。当这个队列被创建后,如果没有消费者监听,则会一直存在,还可以不断向其发布消息。但是一旦的消费者开始监听,然后断开监听后,它就会被自动删除。

PS:有些地方将自动删除队列称为 临时队列。不过临时队列有时还指 非持久化队列 (与持久化队列相对应,对应参数Durability)

自动删除队列创建时,将Auto delete配置成Yes 即可。

3.5. 惰性队列

在RabbitMQ 3.12之前,Classic的队列可以配置为在惰性模式下运行,这意味着它们会将所有消息写入磁盘,而不会将消息保存在内存中。 3.12之后,该配置被忽略。当前的行为是:

  • 通常情况下,消息被短暂的存储在内存中,知道缓存的消息写入磁盘
  • 一小部分消息仍保留在内存中,以满足消息的快速传递(存储在内存中的消息数量,取决于消费者的消费速度)
  • 如果发布的消息可以立即传递给消费者,并且消费者在消息被写入磁盘之前确认了消息,那么消息不会被写入磁盘(此时消息已经传递并得到确认,因此不需要写入)
3.6. 延迟队列

RabbitMQ本身不直接支持延迟消息队列**,**但是可以通过一些高级特性配合使用来实现延迟消息的功能。一种常见的方法是使用RabbitMQ的"死信队列"(Dead Letter Queues)特性配合消息的TTL(Time-To-Live)。

  • 首先,设置一个交换机(Exchange)和队列(Queue),并将它们绑定,用于存放需要延迟处理的消息。
  • 将消息的TTL设置为所需的延迟时间。
  • 当消息过期,它们会被自动发送到一个死信交换机(Dead Letter Exchange)。
  • 创建一个死信队列,并将其绑定到死信交换机。
  • 消费者从死信队列中获取消息进行处理。
3.6.1. 延迟队列使用场景
  • 订单在十分钟之内未支付则自动取消
  • 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
  • 用户注册成功后,如果三天内没有登陆则进行短信提醒。
  • 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
  • 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。

4. 其它工作机制

4.1. 消息确认机制

RabbitMQ 提供两种消息确认模式,用于解决消息处理失败、消费者宕机等场景下的消息生命周期管理问题:

  1. 自动确认模式:消费者接收消息后,RabbitMQ 自动认为消息已被处理,无需额外确认操作。此模式虽高效,但存在消息丢失风险,适用于允许少量数据丢失的场景。
  2. 显式确认模式:消费者需调用basic.ack方法手动确认消息。确认时机灵活,可在消息接收后、持久化存储前或处理完成后执行。若消费者未发送确认即消亡,RabbitMQ 会将消息重新分配给其他消费者或等待新消费者接入,确保消息不丢失。但需注意,若消费者遗漏确认,可能导致队列消息堆积。
4.2. 生产者确认机制

为确保消息成功抵达 RabbitMQ,生产者可采用以下两种方式:

  1. 事务机制:通过Tx()TxCommit()TxRollback()方法实现事务控制。开启事务后,仅当消息成功提交至 RabbitMQ,事务才会生效;若提交前发生异常,可回滚事务并重发消息。但事务机制会使生产者处于阻塞状态,严重影响性能。
  2. 发送者确认机制:将信道设置为确认(confirm)模式后,每条消息会被分配唯一 ID。消息抵达匹配队列或持久化到磁盘后,RabbitMQ 会发送确认(Ack)或否定确认(Nack)。生产者可通过回调方法异步处理确认结果,支持批量确认(multiple参数)。该机制虽提升性能,但无法回滚,服务器崩溃时可能导致消息重复,需业务层实现去重逻辑。
4.3. 消息持久化

为防止 RabbitMQ 重启导致消息丢失,需将队列和消息均标记为持久化:

  1. 队列持久化:声明队列时设置durable=True,确保队列元数据存储在磁盘,如channel.queue_declare(queue='hello', durable=True)
  2. 消息持久化:发送消息时设置delivery_mode = pika.DeliveryMode.Persistent,使消息写入磁盘。但需注意,持久化存在短暂未写入磁盘的窗口风险,结合发布确认机制可增强可靠性。
4.4. 工作队列消息分发

工作队列通过多个消费者处理同一队列消息,提升处理效率,支持两种分发策略:

  1. 轮询分发(Round-robin):RabbitMQ 按顺序将消息发送给消费者,实现负载均衡。但可能出现消息丢失(未处理即确认)、任务分配不均(复杂任务积压)等问题。
  2. 公平分发(Fair dispatch):消费者设置basic_qos(prefetch_count=1),每次仅接收一条消息,处理完成并手动确认后再获取下一条,避免忙闲不均。适用于任务处理耗时差异大的场景。
4.5. 备用交换机(Alternate Exchange, AE)

用于处理无法路由的消息(无匹配队列或绑定规则):当消息无法路由至目标队列时,交换机将其转发至配置的备用交换机。若 AE 仍无法路由,消息将继续沿 AE 链传递,直至成功路由或到达链尾。此机制可捕获异常消息、实现分级处理逻辑。

4.6. 队列长度限制

可通过策略或客户端参数设置队列最大长度(消息数或字节数):

  • 限制规则:仅计算就绪态消息(未确认消息不计入)。达到限制后,默认丢弃队首消息,可通过overflow参数调整行为(如reject-publish拒绝新消息、reject-publish-dlx将拒绝消息转为死信)。
  • 监控指标:通过messages_readymessage_bytes_ready查看就绪态消息数量及占用空间。
4.7. 死信交换机(Dead Letter Exchange, DLX)

当消息出现以下情况时,会被重新发布到死信交换机:

  1. 消费者使用basic.rejectbasic.nackrequeue=false
  2. 消息因 TTL 过期;
  3. 队列超过长度限制;
  4. 仲裁队列中消息重试次数超过delivery-limit

可通过客户端参数(x-dead-letter-exchange)或策略配置 DLX,死信消息将使用原路由键或指定路由键重新路由,需避免死信循环导致消息无限流转。

4.8. 优先级队列

RabbitMQ 支持为经典队列设置优先级(1-255,建议 1-5):

  1. 声明方式:通过x-max-priority参数声明队列支持的最大优先级,如args.put("x-max-priority", 10);发布消息时设置priority字段指定优先级。
  2. 行为特性:高优先级消息优先处理,但需注意内存、CPU 开销;未设置优先级的消息默认为 0;结合basic.qos可避免优先级失效(如消费者预取过多导致高优先级消息等待)。
4.9. 延迟消息

通过rabbitmq_delayed_message_exchange插件实现:发送消息时设置x-delay头(单位毫秒)指定延迟时间,到期后消息将路由至匹配队列。若无法路由,消息将被丢弃。

4.10. 生存时间(Time-To-Live, TTL)

可分别为消息和队列设置过期时间:

  1. 消息 TTL:通过策略(message-ttl)或消息属性(expiration)设置,取队列级和消息级 TTL 的最小值。过期消息将被丢弃或转为死信(取决于队列类型)。
  2. 队列 TTL:通过策略(expires)或参数(x-expires)设置,仅适用于非持久化经典队列,未使用(无消费者、未重声明、无basic.get调用)的队列将在到期后自动删除。

四、SpringBoot中使用RabbitMQ

1. 简单模式

基础配置

首先创建两个SpringBoot项目,一个作为消费者,一个作为生产者,并导入依赖项。

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在两个项目的yaml文件中写入rabbitmq的配置信息

spring:
  rabbitmq:
    host: localhost # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: guest # 用户名
    password: guest # 密码

在rabbitmq中创建一个队列叫做queue1

生产者

在生产者中编写测试类,发送消息

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class RabbitPublisherApplicationTests {
 
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    @Test
    public void SimpleQueue(){
        //队列名字
        String name = "queue1";
        //具体的消息
        String message = "你好!!!rabbit";
        //发送消息
        rabbitTemplate.convertAndSend(name,message);
    }
}

执行测试类后在rabbitmq的相应队列中就会有消息了。

消费者

随后编写消费者来消费消息。

创建一个监听类,来监听具体的队列

@Component
public class SpringRabbitListener {
    /**
     * 从队列queue1中取消息
     * @param msg
     */
    @RabbitListener(queues = "queue1")
    public void listenSimpleQueueMessage(String msg) {
        System.out.println("spring 消费者接收到消息:" + msg);
    }
}

随后启动消费者项目,就可以从队列中成功获取到消息了。

2. 工作队列模式

编写方法发送20条消息

    @Test
    public void WorkQueue(){
        String name = "work.queue";
        for(int i=0;i<20;i++){
            String message = "第"+i+"消息";
            rabbitTemplate.convertAndSend(name,message);
        }
    }

用两个消费者来接收

    @RabbitListener(queues = "work.queue")
    public void listenWorkQueueMessage1(String msg) {
        System.out.println("第一位消费者接收到消息:" + msg);
    }
    @RabbitListener(queues = "work.queue")
    public void listenWorkQueueMessage2(String msg) {
        System.out.println("第二位消费者接收到消息:" + msg);
    }

先启动消费者再发送消息:

此时可以发现两个消费者将消息均匀分配,每位各接收10条消息,这里是因为mq有预分配,会先预取,每个消费者和队列中有一个通道,存放预取的消息,所以会平均分消息,然后各自独立消费。

但是假如我们的消费者消费能力不一样呢,rabbitmq也会将其平均分配,这样的机制显然是不合理的,因此面对这种情况时,可以通过控制预取数量来改变这种情况,我们可以在yml文件中编写配置。这样就实现了取一个消费一个,不会先预取很多个,如果消费能力强取得就快,消费能力弱,取得就慢。

spring:
  rabbitmq:
    host: localhost # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: guest # 用户名
    password: guest # 密码
    listener:
      simple:
        prefetch: 1

3. 发布/订阅模式(广播模式)

广播模式使用fanout交换机

首先编写一个配置类,配置交换机和队列

@Configuration
public class FanoutConfig {
    //声明交换机,设置名称
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("exchange1");
    }
    //队列1
    @Bean
    public Queue fanoutQueue1() {
        return new Queue("fanout1",true);
    }
    //绑定队列1和交换机
    @Bean
    public Binding binding1() {
        return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
    }
    //队列2
    @Bean
    public Queue fanoutQueue2() {
        return new Queue("fanout2",true);
    }
    //绑定交换机和队列2
    @Bean
    public Binding bindingQueue2() {
        return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
    }
}

发送消息

@Test
public void fanout(){
    String name = "exchange1";
    String meg = "这里时fanout";
    rabbitTemplate.convertAndSend(name,"",meg);
}

接收消息

@RabbitListener(queues = "fanout1")
public void listenWorkQueueMessage1(String msg) {
    System.out.println("第一位消费者接收到消息:" + msg);
}
@RabbitListener(queues = "fanout2")
public void listenWorkQueueMessage2(String msg) {
    System.out.println("第二位消费者接收到消息:" + msg);
}

结果我们可以看到两个消费者都接收到了消息。

4. 路由模式

路由模式使用direct交换机,除了使用配置类的方式进行队列和交换机的绑定,还可以在消费者中使用注解的方式,例如

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name="direct",type = ExchangeTypes.DIRECT),
    key = {"one","two"}
))
public void directListener1(String msg){
System.out.println("第一位消费者接收到来自direct.queue1的消息:"+ msg);
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name="direct",type = ExchangeTypes.DIRECT),
    key = {"three"}
))
public void directListener2(String msg){
System.out.println("第二位消费者接收到来自direct.queue2的消息:"+ msg);
}

发送消息

@Test
public void directTest(){
    // 交换机名称
    String exchangeName = "direct";
    // 消息
    String message = "第 one 消息";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "one", message);

    // 消息
    String message2 = "第 two 消息";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "two", message2);

    // 消息
    String message3 = "第 three 消息";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "three", message3);
}

最后可以看到,根据key路由接收到了消息。

5. 主题模式

主题模式和路由模式类似,使用topic交换机,只是可以在key中使用通配符,

主题模式中RoutingKey一般由一个或多个单词组成,用“.”分割。

通配符规则

# 匹配一个或多个词

* 匹配一个词

修改消费者配置

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"),
    exchange = @Exchange(name="topic",type = ExchangeTypes.TOPIC),
    key = {"a.*"}
))
public void topicListener1(String msg){
System.out.println("第一位消费者接收到来自topic.queue1的消息:"+ msg);
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name="topic",type = ExchangeTypes.TOPIC),
    key = {"a.#"}
))
public void topocListener2(String msg){
System.out.println("第二位消费者接收到来自topic.queue2的消息:"+ msg);
}

发送消息

@Test
public void topicTest(){
    // 交换机名称
    String exchangeName = "topic";

    String message = "消息:a.b";
    rabbitTemplate.convertAndSend(exchangeName, "a.b", message);

    String message2 = "消息:a.b.c";
    rabbitTemplate.convertAndSend(exchangeName, "a.b.c", message2);

    String message3 = "消息:a.b.c.d";
    rabbitTemplate.convertAndSend(exchangeName, "a.b.c.d", message3);
}

最后结果可以看到,第二个消费者可以拿到所有消息,第一个消费者只能拿到a.b的消息

6. 消息转换器

代码里直接发送对象,虽然接收的到消息,但是rabbitmq的界面上看到的消息会是乱码

依赖

 <dependency>
     <groupId>com.fasterxml.jackson.dataformat</groupId>
     <artifactId>jackson-dataformat-xml</artifactId>
     <version>2.9.10</version>
 </dependency>

配置

@Configuration
public class rabbitmqConfig {
 	// 消息转换配置
	@Bean
	public MessageConverter jsonMessageConverter(){
		return new Jackson2JsonMessageConverter();
	}
}

再次发送就会是转换好的消息

7. 进阶

7.1. 消费者并发消费

让消费者可以开启多个线程并发去消费消息,可以配合上方工作队列
只需要加配置

spring:
  rabbitmq:
    addresses: 127.0.0.1:5672
#    host: 127.0.0.1
#    port: 5672
    username: 你的账号
    password: 你的密码
    virtual-host: /
    # 消费者配置
    listener:
      simple:
        concurrency: 2 # 并发数
        max-concurrency: 10  #最大并发数
7.1.1. 生产者

正常发送消息,发送10个

// 并发消费
@GetMapping("/test")
public void test(){
    for (int i = 0; i < 10; i++) {
        rabbitTemplate.convertAndSend("concurrency","测试并发消费消息");
    }
}
7.1.2. 消费者
@RabbitListener(queuesToDeclare = @Queue(value = "concurrency"))
public void concurrency(String msg) throws InterruptedException {
    long l = System.currentTimeMillis();
    // 打印线程名
    String name = Thread.currentThread().getName();
    System.out.println("name = " + name);
    Thread.sleep(1000); // 休眠一秒,好看效果
    long l2 = System.currentTimeMillis();

    System.out.println("time" + (l2-l)/1000);
}

打印日志发现,并发消费生效

配合工作队列,消费速度就非常快了

7.1.3. 多个消费者
@RabbitListener(queuesToDeclare = @Queue(value = "concurrency"))
public void concurrency(String msg) throws InterruptedException {
    // 打印线程名
    String name = Thread.currentThread().getName();
    System.out.println("name = "+new Date() + name);
    Thread.sleep(1000); // 休眠一秒,好看效果

}
@RabbitListener(queuesToDeclare = @Queue(value = "concurrency"))
public void concurrency1(String msg) throws InterruptedException {
    // 打印线程名
    String name = Thread.currentThread().getName();
    System.out.println("name1 = "+new Date() + name);
    Thread.sleep(1000); // 休眠一秒,好看效果

}

7.2. 批量发送

实现多个消息批量发送,可配置每次发送几个,不足发送数时,等待超时后也继续发送

7.2.1. 批量发送配置类
// 批量发送配置
@Configuration
public class RabbitBatchSendConfig {
	@Resource
	ConnectionFactory connectionFactory;

	/**
	 * 注入一个批量 template
	 * Spring-AMQP 通过 BatchingRabbitTemplate 提供批量发送消息的功能。如下是三个条件,满足任一即会批量发送:
	 * 
	 * 【数量】batchSize :超过收集的消息数量的最大条数。
	 * 【空间】bufferLimit :超过收集的消息占用的最大内存。
	 * 【时间】timeout :超过收集的时间的最大等待时长,单位:毫秒。
	 * 不过要注意,这里的超时开始计时的时间,是以最后一次发送时间为起点。也就说,每调用一次发送消息,都以当前时刻开始计时,重新到达 timeout 毫秒才算超时。
	 *
	 * @return BatchingRabbitTemplate
	 */
	@Bean
	public BatchingRabbitTemplate batchRabbitTemplate() {
		// 创建 BatchingStrategy 对象,代表批量策略
		// 超过收集的消息数量的最大条数。
		int batchSize = 10; // 例:每次发送10条
		// 每次批量发送消息的最大内存 b
		int bufferLimit = 1024 * 1024;
		// 超过收集的时间的最大等待时长,单位:毫秒
		int timeout = 10 * 1000; // 例:不足10条时,等待10秒继续发送
		BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(batchSize, bufferLimit, timeout);
		// 创建 TaskScheduler 对象,用于实现超时发送的定时器
		TaskScheduler taskScheduler = new ConcurrentTaskScheduler();
		// 创建 BatchingRabbitTemplate 对象
		BatchingRabbitTemplate batchTemplate = new BatchingRabbitTemplate(batchingStrategy, taskScheduler);
		batchTemplate.setConnectionFactory(connectionFactory);
		return batchTemplate;
	}
}
7.2.2. 生产者
@Resource
private BatchingRabbitTemplate batchingRabbitTemplate;

// 批量发送
@GetMapping("/test14")
public void test14(){
    for (int i = 0; i < 15; i++) {
        batchingRabbitTemplate.convertAndSend("batchSend","批量发送");
    }
}
7.2.3. 消费者
@RabbitListener(queuesToDeclare = @Queue(value = "batchSend"))
public void batchSend(String msg){
    System.out.println("msg = " +new Date() + msg);
}

可以看到消息批量发送已实现,不足10条的按配置等待10秒后发送

7.3. 批量消费

实现多个消息批量消费,可配置每次消费几个,不足消费数时,等待超时后也继续消费

7.3.1. 批量消费配置类
/**
 * 批量消费
 */
@Configuration
public class RabbitBatchConsumerConfig {
    @Resource
    ConnectionFactory connectionFactory;
    @Resource
    SimpleRabbitListenerContainerFactoryConfigurer configurer;

    /**
	 * 配置一个批量消费的 SimpleRabbitListenerContainerFactory
	 */
    @Bean(name = "consumer10BatchContainerFactory")
    public SimpleRabbitListenerContainerFactory consumer10BatchContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        // 这里是重点 配置消费者的监听器是批量消费消息的类型
        factory.setBatchListener(true);

        // 一批十个
        factory.setBatchSize(10);
        // 等待时间 毫秒 , 这里其实是单个消息的等待时间 指的是单个消息的等待时间
        // 也就是说极端情况下,你会等待 BatchSize * ReceiveTimeout 的时间才会收到消息
        factory.setReceiveTimeout(10 * 1000L);
        factory.setConsumerBatchEnabled(true);

        return factory;
    }
}
7.3.2. 生产者
@GetMapping("/test13")
public void test13(){
    for (int i = 0; i < 16; i++) {
        rabbitTemplate.convertAndSend("batchConsume","测试批量消费消息");
    }
}
7.3.3. 消费者
// 指定containerFactory
@RabbitListener(queuesToDeclare = @Queue(value = "batchConsume"),containerFactory = "consumer10BatchContainerFactory") 
public void batchSend(List<Message> msg){ //接收换成List
    for (int i = 0; i < msg.size(); i++) {
        System.out.println("msg = " +new Date() + msg.get(i).getBody());
    }
}

可以看到消息批量消费已实现,不足10条的按配置等待10秒后消费

7.4. 基于插件延迟队列

延迟队列可以将消息发送后使消费者延迟接收。

7.4.1. RabbitAdmin配置
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitAdminConfig {

    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.virtualhost}")
    private String virtualhost;
    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(host);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualhost);
        return connectionFactory;
    }
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}
7.4.2. 封装发送延迟队列工具类
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

@Component
public class DelayedQueue {
    // routingKey
    private static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
    // 延迟队列交换机
    private static final String DELAYED_EXCHANGE = "delayed.exchange";
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Resource
    RabbitAdmin rabbitAdmin;


    /**
	 * 发送延迟队列
	 * @param queueName 队列名称
	 * @param params 消息内容
	 * @param expiration 延迟时间 毫秒
	 */
    public void sendDelayedQueue(String queueName, Object params, Integer expiration) {
        // 先创建一个队列
        Queue queue = new Queue(queueName);
        rabbitAdmin.declareQueue(queue);
        // 创建延迟队列交换机
        CustomExchange customExchange = createCustomExchange();
        rabbitAdmin.declareExchange(customExchange);
        // 将队列和交换机绑定
        Binding binding = BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();
        rabbitAdmin.declareBinding(binding);
        // 发送延迟消息
        rabbitTemplate.convertAndSend(DELAYED_EXCHANGE, DELAYED_ROUTING_KEY, params, msg -> {
            // 发送消息的时候 延迟时长
            msg.getMessageProperties().setDelay(expiration);
            return msg;
        });
    }

    public CustomExchange createCustomExchange() {

        Map<String, Object> arguments = new HashMap<>();
        /**
		 * 参数说明:
		 * 1.交换机的名称
		 * 2.交换机的类型
		 * 3.是否需要持久化
		 * 4.是否自动删除
		 * 5.其它参数
		 */
        arguments.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message", true, false, arguments);
    }

}
7.4.3. 生产者
@Autowired
private DelayedQueue delayedQueue;

/**
* 发送延迟队列
* @param queueName 队列名称
* @param params 消息内容
* @param expiration 延迟时间 毫秒
*/
@GetMapping("/test9")
public void topicTest8() {
    delayedQueue.sendDelayedQueue("delayTest2","这是消息",5000);
}
7.4.4. 消费者
@RabbitListener(queuesToDeclare = @Queue(value = "delayTest2",durable = "true"))
public void declareExchange2(String message){
    System.out.println("delayTest2 = " + message);
}
7.5. TTL队列

TTL是time to live的缩写,生存时间,RabbitMQ支持消息的过期时间,消息发送时可以指定,从消息入队列开始计算,只要超过队列的超时时间配置,消息没被接收,消息就会自动清除

7.5.1. 封装发送TTL队列工具类
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

@Component
public class TtlQueue {
    // routingKey
    private static final String TTL_KEY = "ttl.routingkey";
    private static final String TTL_EXCHANGE = "ttl.exchange";
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Resource
    RabbitAdmin rabbitAdmin;
    /**
	 * 发送TTL队列
	 * @param queueName 队列名称
	 * @param params 消息内容
	 * @param expiration 过期时间 毫秒
	 */
    public void sendTtlQueue(String queueName, Object params, Integer expiration) {
        /**
		 * ----------------------------------先创建一个ttl队列--------------------------------------------
		 */
        Map<String, Object> map = new HashMap<>();
        // 队列设置存活时间,单位ms,必须是整形数据。
        map.put("x-message-ttl",expiration);
        /*参数1:队列名称  参数2:持久化  参数3:是否排他 参数4:自动删除队列  参数5:队列参数*/
        Queue queue = new Queue(queueName,true,false,false,map);
        rabbitAdmin.declareQueue(queue);
        /**
		 * ---------------------------------创建交换机---------------------------------------------
		 */
        DirectExchange directExchange = new DirectExchange(TTL_EXCHANGE, true, false);
        rabbitAdmin.declareExchange(directExchange);
        /**
		 * ---------------------------------队列绑定交换机---------------------------------------------
		 */
        // 将队列和交换机绑定
        Binding binding = BindingBuilder.bind(queue).to(directExchange).with(TTL_KEY);
        rabbitAdmin.declareBinding(binding);
        // 发送消息
        rabbitTemplate.convertAndSend(TTL_EXCHANGE,TTL_KEY,params);
    }
}
7.5.2. 生产者
@Autowired
private TtlQueue ttlQueue;
/**
* 发送TTL队列
* @param queueName 队列名称
* @param params 消息内容
* @param expiration 过期时间 毫秒
*/
@GetMapping("/test10")
public void topicTest10() {
    ttlQueue.sendTtlQueue("ttlQueue","这是消息内容",5000);
}
7.5.3. 消费者
@RabbitListener(queues = "ttlQueue" )
public void ttlQueue(String message){
    System.out.println("message = " + message);
}
7.6. 死信队列

DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器。队列消息变成死信(deadmessage)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列。

消息变成死信的几种情况:

1.消息被拒绝(basic.reject/ basic.nack)并且requeue=false

2. 消息TTL过期

3. 队列达到最大长度

流程:发送消息,消息过期后进入到另一个队列(这个队列设置持久化,不过期)的过程。

7.6.1. 封装发送死信队列工具类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

@Component
public class DLXQueue {
    // routingKey
    private static final String DEAD_ROUTING_KEY = "dead.routingkey";
    private static final String ROUTING_KEY = "routingkey";
    private static final String DEAD_EXCHANGE = "dead.exchange";
    private static final String EXCHANGE = "common.exchange";
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Resource
    RabbitAdmin rabbitAdmin;
    /**
	 * 发送死信队列,过期后进入死信交换机,进入死信队列
	 * @param queueName 队列名称
	 * @param deadQueueName 死信队列名称
	 * @param params 消息内容
	 * @param expiration 过期时间 毫秒
	 */
    public void sendDLXQueue(String queueName, String deadQueueName,Object params, Integer expiration){
        /**
		 * ----------------------------------先创建一个ttl队列和死信队列--------------------------------------------
		 */
        Map<String, Object> map = new HashMap<>();
        // 队列设置存活时间,单位ms,必须是整形数据。
        map.put("x-message-ttl",expiration);
        // 设置死信交换机
        map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        // 设置死信交换器路由键
        map.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
        /*参数1:队列名称  参数2:持久化  参数3:是否排他 参数4:自动删除队列  参数5:队列参数*/
        Queue queue = new Queue(queueName,true,false,false,map);
        rabbitAdmin.declareQueue(queue);
        /**
		 * ---------------------------------创建交换机---------------------------------------------
		 */
        DirectExchange directExchange = new DirectExchange(EXCHANGE, true, false);
        rabbitAdmin.declareExchange(directExchange);
        /**
		 * ---------------------------------队列绑定交换机---------------------------------------------
		 */
        Binding binding = BindingBuilder.bind(queue).to(directExchange).with(ROUTING_KEY);
        rabbitAdmin.declareBinding(binding);
        /**
		 * ---------------------------------在创建一个死信交换机和队列,接收死信队列---------------------------------------------
		 */
        DirectExchange deadExchange = new DirectExchange(DEAD_EXCHANGE, true, false);
        rabbitAdmin.declareExchange(deadExchange);

        Queue deadQueue = new Queue(deadQueueName,true,false,false);
        rabbitAdmin.declareQueue(deadQueue);
        /**
		 * ---------------------------------队列绑定死信交换机---------------------------------------------
		 */
        // 将队列和交换机绑定
        Binding deadbinding = BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY);
        rabbitAdmin.declareBinding(deadbinding);
        // 发送消息
        rabbitTemplate.convertAndSend(EXCHANGE,ROUTING_KEY,params);
    }
}
7.6.2. 生产者
@Autowired
private DLXQueue dlxQueue;
/**
	 * 发送死信队列,过期后进入死信交换机,进入死信队列
	 * @param queueName 队列名称
	 * @param deadQueueName 死信队列名称
	 * @param params 消息内容
	 * @param expiration 过期时间 毫秒
	 */
@GetMapping("/test11")
public void topicTest11() {
    dlxQueue.sendDLXQueue("queue","deadQueue","这是消息内容",5000);
}
7.6.3. 消费者
// 接收转移后的队列消息
@RabbitListener(queuesToDeclare = @Queue(value = "deadQueue",durable = "true"))
public void ttlQueue(String message){
    System.out.println("message = " + message);
}
7.7. 消息确认
7.7.1. 发送消息确认机制

为确保消息发送有真的发送出去,设置发布时确认,确认消息是否到达 Broker 服务器

spring:
  rabbitmq:
    host: 47.99.110.29
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条
    publisher-confirm-type: correlated   #确认消息已发送到交换机(Exchange)
    publisher-returns: true  #确认消息已发送到队列(Queue)

如果有使用rabbitAdmin配置的话,那里也需要加配置

7.7.1.1. 修改RabbitAdmin配置
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitAdminConfig {

    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.virtualhost}")
    private String virtualhost;
    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(host);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualhost);
        // 配置发送确认回调时,次配置必须配置,否则即使在RabbitTemplate配置了ConfirmCallback也不会生效
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}
7.7.1.2. 实现发送消息确认接口

消息只要被 rabbitmq broker 接收到就会触发 confirmCallback 回调 。

/**
 * 消息发送确认配置
 */
@Component
public class ConfirmCallbackConfig implements RabbitTemplate.ConfirmCallback{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct // @PostContruct是spring框架的注解,在?法上加该注解会在项?启动的时候执?该?法,也可以理解为在spring容器初始化的时候执
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
	 * 交换机不管是否收到消息的一个回调方法
	 * @param correlationData 消息相关数据
	 * @param ack 交换机是否收到消息
	 * @param cause 失败原因
	 */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack){ // 消息投递到broker 的状态,true表示成功
            System.out.println("消息发送成功!");
        }else { // 发送异常
            System.out.println("发送异常原因 = " + cause);
        }
    }
}
7.7.1.3. 实现发送消息回调接口

如果消息未能投递到目标queue里将触发回调 returnCallback ,一旦向 queue 投递消息未成功,这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。

@Component
public class ReturnCallbackConfig implements RabbitTemplate.ReturnsCallback {
	@Autowired
	private RabbitTemplate rabbitTemplate;

	@PostConstruct // @PostContruct是spring框架的注解,在?法上加该注解会在项?启动的时候执?该?法,也可以理解为在spring容器初始化的时候执
	public void init(){
		rabbitTemplate.setReturnsCallback(this);
	}
	@Override
	public void returnedMessage(ReturnedMessage returnedMessage) {
		System.out.println("消息"+returnedMessage.getMessage().toString()+"被交换机"+returnedMessage.getExchange()+"回退!"
				+"退回原因为:"+returnedMessage.getReplyText());
		// 回退了所有的信息,可做补偿机制
	}
}
7.7.2. 消费者消息确认机制

为确保消息消费成功,需设置消费者消息确认机制,如果消费失败或异常了,可做补偿机制。

7.7.2.1. 配置
spring:
  rabbitmq:
    host: 47.99.110.29
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    # 消费者配置
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条,处理完成才能获取下一条
        acknowledge-mode: manual # 设置消费端手动ack确认
        retry:
          enabled: true # 是否支持重试
    # 生产者配置      
    publisher-confirm-type: correlated   #确认消息已发送到交换机(Exchange)
    publisher-returns: true  #确认消息已发送到队列(Queue)
7.7.2.2. channel.basicAck消息确认

消费者修改,利用消费者参数Channel 进行消息确认操作

@RabbitListener(queuesToDeclare = @Queue(value = "simple.queue",durable = "true")) // queuesToDeclare 自动声明队列
public void holloWordListener(String msg, Channel channel, Message message) throws IOException {
    // 消息
    System.out.println("msg = " + msg);
    /**
		 * 确认
		 * deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加
		 * multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
		 */ 
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
7.7.2.3. channel.basicNack消息回退

将消息重返队列

@RabbitListener(queuesToDeclare = @Queue(value = "simple.queue",durable = "true")) // queuesToDeclare 自动声明队列
public void holloWordListener(String msg, Channel channel, Message message) throws IOException {
    try {
        // 消息
        System.out.println("msg = " + msg);
        throw new RuntimeException("来个异常");
    } catch (Exception e) {
        e.printStackTrace();
        System.out.println("消息消费异常,重回队列");
        /**
			 * deliveryTag:表示消息投递序号。
			 * multiple:是否批量确认。
			 * requeue:值为 true 消息将重新入队列。
			 */
        channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
    }
    // 确认
    /**
    * deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加
    * multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
    */
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
7.7.2.4. channel.basicReject消息拒绝

拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。

/**
 * 消息拒绝
 * deliveryTag:表示消息投递序号。
 * requeue:值为 true 消息将重新入队列。
 */
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
7.7.3. 封装消息确认处理类

我们在消费mq的时候都会做消息确认机制,消息要是消费过程中发送异常,将消息回退。

7.7.3.1. 正常代码
@RabbitListener(queuesToDeclare = @Queue(value = "simple.queue",durable = "true")) // queuesToDeclare 自动声明队列
public void holloWordListener(String msg, Channel channel, Message message) throws IOException {
    try {
        // 消息消费
        System.out.println("msg = " + msg);
        // 消息消费。。。。。。
    } catch (Exception e) {
        e.printStackTrace();
        System.out.println("消息消费异常,重回队列");
        // 回退
        channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
    }
    // 确认
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}

但是这样会有一个缺点,要是我们有很多个消费者,重复的代码太多。

7.7.3.2. 封装后的代码

封装后我们只需要处理消息内容

@RabbitListener(queues = "rabbitListener")
public void nonTicketUploadHandler(String json, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
    super.onMessage(json, deliveryTag, channel, new MqListener<String>() {
        @Override
        public void handler(String msg, Channel channel) throws IOException {
            // 消息消费
            System.out.println("msg = " + msg);
            // 消息消费。。。。。。
        }
    });
}

封装消息确认

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.boot.starter.rabbitmq.listenter.MqListener;
import java.io.IOException;

@Slf4j
public class BaseRabbiMqHandler<T> {

    public void onMessage(T t, Long deliveryTag, Channel channel, MqListener mqListener) {
        try {
            // 通过实现这个接口,处理我们的业务代码
            mqListener.handler(t, channel);
            /**
             * 手动确认消息
             * deliveryTag:该消息的index
             *  false只确认当前一个消息收到,true确认所有consumer获得的消息(成功消费,消息从队列中删除 )
              */
            channel.basicAck(deliveryTag, false);
        }
        catch (Exception e) { log.error("接收消息失败,重新放回队列,异常原因:{}",e.getMessage());
                              try {
                                  /**
                 * 重回队列
                 * deliveryTag:该消息的index
                 * multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
                 * requeue:被拒绝的是否重新入队列
                 */
                                  //                channel.basicAck(deliveryTag, false);
                                  channel.basicNack(deliveryTag, false, true);
                              } catch (IOException ex) {
                                  ex.printStackTrace();
                              }
                            }

    }
}

mqListener接口

业务通过实现这个接口处理

import com.rabbitmq.client.Channel;
import java.io.IOException;

public interface MqListener<T> {
    // 业务通过实现这个接口处理
    default void handler(T map, Channel channel) throws IOException {
    }

}

mq消费者

消费者代码示例

import com.rabbitmq.client.Channel;
import org.jeecg.boot.starter.rabbitmq.core.BaseRabbiMqHandler;
import org.jeecg.boot.starter.rabbitmq.listenter.MqListener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;

import java.io.IOException;

public class TestListener extends BaseRabbiMqHandler<String> { // 继承封装的消息确认类

	@RabbitListener(queues = "rabbitListener")
	public void nonTicketUploadHandler(String json, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
		super.onMessage(json, deliveryTag, channel, new MqListener<String>() { // 实现父类的消息方法里的接口
			@Override 
			public void handler(String msg, Channel channel) throws IOException {
				// 业务代码写这里,try/catch不用再写了
				System.out.println("msg = " + msg);
				// 消息消费。。。。。。
			}
		});
	}
}

8. rabbitmq集群搭建

8.1. 普通集群

1、新建三个docker容器

docker run -d --hostname rabbit1 --name myrabbit1  -v /home/rabbitmq/data:/var/lib/rabbitmq -p 15671:15672 -p 5671:5672 rabbitmq

docker run -d --hostname rabbit2 --name myrabbit2  -v /home/rabbitmq/data:/var/lib/rabbitmq -p 15672:15672 -p 5672:5672 --link myrabbit1:rabbit1 rabbitmq

docker run -d --hostname rabbit3 --name myrabbit3  -v /home/rabbitmq/data:/var/lib/rabbitmq -p 15673:15672 -p 5673:5672 --link myrabbit1:rabbit1 --link myrabbit2:rabbit2 rabbitmq

2、三个都进入容器下载可视化工具

3、进入第一个mq容器重启

docker exec -it ef4a1f0fade7 /bin/bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
exit

4、进入第二个 和 第三个 mq容器执行

docker exec -it e36d94d40008 /bin/bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@rabbit1   //如遇到报错再执行上句、再继续执行
rabbitmqctl start_app
exit

5、进去mq可视化界面,overview面板中的Nodes可查看到节点信息。

6、测试,在mq上新建交换机、其余两个也出现新建的交换机

此时普通集群已构建完成

1、此种集群主节点down掉后,消费者也无法消费从节点的消息,不能做故障转移,只能当作备份。

2、主节点正常,从节点则可以消费消息

8.2. 镜像集群(高可用)

这种集群弥补第一种的缺陷,需在普通集群的基础下搭建(确保第一种集群可用)

镜像队列机制就是将队列在三个节点之间设置主从关系,消息会在三个节点之间进行自动同步,且如果其中一个节点不可用,并不会导致消息丢失或服务不可用的情况,提升mq集群的高可用性。

1、配置集群架构

2、进入任意节点配置策略

docker exec -it ef4a1f0fade7 /bin/bash


rabbitmqctl set_policy ha-all "^rabbitmq" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

3、测试,新建一个rabbitmq开头的队列

此时某个节点down掉(包括主节点),其余节点也能消费

将主节点down掉,节点自动切换

4、清除策略

rabbitmqctl clear_policy ha-all