RabbitMQ核心机制——事务、消息分发

发布于:2025-05-27 ⋅ 阅读:(39) ⋅ 点赞:(0)

一、事务

     RabbitMQ是基于AMQP协议实现的,该协议实现了事务机制 因此RabbitMQ也支持事务机制。 Spring AMQP也提供了对事务相关的操作。RabbitMQ事务允许开发者确保消息的发送和接收是原子性的,要么全部成功, 要么全部失败

1.1 声明队列、交换机(使用内置交换机)

   @Bean
    public Queue transQueue() {
        return QueueBuilder.durable(Constants.TRANS_QUEUE).build();
    }

1.2 生产者代码

    需要通过 rabbitTemplate.setChannelTransacted 方法开启事务,但是如果直接使用已有的RabbitTemplate对象进行设置的化,前面使用这个对象的方法也会被开启事务,因此需要在配置类中手动配置一新的RabbitTemplate对象

 @Bean("transRabbitTemplate")
    public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setChannelTransacted(true);
        return rabbitTemplate;
    }
    @RequestMapping("/trans")
    @Transactional//还需要加上这个注解才能开启事务/
    public String trans(){
        System.out.println("trans test...");
        transRabbitTemplate.convertAndSend("",Constants.TRANS_QUEUE,"trans test 1...");
        int num = 10/0;
        transRabbitTemplate.convertAndSend("",Constants.TRANS_QUEUE,"trans test 2...");
        return "消息发送成功";
    }

1.3 配置事务管理器

    /*事务管理器*/
    @Bean("rabbitTransactionManager")
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory){
        return new RabbitTransactionManager(connectionFactory);
    }

1.4 运行程序,测试代码

报错了,查看对应队列是否接收了第一条消息

队列没有接收消息,说明成功回滚


1.5 使用事务注意事项

  使用事务机制时,需要注意:

(1)必须通过 setChannelTransacted 开启事务

(2)必须使用注解 @Transactional

(3)必须配置事务管理器  RabbitTransactionManager

  少了任何异步,事务机制都不会触发。


二、消息分发

     当队列拥有多个消费者时,RabbitMQ默认会通过轮询的方式将消息平均的分发给每个消费者,但是没有可能其中一部分消费者消费消息的速度很快,另一部分消费者消费很慢呢?其实是有可能的,那么这就有可能导致这个系统的吞吐量下降,那如何分发消息才是合理的?在前面学习RabbitMQ JDK Client 时,我们可以通过 channel.basicQos(int prefetchCount) 来设置当前信道的消费者所能拥有的最大未确认消息数量,在Spring AMQP中我们可以通过配置 prefetch 来达到同样的效果,使用消息分发机制时消息确认机制必须为手动确认

    消息分发机制主要有两种应用场景:

1> 限流

2> 负载均衡

2.1 限流

     在上面的场景中,秒杀客户端请求瞬间增大到每秒1W个,但订单系统每秒最多只能处理5000个,这时就可以通过配置 prefetch 的值来达到限流的效果,如配置 prefetch 为 4000,就可限制订单系统最多有4000个未确认的消息,从而避免每秒1W请求都到达订单系统而导致系统奔溃

代码示例:

一、配置 prefetch ,且修改确认机制为手动确认

 

spring:
  application:
    name: rabbit-extensions-demo
  rabbitmq:
    addresses: amqp://study:study@110.41.17.130:5672/extension
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 5

二、声明队列、交换机及绑定关系

  @Bean("qosQueue")
    public Queue qosQueue() {
        return QueueBuilder.durable(Constants.QOS_QUEUE).build();
    }

    @Bean("qosExchange")
    public DirectExchange qosExchange() {
        return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).build();
    }

    @Bean("qosBinding")
    public Binding qosBinding(@Qualifier("qosQueue") Queue queue,@Qualifier("qosExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("qos");
    }

三、消费者代码

public class QosListener {
    @RabbitListener(queues = Constants.QOS_QUEUE)
    public void messageHandler1(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try{
            System.out.printf("consumer1 接收到消息:%s,deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());
            //业务逻辑处理
            Thread.sleep(2000);
            System.out.println("业务处理完成");

            //消息正常消费,肯定确认
            //channel.basicAck(deliveryTag,false);

        }catch (Exception e){
            //消息消费异常,否定确认
            channel.basicNack(deliveryTag,false,true);//单条否定,重新发送消息
        }
    }
}

四、生产者代码

 @RequestMapping("/qos")
    public String qos(){
        System.out.println("qos test...");
        for (int i = 0; i < 20; i++) {
            rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE,"qos","qos test..." + i);
        }
        return "消息发送成功";
    }

五、运行程序,测试

预期结果:由于消费者手动确认代码被注释了,因此消费者应只能拿到5条消息

结果符合预期


2.2 负载均衡

      负载均衡主要是根据不同消费者消费消息的速度来协调它们的压力,比如一个消费者处理消息快,另一个消费者处理消息满,那么就可以配置 prefetch(如配置prefetch为1),就可以使这些消费者还未处理完当前消息,不允许处理下一条,这样就可以使处理消息满的消费者可以慢慢处理一条消息,而处理消息快的消费者,可以在处理完一条消息后,继续处理下一条。

代码示例:

一、修改 prefetch 配置


二、修改消费者代码(取消手动确认的注释并新增一个消费者)

@Component
public class QosListener {
    @RabbitListener(queues = Constants.QOS_QUEUE)
    public void messageHandler1(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try{
            System.out.printf("consumer1 接收到消息:%s,deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());
            //业务逻辑处理
            Thread.sleep(2000);
            System.out.println("业务处理完成");

            //消息正常消费,肯定确认
            channel.basicAck(deliveryTag,false);

        }catch (Exception e){
            //消息消费异常,否定确认
            channel.basicNack(deliveryTag,false,true);//单条否定,重新发送消息
        }
    }

    @RabbitListener(queues = Constants.QOS_QUEUE)
    public void messageHandler2(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try{
            System.out.printf("consumer2 接收到消息:%s,deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());
            //业务逻辑处理
            Thread.sleep(1000);
            System.out.println("业务处理完成");

            //消息正常消费,肯定确认
            channel.basicAck(deliveryTag,false);

        }catch (Exception e){
            //消息消费异常,否定确认
            channel.basicNack(deliveryTag,false,true);//单条否定,重新发送消息
        }
    }
}

三、运行程序,测试

可以看到,consumer1 消费1条消息,consumer2 消费2条,达到负载均衡的效果

 


网站公告

今日签到

点亮在社区的每一天
去签到