当前位置: 首页 > news >正文

Rocky Linux 10 部署 Kafka 集群

1.概述

Apache Kafka 是一个开源的消息系统,它提供了一个分布式、分区的、可复制的提交日志服务。

2.特点

  • 消息(Message):Kafka 中最基本的数据单元是消息。每条消息都是一个键值对,键和值可以是任意的字节数组。
  • 主题(Topic):消息的类别被称为 Topic。生产者将消息发布到特定的 Topic,而消费者订阅一个或多个 Topic 来接收消息。
  • 分区(Partition):每个 Topic 可以分为多个 Partition。Partition 不仅允许消息通过多个消费者并发处理,还提供了 Kafka 的扩展性和顺序保证。每个 Partition 都是一个有序的、不可变的消息序列。
  • 偏移量(Offset):每条消息在它所属的 Partition 中都有一个唯一的序列号称为 Offset。消费者通过 Offset 来跟踪它们已经消费了哪些消息。
  • 生产者(Producer):负责将消息发布到 Kafka 的 Topic 中。生产者可以选择将消息发送到特定的 Partition。
  • 消费者(Consumer):订阅一个或多个 Topic 并处理发布的消息。消费者追踪它们所消费的消息 Offset,并定期提交这些信息到 Kafka 或外部存储中。
  • 经纪人(Broker):Kafka 集群由一个或多个服务器组成,这些服务器被称为 Brokers。每个 Broker 可以容纳多个 Topic 和 Partition。
  • 日志(Log):从实现的角度来看,Partition 实际上就是一个分布式提交日志。新的消息不断被追加到 Partition 的末尾。
  • 复制(Replication):为了容错,Kafka 允许为每个 Partition 设置一个或多个副本。这样即使某些 Broker 出现故障,系统也能继续运行。
  • 领导者(Leader)与追随者(Follower):对于每个 Partition,Kafka 选择一个副本作为 Leader,其余副本则作为 Followers。Leader 负责处理读写请求,Follower 则从 Leader 同步数据。

3.系统环境准备

3.1集群信息
节点IP地址Kafka版本
node1192.168.100.52.13-4.0.0
node2192.168.100.62.13-4.0.0
node3192.168.100.72.13-4.0.0
3.2IP配置

配置node1节点IP地址

cat > /etc/NetworkManager/system-connections/ens160.nmconnection <<EOF
[connection]
id=ens160
type=ethernet
autoconnect-priority=-999
interface-name=ens160[ethernet][ipv4]
address1=192.168.100.5/24
dns=114.114.114.114;8.8.8.8;
gateway=192.168.100.254
may-fail=false
method=manual[ipv6]
addr-gen-mode=eui64
method=ignore[proxy]
EOF

配置node2节点IP地址

cat > /etc/NetworkManager/system-connections/ens160.nmconnection <<EOF
[connection]
id=ens160
type=ethernet
autoconnect-priority=-999
interface-name=ens160[ethernet][ipv4]
address1=192.168.100.6/24
dns=114.114.114.114;8.8.8.8;
gateway=192.168.100.254
may-fail=false
method=manual[ipv6]
addr-gen-mode=eui64
method=ignore[proxy]
EOF

配置node3节点IP地址

cat > /etc/NetworkManager/system-connections/ens160.nmconnection <<EOF
[connection]
id=ens160
type=ethernet
autoconnect-priority=-999
interface-name=ens160[ethernet][ipv4]
address1=192.168.100.7/24
dns=114.114.114.114;8.8.8.8;
gateway=192.168.100.254
may-fail=false
method=manual[ipv6]
addr-gen-mode=eui64
method=ignore[proxy]
EOF
3.3修改主机名

node1上执行

hostnamectl set-hostname node1

node2上执行

hostnamectl set-hostname node2

node3上执行

hostnamectl set-hostname node3
3.4关闭防火墙

3个节点均需要执行

systemctl stop firewalld
systemctl disable  firewalld
systemctl status  firewalld
3.5关闭SELINUX

3个节点均需要执行

setenforce 0
sed -i "s/^SELINUX=enforcing/SELINUX=disabled/g" /etc/selinux/config
3.6DNF源配置

3个节点均需要执行

