当前位置: 首页 > news >正文

使用redis的发布/订阅(Pub/Sub), 实现消息队列

文章目录

    • 一、介绍
      • 发布订阅采用观察者模式 实现,包含三个核心组件:
      • 工作流程
      • Redis发布订阅与消息队列(RocketMQ)对比
    • 二、配置监听器、及消费者处理类
      • 配置监听器
      • 消费者处理类
    • 三、生产者处理类

一、介绍

Redis的发布订阅拥有简单高效的实时消息传递,但需要根据业务需求权衡其持久化和可靠性限制,对于特殊场景,特殊业务还是建议使用专业的MQ, 如Kafka、RocketMQ。

发布订阅采用观察者模式 实现,包含三个核心组件:

(1)发布者(Publisher)

  • 通过 PUBLISH 命令向指定频道发送消息
  • 消息格式: PUBLISH channel_name message
  • 返回值为当前订阅该频道的客户端数量

(2)订阅者(Subscriber)

  • 通过 SUBSCRIBE 命令订阅一个或多个频道
  • 可以订阅模式: PSUBSCRIBE pattern*
  • 订阅后进入阻塞状态,等待消息到达

(3)频道(Channel)

  • 消息传递的通道
  • 支持通配符模式匹配
  • 无需预先创建,自动管理

工作流程

1.订阅阶段 :客户端执行 SUBSCRIBE channel1 channel2
2.发布阶段 :另一个客户端执行 PUBLISH channel1 “Hello”
3.消息传递 :Redis服务器将消息推送给所有订阅channel1的客户端
4.接收处理 :订阅者收到消息格式: [“message”, “channel1”, “Hello”]

Redis发布订阅与消息队列(RocketMQ)对比

特性Redis Pub/Sub消息队列(RocketMQ)
消息持久化❌ 不支持(内存存储)✅ 支持(磁盘持久化)
离线消息消息丢失消息保存
重启恢复消息丢失消息恢复
消息可靠性❌ 无ACK机制✅ 支持ACK确认机制
事务消息❌ 不支持✅ 支持分布式事务
消息顺序⚡ 基本保证✅ 严格顺序保证
消费者模式📢 广播模式🔄 集群消费+广播模式
重试机制❌ 不支持✅ 支持自动重试
死信队列❌ 不支持✅ 支持死信队列
消息轨迹❌ 不支持✅ 支持完整消息轨迹
部署复杂度⭐ 非常简单⭐⭐ 中等复杂

Redis发布订阅虽然有许多不足,但对一些业务小,要求不高的场景使用起来也是非常香的



二、配置监听器、及消费者处理类

引入redis依赖,及配置这里就不展示了

配置监听器

