RabbitMQ--消费端限流

发布于:2025-07-24 ⋅ 阅读:(24) ⋅ 点赞:(0)

📌 一、核心概念:prefetch(限流)

✅ 什么是 prefetch?

  • prefetchCount 是 RabbitMQ 提供的限流机制:

控制一个消费者在未 ack 之前,最多能收到多少条消息。

例如:prefetch = 10,消费者最多收到 10 条未确认的消息,没确认之前不再推送新消息。


📦 二、原生 RabbitMQ:限流设置与 ack 影响


✅ 1. 原生限流设置:channel.basicQos()

channel.basicQos(10); // 限制最多处理10条未ack的消息

✅ 2. 开启“手动 ack”时,限流生效(推荐 ✅)

import com.rabbitmq.client.*;

public class ManualAckLimit {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection conn = factory.newConnection();
        Channel channel = conn.createChannel();

        String QUEUE_NAME = "test_queue";
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        // ✅ 设置限流:每次最多接收10条未ack的消息
        channel.basicQos(10);

        // ❗ false 表示手动 ack 模式
        channel.basicConsume(QUEUE_NAME, false, (consumerTag, delivery) -> {
            String msg = new String(delivery.getBody());
            System.out.println("接收消息:" + msg);

            // 业务处理...
            Thread.sleep(1000);

            // ✅ 手动确认
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }, consumerTag -> {});
    }
}
📌 说明:
  • basicQos(10) 配合手动 ack:RabbitMQ 会控制只发10条未确认消息

  • 限流生效 ✅


❌ 3. 自动 ack 模式下,限流失效(不推荐)

channel.basicQos(10); // 设置限流
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); // autoAck = true
📌 说明:
  • 设置虽然写了 basicQos(10),但因为 autoAck = true,RabbitMQ 会立即认为消息处理完成

  • 限流机制不生效,RabbitMQ 会一直推消息给消费者

  • 无法控制负载,消费者可能“被压垮”


☕ 三、Spring Boot 中限流与 ack 提交方式

Spring Boot 使用 Spring AMQP 封装,配置项如下:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 10               # (不写默认是250)限制未确认消息数量(生效条件见下)
        acknowledge-mode: auto     # 可选:auto、manual、none

✅ 1. auto 模式(默认值 ✅ 推荐)

@RabbitListener(queues = "test_queue")
public void consume(String msg) {
    System.out.println("接收到消息:" + msg);
    // 方法执行完毕后,Spring 自动 ack(前提是无异常)
}
📌 说明:
  • Spring 会自动 ack(方法无异常)

  • 实际底层使用 channel.basicConsume(..., false),非原生 autoAck

  • 所以 prefetch 生效 ✅


✅ 2. manual 模式(完全手动 ack)

@RabbitListener(queues = "test_queue", ackMode = "MANUAL")
public void consume(Message message, Channel channel) throws IOException {
    String msg = new String(message.getBody());

    try {
        System.out.println("处理消息:" + msg);
        // 手动 ack
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
        // 处理失败时可重回队列
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    }
}
📌 说明:
  • 你自己完全控制 ack 时机

  • prefetch 一定生效 ✅

  • 适合复杂业务、批量确认、失败重试


⚠️ 3. none 模式(不确认 ❌ 极不推荐)

acknowledge-mode: none
📌 说明:
  • Spring 不会发送任何 ack,RabbitMQ 也不会等

  • 一发出就当作“处理完”,消息可能丢失

  • prefetch 无效 ❌

  • ⚠️ 生产环境严禁使用!


📊 四、整体对比总结表(必记)

场景 ack 模式 ack 控制方式 prefetch 生效? 说明
原生 + 手动 ack autoAck = false 手动 basicAck() ✅ 生效 推荐
原生 + 自动 ack autoAck = true 消息发出立刻 ack ❌ 无效 不推荐
Spring Boot - auto(默认) acknowledge-mode: auto 方法执行完自动 ack ✅ 生效 推荐
Spring Boot - manual acknowledge-mode: manual 需手动调用 basicAck() ✅ 生效 推荐
Spring Boot - none acknowledge-mode: none 不确认 ❌ 无效 危险!

✅ 五、建议使用方式(结论)

场景 推荐配置
原生客户端 basicQos(x) + autoAck = false
Spring Boot(简单业务) acknowledge-mode: auto
Spring Boot(复杂控制) acknowledge-mode: manual
Spring Boot(测试) acknowledge-mode: none(仅测试)

小问题:不用线程池, channel.basicQos(10);用和不用有啥区别?不都是单线程执行?起到限流的作用了吗?

  • 单线程同步处理,basicQos 可不设置或设1,控制更严格,避免内存堆积

  • 多线程或异步消费,建议合理设置 basicQos 限流,控制客户端预取消息数量,避免消息积压和压力暴增。

单线程执行的情况下,basicQos(10)确实还是起到了限流作用,但效果体现得比较微妙:

  • 基本原理basicQos(10)告诉RabbitMQ,最多给该消费者推送10条未ack的消息,这限制了消息在网络和客户端缓存中的积压量。

  • 你的场景:单线程同步处理,每次只能处理一条消息,处理慢(1秒),每处理完一条才ack。
    RabbitMQ会先推1条消息,等你ack后推下一条,以此类推。
    因此,即使你设置了basicQos(10),由于ack速度慢,最多只有1条消息处于“未ack”状态,实际并不会同时有10条消息待处理。

  • 总结
    basicQos(10)确实限制了预取数量,但你的单线程同步场景本身“处理速度=ack速度”,
    实际上RabbitMQ只能推送1条消息后等待确认,
    所以看起来和设置basicQos(1)或没设置效果没区别
    也就是说限流机制起了作用,但由于业务处理本身的串行特性,没体现出多条并发的差别。


如果你改成多线程异步处理,basicQos(10)才会让你最多并发处理10条消息,限流效果明显。


网站公告

今日签到

点亮在社区的每一天
去签到