RabbitMQ模型详解与常见问题

发布于:2025-09-06 ⋅ 阅读:(21) ⋅ 点赞:(0)

项目demo地址:https://github.com/tian-qingzhao/rabbitmq-demo

一、RabbitMQ组件概念

在这里插入图片描述

1.1 Server:接收客户端的连接,实现AMQP实体服务。

1.2 Connection:连接

应用程序与Server的网络连接,TCP连接。

1.3 Channel:信道

消息读写等操作在信道中进行。客户端可以建立多个信道,每个信道代表一个会话任务。

1.4 Message:消息

应用程序和服务器之间传送的数据,消息可以非常简单,也可以很复杂。由Properties和Body组成。Properties为外包装,
可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body就是消息体内容。

1.5 Virtual Host:虚拟主机

用于逻辑隔离。一个虚拟主机里面可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名称的Exchange或Queue。

1.6 Exchange:交换器

接收消息,按照路由规则将消息路由到一个或者多个队列。如果路由不到,或者返回给生产者,或者直接丢弃。RabbitMQ常用的交换器常用类型有direct、topic、fanout、headers四种。

1.7 Binding:绑定

交换器和消息队列之间的虚拟连接,绑定中可以包含一个或者多个RoutingKey。

1.8 RoutingKey:路由键

生产者将消息发送给交换器的时候,会发送一个RoutingKey,用来指定路由规则,这样交换器就知道把消息发送到哪个队列。
路由键通常为一个“.”分割的字符串,例如“com.rabbitmq”。

1.9 Queue:消息队列

用来保存消息,供消费者消费。

二、RabbitMQ七种模式

2.1 Hello Work

生产者发布消息到队列,不需要交换机,消费者接收。也是顺序消费的一种。

2.2 Work Queue

生产者发布消息到队列,多个消费者随机消费消息,也不需要交换机,可以通过 channel.basicQos(1) 指定权重。

2.3 Publish/Subscribe

生产者发送消息到交换机,交换机绑定队列,消费者可以从该队列上消费消息,所有该队列的消费者都可以获取到消息。
生产者如果不想声明队列和队列绑定到交换机的步骤,也可以在消费者去实现, 生产者只需要定义交换机即可,消费者从该交换机上拿消息。

2.4 Routing

和发布订阅不同的是,生产者发送消息还是到交换机,交换机不是发送到所有队列,
而是根据Routing Key参数选择性的发送到对应规则的队列上,也就是在绑定交换机和队列关系的时候指定一个路由键参数。
生产者如果不想声明队列和队列绑定到交换机的步骤,也可以在消费者去实现, 生产者只需要定义交换机即可,消费者从该交换机上拿消息。
交换机是不存储消息的,交换机转发给队列之后,消息存储在队列上。如果交换机找不到对应的队列,则会将该消息丢弃掉。

2.5 Topic

和Routing模式不同的是,Routing模式只支持路由键全词匹配,而Topic模式则支持通配符匹配。

2.6 RPC

实现“发送请求 → 等待响应”的同步调用机制,尽管底层仍是异步消息通信。不是传统意义上的 RPC(如 gRPC、Dubbo)。
基于消息队列的请求-响应模型。使用 reply_to 和 correlation_id 实现请求与响应的匹配。

2.7 Publish/Confirms

发布确认模式,通过 channel.confirmSelect() 开启确认。
单独发布消息,同步等待确认:简单,但吞吐量非常有限。
批量发布消息,等待批量的同步确认:简单、合理的吞吐量,但是很难判断出什么时候出了问题。
异步处理:最佳的性能和资源利用,良好的控制情况下的错误,但涉及到正确的实现,相对复杂。

三、RabbitMQ四种交换机类型

3.1 Direct exchange:直连交换机

是一种带路由功能的交换机,一个队列会和一个交换机绑定,除此之外再绑定一个routing_key,
当消息被发送的时候,需要指定一个binding_key,这个消息被送达交换机的时候,就会被这个交换机送到指定的队列里面去。
同样的一个binding_key也是支持应用到多个队列中的。 这样当一个交换机绑定多个队列时,就会被送到对应的队列去处理。

3.2 Fanout exchange:扇形交换机

是最基本的交换机类型,它能做的事非常简单——广播消息,
扇形交换机会把能接收到的消息全部发送给绑定在自己身上的队列。
因为广播不需要"思考"(不需要路由键),所以扇形交换机处理消息的速度也是所有的交换机类型里面最快的。

3.3 Topic exchange:主题交换机

