当前位置: 首页 > news >正文

Zookeeper 与 Kafka

文章目录

  • Zookeeper 与 Kafka 技术文档
    • 一、Zookeeper
      • 1.1 Zookeeper 概述
      • 1.2 Zookeeper 工作机制
      • 1.3 Zookeeper 数据结构
      • 1.4 Zookeeper 应用场景
      • 1.5 Zookeeper 选举机制
        • 1.5.1 集群节点角色定义
        • 1.5.2 选举核心规则与关键参数
          • 1. 关键参数详解
          • 2. 优先级判断逻辑示例
        • 1.5.3 第一次启动选举(集群初始化)
          • 1. 3 节点集群(SID=1、2、3)选举流程
          • 2. 5 节点集群(SID=1-5)选举核心差异
        • 1.5.4 非第一次启动选举(Leader 故障重选)
          • 1. 选举触发前提
          • 2. 重选流程示例(5 节点故障场景)
            • 场景预设
            • 重选步骤
          • 3. 重选核心特点
      • 1.6 部署 Zookeeper 集群
        • 1.6.1 部署环境(统一配置)
        • 1.6.2 安装前准备(所有节点执行)
        • 1.6.3 安装与配置 Zookeeper(zk01 先行,再同步至其他节点)
        • 1.6.4 配置启动脚本与开机自启(所有节点执行)
    • 二、Kafka
      • 2.1 为什么需要消息队列(MQ)
      • 2.2 使用消息队列的好处
      • 2.3 消息队列的两种模式
        • 2.3.1 点对点模式(P2P)
        • 2.3.2 发布/订阅模式(Pub/Sub)
      • 2.4 Kafka 定义与简介
      • 2.5 Kafka 的特性
      • 2.6 Kafka 系统架构
        • 关键细节补充
      • 2.7 部署 Kafka 集群
        • 2.7.1 前提条件
        • 2.7.2 下载与安装(zk01 先行,再同步至其他节点)
        • 2.7.3 配置环境变量与启动脚本(所有节点执行)
      • 2.8 Kafka 命令行操作(常用命令)
        • 1. 创建 Topic
        • 2. 查看所有 Topic
        • 3. 查看 Topic 详情
        • 4. 发布消息(生产者)
        • 5. 消费消息(消费者)
        • 6. 修改 Topic 分区数(仅支持增加,不支持减少)
        • 7. 删除 Topic(可选)
      • 2.9 Kafka 架构深入
        • 2.9.1 工作流程与文件存储机制
        • 2.9.2 数据可靠性保证
        • 2.9.3 数据一致性问题(LEO 与 HW)
          • 1. Follower 故障恢复
          • 2. Leader 故障恢复
        • 2.9.4 ACK 应答机制(生产者可靠性配置)
      • 2.10 Filebeat+Kafka+ELK 部署(日志收集实战)
        • 2.10.1 部署架构(补充)
        • 2.10.2 Elasticsearch 集群部署(Node1:192.168.100.140;Node2 可选,步骤同 Node1)
          • 1. 部署 Elasticsearch 软件
        • 2.10.3 安装 Elasticsearch-head 插件(Node1:192.168.100.140,可视化管理 ES)
        • 2.10.4 Logstash 部署(192.168.100.129,对接 Kafka 拉取日志)
        • 2.10.5 Kibana 部署(192.168.100.140,对接 ES 可视化日志)
        • 2.10.6 整体验证(可选)
    • 三、常见问题与注意事项

Zookeeper 与 Kafka 技术文档

本文档对原始文档进行了结构梳理、错误修正、代码注释补充,统一了配置参数(如 IP 地址、路径),规范了术语(如修正拼写错误 patitionpartitionFollwerFollower),并通过分级标题、表格、代码块提升可读性,适用于学习与实际部署参考。

一、Zookeeper

Zookeeper 是分布式系统的核心协调组件,专注于解决分布式环境下的一致性同步、配置管理、集群选主等问题,是 Kafka、Hadoop 等框架的依赖基础。

1.1 Zookeeper 概述

Zookeeper 是一个分布式协调服务,核心价值在于简化分布式系统的管理复杂度,提供以下核心能力:

  • 高效可靠的协调与同步(如分布式锁、集群 Leader 选举);
  • 统一配置管理(集中存储配置,动态推送更新至所有节点);
  • 故障恢复(通过集群机制保证服务高可用);
  • 层次化命名空间(类似文件系统,存储元数据,支持节点监听)。

其设计目标是确保分布式节点间的数据一致性与协同工作,避免单点故障。

1.2 Zookeeper 工作机制

Zookeeper 基于观察者模式设计,核心逻辑可概括为:Zookeeper = 文件系统 + 通知机制,具体流程如下:

  1. 数据存储:以树状结构存储数据(节点为 ZNode),所有分布式节点可访问这些“公共数据”;
  2. 观察者注册:客户端(如 Kafka Broker)向 Zookeeper 注册“节点监听”(如监听 /kafka/brokers 节点变化);
  3. 事件通知:当被监听的 ZNode 发生变更(新增/删除/修改数据)时,Zookeeper 主动通知所有注册的观察者,触发客户端后续逻辑(如 Kafka 感知 Broker 上下线)。

1.3 Zookeeper 数据结构

Zookeeper 数据模型类似 Linux 文件系统,以 ZNode 为基本数据单元,形成树状结构,核心特性如下表:

核心概念详细说明
ZNode存储数据的基本单元,默认最大存储 1MB 数据,可包含子节点,通过路径唯一标识(如 /zookeeper/quota)。
节点类型1. 持久节点:创建后永久存在,需手动删除(如存储集群配置);
2. 临时节点:客户端会话断开后自动删除(如服务心跳、分布式锁);
3. 顺序节点:创建时自动添加递增编号(如 /task/000000001,用于分布式队列)。
节点属性每个 ZNode 包含:数据(data)、版本(version,用于乐观锁)、ACL 权限(访问控制)、子节点列表(children)。

1.4 Zookeeper 应用场景

Zookeeper 的核心是“协调”,典型应用场景如下:

应用场景实现逻辑
统一命名服务用“路径”替代 IP 地址(如 Kafka 用 /kafka/brokers/ids/0 记录 Broker 地址),降低节点访问的记忆成本。
统一配置管理1. 将集群配置(如 Kafka 消息过期时间)写入某个 ZNode
2. 所有节点监听该 ZNode
3. 配置修改后,Zookeeper 自动推送更新至所有节点。
统一集群管理1. 节点启动时将自身状态(如“在线”)写入 ZNode
2. 管理节点监听这些 ZNode,实时掌握集群节点状态(如监控 Broker 故障)。
服务器动态上下线客户端通过监听“服务器节点 ZNode”,实时感知服务器上线/下线(如微服务注册发现)。
软负载均衡ZNode 中记录每台服务器的访问量,客户端优先选择访问量最少的服务器,实现简易负载均衡。

