Redis延迟队列
Redis延迟队列是基于Redis构建的消息队列,用来处理需延迟执行的任务。
基本原理
它借助Redis的有序集合(Sorted Set)数据结构达成目的。会把任务及其执行时间分别当成成员与分值存进有序集合,由于执行时间作为分值,任务便会依执行时间在有序集合里自动排序。
系统会持续查询分值小于等于当下时间的任务,找到那些该立刻执行的。
实现方式
任务添加:借助Redis客户端,将任务唯一标识设为成员,把任务执行时间戳当作分值,添加进有序集合。
任务获取与执行:依靠定时任务,或是循环操作定时获取当前时间戳,再从有序集合捞出分值小于等于该时间戳的任务。捞出后,把任务从有序集合移除,接着执行对应的任务逻辑。
任务删除与更新:要是任务延迟阶段需要取消,或是更新执行时间,直接从有序集合删去任务,或是更改任务分值即可。
应用场景
定时任务调度:像定时发送邮件、定时产出报表这类定时执行的事务,用它实现很合适。
延迟处理:处理如订单超时未支付自动取消、用户注册一段时间后发送欢迎短信这类需延迟处理的任务。
异步任务处理:把耗时任务异步处理,增强系统响应速度与并发处理能力。
import time
import redis
r = redis.StrictRedis(host='localhost', port=6379, db=0)
# 生产者:将消息插入延迟队列
message = "Task 1"
delay_time = 10 # 延迟时间为10秒
timestamp = time.time() + delay_time # 消息到期时间
r.zadd('delayed_queue', {message: timestamp})
# 消费者:定期扫描队列
while True:
current_time = time.time()
# 获取所有 delayed_queue 中,分数(时间戳)小于或等于 current_time 的元素(即所有已经到期的消息)。
messages = r.zrangebyscore('delayed_queue', '-inf', current_time)
for message in messages:
print(f"Processing message: {message.decode()}")
r.zrem('delayed_queue', message) # 从队列中移除已消费的消息
time.sleep(1) # 每秒检查一次