直连交换机的routing_key方法非常简单,如果希望将一条消息发送给多个队列,
那么这个交换机需要绑定非常多的routing_key,这样的话消息的管理就会非常的困难。
所以RabbitMQ提供了一种主题交换机,发送到主题交换机上的消息需要携带制定规则的routing_key,主题交换机会根据这个规则将数据发送到对应的队列上。
主题交换机的routing_key需要有一定的规则,交换机和队列绑定时候设置的binding_key需要采用*.#.*…的格式,每个部分用.分开,其中:
*表示一个单词, #表示任意数量(零个或多个)单词。 假设有一条消息的routing_key为com.lrving.www,
那么带有这样binding_key的几个队列都有收到消息:

  • com…
  • …www
  • com.#

3.4 Headers exchange:首部交换机

是忽略routing_key的一种路由方式。路由器和交换机路由的规则是通过Headers信息来交换的,
这个有点像HTTP请求中的请求头。将一个交换机声明成首部交换机,绑定一个队列的时候,定义一个Hash的数据结构,消息发送的时候,
会携带一组hash数据结构的信息,当Hash内容匹配上的时候,消息就会被写入队列。
绑定交换机和队列的时候,Hash结构中要求携带一个键"x-match",这个键的Value可以是any或者all,
这代表消息携带的Hash是需要全部匹配(all), 还是仅仅匹配一个键(any)
就可以了。相比较直连交换机,首部交换机的优势是匹配的规则不被限定为字符串(string)。

四、RabbitMQ三种队列

4.1 classic:经典队列

是Rabbit MQ默认的队列,在单机模式下是最常用的。
单节点存储: Classic队列存储在单一的RabbitMQ节点上,消息不会跨节点复制,虽然提高了速度,但在节点故障时,队列的容错性较差。
FIFO消息处理: 消息按接收顺序存储和消费,尤其适合对消息顺序要求严格的任务。
持久与非持久消息: Classic队列可以存储内存(瞬态)或磁盘(持久)的消息。持久化消息会被保存到磁盘,以确保在服务器重启或崩溃后不会丢失消息,但性能会有所下降。
高吞吐量: Classic队列经过优化,能够以低延迟处理大量消息,适用于对消息处理速度要求严格的应用,如实时系统或日志聚合服务。

4.2 quorum:仲裁队列

3.8版本引入的,Quorum是针对镜像队列的一种优化,目前已经取代了镜像队列,作为Rabbit MQ集群部署保证高可用性的解决方案。
传统的镜像队列,是将消息副本存储在一组节点上,以提高可用性和可靠性。镜像队列将队列中的消息复制到一个或多个其他节点上,并使这些节点上的队列保持同步。
当一个节点失败时,其他节点上的队列不受影响,因为它们上面都有消息的备份。
镜像队列使用主从模式,所有消息写入和读取均通过主节点,并异步复制到镜像节点。主节点故障时需重新选举,期间队列不可用。而仲裁队列基于Raft
分布式共识算法,所有节点组成仲裁组。消息需被多数节点持久化后才确认成功,Leader故障时自动触发选举。
相比较于传统的主从模式,避免了发生网络分区时的脑裂问题(基于Raft分布式共识算法避免)。
相比较于普通队列,仲裁队列增加了一个对于有毒消息的处理。什么是有毒消息?首先,消费者从队列中获取到了元素,队列会将该元素删除,
但是消费者消费失败了,会给队列nack,并且可以设置消息重新入队。这样可能存在因为业务代码的问题,某条消息一直处理不成功的问题。
仲裁队列会记录消息的重新投递次数,判断是否超过了设置的阈值,Quorum队列会持续跟踪消息的失败投递尝 试次数,
并记录在"x-delivery-count"这样一个头部参数中。然后,就可以通过设置 Delivery limit参数来定制一个毒消息的删除策略。
当消息的重复投递次数超过了Delivery limit参数阈值时, 就直接丢弃或者放入死信队列人工处理。
仲裁队列适用于集群环境下,队列长期存在,并且对于消息可靠性要求高,允许牺牲一部分性能(因为raft算法,消息需被多数节点持久化后才确认成功)的场景。

4.3 stream:流式队列

3.9版本引入的,在传统的队列模型中,同一条消息只能被一个消费者消费(一个队列如果有多个消费者,是工作分发的机制。
消息1->消费者1,消息2->消费者2,消息3->消费者1, 不能两个消费者读同一条消息。),并且消息是阅后即焚的(消费者接收到消息后,队列中的该消息就删除,
如果消费者拒绝签收并且设置了重新入队,再把消息重新放入队列中),无法重复从队列中获取相同的消息。并且在当队列中积累的消息过多时,性能下降会非常明显。
Stream队列正是解决了以上的这些问题。Stream队列的核心是用aof文件的形式存储队列,将消息以aof的方式追加到文件中。
允许用户在日志的任何一个连接点开始重新读取数据。(需要用户自己记录偏移量)

