Redis Stream 消息队列详解及 PHP 实现
Redis Stream 作为消息队列的基本概念
Redis Stream 是 Redis 5.0 引入的一种数据结构,专门用于实现高性能消息队列系统。它提供了以下核心特性:
- 消息持久化:消息被持久保存在内存中
- 消费者组:支持多个消费者组同时消费消息
- 消息确认机制:确保消息被正确处理
- 消息回溯:支持按时间范围查询历史消息
- 高性能:支持每秒数十万条消息的读写
关键方法解析
1. XADD
- 添加消息到流
XADD stream_name * key1 value1 key2 value2 ...
- 向指定的流添加新消息
*
表示自动生成消息ID(格式为<毫秒时间戳>-<序列号>
)- 消息以键值对形式存储
2. XGROUP
- 消费者组管理
XGROUP CREATE stream_name group_name id [MKSTREAM]
- 创建消费者组
id
参数指定起始位置:$
:从新消息开始消费0
:从流开头消费所有消息
MKSTREAM
:如果流不存在则自动创建
3. XREADGROUP
- 消费者组读取消息
XREADGROUP GROUP group_name consumer_name COUNT n STREAMS stream_name >
- 消费者组中的消费者读取消息
>
:表示只读取未分配给其他消费者的新消息COUNT n
:指定一次读取的最大消息数
4. XACK
- 确认消息处理完成
XACK stream_name group_name message_id
- 消费者处理完消息后发送确认
- 被确认的消息会从消费者组的待处理列表中移除
- 确保消息不会被重复处理
PHP 实现 Redis Stream 消息队列
安装依赖
composer require predis/predis
完整实现代码
<?php
require 'vendor/autoload.php';
use Predis\Client;
class RedisStreamQueue {
private $redis;
private $streamName;
private $groupName;
public function __construct($streamName, $groupName) {
$this->redis = new Client([
'scheme' => 'tcp',
'host' => '127.0.0.1',
'port' => 6379,
]);
$this->streamName = $streamName;
$this->groupName = $groupName;
// 确保消费者组存在
$this->createConsumerGroup();
}
private function createConsumerGroup() {
try {
$this->redis->xgroup('CREATE', $this->streamName, $this->groupName, '0', 'MKSTREAM');
} catch (\Exception $e) {
// 消费者组可能已存在,忽略错误
}
}
// 生产者:添加消息
public function produce($message) {
$messageId = $this->redis->xadd($this->streamName, '*', [
'data' => json_encode($message),
'timestamp' => microtime(true)
]);
echo "Produced message ID: $messageId\n";
return $messageId;
}
// 消费者:处理消息
public function consume($consumerName, $timeout = 5000) {
// 读取消息(阻塞方式)
$messages = $this->redis->xreadgroup(
'GROUP', $this->groupName, $consumerName,
'COUNT', 1,
'BLOCK', $timeout,
'STREAMS', $this->streamName, '>'
);
if (!$messages) {
return null;
}
// 解析消息
$stream = $messages[0];
$messageId = $stream[1][0][0];
$messageData = json_decode($stream[1][0][1]['data'], true);
echo "Consumer '$consumerName' received message ID: $messageId\n";
return [
'id' => $messageId,
'data' => $messageData,
'ack' => function() use ($messageId) {
$this->acknowledge($messageId);
}
];
}
// 确认消息处理完成
public function acknowledge($messageId) {
$this->redis->xack($this->streamName, $this->groupName, [$messageId]);
echo "Acknowledged message ID: $messageId\n";
}
// 查看待处理消息
public function pendingMessages() {
return $this->redis->xpending($this->streamName, $this->groupName);
}
}
// 使用示例
$queue = new RedisStreamQueue('order_queue', 'order_processing_group');
// 生产者添加消息
if ($argv[1] === 'producer') {
$orderId = uniqid('order_');
$orderData = [
'id' => $orderId,
'product' => 'Laptop',
'quantity' => 1,
'price' => 999.99,
'customer' => 'john.doe@example.com'
];
$messageId = $queue->produce($orderData);
echo "Produced order: $orderId\n";
// 消费者处理消息
} elseif ($argv[1] === 'consumer') {
$consumerName = isset($argv[2]) ? $argv[2] : 'consumer_1';
echo "Consumer '$consumerName' started. Waiting for messages...\n";
while (true) {
$message = $queue->consume($consumerName);
if ($message) {
// 模拟消息处理
echo "Processing order: {$message['data']['id']}\n";
sleep(rand(1, 3)); // 模拟处理时间
// 确认消息处理完成
$message['ack']();
echo "Order processed: {$message['data']['id']}\n\n";
}
}
} else {
echo "Usage:\n";
echo " php redis_queue.php producer\n";
echo " php redis_queue.php consumer [consumer_name]\n";
}
使用说明
1. 启动生产者
php redis_queue.php producer
- 每次执行会生成一个模拟订单消息
- 输出示例:
Produced message ID: 1716733794340-0 Produced order: order_664d4d622c7e5
2. 启动消费者
php redis_queue.php consumer consumer_1
- 消费者会持续监听并处理消息
- 输出示例:
Consumer 'consumer_1' started. Waiting for messages... Consumer 'consumer_1' received message ID: 1716733794340-0 Processing order: order_664d4d622c7e5 Acknowledged message ID: 1716733794340-0 Order processed: order_664d4d622c7e5
3. 关键功能说明
消息生产:
- 使用
XADD
命令添加消息 - 消息内容以 JSON 格式存储
- 使用
消息消费:
- 使用
XREADGROUP
从消费者组读取消息 - 阻塞方式等待消息(默认5秒超时)
- 使用
消息确认:
- 处理完成后使用
XACK
确认消息 - 确保消息不会重复处理
- 处理完成后使用
消费者组:
- 支持多个消费者同时处理消息
- 每个消费者处理不同的消息
容错机制:
- 未确认的消息会被重新分配给其他消费者
- 使用
XPENDING
可以查看待处理消息
实际应用场景
- 订单处理系统:如示例所示,处理电商订单
- 异步任务队列:处理耗时任务(邮件发送、图片处理)
- 事件驱动架构:微服务间的事件通知
- 实时数据处理:日志收集与分析
- 消息广播:向多个消费者组广播消息
Redis Stream 提供了一种轻量级但功能强大的消息队列解决方案,特别适合需要高性能和简单部署的场景。