1.5 Zookeeper 选举机制

Zookeeper 作为分布式协调服务,其核心能力之一是通过 Leader 选举机制 确保集群数据一致性与服务可用性。当集群启动或主节点故障时,会自动从节点中选出唯一 Leader 统筹写操作与数据同步,同时明确各节点的角色分工,避免“脑裂”(多主节点冲突)问题。

1.5.1 集群节点角色定义

Zookeeper 集群节点分为三类,角色不同则职责与参与选举的权限完全不同,具体分工如下:

角色核心职责是否参与 Leader 选举
Leader1. 处理集群所有 写请求(如创建/删除 ZNode、修改节点数据);
2. 将写操作生成的事务日志同步至所有 Follower;
3. 协调 Follower 节点的数据一致性,解决同步冲突。
-(本身是选举结果)
Follower1. 处理集群所有 读请求,提升读操作吞吐量;
2. 接收 Leader 同步的事务数据,保存本地日志并反馈确认;
3. 当 Leader 故障时,参与重新选举。
Observer1. 仅处理 读请求,不承担写操作相关职责;
2. 同步 Leader 数据以保证读数据准确性;
3. 不参与选举,仅作为“读扩展节点”减轻 Follower 压力。
1.5.2 选举核心规则与关键参数

Leader 选举的本质是“按优先级筛选节点”,优先级由 3 个核心参数 决定,规则优先级从高到低为:EPOCH(任期)> ZXID(事务 ID)> SID(服务器 ID)

1. 关键参数详解

三个参数共同构成节点的“选举身份标识”,各参数的生成逻辑与作用如下:

  • EPOCH(任期编号)
    每一轮 Leader 选举成功后,新 Leader 会将 EPOCH 数值 +1(初始为 0),代表“当前集群的统治任期”。例如:第 1 次选举出的 Leader 任期 EPOCH=1,若该 Leader 故障,新选举出的 Leader 任期 EPOCH=2。
    作用:确保“新任期的节点优先于旧任期”,避免旧 Leader 恢复后与新 Leader 冲突(旧 Leader EPOCH 更小,会自动降级为 Follower)。

  • ZXID(事务 ID)
    一个 64 位的数值,高 32 位是当前 EPOCH,低 32 位是当前任期内的事务计数(每处理一次写请求,事务计数 +1)。例如:ZXID=0x100000005 代表“EPOCH=1,当前任期内第 5 次事务”。
    作用:标识节点数据的“新鲜度”——ZXID 越大,说明节点处理的事务越新,数据与集群最新状态越一致。

  • SID(服务器唯一 ID)
    由运维人员在节点配置目录下的 myid 文件中手动指定(如 zk-node-01 的 myid=1,zk-node-02 的 myid=2),集群内所有节点 SID 不可重复
    作用:当 EPOCH 和 ZXID 完全相同时(如集群初始化时所有节点无事务记录),以 SID 作为“最终仲裁依据”,SID 越大的节点优先级越高。

2. 优先级判断逻辑示例

若节点 A(EPOCH=2,ZXID=100,SID=3)与节点 B(EPOCH=2,ZXID=98,SID=5)竞争 Leader:

  1. 先比 EPOCH:两者均为 2,持平;
  2. 再比 ZXID:100 > 98,节点 A 优先级更高;
  3. 无需比较 SID,节点 A 胜出。
1.5.3 第一次启动选举(集群初始化)

集群首次启动时,所有节点均无历史 Leader 信息,初始状态为 LOOKING(待选举状态),需通过“多轮选票交换”选出首个 Leader。以下以 3 节点集群(SID=1、2、3)5 节点集群(SID=1-5) 为例,说明选举流程(注:Zookeeper 要求集群节点数为奇数,确保“半数通过”规则可执行,半数 = (节点数 + 1) / 2,如 3 节点需 ≥2 票,5 节点需 ≥3 票)。

1. 3 节点集群(SID=1、2、3)选举流程
  1. 节点 1 启动

    • 进入 LOOKING 状态,生成首张选票(投给自己:EPOCH=0,ZXID=0,SID=1);
    • 此时仅 1 票,未达半数(需 ≥2 票),节点 1 保持 LOOKING,等待其他节点连接。
  2. 节点 2 启动

    • 进入 LOOKING 状态,生成选票(投给自己:EPOCH=0,ZXID=0,SID=2);
    • 节点 1 与节点 2 建立通信,交换选票:
      • 节点 1 发现节点 2 的 SID(2)> 自身 SID(1),修改选票为投给节点 2;
      • 节点 2 发现节点 1 的 SID(1)< 自身 SID(2),保持选票投给自己;
    • 此时总票数:节点 2 得 2 票,达到半数(≥2 票);
    • 节点 2 状态变更为 LEADING(Leader),节点 1 状态变更为 FOLLOWING(Follower)。
  3. 节点 3 启动

    • 进入 LOOKING 状态后,通过集群通信发现已存在 Leader(节点 2);
    • 无需参与选举,自动同步节点 2 的数据,状态变更为 FOLLOWING
2. 5 节点集群(SID=1-5)选举核心差异

5 节点集群的选举逻辑与 3 节点一致(均遵循 “EPOCH→ZXID→SID” 规则),核心差异在于需集齐 ≥3 票(半数门槛)才能当选 Leader,具体流程更清晰体现 “节点启动数量与票数达标” 的关联:

  1. 节点 1 启动
    • 投自己 1 票(1 票<3 票,未达标),保持 LOOKING;
  2. 节点 2 启动
    • 与节点 1 交换选票,因 2 的 SID 更大,两者均投 2(2 票<3 票,未达标),仍保持 LOOKING;
  3. 节点 3 启动
    • 与节点 1、2 交换选票,1、2 发现 3 的 SID 最大,均改投 3,此时 3 得 3 票(刚好满足半数门槛),节点 3 当选 Leader,状态变为 LEADING;节点 1、2 同步数据,变为 FOLLOWING。
    • 后续节点 4、5 启动时,通过集群通信发现已存在 Leader(节点 3),无需参与选举,直接同步 Leader 数据,状态自动变为 FOLLOWING。

核心差异总结:相较于 3 节点 “2 个节点启动即可出 Leader”,5 节点需启动至 “半数节点数量(3 个)” 时,SID 最大的节点才会集齐多数票(3 票)当选,更突出 “半数门槛” 对选举结果的影响。

1.5.4 非第一次启动选举(Leader 故障重选)

