当前位置: 首页 > news >正文

消息队列MQ

概念

通过异步通信、解耦服务、缓冲流量等方式解决复杂业务场景中的问题。

MQ(Message Queue),消息队列,是一种通过存储消息来实现系统间通信的中间件,是一套提供了消息生产、存储、消费全过程 API的软件系统(消息即数据)。通俗点说,就是一个先进先出的数据结构。

 关键词

  • 生产者(Producer):发送消息到队列。

  • 消费者(Consumer):从队列订阅并处理消息。

  • 消息代理(Broker):中间件服务,负责存储、路由和传递消息

MQ应用场景

一:系统解耦

场景描述:

        不同服务之间直接调用可能导致强耦合(如支付成功后需通知订单、库存、物流等多个服务),一旦某一服务宕机或逻辑变更,整个链路可能崩溃。

案例
  • 电商支付成功后的下游处理
    支付服务发送“支付成功”消息到队列,订单服务更新状态,库存服务扣减库存,物流服务生成运单,各服务独立消费消息。

优势
  • 容错性提升:某个消费者故障不影响其他服务。

  • 灵活扩展:新增消费者无需修改生产者代码(如新增一个积分服务)。

二、异步处理

场景描述

同步处理耗时操作(如发送邮件、短信、生成报表)会阻塞主流程,导致用户等待时间过长。

最常见于,用户注册后,发送注册邮件与短信通知,告知用户注册成功。

注册功能实际只需注册系统存储用户的账户信息后,用户便可以登录,而后续的注册短信与邮件不是即时需要关注的步骤。实际当数据写入注册系统后,注册系统就可以把其他的操作放入对应的消息队列 MQ 中然后马上返 回用户结果,由消息队列 MQ 异步地进行这些操作。

消息队列的作用
  • 主流程快速完成核心操作,将非关键任务异步投递到消息队列。

  • 消费者后台处理异步任务,不阻塞主线程。

案例
  • 用户注册流程
    用户提交注册后,主流程写入数据库并返回成功,消息队列异步触发“发送欢迎邮件”和“初始化用户画像”。

优势
  • 提升用户体验:用户无需等待所有操作完成。

  • 资源合理分配:耗时任务由独立服务处理,避免占用核心业务资源。

三、流量削峰

场景描述

突发高并发流量(如秒杀、抢购)可能压垮数据库或下游服务。

消息队列的作用
  • 将瞬时高并发请求转换为队列中的消息,消费者按处理能力逐步消费。

  • 避免系统超负荷崩溃,实现“削峰填谷”。

案例
  • 秒杀活动
    用户请求先进入消息队列排队,库存服务按顺序处理,避免数据库瞬间被击穿。

优势
  • 系统稳定性:平滑处理突发流量,防止服务雪崩。

  • 资源利用率:按需分配处理能力,避免过度扩容。

常见的MQ产品

ZeroMQ:

号称最快的消息队列系统,尤其针对大吞吐量的需求场景。扩展性好,采用 C 语言 实现
ZeroMQ 仅提供非持久性的队列,也就是说如果 down 机,数据将会丢失。

RabbitMQ:

使用 erlang 语言开发,性能较好,适合于企业级的开发,但是不利于做二次开发和维护。

ActiveMQ:

Apache 开源项目,可以和 springjms 轻松融合,实现了多种协议,支持持久化到数据库,对队列数较多的情况支持不好。

RocketMQ:

阿里巴巴的 MQ 中间件,由 java 语言开发,性能非常好,使用起来 很简单

Kafka:

Apache 下的一个子项目,是一个高性能跨语言分布式Publish/Subscribe 消息队列系统, 相对于 ActiveMQ 是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。

技术特点适用场景
Kafka高吞吐、分布式、持久化,适合日志和大数据流处理实时日志收集、流数据处理
RabbitMQ支持多种协议(AMQP),灵活的路由机制,社区成熟企业级复杂路由、事务消息
RocketMQ阿里开源,高可用、低延迟,支持事务和顺序消息电商交易、金融场景
ActiveMQ支持 JMS 规范,兼容性好,但吞吐量较低传统企业系统集成
Redis Stream基于内存,轻量级,适合简单场景小型实时消息通知

 RocketMQ

阿里巴巴开源的高性能分布式消息中间件,支持高吞吐、低延迟、高可用。

  • 核心组件

    • NameServer:轻量级注册中心,管理Broker路由信息。

    • Broker:消息存储与转发核心,支持主从架构。

    • Producer:消息生产者,发送消息到Broker。

    • Consumer:消息消费者,订阅并消费消息。

搭建 

1.下载

