
Spring Boot + Redis Publish/Subscribe with an ACK Mechanism (Event-Driven Approach): Full Walkthrough

I. Overall Architecture

A producer appends events to a Redis Stream (XADD). A consumer group reads the stream through a StreamMessageListenerContainer, and each record is acknowledged manually (XACK) once the business logic succeeds. Records that are not acknowledged stay in the group's pending list, where a scheduled processor re-claims and retries them; records that keep failing are copied to a dead-letter stream.

II. Implementation Steps

Step 1: Add the Maven dependencies

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>

Step 2: Configure the Redis connection

# application.yml
spring:
  redis:
    host: localhost
    port: 6379
    lettuce:
      pool:
        max-active: 16
        max-idle: 8

# Redis Stream settings
app:
  redis:
    stream: app-events
    group: app-group
    consumer: consumer-${random.int(1000)}
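The steps below inject these values with @Value. If you prefer type-safe binding, a minimal sketch of an equivalent @ConfigurationProperties holder could look like this (the AppRedisProperties class name is illustrative, not from the original article):

// Hypothetical alternative to the scattered @Value injections used below:
// binds the app.redis.* block into one properties bean.
@Component
@ConfigurationProperties(prefix = "app.redis")
public class AppRedisProperties {
    private String stream;
    private String group;
    private String consumer;

    public String getStream() { return stream; }
    public void setStream(String stream) { this.stream = stream; }
    public String getGroup() { return group; }
    public void setGroup(String group) { this.group = group; }
    public String getConsumer() { return consumer; }
    public void setConsumer(String consumer) { this.consumer = consumer; }
}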

Step 3: Create the consumer group

@Configuration
public class RedisConfig {

    @Value("${app.redis.stream}")
    private String streamKey;

    @Value("${app.redis.group}")
    private String groupName;

    // Create the consumer group at startup; XGROUP CREATE throws if the group already exists.
    // (A @Bean method must return something, so an ApplicationRunner is used here.)
    @Bean
    public ApplicationRunner createConsumerGroup(StringRedisTemplate redisTemplate) {
        return args -> {
            try {
                redisTemplate.opsForStream().createGroup(streamKey, groupName);
            } catch (Exception e) {
                System.out.println("Consumer group already exists: " + groupName);
            }
        };
    }
}
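If you want to verify that the group really exists, Spring Data Redis exposes XINFO GROUPS through StreamOperations.groups. A small optional sketch (the helper method name is illustrative):

// Minimal sketch: list the consumer groups attached to the stream (XINFO GROUPS)
public void printGroups(StringRedisTemplate redisTemplate, String streamKey) {
    StreamInfo.XInfoGroups groups = redisTemplate.opsForStream().groups(streamKey);
    groups.forEach(g -> System.out.println(g.groupName() + ", pending=" + g.pendingCount()));
}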

Step 4: Configure the message listener container

@Configuration
public class RedisConfig {

    // Thread pool used by the stream listener container
    @Bean(name = "redisStreamTaskExecutor")
    public ThreadPoolTaskExecutor redisStreamTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setThreadNamePrefix("redis-stream-");
        return executor;
    }

    // Message listener container: polls the stream and dispatches records to the registered listeners
    @Bean(initMethod = "start", destroyMethod = "stop")
    public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamContainer(
            RedisConnectionFactory factory,
            @Qualifier("redisStreamTaskExecutor") ThreadPoolTaskExecutor executor) {
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                        .pollTimeout(Duration.ofSeconds(1))
                        .executor(executor)
                        .batchSize(10)
                        .build();
        return StreamMessageListenerContainer.create(factory, options);
    }
}

Step 5: Register the message listener

@Component
public class StreamListenerRegistrar {

    @Value("${app.redis.stream}")
    private String streamKey;

    @Value("${app.redis.group}")
    private String groupName;

    @Value("${app.redis.consumer}")
    private String consumerName;

    private final StreamMessageListenerContainer<String, MapRecord<String, String, String>> container;
    private final RedisMessageProcessor processor;

    // @PostConstruct methods must not take parameters, so the container and
    // the processor are injected through the constructor instead.
    public StreamListenerRegistrar(
            StreamMessageListenerContainer<String, MapRecord<String, String, String>> container,
            RedisMessageProcessor processor) {
        this.container = container;
        this.processor = processor;
    }

    @PostConstruct
    public void registerListener() {
        StreamReadRequest<String> readRequest = StreamReadRequest
                .builder(StreamOffset.create(streamKey, ReadOffset.lastConsumed()))
                .consumer(Consumer.from(groupName, consumerName))
                .autoAcknowledge(false) // manual ACK
                .build();
        container.register(readRequest, processor);
    }
}
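register() also returns a Subscription handle. An optional variant of the last line of registerListener() above (not in the original article) keeps that handle and waits until the consumer is actively polling before the application starts taking traffic:

// Optional: keep the Subscription so startup can block until polling has begun
// and so the subscription can be cancelled cleanly on shutdown.
Subscription subscription = container.register(readRequest, processor);
try {
    subscription.await(Duration.ofSeconds(2)); // wait until the consumer is actively polling
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}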

Step 6: Implement the message processor

@Component
@RequiredArgsConstructor
public class RedisMessageProcessor implements StreamListener<String, MapRecord<String, String, String>> {

    @Value("${app.redis.stream}")
    private String streamKey;

    @Value("${app.redis.group}")
    private String groupName;

    private final StringRedisTemplate redisTemplate;

    @Override
    public void onMessage(MapRecord<String, String, String> record) {
        CompletableFuture.runAsync(() -> {
            try {
                // Business processing
                processBusiness(record);
                // ACK on success
                redisTemplate.opsForStream().acknowledge(streamKey, groupName, record.getId());
            } catch (Exception e) {
                // On failure, do not ACK: the record stays in the pending list
                // and is retried by the processor in Step 7
            }
        });
    }

    // public so the pending-message processor in Step 7 can retry it
    public void processBusiness(MapRecord<String, String, String> record) throws Exception {
        String eventType = record.getValue().get("eventType");
        String payload = record.getValue().get("payload");
        // Dispatch by event type
        switch (eventType) {
            case "ORDER_CREATED": handleOrder(payload); break;
            case "PAYMENT_PROCESSED": handlePayment(payload); break;
        }
    }

    private void handleOrder(String payload) { /* order handling logic */ }
    private void handlePayment(String payload) { /* payment handling logic */ }
}
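One design detail worth noting: CompletableFuture.runAsync(...) without an explicit executor runs the task on the JVM-wide ForkJoinPool.commonPool(). If the business work should stay on the dedicated pool configured in Step 4, pass that executor as the second argument, e.g. CompletableFuture.runAsync(task, redisStreamTaskExecutor).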

Step 7: Implement the pending-message processor

@Component
@Slf4j
@RequiredArgsConstructor
public class PendingMessageProcessor {

    @Value("${app.redis.stream}")
    private String streamKey;

    @Value("${app.redis.group}")
    private String groupName;

    @Value("${app.redis.consumer}")
    private String consumerName;

    private final StringRedisTemplate redisTemplate;
    private final RedisMessageProcessor messageProcessor;

    // Re-process pending (unacknowledged) messages once per minute
    @Scheduled(fixedRate = 60000)
    public void processPendingMessages() {
        // 1. Query up to 100 pending messages of the group (XPENDING)
        PendingMessages pending = redisTemplate.opsForStream()
                .pending(streamKey, groupName, Range.unbounded(), 100);
        pending.forEach(this::handlePendingMessage);
    }

    private void handlePendingMessage(PendingMessage pending) {
        try {
            // 2. Claim the message for this consumer if it has been idle for 30s (XCLAIM);
            //    the <String, String> type witness keeps the record values typed as String
            List<MapRecord<String, String, String>> records = redisTemplate.<String, String>opsForStream()
                    .claim(streamKey, groupName, consumerName, Duration.ofSeconds(30), pending.getId());
            if (!records.isEmpty()) {
                MapRecord<String, String, String> record = records.get(0);
                // 3. Retry the business processing
                messageProcessor.processBusiness(record);
                // 4. ACK on success
                redisTemplate.opsForStream().acknowledge(streamKey, groupName, record.getId());
            }
        } catch (Exception e) {
            log.warn("Retry of pending message {} failed", pending.getIdAsString(), e);
            // 5. After too many delivery attempts, move the message to the dead-letter stream
            if (pending.getTotalDeliveryCount() > 3) {
                moveToDeadLetterQueue(pending);
            }
        }
    }

    private void moveToDeadLetterQueue(PendingMessage pending) {
        // Read the original message body
        List<MapRecord<String, String, String>> records = redisTemplate.<String, String>opsForStream()
                .range(streamKey, Range.just(pending.getIdAsString()));
        if (!records.isEmpty()) {
            // Copy it into the dead-letter stream
            redisTemplate.opsForStream().add("dead-letter:" + streamKey, records.get(0).getValue());
            // Acknowledge the original so it leaves the pending list
            redisTemplate.opsForStream().acknowledge(streamKey, groupName, pending.getId());
        }
    }
}
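Note that @Scheduled jobs such as the one above only run if scheduling is enabled somewhere in the application, which the original article does not show. A minimal sketch (the class name is illustrative):

// Scheduling must be switched on once, e.g. on the main application class,
// otherwise the @Scheduled pending-message job above never fires.
@SpringBootApplication
@EnableScheduling
public class RedisStreamDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(RedisStreamDemoApplication.class, args);
    }
}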

Step 8: Implement the message producer

@Service
@RequiredArgsConstructor
public class RedisMessageProducer {

    @Value("${app.redis.stream}")
    private String streamKey;

    private final StringRedisTemplate redisTemplate;

    // Append the event to the stream (XADD) and return the generated record id
    public String sendMessage(String eventType, String payload) {
        Map<String, String> message = Map.of(
                "eventType", eventType,
                "payload", payload,
                "timestamp", String.valueOf(System.currentTimeMillis()));
        return redisTemplate.opsForStream().add(streamKey, message).getValue();
    }
}

Step 9: Expose a REST endpoint

@RestController
@RequestMapping("/messages")
@RequiredArgsConstructor
public class MessageController {

    private final RedisMessageProducer producer;

    // POST /messages -> publishes the event and returns the stream record id
    @PostMapping
    public String sendMessage(@RequestBody MessageRequest request) {
        return producer.sendMessage(request.getEventType(), request.getPayload());
    }

    @Data
    public static class MessageRequest {
        private String eventType;
        private String payload;
    }
}
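To try it out, POST a JSON body such as {"eventType": "ORDER_CREATED", "payload": "{\"orderId\": 1001}"} to /messages (the payload value is just an illustrative string). The response is the generated stream record id, and the listener registered in Step 5 picks the event up on the next poll.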

III. Message Lifecycle Flows

1. Normal message processing flow

The producer appends the event to the stream, the listener container delivers it to RedisMessageProcessor, the business logic runs, and on success the record is acknowledged (XACK) and removed from the group's pending list.

2. Pending-message processing flow

If processing fails, the record is not acknowledged and stays in the pending list. Every minute PendingMessageProcessor queries the pending list (XPENDING), re-claims idle records (XCLAIM), retries them, and moves records that have exceeded the retry limit to the dead-letter stream.

3. How the ACK mechanism works

A record delivered to a consumer in the group is tracked in the pending entries list (PEL) until it is explicitly acknowledged; only XACK removes it, so a crash or a processing failure never silently loses a message.
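The diagram from the original post is not reproduced here; as a rough textual substitute, a minimal sketch (the helper method is illustrative, stream/group/consumer names are taken from the configuration above) of the same lifecycle expressed directly against StreamOperations:

// Minimal sketch of the ACK lifecycle using StreamOperations directly
// (illustrative only; the listener container in Step 4 does this reading for you).
public void ackLifecycle(StringRedisTemplate redisTemplate) {
    // XREADGROUP GROUP app-group consumer-1 COUNT 1 STREAMS app-events >
    List<MapRecord<String, String, String>> records = redisTemplate.<String, String>opsForStream().read(
            Consumer.from("app-group", "consumer-1"),
            StreamReadOptions.empty().count(1),
            StreamOffset.create("app-events", ReadOffset.lastConsumed()));
    if (records == null) {
        return;
    }
    for (MapRecord<String, String, String> record : records) {
        // While unacknowledged, the record is tracked in the group's pending entries list (PEL).
        process(record); // stand-in for the business logic
        // XACK app-events app-group <id>  -> removes the record from the PEL
        redisTemplate.opsForStream().acknowledge("app-events", "app-group", record.getId());
    }
}

private void process(MapRecord<String, String, String> record) { /* ... */ }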

IV. Production Recommendations

  1. Consumer naming strategy

    @Value("${app.redis.consumer}")
    private String consumerName;// 在应用启动时设置
    @PostConstruct
    public void initConsumerName() {String hostName = InetAddress.getLocalHost().getHostName();String port = environment.getProperty("server.port");consumerName = "consumer-" + hostName + "-" + port;
    }
  2. Configurable retry policy (see the sketch after this list for wiring these values into the pending-message processor)

    app:
      pending:
        max_retry: 5
        retry_interval: 30000 # 30 seconds
  3. Dead-letter queue monitoring

    // Check the dead-letter stream once per hour and alert if it is not empty
    @Scheduled(fixedRate = 3600000)
    public void checkDeadLetterQueue() {
        Long size = redisTemplate.opsForStream().size("dead-letter:" + streamKey);
        if (size != null && size > 0) {
            // alertService: whatever alerting/notification component the project uses
            alertService.sendAlert(size + " unprocessed messages in the dead-letter queue");
        }
    }
  4. Message retention (capping the stream length)

    // Cap the stream length when sending: append the record, then trim with an
    // approximate MAXLEN so old entries are dropped (XADD followed by XTRIM MAXLEN ~ 10000)
    public String sendMessage(String eventType, String payload) {
        Map<String, String> message = Map.of(
                "eventType", eventType,
                "payload", payload,
                "timestamp", String.valueOf(System.currentTimeMillis()));
        RecordId id = redisTemplate.opsForStream().add(streamKey, message);
        redisTemplate.opsForStream().trim(streamKey, 10000, true);
        return id.getValue();
    }
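As referenced in recommendation 2 above, a minimal sketch (field names are illustrative) of feeding the app.pending.* values into the pending-message processor from Step 7 instead of its hard-coded values:

// Sketch: replace the hard-coded retry threshold and claim idle time in Step 7
// with the app.pending.* configuration values (defaults shown after the colon).
@Value("${app.pending.max_retry:3}")
private int maxRetry;

@Value("${app.pending.retry_interval:30000}")
private long retryIntervalMs;

private boolean shouldDeadLetter(PendingMessage pending) {
    // give up once the message has been delivered more than maxRetry times
    return pending.getTotalDeliveryCount() > maxRetry;
}

In handlePendingMessage, Duration.ofSeconds(30) would then become Duration.ofMillis(retryIntervalMs), and the hard-coded "> 3" check would call shouldDeadLetter(pending).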

V. Summary

This article has walked through a complete solution for integrating Redis with Spring Boot to implement publish/subscribe with an ACK mechanism:

  1. Event-driven architecture: a Redis Stream listener container provides a true publish/subscribe model

  2. Reliable ACK mechanism: manual acknowledgement ensures messages are only confirmed after successful processing

  3. Automatic recovery: the pending-message processor automatically re-claims and retries failed messages

  4. Dead-letter queue: isolates messages that cannot be processed, preventing them from blocking the system

  5. Production-ready: covers multi-instance deployment, configurable retries, monitoring and alerting, and other production-grade concerns

The approach suits scenarios that require highly reliable message delivery, such as order processing, payment systems, and event sourcing: it provides message-reliability guarantees while preserving system throughput.
