当前位置: 首页 > news >正文

SpringBoot整合RocketMQ(rocketmq-client.jar)

目录

配置pom.xml

配置application.properties

生产者配置 MQProducerConfig.java

消费者配置MQConsumerConfig.java

消费者监听 MQConsumeMsgListenerProcessor.java

发送消息


Springboot集成RocketMQ:通过直接引入rocketmq-client依赖实现基础集成,需手动配置生产者和消费者。

配置pom.xml

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.7.0</version>
</dependency>

配置application.properties

# 是否开启自动配置
rocketmq.producer.isOnOff=on
# 发送同一类消息设置为同一个group,保证唯一默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标识
rocketmq.producer.groupName=GID_abc
# mq的nameserver地址
rocketmq.producer.namesrvAddr=localhost:9876
# 消息最大长度 默认 1024 * 4 (4M)
rocketmq.producer.maxMessageSize = 4096
# 发送消息超时时间,默认 3000
rocketmq.producer.sendMsgTimeOut=3000
# 发送消息失败重试次数,默认2
rocketmq.producer.retryTimesWhenSendFailed=2# 是否开启自动配置
rocketmq.consumer.isOnOff=on
# 发送同一类消息设置为同一个group,保证唯一默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标识
rocketmq.consumer.groupName=GID_abc
# mq的nameserver地址
rocketmq.consumer.namesrvAddr=localhost:9876
# 消费者订阅的主题topic和tags(*标识订阅该主题下所有的tags),格式: topic~tag1||tag2||tags3;
rocketmq.consumer.topics=abcTopic~*
# 消费者线程数据量
rocketmq.consumer.consumeThreadMin=5
rocketmq.consumer.consumeThreadMax=32
# 设置一次消费信心的条数,默认1
rocketmq.consumer.consumeMessageBatchMaxSize=1

生产者配置 MQProducerConfig.java

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Getter
@Setter
@ToString
@Configuration
@ConfigurationProperties(prefix = "rocketmq.producer")
public class MQProducerConfig {public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfig.class);private String groupName;private String namesrvAddr;// 消息最大值private Integer maxMessageSize;// 消息发送超时时间private Integer sendMsgTimeOut;// 发送失败重试次数private Integer retryTimesWhenSendFailed;@Bean@ConditionalOnProperty(prefix = "rocketmq.producer", value = "isOnOff", havingValue = "on")public DefaultMQProducer defaultProducer() throws MQClientException {LOGGER.info("-----defaultProducer 正在创建-----");DefaultMQProducer producer = new DefaultMQProducer(groupName);producer.setNamesrvAddr(namesrvAddr);producer.setVipChannelEnabled(false);producer.setMaxMessageSize(maxMessageSize);producer.setSendMsgTimeout(sendMsgTimeOut);producer.setRetryTimesWhenSendAsyncFailed(retryTimesWhenSendFailed);producer.start();LOGGER.info("-----rocketmq producer server 开启成功-----");return producer;}
}

消费者配置MQConsumerConfig.java

import com.bestone.online.consult.receiverMq.MQConsumeMsgListenerProcessor;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Getter
@Setter
@ToString
@Configuration
@ConfigurationProperties(prefix = "rocketmq.consumer")
public class MQConsumerConfig {public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfig.class);private String groupName;private String namesrvAddr;private String topics;// 消费者线程数据量private Integer consumeThreadMin;private Integer consumeThreadMax;private Integer consumeMessageBatchMaxSize;@Autowiredprivate MQConsumeMsgListenerProcessor consumeMsgListenerProcessor;@Bean@ConditionalOnProperty(prefix = "rocketmq.consumer", value = "isOnOff", havingValue = "on")public DefaultMQPushConsumer defaultConsumer() {LOGGER.info("-----defaultConsumer 正在创建-----");DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);consumer.setNamesrvAddr(namesrvAddr);consumer.setConsumeThreadMin(consumeThreadMin);consumer.setConsumeThreadMax(consumeThreadMax);consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);// 设置监听consumer.registerMessageListener(consumeMsgListenerProcessor);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);try {// 设置该消费者订阅的主题和tag,如果订阅该主题下的所有tag,则使用*,String[] topicArr = topics.split(";");for (String topic : topicArr) {String[] tagArr = topic.split("~");consumer.subscribe(tagArr[0], tagArr[1]);}consumer.start();LOGGER.info("-----consumer 创建成功 groupName={}, topics={}, namesrvAddr={}-----", groupName, topics, namesrvAddr);} catch (MQClientException e) {LOGGER.error("-----consumer 创建失败!-----");}return consumer;}
}

