分布式专题——56 微服务日志采集与分析系统实战
1 为什么要使用 ELK
-
随着企业信息化进程加速,日志数据呈现量急剧增加、来源多样、格式复杂的特点,传统日志管理方式已难以满足需求,这是引入ELK的核心背景;
-
ELK(ElasticSearch、Logstash、Kibana)的由三个组件构成,各自承担关键功能,形成高效的日志管理方案:
-
Elasticsearch:提供强大的分布式搜索能力,支撑日志的快速检索;
-
Logstash:具备灵活的数据采集与处理功能,负责日志的收集和预处理;
-
Kibana:提供直观的数据可视化界面,将复杂日志数据以图表等形式呈现,便于理解分析;
-
三者结合,为企业和开发人员提供了高效、实时、可扩展且易用的日志管理解决方案,助力提升工作效率和问题解决速度;
-
-
使用ELK的主要原因
-
集中化管理与高效检索日志
-
在大型分布式系统中,ELK可构建集中式日志系统,实现所有节点日志的统一收集、管理和访问,大幅提高定位问题的效率;
-
Elasticsearch的强大检索特性,能快速查询问题日志,显著提升运维人员的工作效率;
-
-
全面的日志分析与系统监控
-
ELK可管理和分析多种日志类型,包括系统日志、应用程序日志、安全日志等,帮助系统运维和开发人员了解服务器软硬件信息、检查配置错误及其原因;
-
通过分析和监控日志,能及时了解服务器的负荷、性能和安全性,从而及时采取措施纠正错误;
-
-
直观的数据可视化与理解
-
Kibana为Elasticsearch提供Web可视化界面,可生成各种维度的表格、图形,让复杂的日志数据“可视化”;
-
可视化界面帮助用户更直观地理解和分析数据,进一步提升日志分析和系统监控的效果。
-
-
2 ELK 的整体架构分析
- ELK架构分为两种类型:
- 经典的ELK架构
- 整合消息队列(Redis/Kafka/RabbitMQ)和Nginx的ELK架构
2.1 经典的ELK架构
-
组成:
- 经典ELK架构主要由 Filebeat + Logstash + Elasticsearch + Kibana 组成;
- 早期架构仅包含
Logstash + Elasticsearch + Kibana,后来因Filebeat轻量级、高效性的特点,逐渐被引入作为日志收集工具;

-
流程与组件分工:
-
Beats(以Filebeat为代表):负责 Data Collection(数据收集),作为轻量级日志收集代理,部署在客户端,消耗资源少,高效收集日志数据;
-
Logstash:负责 Data Aggregation & Processing(数据聚合与处理),对Filebeat收集的日志数据进行过滤、转换等操作,再发送到Elasticsearch;
-
Elasticsearch:负责 Indexing & Storage(索引与存储),基于Lucene的分布式搜索和分析引擎,提供强大的数据存储和搜索能力;
-
Kibana:负责 Analysis & Visualization(分析与可视化),为Elasticsearch提供Web可视化界面,通过图表、仪表盘等形式直观查看和分析日志数据;
-
-
特点:
-
日志收集:Filebeat轻量级,客户端部署,资源消耗少,高效收集日志;
-
数据处理:Logstash作为数据处理管道,完成日志的过滤、转换等操作;
-
存储与检索:Elasticsearch提供分布式存储和强大检索能力;
-
可视化:Kibana实现日志数据的可视化分析;
-
-
适用场景:主要适用于数据量较小的开发环境。但因缺少消息队列的缓冲机制,若Logstash或Elasticsearch故障,存在数据丢失的风险。
2.2 整合消息队列+Nginx的ELK架构
-
组成:在经典ELK架构基础上,整合 消息队列(Redis、Kafka、RabbitMQ任选) 和 Nginx,形成更复杂的架构;
-
流程与组件分工:
-
Beats:依然负责 Data Collection(数据收集);
-
消息队列(Redis/Kafka/RabbitMQ):负责 Buffering(缓冲),作为中间缓冲层;
-
Logstash:负责 Data Aggregation & Processing(数据聚合与处理);
-
Elasticsearch:负责 Indexing & Storage(索引与存储);
-
Kibana:负责 Analysis & Visualization(分析与可视化);
-
Nginx:作为高性能Web和反向代理服务器,优化系统性能和可用性;

