当前位置: 首页 > news >正文

Spring Boot使用Redis实现消息队列

Spring Boot使用Redis实现消息队列

消息队列有很多现成的产品,但是服务器上的东西有时候不是开发者能说了算的,一般情况下服务器上都会有Mysql和Redis。消息队列又是在开发中大部分情况下要用到的,所以在不增加额外的东西的情况下,Redis实现消息队列就是成必须了。

队列的好处

  • 解耦: 生产消息的和处理消息的模块分开了,互不影响。以后就算处理消息的逻辑再复杂,也不会拖慢用户请求的速度。
  • 异步: 用户请求(生产消息)可以立即返回,大大提升了用户体验。
  • 削峰: 如果瞬间有大量用户注册(比如搞活动),任务会先在Redis里排队,消费者再慢慢处理,保护了后端服务(如邮件服务器)不被冲垮。

实现方法

  • 发布/订阅(Pub/Sub)模式
  • List或者Stream 模式

两种方式的对比

特性 FeaturePub/Sub (学校广播)Stream (智能厨房订单系统)
消息持久化不支持完全支持
消费者组不支持核心功能
负载均衡不支持 (广播模式)完美支持 (组内竞争)
消息确认(ACK)不支持核心功能
失败重试无法实现可以轻松实现
历史追溯不支持任意回溯
实现复杂度⭐ (非常简单)⭐⭐⭐⭐ (相对复杂)
最适合的场景实时通知、聊天、数据可视化任务队列、订单处理、日志收集

什么时候用 Pub/Sub?

当你追求极致的速度,并且不介意偶尔丢失消息时。

	场景举例:网页上的实时股价推送(丢一两个点无所谓,下一个马上就来了)。在线游戏里,广播玩家的位置信息(这次没收到,0.1秒后下次更新就收到了)。多个微服务系统状态的实时监控看板。

什么时候用 Stream?

当你的业务绝不能丢失任何一条数据,并且需要协同处理大量任务时。

	场景举例:电商订单系统: 每个订单都必须被准确处理。用户注册后的欢迎邮件/短信发送: 每个用户都必须收到。日志处理系统: 收集所有系统的日志,进行分析。任何需要“任务队列”思想的场景。

代码实现

1. Pub/Sub

  • 生产者
package com.example.redismqdemo;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;/*** 消息生产者(Producer)* 就像奶茶店的前台,负责接收点单,并把订单信息发送到传送带(Redis)*/
@Component // 把这个类声明为一个组件,让Spring Boot来管理它
public class MessageProducer {// 这是一个Spring Boot提供的、专门用来操作Redis的强大工具@Autowiredprivate StringRedisTemplate redisTemplate;/*** 发送消息到指定的频道(Channel)* @param channel 频道名称,就像传送带的不同泳道,比如"邮件传送带"、"短信传送带"* @param message 要发送的具体消息内容,比如"给用户xxx发送欢迎邮件"*/public void sendMessage(String channel, String message) {System.out.println("发送消息 -> 频道: " + channel + ", 内容: " + message);// 使用convertAndSend方法,将消息发布到指定的频道redisTemplate.convertAndSend(channel, message);}
}
  • 消费者
