当前位置: 首页 > news >正文

分布式专题——56 微服务日志采集与分析系统实战

1 为什么要使用 ELK

  • 随着企业信息化进程加速,日志数据呈现量急剧增加、来源多样、格式复杂的特点,传统日志管理方式已难以满足需求,这是引入ELK的核心背景;

  • ELK(ElasticSearch、Logstash、Kibana)的由三个组件构成,各自承担关键功能,形成高效的日志管理方案:

    • Elasticsearch:提供强大的分布式搜索能力,支撑日志的快速检索;

    • Logstash:具备灵活的数据采集与处理功能,负责日志的收集和预处理;

    • Kibana:提供直观的数据可视化界面,将复杂日志数据以图表等形式呈现,便于理解分析;

    • 三者结合,为企业和开发人员提供了高效、实时、可扩展且易用的日志管理解决方案,助力提升工作效率和问题解决速度;

  • 使用ELK的主要原因

    • 集中化管理与高效检索日志

      • 大型分布式系统中,ELK可构建集中式日志系统,实现所有节点日志的统一收集、管理和访问,大幅提高定位问题的效率;

      • Elasticsearch的强大检索特性,能快速查询问题日志,显著提升运维人员的工作效率;

    • 全面的日志分析与系统监控

      • ELK可管理和分析多种日志类型,包括系统日志、应用程序日志、安全日志等,帮助系统运维和开发人员了解服务器软硬件信息、检查配置错误及其原因

      • 通过分析和监控日志,能及时了解服务器的负荷、性能和安全性,从而及时采取措施纠正错误;

    • 直观的数据可视化与理解

      • Kibana为Elasticsearch提供Web可视化界面,可生成各种维度的表格、图形,让复杂的日志数据“可视化”;

      • 可视化界面帮助用户更直观地理解和分析数据,进一步提升日志分析和系统监控的效果。

2 ELK 的整体架构分析

  • ELK架构分为两种类型:
    • 经典的ELK架构
    • 整合消息队列(Redis/Kafka/RabbitMQ)Nginx的ELK架构

2.1 经典的ELK架构

  • 组成

    • 经典ELK架构主要由 Filebeat + Logstash + Elasticsearch + Kibana 组成;
    • 早期架构仅包含Logstash + Elasticsearch + Kibana,后来因Filebeat轻量级、高效性的特点,逐渐被引入作为日志收集工具;

    在这里插入图片描述

  • 流程与组件分工

    • Beats(以Filebeat为代表):负责 Data Collection(数据收集),作为轻量级日志收集代理,部署在客户端,消耗资源少,高效收集日志数据;

    • Logstash:负责 Data Aggregation & Processing(数据聚合与处理),对Filebeat收集的日志数据进行过滤、转换等操作,再发送到Elasticsearch;

    • Elasticsearch:负责 Indexing & Storage(索引与存储),基于Lucene的分布式搜索和分析引擎,提供强大的数据存储和搜索能力;

    • Kibana:负责 Analysis & Visualization(分析与可视化),为Elasticsearch提供Web可视化界面,通过图表、仪表盘等形式直观查看和分析日志数据;

  • 特点

    • 日志收集:Filebeat轻量级,客户端部署,资源消耗少,高效收集日志;

    • 数据处理:Logstash作为数据处理管道,完成日志的过滤、转换等操作;

    • 存储与检索:Elasticsearch提供分布式存储和强大检索能力;

    • 可视化:Kibana实现日志数据的可视化分析;

  • 适用场景:主要适用于数据量较小的开发环境。但因缺少消息队列的缓冲机制,若Logstash或Elasticsearch故障,存在数据丢失的风险

2.2 整合消息队列+Nginx的ELK架构

  • 组成:在经典ELK架构基础上,整合 消息队列(Redis、Kafka、RabbitMQ任选)Nginx,形成更复杂的架构;

  • 流程与组件分工

    • Beats:依然负责 Data Collection(数据收集)

    • 消息队列(Redis/Kafka/RabbitMQ):负责 Buffering(缓冲),作为中间缓冲层;

    • Logstash:负责 Data Aggregation & Processing(数据聚合与处理)

    • Elasticsearch:负责 Indexing & Storage(索引与存储)

    • Kibana:负责 Analysis & Visualization(分析与可视化)

    • Nginx:作为高性能Web和反向代理服务器,优化系统性能和可用性;

    在这里插入图片描述

  • 特点

    • 消息队列:引入后作为缓冲机制,即使Logstash或Elasticsearch故障,日志数据也不会丢失;同时能修剪网络传输峰,降低数据丢失可能性;

    • Nginx:在负载均衡、缓存等方面发挥作用,提升系统性能和用户访问体验;

    • 扩展性:引入消息队列和Nginx后,架构扩展性增强,可根据实际需求动态调整各组件的资源分配和部署规模;

  • 适用场景:主要适用于生产环境,尤其是需要处理大数据量的场景。能确保数据的安全性和完整性,同时提供高性能的日志处理和可视化分析服务。

3 数据处理管道Logstash