当已存在的 Leader 节点因故障(如宕机、网络中断)下线时,集群会触发重新选举。此时部分节点可能仍处于 FOLLOWING 状态(能连接 Leader),部分节点会进入 LOOKING 状态(无法连接 Leader),选举流程分为“状态判断”和“参数比较”两步。

1. 选举触发前提

节点定期向 Leader 发送心跳包,若 超过心跳超时时间(默认 2000ms)未收到 Leader 响应,则判定 Leader 故障,节点状态从 FOLLOWING 切换为 LOOKING;当集群中 LOOKING 状态的节点数达到一定规模时,触发重新选举。

2. 重选流程示例(5 节点故障场景)
场景预设
  • 原集群:5 节点(SID=1、2、3、4、5),当前 Leader 为节点 3(EPOCH=2,ZXID=150);
  • 各节点状态:节点 1(EPOCH=2,ZXID=150)、节点 2(EPOCH=2,ZXID=148)、节点 4(EPOCH=2,ZXID=150)、节点 5(EPOCH=2,ZXID=145);
  • 故障发生:Leader 节点 3 宕机,节点 5 因网络中断进入 LOOKING 状态,剩余正常节点:1、2、4(均进入 LOOKING)。
重选步骤
  1. 第一步:交换选票,初始化投票
    节点 1、2、4 分别生成初始选票(投给自己):

    • 节点 1:(EPOCH=2,ZXID=150,SID=1)
    • 节点 2:(EPOCH=2,ZXID=148,SID=2)
    • 节点 4:(EPOCH=2,ZXID=150,SID=4)
  2. 第二步:按优先级比较,调整选票

    • 比较 EPOCH:三者均为 2,持平;
    • 比较 ZXID:节点 1、4 的 ZXID=150 > 节点 2 的 148,节点 2 选票调整为投给“ZXID 更大的节点”(优先选 SID 大的节点 4);
    • 比较 SID:节点 4 的 SID=4 > 节点 1 的 1,节点 1 选票调整为投给节点 4;
  3. 第三步:统计票数,确认 Leader

    • 节点 4 得票:节点 1、2、4 共 3 票,达到 5 节点集群的半数(≥3 票);
    • 节点 4 状态变更为 LEADING(新 Leader),节点 1、2 状态变更为 FOLLOWING
    • 节点 5 恢复网络后,发现新 Leader(节点 4),同步数据后变为 FOLLOWING
3. 重选核心特点
  • 数据优先:ZXID 更大的节点(数据更新)优先当选,确保新 Leader 拥有集群最新数据,避免数据回滚;
  • 任期唯一:新 Leader 当选后,EPOCH 从 2 升级为 3,所有 Follower 同步新 EPOCH,旧 Leader(若恢复)因 EPOCH 较小,会自动放弃 Leader 身份。

1.6 部署 Zookeeper 集群

1.6.1 部署环境(统一配置)
服务名称IP 地址部署软件系统要求
zk01192.168.100.160Zookeeper-3.5.7、Kafka-2.7.1、JDK8CentOS 7/8,2C4G
zk02192.168.100.170同上同上
zk03192.168.100.180同上同上
1.6.2 安装前准备(所有节点执行)
# 1. 关闭防火墙与 SELinux(避免端口拦截)
systemctl stop firewalld          # 临时关闭防火墙
systemctl disable firewalld       # 开机禁用防火墙
setenforce 0                      # 临时关闭 SELinux
sed -i 's/SELINUX=enforcing/SELINUX=disabled/' /etc/selinux/config  # 永久禁用# 2. 安装 JDK(Zookeeper 依赖 Java 环境,二选一即可)
# 方式 1:yum 安装(简单,适合快速部署)
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
java -version  # 验证,需显示 1.8.x 版本# 方式 2:官网 tar 包安装(适合指定 JDK 版本)
cd /opt
wget https://repo.huaweicloud.com/java/jdk/8u91-b14/jdk-8u91-linux-x64.tar.gz  # 华为源加速
tar zxvf jdk-8u91-linux-x64.tar.gz
mv jdk1.8.0_91 /usr/java/jdk-8u91-linux  # 统一路径# 3. 配置 JDK 环境变量(方式 2 需执行)
vim /etc/profile.d/java.sh
export JAVA_HOME=/usr/java/jdk-8u91-linux  # JDK 安装路径
export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar  # 类路径
export PATH=$JAVA_HOME/bin:$PATH  # 加入系统 PATHsource /etc/profile.d/java.sh  # 生效环境变量
java -version  # 验证,需显示 1.8.x 版本# 4. 下载 Zookeeper 安装包(所有节点执行或仅在 zk01 下载后分发)
cd /opt
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/apache-zookeeper-3.5.7-bin.tar.gz  # 官方源
1.6.3 安装与配置 Zookeeper(zk01 先行,再同步至其他节点)
# 1. 解压并移动到统一目录
cd /opt
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
mv apache-zookeeper-3.5.7-bin /usr/local/zookeeper-3.5.7  # 统一安装路径# 2. 修改核心配置文件 zoo.cfg
cd /usr/local/zookeeper-3.5.7/conf/
cp zoo_sample.cfg zoo.cfg  # 复制默认配置为生效配置
vim zoo.cfg# 配置文件关键参数(已添加注释)
tickTime=2000                # 通信心跳时间(毫秒),集群节点间心跳间隔
initLimit=10                 # Leader 与 Follower 初始连接超时(单位:tickTime),10*2s=20s
syncLimit=5                  # Leader 与 Follower 同步数据超时(5*2s=10s),超时则剔除 Follower
dataDir=/usr/local/zookeeper-3.5.7/data  # 数据存储目录(需手动创建)
dataLogDir=/usr/local/zookeeper-3.5.7/logs  # 日志存储目录(需手动创建,分离数据与日志提升性能)
clientPort=2181              # 客户端连接端口(如 Kafka 连接 Zookeeper 用此端口)
# 集群节点配置:server.SID=IP:通信端口:选举端口
server.1=192.168.100.160:3188:3288  # zk01:3188=Leader-Follower 通信端口;3288=选举端口
server.2=192.168.100.170:3188:3288  # zk02
server.3=192.168.100.180:3188:3288  # zk03# 3. 创建数据与日志目录
mkdir /usr/local/zookeeper-3.5.7/data
mkdir /usr/local/zookeeper-3.5.7/logs# 4. 配置当前节点的 SID(与 zoo.cfg 中 server.1 对应)
echo 1 > /usr/local/zookeeper-3.5.7/data/myid  # zk01 写 1,zk02 写 2,zk03 写 3# 5. 同步配置到 zk02 和 zk03(避免重复操作)
scp -r /usr/local/zookeeper-3.5.7/ 192.168.100.170:/usr/local/  # 同步到 zk02
scp -r /usr/local/zookeeper-3.5.7/ 192.168.100.180:/usr/local/  # 同步到 zk03# 6. 分别在 zk02 和 zk03 修改 myid
# zk02 执行:
echo 2 > /usr/local/zookeeper-3.5.7/data/myid
# zk03 执行:
echo 3 > /usr/local/zookeeper-3.5.7/data/myid
1.6.4 配置启动脚本与开机自启(所有节点执行)
# 1. 创建系统服务脚本
vim /etc/init.d/zookeeper
#!/bin/bash
#chkconfig:2345 20 90
#description:Zookeeper Service Control Script
ZK_HOME='/usr/local/zookeeper-3.5.7'
case $1 in
start)echo "---------- zookeeper 启动 ------------"$ZK_HOME/bin/zkServer.sh start
;;
stop)echo "---------- zookeeper 停止 ------------"$ZK_HOME/bin/zkServer.sh stop
;;
restart)echo "---------- zookeeper 重启 ------------"$ZK_HOME/bin/zkServer.sh restart
;;
status)echo "---------- zookeeper 状态 ------------"$ZK_HOME/bin/zkServer.sh status
;;
*)echo "Usage: $0 {start|stop|restart|status}"
esac# 2. 赋予脚本执行权限
chmod +x /etc/init.d/zookeeper# 3. 添加到系统服务并设置开机自启
chkconfig --add zookeeper  # 注册服务
chkconfig zookeeper on     # 开机自启# 4. 启动并验证
service zookeeper start    # 启动 Zookeeper
service zookeeper status   # 验证状态:zk01/zk02/zk03 中必有一个 Leader,其余为 Follower

