【RabbitMQ】应用问题、仲裁队列(Raft算法)和HAProxy负载均衡

发布于:2025-05-13 ⋅ 阅读:(14) ⋅ 点赞:(0)

    🔥个人主页: 中草药

🔥专栏:【中间件】企业级中间件剖析


一、幂等性保障

什么是幂等性?

        幂等性是指对一个系统进行重复调用(相同参数),无论同一操作执行多少次,这些请求对系统的影响都是相同的效果,结果都与执行一次相同。

        消息可能因网络重传、消费者异常重启、消息重复投递等导致重复消费,需确保多次处理不会产生副作用。

RabbitMQ 重复消息的来源

场景 原因
生产者重复发送 生产者未收到 Broker 的 ACK,触发重试机制(如网络抖动、Broker 未及时响应)
消费者重复消费 消费者处理消息后未及时 ACK,消息重新入队(如消费者崩溃、处理超时)
Broker 消息堆积 消息因队列配置(如死信队列、TTL)被多次重新投递

MQ的幂等性保障

对于 MQ 而言,幂等性是指同一条消息,多次消费,对系统的影响是相同的。

一般消息中间件的消息传输保障分为三个层级。

  1. At most once: 最多一次。消息可能会丢失,但绝不会重复传输.
  2. At least once: 最少一次。消息绝不会丢失,但可能会重复传输.
  3. Exactly once: 恰好一次。每条消息肯定会被传输一次且仅传输一次.

        RabbitMQ 支持 "最多一次" 和 "最少一次"。对于 "恰好一次", 目前 RabbitMQ 还做不到,不仅是 RabbitMQ, 目前市面上主流的消息中间件,都做不到这一点.

实现方案

1、唯一标识 + 去重表

原理:为每条消息分配唯一 ID(如 UUID、业务主键),消费前检查该 ID 是否已处理。

实现步骤

生产者:在消息头(Header)中添加唯一标识(如 message_id)。

消费者

        消费前查询去重表(如 Redis 或数据库),判断 message_id 是否存在。

        若不存在,处理消息并写入去重表;若存在,直接 ACK 消息。

优化

        去重表设计:可以使用 Redis 的原子性操作 setnx 来保证幂等性,将唯一 ID 作为 key 放到 redis 中(SETNX messageID 1). 返回 1,说明之前没有消费过,正常消费。返回 0,说明这条消息之前已消费过,抛弃.

        过期时间:为去重表记录设置 TTL,避免数据无限膨胀。


2、业务逻辑判断

在业务逻辑层面实现消息处理的幂等性。

例如: 通过检查数据库中是否已存在相关数据记录,或者使用乐观锁机制来避免更新已被其他事务更改的数据,再或者在处理消息之前,先检查相关业务的状态,确保消息对应的操作尚未执行,然后才进行处理,具体根据业务场景来处理

二、顺序性保障

        在分布式系统中,消息的顺序性保障是确保消息按照生产者发送的先后顺序被消费者处理的机制。RabbitMQ 作为消息中间件,默认不提供严格的全局顺序保证,但可通过特定设计和配置实现部分场景下的顺序性。

顺序性问题的根源

RabbitMQ 默认无法保证全局顺序性的原因:

  • 多消费者并行消费:一个队列绑定多个消费者时,消息可能被无序处理。

  • 消息重试与重新入队:消费者处理失败的消息重新入队后,可能插入到队列中间。

  • 交换机路由策略:使用 directtopic 或 headers 交换机时,消息可能分散到不同队列。

  • 网络延迟与分区:网络抖动可能导致消息到达 Broker 的顺序与发送顺序不一致。

顺序性保障方案

1、单一队列 + 单一消费者

  • 原理:同一队列仅绑定一个消费者,串行处理消息。

  • 适用场景:低吞吐量但对顺序性要求极高的场景(如金融交易)。

  • 实现

    • 生产者将所有消息发送到同一队列。

    • 队列仅允许一个消费者连接(设置 prefetch_count=1)。

    • 消费者禁用自动 ACK,处理完一条消息后手动确认。

2、分区消费

        单个消费者的吞吐太低了,当需要多个消费者以提高处理速度时,可以使用分区消费,把一个队列分割成多个分区,每个分区由一个消费者处理,以此来保持每个分区内消息的顺序性.

Rabbitmq本身并不支持分区消费,需要业务逻辑去实现,或者借助spring-cloud-stream来实现

Partitioning with the RabbitMQ Binder :: Spring Cloud Stream

实现效果演示

3、消息确认机制
        使用手动消息确认机制,消费者在处理完一条消息后,显式地发送确认,这样RabbitMQ才会移除并继续发送下一条消息.

4、业务逻辑控制
        在某些情况下,即使消息乱序到达,也可以在业务逻辑层面实现顺序控制,比如通过在消息中嵌入序列号,并在消费时根据这些信息来处理

