Redis消息队列
在进行redis秒杀优化时,我们在异步秒杀的实现中发现了两个较为严重的问题:内存限制问题以及数据安全问题。
想要解决这两个难题,就需要使用消息队列
消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括三个角色:
消息队列:存储和管理消息,也称消息代理(Message Broker)
生产者:发送消息到消息队列
消费者:从消息队列获取消息并处理消息
优点:解除耦合,提高工作效率
这样看来和我们开始用的阻塞队列好像没什么不同,但是实际有三点不同:
消息队列是在JVM以外的一个独立服务,不受JVM内存的限制,这就解决了内存限制问题。
消息队列不仅仅做消息存储,而且还需要确保数据的安全,就是指存入在消息队列的所有数据要去做持久化,这样不管是服务宕机还是重启,数据就不存在丢失的问题。这就解决了数据安全问题
并且在消息投递给消费者以后,还需要要求消费者做消息的确认,如果消息没有得到确认,那么这个消息就会在队列中依然存在,下一次会再次投递给消费者,让它继续处理,直到成功为止,就是确保消息至少被消费一次。
正常市面上的消息队列也有很多,比如RabbitMQ,RocketMQ,等等,但是上述的消息队列都需要去额外搭建一些服务,去学习另外的一些技术,是有一定的成本。
而在Redis中,提供了三种不同的方式来实现消息队列:
list结构:基于list结构模拟消息队列
PubSub:基本的点对点消息模型
Stream:比较完善的消息队列模型
接下来就来详细了解redis的三种消息队列实现方式:
基于List模拟消息队列
消息队列(Message Queue),字面意思就是存放消息的队列,而redis的list数据结构是一个双向链表,很容易模拟出队列效果。
队列:先进先出,出口和入口不一样,因此可以利用redis命令:lpush集合rpop,或者rpush结合lpop来实现。
不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。
因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。
进入客户端进行测试:
这就是在模仿jvm中的阻塞队列,相比于阻塞队列的优点就是不依赖于JVM内存,不用担心存储上限的问题,redis支持数据持久化,数据安全也能得到保证。还能保证消息有序性。
数据持久化:List不是消息队列,只是一种存储结构,被当做消息碎队列来使用,因此数据存储结构具有数据持久化能力
但是List机构只是基本满足了消息队列的功能,还有一些欠缺点:
无法避免消息丢失:比如从redis的队列中取到一条消息,但是在还没有处理就出现了异常,这个消息就丢失了。
只支持单消费者:一条消息只能被一个用户拿到,无法实现一条消息被共享的场景
基于PubSub的消息队列
PubSub(publish subscribe发布 订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel(频道),生产者向对应channel发送消息后,所有订阅者都能够收到相关消息。
相关Redis命令:
SUBSCRIBE channel[channel]:订阅一个或多个频道
publish channel msg:向一个频道发送消息
psubscribe pattern[pattern]:订阅和pattern格式匹配的所有频道(pattern:通配符)
相应的通配符可以去Redis官网查询:PSUBSCRIBE | Docs
流程图:
与基于list模拟消息队列的区别就在于允许多个消费者去进行订阅,在订阅时指定自己的频道名称。
进行演示:
基于PubSub的消息队列有哪些优缺点?
优点:
采用发布订阅模型,支持多生产、多消费
缺点:
不支持数据持久化:PubSub只是用来做消息对列的,没有数据持久化能力
无法避免消息丢失:PubSub就是用来做消息发送的,因此如果发送消息的频道没有被任意进程订阅,那么数据就直接丢失。
消息堆积有上限,超出时数据丢失。当发送消息时,如果有消费者进程监听频道,就会在消费者那里有一个缓存区域,将消息缓存,消费者去处理,如果消费者处理消息较慢,消息来得又多,消费者缓存空间是有上限的,如果超出就会数据丢失。
如果对可靠性要求较高,不建议使用PubSub消息队列。
基于Stream的消息队列--单消费模式
Stream是Redis5.0引入的一种新数据类型,可以实现一个功能非常完善的消息队列。
先从官网查看:Commands | Docs
较为重要的命令:发送消息与接收消息
举例说明:
xlen 命令:用于查看指定队列中元素的数量
读取消息的方式之一:XREAD
进行演示:
我们发现,在不同消费者可以读取一条消息,并且相同的消费者可以重复读取一条消息,即在stream这种数据类型中,一个消息读取完后不会删除,是永久存在的。
不仅如此,stream消息队列也可以实现阻塞效果:
演示如下:
那么在业务开发中,我们可以循环的调用xread阻塞方式来查询最新消息,从而实现持续监听队列的效果。伪代码如下:
while(true){ //尝试读取队列中的消息,最多阻塞两秒 Object msg = redis.execute("XREAD COUNT 1 BLOCK 2000 STREAMS users $"); if(msg == null){ continue; } //处理消息 handleMessage(msg); }
注意事项:当指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过一条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题。
演示:
stream类型消息队列的xread命令特点:
消息可回溯(消息永久存在)
一个消息可以被多个消费者读取
可以阻塞读取
有消息漏读的风险
接下来就是消费者组。
基于stream的消息队列--消费者组
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。
具备以下特点:
消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度,从一定程度上避免了消息堆积的问题。
消息标识:消费者组会维护一个标识,记录最后一个被处理的消息。就类似于书签,哪怕消费者宕机重启,还会从标识之后读取消息,确保每一个消息都会被消费。解决了单消费时的消息漏读风险。
消息确认:消费者获取消息后,消息处于pending状态,并存入一个pending-list。档处理完成后需要通过XACK来确认消息。标记消息为已处理,才会从pending-list移除。
较为重要的命令:
创建消费者组:
XGROUP CREATE key groupName ID [MKSTREAM]
key:队列名称
groupName:消费者组名称
ID:起始id标识,$代表队列中最后一个消息,0则代表列表中第一个消息
MKSTREAM:队列不存在时自动创建队列
其他常见命令:
# 删除指定的消费者组 XGROUP destory key groupName # 给指定的消费者组添加消费者 xgroup createconsumer key groupName consumername #删除消费者组中的指定消费者 xgroup delconsumer key groupName consumername
注意事项:一般情况下不需要去添加消费者,因为当从这个消费组中指定一个消费者并且监听消息时,如果stream发现消费者不存在,就会自动创建。
从消费者组读取消息:
group:消费者组名称
consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
count:本次查询的最大数量
block milliseconds:当没有消息时最长等待时间
NOACK:无需手动ACK,获取到消息后自动确认
streams key:指定队列名称
ID:获取消息的起始ID:
">":从下一个未消费的消息开始
其他:根据指定id从pending-list中获取已消费但未确认的消息。例如0,是从pending-list中的第一个消息开始
命令演示:
确认消息:
key:队列名称
group:组名
ID:需要确认的消息ID
命令演示:
那如果此时,一个消费者读到了消息之后,未确认消息就挂了,此时,该消息就会在pending-list中,那么如何查看pending-list呢?
xpending key group [IDLE min-idle-time] start end count [consumer]
key:队列名称
group:组名称
IDLE:空闲时间,获取消息以后确认之前的这段时间,比如5000毫秒,空闲时间在5000毫秒以上的这些消息,才会接收到pending-list,低于空闲时间的消息不接收。
start end:起始范围,在pending-list中最小的ID和最大的ID是什么,- +是指最小到最大的所有消息
count:查看的消息数量
consumer:想要获取哪个消费者的pending-list。
命令演示:
那么在讲解读取消费者组消息的时候,我们说过,将最后一个ID参数换为数字,就是根据指定id从pending-list中获取已消费但未确认的消息。例如0,是从pending-list中的第一个消息开始。
命令演示:
处理消息流程:
在正常情况下先利用获取未消费信息,拿到后就处理,处理后就确认,确认的过程中如果出现了异常,这个消息就会进入pending-list,在Java代码中体现出来的就是我们出现了异常,catch到之后,再去查询pending-list的消息(出现异常的消息),处理异常消息后,再次确认pending-list是否清空,最后再去获取未消费消息,回归正常。
以上介绍了在命令行界面中处理消息的流程,较为抽象,那么,在Java代码中处理消息流程应该是怎么样的呢?
消费者监听消息的基本思路:
while(true){ //尝试今天队列,使用阻塞模式,最长等待2000毫秒 Object msg = redis.call("xreadgroup group g1 c1 count 1 block 200 streams s1 >"); if(msg == null){//null说明没有消息,继续下一次 continue; } try{ //处理消息,完成后需要确认消息(ACK) handleMessage(msg); }catch(Exception e){ while(true){ Object msg = redis.call("xreadgroup group g1 c1 count 1 block 200 streams s1 0"); if(msg == null){//null说明没有异常消息,所有消息已确认,结束循环 break; } try{ //说明有异常消息,再次处理 handleMessage(msg); }catch(Exception e){ //再次出现异常,记录日志,继续循环 continue; } } } }
总结:stream类型消息队列的xreadgroup命令特点:
消息可回溯
可以同组多消费者争抢消息,加快消费速度
可以阻塞读取
没有消息漏读的风险
有消息确认机制,保证消息至少被消费一次
目前看来。可以满足大部分简单需求,缺点在于持久化时数据容易丢失,积压的消息会占用大量内存。
与前几种消息队列的比较
List | PubSub | Stream | |
---|---|---|---|
消息持久化 | 支持 | 不支持 | 支持 |
阻塞读取 | 支持 | 支持 | 支持 |
消息堆积处理 | 受限于内存空间,可以利用多消费者加快处理 | 受限于消费者缓冲区 | 受限于队列长度,可以利用消费者组提高消费速度,减少堆积 |
消息确认机制 | 不支持 | 不支持 | 支持 |
消息回溯 | 不支持 | 不支持 | 支持 |
List:利用list的数据结构去模拟组合队列从而实现消息队列效果,因为他本身就是一种数据存储的模式,所以支持持久化,brpop 或者blpop可以实现阻塞读取,list内存上限就是redis对于list这种数据结构的内存上限,但是因为list可以有多个消费者同时去取,因此处理速度较快。而list本身不是消息队列,因此没有消息确认机制以及消息回溯。
PubSub其实还不如list结构,首先不支持持久化,发布订阅模式使其支持阻塞读取,而PubSub本身是消息队列,没有存储能力,直接是一个通道发送消息,最多就是在消费者中做一个缓冲,但是缓存空间有限,更别说消息确认机制,只要没人接收,直接消失,消息回溯能力也没有。
相比之下,Stream仿佛是完美的,消息持久化没问题,消息阻塞也OK,详细堆积处理也没问题,最多是受限于队列长度,并且可以利用多消费者消费,提高消息处理速度,减少堆积问题。消息确认机制:通过ACK与pending-list确保消息一定被消费一次。
因此如果要在redis的三种消息队列中选择的话,肯定是用stream模式,但是如果业务较为庞大,对于消息队列的要求更加严格,还是选要使用更加专业的消息队列比如RabbitMQ等,因为stream虽然支持消息的持久化,但这种持久化是依赖于Redis本身持久化的,Redis的持久化也可能会出现问题,而且消息确认机制只支持消费者的确认机制,不支持生产者确认机制,另外消息的事务机制,再多消费者下的消息有序性等等,还是有很多问题的,还是需要更加强大的消息队列去支持,但如果对消息队列要求不高,还是可以使用的。
希望对大家有所帮助!