当前位置: 首页 > news >正文

RocketMQ核心技术精讲-----详解消息发送样例

文章目录

    • 📕1. 基本样例
        • ✏️1.1 发送消息
            • 🔖1.1.1 发送同步消息
            • 🔖1.1.2 发送异步消息
            • 🔖1.1.3 发送单项消息
        • ✏️1.2 消费消息
            • 🔖1.2.1 负载均衡模式
            • 🔖1.2.2 广播模式
    • 📕2. 顺序消息
        • ✏️2.1 生产顺序消息
        • ✏️2.2 消费顺序消息
    • 📕3. 延时消息
        • ✏️3.1 发送延时消息
        • ✏️3.2 消费延时消息
    • 📕4. 批量消息
        • ✏️4.1 发送批量消息
        • ✏️4.2 消费批量消息
    • 📕5. 过滤消息
        • ✏️5.1 Tag过滤消息
        • ✏️5.2 SQL语法过滤消息
    • 📕6. 事务消息
        • ✏️6.1 流程分析
        • ✏️6.2 发送事务消息
        • ✏️6.3 使用限制

特此注明 :
Designed By :长安城没有风
Version:1.0
Time:2025.10.28
Location:辽宁 · 大连

导入MQ客户端依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.4.0</version>
</dependency>

消息发送者步骤分析

1.创建消息生产者producer,并制定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象,指定主题Topic、Tag和消息体
5.发送消息
6.关闭生产者producer

消息消费者步骤分析

1.创建消费者Consumer,制定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer

📕1. 基本样例

