当前位置: 首页 > news >正文

RocketMq程序动态创建Topic

1.修改配置文件 broker.conf

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH# 必须开启:允许客户端自动创建 Topic(开发/测试环境)
autoCreateTopicEnable=true# 必须开启:允许自动创建 Consumer Group(可选,但建议开启)
autoCreateSubscriptionGroup=true# NameServer 地址(Broker 自己也要知道 NameServer 在哪)
# 如果 NameServer 在本机
namesrvAddr=127.0.0.1:9876
  1. 动态创建代码示例
package com.example.shopmq2.config;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;@Component
@Slf4j
public class TopicInitRunner implements ApplicationRunner {@Value("${rocketmq.name-server}")private String nameServer;private static final List<String> topicsToCreate  = new ArrayList<>(); // 可配置要创建的 Topicstatic {topicsToCreate.add("testTopic");topicsToCreate.add("testTopic1");topicsToCreate.add("testTopic2");topicsToCreate.add("testTopic3");topicsToCreate.add("testTopic4");topicsToCreate.add("testTopic5");}@Overridepublic void run(ApplicationArguments args) throws Exception {if (topicsToCreate == null || topicsToCreate.isEmpty()) {log.info("❌ 未配置需要创建的 Topic");return;}DefaultMQAdminExt admin = new DefaultMQAdminExt();admin.setInstanceName("admin-" + System.currentTimeMillis());admin.setNamesrvAddr(nameServer);try {admin.start();// 获取所有Topic列表TopicList topicList = admin.fetchAllTopicList();Set<String> existingTopics = topicList.getTopicList().stream().collect(Collectors.toSet());for (String topic : topicsToCreate) {if (!existingTopics.contains(topic)) {try {admin.createAndUpdateTopicConfig("127.0.0.1:10911", new TopicConfig(topic));log.info("✅ 成功创建 Topic: {}", topic);} catch (MQClientException e) {log.error("❌ 创建 Topic {} 失败: {}", topic, e.getMessage(), e);}} else {log.info("Topic {} 已经存在", topic);}}} catch (Exception e) {log.error("❌ 初始化 RocketMQ Topic 失败: {}", e.getMessage(), e);} finally {admin.shutdown();}}
}
        <!--RocketMQ--><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-tools</artifactId><version>4.9.3</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version></dependency>
  1. 启动指定配置文件(当前是windows)
cd D:\rocketmq-4.9.4\bin# 启动 Broker 并指定配置文件
start mqbroker.cmd -c ..\conf\broker.conf
http://www.dtcms.com/a/352418.html

相关文章:

  • 在 Ubuntu 下遇到 <string>头文件找不到的问题
  • 运筹优化(OR)-在机器学习(ML)浪潮中何去何从?
  • 独孤思维:无限分发,无成本赚钱的副业
  • JVM分层编译深度解析:完整机制与实践指南
  • 面向世界模型构建的跨模态认知网络工程
  • the scientist and engineer‘s guide to DSP:1 The Breadth and Depth of DSP 引言
  • CSS实现内凹圆角边框技巧(高频)
  • 【C++】用哈希表封装unordered_XX
  • 西游记24-26:万寿山,五庄观,镇元子;猴子偷果,猪八戒吃人参果——食而不知其味;逃跑被抓回,替师傅受罚;到处求仙,最终观音菩萨救树
  • Qt数据结构与编码技巧全解析
  • LeetCode 2140. 解决智力问题
  • 力扣(滑动窗口最大值)
  • LeetCode 刷题【53. 最大子数组和】
  • 一篇文章拆解Java主流垃圾回收器及其调优方法。
  • 详解 torch.distributed.all_gather_into_tensor
  • 15.examples\01-Micropython-Basics\demo_yield_task.py 加强版
  • 【实时Linux实战系列】基于实时Linux的生物识别系统
  • #Linux内存管理学以致用# 请你根据linux 内核struct page 结构体的双字对齐的设计思想,设计一个类似的结构体
  • 【测试需求分析】-需求来源分析(一)
  • 博士招生 | 香港大学 Intelligent Communication Lab 招收全奖博士
  • 【deepseek问答记录】:chatGPT的参数数量和上下文长度有关系吗?
  • AI Agent正在给传统数据仓库下“死亡通知书“
  • 读《精益数据分析》:用户行为热力图
  • 【拍摄学习记录】01-景别
  • 创龙3576ububuntu系统设置静态IP方法
  • 【Linux 进程】进程程序替换详解
  • 8.26网络编程——Modbus TCP
  • Git 高级技巧:利用 Cherry Pick 实现远程仓库的同步合并
  • 【自然语言处理与大模型】微调数据集如何构建
  • docker 的网络