当前位置: 首页 > news >正文

Java消息中间件(RocketMQ)

在分布式系统架构中,传统 HTTP 同步调用存在明显局限 —— 客户端需等待服务端响应才能继续执行,一旦服务端出现网络延迟或不可达问题,客户端将直接受影响。消息中间件的出现,正是为解决这一痛点,通过高效可靠的消息传递机制,实现分布式系统间的异步通信与解耦。本文将围绕消息中间件核心知识,结合 RocketMQ 的实战操作,全面梳理其原理、场景与应用。

目录

一、消息中间件基础:概念与价值

1. 核心定义

2. 核心使用场景

(1)异步处理:缩短响应时间,提升吞吐量

(2)应用解耦:降低系统依赖,提高容错性

3. 主流消息中间件对比

二、RocketMQ 实战:从环境搭建到消息收发

1. 环境准备与安装

(1)前置要求

(2)安装步骤

(3)安装可视化插件

2. RocketMQ 核心架构与概念

3. 消息发送与接收:三种发送方式 + 两种消费模式

(1)依赖引入

(2)三种消息发送方式

(3)两种消费模式

三、RocketMQ 实际应用:下单发短信场景

1. 订单微服务(生产者)实现

(1)添加依赖

(2)配置 application.yml

(3)编写消息发送代码

2. 用户微服务(消费者)实现

(1)添加依赖

(2)配置 application.yml

(3)编写消息接收服务

3. 测试验证

四、总结


一、消息中间件基础:概念与价值

1. 核心定义

消息中间件是基于数据通信的分布式系统集成工具,通过提供消息传递消息排队模型,扩展进程间通信能力。其核心角色清晰明确:

  • 生产者(Producer):发送消息的应用或服务,如同寄快递时的寄件人;
  • 消费者(Consumer):接收并处理消息的应用或服务,类似收件人;
  • 消息载体:包含业务数据与路由属性,是两者通信的核心媒介。

2. 核心使用场景

消息中间件的价值主要体现在两大场景,通过架构改造显著提升系统性能与稳定性:

(1)异步处理:缩短响应时间,提升吞吐量

以 “用户注册后发送邮件与短信” 为例,传统方案存在明显效率问题:

  • 串行方式:注册信息写入数据库(50ms)→发送邮件(50ms)→发送短信(50ms),总响应时间 150ms,步骤串行导致耗时叠加;
  • 并行方式:写入数据库后同时发送邮件与短信,总响应时间 100ms,虽有优化但仍受限于最慢任务。

引入消息队列后,架构变为 “注册信息写入数据库(50ms)→写入消息队列(5ms)”,仅需 55ms 即可响应客户端。邮件与短信服务通过异步读取消息队列完成处理,系统吞吐量较串行提升 3 倍,较并行提升 2 倍。

(2)应用解耦:降低系统依赖,提高容错性

传统 “订单系统调用库存系统接口” 的方案中,若库存系统故障,订单系统将直接受阻。引入消息队列后:

  • 订单系统下单成功后,仅需将消息写入队列即可完成流程,无需关注库存系统状态;
  • 库存系统恢复后,从队列中读取消息执行库存扣减,实现两者完全解耦,即使一方故障也不影响另一方核心功能。

3. 主流消息中间件对比

不同中间件在特性上存在差异,选型需结合业务需求。以下是四大主流中间件的关键指标对比:

特性ActiveMQRabbitMQRocketMQKafka
生产者消费者模式支持支持支持支持
发布订阅模式支持支持支持支持
请求回应模式支持支持不支持不支持
API 完备性
多语言支持支持支持仅 Java支持
单机吞吐量万级 / 秒万级 / 秒万级 / 秒十万级 / 秒
消息延迟微秒级毫秒级毫秒级(可优化)
可用性高(主从)高(主从)非常高(分布式)非常高(分布式)
消息丢失风险理论上不丢失理论上不丢失
文档完备性较高
社区活跃度
商业支持商业云支持商业云支持

二、RocketMQ 实战:从环境搭建到消息收发

RocketMQ 是阿里巴巴开源的分布式消息中间件,现属 Apache 顶级项目,经 “双 11” 万亿级流量验证,在高可用、高可靠场景中表现优异。

1. 环境准备与安装

(1)前置要求
  • 64 位操作系统;
  • JDK 1.8 及以上;
  • Maven(用于编译可视化插件)。