二、Kafka

Kafka 是分布式发布/订阅消息队列,基于 Zookeeper 协调,主打高吞吐量、低延迟,适用于大数据实时处理(如日志收集、流计算)。

2.1 为什么需要消息队列(MQ)

在高并发场景下,同步请求易导致“链路阻塞”(如大量请求直接访问数据库引发锁竞争、连接耗尽),消息队列通过异步解耦缓解系统压力,核心应用场景:

  • 异步处理:如用户注册后,异步发送短信/邮件,无需等待结果返回;
  • 流量削峰:如秒杀活动,用 MQ 缓冲突发请求,避免后端服务被压垮;
  • 应用解耦:如订单系统与库存系统通过 MQ 通信,无需直接调用,降低依赖;
  • 消息通讯:如分布式系统间的跨节点消息传递。

主流 MQ 对比:ActiveMQ(功能全但性能一般)、RabbitMQ(低延迟但吞吐量有限)、RocketMQ(阿里开源,适合金融级场景)、Kafka(高吞吐量,适合大数据场景)。

2.2 使用消息队列的好处

优势说明
解耦上下游系统通过 MQ 通信,无需感知对方实现,可独立扩展或修改(如替换订单系统不影响库存系统)。
可恢复性若消费端故障,消息暂存 MQ 中,恢复后继续消费,避免数据丢失。
缓冲平衡生产端(如日志产生速度)与消费端(如日志分析速度)的处理能力差异。
峰值处理能力突发流量(如双 11)由 MQ 缓冲,后端服务按自身能力消费,避免崩溃。
异步通信生产端无需等待消费端响应,提升整体链路响应速度。

2.3 消息队列的两种模式

2.3.1 点对点模式(P2P)
  • 特点:一对一,消费者主动拉取消息,消息消费后被删除;
  • 流程:生产者 → 消息队列 → 单个消费者(多个消费者竞争,仅一个能消费);
  • 场景:如任务分配(一个任务仅需一个worker处理)。
2.3.2 发布/订阅模式(Pub/Sub)
  • 特点:一对多,消费者订阅主题(Topic),消息消费后不删除,所有订阅者均可消费;
  • 流程:生产者 → 主题(Topic) → 多个消费者(均能收到消息);
  • 场景:如日志广播(多个系统需同时消费日志数据)。

2.4 Kafka 定义与简介

  • 定义:分布式、基于发布/订阅模式的消息队列,主打高吞吐量,适用于大数据实时处理;
  • 背景:最初由 LinkedIn 开发,2010 年贡献给 Apache 基金会,成为顶级开源项目;
  • 技术栈:用 Scala 语言编写,依赖 Zookeeper 存储集群元数据(如 Broker 列表、Topic 配置);
  • 核心场景:日志收集(如 Filebeat+Kafka+ELK)、流计算(如 Spark Streaming/Flink 对接 Kafka)。

2.5 Kafka 的特性

  • 高吞吐量:每秒可处理几十万条消息,通过分区(Partition)并行读写实现;
  • 低延迟:端到端延迟最低仅几毫秒,满足实时处理需求;
  • 可扩展性:支持集群热扩展,新增 Broker 无需停机;
  • 持久性:消息持久化到本地磁盘,通过分片(Segment)和索引优化读写性能;
  • 可靠性:支持多副本(Replica),避免单点故障导致数据丢失;
  • 高并发:支持数千个客户端同时读写。

2.6 Kafka 系统架构

Kafka 核心组件及关系如下:

组件作用说明
BrokerKafka 服务器,一个 Broker 就是一个节点,集群由多个 Broker 组成,可存储多个 Topic。
Topic消息分类的逻辑单元(类似数据库表),生产者向 Topic 发消息,消费者从 Topic 消费消息。
PartitionTopic 的物理分区,一个 Topic 可分为多个 Partition,实现并行读写(提升吞吐量)。
ReplicaPartition 的副本,用于数据备份(如 2 个副本表示 1 个 Leader + 1 个 Follower)。
Leader每个 Partition 的主副本,负责处理读写请求,Follower 从 Leader 同步数据。
Follower每个 Partition 的从副本,仅同步数据,Leader 故障后可升级为新 Leader。
Producer消息生产者,向 Topic 的 Partition 推送(Push)消息。
Consumer消息消费者,从 Partition 拉取(Pull)消息。
Consumer Group(CG)消费者组,多个 Consumer 组成一个 CG,同一 CG 内的 Consumer 消费不同 Partition(避免重复消费),不同 CG 可重复消费同一 Topic。
Offset消息在 Partition 中的唯一标识(自增整数),记录消费者的消费位置,故障恢复后从上次 Offset 继续消费。
Zookeeper存储 Kafka 元数据:Broker 列表、Topic 配置、CG 消费 Offset(Kafka 0.9 前),协调集群选主。
关键细节补充
  1. Partition 数据路由规则(生产者选择 Partition 的逻辑):

    1. 若指定 Partition,则直接写入;
    2. 未指定 Partition 但指定 Key(如消息的用户 ID),对 Key 做 Hash 取模,映射到对应 Partition;
    3. 既未指定 Partition 也未指定 Key,采用轮询(Round-Robin)分配。
  2. Partition 设计原因

    • 扩展:一个 Topic 的数据分散到多个 Broker,支持集群横向扩展;
    • 并发:多个 Consumer 可同时消费不同 Partition,提升消费速度。
  3. ISR 集合(In-Sync Replicas):

    • Leader 维护的“与自身数据同步的 Follower 列表”,若 Follower 同步滞后超阈值,会被移出 ISR;
    • 只有 ISR 中的 Follower 有资格在 Leader 故障后升级为新 Leader。

