📌 一、核心概念:prefetch(限流)
✅ 什么是 prefetch?
prefetchCount 是 RabbitMQ 提供的限流机制:
控制一个消费者在未
ack
之前,最多能收到多少条消息。例如:
prefetch = 10
,消费者最多收到 10 条未确认的消息,没确认之前不再推送新消息。
📦 二、原生 RabbitMQ:限流设置与 ack 影响
✅ 1. 原生限流设置:
channel.basicQos()
channel.basicQos(10); // 限制最多处理10条未ack的消息
✅ 2. 开启“手动 ack”时,限流生效(推荐 ✅)
import com.rabbitmq.client.*; public class ManualAckLimit { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection conn = factory.newConnection(); Channel channel = conn.createChannel(); String QUEUE_NAME = "test_queue"; channel.queueDeclare(QUEUE_NAME, true, false, false, null); // ✅ 设置限流:每次最多接收10条未ack的消息 channel.basicQos(10); // ❗ false 表示手动 ack 模式 channel.basicConsume(QUEUE_NAME, false, (consumerTag, delivery) -> { String msg = new String(delivery.getBody()); System.out.println("接收消息:" + msg); // 业务处理... Thread.sleep(1000); // ✅ 手动确认 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }, consumerTag -> {}); } }
📌 说明:
basicQos(10)
配合手动 ack:RabbitMQ 会控制只发10条未确认消息限流生效 ✅
❌ 3. 自动 ack 模式下,限流失效(不推荐)
channel.basicQos(10); // 设置限流 channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); // autoAck = true
📌 说明:
设置虽然写了
basicQos(10)
,但因为autoAck = true
,RabbitMQ 会立即认为消息处理完成限流机制不生效,RabbitMQ 会一直推消息给消费者
无法控制负载,消费者可能“被压垮”
☕ 三、Spring Boot 中限流与 ack 提交方式
Spring Boot 使用 Spring AMQP 封装,配置项如下:
spring: rabbitmq: listener: simple: prefetch: 10 # (不写默认是250)限制未确认消息数量(生效条件见下) acknowledge-mode: auto # 可选:auto、manual、none
✅ 1. auto 模式(默认值 ✅ 推荐)
@RabbitListener(queues = "test_queue") public void consume(String msg) { System.out.println("接收到消息:" + msg); // 方法执行完毕后,Spring 自动 ack(前提是无异常) }
📌 说明:
Spring 会自动 ack(方法无异常)
实际底层使用
channel.basicConsume(..., false)
,非原生 autoAck所以 prefetch 生效 ✅
✅ 2. manual 模式(完全手动 ack)
@RabbitListener(queues = "test_queue", ackMode = "MANUAL") public void consume(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); try { System.out.println("处理消息:" + msg); // 手动 ack channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 处理失败时可重回队列 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } }
📌 说明:
你自己完全控制 ack 时机
prefetch
一定生效 ✅适合复杂业务、批量确认、失败重试
⚠️ 3. none 模式(不确认 ❌ 极不推荐)
acknowledge-mode: none
📌 说明:
Spring 不会发送任何 ack,RabbitMQ 也不会等
一发出就当作“处理完”,消息可能丢失
prefetch
无效 ❌⚠️ 生产环境严禁使用!
📊 四、整体对比总结表(必记)
场景 ack 模式 ack 控制方式 prefetch 生效? 说明 原生 + 手动 ack autoAck = false
手动 basicAck()
✅ 生效 推荐 原生 + 自动 ack autoAck = true
消息发出立刻 ack ❌ 无效 不推荐 Spring Boot - auto(默认) acknowledge-mode: auto
方法执行完自动 ack ✅ 生效 推荐 Spring Boot - manual acknowledge-mode: manual
需手动调用 basicAck()
✅ 生效 推荐 Spring Boot - none acknowledge-mode: none
不确认 ❌ 无效 危险!
✅ 五、建议使用方式(结论)
场景 推荐配置 原生客户端 basicQos(x) + autoAck = false
Spring Boot(简单业务) acknowledge-mode: auto
Spring Boot(复杂控制) acknowledge-mode: manual
Spring Boot(测试) acknowledge-mode: none
(仅测试)小问题:不用线程池, channel.basicQos(10);用和不用有啥区别?不都是单线程执行?起到限流的作用了吗?
单线程同步处理,
basicQos
可不设置或设1,控制更严格,避免内存堆积。多线程或异步消费,建议合理设置
basicQos
限流,控制客户端预取消息数量,避免消息积压和压力暴增。单线程执行的情况下,
basicQos(10)
确实还是起到了限流作用,但效果体现得比较微妙:
基本原理:
basicQos(10)
告诉RabbitMQ,最多给该消费者推送10条未ack的消息,这限制了消息在网络和客户端缓存中的积压量。你的场景:单线程同步处理,每次只能处理一条消息,处理慢(1秒),每处理完一条才ack。
RabbitMQ会先推1条消息,等你ack后推下一条,以此类推。
因此,即使你设置了basicQos(10)
,由于ack速度慢,最多只有1条消息处于“未ack”状态,实际并不会同时有10条消息待处理。总结:
basicQos(10)
确实限制了预取数量,但你的单线程同步场景本身“处理速度=ack速度”,
实际上RabbitMQ只能推送1条消息后等待确认,
所以看起来和设置basicQos(1)
或没设置效果没区别,
也就是说限流机制起了作用,但由于业务处理本身的串行特性,没体现出多条并发的差别。
如果你改成多线程异步处理,
basicQos(10)
才会让你最多并发处理10条消息,限流效果明显。