当前位置: 首页 > news >正文

SpringBoot整合RocketMQ--实例

原文网址:SpringBoot整合RocketMQ--实例-CSDN博客

简介

本文介绍SpringBoot整合RocketMQ的方法。

  1. spring-boot-starter-parent版本:2.4.13
  2. RocketMQ版本:4.9.4。(写这篇文章时,5.X版本的Java客户端还没完善,无法使用)。

本文会展示的实例有:

  1. 只指定topic发送和接收数据
  2. 指定topic和tag发送和接收数据
  3. 延迟消息
  4. 项目启动时自动注册topic

前三个都是基本的api,很简单。第四个自动注册是一种技术思维,适合高级开发和对技术有追求的人。

结果展示

先展示一下整合后的结果。

RocketMQ页面

主页面

 主题页面

 消费者页面

发送消息并消费

启动SpringBoot应用后,访问接口文档:http://localhost:8080/doc.html

结果:

1.测试只有topic的情景

后端结果(成功接收到消息):

2023-11-22 19:18:44.213  INFO 37900 --- [topic_group_1_1] c.e.business.mqConsumer.TopicConsumer    : TopicConsumer收到消息:topic message:2023-11-22T19:18:43.991

2.测试指定topic和tag

后端结果(成功接收到消息):

2023-11-22 14:20:15.183  INFO 37900 --- [d_tag_group_1_1] c.e.business.mqConsumer.TagConsumer      : TagConsumer收到消息:tag message:2023-11-22T14:20:15.175

3.测试延迟消息

后端结果(消息发送后,五秒钟之后收到了消息) :

2023-11-22 14:21:24.436  INFO 37900 --- [delay_group_1_1] c.e.business.mqConsumer.DelayConsumer    : DelayConsumer收到消息:delay message:2023-11-22T14:21:19.382

1.启动RocketMQ服务器

安装和启动流程见:Docker Compose系列--安装RocketMQ--方法/示例-CSDN博客

2.引入依赖

pom.xml引入下边的依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version>
</dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-tools</artifactId><version>4.9.4</version>
</dependency>

3.修改配置文件

application.yml

spring:application:name: demo-rocketmq#rocketmq配置信息
rocketmq:#nameservice服务器地址(多个以英文逗号隔开)name-server: 192.168.5.193:9876#生产者配置producer:#组名group: group1# 自定义配置
custom:rocketmq:broker-address: 192.168.5.193:10911

4.编写代码

源码下载

代码结构

生产者

package com.example.business.controller;import com.example.common.constant.RocketMQConstant;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;@Api(tags = "测试")
@RestController
@RequestMapping("test")
public class TestController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@ApiOperation("topic")@PostMapping("topic")public String topic() {rocketMQTemplate.syncSend(RocketMQConstant.Topic.TOPIC_TOPIC,"topic message:" + LocalDateTime.now());return "success";}@ApiOperation("tag")@PostMapping("tag")public String tag() {//RocketMQTemplate将 topic 和 tag 合二为一了,底层会进行拆分再组装。// 指定tag的方法:指定 topic 时跟上 {:tags}。例如 test-topic:tagArocketMQTemplate.syncSend(RocketMQConstant.Topic.TOPIC_TAG + ":" + RocketMQConstant.Tag.TAG_1,"tag message:" + LocalDateTime.now());return "success";}@ApiOperation("延时")@PostMapping("delay")public String delay() {// 4.x只支持预定义延迟时间(共18级)。从rocketmq5.0开始,支持自定义延迟时间// 4.x:level=0 级表示不延时,level=1 表示 延时1s,level=2 表示 延时5s// messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2hrocketMQTemplate.syncSend(RocketMQConstant.Topic.TOPIC_DELAY,MessageBuilder.withPayload("delay message:" + LocalDateTime.now()).build(),3000,2);// 5.x// rocketMQTemplate.syncSendDelayTimeMills(//         RocketMQConstant.Topic.TOPIC_WELCOME,//         "message:" + LocalDateTime.now(),//         5000);return "success";}
}

消费者

1.只有topic

package com.example.business.mqConsumer;import com.example.common.constant.RocketMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
@RocketMQMessageListener(topic = RocketMQConstant.Topic.TOPIC_TOPIC,consumerGroup = RocketMQConstant.ConsumerGroup.TOPIC_GROUP_1
)
public class TopicConsumer implements RocketMQListener<MessageExt> {// 这里MessageExt也可以写成:String。// 不建议用String,因为只能获取到消息体,没有其他信息@Overridepublic void onMessage(MessageExt message) {// message.getTags();// message.getKeys();String body = new String(message.getBody());// 打印出消息内容log.info("TopicConsumer收到消息:" + body);}
}

2.指定topic和tag

