RabbitMQ—事务与消息分发

发布于:2025-07-22 ⋅ 阅读:(15) ⋅ 点赞:(0)

上篇文章:

RabbitMQ—TTL、死信队列、延迟队列https://blog.csdn.net/sniper_fandc/article/details/149311921?fromshare=blogdetail&sharetype=blogdetail&sharerId=149311921&sharerefer=PC&sharesource=sniper_fandc&sharefrom=from_link

目录

1 事务

2 消息分发

2.1 介绍

2.2 限流

2.3 负载均衡(非公平分发)


1 事务

        AMQP协议实现了事务机制,而RabbitMQ基于AMQP协议,因此也支持事务。RabbitMQ的事务机制要求消息的发送和接收是原子性的,要么同时成功,要么同时失败(失败后已发送的消息或接收的消息会回退)。

        这里实现了连续发送两条消息保证这两条消息同时发送成功或失败的事务机制:

        声明队列:

public class RabbitMQConnection {

    public static final String TRANS_QUEUE = "trans.queue";

}
@Configuration

public class RabbitMQConfig {

    @Bean("transQueue")

    public Queue transQueue(){

        return QueueBuilder.durable(RabbitMQConnection.TRANS_QUEUE).build();

    }

}

        配置事务管理器和RabbitTemplate:

@Configuration

public class RabbitTemplateConfig {

    //RabbitTemplate开启事务和下面的事务管理器都必须存在

    @Bean("transRabbitTemplate")

    public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory){

        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

        rabbitTemplate.setChannelTransacted(true);//开启事务

        return rabbitTemplate;

    }

    //事务管理器

    @Bean

    public RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory){

        return new RabbitTransactionManager(connectionFactory);

    }

}

        生产者代码:

@RestController

@RequestMapping("/producer")

public class ProducerController {

    @Resource(name = "transRabbitTemplate")

    private RabbitTemplate transRabbitTemplate;

    //@Transactional也是开启事务必须存在的注解

    @Transactional

    @RequestMapping("trans")

    public String trans() {

        //两条消息要么同时发送成功,要么同时失败

        transRabbitTemplate.convertAndSend("", RabbitMQConnection.TRANS_QUEUE, "Hello SpringBoot RabbitMQ");

        int a = 1/0;

        transRabbitTemplate.convertAndSend("", RabbitMQConnection.TRANS_QUEUE, "Hello SpringBoot RabbitMQ");

        return "发送成功";

    }

}

        未开启事务,发生异常,第一条消息发送成功。

        开始事务后,发生异常,两条消息均不成功。

2 消息分发

2.1 介绍

        当某个队列被多个消费者订阅,队列向消费者推送消息(消息分发对应推模式,对拉模式无效),每条消息只会发送给一个消费者。默认情况下,RabbitMQ不管消费者是否已经消费完消息并返回确认,而是采用轮询方式推送消息

        在这种情况下,假设有10条消息,两个消费者,每个消费者会被推送5条消息,消费者1消费速度快(容易空闲),消费者2消费速度慢,那么消费者2就会消息积压,系统实际吞吐量并不高。

        可以采用channel.basicQos(int prefetchCount)方式来进行消费分发的控制,prefetchCount表示通道上消费者能同时保持未确认消息的最大数量。

        队列每向消费者推送一条消息,prefetchCount+1。消费者每返回一个确认,prefetchCount-1。当prefetchCount达到设置的上限,队列就不再向消费者推送消息,直到有新的确认到来。这种方式就类似滑动窗口,很好地保证消费者负载压力不会过大。

2.2 限流

        在秒杀场景下,假设订单系统每秒能处理的订单数是10000,但是秒杀场景下可能某一瞬间会有50000订单数,这就会导致订单系统处理不过来而压垮。可以利用basicQos()来进行限流:

        1.SpringBoot配置文件用prefetch控制限流数,对应channel.basicQos(int prefetchCount)的prefetchCount。

        2.开启消息确认机制的手动确认模式manual。未手动确认的消息都视为未消费完的消费,prefetchCount并不会-1。

        配置文件:

spring:

  rabbitmq:

    addresses: amqp://admin:admin@192.168.217.150:5672/testVirtual

    listener:

      simple:

        acknowledge-mode: manual #消息接收确认(MQ-消费者):none(自动确认)、auto(正常自动确认,异常不确认)、manual(手动确认)

        prefetch: 5 #控制消费者从队列中预取(prefetch)消息的数量

        声明队列和交换机:

public class RabbitMQConnection {

    public static final String QOS_QUEUE = "qos.queue";