-
-
特点:
-
消息队列:引入后作为缓冲机制,即使Logstash或Elasticsearch故障,日志数据也不会丢失;同时能修剪网络传输峰,降低数据丢失可能性;
-
Nginx:在负载均衡、缓存等方面发挥作用,提升系统性能和用户访问体验;
-
扩展性:引入消息队列和Nginx后,架构扩展性增强,可根据实际需求动态调整各组件的资源分配和部署规模;
-
-
适用场景:主要适用于生产环境,尤其是需要处理大数据量的场景。能确保数据的安全性和完整性,同时提供高性能的日志处理和可视化分析服务。
3 数据处理管道Logstash
3.1 概述
-
Logstash是免费且开放的服务器端数据处理管道,能够从多个来源采集数据、转换数据,然后将数据发送到目标存储库中;
-
官网:Logstash:收集、解析和转换日志 | Elastic;
-
应用场景:作为ETL工具(Extract-Transform-Load,提取-转换-加载)和数据采集处理引擎,支持日志、指标、Web应用、数据库等多源数据的采集与处理,最终服务于分析(Analysis)、归档(Archiving)、监控(Monitoring)、告警(Alerting)等场景;

3.2 Logstash的核心概念
-
Pipeline
-
是Logstash的核心处理流程,包含Input—Filter—Output三个阶段;
-
负责插件生命周期管理和队列管理,是数据从采集到输出的完整流转通道;
-
-
Logstash Event
-
是数据在Logstash内部流转时的具体表现形式:在Input阶段被转换为Event,在Output阶段被转化为目标格式数据;
-
本质是一个Java Object,可在配置文件中对其属性进行增删改查;
-
-
Codec(Code / Decode,编码/解码):
- 将原始数据**decode(解码)**成Event;
- 将Event **encode(编码)**成目标数据,实现数据格式的转换适配。
3.3 Logstash的数据传输原理
-
数据传输遵循**“Input(数据采集)→ Filter(数据解析)→ Output(数据输出)”**的流程,各阶段有典型组件支撑:
阶段 功能 典型组件示例 Input 数据采集与输入 Stdin(标准输入)、JDBC(数据库连接) Filter 实时解析和数据转换 Mutate(数据修改)、Date(日期处理)、User Agent(用户代理解析) Output 存储与数据导出 Elasticsearch(主流存储目标) 
-
Logstash通过**管道(Pipeline)**完成数据的采集与处理,流程如下:
-
数据采集(Input):从数据源(Data Source,如日志文件、数据库等)采集数据,支持多种输入插件,以连续的流式传输方式获取数据;
-
数据过滤/预处理(Filter,可选):通过Filter插件对数据进行过滤、转换(如字段提取、格式调整等),构建结构化数据;
-
数据输出(Output):将处理后的数据发送到目标存储(如Elasticsearch),支持多种输出插件配置;

-
3.4 Logstash的安装与配置
3.4.1 Logstash的安装
-
Logstash官方安装文档地址:[Installing Logstash | Logstash Reference 8.14] | Elastic;
-
下载并解压Logstash
-
Logstash下载地址:Past Releases | Elastic;
-
选择版本 8.14.3;
-
不同系统的下载链接:
-
Windows系统:
https://artifacts.elastic.co/downloads/logstash/logstash-8.14.3-windows-x86_64.zip; -
Linux系统:
https://artifacts.elastic.co/downloads/logstash/logstash-8.14.3-linux-x86_64.tar.gz;
-
-
-
测试:运行最基本的Logstash管道
-
进入Logstash目录:执行命令
cd logstash-8.14.3,切换到Logstash的安装目录; -
不同系统的测试命令:
-
Linux系统:
bin/logstash -e 'input { stdin {} } output { stdout {} }'- 其中
-e选项表示“直接把配置放在命令中”,用于快速测试;
- 其中
-
Windows系统:
.\bin\logstash.bat -e "input { stdin {} } output { stdout {} }"
-
-
测试逻辑说明:
-
input { stdin {} }:配置Logstash从**标准输入(键盘输入)**采集数据; -
output { stdout {} }:配置Logstash将处理后的数据输出到标准输出(终端);
-
-
-
当在终端输入内容(如
hello、hello fox)后,Logstash会将输入内容以结构化的Event形式输出-
包含以下字段:
-
message:输入的原始内容(如"hello\r"、"hello fox\r"); -
@timestamp:数据处理的时间戳(如2022-06-10T05:36:47.382Z); -
@version:Logstash事件的版本(固定为"1"); -
host:运行Logstash的主机名(如"LAPTOP-UTD9471P");

