Logstash 实战指南:从入门到生产级日志处理
Logstash 实战指南:从入门到生产级日志处理
一、Logstash 简介
Logstash 是一个功能强大的 服务器端数据处理管道,具备以下三大核心能力:
- Input(输入):从多种来源采集数据(文件、Redis、Beats、Syslog 等)
- Filter(过滤):对数据进行解析、清洗、丰富和转换
- Output(输出):将处理后的数据发送至目标系统(Elasticsearch、Kafka、Redis、控制台等)
典型应用场景:
- 日志集中采集与结构化
- 多源数据聚合与标准化
- 数据预处理与 ETL 流程
二、环境准备与安装
1. 下载 Logstash
wget https://artifacts.elastic.co/downloads/logstash/logstash-8.2.2-linux-x86_64.tar.gz
2. 解压安装包
tar -xf logstash-8.2.2-linux-x86_64.tar.gz -C /opt/
3. 配置环境变量
创建环境变量脚本:
# /etc/profile.d/logstash.sh
export LOGSTASH_HOME=/opt/logstash-8.2.2
export PATH=$PATH:$LOGSTASH_HOME/bin
加载环境变量:
source /etc/profile.d/logstash.sh
4.查看帮助信息
logstash -h
常用启动参数说明
参数 | 说明 |
---|---|
-n, --node.name NAME | 指定当前 Logstash 实例的名称,用于集群标识。 |
-f, --path.config CONFIG_PATH | 指定配置文件或配置目录路径。 |
-e, --config.string CONFIG_STRING | 通过命令行直接指定配置,常用于快速测试。 |
--pipeline.id ID | 指定 Pipeline 的唯一标识,默认为 main 。 |
-w, --pipeline.workers COUNT | 设置工作线程数,建议设为 CPU 核心数。 |
-b, --pipeline.batch.size SIZE | 批量处理事件数,默认 125,影响吞吐与延迟。 |
-u, --pipeline.batch.delay DELAY_IN_MS | 批处理最大等待时间,默认 50ms。 |
--path.data PATH | 指定数据存储路径,RPM 默认为 /usr/share/logstash/data 。 |
-l, --path.logs PATH | 指定日志输出路径。 |
--log.level LEVEL | 设置日志级别:fatal , error , warn , info , debug , trace 。 |
-V, --version | 查看 Logstash 版本。 |
-r, --config.reload.automatic | 启用配置文件热加载(自动重载)。 |
--config.reload.interval SECONDS | 配置文件检查间隔(需配合 -r 使用)。 |
--log.format FORMAT | 日志输出格式:plain (默认)或 json 。 |
--path.settings SETTINGS_DIR | 指定配置目录,默认为 /usr/share/logstash/config 。 |
-h, --help | 显示帮助信息。 |
三、快速入门:从标准输入到控制台输出
1. 命令行方式启动(仅用于测试)
logstash -e 'input { stdin { } } output { stdout { codec => rubydebug } }'
输入任意文本(如 Hello Logstash
),将看到如下结构化输出:
{"@version" => "1","@timestamp" => 2025-08-07T07:27:03.183912Z,"event" => {"original" => "Hello Logstash"},"message" => "Hello Logstash","host" => {"hostname" => "es-node-03"}
}
⚠️ 提示:命令行方式不适合生产环境,建议使用配置文件管理复杂逻辑。
2. 配置文件方式启动(推荐)
- 创建配置工作目录
mkdir ~/logstash-config
- 创建配置文件:
cat > ~/logstash-config/01-input-stdout.conf << 'EOF'
input {stdin {type => "user-input"}
}output {stdout {codec => rubydebug}
}
EOF
启动服务:
logstash -rf ~/logstash-config/01-input-stdout.conf
✅ 使用 -f
指定配置文件,-r
支持热重载(开发调试时使用 -rf
)。
输入任意文本(如 Hello Logstash
)
四、实战案例 1:采集本地日志文件
使用 file
输入插件监控日志文件变化。
配置文件:file-to-stdout.conf
input {file {id => "app-logs"path => ["/tmp/app-*.log"]start_position => "beginning" # 从文件开头读取(适合测试)sincedb_path => "/dev/null" # 不记录读取位置,每次重新读(仅测试用)stat_interval => "1 second" # 每秒检查一次文件变化discover_interval => 5 # 每5秒扫描一次目录新文件}
}output {stdout {codec => rubydebug}
}
日志示例:
# 创建三个测试日志文件
echo '2025-08-07T10:00:01Z app1 INFO User login successful for user=admin from=192.168.1.100' > /tmp/app-web1.log
echo '2025-08-07T10:00:05Z app2 ERROR Database connection failed: timeout, host=db01, retry=3' > /tmp/app-db-error.log
echo '2025-08-07T10:00:10Z app3 WARN Disk usage at 85% on /var/log, cleanup needed' >> /tmp/app-monitor.log
启动命令:
logstash -f /root/logstash-config/file-to-stdout.conf
💡 生产建议:
sincedb_path
应指向持久化路径,避免重复读取。
五、实战案例 2:从 Redis 读取数据
Redis 常作为临时消息队列,Logstash 可从中消费数据。
1. 准备 Redis 数据(示例)
假设 Redis 运行在 192.168.10.10:6379
,数据库 5 中有一个 List 类型的 Key:
redis-cli -h 192.168.130.62 -p 6379 -n 5
> LPUSH app_logs "user login success" "page not found" "api timeout"
2. 配置文件:redis-to-stdout.conf
input {redis {host => "192.168.130.62"port => 6379db => 5data_type => "list"key => "app_logs"codec => "plain" # 避免 JSON 解析失败}
}output {stdout {codec => rubydebug}
}
❗ 注意:若数据非 JSON 格式,避免使用默认
json
codec,否则会添加_jsonparsefailure
标签。
启动命令:
logstash -rf /root/logstash-config/redis-to-stdout.conf
✅ 提示:Redis 的 List 模式为“点对点”消费,数据被读取后即删除。如需广播或多消费者,建议使用 Kafka。
六、实战案例 3:对接 Filebeat(生产常用架构)
在生产环境中,通常采用 Filebeat 负责采集,Logstash 负责处理 的架构。
1. Logstash 配置:接收 Beats 数据
文件:beats-to-stdout.conf
input {beats {port => 8848}
}# 定义filter插件,使用mutate组件来删除指定的字段
filter {mutate {remove_field => [ "host","input","tags", "@version", "ecs","agent","log","domain"]}
}output {stdout {codec => rubydebug}
}
启动:
logstash -rf /root/logstash-config/beats-to-stdout.conf
2. Filebeat 配置:发送日志到 Logstash
文件:filebeat-to-logstash.yaml
filebeat.inputs:- type: filestreamenabled: truepaths:- /var/log/nginx/access.log# 使用 NDJSON 解析器(每行一个 JSON 对象)parsers:- ndjson:overwrite_keys: truetarget: ""add_error_key: trueoutput.logstash:hosts: ["192.168.130.65:8848"] # Logstash 服务器地址
启动 Filebeat:
./filebeat -e -c config/filebeat-to-logstash.yaml
✅ 观察 Logstash 输出,确认数据成功接收。
3.采集数据到ES集群
Filebeat → Logstash → Elasticsearch + stdout(调试)
✅ 第一步:更新 Logstash 配置(支持输出到 ES)
文件:/root/logstash-config/beats-to-es.conf
input {beats {port => 8848}
}# 数据清洗:删除不需要的字段,定义filter插件,使用mutate组件来删除指定的字段
filter {mutate {remove_field => [ "host", "input", "tags", "@version", "ecs", "agent", "log", "domain" ]}# 可选:如果日志是 JSON 格式,可以启用 json 解析# json {# source => "message"# target => "json_parsed"# remove_field => ["message"]# }
}# 同时输出到控制台(调试)和 Elasticsearch
output {# 调试用:输出到终端stdout {codec => rubydebug}# 生产用:输出到 ES 集群elasticsearch {hosts => ["http://192.168.130.61:9200", "http://192.168.130.62:9200", "http://192.168.130.65:9200"]index => "nginx-%{+yyyy.MM.dd}"}
}
💡 说明:
index => "nginx-%{+yyyy.MM.dd}"
:按天创建索引,如nginx-2025.08.08
✅ 第二步:启动 Logstash
logstash -rf /root/logstash-config/beats-to-es.conf --path.data /root/logstash-data-beats
⚠️ 注意:
- 使用独立的
--path.data
目录,避免冲突
✅ 第三步:配置 Filebeat(发送到 Logstash)
文件:/root/filebeat-config/filebeat-to-logstash.yaml
filebeat.inputs:- type: filestreamenabled: truepaths:- /var/log/nginx/access.log# 如果日志是 JSON 格式,启用 ndjson 解析parsers:- ndjson:overwrite_keys: truetarget: ""add_error_key: true# 输出到 Logstash
output.logstash:hosts: ["192.168.130.65:8848"] # 替换为你的 Logstash 服务器 IPssl.enabled: false # 如果没配 HTTPS 可关闭
✅ 第四步:启动 Filebeat
# 进入 Filebeat 目录
cd /usr/share/filebeat# 启动(前台输出日志,便于调试)
./filebeat -e -c /root/filebeat-config/filebeat-to-logstash.yaml
✅ 第五步:验证数据是否写入 Elasticsearch
1. 查看 ES 索引是否存在
curl -X GET "http://192.168.130.61:9200/_cat/indices?v" | grep nginx
你应该看到类似:
green open nginx-2025.08.08 oR5wViMDSKqApXMwOJBl7Q 2 1
2. 查看最新文档
curl -X GET "http://192.168.130.61:9200/nginx-2025.08.08/_search?pretty" |jq
预上传 ES 索引模板(推荐)
PUT /_index_template/nginx-template
{"index_patterns": ["nginx-*"],"template": {"settings": {"number_of_shards": 2,"number_of_replicas": 1,"analysis": {"analyzer": {"my_analyzer": {"type": "standard"}}}},"mappings": {"properties": {"clientip": { "type": "ip" },"method": { "type": "keyword" },"status": { "type": "short" },"url": { "type": "keyword" },"timestamp": { "type": "date" }}},"aliases": {"nginx-logs": {}}}
}
Nginx 日志↓ (Filebeat 采集)
Logstash(过滤处理)↓ (Elasticsearch 输出)
Elasticsearch 集群↓ (后续可接 Kibana 可视化)
Kibana 展示分析