package com.example.redismqdemo;import org.springframework.stereotype.Component;/*** 消息消费者(Consumer)* 就像奶茶店的师傅,时刻准备从传送带(Redis)上接收新订单并处理*/
@Component // 同样,让Spring Boot管理它
public class MessageConsumer {/*** 这是处理消息的方法。* 当我们订阅的频道(比如 "email-channel")收到消息时,这个方法就会被自动调用。* @param message 从频道接收到的消息内容*/public void receiveMessage(String message) {// 为了演示,我们只在控制台打印出来,表示我们已经收到并处理了// 在真实场景中,这里会是发送邮件、处理订单等具体业务代码System.out.println("收到消息 <- 内容: " + message);System.out.println("正在处理任务...(例如:发送邮件)");// 模拟处理任务需要的时间try {Thread.sleep(2000); // 暂停2秒} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务处理完成!");}
}
  • 消费关系
package com.example.redismqdemo;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;/*** Redis配置类* 作用:建立“订阅关系”,即把我们的消费者(MessageConsumer)和指定的频道(Channel)绑定起来*/
@Configuration // 告诉Spring Boot,这是一个配置类
public class RedisConfig {// 我们要监听的频道名称public static final String EMAIL_CHANNEL = "email-channel";/*** 创建一个消息监听容器* 就像为奶茶师傅分配一个固定的工作站,让他专门监听某个传送带* @param connectionFactory Redis连接工厂,Spring Boot会自动提供* @param listenerAdapter 监听适配器,告诉监听到消息后该怎么办* @return*/@BeanRedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// addMessageListener方法将监听器和频道绑定// 这里我们让它监听 "email-channel" 这个频道container.addMessageListener(listenerAdapter, new PatternTopic(EMAIL_CHANNEL));return container;}/*** 创建消息监听适配器* 它定义了当消息来了之后,具体要调用哪个对象的哪个方法* @param consumer 我们自己写的消费者实例(MessageConsumer)* @return*/@BeanMessageListenerAdapter listenerAdapter(MessageConsumer consumer) {// "receiveMessage" 是我们在 MessageConsumer 类中定义的方法名return new MessageListenerAdapter(consumer, "receiveMessage");}
}
  • 测试
package com.example.redismqdemo;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController // 声明这是一个网页接口控制器
public class TestController {@Autowiredprivate MessageProducer messageProducer;/*** 创建一个网页接口,用来发送消息* @param message 要发送的消息内容,可以从浏览器传递* @return*/@GetMapping("/send") // 任何人访问 http://localhost:8080/send 就会触发这个方法public String sendMessage(@RequestParam(defaultValue = "这是一个默认消息") String message) {// 调用我们之前写的生产者,把消息发送到 "email-channel" 频道messageProducer.sendMessage(RedisConfig.EMAIL_CHANNEL, message);return "消息发送成功!内容: '" + message + "'。请查看后台控制台的接收情况。";}
}
  • 控制台
发送消息 -> 频道: email-channel, 内容: 欢迎新用户_张三
收到消息 <- 内容: 欢迎新用户_张三
正在处理任务...(例如:发送邮件)
// (等待2秒)
任务处理完成!

2、Stream