从 官网https://rocketmq.apache.org/download/ 或 GitHub 下载最新版本(如 5.1.3):

wget https://archive.apache.org/dist/rocketmq/5.1.3/rocketmq-all-5.1.3-bin-release.zip
unzip rocketmq-all-5.1.3-bin-release.zip
cd rocketmq-5.1.3

2.配置环境变量 

ROCKETMQ_HOME=D:\ProgramFiles\rocketmq-4.9.3
NAMESRV_ADDR =127.0.0.1:9876

3.启动 Name Server

进入到 bin 目录输入命令: mqnamesrv.cmd

nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log  # 查看日志,确认启动成功

4.启动 Broker 

进入到 bin 目录输入命令:
mqbroker.cmd -n 127.0.0.1:9876 atuoCreateTopicEnable=true
nohup sh bin/mqbroker -c conf/broker.conf &
tail -f ~/logs/rocketmqlogs/broker.log  # 确认启动成功

 5.发送和接收消息测试

  1. 添加依赖
    在 Maven 项目的 pom.xml 中添加:

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>5.1.3</version>
    </dependency>
  2. 生产者发送消息

    public class ProducerExample {
        public static void main(String[] args) throws Exception {
            DefaultMQProducer producer = new DefaultMQProducer("my-producer-group");
            producer.setNamesrvAddr("localhost:9876");
            producer.start();
            
            Message msg = new Message("TestTopic", "Hello RocketMQ".getBytes());
            SendResult result = producer.send(msg);
            System.out.println("发送结果: " + result);
            
            producer.shutdown();
        }
    }
  3. 消费者订阅消息

    public class ConsumerExample {
        public static void main(String[] args) throws Exception {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group");
            consumer.setNamesrvAddr("localhost:9876");
            consumer.subscribe("TestTopic", "*");
            
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                for (MessageExt msg : msgs) {
                    System.out.println("收到消息: " + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            
            consumer.start();
            System.in.read();  // 阻塞等待消息
        }
    }

控制台安装

修改配置文件

添加依赖

<dependency>
    <groupId>javax.xml.bind</groupId>
    <artifactId>jaxb-api</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>com.sun.xml.bind</groupId>
    <artifactId>jaxb-impl</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>com.sun.xml.bind</groupId>
    <artifactId>jaxb-core</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>javax.activation</groupId>
    <artifactId>activation</artifactId>
    <version>1.1.1</version>
</dependency>

使用maven打包

mvn clean package -Dmaven.test.skip=true
启动控制台
 java -jar rocketmq-console-ng-1.0.0.jar
访问: http://127.0.0.1:6060

 RocketMQ 的架构及概念

Broker(邮递员)

负责消息的接收,存储,投递 等功能.

NameServer(邮局)

消息队列的协调者,Broker 向它注册路由信息,

同时Producer 和 Consumer 向其获取路由信息

Producer(寄件人)

消息的生产者,需要从 NameServer 获取 Broker 信息,

然后与 Broker 建立连接,向 Broker 发送消 息

Consumer(收件人)

消息的消费者,需要从 NameServer 获取 Broker 信息,

然后与 Broker 建立连接,从 Broker 获取消息

Topic(地区)

用来区分不同类型的消息,发送和接收消息前都需要先创建Topic

针对 Topic 来发送和接收消息

Message Queue(邮件)

为了提高性能和吞吐量,引入了 Message Queue,

一个 Topic 可以设置一个或多个 Message Queue,

这样消息就可以并行往各个Message Queue 发送消息,

Producer Group

是多个发送同一类消息的生产者

Consumer Group

消费同一类消息的多个 consumer 实例

三种不同类型的消息

一、可靠同步发送

特点
  • 发送消息后阻塞等待 Broker 返回结果,确保消息成功到达 Broker。

  • 适用场景:对消息可靠性要求高,需确保消息不丢失的场景(如支付通知)。

注意事项
  • 若发送失败(如网络问题),会抛出异常(RemotingExceptionMQClientException)。

  • 可通过 SendResult 获取消息的存储位置(Broker)、队列 ID 等信息。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // 1. 创建生产者实例,指定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("SYNC_PRODUCER_GROUP");
        // 2. 设置 NameServer 地址
        producer.setNamesrvAddr("localhost:9876");
        // 3. 启动生产者
        producer.start();

        try {
            // 4. 创建消息对象,指定 Topic、Tag 和消息体
            Message msg = new Message("TEST_TOPIC", "TAG_A", "Hello RocketMQ (Sync)".getBytes());
            // 5. 同步发送消息
            SendResult sendResult = producer.send(msg);
            System.out.println("同步发送结果: " + sendResult);
        } finally {
            // 6. 关闭生产者
            producer.shutdown();
        }
    }
}

