当前位置: 首页 > news >正文

Logstash_Input插件


输入(Input)插件详解

数据管道的首要任务是获取数据。Logstash的输入插件(Input Plugins)就是这个管道的“水龙头”,负责从各种数据源中接收或拉取数据,并将其转换为Logstash事件(Event),投入流水线中。本章将详细剖析最常用和最核心的输入插件,帮助你根据不同的场景做出最合适的技术选型与配置。

3.1 文件类输入:filelog4j

从文件(尤其是日志文件)中读取数据是Logstash最经典的应用场景。

file 插件 (最常用)

这是从文件尾部读取内容(类似于 tail -f 命令)的标准插件。

核心配置项:

input {file {# 【必填】指定要读取的文件路径,支持通配符(Glob)模式path => ["/var/log/nginx/access.log", "/var/log/nginx/error.log"]# 【强烈推荐】为不同类型的数据打上标签,方便后续条件处理tags => ["nginx", "access"]# 【重要】指定编码,对于中文日志通常设为 "UTF-8"codec => "plain" { charset => "UTF-8" }# 【核心机制】记录上次读取位置的文件,保证重启后不重复读取数据sincedb_path => "/var/lib/logstash/sincedb_nginx" # 默认在 $HOME 下,生产环境必须显式指定# 从文件开头还是结尾开始读取。`beginning` 适用于初次导入历史数据start_position => "beginning" # 或 "end" (默认值)# 其他有用参数exclude => "*.gz"           # 排除压缩文件stat_interval => "1 second" # 检查文件状态的时间间隔discover_interval => 15     # 检查是否有新文件的间隔秒数}
}

架构师建议:

  • sincedb机制sincedb文件记录了每个被监听文件的inode号和上次读取的字节位置。这是Logstash实现断点续传、避免数据重复或丢失的关键。必须确保生产环境中此文件有可靠存储,尤其是在容器化部署时,通常需要挂载持久化卷来保存它。
  • 性能考量:监听大量文件(数千个)会消耗较多系统资源,因为每个文件都需要一个文件描述符和定时检查。

log4j 插件 (适用于Java生态)

允许Logstash作为一个Log4j SocketAppender的接收服务器,直接接收Java应用通过Log4j网络协议发送的日志事件。

配置示例:

input {log4j {host => "0.0.0.0"    # 监听的地址port => 4560         # 监听的端口mode => "server"     # 作为服务器运行}
}

架构师建议:

  • 优势:相比通过文件读取,这种方式延迟极低,几乎是实时的。并且日志格式已经是结构化的,无需复杂的Grok解析。
  • 劣势:引入了应用与日志收集器之间的网络依赖。如果Logstash服务不可用,可能会导致应用日志记录阻塞(取决于Log4j配置的缓冲和重试策略)。
  • 适用场景:对日志实时性要求极高的Java应用集群。通常会在Logstash前部署一个TCP负载均衡器或使用消息队列来解耦和提高可靠性。

3.2 网络协议类输入:tcp, udp, http

这些插件让Logstash化身为一个网络服务器,接收通过网络协议发送来的数据。

tcp 插件 (可靠传输)

提供基于TCP的可靠数据流传输。

input {tcp {port => 5000codec => json_lines # 非常常用!假设客户端发送的是以换行符分隔的JSON字符串type => "app_tcp_logs" # 自定义类型字段}
}

适用场景:任何可以通过TCP Socket发送数据的客户端(如自定义应用、网络设备、其他Logagent)。数据可靠性高。

udp 插件 (高性能,非可靠传输)

提供基于UDP的数据报传输。速度快,但可能丢失数据。

input {udp {port => 5160buffer_size => 65536 # 处理大报文时需要调整}
}

适用场景:对性能要求极高、且可以容忍少量数据丢失的场景,如监控指标(Metrics)上报、DNS日志等。

http 插件 (RESTful接口)

将Logstash变为一个HTTP端点,接收POST、PUT等请求。

input {http {host => "0.0.0.0"port => 8080# 可接受的内容类型additional_content_types => ["application/json", "text/plain"]# 自定义响应码,确认接收成功response_headers => { "Content-Type" => "text/plain" }codec => json}
}

适用场景:非常适合前端JS错误日志上报、Webhook(如GitHub、Jenkins)、以及任何无法安装Beats但能发起HTTP请求的场景。

3.3 消息队列集成:kafka, rabbitmq, redis

在大型分布式架构中,引入消息队列(Message Queue)作为数据缓冲层是至关重要的最佳实践。它解耦了数据生产者和消费者(Logstash),提供了削峰填谷的能力,增强了系统的弹性和可靠性。

kafka 插件 (大数据生态标准)

input {kafka {bootstrap_servers => "kafka-broker1:9092,kafka-broker2:9092" # Kafka集群地址topics => ["nginx-logs", "app-logs"]                         # 订阅的主题group_id => "logstash_consumers"                             # 消费者组ID,实现负载均衡auto_offset_reset => "latest"                                # 从最新偏移量开始消费consumer_threads => 4                                        # 并发消费线程数,提升吞吐codec => json                                                # 假设Kafka中存储的是JSON格式消息}
}

架构师建议:

  • 核心价值解耦与弹性。Beats或应用直接将数据写入高可用的Kafka集群,Logstash消费者组可以从Kafka中按自己的节奏消费数据。即使Logstash集群需要维护或重启,数据也会安全地保存在Kafka中,不会丢失。
  • 性能调优:通过增加 consumer_threads 和部署多个Logstash节点,可以水平扩展消费能力。

rabbitmq 插件 (AMQP协议代表)

input {rabbitmq {host => "rabbitmq-host"queue => "logstash_queue"durable => true      # 队列持久化auto_delete => falsecodec => "json"}
}

redis 插件 (高性能缓存/队列)

可以从Redis的List或Pub/Sub通道中读取数据。

input {redis {host => "redis-host"data_type => "list"    # 或 "channel" (pub/sub)key => "logstash_list" # list的名称或channel的主题batch_count => 125     # 一次批量获取的事件数}
}

3.4 云平台与周期型输入:s3, cloudwatch, jdbc

s3 插件 (处理云上归档日志)

从Amazon S3存储桶中读取文件,非常适合处理定期归档到S3的日志文件(如由AWS Lambda或S3传输网关归档的ALB/Native Gateway日志)。

input {s3 {bucket => "my-app-logs-archive"region => "us-east-1"prefix => "alb/" # 可选,只处理特定前缀的文件interval => "60" # 检查新文件的间隔秒数}
}

jdbc 插件 (数据库数据同步)

这是一个轮询式输入插件,通过定期执行SQL查询,将数据库中的数据同步到Logstash中。

input {jdbc {# 数据库连接参数jdbc_connection_string => "jdbc:mysql://db-host:3306/my_database"jdbc_user => "logstash_user"jdbc_password => "your_password"jdbc_driver_library => "/path/to/mysql-connector-java.jar"jdbc_driver_class => "com.mysql.cj.jdbc.Driver"# 查询配置statement => "SELECT * FROM orders WHERE updated_at > :sql_last_value"schedule => "*/5 * * * *" # 每5分钟执行一次 (cron语法)use_column_value => truetracking_column => "updated_at"tracking_column_type => "timestamp"}
}

适用场景:将业务数据库中的表(如用户信息、订单数据)同步到Elasticsearch中,以提供搜索和分析功能。注意:此插件通常不用于同步高频变更数据,更适合批量同步。实时同步应使用CDC(Change Data Capture)工具如Debezium。

3.5 Input插件通用配置项

几乎所有Input插件都支持一些通用参数:

  • type:为事件设置一个类型字段。可用于后续条件过滤(if [type] == "nginx")。
  • tags:为事件添加任意标签数组。常用于标记数据来源或属性。
  • codec:在输入阶段即可使用的编解码器。例如,如果数据源是JSON格式,可以在输入阶段直接解析:codec => json

总结

选择正确的Input插件是设计数据管道的第一步。作为架构师,你需要权衡:

  • 数据可靠性要求:是选择可靠的TCP/文件,还是高性能但可能丢失的UDP?
  • 系统耦合度:是让应用直接连接Logstash,还是通过Kafka/RabbitMQ解耦?
  • 数据来源特性:是实时流(文件tail、网络)、还是批量数据(S3、JDBC)?
http://www.dtcms.com/a/343909.html

相关文章:

  • Chrome和Edge如何开启暗黑模式
  • 浏览器插件优化工具:bypass paywalls chrome
  • 【TrOCR】根据任务特性设计词表vocab.json
  • 今日科技热点 | NVIDIA AI芯片、5G加速与大数据平台演进——技术驱动未来
  • ESP32C5在espidf环境下报错5g bitmap contains only invalid channels= @xff
  • 龙虎榜——20250822
  • 线上日志排查问题
  • docker 查看容器 docker 筛选容器
  • 使用 Ragas 评估你的 Elasticsearch LLM 应用
  • 基于Python的伊人酒店管理系统 Python+Django+Vue.js
  • 基于Docker的高可用WordPress集群部署:Nginx负载均衡+Mysql主从复制+ProxySQL读写分离
  • Unreal Engine UFloatingPawnMovement
  • SpringBoot集成ELK
  • 【Dubbo】高性能的 RPC
  • 零基础从头教学Linux(Day 18)
  • Slither 审计自己写的智能合约
  • 《R for Data Science (2e)》免费中文翻译 (第5章) --- Data tidying
  • 园区 “一表多属” 电仪表能碳数据归集与编码实施文档
  • 《LINUX系统编程》笔记p3
  • 赛灵思ZYNQ官方文档UG585自学翻译笔记与代码示例:XILINX UART控制器详解:特性与功能
  • 新手向:计算机视觉入门OpenCV实战项目
  • elasticsearch 7.x elasticsearch 使用scroll滚动查询一页,删除一页,影响后面滚动的查询吗
  • 【LeetCode热题100道笔记+动画】最大子数组和
  • 任务同步和锁
  • 基于django/python的服装销售系统平台/服装购物系统/基于django/python的服装商城
  • sqli-labs通关笔记-第61关 GET字符型报错注入(单引号双括号闭合 限制5次探测机会)
  • 基于Django的学校实验室预约管理系统/基于python的实验室管理系统的设计与实现#python#django#FLASK
  • JAVA基础-java虚拟机
  • uniapp googlepay支付 内购项目
  • 豆包AI PPT与秒出PPT对比评测:谁更适合你?