  • 生产者
package com.example.redismqdemo;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import java.util.Map;/*** 消息生产者(Producer) - 升级版* 负责把订单信息发送到【智能传送带】(Redis Stream)*/
@Component
public class MessageProducer {@Autowiredprivate StringRedisTemplate redisTemplate;/*** 发送消息到Stream* @param streamKey Stream的名称,比如 "stream-email"* @param messageData 消息内容,可以是一个包含多个键值对的Map*/public void sendMessage(String streamKey, Map<String, String> messageData) {// 将Map数据转换成Stream需要的数据结构// ObjectRecord<String, Map<String, String>> record = StreamRecords.newRecord()//         .ofObject(messageData) // 设置消息体//         .withStreamKey(streamKey); // 设置Stream的KeySystem.out.println("发送消息到Stream -> Stream: " + streamKey + ", 内容: " + messageData);// 使用 xadd 命令将消息添加到Stream的末尾redisTemplate.opsForStream().add(streamKey, messageData);}
}
  • 消费者
package com.example.redismqdemo;import jakarta.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import java.time.Duration;
import java.util.List;
import java.util.Map;/*** 消息消费者(Consumer) - 升级版* 模拟一个“奶茶师傅班组”的工作流程*/
@Component
public class StreamConsumer {@Autowiredprivate StringRedisTemplate redisTemplate;// --- Stream基本信息 ---public static final String STREAM_KEY = "stream-email"; // 智能传送带的名称public static final String GROUP_NAME = "email-group";   // 奶茶师傅班组的名称public static final String CONSUMER_NAME_PREFIX = "consumer-"; // 师傅名称前缀// --- 死信队列信息 ---public static final String DLQ_STREAM_KEY = "stream-email-dlq"; // 疑难问题板(死信队列)public static final int MAX_RETRIES = 3; // 最大重试次数// 当前消费者的名字,用机器名+线程ID保证唯一private final String consumerName = CONSUMER_NAME_PREFIX + java.lang.management.ManagementFactory.getRuntimeMXBean().getName();/*** 初始化方法:项目启动时,自动创建Stream的消费者组* 就像奶茶店开门前,先确认传送带和班组都已就位*/@PostConstructpublic void init() {try {// 尝试创建消费者组。如果Stream不存在,这个命令会自动创建它。// ReadOffset.latest() 表示只从最新的消息开始消费,忽略历史消息。redisTemplate.opsForStream().createGroup(STREAM_KEY, ReadOffset.latest(), GROUP_NAME);System.out.println("消费者组 '" + GROUP_NAME + "' 创建成功或已存在。");} catch (Exception e) {// 如果Stream或组已经存在,可能会抛异常,这里可以安全地忽略System.out.println("消费者组 '" + GROUP_NAME + "' 已存在,无需创建。");}}/*** 定时任务:每隔5秒,来检查并处理传送带上的新订单* 使用 @Scheduled 注解,让Spring Boot自动周期性地调用这个方法*/@Scheduled(fixedRate = 5000) // 每5000毫秒(5秒)执行一次public void consumeMessages() {// 1. 从传送带为我们这个班组(GROUP_NAME)获取最多1条新订单//    > 表示新消息,block(Duration.ofSeconds(2)) 表示如果没有新消息,愿意等待2秒List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream().read(Consumer.from(GROUP_NAME, consumerName),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()));if (messages == null || messages.isEmpty()) {return; // 没有新消息,直接返回}// 2. 遍历获取到的新订单并处理for (MapRecord<String, Object, Object> message : messages) {Map<Object, Object> body = message.getValue();RecordId messageId = message.getId();System.out.println("消费者 '" + consumerName + "' 收到新订单: " + body + ", 订单ID: " + messageId);try {// 模拟处理订单的业务逻辑processOrder(body);// 3. 处理成功!告诉传送带,这个订单我搞定了(ACK)redisTemplate.opsForStream().acknowledge(STREAM_KEY, GROUP_NAME, messageId);System.out.println("订单 " + messageId + " 处理成功,已ACK。");} catch (Exception e) {// 4. 处理失败!System.err.println("订单 " + messageId + " 处理失败: " + e.getMessage());handleFailure(message);}}}/*** 模拟订单处理逻辑* @param orderData 订单数据*/private void processOrder(Map<Object, Object> orderData) {// 模拟一个可能失败的操作if ("fail".equals(orderData.get("action"))) {throw new RuntimeException("原料不足,无法制作!");}// 正常的处理逻辑System.out.println("正在处理订单: " + orderData);}/*** 专门处理失败的订单(消息)* @param failedMessage 失败的消息记录*/private void handleFailure(MapRecord<String, Object, Object> failedMessage) {RecordId messageId = failedMessage.getId();String streamKey = failedMessage.getStream();Map<Object, Object> body = failedMessage.getValue();// 查询这条消息被投递了几次(检查重试次数)// pending方法可以查询一个组里,哪些消息被拿走但还没ACKPendingMessagesSummary pendingSummary = redisTemplate.opsForStream().pending(streamKey, GROUP_NAME);long deliveryCount = pendingSummary.getRecordsFor(messageId) != null ?pendingSummary.getRecordsFor(messageId).getTotalDeliveryCount() : 1;System.out.println("订单 " + messageId + " 已被尝试处理 " + deliveryCount + " 次。");if (deliveryCount >= MAX_RETRIES) {// 达到最大重试次数,放入死信队列System.err.println("达到最大重试次数(" + MAX_RETRIES + "),将订单 " + messageId + " 移入死信队列 " + DLQ_STREAM_KEY);// 将原始消息内容和失败信息一起,发送到DLQ// 注意:我们这里只是简单地用add方法,实际生产中可能会添加更多错误信息redisTemplate.opsForStream().add(DLQ_STREAM_KEY, body);// 在原Stream中确认(ACK)此消息,防止无限重试redisTemplate.opsForStream().acknowledge(streamKey, GROUP_NAME, messageId);} else {// 未达到最大重-试次数,不做任何事。// 消息会保留在“待处理列表”(Pending Entries List)中,// 其他消费者可以通过 XCLAIM 或者等待超时后自动重新投递来再次消费。// 这里我们为了简单,就让它暂时悬挂,等待下次被重新发现。System.out.println("订单 " + messageId + " 将等待下一次重试。");}}
}
  • 开启定时任务
package com.example.redismqdemo;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling; // 导入@SpringBootApplication
@EnableScheduling // <-- 添加这个注解
public class RedisMqDemoApplication {public static void main(String[] args) {SpringApplication.run(RedisMqDemoApplication.class, args);}
}
  • 测试
package com.example.redismqdemo;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import java.util.HashMap;
import java.util.Map;@RestController
public class TestController {@Autowiredprivate MessageProducer messageProducer;/*** 发送一个成功的消息*/@GetMapping("/send-success")public String sendSuccessMessage() {Map<String, String> message = new HashMap<>();message.put("userId", "1001");message.put("email", "user1@example.com");message.put("content", "Welcome to our platform!");messageProducer.sendMessage(StreamConsumer.STREAM_KEY, message);return "发送了一个【成功】消息,请观察后台日志。";}/*** 发送一个注定会失败的消息*/@GetMapping("/send-fail")public String sendFailMessage() {Map<String, String> message = new HashMap<>();message.put("userId", "9999");message.put("action", "fail"); // 特殊标记,让消费者处理时抛出异常message.put("reason", "This message is designed to fail.");messageProducer.sendMessage(StreamConsumer.STREAM_KEY, message);return "发送了一个【失败】消息,请观察后台的重试和死信队列逻辑。";}
}
http://www.dtcms.com/a/516496.html

相关文章:

