当前位置: 首页 > news >正文

使用RedisTemplate设计一个消息队列?

使用RedisTemplate设计消息队列

下面我将详细介绍如何使用Spring Boot的RedisTemplate来设计各种类型的消息队列。

前言

在代码实现之前,我们先了解一下消息队列的基本需求。要保证数据的可靠性与性能之间提供选择。

1. 核心需求分析

在设计消息队列之前,首先要明确核心需求:

基本功能需求

  • 消息发送:生产者能够发送消息到队列
  • 消息接收:消费者能够从队列获取消息
  • 消息存储:消息需要持久化存储,防止丢失
  • 顺序保证:确保消息的消费顺序(FIFO或其他策略)
  • 可靠性:消息不丢失,至少被消费一次

高级功能需求

  • 消息确认机制:确保消息被成功处理
  • 重试机制:处理失败的消息可以重新投递
  • 延迟消息:支持定时或延迟发送
  • 优先级队列:不同优先级的消息处理顺序不同
  • 死信队列:处理多次重试失败的消息
  • 监控管理:队列状态监控和管理功能

2. 架构设计思路

2.1 核心组件设计

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Producer  │───▶│   Message   │───▶│  Consumer   │
│  (生产者)   │    │   Queue     │    │  (消费者)   │
└─────────────┘    └─────────────┘    └─────────────┘│┌────┴────┐│ Storage ││ (存储)  │└─────────┘

2.2 数据流设计

  1. 生产者队列:发送消息
  2. 队列存储:持久化消息
  3. 队列消费者:分发消息
  4. 消费者队列:确认消息处理

3. 存储设计思路

3.1 存储选型考虑
  • 内存存储:高性能,但数据易失
  • 磁盘存储:数据持久,但性能较低
  • 混合存储:内存+磁盘,兼顾性能和持久性

一. 项目配置

Maven依赖

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
</dependencies>

Redis配置类

@Configuration
public class RedisConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(factory);// 使用StringRedisSerializer来序列化和反序列化redis的key值template.setKeySerializer(new StringRedisSerializer());template.setHashKeySerializer(new StringRedisSerializer());// 使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);ObjectMapper mapper = new ObjectMapper();mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);mapper.activateDefaultTyping(LazyCollectionDefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);serializer.setObjectMapper(mapper);template.setValueSerializer(serializer);template.setHashValueSerializer(serializer);template.afterPropertiesSet();return template;}@Beanpublic StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) {return new StringRedisTemplate(factory);}
}

二. 基本消息队列实现

消息实体类

@Data
@AllArgsConstructor
@NoArgsConstructor
public class QueueMessage implements Serializable {private String id;private String topic;private Object payload;private Long timestamp;private Map<String, Object> headers;public QueueMessage(String topic, Object payload) {this.id = UUID.randomUUID().toString();this.topic = topic;this.payload = payload;this.timestamp = System.currentTimeMillis();this.headers = new HashMap<>();}public void addHeader(String key, Object value) {this.headers.put(key, value);}
}

基础队列服务

@Service
public class BasicQueueService {private static final Logger logger = LoggerFactory.getLogger(BasicQueueService.class);@Autowiredprivate RedisTemplate<String, Object> redisTemplate;/*** 发送消息到队列*/public void sendMessage(String queueName, Object message) {QueueMessage queueMessage;if (message instanceof QueueMessage) {queueMessage = (QueueMessage) message;} else {queueMessage = new QueueMessage(queueName, message);}redisTemplate.opsForList().leftPush(queueName, queueMessage);logger.info("Sent message to queue [{}]: {}", queueName, queueMessage.getId());}/*** 批量发送消息*/public void batchSendMessages(String queueName, List<Object> messages) {List<Object> queueMessages = messages.stream().map(msg -> {if (msg instanceof QueueMessage) {return msg;} else {return new QueueMessage(queueName, msg);}}).collect(Collectors.toList());redisTemplate.opsForList().leftPushAll(queueName, queueMessages);logger.info("Batch sent {} messages to queue [{}]", messages.size(), queueName);}/*** 阻塞获取消息*/public QueueMessage blockingReceiveMessage(String queueName, long timeout, TimeUnit unit) {Object result = redisTemplate.opsForList().rightPop(queueName, timeout, unit);if (result instanceof QueueMessage) {QueueMessage message = (QueueMessage) result;logger.info("Received message from queue [{}]: {}", queueName, message.getId());return message;}return null;}/*** 非阻塞获取消息*/public QueueMessage receiveMessage(String queueName) {Object result = redisTemplate.opsForList().rightPop(queueName);if (result instanceof QueueMessage) {QueueMessage message = (QueueMessage) result;logger.info("Received message from queue [{}]: {}", queueName, message.getId());return message;}return null;}/*** 获取队列长度*/public long getQueueSize(String queueName) {Long size = redisTemplate.opsForList().size(queueName);return size != null ? size : 0;}/*** 查看消息但不消费*/public List<QueueMessage> peekMessages(String queueName, int count) {List<Object> messages = redisTemplate.opsForList().range(queueName, 0, count - 1);return messages.stream().filter(msg -> msg instanceof QueueMessage).map(msg -> (QueueMessage) msg).collect(Collectors.toList());}
}

