Redis Stream简述

发布于:2025-07-17 ⋅ 阅读:(17) ⋅ 点赞:(0)

Redis Stream

Redis Stream 是 Redis 5.0 引入的一种新的数据结构,主要用于消息队列场景。它借鉴了 Kafka 的设计理念,提供了持久化的、可回溯的消息流功能。
基本概念

  1. 消息:Stream 中的基本单元,由键值对组成
  2. 消费者组:多个消费者可以组成一个组,共同消费消息
  3. 消息ID:格式如: 1526569495631-0

基本 Stream 操作命令

指令参数预读:
用方括号( [] )包起来的参数代表是可选参数
用竖线( | )表示的需要选择其中一个参数
字母小写的表示参数描述(自定义设置),一般字母用大写的表示关键字,如大写ID不是关键字,特殊ID说明如下

ID符号 描述
$ 表示流中最后一个条目的ID
> 在消费者组中表示从未递送给任何消费者的条目
0-0 表示流的开始
- 与0相同,表示最小可能ID
+ 表示最大可能ID
* 只有XADD命令可以用,代表自动生成ID

Stream基本命令

XADD

XADD表示添加消息到流

XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold] *|ID field value [field value ...]
  1. [NOMKSTREAM] 参数:
    当流不存在时,不自动创建新流

  2. [MAXLEN|MINID [=|~] threshold] 参数:
    修剪(trimming)操作的核心参数组合,用于精确控制流的大小和内容范围

  3. * | ID 参数:
    用于指定消息 ID 的特殊参数语法
    *表示自动生成ID
    ID表示手动生成ID

  4. field value 参数:
    这种格式通常表示 键值对

XLEN

获取流中的条目数量

XLEN key

XRANGE

按ID范围获取流条目

XRANGE key start end [COUNT count]

XREVRANGE

反向按ID范围获取流条目

XREVRANGE key end start [COUNT count]

XDEL

从流中删除特定条目

XDEL key ID [ID ...]

XTRIM

修剪流的长度

XTRIM key MAXLEN [~] count

消费者组命令

XGROUP

用于管理消费者组(Consumer Group)的核心命令,包含创建、销毁、配置消费者组等多种操作

XGROUP [CREATE key groupname ID|$ [MKSTREAM]] 
       [DESTROY key groupname] 
       [CREATECONSUMER key groupname consumername] 
       [DELCONSUMER key groupname consumername] 
       [SETID key groupname ID|$] 
  1. [CREATE key groupname ID|$ [MKSTREAM]] 指令:
    用于创建消费者组参数:
    (1). CREATE创建新的消费者组
    (2). key为Stream 的键名
    (3). groupname为要创建的消费者组名称
    (4). ID|$ 的$表示从最新消息开始消费,0-0表示从第一条消息开始消费,也可以指定具体消息ID
    (5). [MKSTREAM]如果流不存在,则创建流

  2. [DESTROY key groupname] 指令:
    删除消费者组

  3. [CREATECONSUMER key groupname consumername] 指令:
    创建消费者
    (1). CREATECONSUMER创建新的消费者
    (2). key为Stream的键名
    (3). groupname为要创建的消费者组名称
    (4). consumername为创建消费者名称
    补充说明:CREATECONSUMER 是Redis 6.2.0新增的子命令,Redis 6.2.0以前消费者是自动隐式创建的,新旧机制可以共存

  4. [DELCONSUMER key groupname consumername] 指令:
    删除消费者

  5. [SETID key groupname ID|$] 指令:
    用于重置消费者组的读取位置

消费者消息命令

XREAD 与 XREADGROUP区别

特殊ID说明:

符号 XREAD XREADGROUP
$ 只读取新消息 组创建时的起始位置
> 无效 获取未消费的新消息
0-0 从第一条消息开始读 从第一条pending消息开始读

XREAD 与 XREADGROUP 的区别

特性 XREAD XREADGROUP
用途 简单读取消息 消费者组协同消费消息
是否记录消费位置 不记录 会记录每个消费者的读取位置
消息重复消费 可以重复读取相同消息 组内每个消息只能被一个消费者消费
ACK机制 无确认机制 需要XACK确认消息处理完成

XREAD

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

[COUNT count] 参数:
用于控制单次读取的最大消息数量
[BLOCK milliseconds] 参数:
用于阻塞等待消息的关键参数,milliseconds参数如下参考:

含义
0 无限阻塞,直到有消息到达
>0 最多阻塞指定的毫秒数(如 5000 = 5秒)
不指定 非阻塞模式,立即返回

STREAMS key [key …] ID [ID …] 参数:
STREAMS 是一个必须的关键字,它标志着后面要指定的是流名称和对应的起始ID

XREADGROUP

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

GROUP 参数:
消费者组关键字
group 参数:
消费者组名称
consumer 参数:
消费者名称

XACK

确认消息已处理

XACK key group ID [ID ...]

XPENDING

检查待处理消息

XPENDING key group [start end count] [consumer]

XCLAIM

转移消息所有权

XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [force] [justid]

信息命令

XINFO

获取流和消费者组的信息

XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]

补充说明

在 Redis Stream 中,如果删除了 Stream(流)本身,那么关联的所有消费者组(Consumer Group)也会被自动删除,因为消费者组是依赖于 Stream 存在的

例子

创建一个order流
由于我需要创建一个没有数据的消费组,但是redis5做不到,所以需要创建了再删除数据

XADD order * name zs age 18

在这里插入图片描述
删除

XDEL order 1752648728774-0

在这里插入图片描述
在这里插入图片描述
创建消费者组

XGROUP create order mygroup $

在这里插入图片描述

消费,隐式创建消费者

XREADGROUP group mygroup user1 count 1 block 0 streams order >

在这里插入图片描述
阻塞状态,直到有数据到来
窗口二添加一条数据

XADD order * name ls age 20

在这里插入图片描述
看到消息
在这里插入图片描述

查看提交状态

XPENDING order mygroup

在这里插入图片描述
显示有一条数据未提交,这时我们去提交就好了
提交数据

XACK order mygroup 1752649019443-0

在这里插入图片描述
再查看提交状态
在这里插入图片描述
这时已经正确提交了


网站公告

今日签到

点亮在社区的每一天
去签到