当前位置: 首页 > news >正文

实战指南:构建高可用生产级Kafka集群的完整教程

文章目录

      • **一、多节点 Kafka 集群部署**
        • 1. 环境准备
        • 2. ZooKeeper 集群配置
        • 3. Kafka 集群配置
      • **二、性能调优**
        • 1. JVM 参数优化
        • 2. Kafka 参数优化
      • **三、集群监控与管理**
        • 1. 使用 EFAK(kafka eagle)(可视化工具)
      • **四、灾难恢复验证**
      • **五、关键注意事项**

前文教程:Apache Kafka单节点极速部署指南:10分钟搭建开发单节点环境-CSDN博客

一、多节点 Kafka 集群部署

1. 环境准备

此处需注意,如已经安装测试过Kafka单节点,请在安装集群之前清空之前配置的目录

sudo rm -r /var/lib/zookeeper
sudo rm -r /var/lib/kafka-logs
sudo mkdir -p /var/lib/{zookeeper,kafka-logs}
sudo chown -R kafka:kafka /var/lib/{zookeeper,kafka-logs} /opt/kafka
  • 3台服务器:假设 IP 为 192.168.1.101, 192.168.1.102, 192.168.1.103
  • 统一安装 Kafka:在每台服务器重复 之前的安装步骤,但是暂时不启动各个服务,确保路径一致(如 /opt/kafka
  • 同步配置:确保所有节点配置文件的 zookeeper.connect 指向相同的 ZooKeeper 集群
2. ZooKeeper 集群配置
  1. 每台 ZooKeeper 节点配置

    config/zookeeper.properties

    # 基础时间单位(毫秒),默认 2000
    tickTime=2000
    
    # Leader 等待 Follower 初始连接的最长时间(tickTime 的倍数)
    initLimit=5
    
    # Leader 与 Follower 心跳同步的最大延迟时间(tickTime 的倍数)
    syncLimit=2
    
    # 其他原有配置保持不变
    dataDir=/var/lib/zookeeper
    clientPort=2181
    maxClientCnxns=0
    admin.enableServer=false
    # 集群节点列表(所有 ZooKeeper 节点需一致)
    server.1=192.168.1.101:2888:3888
    server.2=192.168.1.102:2888:3888
    server.3=192.168.1.103:2888:3888
    
  2. 创建 myid 文件(每个节点唯一):

    # 在 192.168.1.101 上执行
    echo "1" > /var/lib/zookeeper/myid
    
    # 在 192.168.1.102 上执行
    echo "2" > /var/lib/zookeeper/myid
    
    # 在 192.168.1.103 上执行
    echo "3" > /var/lib/zookeeper/myid
    

    此处要确认myid,对于用户kafka有权限。

    image-20250304181404183

  3. 启动 ZooKeeper 集群

    # 所有节点执行
    sudo systemctl start zookeeper
    # 检查集群状态(任一节点执行)
    echo srvr | nc <服务器IP> 2181 | grep Mode
    # 应输出 "leader" 或 "follower"
    

    img_v3_02k3_727f6e38-598c-47c5-b5da-2f7c2c938a3g

    img_v3_02k3_8f01b6f5-202c-47a0-b59e-61de268511fg

  4. 测试集群是否创建成功

    #模拟选举事件
    # 停止当前 Leader
    sudo systemctl stop zookeeper
    
    # 查看其他节点日志
    echo srvr | nc <服务器IP> 2181 | grep Mode
    

    img_v3_02k3_d0453183-b72c-486f-a394-6bace7d388fg

    可以看到Leader已经从133转到了134节点。

    #数据同步验证
    #在Leader节点创建临时节点并写入数据data
    /opt/kafka/bin/zookeeper-shell.sh 192.168.6.134:2181 <<< "create -e /test-node 'data'"
    ## 在 Follower 节点检查数据
    /opt/kafka/bin/zookeeper-shell.sh 192.168.6.133:2181 <<< "get /test-node"
    

    img_v3_02k3_05158fc0-be96-412e-b1c5-7719a89560cg

    img_v3_02k3_a4435d6f-8aae-43fe-8d44-a915b092026g

3. Kafka 集群配置
  1. 修改每台 Kafka 节点的 server.properties
    此处的其他配置于上一个教程一样即可。

    # 节点1 (192.168.1.101)
    broker.id=1
    listeners=PLAINTEXT://192.168.1.101:9092
    advertised.listeners=PLAINTEXT://192.168.1.101:9092
    zookeeper.connect=192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181
    
    # 节点2 (192.168.1.102)
    broker.id=2
    listeners=PLAINTEXT://192.168.1.102:9092
    advertised.listeners=PLAINTEXT://192.168.1.102:9092
    zookeeper.connect=同上
    
    # 节点3 (192.168.1.103)
    broker.id=3
    listeners=PLAINTEXT://192.168.1.103:9092
    advertised.listeners=PLAINTEXT://192.168.1.103:9092
    zookeeper.connect=同上
    

    关键参数解释

    • advertised.listeners:客户端实际连接的地址(如果跨网络需配置公网IP或域名)
    • zookeeper.connect:所有 ZooKeeper 节点地址,用逗号分隔
  2. 启动 Kafka 集群

    # 所有节点执行
    sudo systemctl start kafka
    
    # 检查 Broker 是否注册到 ZooKeeper
    /opt/kafka/bin/zookeeper-shell.sh <服务器IP>:2181 ls /brokers/ids
    # 应输出 [1,2,3]
    

    image-20250305111129936

  3. 创建 Topic(验证集群)

    # 在任意节点执行
    /opt/kafka/bin/kafka-topics.sh --create \
      --topic cluster-test \
      --bootstrap-server 192.168.1.101:9092 \
      --partitions 3 \
      --replication-factor 3  # 副本数≤Broker数量
    
    # 查看 Topic 详情
    /opt/kafka/bin/kafka-topics.sh --describe \
      --topic cluster-test \
      --bootstrap-server 192.168.1.101:9092
    

    期望输出:每个分区的 LeaderReplicas 分布在多个 Broker 上。


二、性能调优

1. JVM 参数优化

编辑 Kafka 启动脚本 bin/kafka-server-start.sh

# 修改这一行
export KAFKA_HEAP_OPTS="-Xms3G -Xmx3G -XX:MetaspaceSize=96m -XX:+UseG1GC"
  • Xms/Xmx:堆内存(建议不超过物理内存的 50%)
  • UseG1GC:G1 垃圾回收器(适合大内存)
2. Kafka 参数优化
# server.properties
num.network.threads=8  # 网络线程数(默认3)
num.io.threads=16      # IO 线程数(默认8)
log.flush.interval.messages=10000  # 累积多少消息后刷盘
log.flush.interval.ms=1000         # 最多等待多久刷盘

三、集群监控与管理

1. 使用 EFAK(kafka eagle)(可视化工具)
  1. 安装

    访问官网下载安装包:EFAK

    image-20250306104505835

    下载后上传至服务器解压

    tar -zvxf kafka-eagle-bin-3.0.1.tar.gz
    cd kafka-eagle-bin-3.0.1/
    tar -zxvf efak-web-3.0.1-bin.tar.gz
    mv efak-web-3.0.1 /opt/efak
    
  2. 配置Mysql用来储存元数据

    CREATE DATABASE ke_paco DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;
    CREATE USER 'ke_paco'@'%' IDENTIFIED BY 'paco123';
    GRANT ALL PRIVILEGES ON ke_paco.* TO 'ke_paco'@'%';
    FLUSH PRIVILEGES; -- 刷新权限生效
    
  3. 设置环境变量

    vim /etc/profile
    

    写入如下内容,配置jdkEFAK的环境变量:

    export KE_HOME=/opt/efak
    export PATH=$PATH:$KE_HOME/bin
    
    export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
    export PATH=$JAVA_HOME/bin:$PATH
    
  4. 修改配置文件

    cd /opt/efak/conf
    vim system-config.properties
    

    配置zookeeper地址:

    efak.zk.cluster.alias=cluster1
    cluster1.zk.list=192.168.6.131:2181,192.168.6.133:2181,192.168.6.134:2181
    

    此处建议复制kafka配置文件的zookeeper配置。

    配置Mysql:

    efak.driver=com.mysql.cj.jdbc.Driver
    efak.url=jdbc:mysql://<mysql ip>:3306/ke_paco?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
    efak.username=ke_paco
    efak.password=paco123
    
  5. 开启JMX监控
    修改Kafka启动配置,参考如下,按自己需要修改:

    vi /opt/kafka/bin/kafka-server-start.sh
    
        export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70 -Djava.rmi.server.hostname=<改为当前节点IP> -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
        export JMX_PORT="9999"
    

    image-20250306111826958

  6. 启动EFAK
    如若Kakfa未作任何其他配置,则可直接启动

    $KE_HOME/bin/ke.sh start
    

    启动成功后如图:

    image-20250306114521739

    访问UI界面:

    img_v3_02k4_c4694e2d-5c4d-47b4-b37f-8fc8ec75303g

    img_v3_02k4_fcd27012-fa82-4d39-9f37-dab21e3c00cg

    image-20250306114509512


四、灾难恢复验证

  1. 创建 Topic
    在 Kafka 集群中创建一个测试 Topic,用于后续的测试。

    # 创建 Topic,指定分区数和副本因子
    /opt/kafka/bin/kafka-topics.sh --create \
      --topic cluster-test \
      --bootstrap-server 192.168.6.131:9092 \
      --partitions 3 \
      --replication-factor 2
    
    # 查看 Topic 详情,确认创建成功
    /opt/kafka/bin/kafka-topics.sh --describe \
      --topic cluster-test \
      --bootstrap-server 192.168.6.131:9092
    

    输出示例
    835bfce2-a162-4cb5-8f8b-59c4e5625983

  2. 写入消息
    向 Topic 中写入测试消息,用于验证数据持久化和消费。
    操作步骤

    # 使用 kafka-console-producer 写入消息
    echo "test message 1" | /opt/kafka/bin/kafka-console-producer.sh \
      --topic cluster-test \
      --bootstrap-server 192.168.6.131:9092
    
    # 写入多条消息
    for i in {1..10}; do
      echo "test message $i" | /opt/kafka/bin/kafka-console-producer.sh \
        --topic cluster-test \
        --bootstrap-server 192.168.6.131:9092
    done
    

    f8392450-0e53-4e78-91c8-612661727aa6

  3. 消费消息
    启动一个消费者(任意其他节点),验证消息是否成功写入。
    操作步骤

    # 使用 kafka-console-consumer 消费消息
    /opt/kafka/bin/kafka-console-consumer.sh \
      --topic cluster-test \
      --bootstrap-server 192.168.6.131:9092 \
      --from-beginning
    

    25e404b8-8ba7-4528-9b4a-62509d490bbc

  4. 模拟 Broker 宕机

    停止一个 Broker,观察分区 Leader 是否切换。

    操作步骤

    # 停止 Broker 2
    sudo systemctl stop kafka  # 假设 Broker 2 的 ID 是 2
    
    # 查看 Topic 分区状态,确认 Leader 切换
    /opt/kafka/bin/kafka-topics.sh --describe \
      --topic cluster-test \
      --bootstrap-server 192.168.6.131:9092
    

    faa4ff58-a327-494e-a480-d3d38a1b5028说明

    • 停止 Broker 2 后,分区 0 的 Leader 仍然是 Broker 1,分区 1 的 Leader 从 Broker 2 切换到了 Broker 3。
  5. 验证数据持久化

    重启所有 Broker,验证消息是否仍然可以消费。

    操作步骤

    # 重启所有 Broker
    sudo systemctl restart kafka
    
    # 等待 Broker 重新启动(约 30 秒)
    sleep 30
    
    # 再次消费消息,验证数据是否保留
    /opt/kafka/bin/kafka-console-consumer.sh \
      --topic cluster-test \
      --bootstrap-server 192.168.6.131:9092 \
      --from-beginning
    

    输出示例

    f803bb1a-e543-463c-8871-cc5763de16d7

  6. 验证分区恢复

    确认停止的 Broker 重新加入集群后,分区状态恢复正常。

    操作步骤

    # 查看 Topic 分区状态
    /opt/kafka/bin/kafka-topics.sh --describe \
      --topic cluster-test \
      --bootstrap-server 192.168.6.131:9092
    

    image-20250306131951590


五、关键注意事项

  1. 网络防火墙

    • 开放 ZooKeeper 端口:2181(客户端), 2888(节点间通信), 3888(选举)
    • 开放 Kafka 端口:9092(明文), 9999(JMX)
  2. 时间同步

    # 所有节点安装 NTP
    sudo yum install ntp -y && sudo systemctl enable --now ntpd
    
  3. 定期清理日志

    # 手动触发日志删除
    /opt/kafka/bin/kafka-log-dirs.sh \
      --bootstrap-server 192.168.1.101:9092 \
      --describe --topic-list cluster-test
    

通过以上步骤,您将获得一个高可用、安全的 Kafka 生产级集群,并具备基础的监控和容灾能力。

相关文章:

  • 关于OceanBase与CDH适配的经验分享
  • 【北京迅为】iTOP-RK3568OpenHarmony系统南向驱动开发GPIO基础知识
  • 深色系B端系统界面,在何种场景下更加适合?
  • 西门子1200:ModbusRTU-威纶通变频器
  • 量子布尔运算:AI与Python的量子世界探秘
  • 在MWC2025,读懂华为如何以行践言
  • 在Spring Boot项目中分层架构
  • 10-Agent循环分析新闻并输出总结报告
  • 《Python基础教程》第5章笔记:条件、循环及其他语句
  • AT89S51 单片机手册解读:架构、功能与应用深度剖析
  • 【GoTeams】-1:项目基础搭建
  • 【HDLbits--counter】
  • DeepSeek开源Day4:DualPipeEPLB技术详解
  • GitHub CI流水线
  • Element-ui菜单名字过长,显示省略号,鼠标悬停显示
  • 微信小程序调用阿里云的大规模模型+后端 python 实现人与人工智能进行对话
  • C++ Primer 拷贝控制和资源管理
  • 【大学生体质】智能 AI 旅游推荐平台(Vue+SpringBoot3)-完整部署教程
  • PostgreSQL中的事务隔离
  • RK3568平台(GPIO篇)Android平台集成libgpiod库
  • 广州番禺疫情/百度seo按天计费
  • wordpress日志在哪个文件/关键词首页排名优化公司推荐
  • 荆门哪里有专门做企业网站的/制作网站模板
  • 一个域名一个ip做多个网站/百度sem推广具体做什么
  • 厦门做网站排名/宁波seo软件免费课程
  • 牡丹花网站建设策划书/全网推广的方式有哪些