当前位置: 首页 > news >正文

Kafka消息中间件安装配置

Kafka消息中间件安装配置

  • 1 docker安装zookeeper集群(可忽略)
    • 1.1 创建相关目录
    • 1.2 zk-docker-compose.yaml
    • 1.3 验证集群状态
    • 1.4 启动服务命令
  • ※2 docker安装kafka集群(基于KRaft模式)
    • 2.1 参数讲解
    • 2.2 创建相关目录
    • 2.3 docker-compose.yaml
    • 2.4 验证集群状态
    • 2.5 开放相关端口

1 docker安装zookeeper集群(可忽略)

zookeeper 集群中的每个节点都需要一个唯一的标识(myid 文件)和统一的集群服务器列表配置(zoo.cfg 中的 server.x 列表)。

  • ZOO_MY_ID:表示当前 zookeeper 实例在集群中的编号,范围为1-255,所以一个 zookeeper 集群最多有 255 个节点
  • ZOO_SERVERS:表示当前 zookeeper 实例所在集群中的所有节点的编号、主机名(或IP地址)、端口

zookeeper 集群需要的端口:

  • 2181:客户端连接端口。
  • 2888:zookeeper 集群中的节点,Follower 与 Leader 之间进行数据同步和消息传递的端口。
  • 3888:用于 Leader 选举的端口。

1.1 创建相关目录

mkdir -p /opt/soft/kafka_cluster/zk1/{data,logs}
mkdir -p /opt/soft/kafka_cluster/zk2/{data,logs}
mkdir -p /opt/soft/kafka_cluster/zk3/{data,logs}sudo chmod -R 777 /opt/soft/kafka_cluster/zk1
sudo chmod -R 777 /opt/soft/kafka_cluster/zk2
sudo chmod -R 777 /opt/soft/kafka_cluster/zk3

1.2 zk-docker-compose.yaml

version: '3.2'services:zk1:image: zookeeper:3.9.3container_name: zk1restart: alwaysports:- "2181:2181"environment:ALLOW_ANONYMOUS_LOGIN: yes # 允许匿名连接,生产环境应配置认证ZOO_MY_ID: 1ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181volumes:- /etc/localtime:/etc/localtime- ./zk1/data:/data- ./zk1/logs:/datalognetworks:- zk-netzk2:image: zookeeper:3.9.3container_name: zk2restart: alwaysports:- "2182:2181"environment:ALLOW_ANONYMOUS_LOGIN: yes # 允许匿名连接,生产环境应配置认证ZOO_MY_ID: 2ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zk3:2888:3888;2181volumes:- /etc/localtime:/etc/localtime- ./zk2/data:/data- ./zk2/logs:/datalognetworks:- zk-netzk3:image: zookeeper:3.9.3container_name: zk3restart: alwaysports:- "2183:2181"environment:ALLOW_ANONYMOUS_LOGIN: yes # 允许匿名连接,生产环境应配置认证ZOO_MY_ID: 3ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181volumes:- /etc/localtime:/etc/localtime- ./zk3/data:/data- ./zk3/logs:/datalognetworks:- zk-net# 给集群创建一个网络,名称自己随便定义,这里取名为 zk-net
networks:zk-net:driver: bridge

1.3 验证集群状态

集群启动后(大约需要等待一分钟进行选举),可以通过以下命令检查每个节点的状态:

# 进入节点1容器
docker exec -it zk1 /bin/bash# 在容器内执行状态检查命令
[root@node05 kafka_cluster]# docker exec -it zk1 /bin/sh
# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
# exit[root@node05 kafka_cluster]# docker exec -it zk2 /bin/sh
# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
# exit[root@node05 kafka_cluster]# docker exec -it zk3 /bin/sh
# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader

注意:对每个节点执行此操作,输出会显示该节点的模式是 Mode: leader 还是 Mode: follower。一个健康的集群应该有一个 Leader 和两个 Follower。

1.4 启动服务命令

# -f表示指定某个配置文件名   -d:表示后台启动
docker-compose up -d 
# 查看当前服务
docker ps

※2 docker安装kafka集群(基于KRaft模式)

Kafka 4.0 引入了 KRaft 模式(Kafka Raft Metadata Mode),它使 Kafka 集群不再依赖 ZooKeeper 进行元数据管理。KRaft 模式简化了 Kafka 部署和管理,不需要额外配置 ZooKeeper 服务,使得集群的配置和运维更加高效。

2.1 参数讲解

