Kafka并没有使⽤JDK⾃带的Timer或者DelayQueue来实现延迟的功能,⽽是基于时间轮⾃定义了⼀个⽤于实现延迟功能的定时器(SystemTimer)。
JDK的Timer和DelayQueue插⼊和删除操作的平均时间复杂度为O(nlog(n)),并不能满⾜Kafka的⾼性能要求,⽽基于时间轮可以将插⼊和删除操作的时间复杂度都降为O(1)。
时间轮的应⽤并⾮Kafka独有,其应⽤场景还有很多,在Netty、Akka、Quartz、Zookeeper等组件中都存在时间轮的踪影。
底层使⽤数组实现,数组中的每个元素可以存放⼀个TimerTaskList对象。
TimerTaskList是⼀个环形双向链表,在其中的链表项TimerTaskEntry中封装了 真正的定时任务TimerTask.Kafka中到底是怎么推进时间的呢?
Kafka中的定时器借助了JDK中的DelayQueue来协助推进时间轮。
具体做法是对于每个使⽤到的TimerTaskList都会加⼊到DelayQueue 中。
Kafka中的TimingWheel专⻔⽤来执⾏插⼊和删除TimerTaskEntry的操作,⽽DelayQueue专⻔负责时间推进的任务。
再试想⼀下,DelayQueue中的第⼀个超时任务列表的expiration为200ms,第⼆个超时任务为840ms,这⾥获取DelayQueue的队头只需要O(1)的时间复杂度。
如果采⽤每秒定时推进, 那么获取到第⼀个超时的任务列表时执⾏的200次推进中有199次属于“空推进”,⽽获取到第⼆个超时任 务时有需要执⾏639次“空推进”,这样会⽆故空耗机器的性能资源,这⾥采⽤DelayQueue来辅助以少量空间换时间,从⽽做到了“精准推进”。
Kafka中的定时器真可谓是“知⼈善⽤”,⽤TimingWheel做最擅⻓的任务添加和删除操作,⽽⽤DelayQueue做最擅⻓的时间推进⼯作,相辅相成。
⏳ Kafka 实现延迟队列的两种核心方案
由于 Kafka 本身不支持直接延迟投递,需通过 时间轮+外部存储 或 插件 实现。以下是详细实现方法及选型建议。
1. 🕒 方案一:TTL+死信队列(原生支持)
📌 实现原理
创建 延迟Topic(如
delay-5min
),设置消息TTL(5分钟过期)过期后自动转入 死信队列(实际消费的Topic)
🔧 步骤详解
# 1. 创建延迟Topic(设置5分钟TTL和死信路由)
kafka-topics.sh --create \
--topic delay-5min \
--partitions 3 \
--config retention.ms=300000 \
--config cleanup.policy=delete \
--config x-dead-letter-topic=real-orders \
--bootstrap-server kafka1:9092
# 2. 生产者发送延迟消息
ProducerRecord<String, String> record = new ProducerRecord<>(
"delay-5min",
null, // key=null走轮询分区
"订单取消:123"
);
producer.send(record);
# 3. 消费者订阅真实Topic
kafka-console-consumer.sh --topic real-orders --bootstrap-server kafka1:9092
✅ 优点
无需额外组件
适合简单延迟场景(如固定5分钟/10分钟)
❌ 缺点
延迟时间固定,无法动态指定
需要维护多个Topic
2. 🚀 方案二:Kafka+外部调度器(推荐)
📌 架构设计
🔧 实现步骤
生产者:将消息+延迟时间存入DB
// 消息结构示例 { "id": "msg-123", "topic": "orders", "body": "订单取消:123", "deliver_time": "2023-07-20T15:30:00Z", // ISO8601时间 "status": "PENDING" }
调度器:
定时扫描DB(
WHERE deliver_time <= NOW()
)将到期的消息写入Kafka目标Topic
消费者:正常消费目标Topic
✅ 优点
支持任意精确延迟(秒级控制)
可查询/修改延迟任务
❌ 缺点
需引入额外存储和调度服务
3. 🔌 方案三:Kafka插件(非官方)
📌 使用 kafka-delayed-queue
插件
# 安装插件(需重启Broker)
cp kafka-delayed-queue.jar /kafka/libs/
配置延迟Topic
# server.properties
delayed.queue.enable=true
delayed.queue.topic=delayed-orders
发送延迟消息
// 设置延迟头信息
record.headers().add("x-delay-ms", "300000".getBytes()); // 5分钟
producer.send(record);
✅ 优点
原生延迟支持
无需额外存储
❌ 缺点
社区插件,生产环境需验证稳定性
4. ⚖️ 方案对比决策表
方案 | 延迟精度 | 复杂度 | 适用场景 |
---|---|---|---|
TTL+死信队列 | 固定时间档位 | 低 | 简单延迟(如5/10分钟) |
外部调度器 | 毫秒级 | 高 | 精准延迟(如定时抢购) |
插件 | 毫秒级 | 中 | 技术预研/非核心业务 |
5. 🛠️ 生产建议:电商订单超时案例
# 伪代码:延迟30分钟关闭未支付订单
def produce_delayed_message(order_id):
# 计算延迟时间
deliver_time = datetime.now() + timedelta(minutes=30)
# 存入MongoDB
db.delayed_messages.insert_one({
"order_id": order_id,
"topic": "order-timeout",
"body": f"超时关闭:{order_id}",
"deliver_time": deliver_time
})
# 调度器每分钟扫描一次
while True:
messages = db.delayed_messages.find({"deliver_time": {"$lte": datetime.now()}})
for msg in messages:
kafka.produce(msg["topic"], msg["body"])
db.delete(msg["_id"])
sleep(60)
📌 终极总结
简单需求 → 用TTL+死信队列(低成本)
精准延迟 → Kafka+外部调度器(推荐方案)
技术尝鲜 → 尝试社区插件
Kafka实现延迟队列如同 快递定时配送:
TTL方案:像快递柜,固定时间后取件
调度器方案:像专人预约配送,精准控制时间
插件方案:像智能快递机器人,尚在试验阶段
根据业务需求选择合适方案! ⏱️🚚