(2)安装步骤
  1. 下载 RocketMQ:访问http://rocketmq.apache.org/release_notes/release-notes-4.4.0/获取安装包;
  2. 配置环境变量
    • 新建变量ROCKETMQ_HOME,值为安装包解压路径;
    • Path中添加%ROCKETMQ_HOME%\bin
  3. 启动服务
    • 切换至bin目录,执行start mqnamesrv.cmd启动 NameServer;
    • 执行start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true启动 Broker;
    • 若提示 “找不到主类”,需打开runbroker.cmd,将%CLASSPATH%改为带英文双引号的形式。
(3)安装可视化插件
  1. 从 GitHub 下载rocketmq-externals-rocketmq-console-1.0.0.zip并解压;
  2. 进入rocketmq-console/src/main/resources,编辑application.properties配置 NameServer 地址;
  3. 切换至rocketmq-console目录,执行mvn clean package -Dmaven.test.skip=true编译生成 Jar 包;
  4. 进入target目录,执行java -jar rocketmq-console-ng-1.0.0.jar启动插件;
  5. 浏览器访问http://localhost:8085,进入控制台管理界面。

2. RocketMQ 核心架构与概念

RocketMQ 的架构由四大核心角色构成,协同完成消息的存储与传递:

角色功能描述类比场景
NameServer消息队列协调者,Broker 向其注册路由信息,Producer/Consumer 从其获取 Broker 地址邮局管理机构
Broker核心组件,负责消息的接收、存储、投递,是消息流转的核心节点邮局 / 邮递员
Producer消息生产者,需先从 NameServer 获取 Broker 信息,再建立连接发送消息寄件人
Consumer消息消费者,通过 NameServer 获取 Broker 信息,连接后接收并处理消息收件人

此外,还有两个关键概念:

  • Topic:用于区分消息类型,发送与接收前需先创建,如同消息的 “地区分类”;
  • Message Queue:为提升吞吐量,一个 Topic 可划分多个队列,支持消息并行发送与消费。

3. 消息发送与接收:三种发送方式 + 两种消费模式

(1)依赖引入

首先在项目中添加 RocketMQ 客户端依赖:

xml

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.4.0</version>
</dependency>
(2)三种消息发送方式

不同业务场景对应不同的发送策略,其特性差异如下:

发送方式发送 TPS结果反馈可靠性适用场景
同步发送不丢失重要通知(短信、告警)
异步发送可能丢失响应敏感场景(实时业务)
单向发送极快可能丢失日志发送(不关心结果)

① 同步发送代码示例

java

运行

public class RocketMQSendTest {public static void main(String[] args) throws Exception { // 1. 创建生产者,指定组名DefaultMQProducer producer = new DefaultMQProducer("myproducer-group"); // 2. 配置NameServer地址producer.setNamesrvAddr("192.168.109.131:9876"); // 3. 启动生产者producer.start(); // 4. 创建消息(主题、标签、消息体)Message msg = new Message("myTopic", "myTag", ("RocketMQ Message").getBytes());// 5. 发送消息,获取结果SendResult sendResult = producer.send(msg); System.out.println(sendResult); // 6. 关闭生产者producer.shutdown(); }
}

② 异步发送代码示例

java

运行

public class RocketMQAsyncSendTest {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();for (int i = 0; i < 10; i++) {Message msg = new Message("myTopic", "myTag2", ("防疫政策修改").getBytes());// 异步发送,通过回调获取结果producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("发送成功:" + sendResult);}@Overridepublic void onException(Throwable e) {System.out.println("发送异常:" + e);}});TimeUnit.SECONDS.sleep(3);}producer.shutdown();}
}

③ 单向发送代码示例

java

运行

public class RocketMQOnewaySendTest {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();for (int i = 0; i < 10; i++) {Message msg = new Message("myTopic", "myTag3", ("防疫政策修改").getBytes());// 单向发送,无结果反馈producer.sendOneway(msg);TimeUnit.SECONDS.sleep(3);}producer.shutdown();}
}
(3)两种消费模式

RocketMQ 提供两种消费模式,满足不同业务需求:

① 负载均衡模式(默认):多个消费者共同消费队列消息,每个消费者处理不同消息,实现任务分摊,适合集群消费场景。

② 广播模式:每个消费者均接收并处理所有消息,适合通知类场景(如全量配置更新)。

消费代码示例(含广播模式配置)

java

运行

public class RocketMQReceiveTest { public static void main(String[] args) throws MQClientException { // 1. 创建消费者,指定组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group");// 2. 配置NameServer地址consumer.setNamesrvAddr("192.168.109.131:9876"); // 3. 订阅主题(*表示所有标签)consumer.subscribe("myTopic", "*"); // 配置广播模式(默认负载均衡,无需此句)consumer.setMessageModel(MessageModel.BROADCASTING);// 4. 设置消息处理回调consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.println("Receive New Messages: " + msgs); // 返回消费成功状态return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 5. 启动消费者consumer.start();System.out.println("Consumer Started."); }
}

三、RocketMQ 实际应用:下单发短信场景

以 “订单成功后向用户发送短信” 为例,结合 Spring Boot 集成 RocketMQ,实现微服务间的消息通信。

1. 订单微服务(生产者)实现

(1)添加依赖

xml

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.2</version>
</dependency>
(2)配置 application.yml

yaml

rocketmq:name-server: 127.0.0.1:9876 # NameServer地址producer: group: shop-order # 生产者组名
(3)编写消息发送代码

在订单服务的下单接口中,添加消息发送逻辑:

java

运行

@Service
public class OrderService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void createOrder(Order order) {// 1. 保存订单到数据库// ...(订单保存逻辑)// 2. 发送消息到MQrocketMQTemplate.convertAndSend("order-topic", order);System.out.println("订单创建成功,已发送消息");}
}

2. 用户微服务(消费者)实现

(1)添加依赖

同订单微服务,引入rocketmq-spring-boot-starter依赖。

(2)配置 application.yml

yaml

rocketmq:name-server: 127.0.0.1:9876 # 与生产者一致的NameServer地址
(3)编写消息接收服务

通过注解实现消息监听,接收订单消息并发送短信:

java

运行

@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "shop-user", topic = "order-topic")
public class SmsService implements RocketMQListener<Order> { @Override public void onMessage(Order order) {// 处理短信发送逻辑log.info("收到订单信息:{},开始发送短信", JSON.toJSONString(order));// ...(短信发送代码)}
}

3. 测试验证

