当前位置: 首页 > news >正文

Redis发布订阅:实时消息系统的极简解决方案

💡 一句话真相:Redis发布订阅就像"微信群发通知"💬——发布者(群主)发送消息,订阅者(群成员)实时接收,无需轮询,毫秒级延迟!


💥 一、为什么需要发布订阅?轮询的致命伤

传统轮询痛点:

在这里插入图片描述

问题:

  • 95%的请求是无效查询 ⏱
  • 高并发下服务器压力暴增 💥
  • 消息延迟高达数秒 ⏳

发布订阅破局:实时推送,0无效请求!


⚙️ 二、核心概念:三方角色解析

角色作用生活化比喻
发布者发送消息到指定频道新闻主播 📺
订阅者监听频道接收消息电视观众 👀
频道消息传输的通道电视频道 📡
发布消息
推送消息
推送消息
发布者
频道
订阅者1
订阅者2

📡 三、工作流程详解

1. 订阅者订阅频道
# 订阅news频道  
SUBSCRIBE news  

输出:

1) "subscribe"   # 订阅成功  
2) "news"        # 频道名  
3) (integer) 1   # 当前订阅数  
2. 发布者发送消息
# 向news频道发消息  
PUBLISH news "Redis 7.0 released!"  

返回值:接收到消息的订阅者数量

3. 订阅者实时接收
# 订阅者终端显示  
1) "message"        # 消息类型  
2) "news"           # 来源频道  
3) "Redis 7.0 released!"  # 消息内容  

🔧 四、五大进阶玩法

1. 模式订阅(匹配多个频道)
# 订阅所有tech_开头的频道  
PSUBSCRIBE tech_*  # 发布消息到tech_ai频道  
PUBLISH tech_ai "GPT-4震撼发布!"  
2. 取消订阅
UNSUBSCRIBE news     # 退订指定频道  
PUNSUBSCRIBE tech_*  # 退订模式频道  
3. 查看活跃频道
PUBSUB CHANNELS        # 所有活跃频道  
PUBSUB CHANNELS tech*  # 匹配tech*的频道  
4. 查看订阅者数量
PUBSUB NUMSUB news  # 查看news频道的订阅数  
5. 客户端阻塞监听
import redis  r = redis.Redis()  
pubsub = r.pubsub()  
pubsub.subscribe('news')  # 持续监听消息  
for message in pubsub.listen():  if message['type'] == 'message':  print(message['data'])  

🚀 五、四大应用场景实战

1. 实时聊天室

在这里插入图片描述

2. 系统事件通知
# 订单支付成功通知  
PUBLISH order_paid "订单ID:1001, 金额:299"  
3. 配置实时更新
# 配置中心发布更新  
redis.publish('config_update', '{"theme":"dark"}')  # 服务订阅更新  
def handle_config(message):  new_config = json.loads(message)  apply_config(new_config)  
4. 跨服务解耦
发布事件
推送事件
推送事件
服务A
Redis
服务B
服务C

⚖️ 六、发布订阅 vs 消息队列

特性发布订阅消息队列(Stream)
消息持久化❌ 消息不保存✅ 支持持久化
消费者组❌ 不支持✅ 支持
消息确认❌ 无ACK机制✅ 支持ACK
实时性⚡ 毫秒级⚡ 毫秒级
适用场景实时通知、广播任务队列、可靠传输

⚠️ 七、四大生产环境陷阱

🚫 陷阱1:消息丢失无保障

场景:订阅者掉线期间,消息全部丢失!
解决方案:

发布
发布者
Redis
Stream备份
订阅者
🚫 陷阱2:订阅者阻塞影响服务

错误代码:

while True:  message = pubsub.get_message()  # 死循环占用CPU  

优化方案:

for message in pubsub.listen():  # 阻塞式监听  process(message)  
🚫 陷阱3:频道爆炸性能下降

案例:创建10万个频道 → Redis内存暴增!
预防措施:

  • 使用模式订阅替代多个频道
  • 清理无效频道:CLIENT KILL TYPE pubsub
🚫 陷阱4:无权限控制

