当前位置: 首页 > news >正文

用好 Elasticsearch Ruby 传输层elastic-transport

一、它是什么?为什么重要?

  • elastic-transport:Elasticsearch 官方的 低层 Ruby 传输库

    • 负责:连接一个/多个节点、轮询与选择、失败节点维护、请求/响应日志与追踪、序列化、(可选)嗅探与节点重载、重试等。
    • 不负责:具体 API 的封装与调用(那是 elasticsearch gem 的事)。
  • 默认基于 Faraday 实现 HTTP 传输,支持 Faraday 1.x 与 2.x

  • 为了吞吐与延迟,建议使用支持持久连接(keep-alive)的 HTTP 库patrontyphoeus 等。

二、安装与最小示例

安装

gem install elastic-transport
# 或使用未发布版本(Bundler):
# gem 'elastic-transport', git: 'git@github.com:elastic/elastic-transport-ruby.git'

Hello, Transport

require 'elastic/transport'client = Elastic::Transport::Client.new # 默认 http://localhost:9200
resp = client.perform_request('GET', '_cluster/health')
puts resp.status
puts resp.body

返回的是 Elastic::Transport::Transport::Response,可取 body/status/headers,也可当 Hash 使用。

三、Faraday 适配器与 keep-alive:性能关键点

支持的持久连接库

  • Patron
  • Typhoeus(需 v1.4.0+,老版本与 Faraday 1 不兼容)
  • HTTPClient / Net::HTTPPersistent / Excon / Async::HTTP …

Faraday 1.x 用法

require 'patron'  # 直接 require 库,Faraday 会自动识别并使用其 adapterclient = Elastic::Transport::Client.new
p client.transport.connections.first.connection.builder.adapter
# => Faraday::Adapter::Patron

Faraday 2.x 用法(注意差异)

Faraday 2 需要单独安装并 require 对应 adapter

# Gemfile
gem 'faraday-patron'# 代码
require 'faraday'
require 'faraday/patron'client = Elastic::Transport::Client.new

指定 adapter(显式)

client = Elastic::Client.new(adapter: :net_http_persistent)# Faraday 2 若报 “:net_http_persistent is not registered”:
require 'faraday/net_http_persistent'
client = Elasticsearch::Client.new(adapter: :net_http_persistent)

经验法则:生产环境优先选择 Patron / Typhoeus,能显著降低连接建立开销,提升并发吞吐。

四、自定义 Faraday:transport_options 与配置块

方式 A:transport_options