2.7 部署 Kafka 集群

2.7.1 前提条件
  • Zookeeper 集群已部署并正常运行(参考 1.6 节);
  • 所有节点已安装 JDK8(与 Zookeeper 共享 JDK 环境);
  • 关闭防火墙与 SELinux(参考 1.6.2 节)。
2.7.2 下载与安装(zk01 先行,再同步至其他节点)
# 1. 下载 Kafka 安装包(所有节点执行或仅 zk01 下载后分发)
cd /opt
# 清华源加速(Kafka 版本:2.7.1,Scala 版本:2.13)
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.7.1/kafka_2.13-2.7.1.tgz# 2. 解压并移动到统一目录
tar zxvf kafka_2.13-2.7.1.tgz
mv kafka_2.13-2.7.1 /usr/local/kafka  # 统一安装路径# 3. 修改核心配置文件 server.properties
cd /usr/local/kafka/config/
cp server.properties server.properties.bak  # 备份默认配置
vim server.properties# 配置文件关键参数(已添加注释)
broker.id=0  #21行 Broker 唯一 ID(zk01=0,zk02=1,zk03=2,不可重复)
#31行 监听地址:PLAINTEXT 表示无加密协议,IP 为当前节点 IP(zk01 填 192.168.100.160)
listeners=PLAINTEXT://192.168.100.160:9092
num.network.threads=3  #42行 处理网络请求的线程数(默认即可)
num.io.threads=8       #45行 处理磁盘 IO 的线程数(建议大于硬盘数量,默认即可)
socket.send.buffer.bytes=102400    #48行 发送缓冲区大小(默认即可)
socket.receive.buffer.bytes=102400 #51行 接收缓冲区大小(默认即可)
socket.request.max.bytes=104857600 #54行 最大请求大小(默认即可)
log.dirs=/usr/local/kafka/logs     #60行 消息存储目录(需手动创建,存储 Partition 数据)
num.partitions=1                   #65行 新建 Topic 的默认分区数(可在创建 Topic 时指定)
num.recovery.threads.per.data.dir=1 # 69行数据恢复线程数(默认即可)
log.retention.hours=168            #103行 消息保留时间(默认 7 天,超时自动删除)
log.segment.bytes=1073741824       #110行 单个 Segment 文件大小(默认 1G,超大会切割)
#123行 连接 Zookeeper 集群地址(与 Zookeeper 部署的 IP 一致)
zookeeper.connect=192.168.100.160:2181,192.168.100.170:2181,192.168.100.180:2181
zookeeper.connection.timeout.ms=6000  # Zookeeper 连接超时时间(默认即可)# 4. 创建消息存储目录
mkdir /usr/local/kafka/logs# 5. 同步配置到 zk02 和 zk03
scp -r /usr/local/kafka/ 192.168.100.170:/usr/local/  # 同步到 zk02
scp -r /usr/local/kafka/ 192.168.100.180:/usr/local/  # 同步到 zk03# 6. 分别修改 zk02 和 zk03 的 server.properties
# zk02 执行:
vim /usr/local/kafka/config/server.properties
broker.id=1  #21行 修改为 1
listeners=PLAINTEXT://192.168.100.170:9092  #31行 修改为 zk02 的 IP# zk03 执行:
vim /usr/local/kafka/config/server.properties
broker.id=2  #21行 修改为 2
listeners=PLAINTEXT://192.168.100.180:9092  #31行 修改为 zk03 的 IP
2.7.3 配置环境变量与启动脚本(所有节点执行)
# 1. 配置 Kafka 环境变量
vim /etc/profile
export KAFKA_HOME=/usr/local/kafka  # Kafka 安装路径
export PATH=$PATH:$KAFKA_HOME/bin   # 加入系统 PATHsource /etc/profile  # 生效环境变量# 2. 创建系统服务脚本(方便启停)
vim /etc/init.d/kafka
#!/bin/bash
#chkconfig:2345 22 88  # 启动优先级 22(低于 Zookeeper 的 20),关闭优先级 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka'
case $1 in
start)echo "---------- Kafka 启动 ------------"# -daemon:后台启动(避免占用终端),指定配置文件${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)echo "---------- Kafka 停止 ------------"${KAFKA_HOME}/bin/kafka-server-stop.sh  # 停止脚本
;;
restart)echo "---------- Kafka 重启 ------------"$0 stop  # 调用自身 stop 方法$0 start # 调用自身 start 方法
;;
status)echo "---------- Kafka 状态 ------------"# 查看是否有 Kafka 进程(排除 grep 自身)count=$(ps -ef | grep kafka | egrep -cv "grep|$$")if [ "$count" -eq 0 ];thenecho "kafka is not running"elseecho "kafka is running"fi
;;
*)echo "Usage: $0 {start|stop|restart|status}"
esac# 3. 赋予脚本执行权限并设置开机自启
chmod +x /etc/init.d/kafka
chkconfig --add kafka  # 注册服务
chkconfig kafka on     # 开机自启(需在 Zookeeper 之后启动)# 4. 启动 Kafka(需先确保 Zookeeper 已启动)
service kafka start
service kafka status  # 验证状态,显示“kafka is running”即为正常

2.8 Kafka 命令行操作(常用命令)

所有命令在任意 Kafka 节点执行即可,需确保 Zookeeper 和 Kafka 已启动。

