Filebeat+Kafka+ELK 日志采集实战
前言
第一部分:Zookeeper
概念回顾
重要数据结构
核心特性
触发选举的场景
选举流程(Fast Leader Election)
部署要点(操作清单)
第二部分:Kafka
为什么使用 Kafka
架构组成
Producer/Consumer & Consumer Group
偏移量(Offset)
作用:
典型配置:
数据存储机制
ack 机制与可靠性
选型建议
部署注意点
kafka命令行相关操作
第三部分:Filebeat + Kafka + ELK
环境配置
Filebeat
小建议
ELK部署补充
其他补充内容
Elasticsearch(索引与模板)
为什么要用索引模板
ILM 建议
Kibana
第四部分:常见问题与排查
0. 先理清数据路径
1. 验证 Filebeat 是否真的采集到数据
2. 验证 Kafka 中是否有消息
3. 验证 Logstash 是否消费到 Kafka
5. 验证 Logstash → Elasticsearch 通道
6. 检查 Elasticsearch 问题
7. 常用诊断命令汇总
第五部分:实用范例
总结
前言
-
Zookeeper:分布式协调服务,负责元数据管理、选举、配置和节点状态通知。集群运行依赖
zoo.cfg
和每台节点的myid
。 -
Kafka:高吞吐、低延迟的分布式消息系统,基于 topic/partition/replica 提供可扩展与容错能力。Kafka 与 Zookeeper 协同管理集群元信息(取决于 Kafka 版本)。
-
日志采集流水线常见模式:
-
Filebeat → Logstash → Elasticsearch(适合需要复杂解析的场景)
-
Filebeat → Kafka → Logstash → Elasticsearch(适合多消费者/解耦场景)
-
-
常见故障:Filebeat 与实际日志位置不一致、Logstash 未正确消费 Kafka、Elasticsearch 权限或 mapping 问题、索引模板/ILM 配置缺失。
第一部分:Zookeeper
概念回顾
Zookeeper 是一个分布式协调服务,提供简单可靠的机制来管理分布式系统中的配置信息、命名、同步和组服务。它本质上是一个轻量级的、带通知机制的分布式“文件系统”。
重要数据结构
-
ZNode:树形节点,类似文件系统,可以存储少量数据(默认通常推荐小于 1MB)。
-
节点类型:
-
持久(persistent): 直到显式删除
-
临时(ephemeral): 客户端断开后自动删除(常用于临时注册、会话)
-
顺序(sequential): 创建时带自增后缀,常用于队列或锁
-
核心特性
-
强一致性(ZAB 协议保证)
-
观察者/通知机制(watch)
-
会话概念(session),会话超时、重连策略
-
选举机制(Leader 选举,详细版)
Zookeeper 集群中的每个服务器都可能担任 Leader 或 Follower。Leader 负责处理写请求并同步给其他 Follower,Follower 处理读请求并转发写请求。
触发选举的场景
-
集群初次启动
-
当前 Leader 故障或网络隔离
选举流程(Fast Leader Election)
-
初始状态:各节点状态为 LOOKING,自荐为 Leader,投自己一票,并广播自己的投票(包括服务器ID、ZXID、epoch)。
-
收集投票:节点收到来自其他节点的投票后,比较以下优先级:
-
先比 epoch(任期号),大的优先
-
再比 ZXID(最新事务ID),大的优先
-
如果相同,再比 myid(服务器ID),大的优先
-
-
更新选票:如果发现对方更优,则更新自己的投票,重新广播。
-
形成多数(Quorum):当某个候选人获得超过半数节点的投票,就成为 Leader,其余节点改为 Follower 状态。
-
同步阶段:Leader 向 Follower 同步最新数据状态,完成后集群进入正常服务状态。
提示:Zookeeper 的 Leader 选举算法经历过多个版本(Fast Leader Election 是常用的),新版本增加了选举性能和一致性。
-
部署要点(操作清单)
服务名称 | IP地址 | 服务 |
---|---|---|
zk02 (2C/4G) | 192.168.10.16 | zookeeper-3.5.7 kafka_2.13-2.7.1 jdk_1.8 |
zk02 (2C/4G) | 192.168.10.16 | zookeeper-3.5.7 kafka_2.13-2.7.1 jdk_1.8 |
zk03 (2C/4G) | 192.168.10.17 | zookeeper-3.5.7 kafka_2.13-2.7.1 jdk_1.8 |
-
安装 JDK(OpenJDK)
-
下载 Zookeeper 二进制包并解压
-
配置
zoo.cfg
:tickTime
,initLimit
,syncLimit
,dataDir
,dataLogDir
,clientPort
,server.A=B:C:D
等 -
在
dataDir
下创建myid
文件,写入节点数字(与server.X
中的 X 一致) -
开启防火墙端口或关闭防火墙(建议开放所需端口并确认安全组)
-
创建 systemd 或 init 脚本确保开机自启
示例关键配置片段:
cd /opt tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz mv apache-zookeeper-3.5.7-bin /usr/local/zookeeper-3.5.7 cd /usr/local/zookeeper-3.5.7/conf/ cp zoo_sample.cfg zoo.cfg vim zoo.cfg tickTime=2000 #通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒 initLimit=10 #Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量),这里表示 为10*2s syncLimit=5 #Leader和Follower之间同步通信的超时时间,这里表示如果超过5*2s,Leader认为Follwer死掉,并从服务器列表中删除Follwer dataDir=/usr/local/zookeeper-3.5.7/data ●修改,指定保存Zookeeper中的数据的目录, 目录需要单独创建 dataLogDir=/usr/local/zookeeper-3.5.7/logs ●添加,指定存放日志的目录,目录需要单独创建 clientPort=2181 #客户端连接端口 #添加集群信息 server.1=192.168.10.18:3188:3288 server.2=192.168.10.21:3188:3288 server.3=192.168.10.22:3188:3288 //拷贝配置好的 Zookeeper 配置文件到其他机器上 scp /usr/local/zookeeper-3.5.7/conf/zoo.cfg 192.168.10.21:/usr/local/zookeeper3.5.7/conf/ scp/usr/local/zookeeper-3.5.7/conf/zoo.cfg 192.168.10.22:/usr/local/zookeeper3.5.7/conf/ //在每个节点上创建数据目录和日志目录 mkdir/usr/local/zookeeper-3.5.7/data mkdir /usr/local/zookeeper-3.5.7/logs //在每个节点的dataDir指定的目录下创建一个 myid 的文件 echo 1 > /usr/local/zookeeper-3.5.7/data/myid echo 2 > /usr/local/zookeeper-3.5.7/data/myid echo 3 > /usr/local/zookeeper-3.5.7/data/myid //配置 Zookeeper 启动脚本 vim /etc/init.d/zookeeper #!/bin/bash #chkconfig:2345 20 90 #description:Zookeeper Service Control Script ZK_HOME='/usr/local/zookeeper-3.5.7' case $1 in start)echo "---------- zookeeper 启动 ------------"$ZK_HOME/bin/zkServer.sh start ;; stop)echo "---------- zookeeper 停止 ------------"$ZK_HOME/bin/zkServer.sh stop ;; restart)echo "---------- zookeeper 重启 ------------"$ZK_HOME/bin/zkServer.sh restart ;; status)echo "---------- zookeeper 状态 ------------"$ZK_HOME/bin/zkServer.sh status ;; *)echo "Usage: $0 {start|stop|restart|status}" esac // 设置开机自启 chmod +x /etc/init.d/zookeeper chkconfig --add zookeeper //分别启动 Zookeeper service zookeeper start //查看当前状态 service zookeeper status
第二部分:Kafka
为什么使用 Kafka
-
高吞吐、持久化、分区并行、良好水平扩展、支持多消费者
-
场景:业务异步处理、日志收集、流处理(Spark/Flink)、事件溯源
架构组成
-
Broker:Kafka 服务器
-
Topic:消息分类单位
-
Partition:topic 的分片,消息在 partition 内有序
-
Replica(副本) 与 ISR:副本机制保证容错,ISR 维护跟随且处于同步的 follower
-
Leader / Follower:每个 partition 只有一个 leader 负责读写
Producer/Consumer & Consumer Group
-
Producer 发送消息到 topic(可指定 key 来保证同 key 的消息落到同一 partition)
-
Consumer 属于某个 Consumer Group:group 内成员分担 partition 的消费
-
offset 用于记录 consumer 消费位置(可以存储在 Kafka 内的
__consumer_offsets
topic)
偏移量(Offset)
-
每条消息在其所在的 Partition 中都有一个递增的偏移量 offset(从0开始)。
-
offset 是 逻辑位移标识,不是真正的物理地址,用于定位消息在分区中的位置。
-
每个 Consumer Group 都有自己独立的 offset 记录(即消费到哪里)。
-
Kafka 把这些 offset 存储在特殊的
__consumer_offsets
topic(默认),或者由用户自己管理(enable.auto.commit=false 时手动提交)。
作用:
-
确保消费进度:消费者重启后可以从上次提交的 offset 继续消费,不丢数据也不重复消费(具体依赖配置)。
-
支持回溯与重放:可以手动指定 offset(如 earliest/latest/自定义数值)实现数据重放或跳过。
-
消费隔离:同一个 topic 被多个 Consumer Group 消费时,各自 offset 独立,不相互干扰。
-
灵活容错:offset 与消息存储解耦,允许消费者崩溃后恢复。
典型配置:
-
auto.offset.reset=latest
:如果找不到有效 offset,从最新消息开始消费。 -
auto.offset.reset=earliest
:如果找不到有效 offset,从头开始消费。 -
enable.auto.commit=true
:自动提交 offset;false 则需要手动提交。
数据存储机制
-
Partition 由多个 segment 组成(.log + .index)
-
retention(基于时间、大小或通过 log compaction)
ack 机制与可靠性
Kafka 的 ack(确认机制)用于在生产者和 broker 之间平衡“吞吐量”与“可靠性”:
-
acks=0
:生产者发送消息后不等待任何确认。最高吞吐、最低可靠性(broker 故障时可能丢消息)。 -
acks=1
(默认):Leader 写入成功后即返回 ack。Follower 尚未同步完成时如果 Leader 挂掉,可能丢失这部分消息。中等可靠性、中等延迟。 -
acks=all
(或 -1):Leader 等待 ISR(所有同步副本)都写入成功后才返回 ack。最高可靠性,最低吞吐。但如果 Leader 在发送 ack 前崩溃,可能导致数据重复。
补充:
-
从 Kafka 0.11 版本开始引入幂等性生产者(idempotent producer),可避免重复写入,即使重试也只持久化一条记录。
-
对于需要精确一次(Exactly-once)语义,可结合事务 API 实现。
选型建议
-
对日志类或允许少量丢失的数据可用 acks=1
-
对金融等敏感数据或必须可靠传输用 acks=all
-
配合
retries
、max.in.flight.requests.per.connection
等参数提升稳定性
部署注意点
-
server.properties
常见项:broker.id
,listeners
,log.dirs
,zookeeper.connect
,num.partitions
,log.retention.hours
-
启动脚本与开机自启配置
-
topic 创建与管理:
kafka-topics.sh --create
/--describe
/--list
cd /opt/ tar zxvf kafka_2.13-2.7.1.tgz mv kafka_2.13-2.7.1 /usr/local/kafka //修改配置文件 cd /usr/local/kafka/config/ cp server.properties{,.bak} vim server.properties broker.id=0 ●21行,broker的全局唯一编号,每个broker不能重复,因此要在其他机器上配置 broker.id=1、broker.id=2 listeners=PLAINTEXT://192.168.10.18:9092 ●31行,指定监听的IP和端口,如果修改每个 broker的IP需区分开来,也可保持默认配置不用修改 num.network.threads=3 #42行,broker 处理网络请求的线程数量,一般情况下不需要去修改 num.io.threads=8 #45行,用来处理磁盘IO的线程数量,数值应该大于硬盘数 socket.send.buffer.bytes=102400 #48行,发送套接字的缓冲区大小 socket.receive.buffer.bytes=102400 #51行,接收套接字的缓冲区大小 socket.request.max.bytes=104857600 #54行,请求套接字的缓冲区大小 log.dirs=/usr/local/kafka/logs #60行,kafka运行日志存放的路径,也是数据存放的路径num.partitions=1 #65行,topic在当前broker上的默认分区个数,会被topic创建时的指定参数覆 盖 num.recovery.threads.per.data.dir=1 #69行,用来恢复和清理data下数据的线程数量 log.retention.hours=168 #103行,segment文件(数据文件)保留的最长时间,单位为小时,默 认为7天,超时将被删除 log.segment.bytes=1073741824 #110行,一个segment文件最大的大小,默认为 1G,超出将新建 一个新的segment文件 zookeeper.connect=192.168.10.18:2181,192.168.10.21:2181,192.168.10.22:2181 ●123行,配置连接Zookeeper集群地址 //修改环境变量 vim /etc/profile export KAFKA_HOME=/usr/local/kafka export PATH=$PATH:$KAFKA_HOME/bin source /etc/profile //配置 Zookeeper 启动脚本 vim /etc/init.d/kafka #!/bin/bash # chkconfig: 2345 22 88 # description: Kafka Service Control Script KAFKA_HOME='/usr/local/kafka' case "$1" instart)echo "---------- Kafka 启动 ------------"${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties;;stop)echo "---------- Kafka 停止 ------------"${KAFKA_HOME}/bin/kafka-server-stop.sh;;restart)$0 stop$0 start;;status)echo "---------- Kafka 状态 ------------"count=$(ps -ef | grep kafka.Kafka | grep -v grep | wc -l)if [ "$count" -eq 0 ]; thenecho "kafka is not running"elseecho "kafka is running"fi;;*)echo "Usage: $0 {start|stop|restart|status}";; esac //设置开机自启 chmod +x /etc/init.d/kafka chkconfig --add kafka //分别启动 Kafka service kafka start
kafka命令行相关操作
//创建topic kafka-topics.sh --create --zookeeper 192.168.10.18:2181,192.168.10.21:2181,192.168.10.22:2181 --replication-factor 2 - -partitions 3 --topic test kafka-topics.sh --create --zookeeper 192.168.10.18:2181,192.168.10.20:2181,192.168.10.21:2181 --replication-factor 2 - -partitions 3 --topic test --------------------------------------------------------------------------------- ---- --zookeeper:定义 zookeeper 集群服务器地址,如果有多个 IP 地址使用逗号分割,一般使用一个 IP 即可 --replication-factor:定义分区副本数,1 代表单副本,建议为 2 --partitions:定义分区数 --topic:定义 topic 名称 --------------------------------------------------------------------------------- ---- //查看当前服务器中的所有 topic kafka-topics.sh --list --zookeeper 192.168.10.18:2181,192.168.10.21:2181,192.168.10.22:2181 //查看某个 topic 的详情 kafka-topics.sh --describe --zookeeper 192.168.10.18:2181,192.168.10.21:2181,192.168.10.22:2181 //发布消息 kafka-console-producer.sh --broker-list 192.168.10.18:9092,192.168.10.21:9092,192.168.10.22:9092 --topic test //消费消息 kafka-console-consumer.sh --bootstrap-server 192.168.10.18:9092,192.168.10.21:9092,192.168.10.22:9092 --topic test --frombeginning --------------------------------------------------------------------------------- ---- --from-beginning:会把主题中以往所有的数据都读取出来 --------------------------------------------------------------------------------- ---- //修改分区数 kafka-topics.sh--zookeeper 192.168.10.18:2181,192.168.10.21:2181,192.168.10.22:2181 --alter --topic test -- partitions 6 //删除 topic kafka-topics.sh --delete --zookeeper 192.168.10.18:2181,192.168.10.21:2181,192.168.10.22:2181 --topic test
第三部分:Filebeat + Kafka + ELK
环境配置
服务与配置名称 | IP地址 | 服务 |
---|---|---|
Node1节点(2C/4G) | 192.168.10.14 | Elasticsearch Kibana |
Logstash节点 | 192.168.10.16 | Logstash |
Filebeat节点 | 192.168.10.17 | Filebeat Apache |
Filebeat
示例(把日志发到 Kafka):
cd /usr/local/filebeat vim filebeat.yml filebeat.prospectors: - type: logenabled: truepaths:- /var/log/httpd/access_logtags: ["access"]- type: logenabled: truepaths:- /var/log/httpd/error_logtags: ["error"]# 注:这里要注释掉logstash output,否则会与output.kafka冲突报错 #----------------------------- Logstash output -------------------------------- #output.logstash:# The Logstash hosts# hosts: ["192.168.114.251:5044"] # Optional SSL. By default is off.# List of root certificates for HTTPS server verifications#ssl.certificate_authorities: ["/etc/pki/root/ca.pem"] # Certificate for SSL client authentication#ssl.certificate: "/etc/pki/client/cert.pem" # Client Certificate Key#ssl.key: "/etc/pki/client/cert.key" #----------------------------- kafka output -------------------------------- output.kafka:enabled: truehosts: ["192.168.114.50:9092","192.168.114.40:9092","192.168.114.30:9092"] #指 定 Kafka 集群配置topic: "httpd" #指定 Kafka 的 topic#启动 filebeat ./filebeat -e -c filebeat.yml #注:rpm安装就直接 cd /etc/filebeat/ filebeat -e -c filebeat.yml
小建议
-
Filebeat 应部署在产生日志的机器上(尽量把采集器放到日志源机器)。
-
考虑使用
fields
或tags
标记日志类型,便于 downstream 做路由。 -
使用
filebeat modules enable apache
可以自动解析 Apache access/error 日志并输出 ECS 字段。
ELK部署补充
#在 Logstash 组件所在节点上新建一个 Logstash 配置文件 cd /etc/logstash/conf.d/ vim kafka.conf input {kafka {bootstrap_servers => "192.168.10.18:9092,192.168.10.21:9092,192.168.10.22:9092" #kafka集群地址topics => "httpd" #拉取的kafka的指定topictype => "httpd_kafka" #指定 type 字段codec => "json" #解析json格式的日志数据auto_offset_reset => "latest" #拉取最近数据,earliest为从头开始拉取2.10.4 Kibana 添加 decorate_events => true #传递给elasticsearch的数据额外增加kafka的属性数据} } output {if "access" in [tags] {elasticsearch {hosts => ["192.168.10.13:9200"]index => "httpd_access-%{+YYYY.MM.dd}"}}if "error" in [tags] {elasticsearch {hosts => ["192.168.10.13:9200"]index => "httpd_error-%{+YYYY.MM.dd}"}}stdout { codec => rubydebug } } # 修改pipelines.yml,用于解析kafka数据 - pipeline.id: kafkapath.config: "/etc/logstash/conf.d/kafka.conf"#启动 logstash logstash -f kafka.conf 注:生产黑屏操作es时查看所有的索引:curl -X GET "localhost:9200/_cat/indices?v"
其他补充内容
Elasticsearch(索引与模板)
为什么要用索引模板
-
提前定义字段类型(避免动态 mapping 不正确,例如把 ip 当作 text)
-
指定时间字段、关键字段类型,提高查询性能
-
配合 ILM(索引生命周期管理)实现自动 rollover / 删除
示例简单模板(JSON):
{"index_patterns": ["httpd_access-*"],"settings": {"number_of_shards": 1,"number_of_replicas": 1},"mappings": {"properties": {"@timestamp": { "type": "date" },"clientip": { "type": "ip" },"request": { "type": "keyword" },"response": { "type": "integer" }}} }
创建模板:
curl -XPUT "http://192.168.10.13:9200/_index_template/httpd_template" -H 'Content-Type: application/json' -d @httpd_template.json
ILM 建议
-
为热点日志创建 rollover 策略(每天 rollover 或大小限制)
-
为历史日志设置删除或冷存储策略
Kibana
-
在 Kibana 中创建 Index Pattern(例如
httpd_access-*
),然后用 Discover 查看;可用 Filebeat 模块或自定义 Dashboard 展示流量/错误趋势。
第四部分:常见问题与排查
Kafka 有 topic、Logstash 在运行,但 ES 没索引系统化成排查步骤:
0. 先理清数据路径
Filebeat(产生日志主机) → Kafka → Logstash(消费) → Elasticsearch
一个常见误配置:把 Filebeat 部署在没有日志的机器上(或配置了错误的
paths
)→ Kafka topic 空 → Logstash 无数据 → ES 无索引
1. 验证 Filebeat 是否真的采集到数据
-
在 Filebeat 主机检查日志文件是否存在:
ls -l /var/log/httpd/access_log /var/log/httpd/error_log tail -n 20 /var/log/httpd/access_log
-
查看 Filebeat 的日志:通常在
/var/log/filebeat/filebeat
或使用journalctl -u filebeat
。 -
临时把 Filebeat 输出改为控制台(
filebeat -e -c filebeat.yml
)看事件结构(JSON)是否带有你期望的 fields/tags。
2. 验证 Kafka 中是否有消息
-
使用 kafka-console-consumer:
kafka-console-consumer.sh --bootstrap-server 192.168.10.18:9092 --topic httpd --from-beginning --timeout-ms 2000
-
或用
kafka-topics.sh --describe
看分区/leader/ISR
3. 验证 Logstash 是否消费到 Kafka
-
启动 Logstash 时保留
stdout { codec => rubydebug }
,重启 Logstash,观察是否打印 JSON 事件。 -
如果没有输出:
-
检查
bootstrap_servers
是否正确 -
检查
group_id
与auto_offset_reset
(如果 group 已存在并且 offset 在末尾,设置earliest
作测试)
-
4. 验证事件结构是否含 tags
-
Logstash stdout 输出中看是否包含
tags
字段(或你用fields.log_type
) -
如果 Filebeat 没带 tag,可在 Logstash filter 中
mutate { add_tag => ["access"] }
做临时补救
5. 验证 Logstash → Elasticsearch 通道
-
临时修改 Logstash output:把
index => "test-httpd-%{+YYYY.MM.dd}"
(无条件)写入 ES,查看curl -X GET 'http://192.168.10.13:9200/_cat/indices?v'
-
如果写入失败:查看 Logstash 日志(
/var/log/logstash/logstash-plain.log
)查看错误(如mapper_parsing_exception
、security_exception
等)
6. 检查 Elasticsearch 问题
-
检查 ES 是否可访问:
curl -XGET http://192.168.10.13:9200
返回版本信息 -
如果开启安全:确认用户名/密码或 API Key 是否在 Logstash 的 Elasticsearch 输出中配置
-
检查磁盘和 JVM 内存(ES 无空间或 OOM 会拒绝写入)
7. 常用诊断命令汇总
-
Filebeat 日志:
journalctl -u filebeat -f
或/var/log/filebeat/*
-
Kafka 消费:
kafka-console-consumer.sh --bootstrap-server ... --topic httpd --from-beginning
-
Kafka topic info:
kafka-topics.sh --describe --topic httpd --zookeeper zk:2181
-
Logstash 日志:
tail -f /var/log/logstash/logstash-plain.log
-
ES 索引:
curl -s 'http://es:9200/_cat/indices?v'
-
ES 模板:
curl -s 'http://es:9200/_index_template?pretty'
第五部分:实用范例
-
在 Apache 主机上安装并启用 Filebeat Apache module(自动解析)
filebeat modules enable apache filebeat setup --index-management -E output.logstash.hosts=["127.0.0.1:5044"] systemctl enable --now filebeat
-
在 Logstash 使用 beats input:
input { beats { port => 5044 } } # filter:可直接使用 filebeat apache 字段,不再需要 grok output { elasticsearch { hosts => ["http://192.168.10.13:9200"] index => "filebeat-apache-%{+YYYY.MM.dd}" } }
-
如果要使用 Kafka 做缓冲,确保 Filebeat 在产生日志机器,输出为 Kafka,Logstash 使用 kafka input 消费。
总结
-
将采集器放在日志源头:Filebeat 直接运行在产生日志的机器,减少网络拷贝和权限问题。
-
生产环境优先 use Kafka 做解耦:当你有多个下游消费者或需要削峰时,Kafka 是合适的中间层。
-
始终打开测试通道:
stdout { codec => rubydebug }
在排查时非常有用。 -
使用 Index Template + ILM 保护 Elasticsearch,避免 mapping 漂移和磁盘被日志吞没。
-
记录每一次变更:consumer group、topic 名称、offset reset 策略等会影响数据消费。