五、死信队列

目前只有classic和quorum队列支持死信队列,stream不支持死信队列。死信队列和普通队列是一样的。
死信队列如果也配置了死信队列,那么就会往下传递。

5.1 消费失败或者拒绝消费

channel.basicNack(envelope.getDeliveryTag(), false, true);
channel.basicReject(deliveryTag, true)

5.2 超过消息的TTL(Time To Live)时间

消息超过 x-message-ttl 设置的超时时间(单位:毫秒)而没有被消费。消息在队列中保存时间超过这个TTL,即会被认为死亡。死亡的消
息会被丢入死信队列,如果没有配置死信队列的话,RabbitMQ会保证死了的消息不会再次被投递,并且在未来版本中,会主动删除掉这些死掉的消息。

5.3 队列达到最大长度

队列超过 x-max-length 设置的最大长度之后,新来的消息会被处理为死信,消息也会进入死信队列。

5.4 如何确认一个消息是不是死信

消息被作为死信转移到死信队列后,会在Header当中增加一些消息。在官网的详细介绍中,可以看到很多内容,
比如时间、原因(rejected,expired,maxlen)、队列等。然后header中还会加上第一次成为死信的三个属性,
并且这三个属性在以后的传递过程中都不会更改:

  • x-first-death-reason
  • x-first-death-queue
  • x-first-death-exchange

六、消息确认的三种模式

6.1 NONE(自动确认/不确认)

消费者收到消息后即自动确认,无论消息是否正确处理,都不会进一步检查,直接删除队列中的消息 。
可能导致某些情况下消息丢失(如消费者处理失败时,RabbitMQ仍认为消息已成功处理)

6.2 AUTO(自动处理确认)

如果消费者使用了 autoAck=true ,那么 RabbitMQ 在投递给消费者的同时就会立即删除队列里的消息(不管客户端是否处理成功),
如果发生异常时消息重新回到队列。在springboot场景下可以通过 setDefaultRequeueReject 方法设置是否重新入队。

6.3 MANUAL(手动确认)

若抛异常,消息不会丢失,一直处Unacked状态,消息不会再次发送给其他消费者。可选择显式关闭连接,消息会恢复到Ready状态并重新投递。消费者需要显式调用ack
方法确认消息成功处理。如果消费者没有确认(如抛出异常或未处理消息),消息会保持在未确认状态(Unacked),不会再次投递。关闭消费者连接时,未确认的消息会重新回到队列中。
手动确认模式(MANUAL)适用于需要更精细控制的场景,能够确保消息不会因为处理失败而丢失。

七、懒队列

RabbitMQ从3.6.0版本开始,就引入了懒队列(Lazy Queue)的概念。懒队列会尽可能早的将消息内容保存到硬盘当中,并且只有在用户请求到时,才临时从硬盘加载到RAM内存当中。
懒队列的设计目标是为了支持非常长的队列(数百万级别)。队列可能会因为一些原因变得非常长-也就是数据堆积。
消费者服务宕机了,有一个突然的消息高峰,生产者生产消息超过消费者
消费者消费太慢了,默认情况下,RabbitMQ接收到消息时,会保存到内存以便使用,同时把消息写到硬盘。但是,消息写入
硬盘的过程中,是会阻塞队列的。RabbitMQ虽然针对写入硬盘速度做了很多算法优化,但是在长队列中,
依然表现不是很理想,所以就有了懒队列的出现。
懒队列会尝试尽可能早的把消息写到硬盘中,而在消费者消费到相应的消息时才会被加载到内存中。
这意味着在正常操作的大多数情况下,RAM中要保存的消息要少得多。当然,这是以增加磁盘IO为代价的。
声明懒队列有两种方式:

  • 给队列指定参数
Map<String, Object> args = new HashMap<>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);
  • 设定一个策略,在策略中指定queue-mode 为 lazy。
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"default"}' --apply-to queues

要注意的是,当一个队列被声明为懒队列,那即使队列被设定为不持久化,消息依然会写入到硬盘中。如果是在集群模式中使用,这会给集群资源带来很大的负担。
最后一句话总结:懒队列适合消息量大且长期有堆积的队列,可以减少内存使用,加快消费速度。但是这是以大量消耗集群的网络及磁盘IO为代价的。

八、联邦插件

8.1 确认联邦插件

rabbitmq-plugins list|grep federation
[ ] rabbitmq_federation
[ ] rabbitmq_federation_management

8.2 启用联邦插件