1. 创建 Topic
# 创建名为 test 的 Topic,分区数 3,副本数 2(副本数 ≤ Broker 数)
kafka-topics.sh \
--create \
--zookeeper 192.168.100.160:2181,192.168.100.170:2181,192.168.100.180:2181 \  # Zookeeper 集群地址
--replication-factor 2 \  # 副本数(建议 2,兼顾可靠性与性能)
--partitions 3 \          # 分区数(建议与 Broker 数匹配,提升并行度)
--topic httpd              # Topic 名称
2. 查看所有 Topic
kafka-topics.sh \
--list \
--zookeeper 192.168.100.160:2181,192.168.100.170:2181,192.168.100.180:2181
3. 查看 Topic 详情
kafka-topics.sh \
--describe \
--zookeeper 192.168.100.160:2181,192.168.100.170:2181,192.168.100.180:2181 \
--topic httpd  # 指定 Topic 名称
4. 发布消息(生产者)
kafka-console-producer.sh \
--broker-list 192.168.100.160:9092,192.168.100.170:9092,192.168.100.180:9092 \  # Kafka 集群地址
--topic httpd  # 向 test Topic 发消息
# 执行后输入消息内容,按 Enter 发送,Ctrl+C 退出
5. 消费消息(消费者)
kafka-console-consumer.sh \
--bootstrap-server 192.168.100.160:9092,192.168.100.170:9092,192.168.100.180:9092 \  # Kafka 集群地址
--topic httpd \  # 从 test Topic 消费消息
--from-beginning  # 消费历史所有消息(不加则只消费新消息)
6. 修改 Topic 分区数(仅支持增加,不支持减少)
kafka-topics.sh \
--zookeeper 192.168.100.160:2181,192.168.100.170:2181,192.168.100.180:2181 \
--alter \
--topic test \
--partitions 6  # 从 3 增加到 6
7. 删除 Topic(可选)
kafka-topics.sh \
--delete \
--zookeeper 192.168.100.160:2181,192.168.100.170:2181,192.168.100.180:2181 \
--topic httpd
# 注:需确保 server.properties 中 delete.topic.enable=true(默认 true),否则仅标记删除

2.9 Kafka 架构深入

2.9.1 工作流程与文件存储机制
  1. 工作流程
    生产者 → Topic(逻辑分类)→ Partition(物理存储,按路由规则分配)→ 消费者(按 CG 分组消费,一个 Partition 仅被一个 CG 内的 Consumer 消费)。

  2. 文件存储机制

    • 每个 Partition 对应一个文件夹(命名:Topic名称-分区号,如 test-0);
    • 文件夹内包含多个 Segment 文件(由 .index 索引文件和 .log 数据文件组成);
    • Segment 命名规则:以该 Segment 第一条消息的 Offset 命名(如 00000000000000000000.index);
    • 索引文件(.index)存储 Offset 与数据文件(.log)的物理地址映射,加速消息查询;
    • 数据文件(.log)存储实际消息内容,超过 log.segment.bytes 时自动切割新 Segment。
2.9.2 数据可靠性保证

Kafka 通过 ACK 应答机制 确保生产者发送的消息可靠存储,核心逻辑:

  • 生产者发送消息到 Partition 的 Leader 后,Leader 同步数据给 Follower;
  • Follower 同步完成后向 Leader 发送 ACK;
  • Leader 收到足够 ACK 后,向生产者返回 ACK,生产者确认消息发送成功。
2.9.3 数据一致性问题(LEO 与 HW)
  • LEO(Log End Offset):每个副本(Leader/Follower)的最大 Offset(即最新消息的 Offset);
  • HW(High Watermark):消费者能访问的最大 Offset,等于所有副本中最小的 LEO(确保消费者只能消费已同步到所有副本的消息)。
1. Follower 故障恢复
  1. Follower 故障后被移出 ISR 集合;
  2. Follower 恢复后,读取本地 HW,删除 HW 之后的消息(避免数据不一致);
  3. 从 HW 开始向 Leader 同步数据,直到 LEO ≥ HW,重新加入 ISR 集合。
2. Leader 故障恢复
  1. Leader 故障后,从 ISR 集合中选举新 Leader;
  2. 所有 Follower 删除本地 HW 之后的消息;
  3. 从新 Leader 同步数据,确保所有副本数据一致。
2.9.4 ACK 应答机制(生产者可靠性配置)

生产者通过 request.required.acks 参数设置 ACK 级别,权衡可靠性与性能:

ACK 级别含义可靠性性能适用场景
0生产者无需等待 Broker 应答,直接发送下一批消息最低(可能丢失)最高日志采集(允许少量丢失)
1(默认)生产者仅等待 Leader 应答(Leader 存储消息后返回 ACK)中等(Leader 故障可能丢失)中等一般业务场景
-1(all)生产者等待 Leader 和所有 ISR 中的 Follower 应答后返回 ACK最高(几乎不丢失)最低金融级场景(如交易数据)

注意:Kafka 0.11+ 版本支持 幂等性生产者,通过 enable.idempotence=true 配置,避免因网络重试导致的消息重复(即使 ACK=-1 也不会重复存储)。

2.10 Filebeat+Kafka+ELK 部署(日志收集实战)

前提说明
已完成 Zookeeper 集群(zk01/zk02/zk03:192.168.100.160/170/180)与 Kafka 集群部署,当前需完成 Elasticsearch 部署Elasticsearch-head 插件部署Logstash 部署 及关联 Kafka 配置,最终通过 Kibana 可视化日志。

2.10.1 部署架构(补充)

基于已部署的 Zookeeper/Kafka,后续核心流程:
Logstash(192.168.100.129)从 Kafka 拉取 httpd 主题日志 → 过滤解析后输出至 Elasticsearch(192.168.100.140)→ Kibana(192.168.100.140)对接 ES 实现日志可视化。

2.10.2 Elasticsearch 集群部署(Node1:192.168.100.140;Node2 可选,步骤同 Node1)
1. 部署 Elasticsearch 软件

(1)安装 Elasticsearch—rpm 包

# 上传 elasticsearch-6.6.1.rpm 到 /opt 目录下 
cd /opt
rpm -ivh elasticsearch-6.6.1.rpm 

(2)加载系统服务

systemctl daemon-reload    
systemctl enable elasticsearch.service

(3)修改 Elasticsearch 主配置文件

cp /etc/elasticsearch/elasticsearch.yml /etc/elasticsearch/elasticsearch.yml.bak
vim /etc/elasticsearch/elasticsearch.yml
# 17行:取消注释,指定集群名字
cluster.name: my-elk-cluster
# 23行:取消注释,指定节点名字(Node1 为 node1,Node2 为 node2)
node.name: node1
# 33行:取消注释,指定数据存放路径
path.data: /data/elk_data
# 37行:取消注释,指定日志存放路径
path.logs: /var/log/elasticsearch/
# 43行:取消注释,改为启动时不锁定内存
bootstrap.memory_lock: false
# 55行:取消注释,设置监听地址(0.0.0.0 代表所有地址)
network.host: 0.0.0.0
# 59行:取消注释,ES 服务默认监听端口 9200
http.port: 9200
# 68行:取消注释,集群发现通过单播实现,指定待发现的节点(Node1/Node2 IP)
discovery.zen.ping.unicast.hosts: ["192.168.100.140", "192.168.100.150"]  # 若单节点,仅填自身IP# 验证配置(过滤注释行)
grep -v "^#" /etc/elasticsearch/elasticsearch.yml

(4)创建数据存放路径并授权

mkdir -p /data/elk_data
chown elasticsearch:elasticsearch /data/elk_data/

(5)启动 Elasticsearch 并验证

