使用redis的发布/订阅(Pub/Sub), 实现消息队列
文章目录
- 一、介绍
- 发布订阅采用观察者模式 实现,包含三个核心组件:
- 工作流程
- Redis发布订阅与消息队列(RocketMQ)对比
- 二、配置监听器、及消费者处理类
- 配置监听器
- 消费者处理类
- 三、生产者处理类
一、介绍
Redis的发布订阅拥有简单高效的实时消息传递,但需要根据业务需求权衡其持久化和可靠性限制,对于特殊场景,特殊业务还是建议使用专业的MQ, 如Kafka、RocketMQ。
发布订阅采用观察者模式 实现,包含三个核心组件:
(1)发布者(Publisher)
- 通过 PUBLISH 命令向指定频道发送消息
- 消息格式: PUBLISH channel_name message
- 返回值为当前订阅该频道的客户端数量
(2)订阅者(Subscriber)
- 通过 SUBSCRIBE 命令订阅一个或多个频道
- 可以订阅模式: PSUBSCRIBE pattern*
- 订阅后进入阻塞状态,等待消息到达
(3)频道(Channel)
- 消息传递的通道
- 支持通配符模式匹配
- 无需预先创建,自动管理
工作流程
1.订阅阶段 :客户端执行 SUBSCRIBE channel1 channel2
2.发布阶段 :另一个客户端执行 PUBLISH channel1 “Hello”
3.消息传递 :Redis服务器将消息推送给所有订阅channel1的客户端
4.接收处理 :订阅者收到消息格式: [“message”, “channel1”, “Hello”]
Redis发布订阅与消息队列(RocketMQ)对比
特性 | Redis Pub/Sub | 消息队列(RocketMQ) |
---|---|---|
消息持久化 | ❌ 不支持(内存存储) | ✅ 支持(磁盘持久化) |
离线消息 | 消息丢失 | 消息保存 |
重启恢复 | 消息丢失 | 消息恢复 |
消息可靠性 | ❌ 无ACK机制 | ✅ 支持ACK确认机制 |
事务消息 | ❌ 不支持 | ✅ 支持分布式事务 |
消息顺序 | ⚡ 基本保证 | ✅ 严格顺序保证 |
消费者模式 | 📢 广播模式 | 🔄 集群消费+广播模式 |
重试机制 | ❌ 不支持 | ✅ 支持自动重试 |
死信队列 | ❌ 不支持 | ✅ 支持死信队列 |
消息轨迹 | ❌ 不支持 | ✅ 支持完整消息轨迹 |
部署复杂度 | ⭐ 非常简单 | ⭐⭐ 中等复杂 |
Redis发布订阅虽然有许多不足,但对一些业务小,要求不高的场景使用起来也是非常香的
。
二、配置监听器、及消费者处理类
引入redis依赖,及配置这里就不展示了
。
配置监听器
/*** @author Redis消息监听配置*/
@RequiredArgsConstructor
@Component
public class RedisMessageListenerConfig {private final RedisMessageListenerContainer container;private final MessageSubscriber messageSubscriber;/*** 默认是已经初始化了RedisMessageListenerContainer*/@PostConstructpublic void init(){//这里默认是已经初始化了RedisMessageListenerContainer, 在此处仅仅添加消息监听器即可//如果没有初始化RedisMessageListenerContainer,则需要先初始化,再添加消息监听器container.addMessageListener((message, pattern) -> messageSubscriber.messageHandler1(message), new ChannelTopic("Channel_Topic_1"));container.addMessageListener((message, pattern) -> messageSubscriber.messageHandler2(message), new ChannelTopic("Channel_Topic_2"));}/*** 没有初始化RedisMessageListenerContainer,则需要先初始化*/// @Bean// public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {// //自行进行配置,这里不再说明// RedisMessageListenerContainer initContainer = new RedisMessageListenerContainer();// initContainer.setConnectionFactory();// ...// initContainer.addMessageListener((message, pattern) -> messageSubscriber.messageHandler1(message), new ChannelTopic("Channel_Topic_1"));// initContainer.addMessageListener((message, pattern) -> messageSubscriber.messageHandler2(message), new ChannelTopic("Channel_Topic_2"));// return initContainer;// }
}
消费者处理类
@Slf4j
@RequiredArgsConstructor
@Component
public class MessageSubscriber {public void messageHandler1(Message message) {log.info("消息通知, message:{}",message);try {String string = message.toString();//添加业务逻辑} catch (Exception e) {log.error("消费者异常,message:{} error:{},",message,e.getMessage());}}public void messageHandler2(Message message) {log.info("消息通知, message:{}",message);try {String string = message.toString();//添加业务逻辑} catch (Exception e) {log.error("消费者异常,message:{} error:{},",message,e.getMessage());}}
}
三、生产者处理类
@Service
@RequiredArgsConstructor
public class InterviewSessionServiceImpl {private final RedisTemplate<String, Object> redisTemplate;public void message1(String message1) {redisTemplate.convertAndSend("Channel_Topic_1", message1);}public void message2(String message2) {redisTemplate.convertAndSend("Channel_Topic_2", message2);}
}