sed -e 's|^mirrorlist=|#mirrorlist=|g'     -e 's|^#baseurl=http://dl.rockylinux.org/$contentdir|baseurl=https://mirrors.aliyun.com/rockylinux|g'     -i.bak     /etc/yum.repos.d/rocky*.repo
3.7时间同步配置

3个节点均需要执行

dnf install -y chrony
sed -i 's|pool[[:space:]]*2\.rocky\.pool\.ntp\.org[[:space:]]*iburst|server ntp.aliyun.com iburst|' /etc/chrony.conf
systemctl restart chronyd
chronyc sources
3.8主机映射配置

3个节点均需要执行

tee >> /etc/hosts <<EOF
192.168.100.5   node1
192.168.100.6   node2
192.168.100.7   node3
EOF

4.软件安装

4.1安装 Kafka 集群
4.1.1下载Kafka

3个节点均需要执行

wget https://dlcdn.apache.org/kafka/4.0.0/kafka_2.13-4.0.0.tgz
4.1.2解压Kafka

3个节点均需要执行

tar zxvf kafka_2.13-4.0.0.tgz -C /usr/local
ln -s kafka_2.13-4.0.0 kafka
4.1.3metadata.log.dir 目录创建及授权
mkdir -p /tmp/kafka-logs
chmod 755 /tmp/kafka-logs
4.1.4修改配置文件

node1上执行

cat > /usr/local/kafka/config/server.properties <<EOF
node.id=1# 节点角色:controller 和 broker
process.roles=controller,broker# 元数据日志存储目录(需提前创建并赋权)
metadata.log.dir=/tmp/kafka-logs# Controller 之间的投票节点列表(格式:id@host:port)
controller.quorum.voters=1@node1:9093,2@node2:9093,3@node3:9093# 监听器定义:名称、协议、host:port
# 注意:CONTROLLER 是用于 controller 间通信的监听器
#       PLAINTEXT 是用于 broker 对外服务的监听器
listeners=PLAINTEXT://:9092,CONTROLLER://:9093# 指定用于 controller 通信的监听器名称(必须是 listeners 中的一个)
controller.listener.names=CONTROLLER# 监听器安全协议映射
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT# Broker 间通信使用的监听器名称
inter.broker.listener.name=PLAINTEXT# 客户端连接地址(外部可访问的 host:port)
advertised.listeners=PLAINTEXT://node1:9092
EOF

node2上执行

cat > /usr/local/kafka/config/server.properties <<EOF
node.id=2# 节点角色:controller 和 broker
process.roles=controller,broker# 元数据日志存储目录(需提前创建并赋权)
metadata.log.dir=/tmp/kafka-logs# Controller 之间的投票节点列表(格式:id@host:port)
controller.quorum.voters=1@node1:9093,2@node2:9093,3@node3:9093# 监听器定义:名称、协议、host:port
# 注意:CONTROLLER 是用于 controller 间通信的监听器
#       PLAINTEXT 是用于 broker 对外服务的监听器
listeners=PLAINTEXT://:9092,CONTROLLER://:9093# 指定用于 controller 通信的监听器名称(必须是 listeners 中的一个)
controller.listener.names=CONTROLLER# 监听器安全协议映射
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT# Broker 间通信使用的监听器名称
inter.broker.listener.name=PLAINTEXT# 客户端连接地址(外部可访问的 host:port)
advertised.listeners=PLAINTEXT://node2:9092
EOF

node3上执行

cat > /usr/local/kafka/config/server.properties <<EOF
node.id=3# 节点角色:controller 和 broker
process.roles=controller,broker# 元数据日志存储目录(需提前创建并赋权)
metadata.log.dir=/tmp/kafka-logs# Controller 之间的投票节点列表(格式:id@host:port)
controller.quorum.voters=1@node1:9093,2@node2:9093,3@node3:9093# 监听器定义:名称、协议、host:port
# 注意:CONTROLLER 是用于 controller 间通信的监听器
#       PLAINTEXT 是用于 broker 对外服务的监听器
listeners=PLAINTEXT://:9092,CONTROLLER://:9093# 指定用于 controller 通信的监听器名称(必须是 listeners 中的一个)
controller.listener.names=CONTROLLER# 监听器安全协议映射
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT# Broker 间通信使用的监听器名称
inter.broker.listener.name=PLAINTEXT# 客户端连接地址(外部可访问的 host:port)
advertised.listeners=PLAINTEXT://node3:9092
EOF
4.1.5生成集群 ID

