当前位置: 首页 > news >正文

Logstash 实战指南:从入门到生产级日志处理

Logstash 实战指南:从入门到生产级日志处理


一、Logstash 简介

Logstash 是一个功能强大的 服务器端数据处理管道,具备以下三大核心能力:

  • Input(输入):从多种来源采集数据(文件、Redis、Beats、Syslog 等)
  • Filter(过滤):对数据进行解析、清洗、丰富和转换
  • Output(输出):将处理后的数据发送至目标系统(Elasticsearch、Kafka、Redis、控制台等)

典型应用场景:

  • 日志集中采集与结构化
  • 多源数据聚合与标准化
  • 数据预处理与 ETL 流程

二、环境准备与安装

1. 下载 Logstash

wget https://artifacts.elastic.co/downloads/logstash/logstash-8.2.2-linux-x86_64.tar.gz

2. 解压安装包

tar -xf logstash-8.2.2-linux-x86_64.tar.gz -C /opt/

3. 配置环境变量

创建环境变量脚本:

# /etc/profile.d/logstash.sh
export LOGSTASH_HOME=/opt/logstash-8.2.2
export PATH=$PATH:$LOGSTASH_HOME/bin

加载环境变量:

source /etc/profile.d/logstash.sh

4.查看帮助信息

logstash -h
常用启动参数说明
参数说明
-n, --node.name NAME指定当前 Logstash 实例的名称,用于集群标识。
-f, --path.config CONFIG_PATH指定配置文件或配置目录路径。
-e, --config.string CONFIG_STRING通过命令行直接指定配置,常用于快速测试。
--pipeline.id ID指定 Pipeline 的唯一标识,默认为 main
-w, --pipeline.workers COUNT设置工作线程数,建议设为 CPU 核心数。
-b, --pipeline.batch.size SIZE批量处理事件数,默认 125,影响吞吐与延迟。
-u, --pipeline.batch.delay DELAY_IN_MS批处理最大等待时间,默认 50ms。
--path.data PATH指定数据存储路径,RPM 默认为 /usr/share/logstash/data
-l, --path.logs PATH指定日志输出路径。
--log.level LEVEL设置日志级别:fatal, error, warn, info, debug, trace
-V, --version查看 Logstash 版本。
-r, --config.reload.automatic启用配置文件热加载(自动重载)。
--config.reload.interval SECONDS配置文件检查间隔(需配合 -r 使用)。
--log.format FORMAT日志输出格式:plain(默认)或 json
--path.settings SETTINGS_DIR指定配置目录,默认为 /usr/share/logstash/config
-h, --help显示帮助信息。

三、快速入门:从标准输入到控制台输出

1. 命令行方式启动(仅用于测试)

logstash -e 'input { stdin { } } output { stdout { codec => rubydebug } }'

输入任意文本(如 Hello Logstash),将看到如下结构化输出:

{"@version" => "1","@timestamp" => 2025-08-07T07:27:03.183912Z,"event" => {"original" => "Hello Logstash"},"message" => "Hello Logstash","host" => {"hostname" => "es-node-03"}
}

⚠️ 提示:命令行方式不适合生产环境,建议使用配置文件管理复杂逻辑。


2. 配置文件方式启动(推荐)

  • 创建配置工作目录
mkdir ~/logstash-config
  • 创建配置文件:
cat > ~/logstash-config/01-input-stdout.conf << 'EOF'
input {stdin {type => "user-input"}
}output {stdout {codec => rubydebug}
}
EOF

启动服务:

logstash -rf ~/logstash-config/01-input-stdout.conf

✅ 使用 -f 指定配置文件,-r 支持热重载(开发调试时使用 -rf)。

