RabbitMQ如何保证消息不丢失

发布于:2024-12-18 ⋅ 阅读:(60) ⋅ 点赞:(0)

前言:

当在项目中使用到了RabbitMQ后,目前主要是通过MQ来进行一些数据同步操作(Redis、ES等),以及一些异步下单场景等。那么在这个过程中,就需要保证消息的不丢失。而在整个MQ的流程中,总共有三个角度来进行分析:生产者丢数据消息队列丢数据消费者丢数据

1.生产者丢数据:

RabbitMQ提供事务机制(transaction)和确认机制(confirm)两种模式来确保生产者不丢消息。

(1)事务机制:

发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())

该方式的缺点是生产者发送消息会同步阻塞等待发送结果是成功还是失败,导致生产者发送消息的吞吐量降下降。

    // 开启事务
    channel.txSelect
    try {
        // 发送消息
    } catch(Exception e){
        // 回滚事务
        channel.txRollback;
        //再次重试发送这条消息
        ....
    }      
    //提交事务
    channel.txCommit;

(2)确认机制:

生产环境常用的是confirm模式。生产者将信道 channel 设置成 confirm 模式,一旦 channel 进入 confirm 模式,所有在该信道上发布的消息都将会被指派一个唯一的ID,一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个确认给生产者(包含消息的唯一ID),这样生产者就知道消息已经正确到达目的队列了。如果rabbitMQ没能处理该消息,也会发送一个Nack消息给你,这时就可以进行重试操作。

Confirm模式最大的好处在于它是异步的,一旦发布消息,生产者就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者便可以通过回调方法来处理该确认消息。

处理Ack和Nack的代码如下所示:

channel.addConfirmListener(new ConfirmListener() {  
    @Override  
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {  
        System.out.println("nack: deliveryTag = "+deliveryTag+" multiple: "+multiple);  
    }  
    @Override  
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {  
        System.out.println("ack: deliveryTag = "+deliveryTag+" multiple: "+multiple);  
    }  
}); 
2、消息队列丢数据:

处理消息队列丢数据的情况,一般是开启持久化磁盘。持久化配置可以和生产者的 confirm 机制配合使用,在消息持久化磁盘后,再给生产者发送一个Ack信号。这样的话,如果消息持久化磁盘之前,即使 RabbitMQ 挂掉了,生产者也会因为收不到Ack信号而再次重发消息。

持久化设置如下(必须同时设置以下 2 个配置):

(1)创建queue的时候,将queue的持久化标志durable在设置为true,代表是一个持久的队列,这样就可以保证 rabbitmq 持久化 queue 的元数据,但是不会持久化queue里的数据;

(2)发送消息的时候将 deliveryMode 设置为 2,将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。

这样设置以后,RabbitMQ 就算挂了,重启后也能恢复数据。在消息还没有持久化到硬盘时,可能服务已经死掉,这种情况可以通过引入镜像队列,但也不能保证消息百分百不丢失(整个集群都挂掉)

3、消费者丢数据:

消费者丢数据一般是因为采用了自动确认消息模式。该模式下,虽然消息还在处理中,但是消费中者会自动发送一个确认,通知 RabbitMQ 已经收到消息了,这时 RabbitMQ 就会立即将消息删除。这种情况下,如果消费者出现异常而未能处理消息,那就会丢失该消息。

解决方案就是采用手动确认消息,设置 autoAck = False,等到消息被真正消费之后,再手动发送一个确认信号,即使中途消息没处理完,但是服务器宕机了,那 RabbitMQ 就收不到发的ack,然后 RabbitMQ 就会将这条消息重新分配给其他的消费者去处理。

但是 RabbitMQ 并没有使用超时机制,RabbitMQ 仅通过与消费者的连接来确认是否需要重新发送消息,也就是说,只要连接不中断,RabbitMQ 会给消费者足够长的时间来处理消息。另外,采用手动确认消息的方式,我们也需要考虑一下几种特殊情况:

  • 如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ 会认为消息没有被消费,然后重新分发给下一个订阅的消费者,所以存在消息重复消费的隐患
  • 如果消费者接收到消息却没有确认消息,连接也未断开,则RabbitMQ认为该消费者繁忙,将不会给该消费者分发更多的消息

需要注意的点:

1、消息可靠性增强了,性能就下降了,因为写磁盘比写 RAM 慢的多,两者的吞吐量可能有 10 倍的差距。所以,是否要对消息进行持久化,需要综合考虑业务场景、性能需要,以及可能遇到的问题。若想达到单RabbitMQ服务器 10W 条/秒以上的消息吞吐量,则要么使用其他的方式来确保消息的可靠传输,要么使用非常快速的存储系统以支持全持久化,例如使用 SSD。或者仅对关键消息作持久化处理,且应该保证关键消息的量不会导致性能瓶颈。

2、当设置 autoAck = False 时,如果忘记手动 ack,那么将会导致大量任务都处于 Unacked 状态,造成队列堆积,直至消费者断开才会重新回到队列。解决方法是及时 ack,确保异常时 ack 或者拒绝消息。

3、启用消息拒绝或者发送 nack 后导致死循环的问题:如果在消息处理异常时,直接拒绝消息,消息会重新进入队列。这时候如果消息再次被处理时又被拒绝 。这样就会形成死循环。

消费者可以通过重试机制,通过Spring的retry机制,在消费者出现异常情况时可以经过本地重试,当重试机制达到设定的上限后,就可以将消息投递到异常交换机,交由人工处理。

总结

有关消息不丢失这个问题解决的办法无非就是从三个点进行切入:

1.生产端:开启生产端确认机制,确保生产者的消息能达到队列。

2.队列持久化:开启消息队列持久化,确保消息在队列中不会被丢失。

3.消费端:开启消费者确认模式,有Spring处理自动确认消息处理后返回ack,但是如果是自动确认的话,因为是消费者收到消息后就会发送ack,发送ack后不代表消息已经处理完,如果此时消费端发生异常是有可能会造成消息丢失的,解决方式就是消费者处理完毕消息后手动发送ack。

然后开启消费者失败重试机制,多次重试失败后投递给异常交换机,进行进一步处理或者人工处理。

 


网站公告

今日签到

点亮在社区的每一天
去签到