用好 Elasticsearch Ruby 传输层elastic-transport
一、它是什么?为什么重要?
-
elastic-transport
:Elasticsearch 官方的 低层 Ruby 传输库- 负责:连接一个/多个节点、轮询与选择、失败节点维护、请求/响应日志与追踪、序列化、(可选)嗅探与节点重载、重试等。
- 不负责:具体 API 的封装与调用(那是
elasticsearch
gem 的事)。
-
默认基于 Faraday 实现 HTTP 传输,支持 Faraday 1.x 与 2.x。
-
为了吞吐与延迟,建议使用支持持久连接(keep-alive)的 HTTP 库:
patron
、typhoeus
等。
二、安装与最小示例
安装
gem install elastic-transport
# 或使用未发布版本(Bundler):
# gem 'elastic-transport', git: 'git@github.com:elastic/elastic-transport-ruby.git'
Hello, Transport
require 'elastic/transport'client = Elastic::Transport::Client.new # 默认 http://localhost:9200
resp = client.perform_request('GET', '_cluster/health')
puts resp.status
puts resp.body
返回的是
Elastic::Transport::Transport::Response
,可取body/status/headers
,也可当 Hash 使用。
三、Faraday 适配器与 keep-alive:性能关键点
支持的持久连接库
- Patron
- Typhoeus(需 v1.4.0+,老版本与 Faraday 1 不兼容)
- HTTPClient / Net::HTTPPersistent / Excon / Async::HTTP …
Faraday 1.x 用法
require 'patron' # 直接 require 库,Faraday 会自动识别并使用其 adapterclient = Elastic::Transport::Client.new
p client.transport.connections.first.connection.builder.adapter
# => Faraday::Adapter::Patron
Faraday 2.x 用法(注意差异)
Faraday 2 需要单独安装并 require 对应 adapter:
# Gemfile
gem 'faraday-patron'# 代码
require 'faraday'
require 'faraday/patron'client = Elastic::Transport::Client.new
指定 adapter(显式)
client = Elastic::Client.new(adapter: :net_http_persistent)# Faraday 2 若报 “:net_http_persistent is not registered”:
require 'faraday/net_http_persistent'
client = Elasticsearch::Client.new(adapter: :net_http_persistent)
经验法则:生产环境优先选择 Patron / Typhoeus,能显著降低连接建立开销,提升并发吞吐。
四、自定义 Faraday:transport_options 与配置块
方式 A:transport_options
client = Elastic::Client.new(transport_options: {request: { open_timeout: 1 },headers: { user_agent: 'MyApp' },params: { format: 'yaml' },ssl: { verify: false } # 仅在开发/测试使用}
)
方式 B:传入配置块(可使用任何 Faraday 中间件)
require 'patron'
client = Elastic::Client.new(host: 'localhost', port: '9200') do |f|f.response :loggerf.adapter :patron
end
五、更“底”的定制:自建/注入 Transport
预先构建 Transport 再交给 Client
require 'patron'transport_configuration = ->(f) dof.response :loggerf.adapter :patron
endtransport = Elastic::Transport::Transport::HTTP::Faraday.new(hosts: [{ host: 'localhost', port: '9200' }],&transport_configuration
)client = Elastic::Client.new(transport: transport)
运行时注入(切换到受保护的集群等)
faraday_configuration = ->(f) dof.instance_variable_set :@ssl, { verify: false } # 示例,生产别这么做f.adapter :excon
endfaraday_client = Elastic::Transport::Transport::HTTP::Faraday.new(hosts: [{host: 'my-protected-host', port: '443',user: 'USERNAME', password: 'PASSWORD', scheme: 'https'}],&faraday_configuration
)client = Elastic::Client.new # 先创建默认
client.transport = faraday_client # 再注入
使用捆绑的 Curb 传输实现
require 'curb'
require 'elastic/transport/transport/http/curb'client = Elastic::Client.new(transport_class: Elastic::Transport::Transport::HTTP::Curb
)# 自定义 Curb
transport = Elastic::Transport::Transport::HTTP::Curb.new(hosts: [{ host: 'localhost', port: '9200' }],&->(c) { c.verbose = true }
)
client = Elastic::Client.new(transport: transport)
自己写一个 Transport
实现 {Elastic::Transport::Transport::Base}
约定的接口,将类传入 transport_class:
或直接注入 client.transport = …
,即可完全接管请求管道。
六、连接池、节点发现与故障处理
- 连接选择策略:内置 round-robin、random,也可自定义 Selector。
- 失败与重试:自动维护“死连接”,可在错误时触发节点重载(基于集群状态)或按需重载。
- 嗅探(Sniffer):允许发现集群节点并加入连接池(按需开启)。
- 请求日志与追踪:可插任意兼容 Ruby logging 接口的 logger/tracer。
- 序列化:可自定义 Serializer(例如替换 JSON 序列化策略)。
这些能力由
Elastic::Transport::Transport
统筹,结合Connections::Collection/Connection/Selector
等组件协作完成。
七、常见错误与排障
-
Faraday 2 适配器未注册
报错:Faraday::Error: :net_http_persistent is not registered on Faraday::Adapter
处理:确保已安装并require 'faraday/net_http_persistent'
或对应 adapter(如faraday-patron
→require 'faraday/patron'
)。 -
Typhoeus 版本过低
需 v1.4.0+ 才与 Faraday 1 兼容。 -
SSL 验证
ssl: { verify: false }
仅限开发测试;生产请正确配置 CA 证书与 TLS。
八、实践清单:把吞吐和稳定性拉满
- 启用 keep-alive:优先
patron
/typhoeus
适配器。 - 合理超时:设置
open_timeout/read_timeout
,防止阻塞。 - 连接复用:尽量复用同一
client
实例(连接池才有价值)。 - 日志分级:生产环境避免全量
:logger
,仅在排障时打开。 - 按需嗅探:拓扑稳定时可关闭以减少额外请求;拓扑变动频繁时开启更稳。
- 错误重试:利用内置重试与死连接维护,避免单点异常放大。
九、内部架构
-
Elastic::Transport::Client
-
组合
Elastic::Transport::Transport
-
内含:
Connections
(连接池)、logger
、tracer
、serializer
、sniffer
-
Connections::Collection:若干
Connection
+ 一个 Selector- Connection:主机/端口/会话(持久连接)
- Selector:Round-robin / Random / 自定义
-
Serializer:请求/响应的编解码
-
Sniffer:发现集群节点
-
Response:包装 ES JSON 响应(
body/status/headers
)
-
-
十、观察 keep-alive 效果
require 'faraday'
require 'faraday/patron' # Faraday 2
require 'elastic/transport'client = Elastic::Transport::Client.new10.times doclient.nodes.stats(metric: 'http')['nodes'].values.each do |n|puts "#{n['name']} : #{n['http']['total_opened']}"end
end
# 持续输出的 total_opened 基本不增长,说明连接被复用(keep-alive 生效)