RocketMQ核心技术精讲-----详解消息发送样例
文章目录
- 📕1. 基本样例
- ✏️1.1 发送消息
- 🔖1.1.1 发送同步消息
- 🔖1.1.2 发送异步消息
- 🔖1.1.3 发送单项消息
- ✏️1.2 消费消息
- 🔖1.2.1 负载均衡模式
- 🔖1.2.2 广播模式
- 📕2. 顺序消息
- ✏️2.1 生产顺序消息
- ✏️2.2 消费顺序消息
- 📕3. 延时消息
- ✏️3.1 发送延时消息
- ✏️3.2 消费延时消息
- 📕4. 批量消息
- ✏️4.1 发送批量消息
- ✏️4.2 消费批量消息
- 📕5. 过滤消息
- ✏️5.1 Tag过滤消息
- ✏️5.2 SQL语法过滤消息
- 📕6. 事务消息
- ✏️6.1 流程分析
- ✏️6.2 发送事务消息
- ✏️6.3 使用限制
特此注明 :
Designed By :长安城没有风
Version:1.0
Time:2025.10.28
Location:辽宁 · 大连
导入MQ客户端依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.4.0</version>
</dependency>
消息发送者步骤分析
1.创建消息生产者producer,并制定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象,指定主题Topic、Tag和消息体
5.发送消息
6.关闭生产者producer
消息消费者步骤分析
1.创建消费者Consumer,制定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer
📕1. 基本样例
✏️1.1 发送消息
🔖1.1.1 发送同步消息
public class SyscProducer {public static void main(String[] args) throws Exception {// 1.实例化消息生产者producerDefaultMQProducer producer = new DefaultMQProducer("producer-group");// 2.设置NameServer的地址producer.setNamesrvAddr("localhost:9876");// 3.启动producerproducer.start();for (int i = 0; i < 10; i++) {// 4.1 创建消息实例,参数1:topic,参数2:tag,参数3:消息体Message msg = new Message("TopicTest", // topic"TagA", // tag("Hello RocketMQ " + i).getBytes()); // body// 4.2 发送消息到brokerSendResult result = producer.send(msg);System.out.println(result);// 4.3 等待1秒,模拟生产消息的间隔Thread.sleep(1000);}// 5.关闭producerproducer.shutdown();}
}
🔖1.1.2 发送异步消息
public class AsyscProducer {public static void main(String[] args) throws Exception {// 1.实例化消息生产者producerDefaultMQProducer producer = new DefaultMQProducer("producer-group");// 2.设置NameServer的地址producer.setNamesrvAddr("localhost:9876");// 3.启动producerproducer.start();for (int i = 0; i < 10; i++) {// 4.1 创建消息实例,参数1:topic,参数2:tag,参数3:消息体Message msg = new Message("TopicTest", // topic"TagA", // tag("Hello RocketMQ " + i).getBytes()); // body// 4.2 发送消息到brokerproducer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println(sendResult);}@Overridepublic void onException(Throwable throwable) {System.out.println(throwable);}});Thread.sleep(1000);}// 5.关闭producerproducer.shutdown();}
}
🔖1.1.3 发送单项消息
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
public class OnewayProducer {public static void main(String[] args) throws Exception {// 1.实例化消息生产者producerDefaultMQProducer producer = new DefaultMQProducer("producer-group");// 2.设置NameServer的地址producer.setNamesrvAddr("localhost:9876");// 3.启动producerproducer.start();for (int i = 0; i < 10; i++) {// 4.1 创建消息实例,参数1:topic,参数2:tag,参数3:消息体Message msg = new Message("TopicTest", // topic"TagA", // tag("Hello RocketMQ " + i).getBytes()); // body// 4.2 发送消息到brokerproducer.sendOneway(msg);// 4.3 等待1秒,模拟生产环境下的消息发送延迟Thread.sleep(1000);}// 5.关闭producerproducer.shutdown();}
}
✏️1.2 消费消息
🔖1.2.1 负载均衡模式
消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同
public class Consumer {public static void main(String[] args) throws MQClientException {// 1.实例化消息消费者consumerDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");// 2.设置NameServer的地址consumer.setNamesrvAddr("localhost:9876");// 3.订阅topicconsumer.subscribe("base","tag2" );//设置消费模式为集群模式(负载均衡模式)consumer.setMessageModel(MessageModel.CLUSTERING);// 4.注册回调函数,处理消息consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (Message msg : msgs) {System.out.println("Received message: " + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 5.启动consumerconsumer.start();}
}
🔖1.2.2 广播模式
消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的
public class Consumer {public static void main(String[] args) throws MQClientException {// 1.实例化消息消费者consumerDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");// 2.设置NameServer的地址consumer.setNamesrvAddr("localhost:9876");// 3.订阅topicconsumer.subscribe("base","tag2" );//设置消费模式为广播模式consumer.setMessageModel(MessageModel.BROADCASTING);// 4.注册回调函数,处理消息consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (Message msg : msgs) {System.out.println("Received message: " + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 5.启动consumerconsumer.start();}
}
📕2. 顺序消息
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
✏️2.1 生产顺序消息
public class Producer {public static void main(String[] args) throws Exception {List<OrderStep> orderSteps = OrderStep.create();// 1.实例化消息生产者producerDefaultMQProducer producer = new DefaultMQProducer("producer-group");// 2.设置NameServer的地址producer.setNamesrvAddr("localhost:9876");// 3.启动producerproducer.start();for (OrderStep order : orderSteps) {String body = System.currentTimeMillis()+order.toString();// 4.1 创建消息实例,参数1:topic,参数2:tag,参数3:消息体Message msg = new Message("OrderTopic","tag",body.getBytes(StandardCharsets.UTF_8));// 4.2 发送消息到brokerSendResult result = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {int orderId = (int) o;int index = orderId % list.size();return list.get(index);}}, order.getOrderId());System.out.println(result);// 4.3 等待1秒,模拟生产消息的间隔Thread.sleep(1000);}// 5.关闭producerproducer.shutdown();}
}
✏️2.2 消费顺序消息
public class Consumer {public static void main(String[] args) throws Exception {// 1.实例化消息消费者consumerDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");// 2.设置NameServer的地址consumer.setNamesrvAddr("localhost:9876");// 3.订阅topicconsumer.subscribe("OrderTopic", "tag");// 4.注册消息监听器consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {for (MessageExt msg : list) {System.out.println("当前线程" + Thread.currentThread().getName() + " 消费消息:" + new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});// 5.启动consumerconsumer.start();}
}
📕3. 延时消息
比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
✏️3.1 发送延时消息
public class Producer {public static void main(String[] args) throws Exception {// 1.实例化消息生产者producerDefaultMQProducer producer = new DefaultMQProducer("producer-group");// 2.设置NameServer的地址producer.setNamesrvAddr("localhost:9876");// 3.启动producerproducer.start();// 4.1 创建消息实例,参数1:topic,参数2:tag,参数3:消息体Message msg = new Message("DelayTopic","tag",("hello delay message"+System.currentTimeMillis()).getBytes());// 4.3 设置消息的延迟时间等级msg.setDelayTimeLevel(3);// 4.2 发送消息到brokerSendResult result = producer.send(msg);System.out.println(result);// 5.关闭producerproducer.shutdown();}
}
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18
// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
✏️3.2 消费延时消息
public class Consumer {public static void main(String[] args) throws Exception {// 1.实例化消息消费者consumerDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");// 2.设置NameServer的地址consumer.setNamesrvAddr("localhost:9876");// 3.订阅topicconsumer.subscribe("DelayTopic","tag");// 4.注册消息监听器consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {System.out.println(msg);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 5.启动consumerconsumer.start();}
}
📕4. 批量消息
批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。
✏️4.1 发送批量消息
public class Producer {public static void main(String[] args) throws Exception {// 1.实例化消息生产者producerDefaultMQProducer producer = new DefaultMQProducer("producer-group");// 2.设置NameServer的地址producer.setNamesrvAddr("localhost:9876");// 3.启动producerproducer.start();// 4.1 创建消息实例,参数1:topic,参数2:tag,参数3:消息体Message msg1 = new Message("BatchTopic","tag",("hello batch message1"+System.currentTimeMillis()).getBytes());Message msg2 = new Message("BatchTopic","tag",("hello batch message2"+System.currentTimeMillis()).getBytes());Message msg3 = new Message("BatchTopic","tag",("hello batch message3"+System.currentTimeMillis()).getBytes());List<Message> msgs = List.of(msg1,msg2,msg3);// 4.2 发送消息到brokerSendResult result = producer.send(msgs);System.out.println(result);// 5.关闭producerproducer.shutdown();}
}
✏️4.2 消费批量消息
public class Consumer {public static void main(String[] args) throws Exception{// 1.实例化消息消费者consumerDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");// 2.设置NameServer的地址consumer.setNamesrvAddr("localhost:9876");// 3.订阅topicconsumer.subscribe("BatchTopic","tag");// 4.注册消息监听器consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {System.out.println(msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 5.启动consumerconsumer.start();}
}
📕5. 过滤消息
✏️5.1 Tag过滤消息
在大多数情况下,TAG是一个简单而有用的设计,其可以来选择你想要的消息。例如:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。
✏️5.2 SQL语法过滤消息
RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
- 数值比较,比如:>,>=,<,<=,BETWEEN,=;
- 字符比较,比如:=,<>,IN;
- IS NULL 或者 IS NOT NULL;
- 逻辑符号 AND,OR,NOT;
常量支持类型为:
- 数值,比如:123,3.1415;
- 字符,比如:‘abc’,必须用单引号包裹起来;
- NULL,特殊的常量
- 布尔值,TRUE 或 FALSE
只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:
public void subscribe(finalString topic, final MessageSelector messageSelector)
📕6. 事务消息
✏️6.1 流程分析

- 发送消息(half消息)。
- 服务端响应消息写入结果。
- 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
- 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
- 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”(事务补偿)
- Producer收到回查消息,检查回查消息对应的本地事务的状态,根据本地事务状态,重新Commit或者Rollback。
事务消息共有三种状态,提交状态、回滚状态、中间状态:
- TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
- TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
- TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
✏️6.2 发送事务消息
public class Producer {public static void main(String[] args) throws Exception {// 1. 创建消息生产者TransactionMQProducer transactionMQProducer = new TransactionMQProducer("transaction_producer_group");// 2. 设置NameServer地址transactionMQProducer.setNamesrvAddr("localhost:9876");// 3. 设置事务监听器transactionMQProducer.setTransactionListener(new TransactionListener() {/*** 执行本地事务* @param msg 消息* @param arg 参数* @return 本地事务状态*/@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {if ("TagA".equals(msg.getTags())) {return LocalTransactionState.COMMIT_MESSAGE;} else if ("TagB".equals(msg.getTags())) {return LocalTransactionState.ROLLBACK_MESSAGE;} else {return LocalTransactionState.UNKNOW;}}/*** 检查本地事务状态* @param msg 消息* @return 本地事务状态*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {if ("TagC".equals(msg.getTags())) {return LocalTransactionState.COMMIT_MESSAGE;} else {return LocalTransactionState.UNKNOW;}}});// 4. 启动消息生产者transactionMQProducer.start();String[] tags = {"TagA", "TagB", "TagC"};for (String tag : tags) {Message message = new Message("transaction_topic", tag, ("Hello RocketMQ " + tag).getBytes());// 发送消息transactionMQProducer.sendMessageInTransaction(message, null);}// 5.关闭消息生产者transactionMQProducer.shutdown();}
}
✏️6.3 使用限制
- 事务消息不支持延时消息和批量消息。
- 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。
- 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
- 事务性消息可能不止一次被检查或消费。
- 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
- 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