/*** @author Redis消息监听配置*/
@RequiredArgsConstructor
@Component
public class RedisMessageListenerConfig {private final RedisMessageListenerContainer container;private final MessageSubscriber messageSubscriber;/*** 默认是已经初始化了RedisMessageListenerContainer*/@PostConstructpublic void init(){//这里默认是已经初始化了RedisMessageListenerContainer, 在此处仅仅添加消息监听器即可//如果没有初始化RedisMessageListenerContainer,则需要先初始化,再添加消息监听器container.addMessageListener((message, pattern) -> messageSubscriber.messageHandler1(message), new ChannelTopic("Channel_Topic_1"));container.addMessageListener((message, pattern) -> messageSubscriber.messageHandler2(message), new ChannelTopic("Channel_Topic_2"));}/*** 没有初始化RedisMessageListenerContainer,则需要先初始化*/// @Bean// public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {//     //自行进行配置,这里不再说明//     RedisMessageListenerContainer initContainer = new RedisMessageListenerContainer();//     initContainer.setConnectionFactory();//     ...//     initContainer.addMessageListener((message, pattern) -> messageSubscriber.messageHandler1(message), new ChannelTopic("Channel_Topic_1"));//     initContainer.addMessageListener((message, pattern) -> messageSubscriber.messageHandler2(message), new ChannelTopic("Channel_Topic_2"));//     return initContainer;// }
}

消费者处理类

@Slf4j
@RequiredArgsConstructor
@Component
public class MessageSubscriber {public void messageHandler1(Message message) {log.info("消息通知, message:{}",message);try {String string = message.toString();//添加业务逻辑} catch (Exception e) {log.error("消费者异常,message:{} error:{},",message,e.getMessage());}}public void messageHandler2(Message message) {log.info("消息通知, message:{}",message);try {String string = message.toString();//添加业务逻辑} catch (Exception e) {log.error("消费者异常,message:{} error:{},",message,e.getMessage());}}
}



三、生产者处理类

@Service
@RequiredArgsConstructor
public class InterviewSessionServiceImpl {private final RedisTemplate<String, Object> redisTemplate;public void message1(String message1) {redisTemplate.convertAndSend("Channel_Topic_1", message1);}public void message2(String message2) {redisTemplate.convertAndSend("Channel_Topic_2", message2);}
}

文章转载自:

http://cAUmVdD1.qmqgx.cn
http://bQ202Juy.qmqgx.cn
http://k4IaGzOC.qmqgx.cn
http://OxMHbYhS.qmqgx.cn
http://3jsjgAWl.qmqgx.cn
http://XPIkvgUM.qmqgx.cn
http://Cz7EmGTO.qmqgx.cn
http://VJ3Ii2IX.qmqgx.cn
http://L2Uk6G6R.qmqgx.cn
http://7yO7DgEJ.qmqgx.cn
http://GBa9zRZP.qmqgx.cn
http://L8tAhgAo.qmqgx.cn
http://edEYSVYp.qmqgx.cn
http://nU363YIj.qmqgx.cn
http://P4KP2iM0.qmqgx.cn
http://vIs56aYF.qmqgx.cn
http://AXLZOK1E.qmqgx.cn
http://aw2PVDZz.qmqgx.cn
http://xPhZX4Kk.qmqgx.cn
http://s6GY1BA6.qmqgx.cn
http://XfCiCWDf.qmqgx.cn
http://yIpOjR8w.qmqgx.cn
http://tUfigtsV.qmqgx.cn
http://k7UIyp3G.qmqgx.cn
http://7M76ANaF.qmqgx.cn
http://6AcDqL8E.qmqgx.cn
http://ELm0VsaW.qmqgx.cn
http://fWf4jb5w.qmqgx.cn
http://4e4Tw4lN.qmqgx.cn
http://y3vDvfSn.qmqgx.cn
http://www.dtcms.com/a/373625.html

相关文章:

  • 鸿蒙:更改状态栏、导航栏颜色
  • [数据结构——lesson4.双向链表]
  • 集成学习:从理论到实践的全面解析
  • 机器学习-集成学习
  • 集成学习简介
  • JDK 17、OpenJDK 17、Oracle JDK 17 的说明
  • VM中CentOS 7密码重置
  • 科技信息差(9.8)
  • MATLAB的数值计算(三)曲线拟合与插值
  • 城市脉搏中的“绿色卫士”:当智能科技邂逅城市清洁
  • linux播放视频出现需要MPEG-4 AAC解码器,H.265(Main Profile)解码器,但是没有安装
  • ARM工作模式、汇编学习
  • 【入门级-算法-6、排序算法:选择排序】
  • React state在setInterval里未获取最新值的问题
  • Linux 物理机如何区分 SSD 与 HDD ——以 DELL PERC H730 Mini 为例
  • AP和stage模式差异
  • 支持生成一维条形码Extend .NET
  • 企业级固态硬盘——U.2接口技术
  • 【Android虚拟摄像头】七、安卓15系统实现虚拟摄像头
  • FxSound:提升音频体验,让音乐更动听
  • Don‘t Sleep:保持电脑唤醒,确保任务不间断
  • android/java中,配置更改导致activity销毁重建的解决方法
  • C++day8作业
  • 【CI/CD】GitHub Actions 快速入门
  • 如何在安卓手机/平板上找到下载文件?
  • Claude Code Windows 原生版安装指南
  • AR技术:多行业数字化转型的加速引擎
  • C++初阶(4)类和对象(上)
  • SpringAI企业级应用开发面试全流程解析:核心技术、架构落地与业务场景实战
  • 从旋转位置编码RoPE到YaRN的原理与实现