二、可靠异步发送

特点
  • 发送消息后立即返回,通过回调函数处理结果。

  • 适用场景:高吞吐场景,允许一定延迟,需异步处理发送结果(如日志收集)。

注意事项
  • 必须处理回调:在 onException 中实现重试或日志记录。

  • 资源释放:确保回调执行完成后再关闭生产者(示例中简单使用 Thread.sleep,生产环境需更严谨设计)。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ASYNC_PRODUCER_GROUP");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        try {
            Message msg = new Message("TEST_TOPIC", "TAG_B", "Hello RocketMQ (Async)".getBytes());
            // 异步发送,注册回调
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("异步发送成功: " + sendResult);
                }

                @Override
                public void onException(Throwable e) {
                    System.err.println("异步发送失败: " + e.getMessage());
                }
            });
        } finally {
            // 等待回调执行完成(实际生产环境需合理设计资源释放)
            Thread.sleep(3000);
            producer.shutdown();
        }
    }
}

三、单向发送(Oneway)

特点
  • 发送消息后不等待 Broker 响应,不关心发送结果。

  • 适用场景:对可靠性要求低,追求极致吞吐量的场景(如日志采集)。

注意事项
  • 不保证可靠性:消息可能因网络问题丢失,需结合业务容忍度使用。

  • 无异常反馈:即使 Broker 不可用,也不会抛出异常。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class OnewayProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ONEWAY_PRODUCER_GROUP");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        try {
            Message msg = new Message("TEST_TOPIC", "TAG_C", "Hello RocketMQ (Oneway)".getBytes());
            // 单向发送(无返回值)
            producer.sendOneway(msg);
            System.out.println("单向发送完成(无结果反馈)");
        } finally {
            producer.shutdown();
        }
    }
}

三种方式的对比

特性同步发送异步发送单向发送
可靠性高(等待 Broker 确认)高(通过回调处理结果)低(不确认结果)
吞吐量
延迟
资源占用高(线程阻塞)中(回调处理)
适用场景支付通知、订单创建日志收集、批量消息实时监控、日志上报

消费者接收测试

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class TestConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TEST_CONSUMER_GROUP");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("TEST_TOPIC", "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (MessageExt msg : msgs) {
                System.out.println("收到消息: " + new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
        System.in.read();
    }
}

测试结果: 

收到消息: Hello RocketMQ (Async)
收到消息: Hello RocketMQ (Oneway)
收到消息: Hello RocketMQ (Sync)

小结:

  • 同步发送:简单可靠,适合强一致性场景。

  • 异步发送:平衡吞吐与可靠性,适合高并发场景。

  • 单向发送:极致性能,适合可容忍丢失的非关键数据。

相关文章:

  • 【初探数据结构】二叉树的顺序结构——堆的实现详解(上下调整算法的时间复杂度分析)
  • 使用位置控件
  • 自动化测试定位元素方法成功率排行
  • PicGo安装与配置-Gitee图床
  • AI工具如何改变编程学习?Trae IDE与Claude 3.5的实践案例
  • JDK 24 发布,新特性解读!
  • 用 Pinia 点燃 Vue 3 应用:状态管理革新之旅
  • STM32F4与串口屏通信
  • C++项目——内存池
  • dockerSDK-Go语言实现
  • dfs刷题排列问题 + 子集问题 + 组和问题总结
  • 【Java SE】单例设计模式
  • TNNLS 2024 | 基于残差超密集网络的高光谱图像空间光谱融合方法
  • 【Java基础】在Java中,一个线程的大小(即线程所占用的内存)是多少
  • 关于FastAPI框架的面试题及答案解析
  • 如何在 Flutter 中使用 WebRTC
  • 从零开始学3PC:分布式事务的进阶方案
  • HarmonyOS第23天:应用性能优化,解锁流畅体验密码
  • 当下主流 AI 模型对比:ChatGPT、DeepSeek、Grok 及其他前沿技术
  • 51单片机笔记
  • 上海乐高乐园度假区将于7月5日开园
  • 短剧剧组在贵州拍戏突遇极端天气,演员背部、手臂被冰雹砸伤
  • 胖东来关闭官网内容清空?工作人员:后台维护升级
  • 强沙尘暴压城近万名游客被困,敦煌如何用3小时跑赢12级狂风?
  • 以总理:在加沙地带扩大的军事行动将是“高强度”的
  • 五一假期上海接待游客1650万人次,全要素旅游交易总额超200亿元