消费者监听 MQConsumeMsgListenerProcessor.java

import com.alibaba.fastjson.JSONObject;
import java.util.Date;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;/*** 消费者监听*/
@Component
public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently {private static final Logger LOGGER = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class);/*** 默认msg里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息* 不要抛异常,如果没有return CONSUME_SUCCESS,consumer会重新消费该消息,直到return CONSUME_SUCCESS*/@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList,ConsumeConcurrentlyContext consumeConcurrentlyContext) {if (CollectionUtils.isEmpty(msgList)) {LOGGER.info("MQ接收消息为空,直接返回成功");return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}MessageExt messageExt = msgList.get(0);LOGGER.info("MQ接收到的消息为:" + messageExt.toString());try {String topic = messageExt.getTopic();String tags = messageExt.getTags();String data = new String(messageExt.getBody(), "utf-8");LOGGER.info("MQ消息topic={}, tags={}, 消息内容={}", topic, tags, data);// TODO 处理业务逻辑} catch (Exception e) {LOGGER.error("获取MQ消息内容异常{}", e);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
}

发送消息

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/mqProducer")
public class MQProducerController {public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerController.class);@AutowiredDefaultMQProducer defaultMQProducer;/*** 发送简单的MQ消息*/@GetMapping("/send")public void send(String msg)throws InterruptedException, RemotingException, MQClientException, MQBrokerException {LOGGER.info("发送MQ消息内容:" + msg);Message msg = new Message("abcTopic",   msg.getBytes());// 延时级别 "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"// 设置消息延迟级别为3,也就是延迟10s。msg.setDelayTimeLevel(3);// 定时发送消息,5秒之后发送msg.setDeliveryTimestamp(System.currentTimeMillis() + Duration.ofSeconds(5).toMillis());// 默认3秒超时SendResult sendResult = defaultMQProducer.send(msg);LOGGER.info("消息发送响应:" + sendResult.toString());}
}
http://www.dtcms.com/a/304454.html

相关文章:

  • 小程序中事件对象的属性与方法
  • IT实施方案书
  • 【dropdown组件填坑指南】—怎么实现三角箭头效果
  • 网络安全第15集
  • 河南地区危化品安全员考试题库及答案
  • 【参考】Re
  • MYSQL难面试
  • 汇总数据(使用聚集函数)
  • Element Plus
  • AI数据管家:智能体如何像“超级助手”管理企业数据?
  • 宇树 G1 部署(九)——遥操作控制脚本 teleop_hand_and_arm.py 分析与测试部署
  • 项目如何分阶段推进?几大要点分析
  • 【Linux】初识make/makefile
  • 【C++算法】80.BFS解决FloodFill算法_岛屿数量
  • 数据结构 排序(2)---选择排序
  • 【WRF工具】服务器中安装编译GrADS
  • 组件调用传值、调用函数
  • 信息技术发展与区块链的崛起:深度解析与未来展望
  • Vulkan入门教程 | 第二部分:创建实例
  • 0基礎網站開發技術教學(一) --(前端篇)--
  • LeetCode 11 - 盛最多水的容器
  • 力扣面试150(43/150)
  • 3D 网上展厅,到底是什么?​
  • Radash.js 现代化JavaScript实用工具库详解 – 轻量级Lodash替代方案
  • 如何在Windows操作系统上通过conda 安装 MDAnalysis
  • TDengine 中 TDgpt 异常检测的数据密度算法
  • Qt小技巧 QStandardPaths详解
  • 【机器学习深度学习】DeepSpeed框架:高效分布式训练的开源利器
  • 车载诊断架构 --- 关于诊断时间参数P4的浅析
  • 【Spring Boot 快速入门】三、分层解耦