通用KRaft配置

  • KAFKA_ENABLE_KRAFT=yes(允许使用kraft,使用kraft模式)
  • KAFKA_CFG_PROCESS_ROLES=broker,controller(指定Kafka进程的角色,在KRaft模式下,节点可以同时是broker和controller,或者是其中一种。)
  • KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER(指定供外部使用的控制类请求信息[指定用于控制器通信的监听器名称。)
  • KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093(定义Kafka监听的协议、接口和端口。内部通信和外部客户端访问的端口可以分开。)
  • KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT(将监听器名称映射到安全协议。)
  • KAFKA_KRAFT_CLUSTER_ID=${CLUSTER_ID} (使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可)
  • KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093(定义参与控制器选举的所有投票者地址。格式为id@host:port,其中id是每个节点的BROKER_ID。)
  • ALLOW_PLAINTEXT_LISTENER=yes(允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用)

Broker特定配置

  • KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://${HOST}:9194(定义客户端如何连接到Kafka。通常设置为宿主机的IP或域名和映射后的端口。)
  • KAFKA_BROKER_ID=1(broker.id,必须唯一)

可选:资源与性能

  • KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true(允许自动创建主题,生产环境建议关闭)
  • KAFKA_CFG_NODE_ID=1(node id 唯一并且和broker一致)

2.2 创建相关目录

mkdir -p /opt/soft/kafka_cluster/kafka1
mkdir -p /opt/soft/kafka_cluster/kafka2
mkdir -p /opt/soft/kafka_cluster/kafka3sudo chmod -R 777 /opt/soft/kafka_cluster/kafka1
sudo chmod -R 777 /opt/soft/kafka_cluster/kafka2
sudo chmod -R 777 /opt/soft/kafka_cluster/kafka3

2.3 docker-compose.yaml

  1. 部署前准备
# 1. 编辑.env 文件 注意HOST替换为主机ip
CLUSTER_ID=y2OnVDLzRYyxh3V156gnxw
HOST=192.168.1.151
  1. 编写docker-compose.yaml文件
version: "3.2"
services:kafka1:image: "bitnami/kafka:3.8.0"container_name: kafka11restart: alwaysuser: rootports:- 9192:9092 # 外部客户端访问端口- 9193:9093 # Controller监听端口,内部通信environment:### 通用KRaft配置 #### 允许使用kraft,使用kraft模式- KAFKA_ENABLE_KRAFT=yes# kafka角色,同时做为broker和controller[指定Kafka进程的角色。在KRaft模式下,节点可以同时是broker和controller,或者是其中一种。]- KAFKA_CFG_PROCESS_ROLES=broker,controller# 指定供外部使用的控制类请求信息[指定用于控制器通信的监听器名称。]- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER# 定义Kafka监听的协议、接口和端口。内部通信和外部客户端访问的端口可以分开。- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093# 将监听器名称映射到安全协议。- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT# 使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可- KAFKA_KRAFT_CLUSTER_ID=${CLUSTER_ID}      # 定义参与控制器选举的所有投票者地址。格式为id@host:port,其中id是每个节点的BROKER_ID。- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093# 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用- ALLOW_PLAINTEXT_LISTENER=yes### Broker特定配置 #### 定义客户端如何连接到Kafka。通常设置为宿主机的IP或域名和映射后的端口。- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://${HOST}:9192# broker.id,必须唯一- KAFKA_BROKER_ID=1### 可选:资源与性能 #### 允许自动创建主题,建议生产环境关闭- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true# node id 唯一并且和broker一致- KAFKA_CFG_NODE_ID=1volumes:- /etc/localtime:/etc/localtime:ro # 将外边时间直接挂载到容器内部,权限只读- /opt/soft/kafka_cluster/kafka1:/bitnami/kafkanetworks:- kafka-netkafka2:image: "bitnami/kafka:3.8.0"container_name: kafka22restart: alwaysuser: rootports:- 9292:9092- 9293:9093environment:- KAFKA_ENABLE_KRAFT=yes- KAFKA_CFG_PROCESS_ROLES=broker,controller- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT- KAFKA_KRAFT_CLUSTER_ID=${CLUSTER_ID}- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093- ALLOW_PLAINTEXT_LISTENER=yes- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://${HOST}:9292- KAFKA_BROKER_ID=2- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true- KAFKA_CFG_NODE_ID=2volumes:- /etc/localtime:/etc/localtime:ro # 将外边时间直接挂载到容器内部,权限只读- /opt/soft/kafka_cluster/kafka2:/bitnami/kafkanetworks:- kafka-netkafka3:image: "bitnami/kafka:3.8.0"container_name: kafka33restart: alwaysuser: rootports:- 9392:9092- 9393:9093environment:- KAFKA_ENABLE_KRAFT=yes- KAFKA_CFG_PROCESS_ROLES=broker,controller- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT- KAFKA_KRAFT_CLUSTER_ID=${CLUSTER_ID}- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093- ALLOW_PLAINTEXT_LISTENER=yes- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://${HOST}:9392- KAFKA_BROKER_ID=3- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true- KAFKA_CFG_NODE_ID=3volumes:- /etc/localtime:/etc/localtime:ro # 将外边时间直接挂载到容器内部,权限只读- /opt/soft/kafka_cluster/kafka3:/bitnami/kafkanetworks:- kafka-net# 可选:Kafka UI 管理工具kafka-ui:image: provectuslabs/kafka-ui:latestcontainer_name: kafka-uirestart: alwaysports:- "9080:8080"environment:- KAFKA_CLUSTERS_0_NAME=local-kraft-cluster- KAFKA_CLUSTERS_0_BOOTSTRAP_SERVERS=kafka1:9092,kafka2:9092,kafka3:9092# 如果Kafka配置了认证,需在此设置depends_on:- kafka1- kafka2- kafka3networks:- kafka-netnetworks:kafka-net:driver: bridgeipam:config:- subnet: 172.30.0.0/16

2.4 验证集群状态

  1. 进入其中一个容器,使用Kafka命令行工具验证集群是否正常运行。
docker exec -it kafka11 /bin/bash# 进入容器后,列出主题(此时应为空)
kafka-topics.sh --bootstrap-server localhost:9092 --list
  1. 创建一个测试主题
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic --partitions 3 --replication-factor 3
  1. 查看主题详情,确认分区和副本分布
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test-topicTopic: test-topic       TopicId: 2ChsTY_IT1KiuC0AP8-25w PartitionCount: 3       ReplicationFactor: 3    Configs:Topic: test-topic       Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3      Elr:    LastKnownElr:Topic: test-topic       Partition: 1    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1      Elr:    LastKnownElr:Topic: test-topic       Partition: 2    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2      Elr:    LastKnownElr:
  1. 测试消息生产与消费
# 在一个终端运行生产者
docker exec -it kafka11 /bin/bashkafka-console-producer.sh --broker-list localhost:9092 --topic test-topic

docker exec -it kafka11 /bin/sh# 在另一个终端运行消费者
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning

在生产端输入消息,应该在消费端能看到。
5. 访问Kafka UI(如果部署了):
打开浏览器访问 http://your-host-ip:9080,应该能看到Kafka UI界面,并可以查看集群、主题、消费者组等信息。

2.5 开放相关端口

# kafka1相关端口
firewall-cmd --zone=public --add-port=9192/tcp --permanent
firewall-cmd --zone=public --add-port=9193/tcp --permanent# kafka2相关端口
firewall-cmd --zone=public --add-port=9292/tcp --permanent
firewall-cmd --zone=public --add-port=9293/tcp --permanent## kafka3相关端口
firewall-cmd --zone=public --add-port=9392/tcp --permanent
firewall-cmd --zone=public --add-port=9393/tcp --permanentfirewall-cmd --zone=public --add-port=9080/tcp --permanent# 重启防火墙
firewall-cmd --reload# 查看开放的端口
firewall-cmd --list-port
http://www.dtcms.com/a/360967.html

相关文章:

  • Ruoyi项目MyBatis升级MyBatis-Plus指南
  • sentinel异常处理机制
  • 2025机器人产业大洗牌:新兴初创企业的技术革命与崛起之路
  • 【Spring Cloud微服务】8.深度实战:微服务稳定性的守护神——Sentinel
  • Linux下usb设备驱动框架实现:定义核心结构体数据
  • 从Java全栈开发到微服务架构:一次真实的面试实录
  • leetcode算法刷题的第二十三天
  • GitLab 18.3 正式发布,更新多项 DevOps、CI/CD 功能【一】
  • Linux上perf工具的使用-基础采样
  • 云端虚拟云手机该如何进行使用?
  • 高并发场景下的热点数据处理:从预热到多级缓存的性能优化实践
  • 云手机和云游戏之间有着哪些区别?
  • 手机版碰一碰发视频源码搭建,技术实现与实操指南
  • 基于大数据的京东手机销售数据 可视化分析设计与开发03446原创的定制程序,java、PHP、python、C#小程序、文案全套、毕设程序定制、成品等
  • 【音视频】WebRTC QoS 概述
  • 云端虚拟手机:云手机的原理是什么?
  • 【科普向-第七篇】Git全家桶介绍:Git/Gitlab/GitHub/TortoiseGit/Sourcetree
  • 电动两轮车手机导航投屏方案调研报告
  • vscode翻译插件
  • hasOwnProperty是什么
  • Linux部署OSM本地服务测试环境
  • Linux UDisks守护进程曝本地提权漏洞CVE-2025-8067,PoC已发布
  • [Android] 京墨 v1.15.2 —— 古诗词文、汉语字典、黄历等查询阅读学习宝典(可离线)
  • 别再说AppInventor2只能开发安卓了!苹果iOS现已支持!
  • AI-调查研究-66-机器人 机械臂 软件算法体系:轨迹规划·视觉定位·力控策略
  • HarmonyOS 应用开发深度实践:深入 Stage 模型与 ArkTS 声明式 UI
  • STM32-FreeRTOS操作系统-任务创建
  • 开源AI大模型AI智能名片S2B2C商城小程序赋能下的“信息找人“:人工智能驱动的线下零售精准化革命
  • 高效大规模创新3D重建模型iLRM
  • 【STM32】贪吃蛇 [阶段 3] 增强模块结构(架构优化)