三. 可靠消息队列(支持消息确认)

@Service
public class ReliableQueueService {private static final Logger logger = LoggerFactory.getLogger(ReliableQueueService.class);@Autowiredprivate RedisTemplate<String, Object> redisTemplate;/*** 发送消息到主队列,同时备份到处理中队列*/public void sendReliableMessage(String queueName, Object message) {QueueMessage queueMessage;if (message instanceof QueueMessage) {queueMessage = (QueueMessage) message;} else {queueMessage = new QueueMessage(queueName, message);}String processingQueue = getProcessingQueueName(queueName);// 使用事务确保原子性操作redisTemplate.execute(new SessionCallback<Object>() {@Overridepublic Object execute(RedisOperations operations) throws DataAccessException {operations.multi();operations.opsForList().leftPush(queueName, queueMessage);operations.opsForHash().put(processingQueue, queueMessage.getId(), queueMessage);return operations.exec();}});logger.info("Sent reliable message to queue [{}]: {}", queueName, queueMessage.getId());}/*** 获取并处理消息(移动到处理中状态)*/public QueueMessage receiveAndProcess(String queueName) {String processingQueue = getProcessingQueueName(queueName);// 从主队列移动到处理队列Object message = redisTemplate.opsForList().rightPopAndLeftPush(queueName, processingQueue);if (message instanceof QueueMessage) {QueueMessage queueMessage = (QueueMessage) message;logger.info("Message moved to processing: {}", queueMessage.getId());return queueMessage;}return null;}/*** 确认消息处理完成*/public void acknowledge(String queueName, String messageId) {String processingQueue = getProcessingQueueName(queueName);redisTemplate.opsForHash().delete(processingQueue, messageId);logger.info("Message acknowledged: {}", messageId);}/*** 消息处理失败,重新入队*/public void requeue(String queueName, String messageId) {String processingQueue = getProcessingQueueName(queueName);redisTemplate.execute(new SessionCallback<Object>() {@Overridepublic Object execute(RedisOperations operations) throws DataAccessException {operations.multi();QueueMessage message = (QueueMessage) operations.opsForHash().get(processingQueue, messageId);if (message != null) {operations.opsForList().leftPush(queueName, message);operations.opsForHash().delete(processingQueue, messageId);}return operations.exec();}});logger.info("Message requeued: {}", messageId);}/*** 恢复处理中的消息(应用重启时调用)*/public void recoverProcessingMessages(String queueName) {String processingQueue = getProcessingQueueName(queueName);Map<Object, Object> processingMessages = redisTemplate.opsForHash().entries(processingQueue);if (!processingMessages.isEmpty()) {logger.info("Recovering {} processing messages for queue [{}]", processingMessages.size(), queueName);redisTemplate.execute(new SessionCallback<Object>() {@Overridepublic Object execute(RedisOperations operations) throws DataAccessException {operations.multi();for (Object message : processingMessages.values()) {operations.opsForList().leftPush(queueName, message);}operations.delete(processingQueue);return operations.exec();}});}}/*** 获取处理中的消息数量*/public long getProcessingCount(String queueName) {String processingQueue = getProcessingQueueName(queueName);Long size = redisTemplate.opsForHash().size(processingQueue);return size != null ? size : 0;}private String getProcessingQueueName(String queueName) {return queueName + ":processing";}
}

四. 延迟队列实现

@Service
public class DelayedQueueService {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Autowiredprivate BasicQueueService basicQueueService;/*** 发送延迟消息*/public void sendDelayedMessage(String targetQueue, Object message, long delay, TimeUnit timeUnit) {QueueMessage queueMessage;if (message instanceof QueueMessage) {queueMessage = (QueueMessage) message;} else {queueMessage = new QueueMessage(targetQueue, message);}// 添加延迟相关信息queueMessage.addHeader("targetQueue", targetQueue);queueMessage.addHeader("deliverTime", System.currentTimeMillis() + timeUnit.toMillis(delay));String delayedQueue = getDelayedQueueName();double score = System.currentTimeMillis() + timeUnit.toMillis(delay);redisTemplate.opsForZSet().add(delayedQueue, queueMessage, score);}/*** 处理到期的延迟消息*/@Scheduled(fixedRate = 5000) // 每5秒执行一次public void processDelayedMessages() {String delayedQueue = getDelayedQueueName();long currentTime = System.currentTimeMillis();// 获取所有到期的消息Set<Object> expiredMessages = redisTemplate.opsForZSet().rangeByScore(delayedQueue, 0, currentTime);if (expiredMessages != null && !expiredMessages.isEmpty()) {for (Object messageObj : expiredMessages) {if (messageObj instanceof QueueMessage) {QueueMessage message = (QueueMessage) messageObj;String targetQueue = (String) message.getHeaders().get("targetQueue");// 发送到目标队列basicQueueService.sendMessage(targetQueue, message);// 从延迟队列中移除redisTemplate.opsForZSet().remove(delayedQueue, message);}}}}/*** 获取延迟队列中的消息数量*/public long getDelayedMessageCount() {String delayedQueue = getDelayedQueueName();Long count = redisTemplate.opsForZSet().size(delayedQueue);return count != null ? count : 0;}private String getDelayedQueueName() {return "delayed:queue";}
}

五. 优先级队列实现

@Service
public class PriorityQueueService {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;/*** 发送优先级消息*/public void sendPriorityMessage(String queueName, Object message, int priority) {QueueMessage queueMessage;if (message instanceof QueueMessage) {queueMessage = (QueueMessage) message;} else {queueMessage = new QueueMessage(queueName, message);}queueMessage.addHeader("priority", priority);// 使用不同的队列表示不同优先级String priorityQueue = getPriorityQueueName(queueName, priority);redisTemplate.opsForList().leftPush(priorityQueue, queueMessage);}/*** 按优先级获取消息(高优先级先出)*/public QueueMessage receivePriorityMessage(String queueName, int maxPriority) {for (int priority = maxPriority; priority >= 0; priority--) {String priorityQueue = getPriorityQueueName(queueName, priority);Object message = redisTemplate.opsForList().rightPop(priorityQueue);if (message instanceof QueueMessage) {return (QueueMessage) message;}}return null;}/*** 阻塞式按优先级获取消息*/public QueueMessage blockingReceivePriorityMessage(String queueName, int maxPriority, long timeout, TimeUnit unit) {List<String> priorityQueues = new ArrayList<>();for (int priority = maxPriority; priority >= 0; priority--) {priorityQueues.add(getPriorityQueueName(queueName, priority));}// 使用BRPOP命令,按优先级顺序检查队列for (String queue : priorityQueues) {Object message = redisTemplate.opsForList().rightPop(queue, timeout, unit);if (message instanceof QueueMessage) {return (QueueMessage) message;}}return null;}private String getPriorityQueueName(String baseQueue, int priority) {return baseQueue + ":priority:" + priority;}
}

六. 完整的队列管理器

@Service
public class RedisQueueManager {@Autowiredprivate BasicQueueService basicQueueService;@Autowiredprivate ReliableQueueService reliableQueueService;@Autowiredprivate DelayedQueueService delayedQueueService;@Autowiredprivate PriorityQueueService priorityQueueService;/*** 发送简单消息*/public void send(String queueName, Object message) {basicQueueService.sendMessage(queueName, message);}/*** 发送可靠消息*/public void sendReliable(String queueName, Object message) {reliableQueueService.sendReliableMessage(queueName, message);}/*** 发送延迟消息*/public void sendDelayed(String targetQueue, Object message, long delay, TimeUnit timeUnit) {delayedQueueService.sendDelayedMessage(targetQueue, message, delay, timeUnit);}/*** 发送优先级消息*/public void sendPriority(String queueName, Object message, int priority) {priorityQueueService.sendPriorityMessage(queueName, message, priority);}/*** 接收消息*/public QueueMessage receive(String queueName) {return basicQueueService.receiveMessage(queueName);}/*** 阻塞接收消息*/public QueueMessage blockingReceive(String queueName, long timeout, TimeUnit unit) {return basicQueueService.blockingReceiveMessage(queueName, timeout, unit);}/*** 接收并处理可靠消息*/public QueueMessage receiveAndProcess(String queueName) {return reliableQueueService.receiveAndProcess(queueName);}/*** 确认消息处理完成*/public void acknowledge(String queueName, String messageId) {reliableQueueService.acknowledge(queueName, messageId);}/*** 获取队列统计信息*/public Map<String, Object> getQueueStats(String queueName) {Map<String, Object> stats = new HashMap<>();stats.put("queueSize", basicQueueService.getQueueSize(queueName));stats.put("processingCount", reliableQueueService.getProcessingCount(queueName));stats.put("delayedCount", delayedQueueService.getDelayedMessageCount());return stats;}
}

七. 控制器示例

@RestController
@RequestMapping("/api/queue")
public class QueueController {@Autowiredprivate RedisQueueManager queueManager;@PostMapping("/send")public ResponseEntity<String> sendMessage(@RequestParam String queueName, @RequestBody Object message) {queueManager.send(queueName, message);return ResponseEntity.ok("Message sent successfully");}@PostMapping("/send/reliable")public ResponseEntity<String> sendReliableMessage(@RequestParam String queueName, @RequestBody Object message) {queueManager.sendReliable(queueName, message);return ResponseEntity.ok("Reliable message sent successfully");}@PostMapping("/send/delayed")public ResponseEntity<String> sendDelayedMessage(@RequestParam String queueName,@RequestParam long delay,@RequestParam TimeUnit unit,@RequestBody Object message) {queueManager.sendDelayed(queueName, message, delay, unit);return ResponseEntity.ok("Delayed message sent successfully");}@GetMapping("/receive")public ResponseEntity<QueueMessage> receiveMessage(@RequestParam String queueName) {QueueMessage message = queueManager.receive(queueName);return message != null ? ResponseEntity.ok(message) : ResponseEntity.noContent().build();}@PostMapping("/acknowledge")public ResponseEntity<String> acknowledgeMessage(@RequestParam String queueName,@RequestParam String messageId) {queueManager.acknowledge(queueName, messageId);return ResponseEntity.ok("Message acknowledged");}@GetMapping("/stats")public ResponseEntity<Map<String, Object>> getQueueStats(@RequestParam String queueName) {return ResponseEntity.ok(queueManager.getQueueStats(queueName));}
}

八. 消费者服务示例

@Service
public class MessageConsumerService {private static final Logger logger = LoggerFactory.getLogger(MessageConsumerService.class);@Autowiredprivate RedisQueueManager queueManager;/*** 启动简单消息消费者*/@Asyncpublic void startSimpleConsumer(String queueName) {logger.info("Starting simple consumer for queue: {}", queueName);while (true) {try {QueueMessage message = queueManager.blockingReceive(queueName, 30, TimeUnit.SECONDS);if (message != null) {processMessage(message);}} catch (Exception e) {logger.error("Error in simple consumer for queue {}: {}", queueName, e.getMessage(), e);}}}/*** 启动可靠消息消费者*/@Asyncpublic void startReliableConsumer(String queueName) {logger.info("Starting reliable consumer for queue: {}", queueName);while (true) {try {QueueMessage message = queueManager.receiveAndProcess(queueName);if (message != null) {boolean success = processMessage(message);if (success) {queueManager.acknowledge(queueName, message.getId());} else {// 处理失败,可以选择重试或记录日志logger.warn("Message processing failed: {}", message.getId());}} else {Thread.sleep(1000); // 队列为空时休眠}} catch (Exception e) {logger.error("Error in reliable consumer for queue {}: {}", queueName, e.getMessage(), e);}}}private boolean processMessage(QueueMessage message) {try {logger.info("Processing message: {}", message.getId());// 模拟业务处理Thread.sleep(100);logger.info("Message processed successfully: {}", message.getId());return true;} catch (Exception e) {logger.error("Error processing message {}: {}", message.getId(), e.getMessage(), e);return false;}}
}

9. 配置类

@Configuration
@EnableAsync
@EnableScheduling
public class AsyncConfig {@Beanpublic TaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(10);executor.setMaxPoolSize(50);executor.setQueueCapacity(100);executor.setThreadNamePrefix("RedisQueue-");executor.initialize();return executor;}
}

十. 使用示例

@Component
public class QueueDemo implements CommandLineRunner {@Autowiredprivate RedisQueueManager queueManager;@Autowiredprivate MessageConsumerService consumerService;@Overridepublic void run(String... args) throws Exception {// 启动消费者consumerService.startSimpleConsumer("test.queue");consumerService.startReliableConsumer("reliable.queue");// 发送测试消息for (int i = 0; i < 10; i++) {Map<String, Object> data = new HashMap<>();data.put("index", i);data.put("timestamp", System.currentTimeMillis());// 发送简单消息queueManager.send("test.queue", data);// 发送可靠消息queueManager.sendReliable("reliable.queue", data);// 发送延迟消息if (i % 3 == 0) {queueManager.sendDelayed("test.queue", data, 10, TimeUnit.SECONDS);}Thread.sleep(1000);}}
}

设计要点总结

