Kafka消息中间件安装配置
Kafka消息中间件安装配置
- 1 docker安装zookeeper集群(可忽略)
- 1.1 创建相关目录
- 1.2 zk-docker-compose.yaml
- 1.3 验证集群状态
- 1.4 启动服务命令
- ※2 docker安装kafka集群(基于KRaft模式)
- 2.1 参数讲解
- 2.2 创建相关目录
- 2.3 docker-compose.yaml
- 2.4 验证集群状态
- 2.5 开放相关端口
1 docker安装zookeeper集群(可忽略)
zookeeper 集群中的每个节点都需要一个唯一的标识(myid
文件)和统一的集群服务器列表配置(zoo.cfg
中的 server.x
列表)。
- ZOO_MY_ID:表示当前 zookeeper 实例在集群中的编号,范围为1-255,所以一个 zookeeper 集群最多有 255 个节点
- ZOO_SERVERS:表示当前 zookeeper 实例所在集群中的所有节点的编号、主机名(或IP地址)、端口
zookeeper 集群需要的端口:
- 2181:客户端连接端口。
- 2888:zookeeper 集群中的节点,Follower 与 Leader 之间进行数据同步和消息传递的端口。
- 3888:用于 Leader 选举的端口。
1.1 创建相关目录
mkdir -p /opt/soft/kafka_cluster/zk1/{data,logs}
mkdir -p /opt/soft/kafka_cluster/zk2/{data,logs}
mkdir -p /opt/soft/kafka_cluster/zk3/{data,logs}sudo chmod -R 777 /opt/soft/kafka_cluster/zk1
sudo chmod -R 777 /opt/soft/kafka_cluster/zk2
sudo chmod -R 777 /opt/soft/kafka_cluster/zk3
1.2 zk-docker-compose.yaml
version: '3.2'services:zk1:image: zookeeper:3.9.3container_name: zk1restart: alwaysports:- "2181:2181"environment:ALLOW_ANONYMOUS_LOGIN: yes # 允许匿名连接,生产环境应配置认证ZOO_MY_ID: 1ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181volumes:- /etc/localtime:/etc/localtime- ./zk1/data:/data- ./zk1/logs:/datalognetworks:- zk-netzk2:image: zookeeper:3.9.3container_name: zk2restart: alwaysports:- "2182:2181"environment:ALLOW_ANONYMOUS_LOGIN: yes # 允许匿名连接,生产环境应配置认证ZOO_MY_ID: 2ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zk3:2888:3888;2181volumes:- /etc/localtime:/etc/localtime- ./zk2/data:/data- ./zk2/logs:/datalognetworks:- zk-netzk3:image: zookeeper:3.9.3container_name: zk3restart: alwaysports:- "2183:2181"environment:ALLOW_ANONYMOUS_LOGIN: yes # 允许匿名连接,生产环境应配置认证ZOO_MY_ID: 3ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181volumes:- /etc/localtime:/etc/localtime- ./zk3/data:/data- ./zk3/logs:/datalognetworks:- zk-net# 给集群创建一个网络,名称自己随便定义,这里取名为 zk-net
networks:zk-net:driver: bridge
1.3 验证集群状态
集群启动后(大约需要等待一分钟进行选举),可以通过以下命令检查每个节点的状态:
# 进入节点1容器
docker exec -it zk1 /bin/bash# 在容器内执行状态检查命令
[root@node05 kafka_cluster]# docker exec -it zk1 /bin/sh
# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
# exit[root@node05 kafka_cluster]# docker exec -it zk2 /bin/sh
# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
# exit[root@node05 kafka_cluster]# docker exec -it zk3 /bin/sh
# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader
注意:对每个节点执行此操作,输出会显示该节点的模式是 Mode: leader 还是 Mode: follower。一个健康的集群应该有一个 Leader 和两个 Follower。
1.4 启动服务命令
# -f表示指定某个配置文件名 -d:表示后台启动
docker-compose up -d
# 查看当前服务
docker ps
※2 docker安装kafka集群(基于KRaft模式)
Kafka 4.0 引入了 KRaft 模式(Kafka Raft Metadata Mode),它使 Kafka 集群不再依赖 ZooKeeper 进行元数据管理。KRaft 模式简化了 Kafka 部署和管理,不需要额外配置 ZooKeeper 服务,使得集群的配置和运维更加高效。
2.1 参数讲解
通用KRaft配置
KAFKA_ENABLE_KRAFT=yes
(允许使用kraft,使用kraft模式)KAFKA_CFG_PROCESS_ROLES=broker,controller
(指定Kafka进程的角色,在KRaft模式下,节点可以同时是broker和controller,或者是其中一种。)KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
(指定供外部使用的控制类请求信息[指定用于控制器通信的监听器名称。)KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
(定义Kafka监听的协议、接口和端口。内部通信和外部客户端访问的端口可以分开。)KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
(将监听器名称映射到安全协议。)KAFKA_KRAFT_CLUSTER_ID=${CLUSTER_ID}
(使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可)KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
(定义参与控制器选举的所有投票者地址。格式为id@host:port,其中id是每个节点的BROKER_ID。)ALLOW_PLAINTEXT_LISTENER=yes
(允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用)
Broker特定配置
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://${HOST}:9194
(定义客户端如何连接到Kafka。通常设置为宿主机的IP或域名和映射后的端口。)KAFKA_BROKER_ID=1
(broker.id,必须唯一)
可选:资源与性能
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
(允许自动创建主题,生产环境建议关闭)KAFKA_CFG_NODE_ID=1
(node id 唯一并且和broker一致)
2.2 创建相关目录
mkdir -p /opt/soft/kafka_cluster/kafka1
mkdir -p /opt/soft/kafka_cluster/kafka2
mkdir -p /opt/soft/kafka_cluster/kafka3sudo chmod -R 777 /opt/soft/kafka_cluster/kafka1
sudo chmod -R 777 /opt/soft/kafka_cluster/kafka2
sudo chmod -R 777 /opt/soft/kafka_cluster/kafka3
2.3 docker-compose.yaml
- 部署前准备
# 1. 编辑.env 文件 注意HOST替换为主机ip
CLUSTER_ID=y2OnVDLzRYyxh3V156gnxw
HOST=192.168.1.151
- 编写docker-compose.yaml文件
version: "3.2"
services:kafka1:image: "bitnami/kafka:3.8.0"container_name: kafka11restart: alwaysuser: rootports:- 9192:9092 # 外部客户端访问端口- 9193:9093 # Controller监听端口,内部通信environment:### 通用KRaft配置 #### 允许使用kraft,使用kraft模式- KAFKA_ENABLE_KRAFT=yes# kafka角色,同时做为broker和controller[指定Kafka进程的角色。在KRaft模式下,节点可以同时是broker和controller,或者是其中一种。]- KAFKA_CFG_PROCESS_ROLES=broker,controller# 指定供外部使用的控制类请求信息[指定用于控制器通信的监听器名称。]- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER# 定义Kafka监听的协议、接口和端口。内部通信和外部客户端访问的端口可以分开。- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093# 将监听器名称映射到安全协议。- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT# 使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可- KAFKA_KRAFT_CLUSTER_ID=${CLUSTER_ID} # 定义参与控制器选举的所有投票者地址。格式为id@host:port,其中id是每个节点的BROKER_ID。- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093# 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用- ALLOW_PLAINTEXT_LISTENER=yes### Broker特定配置 #### 定义客户端如何连接到Kafka。通常设置为宿主机的IP或域名和映射后的端口。- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://${HOST}:9192# broker.id,必须唯一- KAFKA_BROKER_ID=1### 可选:资源与性能 #### 允许自动创建主题,建议生产环境关闭- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true# node id 唯一并且和broker一致- KAFKA_CFG_NODE_ID=1volumes:- /etc/localtime:/etc/localtime:ro # 将外边时间直接挂载到容器内部,权限只读- /opt/soft/kafka_cluster/kafka1:/bitnami/kafkanetworks:- kafka-netkafka2:image: "bitnami/kafka:3.8.0"container_name: kafka22restart: alwaysuser: rootports:- 9292:9092- 9293:9093environment:- KAFKA_ENABLE_KRAFT=yes- KAFKA_CFG_PROCESS_ROLES=broker,controller- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT- KAFKA_KRAFT_CLUSTER_ID=${CLUSTER_ID}- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093- ALLOW_PLAINTEXT_LISTENER=yes- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://${HOST}:9292- KAFKA_BROKER_ID=2- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true- KAFKA_CFG_NODE_ID=2volumes:- /etc/localtime:/etc/localtime:ro # 将外边时间直接挂载到容器内部,权限只读- /opt/soft/kafka_cluster/kafka2:/bitnami/kafkanetworks:- kafka-netkafka3:image: "bitnami/kafka:3.8.0"container_name: kafka33restart: alwaysuser: rootports:- 9392:9092- 9393:9093environment:- KAFKA_ENABLE_KRAFT=yes- KAFKA_CFG_PROCESS_ROLES=broker,controller- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT- KAFKA_KRAFT_CLUSTER_ID=${CLUSTER_ID}- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093- ALLOW_PLAINTEXT_LISTENER=yes- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://${HOST}:9392- KAFKA_BROKER_ID=3- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true- KAFKA_CFG_NODE_ID=3volumes:- /etc/localtime:/etc/localtime:ro # 将外边时间直接挂载到容器内部,权限只读- /opt/soft/kafka_cluster/kafka3:/bitnami/kafkanetworks:- kafka-net# 可选:Kafka UI 管理工具kafka-ui:image: provectuslabs/kafka-ui:latestcontainer_name: kafka-uirestart: alwaysports:- "9080:8080"environment:- KAFKA_CLUSTERS_0_NAME=local-kraft-cluster- KAFKA_CLUSTERS_0_BOOTSTRAP_SERVERS=kafka1:9092,kafka2:9092,kafka3:9092# 如果Kafka配置了认证,需在此设置depends_on:- kafka1- kafka2- kafka3networks:- kafka-netnetworks:kafka-net:driver: bridgeipam:config:- subnet: 172.30.0.0/16
2.4 验证集群状态
- 进入其中一个容器,使用Kafka命令行工具验证集群是否正常运行。
docker exec -it kafka11 /bin/bash# 进入容器后,列出主题(此时应为空)
kafka-topics.sh --bootstrap-server localhost:9092 --list
- 创建一个测试主题
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic --partitions 3 --replication-factor 3
- 查看主题详情,确认分区和副本分布
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test-topicTopic: test-topic TopicId: 2ChsTY_IT1KiuC0AP8-25w PartitionCount: 3 ReplicationFactor: 3 Configs:Topic: test-topic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3 Elr: LastKnownElr:Topic: test-topic Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1 Elr: LastKnownElr:Topic: test-topic Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2 Elr: LastKnownElr:
- 测试消息生产与消费
# 在一个终端运行生产者
docker exec -it kafka11 /bin/bashkafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
docker exec -it kafka11 /bin/sh# 在另一个终端运行消费者
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning
在生产端输入消息,应该在消费端能看到。
5. 访问Kafka UI(如果部署了):
打开浏览器访问 http://your-host-ip:9080,应该能看到Kafka UI界面,并可以查看集群、主题、消费者组等信息。
2.5 开放相关端口
# kafka1相关端口
firewall-cmd --zone=public --add-port=9192/tcp --permanent
firewall-cmd --zone=public --add-port=9193/tcp --permanent# kafka2相关端口
firewall-cmd --zone=public --add-port=9292/tcp --permanent
firewall-cmd --zone=public --add-port=9293/tcp --permanent## kafka3相关端口
firewall-cmd --zone=public --add-port=9392/tcp --permanent
firewall-cmd --zone=public --add-port=9393/tcp --permanentfirewall-cmd --zone=public --add-port=9080/tcp --permanent# 重启防火墙
firewall-cmd --reload# 查看开放的端口
firewall-cmd --list-port