Redis--黑马点评--消息队列

发布于:2025-06-30 ⋅ 阅读:(15) ⋅ 点赞:(0)
Redis消息队列

在进行redis秒杀优化时,我们在异步秒杀的实现中发现了两个较为严重的问题:内存限制问题以及数据安全问题。

想要解决这两个难题,就需要使用消息队列

消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括三个角色:

  • 消息队列:存储和管理消息,也称消息代理(Message Broker)

  • 生产者:发送消息到消息队列

  • 消费者:从消息队列获取消息并处理消息

优点:解除耦合,提高工作效率

这样看来和我们开始用的阻塞队列好像没什么不同,但是实际有三点不同:

  • 消息队列是在JVM以外的一个独立服务,不受JVM内存的限制,这就解决了内存限制问题。

  • 消息队列不仅仅做消息存储,而且还需要确保数据的安全,就是指存入在消息队列的所有数据要去做持久化,这样不管是服务宕机还是重启,数据就不存在丢失的问题。这就解决了数据安全问题

  • 并且在消息投递给消费者以后,还需要要求消费者做消息的确认,如果消息没有得到确认,那么这个消息就会在队列中依然存在,下一次会再次投递给消费者,让它继续处理,直到成功为止,就是确保消息至少被消费一次。

正常市面上的消息队列也有很多,比如RabbitMQ,RocketMQ,等等,但是上述的消息队列都需要去额外搭建一些服务,去学习另外的一些技术,是有一定的成本。

而在Redis中,提供了三种不同的方式来实现消息队列:

  • list结构:基于list结构模拟消息队列

  • PubSub:基本的点对点消息模型

  • Stream:比较完善的消息队列模型

接下来就来详细了解redis的三种消息队列实现方式:

基于List模拟消息队列

消息队列(Message Queue),字面意思就是存放消息的队列,而redis的list数据结构是一个双向链表,很容易模拟出队列效果。

队列:先进先出,出口和入口不一样,因此可以利用redis命令:lpush集合rpop,或者rpush结合lpop来实现。

不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。

因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。

进入客户端进行测试:

image-20250627215444671

这就是在模仿jvm中的阻塞队列,相比于阻塞队列的优点就是不依赖于JVM内存,不用担心存储上限的问题,redis支持数据持久化,数据安全也能得到保证。还能保证消息有序性。

数据持久化:List不是消息队列,只是一种存储结构,被当做消息碎队列来使用,因此数据存储结构具有数据持久化能力

但是List机构只是基本满足了消息队列的功能,还有一些欠缺点:

  • 无法避免消息丢失:比如从redis的队列中取到一条消息,但是在还没有处理就出现了异常,这个消息就丢失了。

  • 只支持单消费者:一条消息只能被一个用户拿到,无法实现一条消息被共享的场景

基于PubSub的消息队列