输入任意文本(如 Hello Logstash


四、实战案例 1:采集本地日志文件

使用 file 输入插件监控日志文件变化。

配置文件:file-to-stdout.conf

input {file {id => "app-logs"path => ["/tmp/app-*.log"]start_position => "beginning"         # 从文件开头读取(适合测试)sincedb_path => "/dev/null"           # 不记录读取位置,每次重新读(仅测试用)stat_interval => "1 second"           # 每秒检查一次文件变化discover_interval => 5                # 每5秒扫描一次目录新文件}
}output {stdout {codec => rubydebug}
}

日志示例:

# 创建三个测试日志文件
echo '2025-08-07T10:00:01Z app1 INFO User login successful for user=admin from=192.168.1.100' > /tmp/app-web1.log
echo '2025-08-07T10:00:05Z app2 ERROR Database connection failed: timeout, host=db01, retry=3' > /tmp/app-db-error.log
echo '2025-08-07T10:00:10Z app3 WARN Disk usage at 85% on /var/log, cleanup needed' >> /tmp/app-monitor.log

启动命令:

logstash -f /root/logstash-config/file-to-stdout.conf

💡 生产建议:sincedb_path 应指向持久化路径,避免重复读取。


五、实战案例 2:从 Redis 读取数据

Redis 常作为临时消息队列,Logstash 可从中消费数据。

1. 准备 Redis 数据(示例)

假设 Redis 运行在 192.168.10.10:6379,数据库 5 中有一个 List 类型的 Key:

redis-cli -h 192.168.130.62 -p 6379 -n 5
> LPUSH app_logs "user login success" "page not found" "api timeout"

2. 配置文件:redis-to-stdout.conf

input {redis {host => "192.168.130.62"port => 6379db => 5data_type => "list"key => "app_logs"codec => "plain"                     # 避免 JSON 解析失败}
}output {stdout {codec => rubydebug}
}

❗ 注意:若数据非 JSON 格式,避免使用默认 json codec,否则会添加 _jsonparsefailure 标签。

启动命令:

logstash -rf /root/logstash-config/redis-to-stdout.conf

✅ 提示:Redis 的 List 模式为“点对点”消费,数据被读取后即删除。如需广播或多消费者,建议使用 Kafka


六、实战案例 3:对接 Filebeat(生产常用架构)

在生产环境中,通常采用 Filebeat 负责采集,Logstash 负责处理 的架构。

1. Logstash 配置:接收 Beats 数据

文件:beats-to-stdout.conf

input {beats {port => 8848}
}# 定义filter插件,使用mutate组件来删除指定的字段
filter {mutate {remove_field => [ "host","input","tags", "@version", "ecs","agent","log","domain"]}
}output {stdout {codec => rubydebug}
}

启动:

logstash -rf /root/logstash-config/beats-to-stdout.conf

2. Filebeat 配置:发送日志到 Logstash

文件:filebeat-to-logstash.yaml

filebeat.inputs:- type: filestreamenabled: truepaths:- /var/log/nginx/access.log# 使用 NDJSON 解析器(每行一个 JSON 对象)parsers:- ndjson:overwrite_keys: truetarget: ""add_error_key: trueoutput.logstash:hosts: ["192.168.130.65:8848"]    # Logstash 服务器地址

启动 Filebeat:

./filebeat -e -c  config/filebeat-to-logstash.yaml

✅ 观察 Logstash 输出,确认数据成功接收。

3.采集数据到ES集群

Filebeat → Logstash → Elasticsearch + stdout(调试)


✅ 第一步:更新 Logstash 配置(支持输出到 ES)
文件:/root/logstash-config/beats-to-es.conf
input {beats {port => 8848}
}# 数据清洗:删除不需要的字段,定义filter插件,使用mutate组件来删除指定的字段
filter {mutate {remove_field => [ "host", "input", "tags", "@version", "ecs", "agent", "log", "domain" ]}# 可选:如果日志是 JSON 格式,可以启用 json 解析# json {#   source => "message"#   target => "json_parsed"#   remove_field => ["message"]# }
}# 同时输出到控制台(调试)和 Elasticsearch
output {# 调试用:输出到终端stdout {codec => rubydebug}# 生产用:输出到 ES 集群elasticsearch {hosts => ["http://192.168.130.61:9200", "http://192.168.130.62:9200", "http://192.168.130.65:9200"]index => "nginx-%{+yyyy.MM.dd}"}
}

💡 说明:

  • index => "nginx-%{+yyyy.MM.dd}":按天创建索引,如 nginx-2025.08.08

✅ 第二步:启动 Logstash
logstash -rf /root/logstash-config/beats-to-es.conf --path.data /root/logstash-data-beats

⚠️ 注意:

  • 使用独立的 --path.data 目录,避免冲突

✅ 第三步:配置 Filebeat(发送到 Logstash)
文件:/root/filebeat-config/filebeat-to-logstash.yaml
filebeat.inputs:- type: filestreamenabled: truepaths:- /var/log/nginx/access.log# 如果日志是 JSON 格式,启用 ndjson 解析parsers:- ndjson:overwrite_keys: truetarget: ""add_error_key: true# 输出到 Logstash
output.logstash:hosts: ["192.168.130.65:8848"]  # 替换为你的 Logstash 服务器 IPssl.enabled: false             # 如果没配 HTTPS 可关闭

✅ 第四步:启动 Filebeat
# 进入 Filebeat 目录
cd /usr/share/filebeat# 启动(前台输出日志,便于调试)
./filebeat -e -c /root/filebeat-config/filebeat-to-logstash.yaml

✅ 第五步:验证数据是否写入 Elasticsearch
1. 查看 ES 索引是否存在
curl -X GET "http://192.168.130.61:9200/_cat/indices?v" | grep nginx

你应该看到类似:

green  open   nginx-2025.08.08             oR5wViMDSKqApXMwOJBl7Q   2   1 
2. 查看最新文档
curl -X GET "http://192.168.130.61:9200/nginx-2025.08.08/_search?pretty" |jq

预上传 ES 索引模板(推荐)

PUT /_index_template/nginx-template
{"index_patterns": ["nginx-*"],"template": {"settings": {"number_of_shards": 2,"number_of_replicas": 1,"analysis": {"analyzer": {"my_analyzer": {"type": "standard"}}}},"mappings": {"properties": {"clientip": { "type": "ip" },"method": { "type": "keyword" },"status": { "type": "short" },"url": { "type": "keyword" },"timestamp": { "type": "date" }}},"aliases": {"nginx-logs": {}}}
}
Nginx 日志↓ (Filebeat 采集)
Logstash(过滤处理)↓ (Elasticsearch 输出)
Elasticsearch 集群↓ (后续可接 Kibana 可视化)
Kibana 展示分析
http://www.dtcms.com/a/332668.html

相关文章:

  • GitHub 热榜项目 - 日榜(2025-08-15)
  • 硬核实用!R+贝叶斯解决真实问题:参数估计(含可靠性分析) + 回归建模(含贝叶斯因子比较) + 生产级计算实践 赠「常见报错解决方案」秘籍!
  • ubuntu 24.04 通过部署ollama提供大模型api接口
  • 线程P5 | 单例模式[线程安全版]~懒汉 + 饿汉
  • CANDB++中的CAN_DBC快速编辑方法,使用文本编辑器(如notepad++和VScode)
  • Redis 知识点与应用场景
  • 六十六、【Linux数据库】MySQL数据导入导出 、 管理表记录 、 匹配条件
  • 日本服务器哪些服务商是可以免费试用的?
  • 拒绝“效果图”返工:我用Substance 3D Stager构建产品可视化工作流
  • 计算机视觉(opencv)实战五——图像平滑处理(均值滤波、方框滤波、高斯滤波、中值滤波)附加:视频逐帧平滑处理
  • vue2生命周期详解
  • Claude Opus 4.1深度解析:抢先GPT5发布,AI编程之王主动出击?
  • 【线上问题】1分钟学会如何定位 Java 应用 CPU 飙升问题
  • Spring中存在两个相同的Bean是否会报错?
  • Amazon Bedrock如何轻松实现复杂的生成式AI模型?
  • 纯C++实现halcon的threshold
  • 【Java EE进阶 --- SpringBoot】初识Spring(创建SpringBoot项目)
  • zynq代办事项
  • Vue 侦听器(watch 与 watchEffect)全解析2
  • 【100页PPT】数字化转型集团信息化总体解决方案(附下载方式)
  • Swift 实战:用最长递增子序列算法解“俄罗斯套娃信封”问题(LeetCode 354)
  • 日本服务器租用选哪个机房国内访问比较快?
  • 【LINUX网络】HTTP协议基本结构、搭建自己的HTTP简单服务器
  • 企微用户部门同步HRS系统
  • 电脑上练打字用什么软件最好:10款打字软件评测
  • 滑窗|贪心
  • Sonatype Nexus Repository Manager docker版本安装
  • [优选算法专题二滑动窗口——无重复字符的最长子串]
  • Linux应用层开发--线程
  • react性能优化之useRef和useState