SpringBoot整合RocketMQ--实例
原文网址:SpringBoot整合RocketMQ--实例-CSDN博客
简介
本文介绍SpringBoot整合RocketMQ的方法。
- spring-boot-starter-parent版本:2.4.13
- RocketMQ版本:4.9.4。(写这篇文章时,5.X版本的Java客户端还没完善,无法使用)。
本文会展示的实例有:
- 只指定topic发送和接收数据
- 指定topic和tag发送和接收数据
- 延迟消息
- 项目启动时自动注册topic
前三个都是基本的api,很简单。第四个自动注册是一种技术思维,适合高级开发和对技术有追求的人。
结果展示
先展示一下整合后的结果。
RocketMQ页面
主页面
主题页面
消费者页面
发送消息并消费
启动SpringBoot应用后,访问接口文档:http://localhost:8080/doc.html
结果:
1.测试只有topic的情景
后端结果(成功接收到消息):
2023-11-22 19:18:44.213 INFO 37900 --- [topic_group_1_1] c.e.business.mqConsumer.TopicConsumer : TopicConsumer收到消息:topic message:2023-11-22T19:18:43.991
2.测试指定topic和tag
后端结果(成功接收到消息):
2023-11-22 14:20:15.183 INFO 37900 --- [d_tag_group_1_1] c.e.business.mqConsumer.TagConsumer : TagConsumer收到消息:tag message:2023-11-22T14:20:15.175
3.测试延迟消息
后端结果(消息发送后,五秒钟之后收到了消息) :
2023-11-22 14:21:24.436 INFO 37900 --- [delay_group_1_1] c.e.business.mqConsumer.DelayConsumer : DelayConsumer收到消息:delay message:2023-11-22T14:21:19.382
1.启动RocketMQ服务器
安装和启动流程见:Docker Compose系列--安装RocketMQ--方法/示例-CSDN博客
2.引入依赖
pom.xml引入下边的依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version>
</dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-tools</artifactId><version>4.9.4</version>
</dependency>
3.修改配置文件
application.yml
spring:application:name: demo-rocketmq#rocketmq配置信息
rocketmq:#nameservice服务器地址(多个以英文逗号隔开)name-server: 192.168.5.193:9876#生产者配置producer:#组名group: group1# 自定义配置
custom:rocketmq:broker-address: 192.168.5.193:10911
4.编写代码
源码下载
代码结构
生产者
package com.example.business.controller;import com.example.common.constant.RocketMQConstant;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;@Api(tags = "测试")
@RestController
@RequestMapping("test")
public class TestController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@ApiOperation("topic")@PostMapping("topic")public String topic() {rocketMQTemplate.syncSend(RocketMQConstant.Topic.TOPIC_TOPIC,"topic message:" + LocalDateTime.now());return "success";}@ApiOperation("tag")@PostMapping("tag")public String tag() {//RocketMQTemplate将 topic 和 tag 合二为一了,底层会进行拆分再组装。// 指定tag的方法:指定 topic 时跟上 {:tags}。例如 test-topic:tagArocketMQTemplate.syncSend(RocketMQConstant.Topic.TOPIC_TAG + ":" + RocketMQConstant.Tag.TAG_1,"tag message:" + LocalDateTime.now());return "success";}@ApiOperation("延时")@PostMapping("delay")public String delay() {// 4.x只支持预定义延迟时间(共18级)。从rocketmq5.0开始,支持自定义延迟时间// 4.x:level=0 级表示不延时,level=1 表示 延时1s,level=2 表示 延时5s// messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2hrocketMQTemplate.syncSend(RocketMQConstant.Topic.TOPIC_DELAY,MessageBuilder.withPayload("delay message:" + LocalDateTime.now()).build(),3000,2);// 5.x// rocketMQTemplate.syncSendDelayTimeMills(// RocketMQConstant.Topic.TOPIC_WELCOME,// "message:" + LocalDateTime.now(),// 5000);return "success";}
}
消费者
1.只有topic
package com.example.business.mqConsumer;import com.example.common.constant.RocketMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
@RocketMQMessageListener(topic = RocketMQConstant.Topic.TOPIC_TOPIC,consumerGroup = RocketMQConstant.ConsumerGroup.TOPIC_GROUP_1
)
public class TopicConsumer implements RocketMQListener<MessageExt> {// 这里MessageExt也可以写成:String。// 不建议用String,因为只能获取到消息体,没有其他信息@Overridepublic void onMessage(MessageExt message) {// message.getTags();// message.getKeys();String body = new String(message.getBody());// 打印出消息内容log.info("TopicConsumer收到消息:" + body);}
}
2.指定topic和tag
package com.example.business.mqConsumer;import com.example.common.constant.RocketMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
@RocketMQMessageListener(topic = RocketMQConstant.Topic.TOPIC_TAG,consumerGroup = RocketMQConstant.ConsumerGroup.TAG_GROUP_1,selectorExpression = RocketMQConstant.Tag.TAG_1// 如果是多个tag,用|隔开即可
)
public class TagConsumer implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {String body = new String(message.getBody());// 打印出消息内容log.info("TagConsumer收到消息:" + body);}
}
3.延迟消息
package com.example.business.mqConsumer;import com.example.common.constant.RocketMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
@RocketMQMessageListener(topic = RocketMQConstant.Topic.TOPIC_DELAY,consumerGroup = RocketMQConstant.ConsumerGroup.DELAY_GROUP_1
)
public class DelayConsumer implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {String body = new String(message.getBody());// 打印出消息内容log.info("DelayConsumer收到消息:" + body);}
}
常量
package com.example.common.constant;public interface RocketMQConstant {interface Topic {String TOPIC_TOPIC = "topic_topic";String TOPIC_TAG = "topic_tag";String TOPIC_DELAY = "topic_delay";}interface Tag {String TAG_1 = "tag1";}/*** 这里必须每一个Topic对应一个ConsumerGroup,不然消息会丢失。* 5.x已经解决了这个问题*/interface ConsumerGroup {String TOPIC_GROUP_1 = "topic_group_1";String TAG_GROUP_1 = "tag_group_1";String DELAY_GROUP_1 = "delay_group_1";}
}
配置
此配置可以自动注册topic。
package com.example.common.config;import com.example.common.constant.RocketMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;@Slf4j
@Component
public class RocketMQAutoRegister implements ApplicationRunner {private final DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();@Value("${rocketmq.name-server}")private String nameSrvAddress;@Value("${custom.rocketmq.broker-address}")private String brokerAddress;@Overridepublic void run(ApplicationArguments args) throws Exception {defaultMQAdminExt.setNamesrvAddr(nameSrvAddress);try {// 连接到 RocketMQ 服务器defaultMQAdminExt.start();List<String> allTopic = findAllTopic();for (String topic : allTopic) {createIfNotExist(topic);}} catch (Exception e) {log.error("检查并创建主题失败", e);} finally {defaultMQAdminExt.shutdown();}}/*** 检查是否已经存在指定的 Topic,如果不存在则创建该 Topic*/private void createIfNotExist(String topic) {try {try {defaultMQAdminExt.examineTopicStats(topic);} catch (MQClientException e) {// 响应码17表示Topic不存在if (e.getResponseCode() == 17) {log.info("主题{}不存在,自动创建", topic);TopicConfig topicConfig = new TopicConfig();topicConfig.setTopicName(topic);defaultMQAdminExt.createAndUpdateTopicConfig(brokerAddress, topicConfig);}}} catch (Exception e) {log.error("检查并创建主题{}失败", topic, e);}}private List<String> findAllTopic() {Class<RocketMQConstant.Topic> topicClass = RocketMQConstant.Topic.class;Field[] declaredFields = topicClass.getDeclaredFields();List<String> topicList = new ArrayList<>();for (Field declaredField : declaredFields) {String topic = null;try {topic = (String) declaredField.get(null);} catch (IllegalAccessException e) {throw new RuntimeException(e);}topicList.add(topic);}return topicList;}
}