package com.example.business.mqConsumer;import com.example.common.constant.RocketMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
@RocketMQMessageListener(topic = RocketMQConstant.Topic.TOPIC_TAG,consumerGroup = RocketMQConstant.ConsumerGroup.TAG_GROUP_1,selectorExpression = RocketMQConstant.Tag.TAG_1// 如果是多个tag,用|隔开即可
)
public class TagConsumer implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {String body = new String(message.getBody());// 打印出消息内容log.info("TagConsumer收到消息:" + body);}
}

3.延迟消息

package com.example.business.mqConsumer;import com.example.common.constant.RocketMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
@RocketMQMessageListener(topic = RocketMQConstant.Topic.TOPIC_DELAY,consumerGroup = RocketMQConstant.ConsumerGroup.DELAY_GROUP_1
)
public class DelayConsumer implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {String body = new String(message.getBody());// 打印出消息内容log.info("DelayConsumer收到消息:" + body);}
}

常量

package com.example.common.constant;public interface RocketMQConstant {interface Topic {String TOPIC_TOPIC = "topic_topic";String TOPIC_TAG = "topic_tag";String TOPIC_DELAY = "topic_delay";}interface Tag {String TAG_1 = "tag1";}/*** 这里必须每一个Topic对应一个ConsumerGroup,不然消息会丢失。* 5.x已经解决了这个问题*/interface ConsumerGroup {String TOPIC_GROUP_1 = "topic_group_1";String TAG_GROUP_1 = "tag_group_1";String DELAY_GROUP_1 = "delay_group_1";}
}

配置

此配置可以自动注册topic。

package com.example.common.config;import com.example.common.constant.RocketMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;@Slf4j
@Component
public class RocketMQAutoRegister implements ApplicationRunner {private final DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();@Value("${rocketmq.name-server}")private String nameSrvAddress;@Value("${custom.rocketmq.broker-address}")private String brokerAddress;@Overridepublic void run(ApplicationArguments args) throws Exception {defaultMQAdminExt.setNamesrvAddr(nameSrvAddress);try {// 连接到 RocketMQ 服务器defaultMQAdminExt.start();List<String> allTopic = findAllTopic();for (String topic : allTopic) {createIfNotExist(topic);}} catch (Exception e) {log.error("检查并创建主题失败", e);} finally {defaultMQAdminExt.shutdown();}}/*** 检查是否已经存在指定的 Topic,如果不存在则创建该 Topic*/private void createIfNotExist(String topic) {try {try {defaultMQAdminExt.examineTopicStats(topic);} catch (MQClientException e) {// 响应码17表示Topic不存在if (e.getResponseCode() == 17) {log.info("主题{}不存在,自动创建", topic);TopicConfig topicConfig = new TopicConfig();topicConfig.setTopicName(topic);defaultMQAdminExt.createAndUpdateTopicConfig(brokerAddress, topicConfig);}}} catch (Exception e) {log.error("检查并创建主题{}失败", topic, e);}}private List<String> findAllTopic() {Class<RocketMQConstant.Topic> topicClass = RocketMQConstant.Topic.class;Field[] declaredFields = topicClass.getDeclaredFields();List<String> topicList = new ArrayList<>();for (Field declaredField : declaredFields) {String topic = null;try {topic = (String) declaredField.get(null);} catch (IllegalAccessException e) {throw new RuntimeException(e);}topicList.add(topic);}return topicList;}
}

相关文章:

  • RTX腾讯通停服后,有哪些兼容Linux及移动端的升级途径?
  • SQL(Database Modifications)
  • 杏仁海棠花饼的学习日记第十四天CSS
  • Windows 11 全角半角切换方法
  • 《仿盒马》app开发技术分享-- 订单列表页(端云一体)
  • 日常--OBS+mediamtx实现本地RTMP推流环境搭建(详细图文)
  • Telegram平台分发其聊天机器人Grok
  • 【仿生系统】爱丽丝的“内在”或“灵魂”:概念与形式
  • 关于《DAHSF》即《火小兔智慧开发平台V2.0》的碎碎念
  • 微机系统-汇编语言入门
  • 计算机图形学:(六)渲染管线
  • 基于matlab遗传算法和模拟退火算法求解三维装箱优化问题
  • Virtuoso中对GDS文件进行工艺库转换的方法
  • Vision Transformer网络结构
  • CppCon 2014 学习第4天:Transactional Language Constructs for C++ TS(未进入到标准)
  • 麒麟v10+信创x86处理器离线搭建k8s集群完整过程
  • 【实证分析】上市公司全要素生产率+5种测算方式(1999-2024年)
  • Baklib知识中台驱动业务创新
  • 车载通信网络 --- 车载通信网络槪述
  • Codeforces Round 1024 (Div. 2)
  • 珠海做网站多少钱/百度识图鉴你所见
  • 襄阳建设局网站/全自动引流推广软件
  • 网站建设基本流程ppt/注册推广赚钱一个40元
  • 淘客网站开发视频教程/如何写推广软文
  • 山东城乡建设厅网站首页/最近热点新闻事件2023
  • 幼儿园班级网站建设/百度一下首页登录入口