    public static final String QOS_EXCHANGE = "qos.exchange";

}
@Configuration

public class RabbitMQConfig {

    @Bean("qosQueue")

    public Queue qosQueue(){

        return QueueBuilder.durable(RabbitMQConnection.QOS_QUEUE).build();

    }

    @Bean("qosExchange")

    public DirectExchange qosExchange(){

        return ExchangeBuilder.directExchange(RabbitMQConnection.QOS_EXCHANGE).durable(true).build();

    }

    @Bean("qosQueueBinding")

    public Binding qosQueueBinding(@Qualifier("qosExchange") DirectExchange directExchange, @Qualifier("qosQueue") Queue queue){

        return BindingBuilder.bind(queue).to(directExchange).with("qos");

    }

}

        生产者代码:

@RestController

@RequestMapping("/producer")

public class ProducerController {

    @Resource(name = "rabbitTemplate")

    private RabbitTemplate rabbitTemplate;

    @RequestMapping("qos")

    public String qos() {

        for (int i = 0; i < 20; i++) {

            rabbitTemplate.convertAndSend(RabbitMQConnection.QOS_EXCHANGE, "qos", "Hello SpringBoot RabbitMQ");

        }

        return "发送成功";

    }

}

        消费者代码:

@Component

public class QosListener {

    @RabbitListener(queues = RabbitMQConnection.QOS_QUEUE)

    public void queueListener(Message message, Channel channel) throws IOException {

        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {

            System.out.printf("listener ["+RabbitMQConnection.QOS_QUEUE+"]收到消息:%s, deliveryTag:%d \n",

                    new String(message.getBody(),"UTF-8"),

                    deliveryTag);

            System.out.println("消息处理完成");

            channel.basicAck(deliveryTag,false);

        } catch (Exception e) {

            channel.basicNack(deliveryTag,false,true);

        }

    }

}

        当进行限流时,如果不进行确认,则消费者最多只有5条消息:

        如果进行确认,则消息很快就被消费完了:

2.3 负载均衡(非公平分发)

        负载均衡就是指在分布式环境下,让每个服务接收其能同时处理上限的任务数,这样每个服务都不会空闲下来(最大化利用硬件资源),也不会因为负载过大导致崩溃。

        对应prefetchCount就应该配置为机器所能处理的最大上限数。假设消费者只能一次处理一个消息,此时prefetch就配置为1:

spring:

  rabbitmq:

    addresses: amqp://admin:admin@192.168.217.150:5672/testVirtual

    listener:

      simple:

        acknowledge-mode: manual #消息接收确认(MQ-消费者):none(自动确认)、auto(正常自动确认,异常不确认)、manual(手动确认)

        prefetch: 1 #控制消费者从队列中预取(prefetch)消息的数量

        其它代码不变,增加一个消费者来模拟不同处理速度的服务:

@Component

public class QosListener {

    @RabbitListener(queues = RabbitMQConnection.QOS_QUEUE)

    public void queueListener(Message message, Channel channel) throws IOException {

        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {

            System.out.printf("listener ["+RabbitMQConnection.QOS_QUEUE+"]收到消息:%s, deliveryTag:%d \n",

                    new String(message.getBody(),"UTF-8"),

                    deliveryTag);

            Thread.sleep(5);

            channel.basicAck(deliveryTag,false);

        } catch (Exception e) {

            channel.basicNack(deliveryTag,false,true);

        }

    }

    @RabbitListener(queues = RabbitMQConnection.QOS_QUEUE)

    public void queueListener2(Message message, Channel channel) throws IOException {

        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {

            System.out.printf("listener2 ["+RabbitMQConnection.QOS_QUEUE+"]收到消息:%s, deliveryTag:%d \n",

                    new String(message.getBody(),"UTF-8"),

                    deliveryTag);

            Thread.sleep(10);

            channel.basicAck(deliveryTag,false);

        } catch (Exception e) {

            channel.basicNack(deliveryTag,false,true);

        }

    }

}

        最终消费者2处理了8条消息,消费者1处理了12条消息。这样就不会出现某些消费者大量时间空闲,整个系统的吞吐量就会得到很大提升。

        注意:deliveryTag有重复是因为两个消费者占用不同的通道,deliveryTag在同一个通道里保持连续,通道与通道之间相互独立,因此出现这样的现象。

下篇文章:

RabbitMQ应用问题https://blog.csdn.net/sniper_fandc/article/details/149312372?fromshare=blogdetail&sharetype=blogdetail&sharerId=149312372&sharerefer=PC&sharesource=sniper_fandc&sharefrom=from_link


网站公告

今日签到

点亮在社区的每一天
去签到