当前位置: 首页 > news >正文

logstash收集数据

防止ES的的I/O的压力过大,使用redis/kafka进行缓冲。

对redis的要求

Redis input plugin | Logstash Reference [8.17] | Elastic

一般企业要求的架构

我实现的架构

filebeat把数据传给logstash

配置好filebeat把收集到的数据输入到redis

然后执行命令,filebeat就开始往redis中写数据

cd /etc/filebeat

/usr/share/filebeat/bin/filebeat -e -c filebeat.yml   

logstash配置,从redis中读数据输出到elasticsearch

当我使用filebeat收集数据,传到redis,然后logstash从redis读取数据的时候有一个大问题就是

filebeat收集的是json格式的nginx访问日志,它被filebeat收集到redis的时候,会对这个收集的日志和与日志相关的元数据进行封装然后输出到redis。等logstash从redis里面取出来的时候,得到的数据是 收集的日志(message)  和 元数据字段的集合。

(就变成了一个字段message,其他都是没用的元数据字段。)

我收集的nginx日志格式

要解决的问题

1.json格式的日志数据message未正确解析

这是在filebeat直接向logstash传送数据的时候发送的错误,因为传送过去的数据没有设置正确的字符集,导致收集到的日志乱码

kibana显示

改正后基本上就没有乱码格式了,一个event是一条日志

2.删除无用的元数据字段

Filebeat直接连接logtsash的时候,filebeat通过output.logstash发送数据,默认不会添加完整元数据,输出到redis的时候,filebeat使用output.redis将日志包装成完整事件结构,包含所有的元数据。

Redis 作为缓冲队列:数据存储到 Redis 时,Filebeat 会保留完整的 Beat 事件结构(包括 @metadatahostagent 等)。

所以当logstash从redis取日志数据的时候,会收集无用的元数据(只有message是收集的访问日志),我们要把无用的元数据过滤掉

然后会得到一整个大的message字段

3.过滤出日志里面的单个信息

所以删除无用的元数据之后如果我们不仅想得到访问日志,还想得到访问日志里面得具体数据,使用这个具体数据进行画图等等(比如我们想得到remote_addr字段,就需要对message进行解析)

input {
  redis {
    host       => "10.8.0.23"
    port       => 6379
    password   => "Pu@1uC2016"
    db         => 0
    data_type  => "list"
    key        => "nginx-accesslog"
    codec      => json  # 自动解析外层 JSON
  }
}
#nginx的原始的json格式的数据是这样的
#{"time_local":"2025-03-28T05:55:57+08:00","remote_addr":"45.90.163.37","remote_user":"","request":"PUT /v1/agent/service/register
#HTTP/1.1","status":"400","body_bytes_sent":"173","request_time":"0.263","http_referer":"","http_user_agent":"","http_x_forwarded_for":"",
#"upstream_addr":"","upstream_response_time":""}


#但是message里面的数据是这样的,我们要把message中的数据转化为json格式,然后json过滤才能把字段(比如ip)过滤出来,然后可以对数据做图形分析
# "message" => "{\"time_local\":\"2025-03-28T18:12:23+08:00\",\"remote_addr\":\"10.8.0.23\",\"request\":\"HEAD / HTTP/1.1\",\"status\":\"200\",
#\"body_bytes_sent\":\"0\",\"request_time\":\"0.000\",\"http_referer\":\"\",\"http_user_agent\":\"curl/7.61.1\",}"