  1. 启动 NameServer、Broker 与可视化插件;
  2. 启动订单微服务与用户微服务;
  3. 调用订单创建接口,观察日志:
    • 订单服务日志显示 “订单创建成功,已发送消息”;
    • 用户服务日志显示 “收到订单信息... 开始发送短信”,表明消息传递成功。

四、总结

消息中间件通过异步通信与应用解耦,成为分布式系统的核心支撑组件。RocketMQ 凭借高可用、高可靠的特性,在电商、金融等场景中广泛应用。掌握其环境搭建、消息收发方式及实际业务集成,能有效提升系统的吞吐量与容错性。在实际开发中,需根据业务场景选择合适的消息发送方式与消费模式,结合监控工具(如可视化插件)保障消息流转的稳定性,让消息中间件真正成为分布式系统的 “通信中枢”。

http://www.dtcms.com/a/363930.html

相关文章:

  • Linux 文本处理实战手册
  • 【专栏升级】大模型应用实战并收录RAG专题,Agent专题,LLM重构数据科学流程专题,端侧AI专题,累计63篇文章
  • Redis 哨兵 (基于 Docker)
  • YOLO 目标检测:YOLOv5网络结构、Focus、CSP、自适应Anchor、激活函数SiLU、SPPF、C3
  • 3.2-C++基础组件
  • Kafka面试精讲 Day 5:Broker集群管理与协调机制
  • 深度学习-----通过本地数据实现图片识别的训练
  • PS痕迹检测器:基于深度学习的图像篡改检测
  • 撤销修改 情况⼀:对于⼯作区的代码,还没有 add
  • 浏览器内存 (JavaScript运行时内存)存储的优劣分析
  • linux(cut,sort,uniq ,tr,sed,awk)命令介绍
  • 贝叶斯定理:理解概率更新与实际场景应用
  • 在VS Code中直接操控浏览器
  • 预算紧张?这5款低代码平台免费还好用!
  • 光储充一体化智慧能源平台助力某能投公司绿色能源转型
  • 【面试场景题】如何理解设计模式
  • three.js手机端的4种旋转方式
  • 有鹿巡扫机器人:智慧清洁时代的多面手
  • (四)Python控制结构(条件结构)
  • MMORPG 游戏战斗系统架构
  • 2025互联网大厂Java后端面试:3-5年经验必问核心考点解析
  • 机器学习辅助的Backtrader资产配置优化策略
  • 【vue2】vue2.7x的项目中集成tailwind.css真的不要太香
  • Python 类的方法类型详解
  • 企业如何实现零工用工零风险?盖雅全自动化合规管控
  • 望获实时Linux:亚微秒级系统响应的实现方法
  • Qt中字节对齐问题和数据的序列化和反序列化的问题
  • springboot2.x集成swagger api(springdoc-openapi-ui)
  • 开源企业级快速开发平台(JeecgBoot)
  • python - ( js )object对象、json对象、字符串对象的相关方法、数组对象的相关方法、BOM对象、BOM模型中 Navigator 对象