当前位置: 首页 > news >正文

Redis——实现消息队列

目录

前言

 基于List结构模拟消息队列

基于List实现消息队列优缺点

基于PubSub(订阅者)实现消息队列

示例

 基于PubSub的消息队列的优缺点

基于Stream的消息队列

STREAM类型消息队列的XREAD命令特点:

基于Stream的消息队列-消费者组

基于消费者监听消息的基本思路: 

 stream类型消息队列的xreadgroup命令特点:

Redis消息队列总结


前言

消息队列:Message Queue,字面意思就是存放消息的队列,最简单的消息队列模型包括3个角色

以下就是基于Redis结构特性来实现消息队列

 基于List结构模拟消息队列

Redis的list数据结构是双向链表,很容易模拟出消息队列的效果

对于Redis的list数据结构操作方法LPUSH,RPOP,RPUSH,LPOP这些方法。可以利用这些方法来实现消息队列。

如果不知道这些方法怎么使用找到redis的客户端 使用和help + 对应操作方法就可以展现出相关文档使用

基于List实现消息队列优缺点

优点:

  1.  利用Redis存储,不受限于JVM内存上限
  2. 基于Redis的持久化机制,数据安全性有保证
  3. 可以满足消息的有序性

缺点:

  1. 无法避免消息丢失(从消息队列中取出一条消息还没来得及处理就出现异常,这条消息就会丢失,remove出一条消息,list中就没有了
  2. 只支持单消费者(发送一条消息remove移除了,其他消费者就拿不到了)

基于PubSub(订阅者)实现消息队列

Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或者多个channel,生产者向对应channel发送消息后所有订阅者都可以收到相关消息。

PubSub设计之初本来就是用来作消息频道的,有人订阅就会接收,没人接收就不会保存到PubSub直接丢失

示例

 基于PubSub的消息队列的优缺点

优点:

  1. 采用发布订阅模型,支持多生产,多消费

缺点:

  1. 不支持数据持久化(pusub本来就是用作消息队列的不像list本来就是数据结构数据存储,发布消息没人订阅的化消息丢失
  2. 无法避免消息丢失
  3. 消息堆积有上限,超出时数据丢失(发送消息有人订阅,会在消费者那里缓存,缓存空间有上限,连续发很多消息就会消息丢失)

基于Stream的消息队列

Stream是Redis5.0引入的一种新的数据类型,可以实现一个功能非常完善的消息队列。

    发送消息的命令:(也就是往消息队列添加一条消息)

    例如:

     读取消息的方式之一:xread

     示例:

     在业务开发中,我们可以循环的调用xread阻塞方式来查询最新的消息,从而实现持续监听队列的效果,伪代码如下:

    whlie(true){
        //尝试读取队列中的消息,最多阻塞2s
    Object msg = redis.execute("xread count 1 block 2000 streams users $");
    if(msg == null){
            continue;
    }
    //处理消息
       handleMessga(mag);

    注意

    STREAM类型消息队列的XREAD命令特点:

    1.消息可回溯(消息读取完之后不消失)

    2.一个消息可以被多个消费者读取

    3.可以阻塞读取

    4.有漏读风险

    基于Stream的消息队列-消费者组

    消费者组(consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备一下特点:消息分流,消息标识,消息确认

    创建消费者组: 

    其他常见命令:

    一般情况下不需要自己去添加消费者,当我们从这个组中指定一个消费者并且监听消息的时候,如果发现消费者不存在自动帮我们创建出来,并不需要手动去创建

    从消费者组读取消息:

    基于消费者监听消息的基本思路: 

      while (true){
               //尝试监听队列,使用阻塞模式,最长等待2秒
               Object msg = redis.call("xreadgroup group g1 c1 count 1 block 2 streams s1 >");
               if(msg == null){
                   continue;
               }
               try {
                   //处理消息,完成后一定要ack
                   handleMassage(msg);
               }catch (Exception e){
                   while (true){
                       Object msg = redis.call("xreadgroup group g1 c1 count 1 block 2 streams s1 0");
                       if(msg == null){
                           continue;
                       }
                       try {
                           //说名没有异常消息,再次处理
                           handleMassge(msg);
                       }catch (Exception e){
                           //再次出现异常,记录日志,继续循环
                           continue;
                       }
                   }
               }

     stream类型消息队列的xreadgroup命令特点:

    • 消息可回溯
    • 可以多消费争抢消息,加快消费速度
    • 可以阻塞读取.
    • 没有消息漏读风险
    • 有消息确认机制,保证消息至少被消费一次

    Redis消息队列总结

      相关文章:

    1. 【langchain库名解析】
    2. Vue环境搭建:vue+idea
    3. 几款开源网盘的比较
    4. windows 安装 pygame( pycharm)
    5. 基于DNS的负载均衡和反向代理负载均衡
    6. 川翔云电脑:D5 渲染摆脱硬件限制,云端高效创作
    7. 2025年常见渗透测试面试题-sql(题目+回答)
    8. oracle常见问题处理集锦
    9. 深入解析以太坊虚拟机(EVM)架构与状态机特性
    10. 【HarmonyOS 5】鸿蒙中@State的原理详解
    11. ​​IPerf工具使用笔记(基于MobaXterm串口终端)​
    12. 页面编辑器CodeMirror初始化不显示行号或文本内容
    13. docker内安装达梦8数据库
    14. PhotoShop学习09
    15. 设计模式:单例模式
    16. AI大模型与知识生态:重构认知的新时代引擎
    17. 将mongdb中文档转储到mysql设计思路
    18. 众趣科技丨沉浸式 VR 体验,助力酒店民宿数字化营销宣传
    19. Maya云渲染工作流,提升渲染速度
    20. C++蓝桥杯填空题(攻克版)
    21. 做视频网站收费侵权吗/360网站推广费用
    22. 淘宝联盟推广可以做网站吗/重庆seo推广
    23. 企业营销网站建设/批量查询神马关键词排名
    24. 合肥做公司网站/手机创建网站教程
    25. 如何做网站的映射/北京网站快速优化排名
    26. 个人网站能百度推广吗/如何拿高权重网站外链进行互换?