The ELK Operations Journey (Logstash Plugins)
1. The date Plugin in Practice (Fixing the Time Offset)
Fixes the offset between the actual access time and the time the event was ingested.
- @timestamp: the time attached when the event is ingested, i.e. what you see when querying the data in ES
- timestamp: the actual access time (when the application was accessed)
When querying in the UI there is often an offset between the write time being shown and the access time. The date plugin can reconcile the two, and the same approach applies to other time fields.
root@ubuntu2204test99:~/elkf/logstash/pipeline# cat logstash.conf
# Use grok to split the standard nginx access log into fields
input {
  beats {
    port => 5044
  }
}
# Filtering
filter {
  # grok matches fields with regular expressions, referencing the built-in pattern %{COMBINEDAPACHELOG}
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
    remove_field => [ "ecs", "input", "agent" ]
    add_field => { "add_field_log" => "this is an added field" }
  }
  # Use the date plugin to bring the timestamp field in line with @timestamp
  # The timestamp value looks like "13/May/2022:15:47:24 +0800"; the match pattern must follow the actual format, not a generic one
  date {
    # The format string has to be written to match the value above: dd/MMM for "13/May"
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]  # Z matches the timezone offset
    timezone => "Asia/Shanghai"                       # set to the actual timezone
    target => "parsed_timestamp"                      # optional: store the parsed time in a new field (default target is @timestamp)
  }
}
output {
  stdout {}
  elasticsearch {
    hosts => ["192.168.1.99:9201","192.168.1.99:9202","192.168.1.99:9203"]
    user => "elastic"
    password => "123456"
    index => "logs-nginx-base-%{+yyyy.MM.dd}"
  }
}
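Before wiring the pattern into the Beats pipeline it can be worth validating the format string on its own. The following is a minimal sketch, assuming a local Logstash install; the stdin/stdout plumbing and the sample value 13/May/2022:15:47:24 +0800 are my own illustration, not part of the original pipeline. Paste the raw time on stdin and compare @timestamp with parsed_timestamp in the rubydebug output.

input {
  stdin {}
}
filter {
  # Treat the whole stdin line as the timestamp value (illustration only)
  mutate {
    copy => { "message" => "timestamp" }
  }
  date {
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]  # dd/MMM matches "13/May"
    target => "parsed_timestamp"
  }
}
output {
  # rubydebug prints every field, so the two timestamps can be compared directly
  stdout { codec => rubydebug }
}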
2. The geoip Plugin in Practice
# The geoip plugin analyzes where the client IP comes from; search for the "Geoip filter plugin" docs for usage. The GeoIP database download may fail from inside mainland China.
root@ubuntu2204test99:~/elkf/logstash/pipeline# cat logstash.conf
# Use grok to split the standard nginx access log into fields
input {
  beats {
    port => 5044
  }
}
# Filtering
filter {
  # grok matches fields with regular expressions, referencing the built-in pattern %{COMBINEDAPACHELOG}
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
    remove_field => [ "ecs", "input", "agent" ]
    add_field => { "add_field_log" => "this is an added field" }
  }
  geoip {
    # Field to analyze (after analysis, clientip yields extra fields such as latitude/longitude and city name)
    source => "clientip"
    # Keep only the listed fields (everything else is filtered out)
    fields => ["city_name", "country_name", "ip"]
    # Target field for the geoip output (optional)
    target => "ip_target"
  }
}
output {
  stdout {}
  elasticsearch {
    hosts => ["192.168.1.99:9201","192.168.1.99:9202","192.168.1.99:9203"]
    user => "elastic"
    password => "123456"
    index => "logs-nginx-base-%{+yyyy.MM.dd}"
  }
}
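If the database download is in question, the filter can first be tried in isolation. This is a minimal sketch, assuming the GeoLite2 database bundled with Logstash is usable; the stdin input is my own illustration, while the fields list and the ip_target name mirror the pipeline above. Feeding a public IP (for example 8.8.8.8) on stdin should print the selected geoip fields.

input {
  stdin {}
}
filter {
  geoip {
    source => "message"                             # the stdin line is treated as the client IP
    fields => ["city_name", "country_name", "ip"]   # keep only these fields
    target => "ip_target"                           # parsed data lands under this field
  }
}
output {
  stdout { codec => rubydebug }
}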
3. Analyzing the Client Device Type with useragent
# The useragent plugin analyzes the client device type. Note that the nginx log format was switched to JSON when these logs were collected.
root@ubuntu2204test99:~/elkf/logstash/pipeline# cat logstash.conf
# Use grok to split the standard nginx access log into fields
input {
  beats {
    port => 5044
  }
}
# Filtering
filter {
  # grok matches fields with regular expressions, referencing the built-in pattern %{COMBINEDAPACHELOG}
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  geoip {
    source => "clientip"
    fields => ["city_name", "country_name", "ip"]
    target => "ip_target"
  }
  useragent {
    source => "http_user_agent"
    target => "useragent_target"   # store the parsed device data in this field
  }
}
output {
  stdout {}
  elasticsearch {
    hosts => ["192.168.1.99:9201","192.168.1.99:9202","192.168.1.99:9203"]
    user => "elastic"
    password => "123456"
    index => "logs-nginx-base-%{+yyyy.MM.dd}"
  }
}
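To see which fields the plugin actually produces (browser name, OS, device and so on), it can be run standalone against a pasted User-Agent string. This is a minimal sketch; the stdin input and the pasted sample are my own illustration, while useragent_target matches the field name used above.

input {
  stdin {}
}
filter {
  useragent {
    source => "message"            # the stdin line is treated as the User-Agent header
    target => "useragent_target"   # parsed fields (name, os, device, ...) land here
  }
}
output {
  stdout { codec => rubydebug }
}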
4. The mutate Plugin
root@ubuntu2204test99:~/elkf/logstash/pipeline# cat logstash.conf
# Use grok to split the standard nginx access log into fields
input {
  beats {
    port => 5044
  }
}
# Filtering
filter {
  # grok matches fields with regular expressions, referencing the built-in pattern %{COMBINEDAPACHELOG}
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  mutate {
    # Split the message field on the "|" delimiter; the result is an array-like value that can be inspected in the output
    split => { "message" => "|" }
  }
  mutate {
    # Extract one element of the split result by index and give it a field name
    add_field => { "user_id" => "%{[message][0]}" }
  }
  mutate {
    # Trim the whitespace around user_id (strip only applies to strings, so do it before converting)
    strip => ["user_id"]
  }
  mutate {
    # Convert user_id to an integer (several other target types are available)
    convert => { "user_id" => "integer" }
  }
  mutate {
    # Copy the content of user_id into user_id2
    copy => { "user_id" => "user_id2" }
  }
  mutate {
    # Rename user_id2
    rename => { "user_id2" => "user_id_rename" }
  }
  mutate {
    # Replace the content of user_id2
    replace => { "user_id2" => "replacement content" }
  }
  # There are many more mutate options; see the official documentation
}
output {
  stdout {}
  elasticsearch {
    hosts => ["192.168.1.99:9201","192.168.1.99:9202","192.168.1.99:9203"]
    user => "elastic"
    password => "123456"
    index => "logs-nginx-base-%{+yyyy.MM.dd}"
  }
}
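The split/extract/convert chain is also easy to exercise standalone. This is a minimal sketch, assuming a line such as " 1001 |alice|GET /index.html" is pasted on stdin; the "|" delimiter and the user_id field name match the pipeline above, and the sample line is made up.

input {
  stdin {}
}
filter {
  mutate { split     => { "message" => "|" } }                # message becomes an array of segments
  mutate { add_field => { "user_id" => "%{[message][0]}" } }  # pick the first segment by index
  mutate { strip     => ["user_id"] }                         # trim surrounding spaces while it is still a string
  mutate { convert   => { "user_id" => "integer" } }          # then cast it to an integer
}
output {
  stdout { codec => rubydebug }
}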