3.1 概述

  • Logstash是免费且开放的服务器端数据处理管道,能够从多个来源采集数据、转换数据,然后将数据发送到目标存储库中;

  • 官网:Logstash:收集、解析和转换日志 | Elastic;

  • 应用场景:作为ETL工具(Extract-Transform-Load,提取-转换-加载)数据采集处理引擎,支持日志、指标、Web应用、数据库等多源数据的采集与处理,最终服务于分析(Analysis)、归档(Archiving)、监控(Monitoring)、告警(Alerting)等场景;

    在这里插入图片描述

3.2 Logstash的核心概念

  • Pipeline

    • 是Logstash的核心处理流程,包含Input—Filter—Output三个阶段

    • 负责插件生命周期管理队列管理,是数据从采集到输出的完整流转通道;

  • Logstash Event

    • 是数据在Logstash内部流转时的具体表现形式:在Input阶段被转换为Event,在Output阶段被转化为目标格式数据;

    • 本质是一个Java Object,可在配置文件中对其属性进行增删改查;

  • Codec(Code / Decode,编码/解码)

    • 将原始数据**decode(解码)**成Event;
    • 将Event **encode(编码)**成目标数据,实现数据格式的转换适配。

3.3 Logstash的数据传输原理

  • 数据传输遵循**“Input(数据采集)→ Filter(数据解析)→ Output(数据输出)”**的流程,各阶段有典型组件支撑:

    阶段功能典型组件示例
    Input数据采集与输入Stdin(标准输入)、JDBC(数据库连接)
    Filter实时解析和数据转换Mutate(数据修改)、Date(日期处理)、User Agent(用户代理解析)
    Output存储与数据导出Elasticsearch(主流存储目标)

    在这里插入图片描述

  • Logstash通过**管道(Pipeline)**完成数据的采集与处理,流程如下:

    • 数据采集(Input):从数据源(Data Source,如日志文件、数据库等)采集数据,支持多种输入插件,以连续的流式传输方式获取数据;

    • 数据过滤/预处理(Filter,可选):通过Filter插件对数据进行过滤、转换(如字段提取、格式调整等),构建结构化数据;

    • 数据输出(Output):将处理后的数据发送到目标存储(如Elasticsearch),支持多种输出插件配置;

    在这里插入图片描述

3.4 Logstash的安装与配置

3.4.1 Logstash的安装

  • Logstash官方安装文档地址:[Installing Logstash | Logstash Reference 8.14] | Elastic;

  • 下载并解压Logstash

    • Logstash下载地址:Past Releases | Elastic;

    • 选择版本 8.14.3

    • 不同系统的下载链接:

      • Windows系统https://artifacts.elastic.co/downloads/logstash/logstash-8.14.3-windows-x86_64.zip

      • Linux系统https://artifacts.elastic.co/downloads/logstash/logstash-8.14.3-linux-x86_64.tar.gz

  • 测试:运行最基本的Logstash管道

    • 进入Logstash目录:执行命令 cd logstash-8.14.3,切换到Logstash的安装目录;

    • 不同系统的测试命令:

      • Linux系统

        bin/logstash -e 'input { stdin {} } output { stdout {} }'
        
        • 其中-e选项表示“直接把配置放在命令中”,用于快速测试;
      • Windows系统

        .\bin\logstash.bat -e "input { stdin {} } output { stdout {} }"
        
    • 测试逻辑说明:

      • input { stdin {} }:配置Logstash从**标准输入(键盘输入)**采集数据;

      • output { stdout {} }:配置Logstash将处理后的数据输出到标准输出(终端)

  • 当在终端输入内容(如hellohello fox)后,Logstash会将输入内容以结构化的Event形式输出

    • 包含以下字段:

      • message:输入的原始内容(如"hello\r""hello fox\r");

      • @timestamp:数据处理的时间戳(如2022-06-10T05:36:47.382Z);

      • @version:Logstash事件的版本(固定为"1");

      • host:运行Logstash的主机名(如"LAPTOP-UTD9471P");

      在这里插入图片描述

    • 这表明Logstash的管道已成功运行,能够完成“输入→输出”的基本数据处理流程。

