使用 Docker 部署 Apache RocketMQ
这段内容是 《使用 Docker 部署 Apache RocketMQ》 的官方风格指南,它详细描述了如何通过 Docker 快速搭建一个单节点的 RocketMQ 服务(包含 NameServer、Broker 和 Proxy),并使用 Java SDK 发送和接收消息。
下面我将为你 逐段解析、翻译并深入解释其含义,帮助你彻底理解整个流程和背后的技术逻辑。
🐳 如何在 Docker 中运行 RocketMQ?(中文详解)
✅ 系统要求
- 64 位操作系统(Linux / macOS / Windows)
- 已安装 Docker 和 Docker Compose(推荐)
⚠️ 注意:虽然文档提到了 JDK 1.8+,但因为 RocketMQ 是用 Java 写的,而 Docker 镜像内部已经包含了 JRE,所以你不需要在宿主机上安装 JDK 来运行服务。但如果你要开发客户端程序(如 Java 项目),则需要本地有 JDK。
1️⃣ 拉取 RocketMQ 镜像
docker pull apache/rocketmq:5.3.2
🔍 解释:
- 从 Docker Hub 下载官方发布的 RocketMQ 5.3.2 版本镜像。
- 这个镜像是由 Apache 官方维护的,包含了完整的 RocketMQ 运行环境(JDK + RocketMQ 二进制文件)。
- 使用 Docker 可以避免手动编译、配置等复杂步骤,非常适合快速测试和开发。
2️⃣ 创建 Docker 网络
docker network create rocketmq
🔍 解释:
- Docker 默认容器之间不能直接通信,除非它们在同一个自定义网络中。
rocketmq
是我们创建的一个自定义桥接网络(bridge network),用于让NameServer
和Broker
容器可以互相访问。- 后续启动容器时通过
--network rocketmq
加入该网络,就可以用容器名作为主机名进行通信(例如:rmqnamesrv:9876
)。
✅ 类比:就像局域网内两台机器可以通过主机名访问一样。
3️⃣ 启动 NameServer(注册中心)
docker run -d \--name rmqnamesrv \-p 9876:9876 \--network rocketmq \apache/rocketmq:5.3.2 sh mqnamesrv
参数说明:
参数 | 作用 |
---|---|
-d | 后台运行容器 |
--name rmqnamesrv | 给容器命名,方便管理 |
-p 9876:9876 | 将宿主机的 9876 端口映射到容器的 9876(NameServer 默认端口) |
--network rocketmq | 加入自定义网络,便于与其他容器通信 |
sh mqnamesrv | 在容器中执行启动 NameServer 的命令 |
查看日志确认是否成功:
docker logs -f rmqnamesrv
✅ 成功标志:看到日志输出
The Name Server boot success...
4️⃣ 启动 Broker 和 Proxy(消息服务器)
(1)准备配置文件 broker.conf
echo "brokerIP1=127.0.0.1" > broker.conf
🔍 为什么需要这个?
- 默认情况下,Broker 会用自己的内网 IP 地址注册到 NameServer。
- 但在 Docker 中,容器的内网 IP 是虚拟的(如
172.x.x.x
),宿主机无法直接访问。 - 所以我们要强制 Broker 使用
127.0.0.1
(即宿主机 loopback 地址)对外暴露,这样客户端才能从本机连接。
📌 这是 Docker 部署 RocketMQ 最关键的一步,否则客户端连不上!
(2)启动 Broker + Proxy 容器
docker run -d \--name rmqbroker \--network rocketmq \-p 10912:10912 -p 10911:10911 -p 10909:10909 \-p 8080:8080 -p 8081:8081 \-e "NAMESRV_ADDR=rmqnamesrv:9876" \-v ./broker.conf:/home/rocketmq/rocketmq-5.3.2/conf/broker.conf \apache/rocketmq:5.3.2 sh mqbroker --enable-proxy \-c /home/rocketmq/rocketmq-5.3.2/conf/broker.conf
参数详解:
参数 | 说明 |
---|---|
--name rmqbroker | 容器名称 |
--network rocketmq | 加入同一网络,可访问 rmqnamesrv |
-p ... | 端口映射: • 10911 : Broker 控制端口• 10912 : Broker 数据端口• 8081 : Proxy gRPC 端口(客户端主要连接这里) |
-e NAMESRV_ADDR=rmqnamesrv:9876 | 设置环境变量,告诉 Broker 去哪个地址注册 |
-v ./broker.conf:/path/in/container | 挂载自定义配置文件进容器 |
sh mqbroker --enable-proxy | 启动 Broker 并启用 Proxy 模式(RocketMQ 5.x 新特性) |
-c /path/to/broker.conf | 指定加载的配置文件 |
验证是否启动成功:
docker exec -it rmqbroker bash -c "tail -n 10 /home/rocketmq/logs/rocketmqlogs/proxy.log"
✅ 成功标志:看到日志中有
The broker[broker-a,127.0.0.1:10911] boot success...
✅ 当前架构总结
组件 | 容器名 | 访问方式 | 用途 |
---|---|---|---|
NameServer | rmqnamesrv | localhost:9876 | 路由发现中心 |
Broker + Proxy | rmqbroker | localhost:8081 (Proxy) | 存储消息、处理收发请求 |
✅ 此时你已经部署了一个 单主模式的 RocketMQ 集群,可用于本地开发和测试。
5️⃣ 使用 SDK 发送和接收消息(Java 示例)
(1)添加依赖(Maven)
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>5.3.2</version>
</dependency>
💡 推荐版本与服务端一致(5.3.2),避免兼容性问题。
(2)进入容器创建 Topic
docker exec -it rmqbroker /bin/bash
./mqadmin updatetopic -t RequestTopic -c DefaultCluster
🔍 说明:
-t TestTopic
:要创建的主题名-c DefaultCluster
:集群名(Docker 镜像默认配置)- 如果不创建 Topic,Broker 也可以自动创建,但建议显式创建以控制队列数量等参数。
(3)Producer 示例代码(发送消息)
package org.apache.rocketmq.example.rpc;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class RequestProducer {public static void main(String[] args) throws MQClientException, InterruptedException {String producerGroup = "please_rename_unique_group_name";String topic = "RequestTopic";long ttl = 3000;DefaultMQProducer producer = new DefaultMQProducer(producerGroup);//You need to set namesrvAddr to the address of the local namesrvproducer.setNamesrvAddr("127.0.0.1:9876");producer.start();try {Message msg = new Message(topic,"","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));long begin = System.currentTimeMillis();Message retMsg = producer.request(msg, ttl);long cost = System.currentTimeMillis() - begin;System.out.printf("request to <%s> cost: %d replyMessage: %s %n", topic, cost, retMsg);} catch (Exception e) {e.printStackTrace();}producer.shutdown();}
}
(4)Consumer 示例代码(接收消息)
package org.apache.rocketmq.example.rpc;import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.utils.MessageUtil;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException;public class ResponseConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {String producerGroup = "please_rename_unique_group_name";String consumerGroup = "please_rename_unique_group_name";String topic = "RequestTopic";// create a producer to send reply messageDefaultMQProducer replyProducer = new DefaultMQProducer(producerGroup);replyProducer.setNamesrvAddr("127.0.0.1:9876");replyProducer.start();// create consumerDefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);consumer.setNamesrvAddr("127.0.0.1:9876");// recommend client configsconsumer.setPullTimeDelayMillsWhenException(0L);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);for (MessageExt msg : msgs) {try {System.out.printf("handle message: %s %n", msg.toString());String replyTo = MessageUtil.getReplyToClient(msg);byte[] replyContent = "reply message contents.".getBytes(StandardCharsets.UTF_8);// create reply message with given util, do not create reply message by yourselfMessage replyMessage = MessageUtil.createReplyMessage(msg, replyContent);// send reply message with producerSendResult replyResult = replyProducer.send(replyMessage, 3000);System.out.printf("reply to %s , %s %n", replyTo, replyResult.toString());} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.subscribe(topic, "*");consumer.start();System.out.printf("Consumer Started.%n");}
}
🔍 关键点说明:
项目 | 说明 |
---|---|
NAMESRV_ADDR | 必须设置,查找服务端 |
ConsumerGroup | 必须指定,用于负载均衡和消费进度管理 |
FilterExpression("*", TAG) | 订阅所有标签的消息 |
6️⃣ 停止容器
# 停止 NameServer
docker stop rmqnamesrv# 停止 Broker
docker stop rmqbroker
✅ 也可以用
docker rm -f rmqnamesrv rmqbroker
一键删除容器。
🧠 总结:你该怎么理解这段文档?
步骤 | 目的 | 类比 Kafka |
---|---|---|
1. 拉取镜像 | 获取可运行的 RocketMQ 环境 | 下载 Kafka 镜像 |
2. 创建网络 | 实现容器间通信 | 自定义 Docker 网络 |
3. 启动 NameServer | 启动路由中心 | 类似 ZooKeeper |
4. 启动 Broker+Proxy | 启动消息服务器 | Kafka Broker |
5. 挂载 broker.conf | 修复 IP 注册问题(关键!) | 配置 advertised.listeners |
6. 使用 SDK | 实际开发集成方式 | 使用 kafka-clients |
7. 停止容器 | 清理资源 | docker-compose down |
✅ 实践建议(给初学者)
-
确保 Docker 正常运行:
docker --version docker run hello-world
-
检查端口占用:
- 9876 (NameServer)
- 10911, 10912 (Broker)
- 8081 (Proxy)
- 若被占用,可修改
-p
映射端口。
-
日志查看技巧:
docker logs rmqnamesrv | grep success docker logs rmqbroker | grep boot
-
常见问题排查:
- 客户端连不上?→ 检查
broker.conf
是否设置了brokerIP1=127.0.0.1
- 报
CONNECTION_TIMEOUT
?→ 检查防火墙或端口映射 - Topic 不存在?→ 使用
mqadmin updatetopic
创建
- 客户端连不上?→ 检查
-
推荐使用
docker-compose.yml
简化部署(更优雅):
version: '3.8'
services:namesrv:image: apache/rocketmq:5.3.2container_name: rmqnamesrvports:- "9876:9876"command: sh mqnamesrvnetworks:- rocketmqbroker:image: apache/rocketmq:5.3.2container_name: rmqbrokerports:- "10911:10911"- "10912:10912"- "8081:8081"environment:- NAMESRV_ADDR=namesrv:9876volumes:- ./broker.conf:/home/rocketmq/rocketmq-5.3.2/conf/broker.confcommand: sh mqbroker --enable-proxy -c /home/rocketmq/rocketmq-5.3.2/conf/broker.confdepends_on:- namesrvnetworks:- rocketmqnetworks:rocketmq:driver: bridge
保存为 docker-compose.yml
,然后运行:
# 启动
docker-compose up -d# 查看日志
docker-compose logs -f# 停止
docker-compose down
✅ 结论:这份文档教你如何用 Docker 快速搭建一个可用于开发测试的 RocketMQ 环境,比手动部署更简单、更干净。
需要我帮你写一个完整的 Spring Boot + RocketMQ 的 Docker 示例项目吗?欢迎继续提问!