当前位置: 首页 > news >正文

SpringBoot整合RocketMQ(阿里云ONS)

目录

配置pom.xml

配置application.properties 

MQ配置类

注入消息生产者配置信息

消息生产者

消费者配置

 消息消费者


配置pom.xml

<!-- 政务云MQ版本是4.x的,请使用对应的SDK ons-client-1.8.8.x -->
<dependency><groupId>com.aliyun.openservices</groupId><artifactId>ons-client</artifactId><version>1.8.8.8.Final</version>
</dependency>

配置application.properties 

rocketmq.accessKey=xxx
rocketmq.secretKey=zzz
rocketmq.nameSrvAddr=http://yyy.aliyun.com:9876
rocketmq.groupId=GID_abc
rocketmq.topic=abcTopic
rocketmq.tag=TagA

MQ配置类

import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "rocketmq")
public class RocketMQConfig {private String accessKey;private String secretKey;private String nameSrvAddr;private String groupId;private String topic;private String tag;public Properties getMqPropertie() {Properties properties = new Properties();properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);return properties;}
}

注入消息生产者配置信息

ProducerBean用于将Producer集成至SpringBoot中

import com.aliyun.openservices.ons.api.bean.ProducerBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RocketMQProducerClient {@Autowiredprivate RocketMQConfig mqConfig;@Bean(initMethod = "start", destroyMethod = "shutdown")public ProducerBean buildProducer() {ProducerBean producer = new ProducerBean();producer.setProperties(mqConfig.getMqPropertie());return producer;}
}

消息生产者

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class RocketMQProducer {private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQProducer.class);@Autowiredprivate ProducerBean producer;@Autowiredprivate RocketMQConfig mqConfig;public SendResult send(String body) {Message msg = new Message(mqConfig.getTopic(), mqConfig.getTag(), body.getBytes());msg.setKey(null);LOGGER.info("准备发送消息为:{}", body);try {SendResult sendResult = producer.send(msg);if (sendResult != null) {LOGGER.info("发送MQ消息成功");return sendResult;} else {LOGGER.error("发送MQ消息失败");return null;}} catch (ONSClientException e) {LOGGER.error("发送MQ消息失败");LOGGER.error("Exception -- : ", e);}return null;}
}

消费者配置

import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RocketMQConsumerClient {@Autowiredprivate RocketMQConfig mqConfig;@Autowiredprivate MqReceiver messageListener;@Bean(initMethod = "start", destroyMethod = "shutdown")public ConsumerBean buildConsumer() {ConsumerBean consumerBean = new ConsumerBean();Properties properties = mqConfig.getMqPropertie();properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId());properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");properties.setProperty(PropertyKeyConst.MaxReconsumeTimes, "-1");consumerBean.setProperties(properties);Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();Subscription subscription = new Subscription();subscription.setTopic(mqConfig.getTopic());subscription.setExpression(mqConfig.getTag());subscriptionTable.put(subscription, messageListener);consumerBean.setSubscriptionTable(subscriptionTable);return consumerBean;}
}

 消息消费者

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;@Component
public class MqReceiver implements MessageListener {private static final Logger LOGGER = LoggerFactory.getLogger(MqReceiver.class);@Overridepublic Action consume(Message message, ConsumeContext consumeContext) {if (Objects.nonNull(message)) {LOGGER.info("MQ接收到的消息为:" + message.toString());try {String data = new String(message.getBody(), StandardCharsets.UTF_8);LOGGER.info("MQ消息topic={},  消息内容={}", message.getTopic(), data);} catch (Exception e) {LOGGER.error("获取MQ消息内容异常{}", e.getMessage());return Action.CommitMessage;}}return Action.CommitMessage;}
}

http://www.dtcms.com/a/304480.html

相关文章:

  • 数据库4.0
  • Linux 文件管理高级操作:复制、移动与查找的深度探索
  • Deep Research(信息检索增强)认识和项目实战
  • 计算器4.0:新增页签功能梳理页面,通过IO流实现在用户本地存储数据
  • 点控云数据洞察智能体:让房地产决策有据可循,让业务增长稳健前行
  • 【LLM】——qwen2.5 VL模型导出到onnx
  • Python中二进制文件操作
  • 快速了解逻辑回归
  • 【华为机试】43. 字符串相乘
  • 【LeetCode 随笔】
  • 【深度学习】独热编码(One-Hot Encoding)
  • 开源 Arkts 鸿蒙应用 开发(十一)证书和包名修改
  • C语言在键盘上输入一个3行3列矩阵的各个元素的值(值为整数),然后输出主对角线元素的积,并在fun()函数中输出。
  • 信号上升时间与带宽的关系
  • Leetcode-3361两个字符串的切换距离
  • FastAPI入门:请求体的字段、嵌套模型、额外数据、额外数据类型
  • Linux系统部署k8s集群
  • 在 Web3 时代通过自我主权合规重塑 KYC/AML
  • Git快速入门,完整的git项目管理工具教程,git入门到精通!
  • 青少年软件编程图形化Scratch等级考试试卷(二级)2025年6月
  • 【EDA】Calma--早期版图绘制工具商
  • python案例:基于python 神经网络cnn和LDA主题分析的旅游景点满意度分析
  • 解决mac下git pull、push需要输入密码
  • 半导体企业选用的跨网文件交换系统到底应该具备什么功能?
  • 【007TG洞察】美欧贸易新政下跨境业务的技术破局:从数据治理到智能触达的全链路重构
  • SpringBoot整合RocketMQ(rocketmq-client.jar)
  • 小程序中事件对象的属性与方法
  • IT实施方案书
  • 【dropdown组件填坑指南】—怎么实现三角箭头效果
  • 网络安全第15集