当前位置: 首页 > news >正文

基于Redis Streams的实时消息处理实战经验分享

cover

基于Redis Streams的实时消息处理实战经验分享

业务场景描述

在我们公司的电商平台中,存在大量异步事件需要实时处理,例如用户下单、库存更新、支付回调等。这些事件对消息的可靠性、顺序性和高吞吐量有较高要求。传统的消息中间件(如Kafka、RabbitMQ)在运维成本或部署复杂度上存在一定挑战,在部分场景下难以满足“轻量、低延迟、易集成” 的需求。

经过调研和验证,Redis 6.0+ 提供的 Streams 特性在嵌入式部署、快速上手方面具有显著优势。本篇文章将分享我们在生产环境中基于 Redis Streams 构建实时消息处理的完整经验,包括技术选型、核心代码示例、踩坑解决和优化方案。

技术选型过程

  1. 消息可靠性:Redis Streams 支持持久化,且提供 ACK 机制和 Pending List,能够有效追踪消费进度。
  2. 顺序消费:同一消费者组内,可保证分片流(同一 key)中消息按写入顺序被串行消费。
  3. 横向扩展:可通过 Stream 分片(多个 Stream Key)或消费者组内多实例并行消费提高吞吐。
  4. 运营成本:Redis 已是团队基础设施,集群部署与监控成熟度高,二次成本低。
  5. 客户端生态:Lettuce、Jedis、Redisson 等客户端均有支持,编码友好。

基于以上考量,最终选型 Redis Streams,落地于现有 Redis 集群,无需额外独立中间件部署。

实现方案详解

环境与依赖

Maven 依赖(以 Lettuce 客户端为例):

<dependencies><dependency><groupId>io.lettuce</groupId><artifactId>lettuce-core</artifactId><version>6.1.5.RELEASE</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.30</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.2.3</version></dependency>
</dependencies>

SpringBoot 配置(application.yml):

spring:redis:host: redis-cluster-hostport: 6379password: your_passwordtimeout: 2000ms

流程设计

  1. Producer 将事件写入 Stream:XADD
  2. 多消费者(Consumer Group)并行读取:XREADGROUP
  3. 消费确认:XACK
  4. 异常消息追踪:Pending-List 与 XCLAIM 回补处理

