RabbitMQ高级:可靠性

发布于:2025-09-06 ⋅ 阅读:(13) ⋅ 点赞:(0)

发送者可靠性

发送者重连

        其实就是写个配置,让发送者连接失败了就去尝试重新连接MQ。

发送者确认

        临时消息就是不需要写到磁盘的数据,也就是不需要持久化的数据。持久消息则反之。ACK是Publisher Confirm来返回的。

        correlated就是发送者发送消息后,直接往下走,MQ判断完是否发送成功后调用回调函数。

@Test
public void testConfirmCallback() {
    //0.创建CorrelationData
    CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
    cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
        @Override
        public void onFailure(Throwable ex) {
            log.error("spring amqp 处理确认结果异常", ex);
        }

        @Override
        public void onSuccess(CorrelationData.Confirm result) {
            //判断是否成功
            if (result.isAck()) {
                log.debug("收到ConfirmCallback ack,消息发送成功!");
            } else {
                log.error("收到ConfirmCallback nack,消息发送失败! reason: {}",result.getReason());
            }
        }
    });

    //1.交换机名
    String exchangeName = "hmall.fanout";
    //2.消息
    String message = "hello world";
    //3.发送消息
    rabbitTemplate.convertAndSend(exchangeName, null, message, cd);

}

        ConfirmCallback是针对消息的,所以每次发送都得写一个CorrelationData。这里在new CorrelationData的时候,同时也给消息指定了唯一id。这里的Future是多并发编程的知识。注意这里的onFailure不是返回nack,而是Spring处理Future异常了,nack在下面的onSuccess里。如果返回的是nack,还要重发消息,就是再用一次convertAndSend,但这里就只是记录了一下日志。不过这一块白雪呜呜呜,不用太在意。

MQ可靠性

        当队列满了的时候,MQ会去给早发的那些消息做持久化,持久化到磁盘,给队列空出一点空间,持久化期间发送者是不能成功发送消息到队列里的。

数据持久化

        其实就是把数据持久化到磁盘,但不是被动的等满了再去做,而是提前做,这样就不会导致消息堆积了。

        交换机和队列的持久化都是默认的,所以不用管,但是消息的持久化,你得在发消息时选择第二个Persistent。但其实Spring AMQP默认就是发持久化的消息,所以我们啥都不用管。

Lazy Queue

        消息队列会动态监测消费者处理消息的速度,如果速度慢,就直接从磁盘读取数据,如果发现速度很快,超过了我读取磁盘的速度,那就会提前缓存一批数据到内存。这个我们不用管,用高版本的RabbitMQ就行。

消费者可靠性

消费者确认机制

        其实应该是MQ投递消息,如果返回ack或者reject,消息直接丢弃。如果返回nack,消息重新入队(requeue),然后再重新投递给消费者。

        推荐使用auto。比如MessageConversionException,就属于消息处理异常,会返回reject。

消费者失败重试

        意思就是不会MQ和消费者不会来回踢皮球了。

        第三种就是重试耗尽后,将失败消息(还包括报错信息)投递给指定的交换机(error.direct),再给这个交换机绑定队列和消费者,就可以处理这种投递失败的消息,通知开发者介入。

        也就是重试耗尽后,用你给的rabbitTemplate去重新投递,投递到error.direct交换机,routingKey为"error"。

@Configuration
public class ErrorMessageConfiguration {

    @Bean
    public DirectExchange errorExchange(){
        return new DirectExchange("error.direct");
    }

    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue");
    }

    @Bean
    public Binding errorQueueBinding(Queue errorQueue,DirectExchange errorExchange){
        return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
    }

    @Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");
    }
}

业务幂等性

        为了避免消费者重复处理同一条消息影响业务的情况,需要保证业务幂等性。

        注意这里的id和之前发送者确认的那个id不一样。这个配置是在Publisher里的。

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(Message message) {
    log.info("监听到simple.queue的消息:{}", new String(message.getBody()));
    log.info("监听到simple.queue的消息: ID: {}", message.getMessageProperties().getMessageId());
}

        然后用消费者用Message类来接受消息,因为消息转换器就是把Java对象转换为Message类的嘛。在message中就能拿到消息体和消息ID了。不过这种方法太影响性能了,不推荐......白雪。

        这一种就是添加业务逻辑了。比如这个支付业务,MQ发一个消息是修改订单为已支付,我去修改订单为已支付的时候,我先去查看当前订单状态,如果当前订单状态是未支付,那就说明这个消息不是重复消息;如果当前订单状态是已支付或者退款中,那就说明这个消息是重复消息,这样就保证了业务的幂等性。


网站公告

今日签到

点亮在社区的每一天
去签到