Redis Stream
Redis Stream 是 Redis 5.0 引入的一种新的数据结构,主要用于消息队列场景。它借鉴了 Kafka 的设计理念,提供了持久化的、可回溯的消息流功能。
基本概念
- 消息:Stream 中的基本单元,由键值对组成
- 消费者组:多个消费者可以组成一个组,共同消费消息
- 消息ID:格式如: 1526569495631-0
基本 Stream 操作命令
指令参数预读:
用方括号( [] )包起来的参数代表是可选参数
用竖线( | )表示的需要选择其中一个参数
字母小写的表示参数描述(自定义设置),一般字母用大写的表示关键字,如大写ID不是关键字,特殊ID说明如下
ID符号 | 描述 |
---|---|
$ | 表示流中最后一个条目的ID |
> | 在消费者组中表示从未递送给任何消费者的条目 |
0-0 | 表示流的开始 |
- | 与0相同,表示最小可能ID |
+ | 表示最大可能ID |
* | 只有XADD命令可以用,代表自动生成ID |
Stream基本命令
XADD
XADD表示添加消息到流
XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold] *|ID field value [field value ...]
[NOMKSTREAM] 参数:
当流不存在时,不自动创建新流[MAXLEN|MINID [=|~] threshold] 参数:
修剪(trimming)操作的核心参数组合,用于精确控制流的大小和内容范围* | ID 参数:
用于指定消息 ID 的特殊参数语法
*表示自动生成ID
ID表示手动生成IDfield value 参数:
这种格式通常表示 键值对
XLEN
获取流中的条目数量
XLEN key
XRANGE
按ID范围获取流条目
XRANGE key start end [COUNT count]
XREVRANGE
反向按ID范围获取流条目
XREVRANGE key end start [COUNT count]
XDEL
从流中删除特定条目
XDEL key ID [ID ...]
XTRIM
修剪流的长度
XTRIM key MAXLEN [~] count
消费者组命令
XGROUP
用于管理消费者组(Consumer Group)的核心命令,包含创建、销毁、配置消费者组等多种操作
XGROUP [CREATE key groupname ID|$ [MKSTREAM]]
[DESTROY key groupname]
[CREATECONSUMER key groupname consumername]
[DELCONSUMER key groupname consumername]
[SETID key groupname ID|$]
[CREATE key groupname ID|$ [MKSTREAM]] 指令:
用于创建消费者组参数:
(1). CREATE创建新的消费者组
(2). key为Stream 的键名
(3). groupname为要创建的消费者组名称
(4). ID|$ 的$表示从最新消息开始消费,0-0表示从第一条消息开始消费,也可以指定具体消息ID
(5). [MKSTREAM]如果流不存在,则创建流[DESTROY key groupname] 指令:
删除消费者组[CREATECONSUMER key groupname consumername] 指令:
创建消费者
(1). CREATECONSUMER创建新的消费者
(2). key为Stream的键名
(3). groupname为要创建的消费者组名称
(4). consumername为创建消费者名称
补充说明:CREATECONSUMER 是Redis 6.2.0新增的子命令,Redis 6.2.0以前消费者是自动隐式创建的,新旧机制可以共存[DELCONSUMER key groupname consumername] 指令:
删除消费者[SETID key groupname ID|$] 指令:
用于重置消费者组的读取位置
消费者消息命令
XREAD 与 XREADGROUP区别
特殊ID说明:
符号 | XREAD | XREADGROUP |
---|---|---|
$ | 只读取新消息 | 组创建时的起始位置 |
> | 无效 | 获取未消费的新消息 |
0-0 | 从第一条消息开始读 | 从第一条pending消息开始读 |
XREAD 与 XREADGROUP 的区别
特性 | XREAD | XREADGROUP |
---|---|---|
用途 | 简单读取消息 | 消费者组协同消费消息 |
是否记录消费位置 | 不记录 | 会记录每个消费者的读取位置 |
消息重复消费 | 可以重复读取相同消息 | 组内每个消息只能被一个消费者消费 |
ACK机制 | 无确认机制 | 需要XACK确认消息处理完成 |
XREAD
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
[COUNT count] 参数:
用于控制单次读取的最大消息数量
[BLOCK milliseconds] 参数:
用于阻塞等待消息的关键参数,milliseconds参数如下参考:
值 | 含义 |
---|---|
0 | 无限阻塞,直到有消息到达 |
>0 | 最多阻塞指定的毫秒数(如 5000 = 5秒) |
不指定 | 非阻塞模式,立即返回 |
STREAMS key [key …] ID [ID …] 参数:
STREAMS 是一个必须的关键字,它标志着后面要指定的是流名称和对应的起始ID
XREADGROUP
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
GROUP 参数:
消费者组关键字
group 参数:
消费者组名称
consumer 参数:
消费者名称
XACK
确认消息已处理
XACK key group ID [ID ...]
XPENDING
检查待处理消息
XPENDING key group [start end count] [consumer]
XCLAIM
转移消息所有权
XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [force] [justid]
信息命令
XINFO
获取流和消费者组的信息
XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]
补充说明
在 Redis Stream 中,如果删除了 Stream(流)本身,那么关联的所有消费者组(Consumer Group)也会被自动删除,因为消费者组是依赖于 Stream 存在的
例子
创建一个order流
由于我需要创建一个没有数据的消费组,但是redis5做不到,所以需要创建了再删除数据
XADD order * name zs age 18
删除
XDEL order 1752648728774-0
创建消费者组
XGROUP create order mygroup $
消费,隐式创建消费者
XREADGROUP group mygroup user1 count 1 block 0 streams order >
阻塞状态,直到有数据到来
窗口二添加一条数据
XADD order * name ls age 20
看到消息
查看提交状态
XPENDING order mygroup
显示有一条数据未提交,这时我们去提交就好了
提交数据
XACK order mygroup 1752649019443-0
再查看提交状态
这时已经正确提交了