✏️1.1 发送消息
🔖1.1.1 发送同步消息
public class SyscProducer {public static void main(String[] args) throws Exception {// 1.实例化消息生产者producerDefaultMQProducer producer = new DefaultMQProducer("producer-group");// 2.设置NameServer的地址producer.setNamesrvAddr("localhost:9876");// 3.启动producerproducer.start();for (int i = 0; i < 10; i++) {// 4.1 创建消息实例,参数1:topic,参数2:tag,参数3:消息体Message msg = new Message("TopicTest", // topic"TagA", // tag("Hello RocketMQ " + i).getBytes()); // body// 4.2 发送消息到brokerSendResult result = producer.send(msg);System.out.println(result);// 4.3 等待1秒,模拟生产消息的间隔Thread.sleep(1000);}// 5.关闭producerproducer.shutdown();}
}
🔖1.1.2 发送异步消息
public class AsyscProducer {public static void main(String[] args) throws Exception {// 1.实例化消息生产者producerDefaultMQProducer producer = new DefaultMQProducer("producer-group");// 2.设置NameServer的地址producer.setNamesrvAddr("localhost:9876");// 3.启动producerproducer.start();for (int i = 0; i < 10; i++) {// 4.1 创建消息实例,参数1:topic,参数2:tag,参数3:消息体Message msg = new Message("TopicTest", // topic"TagA", // tag("Hello RocketMQ " + i).getBytes()); // body// 4.2 发送消息到brokerproducer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println(sendResult);}@Overridepublic void onException(Throwable throwable) {System.out.println(throwable);}});Thread.sleep(1000);}// 5.关闭producerproducer.shutdown();}
}
🔖1.1.3 发送单项消息

这种方式主要用在不特别关心发送结果的场景,例如日志发送。

public class OnewayProducer {public static void main(String[] args) throws Exception {// 1.实例化消息生产者producerDefaultMQProducer producer = new DefaultMQProducer("producer-group");// 2.设置NameServer的地址producer.setNamesrvAddr("localhost:9876");// 3.启动producerproducer.start();for (int i = 0; i < 10; i++) {// 4.1 创建消息实例,参数1:topic,参数2:tag,参数3:消息体Message msg = new Message("TopicTest", // topic"TagA", // tag("Hello RocketMQ " + i).getBytes()); // body// 4.2 发送消息到brokerproducer.sendOneway(msg);// 4.3 等待1秒,模拟生产环境下的消息发送延迟Thread.sleep(1000);}// 5.关闭producerproducer.shutdown();}
}
✏️1.2 消费消息
🔖1.2.1 负载均衡模式

消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同

public class Consumer {public static void main(String[] args) throws MQClientException {// 1.实例化消息消费者consumerDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");// 2.设置NameServer的地址consumer.setNamesrvAddr("localhost:9876");// 3.订阅topicconsumer.subscribe("base","tag2" );//设置消费模式为集群模式(负载均衡模式)consumer.setMessageModel(MessageModel.CLUSTERING);// 4.注册回调函数,处理消息consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (Message msg : msgs) {System.out.println("Received message: " + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 5.启动consumerconsumer.start();}
}
🔖1.2.2 广播模式

消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的

public class Consumer {public static void main(String[] args) throws MQClientException {// 1.实例化消息消费者consumerDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");// 2.设置NameServer的地址consumer.setNamesrvAddr("localhost:9876");// 3.订阅topicconsumer.subscribe("base","tag2" );//设置消费模式为广播模式consumer.setMessageModel(MessageModel.BROADCASTING);// 4.注册回调函数,处理消息consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (Message msg : msgs) {System.out.println("Received message: " + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 5.启动consumerconsumer.start();}
}

📕2. 顺序消息

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

✏️2.1 生产顺序消息
public class Producer {public static void main(String[] args) throws Exception {List<OrderStep> orderSteps = OrderStep.create();// 1.实例化消息生产者producerDefaultMQProducer producer = new DefaultMQProducer("producer-group");// 2.设置NameServer的地址producer.setNamesrvAddr("localhost:9876");// 3.启动producerproducer.start();for (OrderStep order : orderSteps) {String body = System.currentTimeMillis()+order.toString();// 4.1 创建消息实例,参数1:topic,参数2:tag,参数3:消息体Message msg = new Message("OrderTopic","tag",body.getBytes(StandardCharsets.UTF_8));// 4.2 发送消息到brokerSendResult result = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {int orderId = (int) o;int index = orderId % list.size();return list.get(index);}}, order.getOrderId());System.out.println(result);// 4.3 等待1秒,模拟生产消息的间隔Thread.sleep(1000);}// 5.关闭producerproducer.shutdown();}
}
✏️2.2 消费顺序消息
public class Consumer {public static void main(String[] args) throws Exception {// 1.实例化消息消费者consumerDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");// 2.设置NameServer的地址consumer.setNamesrvAddr("localhost:9876");// 3.订阅topicconsumer.subscribe("OrderTopic", "tag");// 4.注册消息监听器consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {for (MessageExt msg : list) {System.out.println("当前线程" + Thread.currentThread().getName() + " 消费消息:" + new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});// 5.启动consumerconsumer.start();}
}

📕3. 延时消息

比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

✏️3.1 发送延时消息
public class Producer {public static void main(String[] args) throws Exception {// 1.实例化消息生产者producerDefaultMQProducer producer = new DefaultMQProducer("producer-group");// 2.设置NameServer的地址producer.setNamesrvAddr("localhost:9876");// 3.启动producerproducer.start();// 4.1 创建消息实例,参数1:topic,参数2:tag,参数3:消息体Message msg = new Message("DelayTopic","tag",("hello delay message"+System.currentTimeMillis()).getBytes());// 4.3 设置消息的延迟时间等级msg.setDelayTimeLevel(3);// 4.2 发送消息到brokerSendResult result = producer.send(msg);System.out.println(result);// 5.关闭producerproducer.shutdown();}
}

现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18

// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
✏️3.2 消费延时消息
public class Consumer {public static void main(String[] args) throws Exception {// 1.实例化消息消费者consumerDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");// 2.设置NameServer的地址consumer.setNamesrvAddr("localhost:9876");// 3.订阅topicconsumer.subscribe("DelayTopic","tag");// 4.注册消息监听器consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {System.out.println(msg);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 5.启动consumerconsumer.start();}
}

📕4. 批量消息

批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。

✏️4.1 发送批量消息
public class Producer {public static void main(String[] args) throws Exception {// 1.实例化消息生产者producerDefaultMQProducer producer = new DefaultMQProducer("producer-group");// 2.设置NameServer的地址producer.setNamesrvAddr("localhost:9876");// 3.启动producerproducer.start();// 4.1 创建消息实例,参数1:topic,参数2:tag,参数3:消息体Message msg1 = new Message("BatchTopic","tag",("hello batch message1"+System.currentTimeMillis()).getBytes());Message msg2 = new Message("BatchTopic","tag",("hello batch message2"+System.currentTimeMillis()).getBytes());Message msg3 = new Message("BatchTopic","tag",("hello batch message3"+System.currentTimeMillis()).getBytes());List<Message> msgs = List.of(msg1,msg2,msg3);// 4.2 发送消息到brokerSendResult result = producer.send(msgs);System.out.println(result);// 5.关闭producerproducer.shutdown();}
}
✏️4.2 消费批量消息
public class Consumer {public static void main(String[] args) throws Exception{// 1.实例化消息消费者consumerDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");// 2.设置NameServer的地址consumer.setNamesrvAddr("localhost:9876");// 3.订阅topicconsumer.subscribe("BatchTopic","tag");// 4.注册消息监听器consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {System.out.println(msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 5.启动consumerconsumer.start();}
}

📕5. 过滤消息

✏️5.1 Tag过滤消息

在大多数情况下,TAG是一个简单而有用的设计,其可以来选择你想要的消息。例如:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。

✏️5.2 SQL语法过滤消息

RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
  • 字符比较,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND,OR,NOT;

常量支持类型为:

  • 数值,比如:123,3.1415;
  • 字符,比如:‘abc’,必须用单引号包裹起来;
  • NULL,特殊的常量
  • 布尔值,TRUE 或 FALSE

只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:

public void subscribe(finalString topic, final MessageSelector messageSelector)

📕6. 事务消息

✏️6.1 流程分析

在这里插入图片描述

  1. 发送消息(half消息)。
  2. 服务端响应消息写入结果。
  3. 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
  4. 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
  5. 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”(事务补偿)
  6. Producer收到回查消息,检查回查消息对应的本地事务的状态,根据本地事务状态,重新Commit或者Rollback。

事务消息共有三种状态,提交状态、回滚状态、中间状态:

  1. TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
  2. TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
  3. TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
✏️6.2 发送事务消息
public class Producer {public static void main(String[] args) throws Exception {// 1. 创建消息生产者TransactionMQProducer transactionMQProducer = new TransactionMQProducer("transaction_producer_group");// 2. 设置NameServer地址transactionMQProducer.setNamesrvAddr("localhost:9876");// 3. 设置事务监听器transactionMQProducer.setTransactionListener(new TransactionListener() {/*** 执行本地事务* @param msg 消息* @param arg 参数* @return 本地事务状态*/@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {if ("TagA".equals(msg.getTags())) {return LocalTransactionState.COMMIT_MESSAGE;} else if ("TagB".equals(msg.getTags())) {return LocalTransactionState.ROLLBACK_MESSAGE;} else {return LocalTransactionState.UNKNOW;}}/*** 检查本地事务状态* @param msg 消息* @return 本地事务状态*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {if ("TagC".equals(msg.getTags())) {return LocalTransactionState.COMMIT_MESSAGE;} else {return LocalTransactionState.UNKNOW;}}});// 4. 启动消息生产者transactionMQProducer.start();String[] tags = {"TagA", "TagB", "TagC"};for (String tag : tags) {Message message = new Message("transaction_topic", tag, ("Hello RocketMQ " + tag).getBytes());// 发送消息transactionMQProducer.sendMessageInTransaction(message, null);}// 5.关闭消息生产者transactionMQProducer.shutdown();}
}
✏️6.3 使用限制
  1. 事务消息不支持延时消息和批量消息。
  2. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。
  3. 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
  4. 事务性消息可能不止一次被检查或消费。
  5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
  6. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
http://www.dtcms.com/a/540667.html

相关文章:

  • 解锁 PySpark SQL 的强大功能:有关 App Store 数据的端到端教程
  • MousePlus(鼠标增强工具) 中文绿色版
  • 源码学习:MyBatis源码深度解析与实战
  • RAG项目中知识库的检索优化
  • Java IO 流之转换流:InputStreamReader/OutputStreamWriter(字节与字符的桥梁)
  • 熊掌号做网站推广的注意事项品牌网页
  • shell脚本curl命令发送钉钉通知(加签方式)——筑梦之路
  • [无人机sdk] AdvancedSensing | 获取实时视频流 | VGA分辨率
  • 海康相机通过透明通道控制串口收发数据
  • 建网站科技公司做校服的网站
  • 设计模式简介
  • PyTorch torch.unique() 基础与实战
  • 【图像处理基石】图像滤镜的算法原理:从基础到进阶的技术解析
  • 信宜网站建设网站开发配置表格
  • 提示词(Prompt)——指令型提示词在大模型中的调用(以 Qwen 模型为例)
  • python-88-实时消费kafka数据批量追加写入CSV文件
  • 提示词(Prompt)——链式思维提示词(Chain-of-Thought Prompting)在大模型中的调用(以 Qwen 模型为例)
  • 用三个面中心点求解长方体位姿:从几何直觉到线性代数实现
  • 网站备案ip查询网站做网站首页ps分辨率多少
  • 免费建一级域名网站千锋教育广州校区
  • CSS3属性(三)
  • 开源底盘+机械臂机器人:Lekiwi驱动链路分析
  • 通过 useEventBus 和 useEventCallBack 实现与原生 Android、鸿蒙、iOS 的事件交互
  • iOS 26 iPhone 使用记录分析 多工具组合构建全方位设备行为洞察体系
  • 【Unity】HTModuleManager(三)Markdown语法的Unity编辑器方言
  • 如何将安卓手机备份到电脑?7种方法
  • 基于SpringBoot+Vue的购物商城(支付宝沙盒支付、物流快递API、WebSocket及时通讯、协同过滤算法、Echarts图形化分析)
  • MYSQL-超全基础以及用法--仅个人的速记笔记(1)
  • 31、LangChain开发框架(八)-- LangChain 数据分析智能体实战
  • 建设局域网网站盐城市亭湖区城乡建设网站