由于RabbitMO本身并不保证全局的严格顺序性,所以以上所提供的方案往往需要搭配混合使用,特别是在分布式系统中,在实际应用开发中,根据具体的业务需求,需要结合多种策略来实现所需要的顺序保证.

三、消息积压

常见原因

1、消息生产过快:在高流量或者高负载的情况下,生产者以极高的速率发送消息,超过了消费者的处理能力,包括一些流量激增的情况(活动促销)

2、消费者处理能力不足:消费者处理处理消息的速度跟不上消息生产的速度,也会导致消息在队列中积压,可能原因有:

  • 消费端业务逻辑复杂,耗时长
  • 消费端代码性能低
  • 系统资源限制,如 CPU、内存、磁盘 I/O 等也会限制消费者处理消息的效率.
  • 异常处理处理不当。消费者在处理消息时出现异常,导致消息无法被正确处理和确认.

3、网络问题:因为网络延迟或不稳定,消费者无法及时接收或确认消息,最终导致消息积压

4、RabbitMQ 服务器配置问题

  • 未设置合理的 prefetch count:消费者一次拉取过多消息,导致内存压力。
  • 队列未持久化:重启后消息丢失,需重新处理积压。
  • 未使用惰性队列(Lazy Queue):高吞吐场景下内存不足。

解决方案

1)提高消费者效率
        a. 增加消费者实例数量,比如新增机器
        b. 优化业务逻辑,比如使用多线程来处理业务
        c. 设置 prefetchCount, 当一个消费者阻塞时,消息转发到其他未阻塞的消费者.
        d. 消息发生异常时,设置合适的重试策略,或者转入到死信队列

2)限制生产者速率。比如流量控制,限流算法等
        a. 流量控制:在消息生产者中实现流量控制逻辑,根据消费者处理能力动态调整发送速率
        b. 限流:使用限流工具,为消息发送速率设置一个上限
        c. 设置过期时间。如果消息过期未消费,可以配置死信队列,以避免消息丢失,并减少对主队列的压力

3)资源与配置优化   比如升级 RabbitMQ 服务器的硬件,调整 RabbitMQ 的配置参数等

在选择策略的时候需要实际考虑业务的需求和系统的实际承载能力

四、Raft算法

        Raft 是一种专为 分布式一致性 设计的共识算法。其核心目标是通过 强可理解性 解决传统 Paxos 算法的复杂性,同时保证分布式系统的 高可用性 和 数据一致性

分解问题

将共识问题拆分为三个子问题:

领导人选举(Leader Election):系统中仅有一个 Leader 负责处理客户端请求。

日志复制(Log Replication):Leader 将操作日志同步到所有 Follower 节点。

安全性(Safety):确保所有节点最终状态一致,避免数据冲突。

核心机制

节点角色

  • Leader:唯一处理客户端请求的节点,负责日志复制和心跳维持。

  • Follower:被动接收 Leader 的日志和心跳,不主动响应客户端,不直接处理客户端请求。

  • Candidate:选举过程中的临时角色(Follower 超时未收到心跳后成为 Candidate,开始尝试通过 投票过程成为新的Leader)。

正常的情况下,集群中只有一个Leader,剩下的节点都是follower

任期(Term)

  • 全局单调递增的整数(类似“逻辑时钟”),每个任期至多一个 Leader。

  • 节点间通信携带 Term,用于检测过期信息(如旧 Leader 的请求会被拒绝)。

        Raft 将时间划分成任意长度的任期(term).每一段任期从一次选举开始,在这个时候会有一个或者多个candidate 尝试去成为leader,在成功完成一次leaderelection之后,一个leader就会一直节管理集群直到任期结束,在某些情况下,一次选举无法选出 leader,这个时候这个任期会以没有leader 而结束(如下图t3).同时一个新的任期(包含一次新的选举)会很快重新开始

通信

Raft算法中的服务器节点之间采用RPC进行通信,主要由两类RPC请求:

  • RequestVote RPCs: 请求投票,由 candidate 在选举过程中发出

  • AppendEntries RPCs: 追加条目,由leader 发出,用来做日志复制和提供心跳机制

选举过程

可以通过此网站动画来理解投票选举过程Raft Consensus Algorithm

        Raft 采用一种心跳机制来触发 leader 选举,当服务器启动的时候,都是follow状态.如果follower在election timeout内没有收到来自leader的心跳(可能没有选出leader,也可能leader挂了,或者leader与follower之间网络故障),则会主动发起选举.

步骤如下:
1、率先超时的节点,自增当前任期号然后切换为 candidate 状态,并投自己一票

2、以并行的方式发送一个 RequestVote RPCs 给集群中的其他服务器节点(企图得到它们的投票)

3、等待其他节点的回复

此时可能会出现三种结果

a、赢得选举,自己成为Leader(包括自己的一票),新的Leader会给其他节点发布消息,避免其余节点触发新的选举

