SprigBoot整合rocketmq-v5-client-spring-boot
- 安装RocketMQ 服务端
Apache RocketMQ官方网站
5.X文档地址
rocketmq-v5-client-spring-boot官方示例
rocketmq-v5-client-spring-boot-starter maven仓库
RocketMQ服务端 的安装包分为两种,二进制包和源码包。这里以5.3.2版本做示例 。 点击这里 下载 Apache RocketMQ 5.3.2的源码包。你也可以从这里 下载到二进制包。二进制包是已经编译完成后可以直接运行的,源码包是需要编译后运行的。
本文整合的rocketmq-v5-client-spring-boot 版本2.3.3 , 内部引用的是rocketmq-client-java , 版本5.0.7,使用的是gRPC 协议 , 使用前建议先把官方文档与示例看一下,使用的Java环境是openjdk-17
建议使用jdk11以上版本
ubuntu安装RocketMQ
查看jdk版本
java -version
如果没安装jdk的话,在root账号下执行以下命令
apt-get upgradeapt-get updateapt install openjdk-17-jdk
创建文件夹
cd /usr/local
mkdir rocketmq
cd rocketmq
下载RocketMQ二进制包(我这里使用的是5.3.2版本,使用其他版本可前往官方下载)
wget https://dist.apache.org/repos/dist/release/rocketmq/5.3.2/rocketmq-all-5.3.2-bin-release.zip
解压 没有unzip的解压命令 , ubuntu会提示,根据提示安装unzip插件
unzip rocketmq-all-5.3.2-bin-release.zip
进入bin目录
调整runserver.sh的内存大小
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
调整runbroker.sh内存大小
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m"
修改/conf/broker.conf配置文件,这里主要是为了测试方便我们放开自动创建Topic的配置,加入以下配置(经过测试5.0以上版本不支持自动创建主题topic)
# 开启自动创建 Topic 加不加都行
autoCreateTopicEnable=true
#内网ip namesrvAddr:nameSrv地址 公网访问设置公网IP 内网访问设置内网IP 以下所有IP需一致
namesrvAddr=192.168.3.86:9876
#brokerIP1:broker也需要一个ip 内网或公网
brokerIP1=192.168.3.86
配置 NameServer 的环境变量
配置环境
vim /etc/profile
添加以下配置
#MQ安装位置
export ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq-all-5.3.2
#MQ公网或内网ip 公网访问设置公网IP 内网访问设置内网IP
export NAMESRV_ADDR=192.168.3.86:9876
重新编译文件生效
source /etc/profile
修改完后,我们就可以启动 RocketMQ 的 NameServer 了
启动 namesrv
nohup sh bin/mqnamesrv &
验证
# 验证 namesrv 是否启动成功
tail -f -n 500 mqnamesrv.log
...
The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876# 或者是
tail -f ~/logs/rocketmqlogs/namesrv.log
启动 Broker 消息存储中心和 Proxy 代理
# 启动(不使用代理)
nohup sh bin/mqbroker -n 192.168.3.86:9876 >mqbroker.log 2>&1 &# 启动 Broker+Proxy
nohup sh bin/mqbroker -n 192.168.3.86:9876 --enable-proxy &# 推荐使用 指定配置文件启动(broker默认使用的端口是10911,我们也可以在配置文件修改端口)
nohup sh bin/mqbroker -n 192.168.3.86:9876 -c conf/broker.conf --enable-proxy &
# 验证是否启动成功
tail -n 500 nohup.out
tail -f ~/logs/rocketmqlogs/broker.log
tail -f ~/logs/rocketmqlogs/proxy.log
Wed May 14 12:41:41 CST 2025 rocketmq-proxy startup successfully
使用tail -f ~/logs/rocketmqlogs/broker.log 查看日志如果提示以下
The default acl dir /usr/local/rocketmq/rocketmq-all-5.3.2/conf/acl is not exist
需要切换conf目录下 新建acl文件夹就行了
mkdir acl
由于v5可参考的文档太少,这个报错我也没找到为什么源码包里会少一个acl的文件夹,有知道的希望留言告知
测试消息收发
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
可以通过 mqadmin 命令创建
Admin Tool官方命令工具
注意 TestTopic 是topic名称
sh bin/mqadmin updatetopic -n 192.168.3.86:9876 -t TestTopic -c DefaultCluster
打印
create topic to 192.168.3.86:10911 success.
TopicConfig [topicName=TestTopic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={}]
- 安装RocketMQ Dashboard 可视化
官方介绍
按照官方安装运行就可以
需要注意的点,IP需要跟上面设置的IP一致,防火墙开通8080,8081,10911,9876
注意:默认端口为:8080,不修改会跟Proxy端口冲突,Proxy端口默认的也是8080
我这里修改了RocketMQ Dashboard的默认端口,改成8082
可以在本地运行,也可以打包运行,我是打包运行的,运行成功后访问:http://192.168.3.86:8082
可以先添加主题,(研究了几天,没研究自动添加的方法 , 如果那位大佬研究出来了可以给我分享一下)
点击提交就行了。研究了源码,这个添加跟更新是一个接口。提交之后就可以敲代码了(有研究出来能自动加载Topic的麻烦留言)
如果启动报错,需添加 topic
CODE: 17 DESC: No topic route info in name server for the topic: delay-topic
- 整合rocketmq-client-java
先用官方原生 rocketmq-client-java JDK
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.7</version></dependency>
官方示例代码可以去看看
发送普通消息
package com.example.mq.producer;import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.time.Duration;public class ProducerExample {private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);public static void main(String[] args) throws ClientException {// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081。String endpoint = "192.168.3.86:8081";// 消息发送的目标Topic名称,需要提前创建。String topic = "TestTopic";ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);ClientConfiguration configuration = builder.build();// 初始化Producer时需要设置通信配置以及预绑定的Topic。Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(configuration).build();// 普通消息发送。Message message = provider.newMessageBuilder().setTopic(topic)// 设置消息索引键,可根据关键字精确查找某条消息。.setKeys("messageKey")// 设置消息Tag,用于消费端根据指定Tag过滤消息。.setTag("messageTag")// 消息体。.setBody("你好,mq".getBytes()).build();try {// 发送消息,需要关注发送结果,并捕获失败等异常。SendReceipt sendReceipt = producer.send(message);logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());} catch (ClientException e) {logger.error("Failed to send message", e);}// producer.close();}
}
订阅
package com.example.mq.consumer;import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class PushConsumerExample {private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);private PushConsumerExample() {}public static void main(String[] args) throws ClientException, IOException, InterruptedException {final ClientServiceProvider provider = ClientServiceProvider.loadService();// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。String endpoints = "192.168.3.86:8081";ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();// 订阅消息的过滤规则,表示订阅所有Tag的消息。String tag = "*";FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);// 为消费者指定所属的消费者分组,Group需要提前创建。String consumerGroup = "YourConsumerGroup";// 指定需要订阅哪个目标Topic,Topic需要提前创建。String topic = "TestTopic";// 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。PushConsumer pushConsumer = provider.newPushConsumerBuilder().setClientConfiguration(clientConfiguration)// 设置消费者分组。.setConsumerGroup(consumerGroup)// 设置预绑定的订阅关系。.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))// 设置消费监听器。.setMessageListener(messageView -> {// 处理消息并返回消费结果。logger.info("Consume message successfully, messageId={}", messageView.getMessageId());ByteBuffer body = messageView.getBody();String message = StandardCharsets.UTF_8.decode(body).toString();logger.info("Consume message successfully, body={}", message);return ConsumeResult.SUCCESS;}).build();Thread.sleep(Long.MAX_VALUE);// 如果不需要再使用 PushConsumer,可关闭该实例。// pushConsumer.close();}
}
4.整合rocketmq-v5-client-spring-boot
整合SpringBoot rocketmq-v5-client-spring-boot
注意原生的rocketmq-client-java需要注释掉,rocketmq-v5-client-spring-boot已经引入了,不注释会jar包冲突
引入maven
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-v5-client-spring-boot</artifactId><version>2.3.3</version>
</dependency>
官方示例代码
先创建变量
public class RocketMQVariable {/*** 普通消息队列*/public static final String NORMAL_TOPIC = "normal-topic";// 如果使用负载均衡模式 需要设置相同的消费组名public static final String NORMAL_GROUP = "normal-group";// 处理广播消费模式使用 public static final String NORMAL1_GROUP = "normal1-group";/*** 异步普通消息队列*/public static final String ASYNC_NORMAL_TOPIC = "async-normal-topic";public static final String ASYNC_NORMAL_GROUP = "async-normal-group";/*** 顺序消息队列*/public static final String FIFO_TOPIC = "fifo-topic";public static final String FIFO_GROUP = "fifo-group";/*** 定时/延时消息队列*/public static final String DELAY_TOPIC = "delay-topic";public static final String DELAY_GROUP = "delay-group";/*** 事务消息队列*/public static final String TRANSACTION_TOPIC = "transaction-topic";public static final String TRANSACTION_GROUP = "transaction-group";}
注意:
如果使用负载均衡模式 需设置相同的Topic 相同的group
如果使用广播消费模式 需设置相同的Topic 不同的group
编写工具类
import com.alibaba.fastjson2.JSONObject;
import jakarta.annotation.Resource;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.apis.producer.Transaction;
import org.apache.rocketmq.client.common.Pair;
import org.apache.rocketmq.client.core.RocketMQClientTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;@Component
public class RocketMQV2Service {private static final Logger log = LoggerFactory.getLogger(RocketMQV2Service.class);@Resourceprivate RocketMQClientTemplate template;/*** 发送普通消息** @param topic* @param message*/public void syncSendNormalMessage(String topic, Object message) {SendReceipt sendReceipt = template.syncSendNormalMessage(topic, message);log.info("普通消息发送完成:topic={}, message = {}, sendReceipt = {}", topic, message, sendReceipt);}/*** 发送异步普通消息** @param topic* @param message*/public void asyncSendNormalMessage(String topic, Object message) {CompletableFuture<SendReceipt> future = new CompletableFuture<>();ExecutorService sendCallbackExecutor = Executors.newCachedThreadPool();future.whenCompleteAsync((sendReceipt, throwable) -> {if (null != throwable) {log.error("发送消息失败", throwable);return;}log.info("发送异步消息消费成功5, messageId={}", sendReceipt.getMessageId());}, sendCallbackExecutor);CompletableFuture<SendReceipt> completableFuture = template.asyncSendNormalMessage(topic, message, future);log.info("发送异步消息成功1, topic={}, message = {}, sendReceipt={}", topic, message, completableFuture);}/*** 发送顺序消息** @param topic* @param message* @param messageGroup*/public void syncSendFifoMessage(String topic, Object message, String messageGroup) {SendReceipt sendReceipt = template.syncSendFifoMessage(topic, message, messageGroup);log.info("顺序消息发送完成:topic={}, message = {}, sendReceipt = {}", topic, message, sendReceipt);}/*** 发送延时消息** @param topic* @param message* @param delay 单位:秒*/public void syncSendDelayMessage(String topic, Object message, Long delay) {SendReceipt sendReceipt = template.syncSendDelayMessage(topic, message, Duration.ofSeconds(delay));log.info("延时消息发送完成 :topic={}, message = {}, sendReceipt = {}", topic, message, sendReceipt);}/*** 发送延时消息** @param topic* @param message* @param duration Duration.ofSeconds(秒) Duration.ofMinutes(分钟) Duration.ofHours(小时)*/public void syncSendDelayMessage(String topic, Object message, Duration duration) {SendReceipt sendReceipt = template.syncSendDelayMessage(topic, message, duration);log.info("延时消息发送完成 :topic={}, message = {}, sendReceipt = {}", topic, message, sendReceipt);}/*** 发送事务消息** @param topic* @param message* @throws ClientException*/public Pair<SendReceipt, Transaction> sendMessageInTransaction(String topic, Object message) {try {Pair<SendReceipt, Transaction> pair = template.sendMessageInTransaction(topic, message);SendReceipt sendReceipt = pair.getSendReceipt();Transaction transaction = pair.getTransaction();log.info("事务消息发送完成 transaction = {} , sendReceipt = {}", JSONObject.toJSONString(transaction), JSONObject.toJSONString(sendReceipt));log.info("消息id : {} ", sendReceipt.getMessageId());//如果这里提交了事务 if (doLocalTransaction(1)) {log.info("本地事务执行成功");transaction.commit();} else {log.info("本地事务执行失败");transaction.rollback();}return pair;} catch (ClientException e) {throw new RuntimeException(e);}}boolean doLocalTransaction(int number) {// 本地事务逻辑 数据库操作log.info("执行本地事务 : {}", number);return number > 5;}
}
测试代码
yml配置
rocketmq:producer:endpoints: 192.168.3.86:8081topic:push-consumer:endpoints: 192.168.3.86:8081access-key:secret-key:topic:tag: "*"
发送消息
import com.alibaba.fastjson2.JSONObject;
import com.example.mq.util.RocketMQV2Service;
import com.example.mq.variable.RocketMQVariable;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.apis.producer.Transaction;
import org.apache.rocketmq.client.common.Pair;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.Duration;@Slf4j
@RestController
@RequestMapping("/send")
public class SendController {@Autowiredprivate RocketMQV2Service rocketMQV2Service;/*** 普通消息** @return*/@GetMapping("/normal.message")public String normalMessage() {rocketMQV2Service.syncSendNormalMessage(RocketMQVariable.NORMAL_TOPIC, "hello RocketMQ 这是普通消息");return "发送成功";}/*** 异步普通消息** @return*/@GetMapping("/async.normal.message")public String asyncSendNormalMessageNormalMessage() {rocketMQV2Service.asyncSendNormalMessage(RocketMQVariable.ASYNC_NORMAL_TOPIC, "hello RocketMQ 这是异步普通消息");return "发送成功";}/*** 顺序消息** @return*/@GetMapping("/flfo.message")public String flfoMessage() {for (int i = 0; i < 20; i++) {rocketMQV2Service.syncSendFifoMessage(RocketMQVariable.FIFO_TOPIC, "hello RocketMQ 这是顺序消息" + i, RocketMQVariable.FIFO_GROUP);}return "发送成功";}/*** 定时/延时消息** @return*/@GetMapping("/delay.message")public String delayMessage() {rocketMQV2Service.syncSendDelayMessage(RocketMQVariable.DELAY_TOPIC, "hello RocketMQ 这是30秒定时消息", Duration.ofSeconds(30));rocketMQV2Service.syncSendDelayMessage(RocketMQVariable.DELAY_TOPIC, "hello RocketMQ 这是10秒定时消息 ", 10l);rocketMQV2Service.syncSendDelayMessage(RocketMQVariable.DELAY_TOPIC, "hello RocketMQ 这是1分钟定时消息", Duration.ofMinutes(1));return "发送成功";}/*** 事务消息** @return*/@GetMapping("/transaction.message")public String transactionMessage() throws ClientException {Pair<SendReceipt, Transaction> sendReceiptTransactionPair = rocketMQV2Service.sendMessageInTransaction(RocketMQVariable.TRANSACTION_TOPIC, "hello RocketMQ 这是事务消息");Transaction transaction = sendReceiptTransactionPair.getTransaction();SendReceipt sendReceipt = sendReceiptTransactionPair.getSendReceipt();MessageId messageId = sendReceipt.getMessageId();log.info("事务消息发送完成 messageId = {}", messageId);log.info("事务消息发送完成 transaction = {} , sendReceipt = {}", JSONObject.toJSONString(transaction), JSONObject.toJSONString(sendReceipt));return "发送成功";}}
普通消息(广播模式)
import com.alibaba.fastjson2.JSONObject;
import com.example.mq.variable.RocketMQVariable;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.core.RocketMQListener;
import org.springframework.stereotype.Service;import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQVariable.NORMAL_TOPIC,consumerGroup = RocketMQVariable.NORMAL_GROUP)
public class PushConsumerNormalService implements RocketMQListener {@Overridepublic ConsumeResult consume(MessageView messageView) {log.info("普通消息, messageView={}", messageView);ByteBuffer body = messageView.getBody();String message = StandardCharsets.UTF_8.decode(body).toString();log.info("普通消息, message={}", message);Map<String, String> properties = messageView.getProperties();log.info("普通消息, properties={}", JSONObject.toJSONString(properties));if (Objects.nonNull(messageView.getProperties().get("OrderId"))) {log.info("普通消息, message={}", messageView);return ConsumeResult.FAILURE;}return ConsumeResult.SUCCESS;}
}
import com.alibaba.fastjson2.JSONObject;
import com.example.mq.variable.RocketMQVariable;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.core.RocketMQListener;
import org.springframework.stereotype.Service;import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQVariable.NORMAL_TOPIC,consumerGroup = RocketMQVariable.NORMAL1_GROUP)
public class PushConsumerNormal1Service implements RocketMQListener {@Overridepublic ConsumeResult consume(MessageView messageView) {log.info("普通消息1, messageView={}", messageView);ByteBuffer body = messageView.getBody();String message = StandardCharsets.UTF_8.decode(body).toString();log.info("普通消息1, message={}", message);Map<String, String> properties = messageView.getProperties();log.info("普通消息1, properties={}", JSONObject.toJSONString(properties));if (Objects.nonNull(messageView.getProperties().get("OrderId"))) {log.info("普通消息1, message={}", messageView);return ConsumeResult.FAILURE;}return ConsumeResult.SUCCESS;}
}
日志 (注意consumerGroup 不同)
负载均衡模式 把consumerGroup改为 RocketMQVariable.NORMAL_GROUP
import com.alibaba.fastjson2.JSONObject;
import com.example.mq.variable.RocketMQVariable;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.core.RocketMQListener;
import org.springframework.stereotype.Service;import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQVariable.NORMAL_TOPIC,consumerGroup = RocketMQVariable.NORMAL_GROUP)
public class PushConsumerNormal1Service implements RocketMQListener {@Overridepublic ConsumeResult consume(MessageView messageView) {log.info("普通消息1, messageView={}", messageView);ByteBuffer body = messageView.getBody();String message = StandardCharsets.UTF_8.decode(body).toString();log.info("普通消息1, message={}", message);Map<String, String> properties = messageView.getProperties();log.info("普通消息1, properties={}", JSONObject.toJSONString(properties));if (Objects.nonNull(messageView.getProperties().get("OrderId"))) {log.info("普通消息1, message={}", messageView);return ConsumeResult.FAILURE;}return ConsumeResult.SUCCESS;}}
日志 (会自动选择一个消费者消费)
顺序消费
import com.alibaba.fastjson2.JSONObject;
import com.example.mq.variable.RocketMQVariable;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.core.RocketMQListener;
import org.springframework.stereotype.Service;import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQVariable.FIFO_TOPIC,consumerGroup = RocketMQVariable.FIFO_GROUP)
public class PushConsumerFifoService implements RocketMQListener {@Overridepublic ConsumeResult consume(MessageView messageView) {log.info("顺序消息, messageView={}", messageView);ByteBuffer body = messageView.getBody();String message = StandardCharsets.UTF_8.decode(body).toString();log.info("顺序消息, message={}", message);Map<String, String> properties = messageView.getProperties();log.info("顺序消息, properties={}", JSONObject.toJSONString(properties));if (Objects.nonNull(messageView.getProperties().get("OrderId"))) {log.info("顺序消息, message={}", messageView);return ConsumeResult.FAILURE;}log.info("rollback transaction");return ConsumeResult.SUCCESS;}
}
日志
定时/延时任务消息 (可自定义时间)
import com.alibaba.fastjson2.JSONObject;
import com.example.mq.variable.RocketMQVariable;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.core.RocketMQListener;
import org.springframework.stereotype.Service;import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQVariable.DELAY_TOPIC,consumerGroup = RocketMQVariable.DELAY_GROUP)
public class PushConsumerDelayService implements RocketMQListener {@Overridepublic ConsumeResult consume(MessageView messageView) {log.info("定时/延时消息, messageView={}", messageView);ByteBuffer body = messageView.getBody();String message = StandardCharsets.UTF_8.decode(body).toString();log.info("定时/延时消息, message={}", message);Map<String, String> properties = messageView.getProperties();log.info("定时/延时消息, properties={}", JSONObject.toJSONString(properties));if (Objects.nonNull(messageView.getProperties().get("OrderId"))) {log.info("定时/延时消息, message={}", messageView);return ConsumeResult.FAILURE;}log.info("定时/延时消息 消费完成");return ConsumeResult.SUCCESS;}
}
日志
事务处理情况1
/*** 发送事务消息** @param topic* @param message* @throws ClientException*/public Pair<SendReceipt, Transaction> sendMessageInTransaction(String topic, Object message) {try {Pair<SendReceipt, Transaction> pair = template.sendMessageInTransaction(topic, message);SendReceipt sendReceipt = pair.getSendReceipt();Transaction transaction = pair.getTransaction();log.info("事务消息发送完成 transaction = {} , sendReceipt = {}", JSONObject.toJSONString(transaction), JSONObject.toJSONString(sendReceipt));log.info("消息id : {} ", sendReceipt.getMessageId());//如果这里提交了事务 if (doLocalTransaction(1)) {log.info("本地事务执行成功");transaction.commit();} else {log.info("本地事务执行失败");transaction.rollback();}return pair;} catch (ClientException e) {throw new RuntimeException(e);}}boolean doLocalTransaction(int number) {// 本地事务逻辑 数据库操作log.info("执行本地事务 : {}", number);return number > 5;}
如果在工具类里面提交了事务 transaction.commit();下面的就不会进入处理了
@Slf4j
@RocketMQTransactionListener
public class PushConsumerTransactionTemplate implements RocketMQTransactionChecker {@Overridepublic TransactionResolution check(MessageView messageView) {log.info("Receive transactional message check, message={}", messageView);return null;}
}
而是直接消费了
import com.example.mq.variable.RocketMQVariable;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.core.RocketMQListener;
import org.springframework.stereotype.Service;import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQVariable.TRANSACTION_TOPIC, consumerGroup = RocketMQVariable.TRANSACTION_GROUP)
public class PushConsumerTransactionService implements RocketMQListener {@Overridepublic ConsumeResult consume(MessageView messageView) {log.info("事务消息消费, messageView={}", messageView);ByteBuffer body = messageView.getBody();String message = StandardCharsets.UTF_8.decode(body).toString();log.info("事务消息消费, message={}", message);if (Objects.isNull(message)) {log.info("事务消息 消费失败");return ConsumeResult.FAILURE;}log.info("事务消息 消费成功");return ConsumeResult.SUCCESS;}
}
日志
事务处理情况2
/*** 发送事务消息 这里只发消息 不参与事务提交** @param topic* @param message* @throws ClientException*/public void sendMessageInTransaction(String topic, Object message) {try {template.sendMessageInTransaction(topic, message);} catch (ClientException e) {throw new RuntimeException(e);}}
使用官方事务处理机制处理事务
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.apis.producer.TransactionResolution;
import org.apache.rocketmq.client.core.RocketMQTransactionChecker;import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;@Slf4j
@RocketMQTransactionListener
public class PushConsumerTransactionTemplate implements RocketMQTransactionChecker {@Overridepublic TransactionResolution check(MessageView messageView) {log.info("事务消息 事务操作, messageView={}", messageView);ByteBuffer body = messageView.getBody();String message = StandardCharsets.UTF_8.decode(body).toString();log.info("事务消息 事务操作, message={}", message);String messageId = messageView.getMessageId().toString();if (Objects.nonNull(messageId)) {log.info("事务消息 事务操作, messageId={}", messageId);return TransactionResolution.COMMIT;}log.info("事务消息消费失败");return TransactionResolution.ROLLBACK;}
}
事务提交后才会被消费
import com.example.mq.variable.RocketMQVariable;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.core.RocketMQListener;
import org.springframework.stereotype.Service;import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQVariable.TRANSACTION_TOPIC, consumerGroup = RocketMQVariable.TRANSACTION_GROUP)
public class PushConsumerTransactionService implements RocketMQListener {@Overridepublic ConsumeResult consume(MessageView messageView) {log.info("事务消息消费, messageView={}", messageView);ByteBuffer body = messageView.getBody();String message = StandardCharsets.UTF_8.decode(body).toString();log.info("事务消息消费, message={}", message);if (Objects.isNull(message)) {log.info("事务消息 消费失败");return ConsumeResult.FAILURE;}log.info("事务消息 消费成功");return ConsumeResult.SUCCESS;}
}
日志
如果哪位大佬发现有错误或者我的理解有误,还请高抬贵手,勿喷
原创不易,转载请附带链接