  • 互联网大厂Java面试实战:以Spring Boot与微服务为核心的技术场景剖析
  • 做网站页面的软件毕业设计网站成品
  • 《一个浏览器多人用?Docker+Neko+cpolar实现跨网共享》
  • design设计网站网站优化方法页面
  • C++基础:(十七)模版进阶:深入探索非类型参数、特化、分离编译与实战技巧
  • 《Git:从入门到精通(五)—— Git:Gitee远程仓库创建与克隆指南》
  • UML学习文档(一)
  • 淘宝放单网站开发网站wordpress错误
  • Latex中的错误汇总
  • huggingface transformers调试问题--加载本地路径模型时pdb断点消失
  • KMP算法详解 -- 串的模式匹配
  • 用php做网站的方法学网站建设前途
  • 网站不用下载免费软件曰本孕妇做爰网站
  • 【微信小程序 + 消息订阅 + 授权】 微信小程序实现消息订阅流程介绍,代码示例(仅前端)
  • 网站开发找哪家什么查网站是否降权
  • 【经典书籍】C++ Primer 第13类继承精华讲解
  • “VMware与vmx86驱动程序版本不匹配:预期为:417,实际为416。”解决步骤,亲测有效!!!
  • 查找组成一个偶数最接近的两个素数
  • 获取文件版本(C++源码)
  • 济南网站建设鲁icp备附近展览制作工厂
  • 在Windows WSL2中安装Ubuntu和Docker的完整指南
  • Ubuntu 22 .04安装CUDA, cuDNN, TensorRT
  • Linux编辑神器——vim工具的使用
  • UPS-不间断电源系统
  • AMDGPU/KFD IV(Interrupt Vector)信息结构及实现
  • 网站开发公司计划书如何做英文网站的外链
  • 彬县网站建设it外包前景
  • 网站集约化做暧暧国外网站
  • 基于python的电子商务管理系统
  • Git Remote 实现双向仓库同步教程(适合跨公司协作)