当前位置: 首页 > news >正文

SpringBoot集成RocketMQ的两种方式

一:原生RocketMQ

配置复杂,需要手动配置product、consumer、监听器、序列化、消息过滤、事物消息;

非常灵活,所有RocketMQ底层能力都能用上。

使用建议: 对 RocketMQ 特性依赖较深(如事务消息、消息延迟等级、过滤等)。 项目中对性能和精细化控制要求高。

1.添加依赖:

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>5.3.0</version>
</dependency>

2.生产者:

public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {//1.初始化一个消息生产者,指定组名DefaultMQProducer producer = new DefaultMQProducer("DemoProducer");// 2.指定nameserver地址producer.setNamesrvAddr("192.168.65.112:9876");// 3.启动消息生产者服务producer.start();for (int i = 0; i < 2; i++) {try {// 4.创建消息。消息由Topic,Tag和body三个属性组成,其中Body就是消息内容Message msg = new Message("TopicTest","TagA",("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET));//5.发送消息,获取发送结果SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}//6.消息发送完后,停止消息生产者服务。producer.shutdown();}
}

3.消费者:

public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {//1.构建一个消息消费者指定消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");//2.指定nameserver地址consumer.setNamesrvAddr("192.168.65.112:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);// 3.订阅一个感兴趣的话题,这个话题需要与消息的topic一致,不实用tag过滤consumer.subscribe("TopicTest", "*");// 4.注册一个消息回调函数,消费到消息后就会触发回调。consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {msgs.forEach(messageExt -> {try {System.out.println("收到消息:"+new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));} catch (UnsupportedEncodingException e) {}});//返回消费状态return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.启动消费者服务consumer.start();System.out.print("Consumer Started");}
}

二:spring-boot-starter-rocketmq

使用 @RocketMQMessageListener 和 RocketMQTemplate 快速开发,封装好 producer 和 consumer 配置简单,SpringBoot自动装配,不够灵活

1.添加依赖(使用springboot集成时,版本特别关键,稍微有偏差就会报错)

<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.3.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>3.0.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>3.0.4</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>

2.配置文件

rocketmq.name-server=192.168.65.112:9876
rocketmq.producer.group=springBootGroup
#如果这⾥不配,那就需要在消费者的注解中配。
#rocketmq.consumer.topic=
rocketmq.consumer.group=testGroup
server.port=9000

3.生产者

RocketMQTemplate 不光可以发送消息还可以拉消息

@Resource
private RocketMQTemplate rocketMQTemplate;
public void sendMessage(String topic,String msg){
this.rocketMQTemplate.convertAndSend(topic,msg);
}
}

4.消费者

消费者的声明也很简单。所有属性通过@RocketMQMessageListener注解声明

@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic",consumeMode=
ConsumeMode.CONCURRENTLY,messageModel= MessageModel.BROADCASTING)
public class SpringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("Received message : "+ message);
}
}
SpringBoot框架中对消息的封装与原⽣API的消息封装是不⼀样的
在springBoot封装的rocketmq中,默认的RocketMQTemplate只能处理一开始初始化的生产者组,特别不方便,可以通过@ExtRocketMQTemplateConfiguration()自己在额外配置需要的。
原生的代码量多,但是更加令快,SpinrgBoot封装的则适合场景简单快速开发,具体以自己业务场景为主。

Producer发送的Message对象是没有msgId属性的。Broker端接收到Producer发过来的消息后,会给每条消息单独分配⼀个唯⼀的msgId。这个msgID可以作为消息的唯⼀主键来使⽤。
但是需要注意,对于客户端来说,毕竟是不知道这个msgId是如何产⽣的。实际上,在RocketMQ内部,也会针对批量消息、事务消息等特殊的消息机制,有特殊的msgId分配机制。因此,在复杂业务场景下,不建议使⽤msgId来作为消息的唯⼀索引,⽽建议采⽤下⾯的key属性,⾃⾏指定业务层⾯上的唯⼀索引。

针对key这⼀个属性,建议在业务中可以添加⼀些带有业务唯⼀性的数据,作为MessageId的补充。RocketMQ基于Keys属性,实现了消息溯源、消息压缩等⼀系列功能。


文章转载自:
http://.
http://.
http://.
http://.
http://.
http://.
http://.
http://.
http://.
http://.
http://.
http://.
http://.
http://.
http://.
http://.
http://.
http://.
http://.
http://.
http://.
http://.
http://.
http://.
http://.
http://.
http://.
http://.
http://.
http://.
http://www.dtcms.com/a/281647.html

相关文章:

  • WGAS+WGCNA分析文章套路
  • LeetCode Hot100 【1.两数之和、2.两数相加、3.无重复字符的最长子串】
  • 动态数组:ArrayList的实现原理
  • 504网关超时可能是哪些原因导致?
  • web前端渡一大师课 01 事件循环
  • 【交流等效负载电阻的推导】
  • SpringBoot 项目搭建的 4 种常用方式,从入门到实践
  • 魔力宝贝归来虚拟机版怎么修复卡第一个任务
  • Kimi K2驱动Claude Code,稳定且低价
  • 入选《机器视觉》:视觉AI 生态链加速工业检测场景落地
  • MySQL数据库----函数
  • vue3:wangEditor使用过程中,点击编辑回显数据的问题修复.
  • 操作HTML网页的知识点
  • 飞搭系列 | 子事件流节点,让逻辑复用更简单!
  • 【前端】Vue 3 页面开发标准框架解析:基于实战案例的完整指南
  • 第二次线上事故
  • 【leetcode】263.丑数
  • Unity 多人游戏框架学习系列一
  • (附源码)基于 Go 和 gopacket+Fyne 的跨平台网络抓包工具开发实录
  • 轻松管理多个Go版本:g工具安装与使用
  • DTU轮询通信有哪些隐患?功耗、容量与响应效率全解析
  • Cookie全解析:Web开发核心机制
  • jetson安装opencv的cuda的加速
  • 二分答案#贪心
  • Python的requests包中使用session管理cookie
  • 无人机故障响应模块运行与技术难点
  • 知识蒸馏 是什么?具体怎么实现的
  • 防抖与节流
  • JavaScript认识+JQuery的依赖引用
  • 手撕线程池详解(C语言源码+解析)