PubSub(publish subscribe发布 订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel(频道),生产者向对应channel发送消息后,所有订阅者都能够收到相关消息。

相关Redis命令

  • SUBSCRIBE channel[channel]:订阅一个或多个频道

  • publish channel msg:向一个频道发送消息

  • psubscribe pattern[pattern]:订阅和pattern格式匹配的所有频道(pattern:通配符)

相应的通配符可以去Redis官网查询:PSUBSCRIBE | Docs

image-20250627221644654

流程图:

image-20250627221935252

与基于list模拟消息队列的区别就在于允许多个消费者去进行订阅,在订阅时指定自己的频道名称。

进行演示:

image-20250627222939860

基于PubSub的消息队列有哪些优缺点?

优点:

  • 采用发布订阅模型,支持多生产、多消费

缺点:

  • 不支持数据持久化:PubSub只是用来做消息对列的,没有数据持久化能力

  • 无法避免消息丢失:PubSub就是用来做消息发送的,因此如果发送消息的频道没有被任意进程订阅,那么数据就直接丢失。

  • 消息堆积有上限,超出时数据丢失。当发送消息时,如果有消费者进程监听频道,就会在消费者那里有一个缓存区域,将消息缓存,消费者去处理,如果消费者处理消息较慢,消息来得又多,消费者缓存空间是有上限的,如果超出就会数据丢失。

如果对可靠性要求较高,不建议使用PubSub消息队列。

基于Stream的消息队列--单消费模式

Stream是Redis5.0引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

先从官网查看:Commands | Docs

较为重要的命令:发送消息与接收消息

image-20250627224946845

举例说明:

image-20250627225226995

xlen 命令:用于查看指定队列中元素的数量

image-20250627225452244

读取消息的方式之一:XREAD

image-20250627230113934

进行演示:

image-20250627230528482

我们发现,在不同消费者可以读取一条消息,并且相同的消费者可以重复读取一条消息,即在stream这种数据类型中,一个消息读取完后不会删除,是永久存在的。

不仅如此,stream消息队列也可以实现阻塞效果:

演示如下:

image-20250627231130207

那么在业务开发中,我们可以循环的调用xread阻塞方式来查询最新消息,从而实现持续监听队列的效果。伪代码如下:

while(true){
    //尝试读取队列中的消息,最多阻塞两秒
    Object msg = redis.execute("XREAD COUNT 1 BLOCK 2000 STREAMS users $");
    if(msg == null){
        continue;
    }
    //处理消息
    handleMessage(msg);
}

注意事项:当指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过一条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题。

演示:

image-20250627232211252

stream类型消息队列的xread命令特点:

  • 消息可回溯(消息永久存在)

  • 一个消息可以被多个消费者读取

  • 可以阻塞读取

  • 消息漏读的风险

接下来就是消费者组。

基于stream的消息队列--消费者组

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。

具备以下特点:

  • 消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度,从一定程度上避免了消息堆积的问题。

  • 消息标识:消费者组会维护一个标识,记录最后一个被处理的消息。就类似于书签,哪怕消费者宕机重启,还会从标识之后读取消息,确保每一个消息都会被消费。解决了单消费时的消息漏读风险

  • 消息确认:消费者获取消息后,消息处于pending状态,并存入一个pending-list。档处理完成后需要通过XACK来确认消息。标记消息为已处理,才会从pending-list移除。

较为重要的命令

  • 创建消费者组:

 XGROUP CREATE key groupName ID [MKSTREAM]
  • key:队列名称

  • groupName:消费者组名称

  • ID:起始id标识,$代表队列中最后一个消息,0则代表列表中第一个消息

  • MKSTREAM:队列不存在时自动创建队列

其他常见命令:

 # 删除指定的消费者组
 XGROUP destory key groupName
 # 给指定的消费者组添加消费者
 xgroup createconsumer key groupName consumername
 #删除消费者组中的指定消费者
 xgroup delconsumer key groupName consumername

注意事项:一般情况下不需要去添加消费者,因为当从这个消费组中指定一个消费者并且监听消息时,如果stream发现消费者不存在,就会自动创建。

从消费者组读取消息:

image-20250629175000819

  • group:消费者组名称

  • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者

  • count:本次查询的最大数量

  • block milliseconds:当没有消息时最长等待时间

  • NOACK:无需手动ACK,获取到消息后自动确认

  • streams key:指定队列名称

  • ID:获取消息的起始ID:

    • ">":从下一个未消费的消息开始

    • 其他:根据指定id从pending-list中获取已消费但未确认的消息。例如0,是从pending-list中的第一个消息开始

命令演示:

image-20250629213711915

确认消息:

image-20250629213829716

  • key:队列名称

  • group:组名

  • ID:需要确认的消息ID

命令演示:

image-20250629214418033

那如果此时,一个消费者读到了消息之后,未确认消息就挂了,此时,该消息就会在pending-list中,那么如何查看pending-list呢?

xpending key group [IDLE min-idle-time] start end count [consumer]

  • key:队列名称

  • group:组名称

  • IDLE:空闲时间,获取消息以后确认之前的这段时间,比如5000毫秒,空闲时间在5000毫秒以上的这些消息,才会接收到pending-list,低于空闲时间的消息不接收。

  • start end:起始范围,在pending-list中最小的ID和最大的ID是什么,- +是指最小到最大的所有消息

  • count:查看的消息数量

  • consumer:想要获取哪个消费者的pending-list。

命令演示:

image-20250629220036360

那么在讲解读取消费者组消息的时候,我们说过,将最后一个ID参数换为数字,就是根据指定id从pending-list中获取已消费但未确认的消息。例如0,是从pending-list中的第一个消息开始。

命令演示:

image-20250629220459141

处理消息流程:

在正常情况下先利用获取未消费信息,拿到后就处理,处理后就确认,确认的过程中如果出现了异常,这个消息就会进入pending-list,在Java代码中体现出来的就是我们出现了异常,catch到之后,再去查询pending-list的消息(出现异常的消息),处理异常消息后,再次确认pending-list是否清空,最后再去获取未消费消息,回归正常。

以上介绍了在命令行界面中处理消息的流程,较为抽象,那么,在Java代码中处理消息流程应该是怎么样的呢?

消费者监听消息的基本思路:

 while(true){
     //尝试今天队列,使用阻塞模式,最长等待2000毫秒
     Object msg = redis.call("xreadgroup group g1 c1 count 1 block 200 streams s1 >");
     if(msg == null){//null说明没有消息,继续下一次
         continue;
     }
     try{
         //处理消息,完成后需要确认消息(ACK)
         handleMessage(msg);
     }catch(Exception e){
         while(true){
              Object msg = redis.call("xreadgroup group g1 c1 count 1 block 200 streams s1 0");
             if(msg == null){//null说明没有异常消息,所有消息已确认,结束循环
                 break;
     }
     try{
         //说明有异常消息,再次处理
         handleMessage(msg);
         }catch(Exception e){
         //再次出现异常,记录日志,继续循环
         continue;
     }
     }
     }
 }

总结:stream类型消息队列的xreadgroup命令特点:

  • 消息可回溯

  • 可以同组多消费者争抢消息,加快消费速度

  • 可以阻塞读取

  • 没有消息漏读的风险

  • 有消息确认机制,保证消息至少被消费一次

目前看来。可以满足大部分简单需求,缺点在于持久化时数据容易丢失,积压的消息会占用大量内存。

与前几种消息队列的比较

List PubSub Stream
消息持久化 支持 不支持 支持
阻塞读取 支持 支持 支持
消息堆积处理 受限于内存空间,可以利用多消费者加快处理 受限于消费者缓冲区 受限于队列长度,可以利用消费者组提高消费速度,减少堆积
消息确认机制 不支持 不支持 支持
消息回溯 不支持 不支持 支持

List:利用list的数据结构去模拟组合队列从而实现消息队列效果,因为他本身就是一种数据存储的模式,所以支持持久化,brpop 或者blpop可以实现阻塞读取,list内存上限就是redis对于list这种数据结构的内存上限,但是因为list可以有多个消费者同时去取,因此处理速度较快。而list本身不是消息队列,因此没有消息确认机制以及消息回溯。

PubSub其实还不如list结构,首先不支持持久化,发布订阅模式使其支持阻塞读取,而PubSub本身是消息队列,没有存储能力,直接是一个通道发送消息,最多就是在消费者中做一个缓冲,但是缓存空间有限,更别说消息确认机制,只要没人接收,直接消失,消息回溯能力也没有。

相比之下,Stream仿佛是完美的,消息持久化没问题,消息阻塞也OK,详细堆积处理也没问题,最多是受限于队列长度,并且可以利用多消费者消费,提高消息处理速度,减少堆积问题。消息确认机制:通过ACK与pending-list确保消息一定被消费一次。

因此如果要在redis的三种消息队列中选择的话,肯定是用stream模式,但是如果业务较为庞大,对于消息队列的要求更加严格,还是选要使用更加专业的消息队列比如RabbitMQ等,因为stream虽然支持消息的持久化,但这种持久化是依赖于Redis本身持久化的,Redis的持久化也可能会出现问题,而且消息确认机制只支持消费者的确认机制,不支持生产者确认机制,另外消息的事务机制,再多消费者下的消息有序性等等,还是有很多问题的,还是需要更加强大的消息队列去支持,但如果对消息队列要求不高,还是可以使用的。

希望对大家有所帮助!


网站公告

今日签到

点亮在社区的每一天
去签到