消息队列MQ
概念
通过异步通信、解耦服务、缓冲流量等方式解决复杂业务场景中的问题。
MQ(Message Queue),消息队列,是一种通过存储消息来实现系统间通信的中间件,是一套提供了消息生产、存储、消费全过程 API的软件系统(消息即数据)。通俗点说,就是一个先进先出的数据结构。
关键词
-
生产者(Producer):发送消息到队列。
-
消费者(Consumer):从队列订阅并处理消息。
-
消息代理(Broker):中间件服务,负责存储、路由和传递消息
MQ应用场景
一:系统解耦
场景描述:
不同服务之间直接调用可能导致强耦合(如支付成功后需通知订单、库存、物流等多个服务),一旦某一服务宕机或逻辑变更,整个链路可能崩溃。
案例
-
电商支付成功后的下游处理:
支付服务发送“支付成功”消息到队列,订单服务更新状态,库存服务扣减库存,物流服务生成运单,各服务独立消费消息。
优势
-
容错性提升:某个消费者故障不影响其他服务。
-
灵活扩展:新增消费者无需修改生产者代码(如新增一个积分服务)。
二、异步处理
场景描述
同步处理耗时操作(如发送邮件、短信、生成报表)会阻塞主流程,导致用户等待时间过长。
最常见于,用户注册后,发送注册邮件与短信通知,告知用户注册成功。
注册功能实际只需注册系统存储用户的账户信息后,用户便可以登录,而后续的注册短信与邮件不是即时需要关注的步骤。实际当数据写入注册系统后,注册系统就可以把其他的操作放入对应的消息队列 MQ 中然后马上返 回用户结果,由消息队列 MQ 异步地进行这些操作。
消息队列的作用
-
主流程快速完成核心操作,将非关键任务异步投递到消息队列。
-
消费者后台处理异步任务,不阻塞主线程。
案例
-
用户注册流程:
用户提交注册后,主流程写入数据库并返回成功,消息队列异步触发“发送欢迎邮件”和“初始化用户画像”。
优势
-
提升用户体验:用户无需等待所有操作完成。
-
资源合理分配:耗时任务由独立服务处理,避免占用核心业务资源。
三、流量削峰
场景描述
突发高并发流量(如秒杀、抢购)可能压垮数据库或下游服务。
消息队列的作用
-
将瞬时高并发请求转换为队列中的消息,消费者按处理能力逐步消费。
-
避免系统超负荷崩溃,实现“削峰填谷”。
案例
-
秒杀活动:
用户请求先进入消息队列排队,库存服务按顺序处理,避免数据库瞬间被击穿。
优势
-
系统稳定性:平滑处理突发流量,防止服务雪崩。
-
资源利用率:按需分配处理能力,避免过度扩容。
常见的MQ产品
ZeroMQ:
RabbitMQ:
使用 erlang 语言开发,性能较好,适合于企业级的开发,但是不利于做二次开发和维护。
ActiveMQ:
Apache 开源项目,可以和 springjms 轻松融合,实现了多种协议,支持持久化到数据库,对队列数较多的情况支持不好。
RocketMQ:
阿里巴巴的 MQ 中间件,由 java 语言开发,性能非常好,使用起来 很简单
Kafka:
Apache 下的一个子项目,是一个高性能跨语言分布式Publish/Subscribe 消息队列系统, 相对于 ActiveMQ 是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。
技术 | 特点 | 适用场景 |
---|---|---|
Kafka | 高吞吐、分布式、持久化,适合日志和大数据流处理 | 实时日志收集、流数据处理 |
RabbitMQ | 支持多种协议(AMQP),灵活的路由机制,社区成熟 | 企业级复杂路由、事务消息 |
RocketMQ | 阿里开源,高可用、低延迟,支持事务和顺序消息 | 电商交易、金融场景 |
ActiveMQ | 支持 JMS 规范,兼容性好,但吞吐量较低 | 传统企业系统集成 |
Redis Stream | 基于内存,轻量级,适合简单场景 | 小型实时消息通知 |
RocketMQ
阿里巴巴开源的高性能分布式消息中间件,支持高吞吐、低延迟、高可用。
-
核心组件:
-
NameServer:轻量级注册中心,管理Broker路由信息。
-
Broker:消息存储与转发核心,支持主从架构。
-
Producer:消息生产者,发送消息到Broker。
-
Consumer:消息消费者,订阅并消费消息。
-
搭建
1.下载
从 官网https://rocketmq.apache.org/download/ 或 GitHub 下载最新版本(如 5.1.3):
wget https://archive.apache.org/dist/rocketmq/5.1.3/rocketmq-all-5.1.3-bin-release.zip
unzip rocketmq-all-5.1.3-bin-release.zip
cd rocketmq-5.1.3
2.配置环境变量
3.启动 Name Server
进入到 bin 目录输入命令: mqnamesrv.cmd
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log # 查看日志,确认启动成功
4.启动 Broker
nohup sh bin/mqbroker -c conf/broker.conf &
tail -f ~/logs/rocketmqlogs/broker.log # 确认启动成功
5.发送和接收消息测试
-
添加依赖
在 Maven 项目的pom.xml
中添加:<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>5.1.3</version> </dependency>
-
生产者发送消息
public class ProducerExample { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("my-producer-group"); producer.setNamesrvAddr("localhost:9876"); producer.start(); Message msg = new Message("TestTopic", "Hello RocketMQ".getBytes()); SendResult result = producer.send(msg); System.out.println("发送结果: " + result); producer.shutdown(); } }
-
消费者订阅消息
public class ConsumerExample { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("TestTopic", "*"); consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { for (MessageExt msg : msgs) { System.out.println("收到消息: " + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); System.in.read(); // 阻塞等待消息 } }
控制台安装
修改配置文件
添加依赖
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-impl</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-core</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>javax.activation</groupId>
<artifactId>activation</artifactId>
<version>1.1.1</version>
</dependency>
使用maven打包
mvn clean package -Dmaven.test.skip=true
java -jar rocketmq-console-ng-1.0.0.jar

RocketMQ 的架构及概念
Broker(邮递员) | 负责消息的接收,存储,投递 等功能. |
NameServer(邮局) | 消息队列的协调者,Broker 向它注册路由信息, 同时Producer 和 Consumer 向其获取路由信息 |
Producer(寄件人) | 消息的生产者,需要从 NameServer 获取 Broker 信息, 然后与 Broker 建立连接,向 Broker 发送消 息 |
Consumer(收件人) | 消息的消费者,需要从 NameServer 获取 Broker 信息, 然后与 Broker 建立连接,从 Broker 获取消息 |
Topic(地区) | 用来区分不同类型的消息,发送和接收消息前都需要先创建Topic 针对 Topic 来发送和接收消息 |
Message Queue(邮件) | 为了提高性能和吞吐量,引入了 Message Queue, 一个 Topic 可以设置一个或多个 Message Queue, 这样消息就可以并行往各个Message Queue 发送消息, |
Producer Group | 是多个发送同一类消息的生产者 |
Consumer Group | 消费同一类消息的多个 consumer 实例 |
三种不同类型的消息
一、可靠同步发送
特点
-
发送消息后阻塞等待 Broker 返回结果,确保消息成功到达 Broker。
-
适用场景:对消息可靠性要求高,需确保消息不丢失的场景(如支付通知)。
注意事项
-
若发送失败(如网络问题),会抛出异常(
RemotingException
、MQClientException
)。 -
可通过
SendResult
获取消息的存储位置(Broker)、队列 ID 等信息。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 1. 创建生产者实例,指定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("SYNC_PRODUCER_GROUP");
// 2. 设置 NameServer 地址
producer.setNamesrvAddr("localhost:9876");
// 3. 启动生产者
producer.start();
try {
// 4. 创建消息对象,指定 Topic、Tag 和消息体
Message msg = new Message("TEST_TOPIC", "TAG_A", "Hello RocketMQ (Sync)".getBytes());
// 5. 同步发送消息
SendResult sendResult = producer.send(msg);
System.out.println("同步发送结果: " + sendResult);
} finally {
// 6. 关闭生产者
producer.shutdown();
}
}
}
二、可靠异步发送
特点
-
发送消息后立即返回,通过回调函数处理结果。
-
适用场景:高吞吐场景,允许一定延迟,需异步处理发送结果(如日志收集)。
注意事项
-
必须处理回调:在
onException
中实现重试或日志记录。 -
资源释放:确保回调执行完成后再关闭生产者(示例中简单使用
Thread.sleep
,生产环境需更严谨设计)。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class AsyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ASYNC_PRODUCER_GROUP");
producer.setNamesrvAddr("localhost:9876");
producer.start();
try {
Message msg = new Message("TEST_TOPIC", "TAG_B", "Hello RocketMQ (Async)".getBytes());
// 异步发送,注册回调
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("异步发送成功: " + sendResult);
}
@Override
public void onException(Throwable e) {
System.err.println("异步发送失败: " + e.getMessage());
}
});
} finally {
// 等待回调执行完成(实际生产环境需合理设计资源释放)
Thread.sleep(3000);
producer.shutdown();
}
}
}
三、单向发送(Oneway)
特点
-
发送消息后不等待 Broker 响应,不关心发送结果。
-
适用场景:对可靠性要求低,追求极致吞吐量的场景(如日志采集)。
注意事项
-
不保证可靠性:消息可能因网络问题丢失,需结合业务容忍度使用。
-
无异常反馈:即使 Broker 不可用,也不会抛出异常。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class OnewayProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ONEWAY_PRODUCER_GROUP");
producer.setNamesrvAddr("localhost:9876");
producer.start();
try {
Message msg = new Message("TEST_TOPIC", "TAG_C", "Hello RocketMQ (Oneway)".getBytes());
// 单向发送(无返回值)
producer.sendOneway(msg);
System.out.println("单向发送完成(无结果反馈)");
} finally {
producer.shutdown();
}
}
}
三种方式的对比
特性 | 同步发送 | 异步发送 | 单向发送 |
---|---|---|---|
可靠性 | 高(等待 Broker 确认) | 高(通过回调处理结果) | 低(不确认结果) |
吞吐量 | 低 | 中 | 高 |
延迟 | 高 | 中 | 低 |
资源占用 | 高(线程阻塞) | 中(回调处理) | 低 |
适用场景 | 支付通知、订单创建 | 日志收集、批量消息 | 实时监控、日志上报 |
消费者接收测试
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
public class TestConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TEST_CONSUMER_GROUP");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TEST_TOPIC", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
System.out.println("收到消息: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.in.read();
}
}
测试结果:
收到消息: Hello RocketMQ (Async)
收到消息: Hello RocketMQ (Oneway)
收到消息: Hello RocketMQ (Sync)
小结:
-
同步发送:简单可靠,适合强一致性场景。
-
异步发送:平衡吞吐与可靠性,适合高并发场景。
-
单向发送:极致性能,适合可容忍丢失的非关键数据。