生产者实现

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import java.util.HashMap;
import java.util.Map;public class RedisStreamProducer {private RedisClient client;private StatefulRedisConnection<String, String> connection;private RedisCommands<String, String> commands;private static final String STREAM_KEY = "orderStream";public RedisStreamProducer(String uri) {client = RedisClient.create(uri);connection = client.connect();commands = connection.sync();}public String sendMessage(Map<String, String> message) {// XADD key * field value [field value ...]return commands.xadd(STREAM_KEY, message);}public void shutdown() {connection.close();client.shutdown();}public static void main(String[] args) {RedisStreamProducer producer = new RedisStreamProducer("redis://:your_password@redis-host:6379/0");Map<String, String> order = new HashMap<>();order.put("orderId", "123456");order.put("userId", "u7890");order.put("amount", "258.50");String messageId = producer.sendMessage(order);System.out.println("消息发送成功, ID=" + messageId);producer.shutdown();}
}

消费者实现

import io.lettuce.core.RedisClient;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.models.stream.Consumer;
import io.lettuce.core.models.stream.PendingMessage;import java.time.Duration;
import java.util.List;
import java.util.Map;public class RedisStreamConsumer {private RedisClient client;private StatefulRedisConnection<String, String> connection;private RedisCommands<String, String> commands;private static final String STREAM_KEY = "orderStream";private static final String GROUP_NAME = "orderGroup";private static final String CONSUMER_NAME = "consumer-1";public RedisStreamConsumer(String uri) {client = RedisClient.create(uri);connection = client.connect();commands = connection.sync();// 创建消费者组, 如果已创建可 ignoretry {commands.xgroupCreate(STREAM_KEY, GROUP_NAME, "$", true);} catch (Exception e) {// Group exists}}public void consume() {while (true) {// 从 Pending List 先处理未 ack 的消息List<PendingMessage> pending = commands.xpending(STREAM_KEY, GROUP_NAME, Range.unbounded(), Limit.from(10));for (PendingMessage pm : pending) {// 重新消费StreamMessage<String, String> msg = commands.xclaim(STREAM_KEY,GROUP_NAME,CONSUMER_NAME,5000,pm.getId());process(msg.getBody());commands.xack(STREAM_KEY, GROUP_NAME, pm.getId());}// 正常读取新消息List<StreamMessage<String, String>> messages = commands.xreadgroup(Consumer.from(GROUP_NAME, CONSUMER_NAME),XReadArgs.StreamOffset.lastConsumed(STREAM_KEY));if (messages != null) {for (StreamMessage<String, String> msg : messages) {process(msg.getBody());commands.xack(STREAM_KEY, GROUP_NAME, msg.getId());}}// 轮询间隔try {Thread.sleep(200);} catch (InterruptedException e) {Thread.currentThread().interrupt();break;}}}private void process(Map<String, String> body) {// 业务处理逻辑System.out.println("处理订单: " + body);}public void shutdown() {connection.close();client.shutdown();}public static void main(String[] args) {RedisStreamConsumer consumer = new RedisStreamConsumer("redis://:your_password@redis-host:6379/0");consumer.consume();consumer.shutdown();}
}

踩过的坑与解决方案

  1. 消息重复消费

    • 问题:消费者处理过程中抛出异常导致 ack 未发送,Pending List 中累积大量消息。
    • 解决:定期扫描 Pending List,并结合 XCLAIM 将“活跃但挂起”消息重新分配给健康消费者处理;同时在业务端做好幂等控制。
  2. 消息积压与内存压力

    • 问题:Stream 长度持续增长,Redis 实例内存压力上升。
    • 解决:使用 XTRIM MAXLEN ~ N 对流进行修剪,结合业务保留时间策略,定期分批清理历史消息。
  3. 消费者实例重启后状态丢失

    • 问题:未及时恢复 Pending List 中未处理消息,导致部分消息长时间滞留。
    • 解决:消费者启动时优先处理 Pending List,再进入正常消费流程;并通过定时任务对挂起较久的消息进行报警或二次补偿处理。

总结与最佳实践

  1. Redis Streams 适合轻量级、低运维成本的实时消息场景,结合 ACK、Pending List 能保证高可靠性。
  2. 采用消费者组(Consumer Group)可支持横向扩展,读写分离与顺序消费兼得。
  3. 业务侧必须做好幂等设计,避免消息重复带来的副作用。
  4. 对 Stream 进行合理修剪,避免数据无节制增长导致内存问题。
  5. 建议结合监控告警,对 Pending List 长度、消费者积压情况进行实时监控。

通过以上实践,我们在千万级日事件量下实现了稳定的实时消息处理平台,整体延迟控制在 50ms 内,且系统可线性扩展。希望本文的实战经验和代码示例能助力大家快速上手 Redis Streams,构建高性能的实时消息处理系统。

http://www.dtcms.com/a/277378.html

相关文章:

  • 2025湖北省信息安全管理与评估赛项一阶段技能书
  • 当外卖骑手遇上“爽提学院”:一场关于专业的蜕变
  • 电商系统未来三年趋势:体验升级、技术赋能与模式重构
  • 海豚远程控制APP:随时随地,轻松掌控手机
  • 强化学习 (11)随机近似
  • 串口A和S的含义以及RT的含义
  • 大众点评商业模式:从内容护城河到竞争熔炉
  • MYSQL数据库----DCL语句
  • 初识JDBC的增删改
  • 12.3 安全内存区域划分
  • Oracle goldengate同步SQL server数据库测试实验中遇到的问题汇总
  • 股指期货的三种风险类型是什么?
  • 以太坊应用开发基础:从理论到实战的完整指南
  • 基于 STM32H743VIT6 的边缘 AI 实践:猫咪叫声分类 CNN 网络部署实战(已验证)中一些bug总结
  • 广东省省考备考(第四十四天7.13)——数量:数学运算(听课后强化训练)
  • IP 地址与网络基础全面解析
  • 飞算AI使用体验-一种基于项目工程思维的AI-Code思路
  • Web攻防-PHP反序列化魔术方法触发条件POP链构造变量属性修改黑白盒角度
  • iOS ish app 打印时间
  • 【Spring AOP】通知类型,@Pointcut、@Order(切面优先级)
  • 导入 SciPy 的 io 模块
  • CAPL报文信号接收和发送
  • Function CAll和MCP
  • 音视频学习(三十七):pts和dts
  • Web攻防-PHP反序列化原生内置类Exception类SoapClient类SimpleXMLElement
  • archive/tar: unknown file mode ?rwxr-xr-x
  • 数据结构 单链表(1)
  • FlinkSQL通解
  • ClickHouse 分区机制详解:规则、合并与实践指南
  • 中国国内面试基本流程解析