rabbitmq-plugins.bat enable rabbitmq_federation

8.3 启用联邦插件的管理平台支持

rabbitmq-plugins.bat enable rabbitmq_federation_management

8.4 在RabbitMQ控制台配置federation

打开下游RabbitMQ的控制台,在admin标签下,点击Federation Upstreams,然后配置Name和URI即可,
这里的Name和URI填写成上游的信息,可以理解成mysql的slave填写master的信息。
在这里插入图片描述

8.5 测试是否配置成功

生产者发送消息往上游RabbitMQ发送消息,消费者使用下游RabbitMQ的信息进行接收消息。

九、消息分片存储插件

Lazy Queue懒队列机制提升了消息在RabbitMQ中堆积的能力,但是最终,消息还是需要消费者处理消化。
但是如何在消费者的处理能力有限的前提下提升消费者的消费速度呢?RabbitMQ提供的Sharding插件,就提供了一种思路。
对于RabbitMQ同样,针对单个队列,如何增加吞吐量呢? 消费者并不能对消息增加消费并发度,所以,
RabbitMQ的集群机制并不能增加单个队列的吞吐量。
上面的懒队列其实就是针对这个问题的一种解决方案。但是很显然,懒队列的方式属于治标不治本。
真正要提升RabbitMQ单队列的吞吐量,还是要从数据也就是消息入手,只有将数据真正的分开存储才行。
RabbitMQ提供的Sharding插件,就是一个可选的方案。他会真正将一个队列中的消息分散存储到不同的节点上,
并提供多个节点的负载均衡策略实现对等的读与写功能。

9.1 启用Sharding插件

rabbitmq-plugins enable rabbitmq_sharding

9.2 添加Sharding策略

在RabbitMQ控制台的admin标签下,点击Policies,在 Add / update a policy 里面添加一个策略即可。
在这里插入图片描述

9.3 使用生产者发送消息

生产者只需要声明交换机即可,交换机的类型为 x-modulus-hash

9.4 使用消费者消费消息

消费者只需要声明队列即可,但是队列的名称一定要写成生产者的交换机名称,并且有几个分片,就要消费几次。
sharding创建出来的分片就是一个一个的队列,和自己定义的队列没什么区别,都可以直接使用,
但是在分片场景下,我们都尽量不要单独去使用它,否则会让sharding出现消费不均匀的情况。

十、保证消息不丢失

10.1 生产者端

RabbitMQ的生产者确认机制分为同步确认和异步确认。
同步确认主要是通过在生产者端使用 Channel.waitForConfirmsOrDie() 指定一个等待确认的完成时间。
异步确认机制则是通过 channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2) 在生产者端注入两个回调确认函数。
如果发送批量消息,在RabbitMQ中,另外还有一种手动事务的方式,可以保证消息正确发送。
手动事务机制主要有几个关键的方法: channel.txSelect() 开启事务; channel.txCommit() 提交事务;
channel.txRollback() 回滚事务; 用这几个方法来进行事务管理。但是这种方式需要手动控制事务逻辑,并且手动事务会对channel产生阻塞,造成吞吐量下降。

10.2 消费者使用ack机制

消费者接收到消息之后,将autoAck改成false,使用手动应答的方式。

10.3 服务端

对应classic队列,使用持久化机制,将消息存储在磁盘上。而quorum和stream默认都是持久化的,所以不用考虑。
使用主从架构,对于普通集群,消息是分散存储的,不会进行消息同步。而对于镜像模式集群,可以在各个节点之间同步,丢失消息的概率不高。

十一、保证消息幂等

可以使用 AMQP.BasicProperties#messageId 设置一个消息的id,这个需要生产者自己设置成唯一标识,
然后在消费者端拿到 messageId 后做逻辑判断。也可以在消息体放入一个唯一标识,比如订单编号之类的字段。

十二、保证消息顺序性

rabbitmq一个队列对应多个消费者的场景下,保证不了消息的顺序性问题,只能通过一个生产者、一个队列、一个消费者去保证消息的顺序,
因为消息进入队列之后是先进先出的。

十三、解决消息堆积问题

13.1 生产者端

降低消息生产的速度

13.2 消费者端

增加消费者数量,但是一个队列的消费者数量尽量不要超过5个消费者。另外可以调大每个消费者每次消费消息的数量,以及增加消费者的线程数量。

# 单次推送消息数量
spring.rabbitmq.listener.simple.prefetch=100
# 消费者的消费线程数量
spring.rabbitmq.listener.simple.concurrency=5

13.3 服务端

可以使用懒队列,或者使用sharding分片队列。


网站公告

今日签到

点亮在社区的每一天
去签到