RabbitMQ之顺序消费

发布于:2024-05-05 ⋅ 阅读:(28) ⋅ 点赞:(0)

什么是顺序消费
例如:业务上产生者发送三条消息, 分别是对同一条数据的增加、修改、删除操作, 如果没有保证顺序消费,执行顺序可能变成删除、修改、增加,这就乱了。
如何保证顺序性
一般我们讨论如何保证消息的顺序性,会从下面三个方面考虑
1:发送消息的顺序
2:队列中消息的顺序
3:消费消息的顺序
发送消息的顺序
消息发送端的顺序,大部分业务不做要求,谁先发消息无所谓,如果遇到业务一定要发送消息也确保顺序,那意味着,只能全局加锁一个个的操作,一个个的发消息,不能并发发送消息。

队列中消息的顺序
RabbitMQ 中,消息最终会保存在队列中,在同一个队列中,消息是顺序的,先进先出原则,这个由 RabbitMQ 保证,通常也不需要开发关心。

不同队列 中的消息顺序,是没有保证的,例如:进地铁站的时候,排了三个队伍,不同队伍之间的,不能确保谁先进站。

消费消息的顺序
我们说如何保证消息顺序性,通常说的就是消费者消费消息的顺序,在多个消费者消费同一个消息队列的场景,通常是无法保证消息顺序的,

虽然消息队列的消息是顺序的,但是多个消费者并发消费消息,获取的消息的速度、执行业务逻辑的速度快慢、执行异常等等原因都会导致消息顺序不一致。
例如:消息A、B、C按顺序进入队列,消费者A1拿到消息A、消费者B1拿到消息B, 结果消费者B执行速度快,就跑完了,又或者消费者A1挂了,都会导致消息顺序不一致。
解决消费顺序的问题, 通常就是一个队列只有一个消费者 , 这样就可以一个个消息按顺序处理, 缺点就是并发能力下降了,无法并发消费消息,这是个取舍问题。

如果业务又要顺序消费,又要增加并发,通常思路就是开启多个队列,业务根据规则将消息分发到不同的队列,通过增加队列的数量来提高并发度,例如:电商订单场景,只需要保证同一个用户的订单消息的顺序性就行,不同用户之间没有关系,所以只要让同一个用户的订单消息进入同一个队列就行,其他用户的订单消息,可以进入不同的队列。

以下为代码设计过程实现
首先我们必须保证只有一个消费者 那么问题就来了,我们的项目一般是多副本的,如何保证只有一个副本在消费呢
这时就会用到消费者 单活模式 x-single-active-consumer
使用下述配置实现


private Queue creatQueue(String name){
        // 创建一个 单活模式 队列
        HashMap<String, Object> args=new HashMap<>();
        args.put("x-single-active-consumer",true);
        return new Queue(name,true,false,false,args);
        }

创建之后,我们可以在控制台看到 消费者的激活状态
在这里插入图片描述

=======================>配置类
@Configuration
@SuppressWarnings("all")
public class DirectExchangeConfiguration {
    @Bean
    public Queue queue15_0() {
        return creatQueue(Message15.QUEUE_0);
    }


    @Bean
    public Queue queue15_1() {
        return creatQueue(Message15.QUEUE_1);
    }

    @Bean
    public Queue queue15_2() {
        return creatQueue(Message15.QUEUE_2);
    }

    @Bean
    public Queue queue15_3() {
        return creatQueue(Message15.QUEUE_3);
    }

    @Bean
    public DirectExchange exchange15() {
        // name: 交换机名字 | durable: 是否持久化 | exclusive: 是否排它
        return new DirectExchange(Message15.EXCHANGE, true, false);
    }


    @Bean
    public Binding binding15_0() {
        return BindingBuilder.bind(queue15_0()).to(exchange15()).with("0");
    }

    @Bean
    public Binding binding15_1() {
        return BindingBuilder.bind(queue15_1()).to(exchange15()).with("1");
    }

    @Bean
    public Binding binding15_2() {
        return BindingBuilder.bind(queue15_2()).to(exchange15()).with("2");
    }

    @Bean
    public Binding binding15_3() {
        return BindingBuilder.bind(queue15_3()).to(exchange15()).with("3");
    }

    /**
     * 创建一个 单活 模式的队列
     * 注意 :
     * <p>
     * 如果一个队列已经创建为非x-single-active-consumer,而你想更改其为x-single-active-consumer,要把之前创建的队列删除
     *
     * @param name
     * @return queue
     */
    private Queue creatQueue(String name) {
        // 创建一个 单活模式 队列
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-single-active-consumer", true);
        return new Queue(name, true, false, false, args);
    }
=================================》生产者
@Component
@Slf4j
public class Producer15 {
    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 这里的发送是 拟投递到多个队列中
     *
     * @param id  业务id
     * @param msg 业务信息
     */
    public void syncSend(int id, String msg) {
        Message15 message = new Message15(id, msg);
        rabbitTemplate.convertAndSend(Message15.EXCHANGE, this.getRoutingKey(id), message);
    }

    /**
     * 根据 id 取余来决定丢到那个队列中去
     *
     * @param id id
     * @return routingKey
     */
    private String getRoutingKey(int id) {
        return String.valueOf(id % Message15.QUEUE_COUNT);
    }
}
============================》消费者
/**
 * 要想保证消息的顺序,每个队列只能有一个消费者
 *
 * @author 深漂码农@明哥
 * @date 2024-03-18
 */
@Component
@RabbitListener(queues = Message15.QUEUE_0)
@RabbitListener(queues = Message15.QUEUE_1)
@RabbitListener(queues = Message15.QUEUE_2)
@RabbitListener(queues = Message15.QUEUE_3)
@Slf4j
public class Consumer15 {

    @RabbitHandler
    public void onMessage(Message15 message) throws InterruptedException {
        log.info("[{}][Consumer15 onMessage][线程编号:{} 消息内容:{}]", LocalDateTime.now(), Thread.currentThread().getId(), message);
        // 这里随机睡一会,模拟业务处理时候的耗时
        long l = new Random(1000).nextLong();
        TimeUnit.MILLISECONDS.sleep(l);
    }
}
==============================》测试类
@Test
    void mock() throws InterruptedException {
        // 先启动这个测试类,模拟多个副本情况下,看如何消费
        new CountDownLatch(1).await();
    }

    @Test
    void syncSend() throws InterruptedException {
        // 模拟每个队列中扔 10 个数据,看看效果
        for (int i = 0; i < 10; i++) {
            for (int j = 0; j < 4; j++) {
                producer15.syncSend(j, " 编号:" + j + " 第:" + i + " 条消息");
            }
        }

        TimeUnit.SECONDS.sleep(20);
    }
}

ps:测试的时候时候 先启动 mock 方式。 在启动 syncSend 方法,模拟多个副本同时消费,观察是否可以
以上的是RabbitMQ之顺序消费实现的代码 若不了解rabbitmq的基本使用 建议先看看我前面对应的文章 文章链接:点我—>let’s go
若需完整代码 可识别二维码后 给您发代码。
在这里插入图片描述