3.4.2 Logstash的配置

  • Logstash官方配置文档地址:[Creating a Logstash pipeline | Logstash Reference 8.14] | Elastic;

  • Logstash的管道配置文件对每种类型的插件(Input、Filter、Output)提供单独配置部分,用于处理管道事件;

    • Input部分

      input {stdin {}
      }
      
      • 功能:配置数据输入源,此处使用stdin插件,从**标准输入(键盘)**采集数据;
    • Filter部分

      filter {grok {match => { "message" => "%{COMBINEDAPACHELOG}" }}date {match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]}
      }
      
      • grok插件:通过正则匹配(%{COMBINEDAPACHELOG}是预定义的Apache日志格式),尝试将message字段解析为结构化数据;

      • date插件:尝试将名为timestamp的字段按格式dd/MMM/yyyy:HH:mm:ss Z解析为时间戳,用于统一时间格式;

    • Output部分

      output {elasticsearch {index => "logstash-demo"hosts => ["localhost:9200"]}stdout { codec => rubydebug }
      }
      
      • elasticsearch插件:将处理后的数据输出到Elasticsearch,指定索引为logstash-demo,Elasticsearch服务地址为localhost:9200

      • stdout插件:同时将数据输出到终端,codec => rubydebug用于以结构化格式(便于调试)展示输出内容;

  • 每个配置部分可包含一个或多个插件,Logstash会按照插件在配置文件中出现的顺序依次处理(例如Filter中的grok先执行,再执行date);

  • 执行以下命令启动Logstash并加载配置文件(假设配置文件名为logstash-demo.conf):

    bin/logstash -f logstash-demo.conf
    

    在这里插入图片描述

    • 当输入hello后,终端输出结构化数据,包含以下关键信息:
      • @version:事件版本(固定为"1"

      • host:运行Logstash的主机信息(如"hostname" => "Java"

      • tags:包含"_grokparsefailure",说明grok插件解析"hello"时失败(因hello不符合Apache日志格式)

      • message:原始输入内容("hello\r"

      • @timestamp:事件处理的时间戳(如2024-08-28T14:21:13.961676600Z

      • event.original:原始事件内容("hello\r"

  • 通过Elasticsearch的检索API可查看到,数据已成功写入logstash-demo索引,字段与终端输出一致,验证了数据已同步到Elasticsearch中;

    在这里插入图片描述

3.4.3 Logstash的插件

3.4.3.1 插件分类
  • Logstash插件分为Input、Filter、Output、Codec四大类,各自承担不同功能;

  • Input Plugins(输入插件)

    • 官方文档:[Input plugins | Logstash Reference 8.14] | Elastic;

    • 核心特点:一个Pipeline可以包含多个Input插件,支持从多种数据源采集数据;

    • 常见插件示例:

      • 基础输入:Stdin(标准输入)、File(文件);
      • 中间件/数据库:BeatsLog4JElasticsearchJDBCKafkaRabbitmqRedis
      • 协议/服务:JMXHTTPWebsocketUDPTCP
      • 云存储/平台:Google Cloud StorageS3GithubTwitter
  • Filter Plugins(过滤插件)

    • 官方文档:[Filter plugins | Logstash Reference 8.14] | Elastic;

    • 核心功能:对Logstash Event进行解析、字段删除、类型转换等处理;

    • 常见插件示例:

      • Date:日期解析;
      • Dissect:分割符解析;
      • Grok:正则匹配解析;
      • Mutate:对字段做多种操作(Convert类型转换、Gsub字符串替换、Split/Join/Merge字符串/数组操作、Rename字段重命名、Update/Replace字段内容更新、Remove_field字段删除);
      • Ruby:利用Ruby代码动态修改Event;
  • Output Plugins(输出插件)

    • 官方文档:[Output plugins | Logstash Reference 8.14] | Elastic;

    • 核心特点:是Pipeline的最后一个阶段,负责将Event发送到特定目的地;

    • 常见插件示例:

      • 存储引擎:Elasticsearch
      • 通知/监控:EmailPageduty
      • 数据库/中间件:InfluxdbKafkaMongodbOpensdbZabbix
      • 协议/服务:HttpTCPWebsocket
  • Codec Plugins(编解码插件)

    • 官方文档:[Codec plugins | Logstash Reference 8.14] | Elastic;

    • 核心功能:将原始数据decode成Event,再将Event encode成目标数据,实现数据格式的转换适配;

    • 内置插件示例:Line/MultilineJSON/Avro/Cef(ArcSight Common Event Format)、Dots/Rubydebug

3.4.3.2 Codec插件之Multiline(多行编解码)
  • 用于处理多行数据(如异常堆栈信息),核心是通过配置参数将多行合并为一个Event;

  • 配置参数

    • pattern:设置行匹配的正则表达式;

    • what:匹配成功时,该行属于上一个事件(previous)还是下一个事件(next)

    • negate:是否对pattern的结果取反(true/false);

  • 例:处理Java异常堆栈

    • 以包含多行的NullPointerException堆栈为例,配置文件multiline-exception.conf如下:

      input {stdin {codec => multiline {pattern => "^\s"  # 匹配以空白开头的行what => "previous"  # 匹配行属于上一个事件}}
      }
      filter {}
      output {stdout { codec => rubydebug }
      }
      
    • 执行命令:bin/logstash -f multiline-exception.conf,即可将多行异常堆栈合并为一个Event输出。

3.4.3.3 Logstash Queue(队列)
  • 用于在Pipeline中缓冲数据,分为两种类型:

  • In Memory Queue(内存队列):数据存储在内存中,若进程Crash或机器宕机,会导致数据丢失

  • Persistent Queue(持久化队列)

    • 特点:数据持久化到磁盘,即使机器宕机,数据也不会丢失;能保证数据被消费,可替代Kafka等消息队列的缓冲区作用;

    • 配置方式(在pipelines.yml中设置):

      queue.type: persisted  # 默认是memory
      queue.max_bytes: 4gb   # 队列最大容量
      
  • 队列在Pipeline中的流程:Input → Codec → Queue → Batcher → Filter → Output,多个Input可共享Queue进行数据缓冲与分发;

    在这里插入图片描述

3.4.4 练习:同步MySQL数据到ElasticSearch

3.4.4.1 需求分析
  • 将MySQL数据库中的用户数据同步到ElasticSearch,借助ES的全文搜索能力提升搜索速度,具体需求包括:
    • 新增用户信息需同步到ES

    • 用户信息更新后,ES需同步更新

    • 支持增量更新(只同步变化的数据)

    • 用户注销后,不能被ES搜索到

3.4.4.2 实现思路
  • 借助Logstash的JDBC Input Plugin将数据从MySQL读取到Logstash,再通过Elasticsearch Output Plugin同步到ES。JDBC Input Plugin的关键能力:

    • 需自行提供JDBC Driver(如MySQL的JDBC驱动包)

    • 支持定时任务调度(语法基于Rufus-scheduler,扩展了Cron语法)

    • 支持通过Tracking_column/sql_last_value记录状态,实现增量更新

  • 官方文档:[Jdbc input plugin | Logstash Reference 8.14] | Elastic。

3.4.4.3 JDBC Input Plugin实现步骤
  • 拷贝JDBC依赖:将MySQL的JDBC驱动包(如mysql-connector-java-5.1.49.jar)拷贝到Logstash的自定义目录(如logstash-8.14.3/drivers);

  • 准备配置文件mysql-demo.conf。配置文件分为inputoutput两个部分,实现MySQL数据采集与ES同步:

    input {jdbc {# JDBC驱动路径与类名jdbc_driver_library => "/home/fox/logstash-8.14.3/driver/mysql-connector-java-5.1.49.jar"jdbc_driver_class => "com.mysql.jdbc.Driver"# MySQL连接信息jdbc_connection_string => "jdbc:mysql://localhost:3306/test?useSSL=false"jdbc_user => "root"jdbc_password => "123456"# 增量更新配置use_column_value => true  # 启用字段值追踪tracking_column => "last_updated"  # 追踪的字段(需是数值或时间类型)tracking_column_type => "numeric"  # 追踪字段类型(numeric/ timestamp)record_last_run => true  # 记录最后一次运行状态last_run_metadata_path => "jdbc-position.txt"  # 状态保存路径statement => "SELECT * FROM user where last_updated > :sql_last_value;"  # 增量查询SQLschedule => "* * * * * *"  # Cron表达式,设置任务触发频率}
    }output {# 同步到Elasticsearchelasticsearch {document_id => "%{id}"  # 文档ID关联MySQL的id字段document_type => "_doc"index => "users"  # ES索引名hosts => ["http://localhost:9200"]username => "elastic"  # ES认证信息password => "123456"}# 终端输出调试(可选)stdout {codec => rubydebug}
    }
    
  • 运行Logstash。执行命令启动Logsta sh并加载配置文件:

    bin/logstash -f mysql-demo.conf
    
3.4.4.4 测试
  • 表结构:创建user表,包含id(主键)、nameaddresslast_updated(增量追踪字段)、is_deleted(注销标记字段);

    CREATE TABLE `user` (`id` int NOT NULL AUTO_INCREMENT,`name` varchar(50) DEFAULT NULL,`address` varchar(50) DEFAULT NULL,`last_updated` bigint DEFAULT NULL,`is_deleted` int DEFAULT NULL,PRIMARY KEY (`id`)
    ) ENGINE=InnoDB;
    
  • 插入数据:新增用户“张三”,并设置last_updated为当前时间戳;

    INSERT INTO user(name,address,last_updated,is_deleted) 
    VALUES("张三","广州天河",unix_timestamp(NOW()),0);
    
  • 更新数据:更新“张三”的地址,并更新last_updated

    UPDATE user SET address="广州白云山",last_updated=unix_timestamp(NOW()) WHERE name="张三";
    
  • 注销用户:标记“张三”为注销(is_deleted=1),并更新last_updated

    UPDATE user SET is_deleted=1,last_updated=unix_timestamp(NOW()) WHERE name="张三";
    
  • 数据同步效果

    • 插入、更新操作后,Logstash会通过增量查询将变化的数据同步到ES,终端可通过stdout { codec => rubydebug }看到结构化的同步数据;

    • 注销操作后,数据仍会同步到ES,但可通过ES的Alias机制过滤掉注销用户;

3.4.4.5 ES中查询优化(Alias机制)
  • 通过创建**别名(Alias)**并添加过滤条件,实现“只显示未被注销的用户”:

  • 创建Alias

    POST /_aliases
    {"actions": [{"add": {"index": "users","alias": "view_users","filter": { "term": { "is_deleted": 0 } }}}]
    }
    
  • 通过Alias查询:只会返回is_deleted=0的用户,注销用户(is_deleted=1)不会被检索到

    POST view_users/_search
    {"query": {"term": {"name.keyword": {"value": "张三"}}}
    }
    

4 轻量级采集器FileBeat

4.1 概述

  • Beats是一个免费且开放的平台,集合了多种单一用途的数据采集器,可从成百上千甚至上万台机器和系统向Logstash或Elasticsearch发送数据;

  • Beats包含多个针对不同数据类型的采集器:

    • Filebeat:采集日志文件

    • Metricbeat:采集指标数据

    • Packetbeat:采集网络数据

    • Winlogbeat:采集Windows事件日志

    • Auditbeat:采集审计数据

    • Heartbeat:用于运行时间监控

    • Functionbeat:无服务器的采集器;

    在这里插入图片描述

  • FileBeat是专门用于转发和收集日志数据的轻量级采集工具,可作为代理安装在服务器上,监控指定路径的日志文件,收集日志数据后发送到Elasticsearch或Logstash。

4.2 工作原理

  • FileBeat的工作流程由Prospector(勘探器)、Harvester(收割机)、Libbeat三个核心组件协同完成:

    • Prospector(勘探器):启动FileBeat时会启动一个或多个Input,用于监控指定的日志数据位置(如/var/log/*.log/var/log/nginx/*等路径);

    • Harvester(收割机):针对每一个被监控的文件启动一个Harvester,负责读取文件的日志内容,并将新的日志发送到Libbeat;

    • Libbeat:负责收集多个Harvester的数据,并将数据发送到输出端(Output),支持直接发送到Elasticsearch,或先发送到Logstash再转发到Elasticsearch;

  • 以监控/var/log/*.log(Prospector 1)和/var/log/nginx/*(Prospector 2)为例:

    • Prospector 1监控/var/log/*.log下的log1文件,为其启动Harvester读取日志,将新日志发送到Libbeat;

    • Prospector 2监控/var/log/nginx/*下的error.log文件,为其启动Harvester读取日志,将新日志发送到Libbeat;

    • Libbeat收集所有Harvester的数据后,可选择直接发送到Elasticsearch,或先发送到Logstash再由Logstash转发到Elasticsearch;

    在这里插入图片描述

4.3 Logstash VS FileBeat

  • 资源消耗对比

    • Logstash:运行在JVM(Java虚拟机)上,资源消耗较大

    • FileBeat:基于Golang编写,功能相对少但资源消耗小,更轻量级,占用资源更少;

  • 功能定位对比

    • 两者都具备日志收集功能,但FileBeat更轻量;

    • Logstash独有Filter功能:可对日志进行过滤、分析(如字段提取、格式转换等);

  • 典型配合流程:

    • FileBeat采集日志,发送到**消息队列(Redis、MQ等)**做缓冲;

    • Logstash从消息队列获取日志,利用Filter功能过滤分析;

    • 最终将处理后的数据存储到Elasticsearch;

  • FileBeat支持背压压制

    • 当向Logstash或Elasticsearch发送数据时,使用背压敏感协议

    • 若Logstash忙于处理数据,会通知FileBeat减慢读取速度

    • 拥塞解决后,FileBeat恢复原速度继续传输数据,以此适配数据量波动,保证系统稳定。

4.4 FileBeat的安装与配置

  • 下载并解压FileBeat

    • FileBeat官方安装配置文档:[Filebeat quick start: installation and configuration | Filebeat Reference 8.14] | Elastic;

    • 下载地址:Past Releases | Elastic;

    • 选择版本:8.14.3

    • Windows系统下载链接:https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.14.3-windows-x86_64.zip

    • Linux系统:执行命令下载并解压

      curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.14.3-linux-x86_64.tar.gz
      tar xzvf filebeat-8.14.3-linux-x86_64.tar.gz
      
  • 编辑配置。修改filebeat.yml以配置Elasticsearch和Kibana的连接信息:

    output.elasticsearch:hosts: ["192.168.65.174:9200", "192.168.65.192:9200", "192.168.65.204:9200"]  # Elasticsearch集群地址username: "elastic"  # Elasticsearch认证用户名password: "123456"   # Elasticsearch认证密码setup.kibana:host: "192.168.65.174:5601"  # Kibana地址
    
  • 启用和配置数据收集模块

    • 从FileBeat安装目录中执行以下操作,启用特定服务的日志收集模块;

    • 查看可启用的模块列表

      ./filebeat modules list
      
    • 启用Nginx模块

      • 启用命令:

        ./filebeat modules enable nginx
        
      • 若需修改Nginx日志路径,编辑modules.d/nginx.yml

        - module: nginxaccess:enabled: truevar.paths: ["/var/log/nginx/access.log*"]  # Nginx访问日志路径
        
    • 启用Logstash模块

      • 启用命令:

        ./filebeat modules enable logstash
        
      • 若需修改Logstash日志路径,编辑modules.d/logstash.yml

        - module: logstashlog:enabled: truevar.paths: ["/home/fox/logstash-8.14.3/logs/*.log"]  # Logstash日志路径
        
  • 启动FileBeat

    • 加载Kibana仪表板(可选,若未配置过则执行)

      ./filebeat setup
      
    • 启动FileBeat

      ./filebeat -e
      
    • 验证启动效果。启动成功后,可在Kibana的Observability → Logs → Stream中查看收集到的日志(如Logstash的日志),确认数据已成功传输和可视化;

      在这里插入图片描述

4.5 练习1:FileBeat采集Tomcat服务器日志并发送到Logstash

  • 需求背景:Tomcat服务器运行时产生大量日志(如访问日志),需通过FileBeat采集这些日志,并发送到Logstash进行后续处理;

  • 配置FileBeat采集Tomcat日志并发送到Logstash

    • 创建配置文件filebeat-tomcat.yml。配置FileBeat的日志采集路径、多行合并规则,以及Logstash的连接信息:

      filebeat.inputs:
      - type: logenabled: truepaths:- /home/fox/apache-tomcat-9.0.93/logs/*access*.*  # Tomcat访问日志路径# 多行合并规则(因Tomcat日志以IP开头,需合并非IP开头的行到上一行)multiline.pattern: '^\d+\.\d+\.\d+\.\d+ '  # 匹配IP开头的正则multiline.negate: true  # 对pattern取反(即非IP开头的行)multiline.match: after  # 合并到上一行的末尾output.logstash:enabled: truehosts: ["localhost:5044"]  # Logstash监听地址与端口
      
      • 其中,multiline参数用于处理多行日志(如异常堆栈):
        • pattern:正则表达式,匹配行的特征;

        • negatetrue表示对pattern结果取反;

        • matchafter表示将匹配行合并到上一行末尾;

    • 启动FileBeat并指定配置文件。执行命令启动FileBeat:

      ./filebeat -e -c filebeat-tomcat.yml
      
    • 可能出现的异常及解决

      • 异常1:配置文件权限过高(安全限制)

        • 报错:Exiting: error loading config file: config file "...yml" can only be writable by the owner...

        • 解决:修改文件权限为644(仅 owner 可写):

          chmod 644 filebeat-tomcat.yml
          
      • 异常2:无法连接Logstash

        • 报错:Failed to connect to backoff(async(tcp://...)): dial tcp ...: connect: connection refused
        • 原因:未启动或未正确配置Logstash,导致FileBeat无法连接;
  • 配置Logstash接收FileBeat数据并打印

    • 创建Logstash配置文件logstsh-tomcat.conf。配置Logstash的Beats输入插件(监听FileBeat的连接)和终端输出(用于调试):

      input {beats {port => 5044  # 与FileBeat配置的端口一致}
      }output {stdout {codec => rubydebug  # 以结构化格式输出到终端}
      }
      
    • 测试Logstash配置是否正确**。执行命令验证配置语法:

      bin/logstash -f config/logstsh-tomcat.conf --config.test_and_exit
      
      • 若输出Config Validation Result: OK,则配置正确;
    • 启动Logstash。执行命令启动Logstash,并启用配置自动重载:

      bin/logstash -f config/logstsh-tomcat.conf --config.reload.automatic
      
  • 测试:访问Tomcat,验证日志传输。访问Tomcat服务后,查看Logstash的终端输出,若能看到类似以下的结构化日志数据,说明传输成功:

    {"tags" => ["beats_input_codec_plain_applied"],"input" => { "type" => "log" },"event" => { "original" => "192.168.65.103 - - [30/Aug/2024:13:29:11 +0800] \"GET /docs/ HTTP/1.1\" 403 904" },"@version" => "1","log" => { "offset" => 1559, "file" => { "path" => "/home/fox/apache-tomcat-9.0.93/logs/localhost_access_log.2024-08-30.txt" } },"message" => "192.168.65.103 - - [30/Aug/2024:13:29:11 +0800] \"GET /docs/ HTTP/1.1\" 403 904","agent" => { "id" => "ed5031e7-fc13-441c-b87a-b554cb02df3c", "type" => "filebeat", "ephemeral_id" => "3f03ad41-d6f2-4483-9b78-9c7c456d031c", "version" => "8.14.3", "name" => "192-168-65-211" }
    }
    

4.6 练习2:整合ELK采集与分析Tomcat日志

4.6.1 Logstash输出数据到Elasticsearch

  • 若需将Tomcat日志从Logstash同步到Elasticsearch,需修改Logstash的output配置;

  • 配置文件logstsh-tomcat.conf

    input {beats {port => 5044  # 接收FileBeat数据的端口}
    }output {elasticsearch {hosts => ["http://localhost:9200"]  # ES地址index => "tomcat-logs"  # ES索引名user => "elastic"  # ES认证用户名password => "123456"  # ES认证密码}stdout {codec => rubydebug  # 终端调试输出(可选)}
    }
    
  • 启动Logstash

    bin/logstash -f config/logstsh-tomcat.conf --config.reload.automatic
    
  • 测试ES数据存储。通过ES检索API查询tomcat-logs索引,若能看到结构化的Tomcat日志数据,说明同步成功;

    在这里插入图片描述

4.6.2 利用Logstash过滤器解析日志

  • 为了将Tomcat日志从“原始字符串”解析为结构化字段(如IP、时间、请求方式等),需借助Logstash的GrokMutateDate插件;

  • 查看Logstash已经安装的插件:

    bin/logstash-plugin list
    

    在这里插入图片描述

4.6.2.1 Grok插件:日志结构化解析
  • 官方文档:[Grok filter plugin | Logstash Reference 8.14] | Elastic;

  • 作用:将非结构化日志通过正则匹配解析为结构化字段,可理解为“增强版正则表达式”。这个工具非常适合用来解析系统日志、Web服务器日志、MySQL或者是其他任意的日志格式;

  • 语法%{SYNTAX:SEMANTIC},其中SYNTAX是Grok模式名称(如IPHTTPDATE),SEMANTIC是自定义字段名(如ipdate);

    • 例:

      %{NUMBER:duration} %{IP:client}
      // duration表示:匹配一个数字,client表示匹配一个IP地址
      
    • 默认在Grok中,所有匹配到的的数据类型都是字符串,如果要转换成int类型(目前只支持int和float),可以这样:%{NUMBER:duration:int} %{IP:client}

  • 常用的Grok模式:GROK模式语法参考-日志服务-阿里云;

  • 用法

    filter {grok {match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }}
    }
    
    • 比如针对下面的Tomcat访问日志格式:

      192.168.65.103 - - [23/Jun/2022:22:37:23 +0800] "GET /docs/images/docs-stylesheet.css HTTP/1.1" 200 5780
      
    • 解析后的字段:

      字段名说明
      client IP浏览器端IP
      timestamp请求的时间戳
      method请求方式(GET/POST)
      uri请求的链接地址
      status服务器端响应状态
      length响应的数据长度
    • Grok模式为:

      %{IP:ip} - - \[%{HTTPDATE:date}\] "%{WORD:method} %{PATH:uri} %{DATA:protocol}" %{INT:status:int} %{INT:length:int}
      
  • 为了方便测试,可以使用Kibana进行Grok开发:

    在这里插入图片描述

  • 修改Logstash配置文件:

    vim config/logstash-console.confinput {beats {port => 5044}
    }filter {grok {match => { "message" => "%{IP:ip} - - \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA:protocol}\" %{INT:status:int} %{INT:length:int}" }
    }
    }output {stdout {codec => rubydebug}
    }
    
  • 启动Logstash测试:

    bin/logstash -f config/logstash-console.conf --config.reload.automatic
    

    在这里插入图片描述

4.6.2.2 Mutate插件:字段过滤
  • 用于删除不需要的字段,精简数据结构:

    mutate {enable_metric => "false"remove_field => ["message", "log", "tags", "input", "agent", "host", "ecs", "@version"]
    }
    
4.6.2.3 Date插件:日期格式转换
  • 官方文档:[Date filter plugin | Logstash Reference 8.14] | Elastic;

  • 用法:

    在这里插入图片描述

  • date字段转换为「年月日 时分秒」格式。默认字段经过Date插件处理后,会输出到@timestamp字段,所以可以通过修改target属性来重新定义输出字段:

    date {match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"]target => "date"
    }
    
4.6.2.4 filter完整的配置
  • filter完整的配置:

    在这里插入图片描述

  • 测试效果:

    在这里插入图片描述

4.6.2.5 输出到ElasticSearch指定索引
  • 通过index参数指定ES索引名称,支持时间格式化语法(如%{+YYYY-MM}按年月拆分索引)。注意:

    • 索引名不能包含大写字符;
    • 若使用时间格式化,Filter需确保输出包含@timestamp字段,否则日期解析会失败;
    output {elasticsearch {index => "tomcat_web_log_%{+YYYY-MM}"hosts => ["http://localhost:9200"]user => "elastic"password => "123456"}stdout{codec => rubydebug}
    }
    
  • 完整的Logstash配置logstash-tomcat-es.conf

    input {beats {port => 5044}
    }filter {grok {match => {"message" => "%{IP:ip} - - \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA:protocol}\" %{INT:status:int} %{INT:length:int}"}}mutate {enable_metric => "false"remove_field => ["message", "log", "tags", "input", "agent", "host", "ecs", "@version"]}date {match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"]target => "date"}
    }output {stdout {codec => rubydebug}elasticsearch {index => "tomcat_web_log_%{+YYYY-MM}"  # 按年月生成索引hosts => ["http://localhost:9200"]user => "elastic"password => "123456"}
    }
    
  • 启动Logstash:

    bin/logstash -f config/logstash-tomcat-es.conf --config.reload.automatic
    
  • 查询ES中是否有数据:

    在这里插入图片描述

  • 通过Kibana分析服务日志:

    • 在Kibana的Stack Management → 数据视图中,创建针对tomcat_web_log_*索引的视图,指定时间字段(如@timestamp);

      在这里插入图片描述

    • 进入Discover模块,选择刚创建的数据视图,即可:

      在这里插入图片描述

      • 查看日志的结构化字段(如ipmethodstatus等);

      • 筛选特定条件的日志(如status: 403的错误请求);

        在这里插入图片描述

5 微服务整合ELK实现日志采集与分析实战

5.1 实现思路分析

  • Spring Boot微服务日志流向ELK的流程如下:

    • Spring Boot应用生成日志数据,通过Logback日志框架记录日志;

    • Logstash作为日志收集器,接收Spring Boot发送的日志数据;

    • Logstash解析和过滤日志数据(可选格式化、处理);

    • 处理后的日志发送到Elasticsearch,存储在分布式索引中;

    • Kibana连接Elasticsearch,实现日志数据的可视化分析;

    在这里插入图片描述

5.2 微服务整合Logstash实现日志采集

  • 使用Logstash日志插件(引入依赖)。在Spring Boot项目的pom.xml中引入Logstash-Logback依赖,实现日志向Logstash的传输:

    <dependency><groupId>net.logstash.logback</groupId><artifactId>logstash-logback-encoder</artifactId><version>6.3</version>
    </dependency>
    
  • 配置Logback(logback-spring.xml。通过Logback配置,将日志发送到Logstash的TCP端口:

    <?xml version="1.0" encoding="UTF-8"?>
    <configuration debug="false"><property name="LOG_HOME" value="logs/elk-demo.log" /><!-- 控制台输出Appender --><appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"><encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"><pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern></encoder></appender><!-- Logstash TCP输出Appender --><appender name="logstash" class="net.logstash.logback.appender.LogstashTcpSocketAppender"><destination>192.168.65.211:4560</destination> <!-- Logstash监听的IP和端口 --><encoder class="net.logstash.logback.encoder.LogstashEncoder"><customFields>{"appname": "elk-demo"}</customFields> <!-- 自定义字段(应用标识) --></encoder></appender><!-- 日志输出级别 --><root level="INFO"><appender-ref ref="STDOUT" /><appender-ref ref="logstash" /> <!-- 启用Logstash输出 --></root>
    </configuration>
    
  • 配置Logstash并启动(elk-demo.conf。创建Logstash配置文件,定义输入(TCP接收日志)、输出(Elasticsearch存储):

    input {tcp {host => "0.0.0.0"  # 监听所有IPport => 4560       # 与Logback配置的端口一致mode => "server"codec => json_lines  # 日志格式为JSON}stdin {}  # 标准输入(可选,用于调试)
    }filter {# 可选:日志过滤、解析逻辑
    }output {stdout {codec => rubydebug  # 终端调试输出(可选)}elasticsearch {hosts => ["127.0.0.1:9200"]  # Elasticsearch地址index => "%{[appname]}-%{+YYYY.MM.dd}"  # 按应用名和日期生成索引}
    }
    
  • 启动Logstash:

    bin/logstash -f config/elk-demo.conf  # 后台启动可加 &
    

5.3 测试验证

  • Logstash控制台验证。调用Spring Boot应用的接口,触发日志输出,查看Logstash控制台,若能看到类似以下的结构化日志,说明接收成功:

    {"level" => "ERROR","@timestamp" => 2024-08-30T08:24:03.429Z,"thread_name" => "http-nio-8080-exec-1","logger_name" => "org.tuling.elkdemo.controller.HelloController","@version" => "1","message" => "HelloController执行-----log.error","level_value" => 40000,"appname" => "elk-demo"
    }
    
  • Elasticsearch索引验证。在Kibana的索引管理中,查看是否存在以elk-demo-开头的索引(如elk-demo-2024.08.30),验证日志已存储到ES;

    在这里插入图片描述

5.4 通过Kibana分析微服务日志

  • 创建数据视图。在Kibana的Stack Management → 数据视图中,创建针对elk-demo*索引的视图,指定时间字段(如@timestamp);

    在这里插入图片描述

  • Discover模块分析日志。进入Discover模块,选择刚创建的数据视图,即可:

    • 查看微服务日志的结构化字段(如levelmessagethread_name等);

    • 筛选特定级别、特定服务的日志(如level: ERROR的错误日志);

    在这里插入图片描述

http://www.dtcms.com/a/613345.html

相关文章:

  • 团购网站自个做中国最好室内设计公司排名榜
  • 如何使用WPF做工控主页
  • 深入探讨HarmonyOS分布式剪贴板:技术原理与开发实践
  • USB3.0PHY介绍
  • Three.js的阴影相关实现路径
  • WPF转换器机制
  • SLAM中的非线性优-3D图优化之轴角在Opencv-PNP中的应用(三)
  • 如何用WPF做工控设置界面
  • ✨WPF编程进阶【7.2】:动画类型(附源码)
  • 建设网站的安全性介绍深圳seo优化排名推广
  • 上海迈诺网站建设东莞专业做网站的公司有哪些
  • 深度学习中的激活函数全解析:该选哪一个?
  • MySQL复盘总结
  • 对于多方安全计算的中止安全的理解
  • 西游记路线图:12-39,大唐到乌鸡国,幕后boss标注
  • 【学习笔记】DiffFNO: Diffusion Fourier Neural Operator
  • 电磁场中的旋度Curl与散度div
  • KCF 算法在ROS 2 操作系统里面(详解)
  • 《Dev-C++分辨率低-解决办法》
  • Dubbo异步调用实战指南:提升微服务并发性能
  • 【Linux】冯诺依曼体系结构与操作系统概述
  • 简单企业网站模板php做的网站预览
  • 2025年数维杯数学建模挑战赛(秋季赛)【ABCD题】论文首发+百种模型组合+代码分享
  • OpenHarmony内核开发实战手册:编译构建、HCK框架与性能优化
  • 自建开发工具IDE(五)数据库预览——东方仙盟炼气期
  • MATLAB 实现多能源系统(MES)多目标优化
  • 构建企业级机器学习平台:基于Ray和DeepSpeed的半导体AI实践
  • 卡索(CASO)汽车调查:终端体验,是汽车品牌隐形的胜负关键
  • C语言编译器安卓版 | 高效便捷的手机编程环境
  • 子集合判断(map)