node1上执行

bin/kafka-storage.sh random-uuid
4.1.6格式化存储目录

3个节点均需要执行

cd /usr/local/kafka
bin/kafka-storage.sh format -t abcd-1234-efgh-5678 -c config/server.properties

执行结果

Formatting metadata directory /tmp/kafka-logs with metadata.version 4.0-IV3.

并且在 /tmp/kafka-logs/ 目录下会生成一个 meta.properties 文件,内容如下

cat /tmp/kafka-logs/meta.properties 
#
#Tue Aug 12 14:35:28 CST 2025
node.id=1
directory.id=WLMkRpvo2ZQFqvmHsGv2aA
version=1
cluster.id=XzHNVPgSTVKRBqsFWEC6AQ
4.1.7启动Kafka

3个节点上执行

nohup bin/kafka-server-start.sh config/server.properties &
4.1.8查看KRaft元数据集群的状态
bin/kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status

5.验证 Kafka 集群

5.1创建一个测试 topic

在任意节点上执行

/usr/local/kafka/bin/kafka-topics.sh --create \--topic ceshi-topic \--bootstrap-server node1:9092 \--partitions 3 \--replication-factor 3
5.2查看 topic 列表

在任意节点上执行

/usr/local/kafka/bin/kafka-topics.sh --list \--bootstrap-server node1:9092
5.3启动生产者测试

node1上执行

/usr/local/kafka/bin/kafka-console-producer.sh \--bootstrap-server node1:9092 \--topic test-topic输入测试内容,如,Hello,Kafka
5.4启动消费者测试

node2上执行

/usr/local/kafka/bin/kafka-console-consumer.sh \--bootstrap-server node1:9092 \--topic test-topic \--from-beginning输出内容:Hello,Kafka
http://www.dtcms.com/a/327034.html

相关文章:

  • 全国产飞腾d2000+复旦微690t信号处理模块
  • 微软发布GPT-5赋能的Copilot:重构办公场景的智能革命
  • 数字孪生重构园区管理效率:技术落地与产业升级的三重跃迁
  • 亚马逊优惠券视觉体系重构:颜色标签驱动的消费决策效率革命
  • Nginx 启用 HTTPS:阿里云免费 SSL 证书详细图文教程(新手0.5小时可完成)
  • 从基础编辑器到智能中枢:OpenStation 为 VSCode 注入大模型动力
  • 正向传播与反向传播(神经网络思维的逻辑回归)
  • 【R语言数据分析开发指南】
  • 读《精益数据分析》:UGC平台的数据指标梳理
  • 【跨服务器的数据自动化下载--安装公钥,免密下载】
  • TinyVue表格重构性能优化详解
  • STL容器的使用时机
  • Appium+Python 实现移动应用自动化测试:从基础到实战
  • STFT、log-mel、MFCC 的区别
  • 梳理一下实现3D显示的途径有哪些?
  • QT(概述、基础函数、界面类、信号和槽)
  • Qt之QMetaEnum的简单使用(含源码和注释)
  • [激光原理与应用-253]:理论 - 几何光学 - 变焦镜头的组成原理及图示解析
  • excel-随笔记
  • 单例模式,动态代理,微服务原理
  • [特殊字符]深度解析 FastMCP:重构MCP应用开发的全维度革命
  • 当机械臂装上「智能大脑」:Deepoc具身智能模型如何重构传统自动化​
  • 力扣经典算法篇-50-单词规律(双哈希结构+正反向求解)
  • 【Golang】pprof 使用介绍:从数据采集到可视化分析
  • windows版本:Prometheus+Grafana(普罗米修斯+格拉法纳)监控 JVM
  • Webpack 介绍与使用的详细介绍
  • ChatGpt 5系列文章1——编码与智能体
  • 地图可视化实践录:显示地理区域图
  • 【Bug经验分享】由jsonObject-TypeReference引发的序列化问题
  • 完整多端口 Nginx Docker部署 + GitLab Runner注册及标签使用指南