当前位置: 首页 > news >正文

实现基于数据库 flag 状态的消息消费控制

网罗开发(小红书、快手、视频号同名)

  大家好,我是 展菲,目前在上市企业从事人工智能项目研发管理工作,平时热衷于分享各种编程领域的软硬技能知识以及前沿技术,包括iOS、前端、Harmony OS、Java、Python等方向。在移动端开发、鸿蒙开发、物联网、嵌入式、云原生、开源等领域有深厚造诣。

图书作者:《ESP32-C3 物联网工程开发实战》
图书作者:《SwiftUI 入门,进阶与实战》
超级个体:COC上海社区主理人
特约讲师:大学讲师,谷歌亚马逊分享嘉宾
科技博主:华为HDE/HDG

我的博客内容涵盖广泛,主要分享技术教程、Bug解决方案、开发工具使用、前沿科技资讯、产品评测与使用体验。我特别关注云服务产品评测、AI 产品对比、开发板性能测试以及技术报告,同时也会提供产品优缺点分析、横向对比,并分享技术沙龙与行业大会的参会体验。我的目标是为读者提供有深度、有实用价值的技术洞察与分析。

展菲:您的前沿技术领航员
👋 大家好,我是展菲!
📱 全网搜索“展菲”,即可纵览我在各大平台的知识足迹。
📣 公众号“Swift社区”,每周定时推送干货满满的技术长文,从新兴框架的剖析到运维实战的复盘,助您技术进阶之路畅通无阻。
💬 微信端添加好友“fzhanfei”,与我直接交流,不管是项目瓶颈的求助,还是行业趋势的探讨,随时畅所欲言。
📅 最新动态:2025 年 3 月 17 日
快来加入技术社区,一起挖掘技术的无限潜能,携手迈向数字化新征程!


文章目录

    • 摘要
    • 引言
    • 思路分析
    • 项目结构
    • 核心代码示例
      • 1. 消费者类 FlagCheckingConsumer
      • 2. 业务判断类 FlagService
      • 3. 数据访问层 FlagRepository
      • 4. 消息模型 MessagePayload
    • 场景举例
      • 场景一:订单支付后才能发货
      • 场景二:用户实名认证后才能开通服务
      • 场景三:批量导入异步任务
    • QA 环节
    • 总结

摘要

在分布式系统中,消息队列(比如 RocketMQ、Kafka、RabbitMQ)是常见的解耦手段。但是实际业务里经常遇到这种需求:
消费者拿到消息以后,不能立刻消费,而是要根据数据库里的某个业务标志(flag)来判断是否应该消费。

举个例子:一个订单的消息到了,只有当数据库里 flag=true 才允许真正消费,否则就需要跳过或者延迟处理。

本文会结合 RocketMQ,写一个基于数据库 flag 状态的消息消费控制 Demo。

引言

为什么会有这样的需求?

  • 某些业务有“前置条件”,比如订单必须支付完成、审核通过,才能进行后续处理;
  • 消息到达时,数据库可能还没更新状态,如果直接消费会造成数据不一致;
  • 我们需要一种“柔性控制”,让消息的消费时机依赖于数据库里的 flag 状态。

在这种场景下,消息的消费逻辑就不再是单纯的“来了就消费”,而是要先查询数据库。

思路分析

实现逻辑可以拆成几步:

  1. 消息里必须带有业务 ID(如订单 ID、用户 ID);

  2. 消费者收到消息后,先查数据库,看这个 ID 对应的 flag 值;

  3. 如果 flag=true,则继续消费业务逻辑;

  4. 如果 flag=false,则跳过消息,可以选择:

    • 直接忽略;
    • 写入日志方便排查;
    • 重新投递到延迟队列,稍后再试。

这样可以保证业务的一致性。

项目结构

我们搭建一个 Spring Boot + RocketMQ 的小项目,目录大致如下:

rocketmq-flag-consumer/
├── src/
│   ├── main/
│   │   ├── java/com/example/rocketmq/
│   │   │   ├── RocketmqFlagConsumerApplication.java       # 启动类
│   │   │   ├── consumer/FlagCheckingConsumer.java         # 消费者逻辑
│   │   │   ├── service/FlagService.java                   # 业务判断
│   │   │   ├── repository/FlagRepository.java             # 数据库查询
│   │   │   ├── model/MessagePayload.java                  # 消息体模型
│   │   │   └── util/JsonUtils.java                        # JSON 工具类(可选)
│   │   └── resources/
│   │       ├── application.yml                            # 配置文件
│   │       └── mapper/FlagMapper.xml                      # MyBatis SQL 映射(可选)
├── pom.xml                                                # 依赖
└── README.md

核心代码示例

1. 消费者类 FlagCheckingConsumer

@RocketMQMessageListener(topic = "your-topic", consumerGroup = "your-consumer-group")
@Component
public class FlagCheckingConsumer implements RocketMQListener<MessagePayload> {@Autowiredprivate FlagService flagService;@Overridepublic void onMessage(MessagePayload message) {boolean shouldConsume = flagService.shouldConsume(message.getId());if (shouldConsume) {System.out.println("消费消息:" + message);// TODO: 处理业务逻辑,比如订单处理} else {System.out.println("跳过消息,flag=false:" + message);// TODO: 这里可以选择把消息投递到延迟队列,或者写一张“待处理表”}}
}

解析:

  • 消息拿到后,不是立刻处理,而是交给 FlagService 先判断;
  • 如果 flag=false,就直接跳过,避免错误消费。

2. 业务判断类 FlagService

@Service
public class FlagService {@Autowiredprivate FlagRepository flagRepository;public boolean shouldConsume(Long id) {Boolean flag = flagRepository.getFlagById(id);return Boolean.TRUE.equals(flag);}
}

解析:

  • 查询数据库,判断 flag 是否为 true
  • 防御性写法:用 Boolean.TRUE.equals(flag),避免 NPE。

3. 数据访问层 FlagRepository

@Mapper
public interface FlagRepository {@Select("SELECT flag FROM your_table WHERE id = #{id}")Boolean getFlagById(@Param("id") Long id);
}

解析:

  • 直接查数据库 flag 值;
  • 可用注解或 XML 映射都行。

4. 消息模型 MessagePayload

@Data
public class MessagePayload {private Long id;       // 业务 ID,比如订单 IDprivate String content; // 业务内容
}

解析:

  • 消息里必须带有业务 ID,否则无法去数据库查 flag;
  • 其他字段根据业务需要扩展。

场景举例

场景一:订单支付后才能发货

  • 用户下单 → 订单消息进入 MQ;
  • 消费者拿到消息 → 查数据库订单 isPaid 字段;
  • 如果 isPaid=true → 正常发货;
  • 如果 isPaid=false → 跳过,稍后重试。

场景二:用户实名认证后才能开通服务

  • 消息带有用户 ID;
  • 消费者查询数据库 isVerified
  • 未实名的消息丢到延迟队列,等用户实名后再消费。

场景三:批量导入异步任务

  • 每条消息里有任务 ID;
  • 只有当任务状态 flag=READY 时,才允许消费并执行。

QA 环节

Q: 如果消息跳过了,后续怎么保证能消费?
A: 有两种常见做法:

  1. 把消息丢到延迟队列(RocketMQ 支持延时等级);
  2. 消息写到数据库“待消费表”,由定时任务扫描重投。

Q: 如果数据库压力太大怎么办?
A: 可以考虑:

  • 用缓存(Redis)存储 flag;
  • 或者在生产端就判断 flag,避免进入 MQ。

Q: 会不会造成消息乱序?
A: 有可能,如果强依赖顺序,要结合 RocketMQ 的顺序消费功能。

总结

  • 普通消息消费是“来了就消费”;
  • 本文介绍了一个“条件消费”的思路:消费者先查数据库 flag,只有满足条件才处理;
  • 这种方式在订单、任务、用户状态控制等场景很常见;
  • 可以结合 RocketMQ 延迟队列或者“待处理表”机制,让未消费的消息在 flag 改变后再处理。

这样既能保证数据一致性,又能灵活控制消息的消费时机。

http://www.dtcms.com/a/354063.html

相关文章:

  • 【docker】P1 虚拟化与容器化
  • 全球协作无障碍:cpolar+Nextcloud实现跨国文件共享
  • 通过远程桌面横向移动(破解凭证)
  • 【51单片机】【protues仿真】 基于51单片机出租车计价器系统
  • 三轴云台之动态性能篇
  • 数字化时代催生变革,楼宇自控系统成为建筑管理新潮流的引领者
  • ESP32S3:开发环境搭建、VSCODE 单步调试、Systemview 分析任务运行情况
  • 北斗导航|接收机自主完好性监测算法综述
  • 【C++】类和对象 --- 类中的6个默认成员函数
  • CAS 浅析
  • 视觉语言模型应用开发——Qwen 2.5 视觉语言模型的零样本学习能力在多模态内容审核中的实践研究
  • 把CentOS 7默认yum源改成腾讯云镜像
  • 阿里云——云存储与数据库服务
  • RustFS架构解密:零GC设计如何实现12μs级存储延迟?
  • 【lucene】SpanNearQuery中的slop
  • 【lucene】SpanFirstQuery的end参数
  • 【Python】包管理,弄明白import,package,module
  • 复杂网络环境实测:主流云VR产品性能对比——平行云LarkXR突破网络限制 引领云VR技术新高度
  • 记住密码管理器
  • 在Eclipse中配置Tomcat
  • 终端美化:Windows11 下 安装 WSL 并使用好看的的 zsh 主题
  • 【图论】最短路算法
  • 802.11ax上行OFDMA接入机制:技术原理与实现细节
  • 流水线用到的Dockerfile和构建脚本build.sh
  • Python电影票房预测模型研究——贝叶斯岭回归Ridge、决策树、Adaboost、KNN分析猫眼豆瓣数据
  • MYSQL---存储过程
  • 【轨物方案】“无人值守”光伏电站智能运维解决方案,赋能绿色能源高效运营
  • 正则表达式 —— 贪婪与非贪婪
  • 汽车盲点检测系统的网络安全分析和设计
  • 【Linux学习】正则表达式学习记录