# 启动服务
systemctl start elasticsearch.service
# 检查端口(9200 为 ES 服务端口)
netstat -antp | grep 9200

(6)查看节点与集群健康状态

  • 浏览器访问 Node1 节点http://192.168.100.140:9200,显示节点信息(如 node.name: node1)即为成功;
  • 查看集群健康:http://192.168.100.140:9200/_cluster/health?prettystatusgreen(健康)、yellow(数据完整但副本异常)均正常;
  • 查看集群状态:http://192.168.100.140:9200/_cluster/state?pretty
2.10.3 安装 Elasticsearch-head 插件(Node1:192.168.100.140,可视化管理 ES)

Elasticsearch 5.0+ 后,head 插件需独立安装,依赖 nodephantomjs

1. 编译安装 node(依赖)

# 上传 node-v8.2.1.tar.gz 到 /opt 目录
yum install gcc gcc-c++ make -ycd /opt
tar zxvf node-v8.2.1.tar.gzcd node-v8.2.1/
./configure
make && make install

2. 安装 phantomjs(前端框架)

# 上传 phantomjs-2.1.1-linux-x86_64.tar.bz2 到 /opt 目录
cd /opt
tar jxvf phantomjs-2.1.1-linux-x86_64.tar.bz2 -C /usr/local/src/
cd /usr/local/src/phantomjs-2.1.1-linux-x86_64/bin
cp phantomjs /usr/local/bin

3. 安装 Elasticsearch-head 工具

# 上传 elasticsearch-head.tar.gz 到 /opt 目录
cd /opt
tar zxvf elasticsearch-head.tar.gz -C /usr/local/src/
cd /usr/local/src/elasticsearch-head/
npm install

4. 修改 Elasticsearch 跨域配置(支持 head 访问)

vim /etc/elasticsearch/elasticsearch.yml
# 末尾添加以下内容
http.cors.enabled: true				# 开启跨域访问支持
http.cors.allow-origin: "*"			# 允许所有域名访问# 重启 ES 生效
systemctl restart elasticsearch

5. 启动 elasticsearch-head 服务

# 需在 head 解压目录下启动(读取 gruntfile.js 配置)
cd /usr/local/src/elasticsearch-head/
npm run start &# 验证端口(head 默认监听 9100)
netstat -natp | grep 9100

6. 验证 head 插件
浏览器访问 http://192.168.100.140:9100/,在“连接集群”输入 http://192.168.100.140:9200,点击“连接”,集群健康值为 green 即正常。

7. 插入测试索引(可选)

# 插入索引 index-demo1,类型 test,ID 1
curl -X PUT 'localhost:9200/index-demo1/test/1?pretty&pretty' -H 'content-Type: application/json' -d '{"user":"zhangsan","mesg":"hello world"}'

浏览器刷新 head 页面,在“数据浏览”可查看该索引及数据。

2.10.4 Logstash 部署(192.168.100.129,对接 Kafka 拉取日志)

1. 基础环境准备
(1)更改主机名(可选,便于识别)

hostnamectl set-hostname logstash

(2)安装 Java 环境(Logstash 依赖 JDK)

yum -y install java
java -version  # 验证安装(需 JDK 8+)

(3)安装 Logstash

# 上传 logstash-6.6.1.rpm 到 /opt 目录
cd /opt
rpm -ivh logstash-6.6.1.rpm                           
# 启动并设置开机自启
systemctl start logstash.service                      
systemctl enable logstash.service# 建立软链接(方便命令调用)
ln -s /usr/share/logstash/bin/logstash /usr/local/bin/

2. 测试 Logstash 基础功能
Logstash 核心参数:

  • -f:指定配置文件;
  • -e:命令行直接写配置(测试用);
  • -t:验证配置文件语法。

(1)测试“标准输入→标准输出”(管道模式)

logstash -e 'input { stdin{} } output { stdout{} }'
# 输入任意内容(如“test logstash”),终端会输出带时间戳的日志,按 Ctrl+C 退出

(2)测试“标准输入→JSON 格式化输出”

logstash -e 'input { stdin{} } output { stdout{ codec=>rubydebug } }'
# 输入“test json format”,终端输出结构化 JSON 日志(含 @timestamp、host、message 等字段)

(3)测试“标准输入→Elasticsearch”(验证 ES 连通性)

logstash -e 'input { stdin{} } output { elasticsearch { hosts=>["192.168.100.140:9200"] } }'
# 输入“test es connection”,通过 ES-head 查看是否生成新索引(默认索引名 logstash-yyyy.MM.dd)

3. 配置 Logstash 从 Kafka 拉取日志(核心配置)
结合已部署的 Kafka 集群(192.168.100.160/170/180:9092),配置 Logstash 输入(Kafka)、过滤(解析 Apache 日志)、输出(Elasticsearch)。

