1.Stream消息队列介绍
1.1 基本概念
Redis的Pub/Sub发布订阅模式虽然能够实现消息队列的功能,但存在一个显著局限性:它不支持消息持久化。因此,在网络连接中断或Redis服务发生故障时,消息会丢失。简而言之,Redis Pub/Sub能够有效地传递实时消息,但却无法保存历史消息记录。
Redis Stream是Redis 5.0版本新增加的一种数据结构(数据类型),它主要设计用于构建消息队(Message Queue,简称MQ)列和事件驱动的应用程序。与传统的Redis发布/订阅(Pub/Sub)不同,Streams不仅可以分发消息,而且能够持久化存储这些消息,并支持消息的历史记录查询和回溯。
Redis Stream提供了消息的持久化功能,这意味着即使在Redis实例重启后,之前写入的数据也不会丢失。此外,它还支持主从复制,保证了高可用性和容错能力。这解决了Redis Pub/Sub模式下的一个主要缺点——消息无法持久化的问题。
1.2 工作原理
Redis Stream消息队列的结构如下图所示。
这里,有五个基本概念需要我们掌握:
- Stream:一个Stream是由多个消息组成的有序集合。每个Stream都有唯一的名称,它就是Redis的key。
- Message:一条消息是Stream中的一个元素,每条消息都有一个或多个字段对(key-value对)组成。每条消息都有一个唯一的ID,这个ID由Redis自动生成或用户指定。消息ID由两部分组成:时间戳和序列号,这使得消息的顺序可以按照时间先后进行排序。
- Consumer Group:消费者组是一组消费者的集合,拥有唯一的组名。Redis Stream引入了消费者组的概念,允许多个消费者组成一个组来共同消费同一个Stream中的消息。每个消费者组都有自己的last_delivered_id游标,用于跟踪该组已经读取过的最新消息。当消费者读取消息时,游标会自动更新。更重要的是,消费者组内的成员之间存在竞争关系,即一条消息只会被其中一个消费者处理一次,避免了重复消费的问题。
- last_delivered_id:消费组游标,每个消费组都维护着一个游标,即last_delivered_id,它指示了消费组当前已经“交付”给消费者的最新消息的ID。每当组内的消费者成功读取并处理了一条消息,这个游标就会相应地向前移动。
- pending_ids:这是Redis官方用来描述消费者状态的一个变量,它记录了那些已经被消费者读取但尚未确认(ACK)的消息ID。简而言之,pending_ids列出了当前正等待消费者确认处理的消息。
在一个Stream队列中,可以存在多个消费组,而每个消费组内部又有多个消费者。这些消费者在同一组内形成竞争关系,即一旦某条消息被组内的一个消费者处理,其他消费者就不会再处理这条消息。已消费但未确认的消息ID会被加入到pending_ids列表中,随着消息被确认处理,消费组的游标将相应地向前移动,消费者们则继续争夺下一条消息进行处理。这种机制确保了高效且无重复的消息处理流程。
1.3 应用场景
Redis Stream在多个领域有着广泛的应用,包括但不限于:
- 发布订阅系统:生产者将消息发布到Stream中,而订阅者可以通过消费者组的形式订阅并消费消息。
- 消息队列:Stream可以作为一个高性能的消息队列,用于实现异步任务处理、事件驱动架构等。生产者可以将消息发送到Stream中,消费者可以从Stream中获取消息并进行处理。
- 实时数据处理:在实时数据处理场景中,Stream可以用于收集和处理实时数据。例如,将传感器数据、日志数据等发送到Stream中,然后使用消费者组对数据进行实时分析和处理。
- 分布式系统协调:Stream可以用于分布式系统中的协调和通信。不同的节点可以将自己的状态、事件等信息发送到Stream中,其他节点可以通过消费Stream中的消息来获取最新的状态和事件,实现分布式系统的协调和同步。
综上所述,Redis Stream结合了传统消息队列的优点,如持久化、消息确认机制等,同时也继承了Redis的高效性能,非常适合用来实现异步通信、事件驱动架构以及其他对实时性和可靠性要求较高的应用场景。
2.Stream消息队列命令
Redis Stream消息队列相关命令见下表:
命令 |
说明 |
XADD |
用于将消息添加到Stream中(添加消息到末尾)。如果Stream不存在,Redis会自动创建一个新的Stream。 |
XTRIM |
对Stream流进行修剪,限制长度。 |
XDEL |
删除指定的消息。 |
XLEN |
获取流包含的元素数量,即消息长度。 |
XRANGE |
获取消息列表,会自动过滤已经删除的消息。 |
XREVRANGE |
反向获取消息列表,ID从大到小。 |
XREAD |
以阻塞或非阻塞的方式读取Stream中的消息。 |
XACK |
用于确认消息已被某个Consumer处理完毕(将消息标记为已处理)。只有经过确认的消息才会被认为已经被消费。 |
XPENDING |
显示待处理消息的信息,帮助监控哪些消息还没有被确认。 |
XCLAIM |
转移消息的归属权。 |
Redis Stream消费者组相关命令见下表:
命令 |
说明 |
XGROUP CREATE |
创建消费者组。 |
XREADGROUP GROUP |
读取消费者组中的消息。 |
XGROUP SETID |
为消费者组设置新的最后递送消息ID。 |
XGROUP DELCONSUMER |
删除消费者。 |
XGROUP DESTROY |
删除消费者组。 |
XINFO |
用于获取Stream、Consumer Group和Consumer的相关信息。 |
XINFO GROUPS |
查看消费者组的信息。 |
XINFO STREAM |
查看Stream流信息。 |
XINFO CONSUMERS key group |
查看组内消费者流信息。 |
3.Stream消息队列使用
3.1 XADD创建消息
当我们创建一个Stream时,需要确保消息ID既唯一又不可重复,同时遵循递增原则。消息ID的生成提供两种方式:一是由系统自动创建,二是用户自定义创建。
3.1.1 系统自动创建消息ID
使用XADD向队列添加消息,如果指定的队列不存在,则创建一个队列。XADD命令的语法格式见下。
XADD key [NOMKSTREAM] [<MAXLEN | MINID> [= | ~] threshold
[LIMIT count]] <* | id> field value [field value ...]
对XADD命令中的参数做如下说明。
参数 |
作用 |
key |
队列名称,如果不存在就创建。 |
id |
消息id,我们使用"*"表示由Redis生成,可以自定义,但是要自己保证递增性。 |
field |
消息记录的字段名称。 |
value |
消息记录的字段值。 |
XADD命令实现系统自动创建消息ID的演示示例见下。
# 添加一个消息,"*"表示以时间戳自动创建id
127.0.0.1:6379> xadd mystream * username tom
"1721822477703-0"
127.0.0.1:6379> xadd mystream * password 123456 age 20 addr beijing
"1721822530587-0"
返回值是毫秒时间戳格式的字符串。比如"1721822477703-0",它表示在该毫秒内产生的第1条消息。
3.1.2 自定义消息ID
自定义ID的创建较为简单,但需注意以下两点:首先,ID必须为“整数”形式;其次,新加入消息的ID必须大于之前所有消息的ID。因此,即使是自定义ID,也必须遵循递增的原则。
XADD命令实现自定义消息ID的演示示例见下。
127.0.0.1:6379> xadd mystream2 001 username tom
"1-0"
127.0.0.1:6379> xadd mystream2 002 password 123456 age 20 addr beijing
"2-0"
# 如果插入重复的id号会报错
127.0.0.1:6379> xadd mystream2 002 password 456789 age 22 addr shanxi
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item
3.2 基本命令使用
3.2.1 XLEN
使用XLEN获取Redis Stream流包含的元素数量,即消息长度。XLEN命令的语法格式见下,其中key表示队列名称。
XLEN key
XLEN命令的演示示例见下。
127.0.0.1:6379> XLEN mystream
(integer) 2
3.2.2 XRANGE
使用XRANGE获取消息列表,会自动过滤已经删除的消息。XRANGE命令的语法格式见下。
XRANGE key start end [COUNT count]
对XRANGE命令中的参数做如下说明。
参数 |
作用 |
key |
队列名称。 |
start |
开始值,"-"表示最小值。 |
end |
结束值,"+"表示最大值。 |
count |
数量。 |
XRANGE命令的演示示例见下。
# 获取消息列表,-表示最小,+表示最大
127.0.0.1:6379> xrange mystream - +
1) 1) "1721822477703-0"
2) 1) "username"
2) "tom"
2) 1) "1721822530587-0"
2) 1) "password"
2) "123456"
3) "age"
4) "20"
5) "addr"
6) "beijing"
# 使用count指定返回数据的数量
127.0.0.1:6379> xrange mystream - + count 1
1) 1) "1721822477703-0"
2) 1) "username"
2) "tom"
3.2.3 XREVRANGE
使用XREVRANGE反向获取消息列表,ID从大到小,会自动过滤已经删除的消息。XREVRANGE命令的语法格式见下。
XREVRANGE key end start [COUNT count]
对XREVRANGE命令中的参数做如下说明。
参数 |
作用 |
key |
队列名称。 |
end |
结束值,"+"表示最大值。 |
start |
开始值,"-"表示最小值。 |
count |
数量。 |
XREVRANGE命令的演示示例见下。
127.0.0.1:6379> xrevrange mystream + -
1) 1) "1721822530587-0"
2) 1) "password"
2) "123456"
3) "age"
4) "20"
5) "addr"
6) "beijing"
2) 1) "1721822477703-0"
2) 1) "username"
2) "tom"
127.0.0.1:6379> xrevrange mystream + - count 1
1) 1) "1721822530587-0"
2) 1) "password"
2) "123456"
3) "age"
4) "20"
5) "addr"
6) "beijing"
3.2.4 XREAD
以阻塞或非阻塞的方式读取Stream中的消息。XREAD命令的语法格式见下。
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
对XREAD命令中的参数做如下说明。
参数 |
作用 |
count |
数量。 |
milliseconds |
可选,阻塞毫秒数,没有设置就是非阻塞模式。 |
key |
队列名称。 |
id |
消息ID。 |
XREAD命令的演示示例见下。
# 使用xread读取消息
127.0.0.1:6379> XREAD COUNT 2 STREAMS mystream 0-0
1) 1) "mystream"
2) 1) 1) "1721822477703-0"
2) 1) "username"
2) "tom"
2) 1) "1721822530587-0"
2) 1) "password"
2) "123456"
3) "age"
4) "20"
5) "addr"
6) "beijing"
3.2.5 XTRIM
使用XTRIM对Stream流进行修剪,限制长度。XTRIM命令的语法格式见下。
XTRIM key <MAXLEN | MINID> [= | ~] threshold [LIMIT count]
对XTRIM命令中的参数做如下说明。
参数 |
作用 |
key |
队列名称。 |
MAXLEN和MINID |
长度。 |
count |
数量。 |
XTRIM命令的演示示例见下。
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1721822477703-0"
2) 1) "username"
2) "tom"
2) 1) "1721822530587-0"
2) 1) "password"
2) "123456"
3) "age"
4) "20"
5) "addr"
6) "beijing"
127.0.0.1:6379> XTRIM mystream MAXLEN 1
(integer) 1
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1721822530587-0"
2) 1) "password"
2) "123456"
3) "age"
4) "20"
5) "addr"
6) "beijing"
3.2.6 XDEL
使用XDEL删除指定的消息。XDEL命令的语法格式见下。
XDEL key id [id ...]
对XDEL命令中的参数做如下说明。
参数 |
作用 |
key |
队列名称。 |
id |
消息ID。 |
XDEL命令的演示示例见下。
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1721822530587-0"
2) 1) "password"
2) "123456"
3) "age"
4) "20"
5) "addr"
6) "beijing"
127.0.0.1:6379> XDEL mystream 1721822530587-0
(integer) 1
127.0.0.1:6379> XLEN mystream
(integer) 0
3.3 创建消息者组
Redis Stream通过"XGROUP CREATE"命令创建消费者组(Consumer Group),在创建时需要传递起始消息的ID,用来初始化last_delivered_id变量。语法格式如下。
XGROUP CREATE key group <id | $> [MKSTREAM] [ENTRIESREAD entries-read]
对XGROUP命令中的参数做如下说明。
参数 |
作用 |
key |
指定Stream队列名称,若不存在则自动创建。 |
group |
自定义消费组的名称,不可重复。 |
$ |
表示从尾部开始消费,只接受新消息,而当前Stream的消息则被忽略。 |
XGROUP命令的演示示例见下。
# 添加消息
127.0.0.1:6379> xadd userstream * username liming password 123456 nick xiaoming
"1721910259726-0"
127.0.0.1:6379> xadd userstream * username zhangsan password 123123 nick saner
"1721910308525-0"
127.0.0.1:6379> xadd userstream * username wangwu password 112233 nick xiaowang
"1721910335006-0"
127.0.0.1:6379> xadd userstream * username zhaoliu password 654321 nick xiaoliu
"1721910366584-0"
# 创建消费组,并传递消息起始id(0-0)
127.0.0.1:6379> xgroup create userstream consumer-group-name1 0-0
OK
# 从尾部开始消费信息,只接受新消息
127.0.0.1:6379> xgroup create userstream consumer-group-name2 $
OK
# xinfo查看Stream流信息(队列信息)
127.0.0.1:6379> xinfo stream userstream
1) "length" # 队列中消息的长度
2) (integer) 4
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "last-generated-id"
8) "1721910366584-0"
9) "max-deleted-entry-id"
10) "0-0"
11) "entries-added"
12) (integer) 4
13) "recorded-first-entry-id"
14) "1721910259726-0"
15) "groups" # 有几个消费组
16) (integer) 2
17) "first-entry" # 第一个消息
18) 1) "1721910259726-0"
2) 1) "username"
2) "liming"
3) "password"
4) "123456"
5) "nick"
6) "xiaoming"
19) "last-entry" # 最后一个消息
20) 1) "1721910366584-0"
2) 1) "username"
2) "zhaoliu"
3) "password"
4) "654321"
5) "nick"
6) "xiaoliu"
# xinfo查看消费者组的信息
127.0.0.1:6379> xinfo groups userstream
1) 1) "name"
2) "consumer-group-name1"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "0-0"
9) "entries-read"
10) (nil)
11) "lag"
12) (integer) 4
2) 1) "name"
2) "consumer-group-name2"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1721910366584-0"
9) "entries-read"
10) (nil)
11) "lag"
12) (integer) 0
3.4 消费消息
通过Redis Stream的XREADGROUP命令,用于读取消费者组中的消息(即消费信息),该命令与XREAD类似,能够阻塞并等待新消息的到来。当有新消息被读取时,对应的消息ID会被加入到消费者的PEL(Pending_ids)中,表示这些消息正在处理中。一旦客户端完成消息处理,它可以通过XACK命令通知Redis服务器消息已成功处理,随后该消息ID将从PEL中移除。示意图如下。
XREADGROUP命令的语法格式如下所示:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds]
[NOACK] STREAMS key [key ...] id [id ...]
对XREADGROUP命令中的参数做如下说明。
参数 |
作用 |
group |
消费组名称。 |
consumer |
消费者名称。 |
count |
要读取的数量。 |
milliseconds |
阻塞时间,以毫秒为单位。 |
key |
键指定的队列名称。 |
id |
表示消息ID。 |
XREADGROUP命令的演示示例见下。
# 消费组中消费者读取消息,">"表示每当消费一个信息,消费组游标就前移一位
127.0.0.1:6379> XREADGROUP GROUP consumer-group-name1 consumer1 COUNT 1 STREAMS userstream >
1) 1) "userstream"
2) 1) 1) "1721910259726-0"
2) 1) "username"
2) "liming"
3) "password"
4) "123456"
5) "nick"
6) "xiaoming"
# 参数"BLOCK 1000"表示等待1秒,如果没有任何消息到来,则返回nill
127.0.0.1:6379> XREADGROUP GROUP consumer-group-name1 consumer1 COUNT 1 BLOCK 1000 STREAMS userstream >
1) 1) "userstream"
2) 1) 1) "1721910308525-0"
2) 1) "username"
2) "zhangsan"
3) "password"
4) "123123"
5) "nick"
6) "saner"
# 通过XACK命令将consumer-group-name消息组中的id为"1721910259726-0"的消息标记为已经处理
127.0.0.1:6379> XACK userstream consumer-group-name1 1721910259726-0
(integer) 1
# 查看消费者组的信息
127.0.0.1:6379> XINFO GROUPS userstream
1) 1) "name"
2) "consumer-group-name1"
3) "consumers"
4) (integer) 1
5) "pending"
6) (integer) 3
7) "last-delivered-id"
8) "1721910366584-0"
9) "entries-read"
10) (integer) 4
11) "lag"
12) (integer) 0
2) 1) "name"
2) "consumer-group-name2"
3) "consumers"
4) (integer) 1
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1721910366584-0"
9) "entries-read"
10) (nil)
11) "lag"
12) (integer) 0
注意:在命令XREADGROUP中,“>” 具有特定含义,即每当消费者读取一条消息时,last_delivered_id变量便会向前移动一位。
4.总结
Redis Stream 是 Redis 5.0 版本中极具价值的数据结构,主要用于消息队列和事件驱动应用,有效解决了 Pub/Sub 模式消息易丢失的问题,提供了持久化存储与丰富功能。
其结构核心包括 Stream、Message、Consumer Group 等。Stream 是有序消息集合,以唯一名称作为 Redis 的 key。Message 由字段对组成且有自动或自定义的唯一 ID(由时间戳和序列号构成)。Consumer Group 由多个消费者构成,通过 last_delivered_id 游标记录已读消息,保证组内成员竞争消费消息,避免重复处理,pending_ids 则记录已读未确认消息。
应用场景多样,在发布订阅系统中,生产者向 Stream 发布消息,订阅者以消费者组形式消费;作为消息队列,能实现异步任务处理与事件驱动架构;在实时数据处理方面,可收集处理传感器、日志数据;在分布式系统协调中,帮助节点传递状态和事件信息。
相关命令是非常丰富的。消息操作上,XADD 可添加消息,自动生成或自定义符合递增原则的 ID;XLEN 获取消息长度;XRANGE 和 XREVRANGE 分别正反向获取消息且自动过滤已删除消息;XREAD 支持阻塞或非阻塞读取;XTRIM 修剪流长度;XDEL 删除指定消息。消费者组操作中,XGROUP CREATE 创建组并初始化游标;XREADGROUP 读取组内消息;XACK 确认消息处理完成;XPENDING 查看待处理消息;XCLAIM 转移消息归属权。这些命令协同工作,使 Redis Stream 在各类场景中都能高效处理消息,保障系统稳定运行与数据可靠传递。