Logstash中http_poller插件的用法
Logstash 的 http_poller
输入插件是一个非常强大的插件,用于从 HTTP API 定期抓取数据,并将其转换为 Logstash 事件进行处理。
核心功能
http_poller
就像一个机器人,按照你设定的时间表,定期去访问一个或多个 HTTP(S) 端点,然后将返回的数据(如 JSON、XML、Plain Text)打包成 Logstash 事件,送入 Filter 阶段进行处理。
基础配置示例
一个最简单的配置如下,它每隔 60 秒请求一次 API:
input {http_poller {urls => {my_api => "http://example.com/api/data" # `my_api` 是给这个URL起的名字}interval => 60 # 单位:秒。每隔60秒请求一次codec => "json" # 假设API返回的是JSON,自动解析成JSON对象}
}output {stdout { codec => rubydebug } # 将获取到的数据打印到控制台
}
完整参数详解与高级用法
1. urls
(必需)
定义一个要轮询的 URL 集合。它是一个 Hash,键是自定义的名称,值是完整的 URL。
-
单个 URL:
urls => {service_health => "https://my-app:8080/health" }
-
多个 URL:
urls => {service_health => "https://my-app:8080/health",recent_orders => "http://order-service:8080/api/orders/recent",user_count => "http://auth-service:8080/api/users/count" }
Logstash 会为每个 URL 独立发起请求,并生成独立的事件。
2. interval
(必需)
轮询间隔时间。可以是秒数,也可以是带有时间单位的字符串。
interval => 60 # 60秒
interval => 300 # 5分钟
interval => "1h" # 1小时
interval => "5m" # 5分钟
3. request_timeout
等待 HTTP 请求返回的超时时间(秒)。
request_timeout => 30 # 30秒后超时
4. schedule
使用 Cron 语法进行更精细的调度,比 interval
更灵活。如果设置了 schedule
,则 interval
失效。
schedule => "*/5 * * * *" # 每5分钟一次
schedule => "0 * * * *" # 每小时的第0分钟(整点)一次
schedule => "0 2 * * *" # 每天凌晨2点一次
5. codec
用于解码响应体的编解码器。对于 RESTful API,最常用的是 json
。
codec => "json" # 解析 application/json
codec => "plain" # 不解析,作为纯文本放入 `message` 字段
6. target
(极其重要)
将 HTTP 返回的完整响应数据存储到事件中的指定字段下。这是最佳实践,可以保持事件结构的清晰。
target => "api_response" # 响应数据将被放到 `[api_response]` 字段下
- 不使用
target
:API 返回的 JSON 的根字段会直接平铺到事件的根目录,容易造成字段冲突。 - 使用
target
:所有返回数据都封装在api_response
字段内,结构清晰,易于处理。
7. tags
为从该输入插件生成的事件添加标签。常用于后续的 Filter 条件判断。
tags => ["_http_poller", "_metrics"] # 给事件打上标签
8. HTTP 认证与头信息
-
基本认证 (Basic Auth):
auth => {user => "username"password => "password" }
-
自定义 HTTP Headers:
headers => {"X-API-Key" => "your-secret-api-key""User-Agent" => "Logstash HTTP Poller" }
-
HTTP 方法:
method => "post" # 默认为 "get"
一个完整的、生产环境风格的示例
这个示例展示了如何从安全的有状态 API 获取数据,并清晰地处理它们。
input {http_poller {urls => {cloud_metrics => "https://api.cloud-monitor.com/v1/metrics/nginx"app_events => "https://api.myapp.com/events?since=last"}# 使用cron语法进行更复杂的调度schedule => { "every" => "5m" } # 每5分钟一次# HTTP请求配置request_timeout => 45method => "get"headers => {"Authorization" => "Bearer ${API_TOKEN}" # 使用环境变量存储密钥"Accept" => "application/json"}# 处理响应codec => "json" # 解析JSON响应target => "fetch_data" # 将整个API响应存到 `[fetch_data]` 字段下tags => ["_http_poller", "config_source"] # 打上标签,便于过滤# 元数据:将哪个URL的响应也记录下来metadata_target => "http_poller_metadata"}
}filter {# 1. 首先处理http_poller来的事件:根据标签筛选if "config_source" in [tags] {# 2. 根据不同的URL来源,进行不同的处理if [http_poller_metadata][name] == "cloud_metrics" {# 处理来自 cloud_metrics API 的数据mutate { add_tag => ["metrics", "processed"] }# ... 其他处理逻辑,比如重命名字段等 ...}if [http_poller_metadata][name] == "app_events" {# 处理来自 app_events API 的数据mutate { add_tag => ["events", "processed"] }# ... 其他处理逻辑 ...}}
}output {# 将处理后的数据发送到Elasticsearch,并根据来源决定索引名if "metrics" in [tags] {elasticsearch {hosts => ["es:9200"]index => "cloud-metrics-%{+YYYY.MM.dd}"}}if "events" in [tags] {elasticsearch {hosts => ["es:9200"]index => "app-events-%{+YYYY.MM.dd}"}}# 用于调试:将所有http_poller来的事件也在控制台打印一份if "_http_poller" in [tags] {stdout {codec => rubydebug}}
}
总结
http_poller
的核心用法流程如下:
它的常见用途包括:
- 动态配置:从配置中心拉取最新配置(正如之前的例子)。
- 监控与指标收集:定期抓取健康检查、性能指标(如Prometheus metrics端点)。
- 数据同步:从外部API(如天气API、汇率API)获取数据,丰富你的日志信息。
- 告警与自愈:定期检查状态,如果发现异常,可以在Filter中触发告警操作。
关键点:使用 target
字段保持事件整洁,使用 tags
来标记事件来源,以便后续过滤。