当前位置: 首页 > news >正文

在 k8s 上部署 Kafka 4.0 3节点集群

本文详细介绍如何在 K3s 环境中部署一个 3 节点的 Apache Kafka 4.0.0 集群(对应 Confluent Platform 8.0.0),使用 KRaft 模式消除对 ZooKeeper 的依赖。

镜像与版本选择

​​confluentinc/cp-kafka:8.0.0​​:Confluent 官方镜像,提供企业级特性
Kafka 4.0.x 移除了对 ZooKeeper 的依赖(KRaft 模式 GA),带来显著性能提升和运维简化

docker pull confluentinc/cp-kafka:8.0.0

Kafka集群部署架构

  • 3节点Kafka集群​​:利用 StatefulSet 的稳定网络标识(kafka-0、kafka-1、kafka-2)
  • Pod反亲和性​​:确保3个副本分散在不同k3s节点(221、222、223)
  • KRaft模式​​:无需ZooKeeper,简化部署架构
  • 数据持久化​​:每个节点使用独立PVC存储数据

k8s配置文件

Kafka集群StatefulSet, Service配置

# kafka-cluster-3nodes.yaml
apiVersion: v1
kind: Service
metadata:name: kafka-headlessnamespace: kafkalabels:app: kafka
spec:selector:app: kafkaports:- name: plaintextport: 9092targetPort: 9092- name: controllerport: 9093targetPort: 9093clusterIP: None  # Headless Service用于Pod直接通信publishNotReadyAddresses: true  # 允许访问未就绪的Pod
---
apiVersion: apps/v1
kind: StatefulSet
metadata:name: kafkanamespace: kafka
spec:serviceName: kafka-headless  # 关联Headless Servicereplicas: 3  # 3节点集群selector:matchLabels:app: kafkaupdateStrategy:type: RollingUpdate  # 滚动更新策略template:metadata:labels:app: kafkaspec:affinity:podAntiAffinity:requiredDuringSchedulingIgnoredDuringExecution:- labelSelector:matchExpressions:- key: appoperator: Invalues:- kafkatopologyKey: "kubernetes.io/hostname"  # 确保Pod分散在不同节点containers:- name: kafkaimage: mydockerregistry.com:5000/confluentinc/cp-kafka:8.0.0imagePullPolicy: IfNotPresentcommand: ["/bin/sh", "-c"]args:- |export KAFKA_NODE_ID=${HOSTNAME##*-}  # 从Pod名称提取节点IDecho "Starting Kafka with NODE_ID: $KAFKA_NODE_ID"exec /etc/confluent/docker/runports:- name: plaintextcontainerPort: 9092  # 客户端通信端口- name: controllercontainerPort: 9093  # 控制器通信端口- name: externalcontainerPort: 31092  # 外部访问端口hostPort: 31092  # 使用hostPort暴露到节点env:# 关键环境变量配置- name: POD_NAMEvalueFrom:fieldRef:fieldPath: metadata.name- name: K8S_NODE_IPvalueFrom:fieldRef:fieldPath: status.hostIP  # 获取节点IP用于外部访问- name: CLUSTER_IDvalue: "LTI4QjFBNTcwNTJENDM2Qk"  # 集群唯一标识- name: KAFKA_HEAP_OPTSvalue: "-Xms512m -Xmx512m"  # JVM内存配置- name: KAFKA_PROCESS_ROLESvalue: "broker,controller"  # 节点同时担任broker和controller- name: KAFKA_CONTROLLER_LISTENER_NAMESvalue: "CONTROLLER"- name: KAFKA_LISTENERSvalue: "PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093,EXTERNAL://0.0.0.0:31092"- name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAPvalue: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT"- name: KAFKA_ADVERTISED_LISTENERS# 内部使用Pod DNS,外部使用节点IPvalue: "PLAINTEXT://$(POD_NAME).kafka-headless.kafka:9092,EXTERNAL://$(K8S_NODE_IP):31092"- name: KAFKA_INTER_BROKER_LISTENER_NAMEvalue: "PLAINTEXT"- name: KAFKA_CONTROLLER_QUORUM_VOTERS# 配置3个控制器投票器value: "0@kafka-0.kafka-headless.kafka:9093,1@kafka-1.kafka-headless.kafka:9093,2@kafka-2.kafka-headless.kafka:9093"- name: KAFKA_LOG_DIRSvalue: "/var/lib/kafka/data"- name: KAFKA_MESSAGE_MAX_BYTESvalue: "10485760"  # 10MB最大消息大小- name: KAFKA_REPLICA_FETCH_MAX_BYTESvalue: "10485760"livenessProbe:tcpSocket:port: 9092  # 仅保留存活探针,去掉就绪探针initialDelaySeconds: 60  # 给予足够启动时间periodSeconds: 10timeoutSeconds: 5failureThreshold: 3volumeMounts:- name: kafka-datamountPath: /var/lib/kafka/datavolumeClaimTemplates:- metadata:name: kafka-dataspec:accessModes: [ "ReadWriteOnce" ]storageClassName: "local-path"  # 使用k3s默认的local-path存储类resources:requests:storage: 4Gi  # 每个节点4GB存储

Kafka主题创建Job配置