client = Elastic::Client.new(transport_options: {request: { open_timeout: 1 },headers: { user_agent: 'MyApp' },params:  { format: 'yaml' },ssl:     { verify: false } # 仅在开发/测试使用}
)

方式 B:传入配置块(可使用任何 Faraday 中间件)

require 'patron'
client = Elastic::Client.new(host: 'localhost', port: '9200') do |f|f.response :loggerf.adapter  :patron
end

五、更“底”的定制:自建/注入 Transport

预先构建 Transport 再交给 Client

require 'patron'transport_configuration = ->(f) dof.response :loggerf.adapter  :patron
endtransport = Elastic::Transport::Transport::HTTP::Faraday.new(hosts: [{ host: 'localhost', port: '9200' }],&transport_configuration
)client = Elastic::Client.new(transport: transport)

运行时注入(切换到受保护的集群等)

faraday_configuration = ->(f) dof.instance_variable_set :@ssl, { verify: false } # 示例,生产别这么做f.adapter :excon
endfaraday_client = Elastic::Transport::Transport::HTTP::Faraday.new(hosts: [{host: 'my-protected-host', port: '443',user: 'USERNAME', password: 'PASSWORD', scheme: 'https'}],&faraday_configuration
)client = Elastic::Client.new         # 先创建默认
client.transport = faraday_client    # 再注入

使用捆绑的 Curb 传输实现

require 'curb'
require 'elastic/transport/transport/http/curb'client = Elastic::Client.new(transport_class: Elastic::Transport::Transport::HTTP::Curb
)# 自定义 Curb
transport = Elastic::Transport::Transport::HTTP::Curb.new(hosts: [{ host: 'localhost', port: '9200' }],&->(c) { c.verbose = true }
)
client = Elastic::Client.new(transport: transport)

自己写一个 Transport

实现 {Elastic::Transport::Transport::Base} 约定的接口,将类传入 transport_class: 或直接注入 client.transport = …,即可完全接管请求管道。


六、连接池、节点发现与故障处理

  • 连接选择策略:内置 round-robin、random,也可自定义 Selector。
  • 失败与重试:自动维护“死连接”,可在错误时触发节点重载(基于集群状态)或按需重载。
  • 嗅探(Sniffer):允许发现集群节点并加入连接池(按需开启)。
  • 请求日志与追踪:可插任意兼容 Ruby logging 接口的 logger/tracer。
  • 序列化:可自定义 Serializer(例如替换 JSON 序列化策略)。

这些能力由 Elastic::Transport::Transport 统筹,结合 Connections::Collection/Connection/Selector 等组件协作完成。

七、常见错误与排障

  • Faraday 2 适配器未注册
    报错:Faraday::Error: :net_http_persistent is not registered on Faraday::Adapter
    处理:确保已安装并 require 'faraday/net_http_persistent' 或对应 adapter(如 faraday-patronrequire 'faraday/patron')。

  • Typhoeus 版本过低
    v1.4.0+ 才与 Faraday 1 兼容。

  • SSL 验证
    ssl: { verify: false } 仅限开发测试;生产请正确配置 CA 证书与 TLS。

八、实践清单:把吞吐和稳定性拉满

  1. 启用 keep-alive:优先 patron/typhoeus 适配器。
  2. 合理超时:设置 open_timeout/read_timeout,防止阻塞。
  3. 连接复用:尽量复用同一 client 实例(连接池才有价值)。
  4. 日志分级:生产环境避免全量 :logger,仅在排障时打开。
  5. 按需嗅探:拓扑稳定时可关闭以减少额外请求;拓扑变动频繁时开启更稳。
  6. 错误重试:利用内置重试与死连接维护,避免单点异常放大。

九、内部架构

  • Elastic::Transport::Client

    • 组合 Elastic::Transport::Transport

      • 内含:Connections(连接池)、loggertracerserializersniffer

      • Connections::Collection:若干 Connection + 一个 Selector

        • Connection:主机/端口/会话(持久连接)
        • Selector:Round-robin / Random / 自定义
      • Serializer:请求/响应的编解码

      • Sniffer:发现集群节点

      • Response:包装 ES JSON 响应(body/status/headers

十、观察 keep-alive 效果

require 'faraday'
require 'faraday/patron' # Faraday 2
require 'elastic/transport'client = Elastic::Transport::Client.new10.times doclient.nodes.stats(metric: 'http')['nodes'].values.each do |n|puts "#{n['name']} : #{n['http']['total_opened']}"end
end
# 持续输出的 total_opened 基本不增长,说明连接被复用(keep-alive 生效)
http://www.dtcms.com/a/340525.html

相关文章:

  • Redisson3.14.1及之后连接阿里云redis代理模式,使用分布式锁:ERR unknown command ‘WAIT‘
  • python中selenium怎么使用
  • KUKA机器人KUKA.ConveyorTech传送带跟踪程序举例解析
  • Python采集易贝(eBay)商品详情API接口,json数据返回
  • 今日科技风向|从AI芯片定制到阅兵高科技展示——聚焦技术前沿洞察
  • MySQL 数据库知识点与注意事项总结
  • spring整合JUnit
  • 阿里云ECS服务器的公网IP地址
  • WPF Alert弹框控件 - 完全使用指南
  • Non-stationary Diffusion For Probabilistic Time Series Forecasting论文阅读笔记
  • LoRa 网关与节点组网方案
  • 基于Java虚拟线程的高并发作业执行框架设计与性能优化实践指南
  • 【Bluedroid】A2DP Source 端会话启动流程与核心机制解析(btif_a2dp_source_start_session)
  • UIGestureRecognizer 各个子类以及其作用
  • iOS开发之UICollectionView为什么需要配合UICollectionViewFlowLayout使用
  • 氯化钇:科技与高性能材料的核心元素
  • C++高频知识点(三十)
  • 嵌入式音频开发(3)- AudioService核心功能
  • 机器学习数学基础与商业实践指南:从统计显著性到预测能力的认知升级
  • Node.js中的Prisma应用:现代数据库开发的最佳实践
  • 河南萌新联赛2025第六场 - 郑州大学
  • Java:将视频上传到腾讯云并通过腾讯云点播播放
  • 【Task02】:四步构建简单rag(第一章3节)
  • 第三阶段数据-4:SqlHelper类,数据库删除,DataTable创建
  • 【考研408数据结构-08】 图论基础:存储结构与遍历算法
  • Opencv模板匹配
  • 27.语言模型
  • Java + 工业物联网 / 智慧楼宇 面试问答模板
  • C#APP.Config配置文件解析
  • 案例分享:BRAV-7123助力家用型人形机器人,智能生活未来已来