当前位置: 首页 > news >正文

【redis】发布订阅

Redis的发布订阅(Pub/Sub)是一种基于消息多播的通信机制,它允许消息的**发布者(Publisher)向特定频道发送消息,而订阅者(Subscriber)**通过订阅频道或模式来接收消息。

其核心特点如下:

  1. 轻量级:无需额外组件,直接通过Redis服务实现

  2. 实时性:消息即时推送,无轮询延迟

  3. 广播模式:一个消息可被多个订阅者同时接收

  4. 无状态性:不存储历史消息,订阅者只能接收订阅后的消息

发布订阅命令的使用

有关发布订阅的命令可以通过help @pubsub命令来查看。有关命令的使用可以通过help 命令来查看,例如help publish

基础命令速查表

命令作用示例
SUBSCRIBE订阅一个或多个频道SUBSCRIBE news sports
PSUBSCRIBE使用模式匹配订阅频道PSUBSCRIBE sensor.*
PUBLISH向指定频道发送消息PUBLISH news "Hello"
UNSUBSCRIBE退订指定频道UNSUBSCRIBE news
PUNSUBSCRIBE退订模式订阅PUNSUBSCRIBE sensor.*
PUBSUB CHANNELS查看活跃频道列表PUBSUB CHANNELS "sensor.*"

操作示例

# 订阅者A(终端1)
127.0.0.1:6379> subscribe notifications
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "notifications"
3) (integer) 1

# 订阅者B(终端2) 
127.0.0.1:6379> psubscribe system.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "system.*"
3) (integer) 1

# 发布消息(终端3)
127.0.0.1:6379> publish notifications "Service will be upgraded soon"
(integer) 1

127.0.0.1:6379> publish system.alert "CPU usage exceeds 90%"
(integer) 1

# 订阅者A收到:
1) "message"
2) "notifications"
3) "Service will be upgraded soon"

# 订阅者B收到: 
1) "pmessage"
2) "system.*"
3) "system.alert"
4) "CPU usage exceeds 90%"

发布订阅的使用场景与优缺点

适用场景

  1. 实时通知系统:用户在线状态更新,即时聊天消息推送

  2. 事件驱动架构:缓存失效广播,分布式配置更新

  3. 轻量级监控:服务器状态报警,业务指标异常通知

优点

  • 极低延迟(平均<1ms)

  • 支持百万级TPS消息吞吐

  • 模式匹配订阅实现灵活路由

  • 零外部依赖(仅需Redis服务)

缺点

消息不可靠性:不保证送达,离线订阅者会丢失消息

无持久化机制:重启后所有订阅关系丢失

客户端阻塞:订阅操作会占用连接线程(需异步处理)

替代方案建议:需要可靠消息时,使用Redis Streams(支持消息持久化、消费者组)或RabbitMQ/Kafka

在Java中使用RedisTemplate实现

配置RedisTemplate

package com.morris.redis.demo.pubsub;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * 对redis的键值进行序列化
 */
@Configuration
public class RedisConfig {
    
    @Bean
    public RedisTemplate<String, Object> redisTemplate(
            RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);

        // 使用 String 序列化 key
        template.setKeySerializer(new StringRedisSerializer());
        // 使用 JSON 序列化 value(需要额外依赖 jackson)
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());

        // 对于 Hash 结构同理
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());

        return template;
    }
}

实现消息发布者

package com.morris.redis.demo.pubsub;

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
 * 消息发布者
 */
@Service
public class MessagePublisher {

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    public void sendNotification(String channel, String message) {
        redisTemplate.convertAndSend(channel, message);
    }
}

实现消息订阅者

package com.morris.redis.demo.pubsub;

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * 消息订阅者
 */
@Component
public class MessageSubscriber implements MessageListener {

    @Resource
    private RedisTemplate redisTemplate;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        String channel = new String(message.getChannel());
        String body = (String) redisTemplate.getValueSerializer().deserialize(message.getBody());
        System.out.printf("收到频道[%s]的消息: %s\n", channel, body);
    }
}

配置订阅监听

package com.morris.redis.demo.pubsub;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

/**
 * 配置redis消息订阅监听器
 */
@Configuration
@Slf4j
public class RedisPubSubConfig {

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory, MessageSubscriber messageSubscriber) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        
        // 订阅具体频道
        container.addMessageListener(messageSubscriber, new ChannelTopic("notifications"));
        
        // 订阅模式匹配
        container.addMessageListener(messageSubscriber, new PatternTopic("system.*"));
        
        // 异常处理
        container.setErrorHandler((e) -> {
            log.error("[listen message] error ", e);
        });
        
        return container;
    }
}

使用示例

package com.morris.redis.demo.pubsub;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * 使用接口发布消息
 */
@RestController
@RequestMapping("/pubsub")
public class PubSubDemoController {

    @Resource
    private MessagePublisher publisher;

    // 发布告警
    @GetMapping("/alert")
    public String sendAlert(@RequestParam String message) {
        publisher.sendNotification("system.alert", message);
        return "警报已发送";
    }

    // 发布通知
    @GetMapping("/notify")
    public String sendNotify(@RequestParam String message) {
        publisher.sendNotification("notifications", message);
        return "通知已发送";
    }
}

相关文章:

  • 【鸿蒙开发】Hi3861学习笔记- GPIO之直流电机
  • 变量赋值汇编
  • 玩转云服务器——阿里云操作系统控制台体验测评
  • ES6新特性
  • 关于xcode Project navigator/项目导航栏的一些说明
  • 2574. 左右元素和的差值
  • Ubuntu24.04 LTS 版本 Linux 系统在线和离线安装 Docker 和 Docker compose
  • F. Counting Necessary Nodes 【Codeforces Round 1009 (Div. 3)】
  • 【实战ES】实战 Elasticsearch:快速上手与深度实践-8.2.1AWS OpenSearch无服务器方案
  • PySide(PyQt),使用types.MethodType动态定义事件
  • USB、DWC3与Gadget关系解析
  • 数据库管理员助理(DP-300)适合什么群体考?
  • 向量点积计算(信息学奥赛一本通-1108)
  • OpenHarmony项目的应用在DevEco Studio配置项目中固定的一键签名
  • 最节省服务器,手搓电子证书查询系统
  • 【C++语言】vector
  • 如何在vscode中编译linux中的c++文件
  • 广西建筑安全员C证考试的报名时间和考试时间是什么时候?
  • 大模型应用(一):RAG
  • 【redis】list类型:基本命令(上)
  • 微信客服电话95068人工服务时间/百度关键词优化教程
  • 网站收录差/企业推广视频
  • 品牌高端网站制作官网/网站推广经验
  • 南阳旅游网站建设/免费推广seo
  • 昭通公司做网站/如何做推广宣传
  • 市南区网站建设/怎么做百度关键词排名