RocketMQ 程序动态创建 Topic
1.修改配置文件 broker.conf
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 必须开启:允许客户端自动创建 Topic(开发/测试环境)
autoCreateTopicEnable=true
# 允许自动创建 Consumer Group(可选,但建议开启)
autoCreateSubscriptionGroup=true
# NameServer 地址(Broker 自己也要知道 NameServer 在哪)
# 如果 NameServer 在本机
namesrvAddr=127.0.0.1:9876
- 动态创建代码示例
package com.example.shopmq2.config;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;@Component
@Slf4j
public class TopicInitRunner implements ApplicationRunner {@Value("${rocketmq.name-server}")private String nameServer;private static final List<String> topicsToCreate = new ArrayList<>(); // 可配置要创建的 Topicstatic {topicsToCreate.add("testTopic");topicsToCreate.add("testTopic1");topicsToCreate.add("testTopic2");topicsToCreate.add("testTopic3");topicsToCreate.add("testTopic4");topicsToCreate.add("testTopic5");}@Overridepublic void run(ApplicationArguments args) throws Exception {if (topicsToCreate == null || topicsToCreate.isEmpty()) {log.info("❌ 未配置需要创建的 Topic");return;}DefaultMQAdminExt admin = new DefaultMQAdminExt();admin.setInstanceName("admin-" + System.currentTimeMillis());admin.setNamesrvAddr(nameServer);try {admin.start();// 获取所有Topic列表TopicList topicList = admin.fetchAllTopicList();Set<String> existingTopics = topicList.getTopicList().stream().collect(Collectors.toSet());for (String topic : topicsToCreate) {if (!existingTopics.contains(topic)) {try {admin.createAndUpdateTopicConfig("127.0.0.1:10911", new TopicConfig(topic));log.info("✅ 成功创建 Topic: {}", topic);} catch (MQClientException e) {log.error("❌ 创建 Topic {} 失败: {}", topic, e.getMessage(), e);}} else {log.info("Topic {} 已经存在", topic);}}} catch (Exception e) {log.error("❌ 初始化 RocketMQ Topic 失败: {}", e.getMessage(), e);} finally {admin.shutdown();}}
}
<!-- RocketMQ -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-tools</artifactId>
    <version>4.9.3</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>
- 启动指定配置文件(当前是windows)
cd D:\rocketmq-4.9.4\bin
:: 启动 Broker 并指定配置文件
start mqbroker.cmd -c ..\conf\broker.conf