  1. 序列化配置:正确配置RedisTemplate的序列化方式,支持复杂对象存储
  2. 事务支持:使用SessionCallback确保多个操作的原子性
  3. 异常处理:完善的异常处理和日志记录
  4. 性能考虑:使用连接池、批量操作、异步处理
  5. 可靠性:实现消息确认、重试、死信处理机制
  6. 扩展性:支持延迟消息、优先级队列等高级特性
  7. 监控管理:提供队列统计和监控接口

这种基于RedisTemplate的消息队列方案适合中小型应用,具有实现简单、部署方便、性能良好的特点。对于高吞吐量、高可靠性的生产环境,建议还是使用专业的消息中间件如RabbitMQ、Kafka等。

http://www.dtcms.com/a/528022.html

相关文章:

  • 海龟交易系统R
  • 【攻防实战】Redis未授权RCE联动metasploit打穿三层内网(上)
  • 织梦网站图片修改不了wordpress模板开发 2016
  • .Net Framework 3.5下载安装教程(附安装包)
  • pycharm远程提交Git
  • PLM实施专家宝典:离散制造企业工程变更的“流程金融”方案
  • Orleans分布式系统架构详细分析
  • 建设网站的价钱深圳宝安上市公司网站建设报价
  • F034 vue+neo4j 体育知识图谱系统|体育文献知识图谱vue+flask知识图谱管理+d3.js可视化
  • 【day10】分治
  • 【Go】C++转Go:数据结构练习(一)排序算法
  • 每天学习一个新注解——@SafeVarargs
  • valgrind交叉编译android版本
  • 公司网站开发设计题目来源怎么写佛山免费建站怎样
  • 构建AI智能体:七十四、探索AI新纪元:扣子平台让想法到智能应用的极简之旅
  • P2119 [NOIP 2016 普及组] 魔法阵
  • 数据结构13:排序
  • 网站搭建 里短信wordpress acf破解版
  • 【C/C++】数据在内存中的存储
  • 我们项目中如何运用vueuse
  • 【开发者导航】集成多引擎与离线查询的macOS开源翻译工具:Easydict
  • 龙岗客户 IBM x3650 M5服务器system board fault故障,上门快修分享
  • TENGJUN-TYPE-C 24PIN(JX24-BPS015-A)连接器深度技术解析
  • 10.23作业
  • 深入剖析 Vue Router History 路由刷新页面 404 问题:原因与解决之道
  • FreeP2W:一个PDF转Word的CLI工具
  • .NET - .NET Aspire的Command-Line和GitHub Copilot
  • 10月25日
  • 【电玩电脑杂志】超级整理合集PDF
  • 怎样做某个网站有更新的提醒成都网络优化网站