风险:任意客户端可订阅敏感频道(如payment
解决方案:

# 使用代理层鉴权  
location /pubsub {  auth_request /auth;  # 先验证权限  proxy_pass http://redis_backend;  
}  

📊 八、性能实测:百万级消息压力测试

场景消息大小QPS平均延迟
1万订阅者100B85,0000.3ms
10万订阅者100B42,0000.8ms
1万订阅者1KB68,0000.5ms
10万订阅者1KB32,0001.2ms

💡 测试环境:Redis 7.0,16核CPU/32GB内存,千兆网络


🔧 九、多语言实战代码

Python(redis-py)
import redis  # 发布者  
publisher = redis.Redis()  
publisher.publish('news', 'Python 3.12 released!')  # 订阅者  
def subscriber():  r = redis.Redis()  pubsub = r.pubsub()  pubsub.subscribe('news')  for message in pubsub.listen():  if message['type'] == 'message':  print(f"收到消息: {message['data']}")  # 启动订阅线程  
import threading  
threading.Thread(target=subscriber).start()  
Java(Jedis)
import redis.clients.jedis.Jedis;  
import redis.clients.jedis.JedisPubSub;  // 订阅者  
JedisPubSub listener = new JedisPubSub() {  @Override  public void onMessage(String channel, String message) {  System.out.println("收到: " + message);  }  
};  new Thread(() -> {  Jedis jedis = new Jedis("localhost");  jedis.subscribe(listener, "news");  
}).start();  // 发布者  
Jedis publisher = new Jedis("localhost");  
publisher.publish("news", "Java 21 LTS is out!");  
Node.js(ioredis)
const Redis = require('ioredis');  
const pub = new Redis();  
const sub = new Redis();  // 订阅  
sub.subscribe('news', (err) => {  if (!err) console.log('订阅成功!');  
});  sub.on('message', (channel, message) => {  console.log(`收到 ${channel} 消息: ${message}`);  
});  // 发布  
pub.publish('news', 'Node.js 20 released!');  

💎 十、总结:发布订阅适用三原则

  1. 实时性要求高:

    • 聊天消息
    • 实时监控报警
  2. 允许消息丢失:

    • 非关键通知(如在线人数更新)
    • 临时状态广播
  3. 无需复杂路由:

    • 广播场景
    • 简单一对一

在这里插入图片描述

🔥 黄金口诀:

  • 实时广播用PubSub,持久队列用Stream
  • 频道设计要规范,大小消息分得开
  • 生产环境加监控,异常退订及时查

#Redis发布订阅 #实时消息 #分布式系统

http://www.dtcms.com/a/354605.html

相关文章:

  • MyBatis延迟加载
  • 云计算学习100天-第29天
  • Node.js 的模块化规范是什么?CommonJS 和 ES6 模块有什么区别?
  • Python DELL Logo
  • day1 ———C++———变量和字符串的使用
  • AI驱动企业数字化转型:解码未来三年的智能化变革密码
  • STAGEWISE实战指南:从集成到使用的完整解决方案
  • AI在商业领域的多元应用:从写作助手到精准运营,解锁AI商业工具新价值
  • 流程控制语句(3)
  • 操作系统中的死锁是什么意思
  • 农行广西区分行携手广西专精特新商会共探金融赋能专精特新企业新路径
  • 用KPI导航数字化转型:制造企业如何科学评估系统上线成效
  • 流程控制语句(2)
  • Java网络编程(UDP, TCP, HTTP)
  • 【Linux基础知识系列:第一百一十五篇】使用gzip与bzip2进行压缩
  • 从首次测试到采购40个机器人:Junior kühlk如何自动化协作机械臂矩阵
  • Linux学习-基于TCP实现群聊
  • 医疗AI时代的生物医学Go编程:高性能计算与精准医疗的案例分析(三)
  • windows下查看别的服务器的端口是否通
  • [光学原理与应用-319]:激光器光路设计的主要输出文件的形式和内容
  • 解构与重构:“真人不露相,露相非真人” 的存在论新解 —— 论 “真在” 的行为表达本质
  • 一文读懂:用PyTorch从零搭建一个Transformer模型
  • (LeetCode 每日一题) 3446. 按对角线进行矩阵排序(矩阵、排序)
  • 读大语言模型08计算基础设施
  • GeoScene Maps 完整入门指南:从安装到实战
  • 《Explanation of Adaptive Platform Design》详细解读
  • 同一个栅格数据,为何在QGIS和ArcGIS Pro中打开后显示的数值范围不同?
  • redis单哨兵模式
  • 单元测试到底是什么?该怎么做?
  • 破译心智密码:神经科学如何为下一代自然语言处理绘制语义理解的蓝图