30.redis消息队列
消息队列message queue,就是存放消息的队列。
消息队列模型包括三个角色:
消息队列:存储和管理消息,也叫做消息代理Message Broker
生产者:发送消息到消息队列。
消费者:从消息队列中获取消息并处理消息。
redis消息队列是在jvm以外的一个服务,就不受jvm内存的限制,存入消息队列的消息会做持久化,这样服务无论是宕机还是重启消息都不会丢失,而且消息投递给消费者以后,要求消费者做消息的确认,如果没有确认,那么消息依然会存在于消息队列中。下一次会再次投递给消费者,确保了消息至少被消费一次。
redis提供了三种不同的方式来实现消息队列:
list结构:基于list结构模拟消息队列。
PubSub:基本的点对点消息模型,发布订阅,但是功能并不完善。
Stream:比较完善的消息队列模型。
list结构的消息队列的实现原理
Redis的list数据结构是一个双向链表,很容易模拟出队列效果。
队列的特点:先进先出。
队列是入口和出口不在一边,因此可以利用:LPUSH结合RPOP,或者RPUSH结合LPOP来实现。
需要注意:当队列中没有消息时候RPOP或者是LPOP操作会返回null, 并不像jvm的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。
优点:
1.利用redis存储,不受限于jvm内存上限。
2.基于redis的持久化机制,数据安全性得到保证。
3.可以满足消息的有序性。
缺点:
1.无法避免消息丢失。
2.只支持单消费者。
PubSub消息队列的实现原理
发布订阅
默认是阻塞式的等待消息
消费者可以订阅一个或多个channel频道。
生产者向对应的channel发送消息后,所有订阅者都能收到相关消息。
实现一个生产者可以发消息给多个消费者
常用命令:
subscribe channel频道名称: 可以订阅一个或多个频道。
例如:subscribe order.queue
publish channel频道名称 msg: 向一个频道发送消息。
psubscribe pattern通配符: 订阅与pattern 格式匹配的所有频道。
通配符说明:
hel?o ?可以匹配任意一个字符
hel*o *代表零个多个字符
h[ea]llo 代表 hello或者hallo都可以。
例如:psubscribe order.*
缺点:
1.不支持数据持久化。
2.无法避免消息丢失。消息发送到频道后,如果没有消费者接收,消息就丢失了。
3.消息堆积在客户端的缓存,超出时消息丢失。
所以不太适合可靠性要求较高的场景。
Stream 消息队列
redis5.0以后引入的,Stream是一种新的数据类型。支持数据的持久化。
Stream数据类型是专门为消息队列设计的,可以实现一个功能非常完善的消息队列。
发送消息命令:xadd
执行命令后会返回消息的唯一标识,redis自动生成的id。
xlen users 命令:查看队列users中消息的数量。
读取消息命令:xread
s1为队列名称
在stream这种消息队列,一个消费者读取消息之后,其他消费者还可以读取该条消息,不会从消息队列中删除该条消息。消息时永久存在的。
block参数用于阻塞等待,block 0 表示永久阻塞。
基于Stream的消息队列-消费组
消费者组:将多个消费者划分到同一个组中,监听同一个队列。
特点:
1.消息分流,队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息的处理速度。同一个组里面的消费者之间属于竞争关系。
2.消费标识,消费者组会维护一个标识,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标识之后读取消息,确保每一个消息都会被消费。
3.消息确认,消费者获取消息以后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除。
xreadgroup命令
xack命令用来确认消息
xpending命令 返回pendinglist中所有未确认的消息 - + 10 表示获取从最小到最大10条消息
java代码处理逻辑的伪代码
stream的缺点:
1.消息的持久化依赖于redis本身的持久化,redis的持久化也不能保证万无一失,还是有丢失风险的。
2.消息确认机制只支持消费者的确认机制,而不支持生产者的确认机制,如果是生产者发消息的过程中消息丢失了该如何办呢?
3.消息的事务机制,在多消费者下的消息的有序性等。
还是更推荐用专业的消息消费软件rabbitmq