# 新建 Kafka 日志处理配置文件(存放于 /etc/logstash/conf.d/,Logstash 默认扫描该目录)
vim /etc/logstash/conf.d/kafka.conf# 配置内容(输入→过滤→输出)
input {kafka {# Kafka 集群地址(与 Filebeat 输出的 Kafka 地址一致)bootstrap_servers => "192.168.100.160:9092,192.168.100.170:9092,192.168.100.180:9092"topics => "httpd"  # 拉取的 Kafka 主题(与 Filebeat 输出的 Topic 一致)type => "httpd_kafka"  # 自定义类型标识codec => "json"  # 解析 JSON 格式日志(与 Filebeat 输出格式一致)auto_offset_reset => "latest"  # 从最新消息开始拉取(earliest 为从头拉取)decorate_events => true  # 日志中添加 Kafka 元数据(如 Topic、Partition)}
}# 过滤环节(解析 Apache 访问日志为结构化数据,仅处理 access 标签日志)
filter {if "access" in [tags] {  # 匹配 Filebeat 打的 "access" 标签grok {# 解析 Apache 访问日志格式(示例:192.168.1.1 - - [01/Jan/2024:12:00:00 +0800] "GET / HTTP/1.1" 200 1234)match => { "message" => "%{IPORHOST:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:log_time}\] \"%{WORD:method} %{URIPATHPARAM:request} %{WORD:http_version}\" %{NUMBER:status} %{NUMBER:bytes_sent}" }}# 转换日志时间为 Logstash 标准时间戳(覆盖 @timestamp)date {match => ["log_time", "dd/MMM/yyyy:H:m:s Z"]target => "@timestamp"}}
}# 输出环节(按标签输出到 ES 不同索引)
output {# 访问日志输出到 httpd_access-yyyy.MM.dd 索引if "access" in [tags] {elasticsearch {hosts => ["192.168.100.140:9200"]  # ES 节点地址index => "httpd_access-%{+YYYY.MM.dd}"  # 按天分割索引}}# 错误日志输出到 httpd_error-yyyy.MM.dd 索引if "error" in [tags] {elasticsearch {hosts => ["192.168.100.140:9200"]index => "httpd_error-%{+YYYY.MM.dd}"}}# stdout { codec => rubydebug }  # 测试用:终端输出日志,生产可注释
}

4. 验证配置并启动 Logstash

# 1. 验证配置文件语法(输出“Configuration OK”即为正确)
logstash -t -f /etc/logstash/conf.d/kafka-httpd.conf# 2. 启动 Logstash(加载 Kafka 配置文件)
systemctl restart logstash  # 重启服务加载新配置
# 或前台启动(查看实时日志,测试用):logstash -f /etc/logstash/conf.d/kafka-httpd.conf

5. 验证 Logstash 输出(可选)
通过 ES-head 查看是否生成 httpd_access-yyyy.MM.ddhttpd_error-yyyy.MM.dd 索引(需 Filebeat 已采集日志并发送到 Kafka)。

2.10.5 Kibana 部署(192.168.100.140,对接 ES 可视化日志)

1. 安装 Kibana

# 上传 kibana-6.6.1-x86_64.rpm 到 /opt 目录
cd /opt
rpm -ivh kibana-6.6.1-x86_64.rpm

2. 修改 Kibana 主配置文件

vim /etc/kibana/kibana.yml
# 2行:取消注释,Kibana 默认监听端口 5601
server.port: 5601
# 7行:取消注释,设置监听地址(0.0.0.0 允许外部访问)
server.host: "0.0.0.0"
# 28行:取消注释,设置对接的 ES 地址(与 ES 服务地址一致)
elasticsearch.hosts: ["http://192.168.100.140:9200"] 
# 37行:取消注释,设置 Kibana 元数据存储索引(默认 .kibana)
kibana.index: ".kibana"

3. 启动 Kibana 服务

systemctl start kibana.service
systemctl enable kibana.service# 验证端口
netstat -natp | grep 5601

4. 配置 Kibana 索引模式(可视化日志)
(1)访问 Kibana:浏览器打开 http://192.168.100.140:5601(首次登录无账号密码)。
(2)创建访问日志索引模式:

  • 左侧菜单 → ManagementIndex PatternsCreate index pattern
  • 输入索引模式:httpd_access-*(匹配所有访问日志索引),点击 Next step
  • 时间字段选择 @timestamp(ES 标准时间戳),点击 Create index pattern
    (3)创建错误日志索引模式:
  • 重复上述步骤,输入索引模式 httpd_error-*,时间字段仍选 @timestamp

5. 查看日志
左侧菜单 → Discover,顶部下拉框选择 httpd_access-*httpd_error-*,可通过时间范围(如“Last 15 minutes”)、字段筛选(如 client_ipstatus)查看结构化日志。

2.10.6 整体验证(可选)
  1. 生成测试日志:在 Filebeat 节点(192.168.100.200)执行 curl http://127.0.0.1?test=elk,生成 1 条 Apache 访问日志。
  2. 验证 Kafka 日志:在任意 Kafka 节点执行 /usr/local/kafka/bin/kafka-console-consumer.sh --topic httpd --bootstrap-server 192.168.100.160:9092 --from-beginning,可看到 Filebeat 发送的 JSON 日志。
  3. 验证 ES 索引:通过 ES-head 查看 httpd_access-yyyy.MM.dd 索引是否新增数据。
  4. 验证 Kibana 展示:在 Kibana Discover 页面选择 httpd_access-*,可找到刚才生成的测试日志(request 字段含 ?test=elk)。

三、常见问题与注意事项

  1. Zookeeper 集群无法选举 Leader

    • 检查 myid 文件是否与 zoo.cfgserver.SID 对应;
    • 检查防火墙是否开放 2181(客户端)、3188(通信)、3288(选举)端口;
    • 确保集群节点数为奇数(如 3、5,避免脑裂)。
  2. Kafka 无法连接 Zookeeper

    • 检查 Zookeeper 集群是否正常运行(service zookeeper status);
    • 检查 server.propertieszookeeper.connect 配置是否正确;
    • 检查 Kafka 节点是否能 ping 通 Zookeeper 节点。
  3. Filebeat 无法输出到 Kafka

    • 检查 Kafka 集群是否正常运行,httpd Topic 是否存在;
    • 检查 Filebeat 节点是否能访问 Kafka 9092 端口(telnet 192.168.100.160 9092);
    • 查看 Filebeat 日志(./filebeat -e -c filebeat.yml)排查错误。
  4. Kafka 消息积压

    • 增加 Consumer 数量(不超过 Partition 数);
    • 优化 Consumer 消费逻辑(如批量消费);
    • 调整 Topic 分区数(增加并行度)。
http://www.dtcms.com/a/403250.html

相关文章:

  • 巴斯勒相机:30 年技术沉淀,重新定义机器视觉效率​
  • 【Kotlin进阶】泛型的高级特性
  • h.265格式的视频在浏览器无法正常播放,使用ffprobe转为h.264
  • sysbench mysql 单表 insert 压测 , 自定义lua测试脚本
  • 石家庄制作网站的公司电商流量平台
  • 二手车网站html模板西宁网站设计
  • 零成本上线动态博客:用 Rin + Cloudflare 部署个人博客的完整指南
  • 家用净水器DIY,75G经典5级Ro净水器
  • 七、OpenCV中的视频的读写
  • ClipboardApp —— Mac 专属轻量级剪切板助手(开源)
  • 【开题答辩全过程】以 LoveEditing视频编辑社团网站为例,包含答辩的问题和答案
  • ARM芯片架构之CoreSight ROM Table 的SoC设计思路
  • 门户网站建设的平台搭建hello md5 wordpress
  • LeetCode 485.最大连续1的个数
  • 【综述】Processes at the intracellular scale 细胞内尺度的过程
  • 截取字符串
  • 【LeetCode热题100(29/100)】删除链表的倒数第 N 个结点
  • PyTorch 数据处理与可视化全攻略
  • 【LeetCode】912. 排序数组、手撕快速排序
  • 国内企业建站模板淘宝代运营去哪里找
  • VTK基础(05):VTK的渲染窗口嵌入到QT的控件当中
  • 深入解析 List 容器组件:构建高效、可交互的列表解决方案
  • 06.容器存储
  • 自己做的网站为何手机不能浏览快闪ppt模板免费下载
  • 动态内存管理 干货2
  • pdf转图片:pdf2image
  • 高校档案网站建设网站如何做成app
  • 画质及画面刷新率如何调整?正式升级!2K240帧原画级教程
  • 兰州网站的建设群晖搭建的wordpress外网访问
  • Redis常见八股文