-
-
这表明Logstash的管道已成功运行,能够完成“输入→输出”的基本数据处理流程。
-
3.4.2 Logstash的配置
-
Logstash官方配置文档地址:[Creating a Logstash pipeline | Logstash Reference 8.14] | Elastic;
-
Logstash的管道配置文件对每种类型的插件(Input、Filter、Output)提供单独配置部分,用于处理管道事件;
-
Input部分
input {stdin {} }- 功能:配置数据输入源,此处使用
stdin插件,从**标准输入(键盘)**采集数据;
- 功能:配置数据输入源,此处使用
-
Filter部分
filter {grok {match => { "message" => "%{COMBINEDAPACHELOG}" }}date {match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]} }-
grok插件:通过正则匹配(%{COMBINEDAPACHELOG}是预定义的Apache日志格式),尝试将message字段解析为结构化数据; -
date插件:尝试将名为timestamp的字段按格式dd/MMM/yyyy:HH:mm:ss Z解析为时间戳,用于统一时间格式;
-
-
Output部分
output {elasticsearch {index => "logstash-demo"hosts => ["localhost:9200"]}stdout { codec => rubydebug } }-
elasticsearch插件:将处理后的数据输出到Elasticsearch,指定索引为logstash-demo,Elasticsearch服务地址为localhost:9200; -
stdout插件:同时将数据输出到终端,codec => rubydebug用于以结构化格式(便于调试)展示输出内容;
-
-
-
每个配置部分可包含一个或多个插件,Logstash会按照插件在配置文件中出现的顺序依次处理(例如Filter中的
grok先执行,再执行date); -
执行以下命令启动Logstash并加载配置文件(假设配置文件名为
logstash-demo.conf):bin/logstash -f logstash-demo.conf
- 当输入
hello后,终端输出结构化数据,包含以下关键信息:-
@version:事件版本(固定为"1") -
host:运行Logstash的主机信息(如"hostname" => "Java") -
tags:包含"_grokparsefailure",说明grok插件解析"hello"时失败(因hello不符合Apache日志格式) -
message:原始输入内容("hello\r") -
@timestamp:事件处理的时间戳(如2024-08-28T14:21:13.961676600Z) -
event.original:原始事件内容("hello\r")
-
- 当输入
-
通过Elasticsearch的检索API可查看到,数据已成功写入
logstash-demo索引,字段与终端输出一致,验证了数据已同步到Elasticsearch中;
3.4.3 Logstash的插件
3.4.3.1 插件分类
-
Logstash插件分为Input、Filter、Output、Codec四大类,各自承担不同功能;
-
Input Plugins(输入插件)
-
官方文档:[Input plugins | Logstash Reference 8.14] | Elastic;
-
核心特点:一个Pipeline可以包含多个Input插件,支持从多种数据源采集数据;
-
常见插件示例:
- 基础输入:
Stdin(标准输入)、File(文件); - 中间件/数据库:
Beats、Log4J、Elasticsearch、JDBC、Kafka、Rabbitmq、Redis; - 协议/服务:
JMX、HTTP、Websocket、UDP、TCP; - 云存储/平台:
Google Cloud Storage、S3、Github、Twitter;
- 基础输入:
-
-
Filter Plugins(过滤插件)
-
官方文档:[Filter plugins | Logstash Reference 8.14] | Elastic;
-
核心功能:对Logstash Event进行解析、字段删除、类型转换等处理;
-
常见插件示例:
Date:日期解析;Dissect:分割符解析;Grok:正则匹配解析;Mutate:对字段做多种操作(Convert类型转换、Gsub字符串替换、Split/Join/Merge字符串/数组操作、Rename字段重命名、Update/Replace字段内容更新、Remove_field字段删除);Ruby:利用Ruby代码动态修改Event;
-
-
Output Plugins(输出插件)
-
官方文档:[Output plugins | Logstash Reference 8.14] | Elastic;
-
核心特点:是Pipeline的最后一个阶段,负责将Event发送到特定目的地;
-
常见插件示例:
- 存储引擎:
Elasticsearch; - 通知/监控:
Email、Pageduty; - 数据库/中间件:
Influxdb、Kafka、Mongodb、Opensdb、Zabbix; - 协议/服务:
Http、TCP、Websocket;
- 存储引擎:
-
-
Codec Plugins(编解码插件)
-
官方文档:[Codec plugins | Logstash Reference 8.14] | Elastic;
-
核心功能:将原始数据decode成Event,再将Event encode成目标数据,实现数据格式的转换适配;
-
内置插件示例:
Line/Multiline、JSON/Avro/Cef(ArcSight Common Event Format)、Dots/Rubydebug。
-
3.4.3.2 Codec插件之Multiline(多行编解码)
-
用于处理多行数据(如异常堆栈信息),核心是通过配置参数将多行合并为一个Event;
-
配置参数
-
pattern:设置行匹配的正则表达式; -
what:匹配成功时,该行属于上一个事件(previous)还是下一个事件(next); -
negate:是否对pattern的结果取反(true/false);
-
-
例:处理Java异常堆栈
-
以包含多行的
NullPointerException堆栈为例,配置文件multiline-exception.conf如下:input {stdin {codec => multiline {pattern => "^\s" # 匹配以空白开头的行what => "previous" # 匹配行属于上一个事件}} } filter {} output {stdout { codec => rubydebug } } -
执行命令:
bin/logstash -f multiline-exception.conf,即可将多行异常堆栈合并为一个Event输出。
-
3.4.3.3 Logstash Queue(队列)
-
用于在Pipeline中缓冲数据,分为两种类型:
-
In Memory Queue(内存队列):数据存储在内存中,若进程Crash或机器宕机,会导致数据丢失;
-
Persistent Queue(持久化队列)
-
特点:数据持久化到磁盘,即使机器宕机,数据也不会丢失;能保证数据被消费,可替代Kafka等消息队列的缓冲区作用;
-
配置方式(在
pipelines.yml中设置):queue.type: persisted # 默认是memory queue.max_bytes: 4gb # 队列最大容量
-
-
队列在Pipeline中的流程:
Input → Codec → Queue → Batcher → Filter → Output,多个Input可共享Queue进行数据缓冲与分发;
3.4.4 练习:同步MySQL数据到ElasticSearch
3.4.4.1 需求分析
- 将MySQL数据库中的用户数据同步到ElasticSearch,借助ES的全文搜索能力提升搜索速度,具体需求包括:
-
新增用户信息需同步到ES
-
用户信息更新后,ES需同步更新
-
支持增量更新(只同步变化的数据)
-
用户注销后,不能被ES搜索到
-
3.4.4.2 实现思路
-
借助Logstash的JDBC Input Plugin将数据从MySQL读取到Logstash,再通过Elasticsearch Output Plugin同步到ES。JDBC Input Plugin的关键能力:
-
需自行提供JDBC Driver(如MySQL的JDBC驱动包)
-
支持定时任务调度(语法基于Rufus-scheduler,扩展了Cron语法)
-
支持通过
Tracking_column/sql_last_value记录状态,实现增量更新
-
-
官方文档:[Jdbc input plugin | Logstash Reference 8.14] | Elastic。
3.4.4.3 JDBC Input Plugin实现步骤
-
拷贝JDBC依赖:将MySQL的JDBC驱动包(如
mysql-connector-java-5.1.49.jar)拷贝到Logstash的自定义目录(如logstash-8.14.3/drivers); -
准备配置文件
mysql-demo.conf。配置文件分为input、output两个部分,实现MySQL数据采集与ES同步:input {jdbc {# JDBC驱动路径与类名jdbc_driver_library => "/home/fox/logstash-8.14.3/driver/mysql-connector-java-5.1.49.jar"jdbc_driver_class => "com.mysql.jdbc.Driver"# MySQL连接信息jdbc_connection_string => "jdbc:mysql://localhost:3306/test?useSSL=false"jdbc_user => "root"jdbc_password => "123456"# 增量更新配置use_column_value => true # 启用字段值追踪tracking_column => "last_updated" # 追踪的字段(需是数值或时间类型)tracking_column_type => "numeric" # 追踪字段类型(numeric/ timestamp)record_last_run => true # 记录最后一次运行状态last_run_metadata_path => "jdbc-position.txt" # 状态保存路径statement => "SELECT * FROM user where last_updated > :sql_last_value;" # 增量查询SQLschedule => "* * * * * *" # Cron表达式,设置任务触发频率} }output {# 同步到Elasticsearchelasticsearch {document_id => "%{id}" # 文档ID关联MySQL的id字段document_type => "_doc"index => "users" # ES索引名hosts => ["http://localhost:9200"]username => "elastic" # ES认证信息password => "123456"}# 终端输出调试(可选)stdout {codec => rubydebug} } -
运行Logstash。执行命令启动Logsta sh并加载配置文件:
bin/logstash -f mysql-demo.conf
3.4.4.4 测试
-
表结构:创建
user表,包含id(主键)、name、address、last_updated(增量追踪字段)、is_deleted(注销标记字段);CREATE TABLE `user` (`id` int NOT NULL AUTO_INCREMENT,`name` varchar(50) DEFAULT NULL,`address` varchar(50) DEFAULT NULL,`last_updated` bigint DEFAULT NULL,`is_deleted` int DEFAULT NULL,PRIMARY KEY (`id`) ) ENGINE=InnoDB; -
插入数据:新增用户“张三”,并设置
last_updated为当前时间戳;INSERT INTO user(name,address,last_updated,is_deleted) VALUES("张三","广州天河",unix_timestamp(NOW()),0); -
更新数据:更新“张三”的地址,并更新
last_updated;UPDATE user SET address="广州白云山",last_updated=unix_timestamp(NOW()) WHERE name="张三"; -
注销用户:标记“张三”为注销(
is_deleted=1),并更新last_updated;UPDATE user SET is_deleted=1,last_updated=unix_timestamp(NOW()) WHERE name="张三"; -
数据同步效果:
-
插入、更新操作后,Logstash会通过增量查询将变化的数据同步到ES,终端可通过
stdout { codec => rubydebug }看到结构化的同步数据; -
注销操作后,数据仍会同步到ES,但可通过ES的Alias机制过滤掉注销用户;
-
3.4.4.5 ES中查询优化(Alias机制)
-
通过创建**别名(Alias)**并添加过滤条件,实现“只显示未被注销的用户”:
-
创建Alias:
POST /_aliases {"actions": [{"add": {"index": "users","alias": "view_users","filter": { "term": { "is_deleted": 0 } }}}] } -
通过Alias查询:只会返回
is_deleted=0的用户,注销用户(is_deleted=1)不会被检索到POST view_users/_search {"query": {"term": {"name.keyword": {"value": "张三"}}} }
4 轻量级采集器FileBeat
4.1 概述
-
Beats是一个免费且开放的平台,集合了多种单一用途的数据采集器,可从成百上千甚至上万台机器和系统向Logstash或Elasticsearch发送数据;
-
Beats包含多个针对不同数据类型的采集器:
-
Filebeat:采集日志文件;
-
Metricbeat:采集指标数据;
-
Packetbeat:采集网络数据;
-
Winlogbeat:采集Windows事件日志;
-
Auditbeat:采集审计数据;
-
Heartbeat:用于运行时间监控;
-
Functionbeat:无服务器的采集器;

-
-
FileBeat是专门用于转发和收集日志数据的轻量级采集工具,可作为代理安装在服务器上,监控指定路径的日志文件,收集日志数据后发送到Elasticsearch或Logstash。
4.2 工作原理
-
FileBeat的工作流程由Prospector(勘探器)、Harvester(收割机)、Libbeat三个核心组件协同完成:
-
Prospector(勘探器):启动FileBeat时会启动一个或多个Input,用于监控指定的日志数据位置(如
/var/log/*.log、/var/log/nginx/*等路径); -
Harvester(收割机):针对每一个被监控的文件启动一个Harvester,负责读取文件的日志内容,并将新的日志发送到Libbeat;
-
Libbeat:负责收集多个Harvester的数据,并将数据发送到输出端(Output),支持直接发送到Elasticsearch,或先发送到Logstash再转发到Elasticsearch;
-
-
以监控
/var/log/*.log(Prospector 1)和/var/log/nginx/*(Prospector 2)为例:-
Prospector 1监控
/var/log/*.log下的log1文件,为其启动Harvester读取日志,将新日志发送到Libbeat; -
Prospector 2监控
/var/log/nginx/*下的error.log文件,为其启动Harvester读取日志,将新日志发送到Libbeat; -
Libbeat收集所有Harvester的数据后,可选择直接发送到Elasticsearch,或先发送到Logstash再由Logstash转发到Elasticsearch;

-
4.3 Logstash VS FileBeat
-
资源消耗对比
-
Logstash:运行在JVM(Java虚拟机)上,资源消耗较大;
-
FileBeat:基于Golang编写,功能相对少但资源消耗小,更轻量级,占用资源更少;
-
-
功能定位对比
-
两者都具备日志收集功能,但FileBeat更轻量;
-
Logstash独有Filter功能:可对日志进行过滤、分析(如字段提取、格式转换等);
-
-
典型配合流程:
-
FileBeat采集日志,发送到**消息队列(Redis、MQ等)**做缓冲;
-
Logstash从消息队列获取日志,利用Filter功能过滤分析;
-
最终将处理后的数据存储到Elasticsearch;
-
-
FileBeat支持背压压制:
-
当向Logstash或Elasticsearch发送数据时,使用背压敏感协议;
-
若Logstash忙于处理数据,会通知FileBeat减慢读取速度;
-
拥塞解决后,FileBeat恢复原速度继续传输数据,以此适配数据量波动,保证系统稳定。
-
4.4 FileBeat的安装与配置
-
下载并解压FileBeat
-
FileBeat官方安装配置文档:[Filebeat quick start: installation and configuration | Filebeat Reference 8.14] | Elastic;
-
下载地址:Past Releases | Elastic;
-
选择版本:8.14.3
-
Windows系统下载链接:
https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.14.3-windows-x86_64.zip; -
Linux系统:执行命令下载并解压
curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.14.3-linux-x86_64.tar.gz tar xzvf filebeat-8.14.3-linux-x86_64.tar.gz
-
-
编辑配置。修改
filebeat.yml以配置Elasticsearch和Kibana的连接信息:output.elasticsearch:hosts: ["192.168.65.174:9200", "192.168.65.192:9200", "192.168.65.204:9200"] # Elasticsearch集群地址username: "elastic" # Elasticsearch认证用户名password: "123456" # Elasticsearch认证密码setup.kibana:host: "192.168.65.174:5601" # Kibana地址 -
启用和配置数据收集模块
-
从FileBeat安装目录中执行以下操作,启用特定服务的日志收集模块;
-
查看可启用的模块列表
./filebeat modules list -
启用Nginx模块
-
启用命令:
./filebeat modules enable nginx -
若需修改Nginx日志路径,编辑
modules.d/nginx.yml:- module: nginxaccess:enabled: truevar.paths: ["/var/log/nginx/access.log*"] # Nginx访问日志路径
-
-
启用Logstash模块
-
启用命令:
./filebeat modules enable logstash -
若需修改Logstash日志路径,编辑
modules.d/logstash.yml:- module: logstashlog:enabled: truevar.paths: ["/home/fox/logstash-8.14.3/logs/*.log"] # Logstash日志路径
-
-
-
启动FileBeat
-
加载Kibana仪表板(可选,若未配置过则执行)
./filebeat setup -
启动FileBeat
./filebeat -e -
验证启动效果。启动成功后,可在Kibana的Observability → Logs → Stream中查看收集到的日志(如Logstash的日志),确认数据已成功传输和可视化;

-
4.5 练习1:FileBeat采集Tomcat服务器日志并发送到Logstash
-
需求背景:Tomcat服务器运行时产生大量日志(如访问日志),需通过FileBeat采集这些日志,并发送到Logstash进行后续处理;
-
配置FileBeat采集Tomcat日志并发送到Logstash
-
创建配置文件
filebeat-tomcat.yml。配置FileBeat的日志采集路径、多行合并规则,以及Logstash的连接信息:filebeat.inputs: - type: logenabled: truepaths:- /home/fox/apache-tomcat-9.0.93/logs/*access*.* # Tomcat访问日志路径# 多行合并规则(因Tomcat日志以IP开头,需合并非IP开头的行到上一行)multiline.pattern: '^\d+\.\d+\.\d+\.\d+ ' # 匹配IP开头的正则multiline.negate: true # 对pattern取反(即非IP开头的行)multiline.match: after # 合并到上一行的末尾output.logstash:enabled: truehosts: ["localhost:5044"] # Logstash监听地址与端口- 其中,
multiline参数用于处理多行日志(如异常堆栈):-
pattern:正则表达式,匹配行的特征; -
negate:true表示对pattern结果取反; -
match:after表示将匹配行合并到上一行末尾;
-
- 其中,
-
启动FileBeat并指定配置文件。执行命令启动FileBeat:
./filebeat -e -c filebeat-tomcat.yml -
可能出现的异常及解决
-
异常1:配置文件权限过高(安全限制)
-
报错:
Exiting: error loading config file: config file "...yml" can only be writable by the owner... -
解决:修改文件权限为
644(仅 owner 可写):chmod 644 filebeat-tomcat.yml
-
-
异常2:无法连接Logstash
- 报错:
Failed to connect to backoff(async(tcp://...)): dial tcp ...: connect: connection refused - 原因:未启动或未正确配置Logstash,导致FileBeat无法连接;
- 报错:
-
-
-
配置Logstash接收FileBeat数据并打印
-
创建Logstash配置文件
logstsh-tomcat.conf。配置Logstash的Beats输入插件(监听FileBeat的连接)和终端输出(用于调试):input {beats {port => 5044 # 与FileBeat配置的端口一致} }output {stdout {codec => rubydebug # 以结构化格式输出到终端} } -
测试Logstash配置是否正确**。执行命令验证配置语法:
bin/logstash -f config/logstsh-tomcat.conf --config.test_and_exit- 若输出
Config Validation Result: OK,则配置正确;
- 若输出
-
启动Logstash。执行命令启动Logstash,并启用配置自动重载:
bin/logstash -f config/logstsh-tomcat.conf --config.reload.automatic
-
-
测试:访问Tomcat,验证日志传输。访问Tomcat服务后,查看Logstash的终端输出,若能看到类似以下的结构化日志数据,说明传输成功:
{"tags" => ["beats_input_codec_plain_applied"],"input" => { "type" => "log" },"event" => { "original" => "192.168.65.103 - - [30/Aug/2024:13:29:11 +0800] \"GET /docs/ HTTP/1.1\" 403 904" },"@version" => "1","log" => { "offset" => 1559, "file" => { "path" => "/home/fox/apache-tomcat-9.0.93/logs/localhost_access_log.2024-08-30.txt" } },"message" => "192.168.65.103 - - [30/Aug/2024:13:29:11 +0800] \"GET /docs/ HTTP/1.1\" 403 904","agent" => { "id" => "ed5031e7-fc13-441c-b87a-b554cb02df3c", "type" => "filebeat", "ephemeral_id" => "3f03ad41-d6f2-4483-9b78-9c7c456d031c", "version" => "8.14.3", "name" => "192-168-65-211" } }
4.6 练习2:整合ELK采集与分析Tomcat日志
4.6.1 Logstash输出数据到Elasticsearch
-
若需将Tomcat日志从Logstash同步到Elasticsearch,需修改Logstash的
output配置; -
配置文件
logstsh-tomcat.confinput {beats {port => 5044 # 接收FileBeat数据的端口} }output {elasticsearch {hosts => ["http://localhost:9200"] # ES地址index => "tomcat-logs" # ES索引名user => "elastic" # ES认证用户名password => "123456" # ES认证密码}stdout {codec => rubydebug # 终端调试输出(可选)} } -
启动Logstash
bin/logstash -f config/logstsh-tomcat.conf --config.reload.automatic -
测试ES数据存储。通过ES检索API查询
tomcat-logs索引,若能看到结构化的Tomcat日志数据,说明同步成功;
4.6.2 利用Logstash过滤器解析日志
-
为了将Tomcat日志从“原始字符串”解析为结构化字段(如IP、时间、请求方式等),需借助Logstash的
Grok、Mutate、Date插件; -
查看Logstash已经安装的插件:
bin/logstash-plugin list
4.6.2.1 Grok插件:日志结构化解析
-
官方文档:[Grok filter plugin | Logstash Reference 8.14] | Elastic;
-
作用:将非结构化日志通过正则匹配解析为结构化字段,可理解为“增强版正则表达式”。这个工具非常适合用来解析系统日志、Web服务器日志、MySQL或者是其他任意的日志格式;
-
语法:
%{SYNTAX:SEMANTIC},其中SYNTAX是Grok模式名称(如IP、HTTPDATE),SEMANTIC是自定义字段名(如ip、date);-
例:
%{NUMBER:duration} %{IP:client} // duration表示:匹配一个数字,client表示匹配一个IP地址 -
默认在Grok中,所有匹配到的的数据类型都是字符串,如果要转换成int类型(目前只支持int和float),可以这样:
%{NUMBER:duration:int} %{IP:client};
-
-
常用的Grok模式:GROK模式语法参考-日志服务-阿里云;
-
用法:
filter {grok {match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }} }-
比如针对下面的Tomcat访问日志格式:
192.168.65.103 - - [23/Jun/2022:22:37:23 +0800] "GET /docs/images/docs-stylesheet.css HTTP/1.1" 200 5780 -
解析后的字段:
字段名 说明 client IP 浏览器端IP timestamp 请求的时间戳 method 请求方式(GET/POST) uri 请求的链接地址 status 服务器端响应状态 length 响应的数据长度 -
Grok模式为:
%{IP:ip} - - \[%{HTTPDATE:date}\] "%{WORD:method} %{PATH:uri} %{DATA:protocol}" %{INT:status:int} %{INT:length:int}
-
-
为了方便测试,可以使用Kibana进行Grok开发:

-
修改Logstash配置文件:
vim config/logstash-console.confinput {beats {port => 5044} }filter {grok {match => { "message" => "%{IP:ip} - - \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA:protocol}\" %{INT:status:int} %{INT:length:int}" } } }output {stdout {codec => rubydebug} } -
启动Logstash测试:
bin/logstash -f config/logstash-console.conf --config.reload.automatic
4.6.2.2 Mutate插件:字段过滤
-
用于删除不需要的字段,精简数据结构:
mutate {enable_metric => "false"remove_field => ["message", "log", "tags", "input", "agent", "host", "ecs", "@version"] }
4.6.2.3 Date插件:日期格式转换
-
官方文档:[Date filter plugin | Logstash Reference 8.14] | Elastic;
-
用法:

-
将
date字段转换为「年月日 时分秒」格式。默认字段经过Date插件处理后,会输出到@timestamp字段,所以可以通过修改target属性来重新定义输出字段:date {match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"]target => "date" }
4.6.2.4 filter完整的配置
-
filter完整的配置:

-
测试效果:

4.6.2.5 输出到ElasticSearch指定索引
-
通过
index参数指定ES索引名称,支持时间格式化语法(如%{+YYYY-MM}按年月拆分索引)。注意:- 索引名不能包含大写字符;
- 若使用时间格式化,Filter需确保输出包含
@timestamp字段,否则日期解析会失败;
output {elasticsearch {index => "tomcat_web_log_%{+YYYY-MM}"hosts => ["http://localhost:9200"]user => "elastic"password => "123456"}stdout{codec => rubydebug} } -
完整的Logstash配置
logstash-tomcat-es.confinput {beats {port => 5044} }filter {grok {match => {"message" => "%{IP:ip} - - \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA:protocol}\" %{INT:status:int} %{INT:length:int}"}}mutate {enable_metric => "false"remove_field => ["message", "log", "tags", "input", "agent", "host", "ecs", "@version"]}date {match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"]target => "date"} }output {stdout {codec => rubydebug}elasticsearch {index => "tomcat_web_log_%{+YYYY-MM}" # 按年月生成索引hosts => ["http://localhost:9200"]user => "elastic"password => "123456"} } -
启动Logstash:
bin/logstash -f config/logstash-tomcat-es.conf --config.reload.automatic -
查询ES中是否有数据:

-
通过Kibana分析服务日志:
-
在Kibana的Stack Management → 数据视图中,创建针对
tomcat_web_log_*索引的视图,指定时间字段(如@timestamp);
-
进入Discover模块,选择刚创建的数据视图,即可:

-
查看日志的结构化字段(如
ip、method、status等); -
筛选特定条件的日志(如
status: 403的错误请求);
-
-
5 微服务整合ELK实现日志采集与分析实战
5.1 实现思路分析
-
Spring Boot微服务日志流向ELK的流程如下:
-
Spring Boot应用生成日志数据,通过Logback日志框架记录日志;
-
Logstash作为日志收集器,接收Spring Boot发送的日志数据;
-
Logstash解析和过滤日志数据(可选格式化、处理);
-
处理后的日志发送到Elasticsearch,存储在分布式索引中;
-
Kibana连接Elasticsearch,实现日志数据的可视化分析;

-
5.2 微服务整合Logstash实现日志采集
-
使用Logstash日志插件(引入依赖)。在Spring Boot项目的
pom.xml中引入Logstash-Logback依赖,实现日志向Logstash的传输:<dependency><groupId>net.logstash.logback</groupId><artifactId>logstash-logback-encoder</artifactId><version>6.3</version> </dependency> -
配置Logback(
logback-spring.xml)。通过Logback配置,将日志发送到Logstash的TCP端口:<?xml version="1.0" encoding="UTF-8"?> <configuration debug="false"><property name="LOG_HOME" value="logs/elk-demo.log" /><!-- 控制台输出Appender --><appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"><encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"><pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern></encoder></appender><!-- Logstash TCP输出Appender --><appender name="logstash" class="net.logstash.logback.appender.LogstashTcpSocketAppender"><destination>192.168.65.211:4560</destination> <!-- Logstash监听的IP和端口 --><encoder class="net.logstash.logback.encoder.LogstashEncoder"><customFields>{"appname": "elk-demo"}</customFields> <!-- 自定义字段(应用标识) --></encoder></appender><!-- 日志输出级别 --><root level="INFO"><appender-ref ref="STDOUT" /><appender-ref ref="logstash" /> <!-- 启用Logstash输出 --></root> </configuration> -
配置Logstash并启动(
elk-demo.conf)。创建Logstash配置文件,定义输入(TCP接收日志)、输出(Elasticsearch存储):input {tcp {host => "0.0.0.0" # 监听所有IPport => 4560 # 与Logback配置的端口一致mode => "server"codec => json_lines # 日志格式为JSON行}stdin {} # 标准输入(可选,用于调试) }filter {# 可选:日志过滤、解析逻辑 }output {stdout {codec => rubydebug # 终端调试输出(可选)}elasticsearch {hosts => ["127.0.0.1:9200"] # Elasticsearch地址index => "%{[appname]}-%{+YYYY.MM.dd}" # 按应用名和日期生成索引} } -
启动Logstash:
bin/logstash -f config/elk-demo.conf # 后台启动可加 &
5.3 测试验证
-
Logstash控制台验证。调用Spring Boot应用的接口,触发日志输出,查看Logstash控制台,若能看到类似以下的结构化日志,说明接收成功:
{"level" => "ERROR","@timestamp" => 2024-08-30T08:24:03.429Z,"thread_name" => "http-nio-8080-exec-1","logger_name" => "org.tuling.elkdemo.controller.HelloController","@version" => "1","message" => "HelloController执行-----log.error","level_value" => 40000,"appname" => "elk-demo" } -
Elasticsearch索引验证。在Kibana的索引管理中,查看是否存在以
elk-demo-开头的索引(如elk-demo-2024.08.30),验证日志已存储到ES;
5.4 通过Kibana分析微服务日志
-
创建数据视图。在Kibana的Stack Management → 数据视图中,创建针对
elk-demo*索引的视图,指定时间字段(如@timestamp);
-
Discover模块分析日志。进入Discover模块,选择刚创建的数据视图,即可:
-
查看微服务日志的结构化字段(如
level、message、thread_name等); -
筛选特定级别、特定服务的日志(如
level: ERROR的错误日志);

-