#要把下面转化为上面的这样的json格式(上面是标准的json格式),filter里面的json工具才能把message这个大字段里面的小字段取出来
filter {
  # ========== 第一步:处理 message 字段中的内层 JSON ==========
  mutate {                                          
    gsub => [
        "message", "\\\"", "\""     # 将 \" 替换为普通引号
       ] 
     }
    mutate {
    # 删除末尾多余逗号(如 ",}" → "}")
    gsub => [
      "message", ",}", "}",
      "message", ",]", "]"
    ]
     }



  # 解析 message 字段中的业务日志   这个json过滤,它只作用于json格式的数据
  json {
    source => "message"
    target => "nginx_log"  # 解析结果存入 nginx_log 子对象
    remove_field => ["message"]
    tag_on_failure => ["_json_parse_failure"]  # 标记解析失败的日志
  }
  # ========== 第二步:提升业务字段到根层级 ==========
  mutate {
    rename => {
      "[nginx_log][time_local]"          => "time_local"
      "[nginx_log][remote_addr]"         => "remote_addr"
      "[nginx_log][request]"             => "request"
      "[nginx_log][status]"              => "status"
      "[nginx_log][body_bytes_sent]"     => "body_bytes_sent"
      "[nginx_log][http_referer]"        => "http_referer"
      "[nginx_log][http_user_agent]"     => "http_user_agent"
    }
  }


#把一些无用的元数据过滤掉不要
  # ========== 第三步:清理所有元数据字段 ==========
  mutate {
    remove_field => [
      "host", "agent", "ecs", "log", "input", "fields",
      "@version", "event", "nginx_log"
    ]
  }
  # ========== 第四步:修正时间戳 ==========
  date {
    match  => [ "time_local", "ISO8601" ]
    target => "@timestamp"  # 覆盖默认时间戳
  }
}
output {
  elasticsearch {
    hosts  => ["https://10.8.0.23:9200"]
    index  => "dami-logs-%{+YYYY.MM.dd}"
    user   => "elastic"
    password => "sxm@325468"
    ssl => true
    cacert => "/logstash/http_ca.crt"    
  }
  # 调试时开启 stdout,查看完整字段
  stdout {
    codec => rubydebug {
      metadata => true  # 显示元数据(确认字段结构)
    }
  }
}

最后可以得到message里面的单个小字段信息,以前只能在message中进行查看,不能对数据进行分析和画图,现在可以了

filter过滤器的其他用法

Filter plugins | Logstash Reference [8.17] | Elastic

1.grok  

也可以康康这个人的文章

logstash过滤器插件filter详解及实例 - 峰哥ge - 博客园

官方文档

Grok filter plugin | Logstash Reference [8.17] | Elastic

grok %{语法:语义}  

#语法在配置文件里已经定义好了 

#语义是自己定义的,表示要将获得的字段放在哪个key里,例如下面ip就是key值,取出的字段值是value

match => {"message" => "%{IPV4:ip}"}    #message是收集到的每一条数据

/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-patterns-core-4.3.4/patterns/ecs-v1/grok-patterns   #语法在配置文件里已经定义好了 

match => {"message" => "%{IPV4:ip}"}    #message是收集到的每一条数据

match => {"message" => "%{@HTTPDATE:time}"}    #

match => {"message" => "%{LOGLEVEL:level}"}       #

2.groip 通过ip定位物理位置

Geoip 过滤器插件 |Logstash 参考 [8.17] |弹性的

logstash上要安装这个数据库(要先注册才能看)

Download GeoIP Databases | MaxMind

我的geoip配置写错了但是我懒得改了,我的阿里云服务器要释放了。

logstash收集到的数据还可以存放到数据库中,然后可以自己公司开发一个前端工具连接到数据库进行数据分析

相关文章:

  • Spring Boot 2.x 到 3.x 迁移实战:Redis 配置篇
  • 测试用例与需求脱节的修复方案
  • 基于MFC按钮逻辑
  • Linux内核审计规则及其数据结构
  • 基于OpenCV+MediaPipe手部追踪(1/2)
  • shell脚本实例练习(持续更新)
  • 为AI聊天工具添加一个知识系统 之154:理论框架、工程方案及两者的结合架构
  • 深入理解 JavaScript 的 Set 对象
  • 全面解析PRN文件:从原理到可视化的完整指南 【标签打印】
  • MySql修改全部表和字段编码
  • 计算机体系结构的性能瓶颈100例
  • opencv在iOS 中的使用
  • Linux | i.MX6ULL 终结者学习指南(1)
  • MyBatis分页插件混用问题解析:IPage与PageHelper的冲突与解决
  • Linux系统部署OA项目
  • 初识MySQl · 内置函数
  • OpenCV图像处理:边缘检测
  • YOLOv8 中的损失函数解析
  • 《 C语言实现:金字塔式星号图案打印》
  • Transformer革命:人工智能如何突破语言理解的边界
  • 上海皮影戏《九色鹿》闪耀塞尔维亚,再获2项国际大奖
  • 印尼总统20年来首次访泰:建立战略伙伴关系,加强打击网络诈骗等合作
  • 俄乌直接谈判勉强收场,特朗普再次“电话外交”能否有用?|907编辑部
  • 1块钱解锁2万部微短剧还能日更,侵权盗版难题怎么破?
  • 玉林一河段出现十年最大洪水,一村民被冲走遇难
  • 柬埔寨果农:期待柬埔寨榴莲走进中国市场