# kafka-topics-create.yaml
apiVersion: batch/v1
kind: Job
metadata:name: kafka-topics-createnamespace: kafka
spec:backoffLimit: 100  # 允许重试多次template:spec:restartPolicy: Nevercontainers:- name: create-topicsimage: mydockerregistry.com:5000/confluentinc/cp-kafka:8.0.0command:- sh- -c- |# 等待所有Kafka节点就绪KAFKA_NODES="kafka-0.kafka-headless.kafka:9092 kafka-1.kafka-headless.kafka:9092 kafka-2.kafka-headless.kafka:9092"echo "Waiting for all Kafka nodes to be ready..."for node in $KAFKA_NODES; dountil kafka-topics --list --bootstrap-server $node > /dev/null 2>&1; doecho "Waiting for $node..."sleep 10donedone# 创建业务所需主题kafka-topics --create --bootstrap-server kafka-0.kafka-headless.kafka:9092 --topic up.egw.telemetry \--partitions 6 --replication-factor 3 --config retention.ms=3600000 --if-not-existskafka-topics --create --bootstrap-server kafka-0.kafka-headless.kafka:9092 --topic up.egw.model \--partitions 6 --replication-factor 3 --config retention.ms=604800000 --if-not-existskafka-topics --create --bootstrap-server kafka-0.kafka-headless.kafka:9092 --topic up.egw.alert \--partitions 6 --replication-factor 3 --config retention.ms=604800000 --if-not-existskafka-topics --create --bootstrap-server kafka-0.kafka-headless.kafka:9092 --topic up.egw.notify \--partitions 6 --replication-factor 3 --config retention.ms=3600000 --if-not-existskafka-topics --create --bootstrap-server kafka-0.kafka-headless.kafka:9092 --topic down.egw.notify \--partitions 6 --replication-factor 3 --config retention.ms=3600000 --if-not-existskafka-topics --create --bootstrap-server kafka-0.kafka-headless.kafka:9092 --topic audit \--partitions 1 --replication-factor 3 --config retention.ms=604800000 --if-not-existsecho "All topics created"

部署命令

# 创建命名空间
kubectl create ns kafka# 部署Kafka集群
kubectl apply -f kafka-cluster-3nodes.yaml# 等待集群就绪后创建主题
kubectl apply -f kafka-topics-create.yaml# 检查部署状态
kubectl get pods -n kafka -w
kubectl logs -n kafka -l app=kafka -f

部署遇到问题和解决方法

问题1: 集群启动失败,Pod不Ready, 集群就绪探针阻碍初始化

​​原因​​:Kafka KRaft 集群在初始化阶段需要节点互相发现才能监听端口,就绪探针会误判节点未就绪
​​解决方案​​:临时移除就绪探针,仅保留存活探针,并增加初始延迟

问题2: 所有Kafka节点的外部地址(EXTERNAL)配置为同一个地址时, 从外部订阅会失败, 但发布正常

​​原因​​:Kafka 的 advertised.listeners必须与客户端实际连接地址严格匹配, 3个节点应该配3个独立外部地址
​​解决方案​​:为每个节点配置独立的外部访问地址 (仅临时测试用, 生产环境不提供外部地址)

# 使用hostPort为每个Pod暴露独立端口
ports:
- name: externalcontainerPort: 31092hostPort: 31092  # 直接绑定到节点端口# 广告监听器使用节点IP和固定端口
- name: KAFKA_ADVERTISED_LISTENERSvalue: "PLAINTEXT://$(POD_NAME).kafka-headless.kafka:9092,EXTERNAL://$(K8S_NODE_IP):31092"

内部访问:使用 kafka-0.kafka-headless.kafka:9092等内部DNS
外部访问:使用 k8s节点IP:NodePort(31092)连接指定的broker

部署脚本参考:

完整脚本可查看 GitHub 仓库:https://github.com/PCJ600/kafka-deploy

http://www.dtcms.com/a/390581.html

相关文章:

  • k8s 部署 EMQX 5.8.6 静态三节点集群
  • UVa1374/LA3621 Power Calculus
  • 以 NoETL 重塑 AI-Ready 的数据底座,Aloudata 获评 IDC 面向生成式 AI 的数据基础设施核心厂商
  • 声音转文字API平台推荐
  • Vue3: watch watchEffect
  • 梯度提升算法及其在回归与分类中的应用实战
  • 【自然语言处理与大模型】大模型应用开发四个场景
  • 深度神经网络-传播原理
  • 交通仿真术语
  • 关于Oracle主外键约束的几个SQL语句
  • Python 操作 SQLite:Peewee ORM 与传统 sqlite3.connect 的全方位对比
  • go资深之路笔记(四)中间件(Middleware)设计模式
  • MySQL分库分表迁移:ETL平台如何实现数据合并与聚合
  • [极客大挑战 2019]BabySQL
  • SQL-索引使用
  • 数据库和数据仓库有什么区别
  • SpringBoot2.7X整合Swagger、Redission3.X的bug
  • uniapp安卓原生插件实现开启ble Server[外围模式]
  • React 18.2中使用React Router 6.4
  • 人员在岗监测技术研究:基于计算机视觉的智能监管方案
  • 实测AI Ping,一个大模型服务选型的实用工具——行业实践与深度优化策略
  • 通过QuickAPI优化金融系统API:安全快捷的数据共享最佳实践
  • 第4节 添加视频字幕到剪映(Coze扣子空间剪映小助手零基础教程)
  • 算法 --- BFS 解决 FloodFill 算法
  • telnet 一个 ip+端口却无法退出 着急
  • UVa1602/LA3224 Lattice Animals
  • Docker BuildKit 实现 Golang 编译加速
  • [x-cmd] 在 Android 的 Termux 和 iOS 的 iSH 中安装 X-CMD
  • CTFSHOW 中期测评(一)web486 - web501
  • android-USB-STM32