b、其他节点赢得了选举,未成功选举的节点在接受到消息时,会自动转化为follower

c、一段时间内没有收到majority投票,保持candidate状态,重新发出选举

        没有任何节点获得majority投票.比如所有的 follower 同时变成 candidate,然后它们都将票投给自己,那这样就没有 candidate 能得到超过半数的投票了.当这种情况发生的时候,每个candidate 都会进行一次超时响应,然后通过自增任期号来开启一轮新的选举,并启动另一轮的RequestVote RPCs.如果没有额外的措施,这种无结果的投票可能会无限重复下去.

        为了解决上述问题,Raft 采用 随机选举超时时间(randomized election timeouts)来确保很少产生无结果的投票,并且就算发生了也能很快地解决。为了防止选票一开始就被瓜分,选举超时时间是从一个固定的区间(比如,150-300ms)中随机选择。这样可以把服务器分散开来以确保在大多数情况下会只有一个服务器率先结束超时,那么这个时候,它就可以赢得选举并在其他服务器结束超时之前发送心跳。

五、仲裁队列

        RabbitMQ 的 仲裁队列(Quorum Queues) 是 RabbitMQ 3.8 版本引入的一种新型队列类型,专为 高可用性和数据一致性 场景设计。它基于 Raft 一致性协议实现,替代了传统的镜像队列(Mirrored Queues),在节点故障时能更可靠地保证数据安全。

        在集群环境之中,如果某一节点宕机故障,其中原本的信息也会发生丢失,仲裁队列可以在rabbitmq之间进行队列数据的复制,保障集群系统的高可用性。

节点宕机之前

节点宕机后,消息丢失了 

使用仲裁队列

@Bean("quorumQueue")
public Queue quorumQueue() {
    return QueueBuilder.durable("quorum_queue").quorum().build();
}

可以观察到,仲裁队列后面有一个+2,表示队列中有两个镜像节点,点进去可以看到队列详细

此时如果发生单个节点宕机,队列里的消息不会丢失

六、HAProxy负载均衡

        面对大量的业务访问,高并发请求,试想如果一个集群中有3个节点,我们在写代码时,访问哪个节点呢?
答案是访问任何一个节点都可以.
这时候就存在两个问题:
1、如果我们访问的是node1,但是node1挂了,咱们的程序也会出现问题,所以最好是有一个统一的入口,一个节点故障时,流量可以及时转移到其他节点.

2、如果所有的客户端都与node1建议连接,那么node1的网络负载必然会大大增加,而其他节点又由于没有那么多的负载而造成硬件资源的浪费.

        这时,负载均衡显得尤为重要,HAProxy(High Availability Proxy)是一款开源的 高性能TCP/HTTP负载均衡器 和 反向代理,广泛用于分发流量、提升系统可用性和扩展性。

快速上手

Ubuntu安装

#更新软件包
sudo apt-get update

#查找haproxy
sudo apt listlgrep haproxy

#安装haproxy
sudo apt-get install haproxy

验证安装

#查看服务状态
sudo systemctl status haproxy

#查看版本
haproxy -v

#如果要设置HAProxy服务开机自启,可以使用
sudo systemctl enable haproxy

 修改haproxy.cfg

vim /etc/haproxy/haproxy.cfg

# haproxy web 管理界面
listen stats    #设置一个监听器,统计HAProxy的统计信息
    bind *:8100        #指定了监听器绑定到的IP地址和端口
    mode http          #监听器的工作模式为HTTP
    stats enable       #启用统计页面
    stats realm Haproxy\ Statistics
    stats uri /
    stats auth admin:admin    #登录账号密码
# 配置负载均衡
Listen rabbitmg
    bind *:5670
    mode tcp              #Rabbitmq使用的AMQP协议是一个基于TCP的协议
    balance roundrobin    #制定负载均衡策略为轮询
    server    rabbitmgl 127.0.0.1:5672 check inter 5000 rise 2 fall 3
    server    rabbitmq2 127.0.0.1:5673 check inter 5000 rise 2 fall 3
    server    rabbitmg3 127.0.0.1:5674 check inter 5000 rise 2 fall 3

重启HAProxy

sudo systemctl restart haproxy

此时可以通过访问 http://ip:8100/  查看HAProxy

修改配置文件

spring:
    rabbitmq:
        addresses: amqp://study:study@ip:5670/Test

此时成功实现了负载均衡,也实现了节点宕机后,流量的及时转移


自信与骄傲有异:信者常沉着,而骄傲者常浮扬。                                                ——梁启超

🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀

以上,就是本期的全部内容啦,若有错误疏忽希望各位大佬及时指出💐

  制作不易,希望能对各位提供微小的帮助,可否留下你免费的赞呢🌸 


网站公告

今日签到

点亮在社区的每一天
去签到