Logstash读取日志从文件到mysql数据库

一、概述
Logstash作为一个功能强大的数据收集引擎,通过其灵活的管道设计和丰富的插件生态系统,为处理多样化数据提供了强大支持。无论是日志分析、指标收集还是通用数据集成场景,Logstash都能提供可靠、高效的解决方案。
Logstash的基本流程架构遵循Input → Filter → Output的模式,这种管道式设计使得数据能够以流式方式被处理:
-  
Input(输入阶段):负责从各种数据源采集数据。Logstash支持多种输入选择,可以在同一时间从众多常用来源捕捉事件。常见的输入插件包括:
-  
File:从文件系统读取数据
 -  
Beats:接收来自Filebeat等轻量级数据采集器的数据
 -  
Kafka:从消息队列消费数据
 -  
JDBC:从数据库读取数据
 
 -  
 -  
Filter(过滤阶段):此阶段可选,负责对数据进行解析、转换和丰富。Logstash过滤器能够实时解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式。
常用过滤器包括:
- grok:解析并构造任意文本,是将非结构化日志数据解析为结构化可查询内容的最佳方式
 - mutate:对事件字段执行常规转换(重命名、删除、替换、修改字段)
 - geoip:添加IP地址的地理位置信息
 - drop:完全删除特定事件
 
 -  
Output(输出阶段):将处理后的数据发送到目标存储或分析系统。常见的输出目标包括:
-  
Elasticsearch:用于搜索和分析
 -  
文件系统:存储为磁盘文件
 -  
数据库:如Redis、Kafka等
 
 -  
 
二、下载与安装
下载地址:https://www.elastic.co/downloads/logstash
2.1 Window安装后的目录

2.2 安装mysql插件
由于日志需要输出到数据库,需要
jdbc输出插件;
jdbc输出插件不是Logstash的标准内置插件,需要手动安装:
# 在线安装(确保网络连通)
./bin/logstash-plugin install logstash-output-jdbc# 若服务器无法访问外网,可采用离线安装方式
# 1. 在有网络的机器上准备离线包
./bin/logstash-plugin prepare-offline-pack --overwrite --output logstash-output-jdbc.zip logstash-output-jdbc
# 2. 将ZIP包复制到目标服务器并安装
./bin/logstash-plugin install file:///path/to/logstash-output-jdbc.zip
 
安装后的插件位置:
logstash-9.2.0\vendor\bundle\jruby\*
2.3 其他准备工作
- 把MySQL JDBC 驱动 Jar 包放到指定位置,后续配置使用:
 
数据库的驱动文件:mysql-connector-j-8.2.0.jar
logstash-9.2.0\mysql-connector-j-8.2.0.jar
- 数据库创建表,存放日志信息:
 
CREATE TABLE app_logs (id INT AUTO_INCREMENT PRIMARY KEY,log_time TIME,thread VARCHAR(255),log_level VARCHAR(50),class_method VARCHAR(500), -- 可能包含类和方法名message TEXT,ingestion_time DATETIME DEFAULT CURRENT_TIMESTAMP
);
 
- 创建记录读取进度的文件,后续配置使用:
 
logstash-9.2.0/sincedb/sincedb.txt
三、配置详解
配置路径:D:\MyWorkSpace\logstash-9.2.0\config\logstash-db.conf
input {file {path => ["/data/app/logs/wingate-server/*.log"]start_position => "beginning"sincedb_path => "D:/MyWorkSpace/logstash-9.2.0/sincedb/sincedb.txt"codec => plaintype => "server_logs"discover_interval => 15stat_interval => 1}
}filter {grok {match => { "message" => "%{TIME:timestamp} \[%{DATA:thread}\] %{LOGLEVEL:loglevel}%{SPACE}%{DATA:class_method}\[%{DATA:method},%{NUMBER:line}\]\[\] - %{GREEDYDATA:log_message}"}}# 解析时间戳(可选)date {match => [ "timestamp", "HH:mm:ss.SSS", "ISO8601" ]target => "@timestamp"}# 添加自定义标签(可选)if [loglevel] == "ERROR" {mutate {add_tag => [ "error_log" ]}}
}output {jdbc {connection_string => "jdbc:mysql://localhost:3306/logstash?useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true"username => "root"password => "000000"driver_class => "com.mysql.cj.jdbc.Driver"driver_jar_path => "D:/MyWorkSpace/logstash-9.2.0/mysql-connector-j-8.2.0.jar"statement => [ "INSERT INTO app_logs (log_time, thread, log_level, class_method, message) VALUES (?, ?, ?, ?, ?)", "timestamp", "thread", "loglevel", "class_method", "log_message" ]}stdout { codec => rubydebug } # 用于调试,生产环境可注释掉
}
 
3.1 Input部分解析
Input 部分定义了 Logstash 的数据输入源。配置使用了 file插件从指定路径的日志文件采集数据。
input {file {path => ["/data/app/logs/wingate-server/*.log"]  # 监听的文件路径(支持通配符)start_position => "beginning"                    # 从文件开头读取(默认从结尾)sincedb_path => "D:/MyWorkSpace/logstash-9.2.0/sincedb/sincedb.txt"  # 记录读取进度codec => plain                                   # 编解码器(纯文本格式)type => "server_logs"                            # 自定义事件类型标识discover_interval => 15                         # 检查新文件的间隔(秒)stat_interval => 1                               # 检查文件更新的频率(秒)}
}
 
关键参数说明:
-  
path:指定日志文件的 
