用 Node.js 玩转 Elasticsearch从安装到增删改查
一、要求与准备
- Node.js ≥ 14.x(建议 LTS)
- npm(随 Node 安装)
- 一个可访问的 Elasticsearch(Elastic Cloud 或自建)
项目初始化:
mkdir es-node-starter && cd $_
npm init -y
npm i @elastic/elasticsearch
二、连接 Elasticsearch
推荐:Elastic Cloud + API Key(最省心、最安全)
把敏感信息放到环境变量里更稳。
# .env(示例)
ELASTIC_NODE=https://<your-endpoint> # “My deployment” 页面里复制
ELASTIC_API_KEY_ID=<id>
ELASTIC_API_KEY_SECRET=<api_key>
// es.js
const { Client } = require('@elastic/elasticsearch')const client = new Client({node: process.env.ELASTIC_NODE,auth: {apiKey: {id: process.env.ELASTIC_API_KEY_ID,api_key: process.env.ELASTIC_API_KEY_SECRET}},requestTimeout: 30_000 // SDK 级超时,按需调整
})module.exports = { client }
自建集群可用
nodes: ['https://es1:9200', ...] + basic auth
,务必启用 HTTPS 与最小权限账号。
三、基础操作:CRUD + 搜索
下面用一个脚本,顺序演示建索引 → 写入 → 读取 → 搜索 → 更新 → 删除文档 → 删除索引。
// demo.js
const { client } = require('./es')const INDEX = 'my_index'
const ID = 'my_document_id'async function main() {// 1) 创建索引await client.indices.create({ index: INDEX }).catch(e => {if (e.meta?.body?.error?.type !== 'resource_already_exists_exception') throw e})// 2) 写入一条文档await client.index({index: INDEX,id: ID,document: { foo: 'foo', bar: 'bar' }// refresh: 'wait_for' // 演示可开;生产写多查少请勿每次刷新})// 3) 获取文档const got = await client.get({ index: INDEX, id: ID })console.log('GET =>', got._source)// 4) 搜索(match 查询)const found = await client.search({index: INDEX,query: { match: { foo: 'foo' } },size: 10})console.log('SEARCH hits =>', found.hits.hits.map(h => h._source))// 5) 更新文档(部份字段)await client.update({index: INDEX,id: ID,doc: { foo: 'bar', new_field: 'new value' }})// 6) 删除文档await client.delete({ index: INDEX, id: ID })// 7) 删除索引await client.indices.delete({ index: INDEX })
}main().catch(err => {console.error(err)process.exit(1)
})
运行:
node demo.js
四、进阶小贴士(避免线上踩坑)
- 刷新策略:写入后立刻可搜到,需要
refresh
,但它代价不小。批量写入(日志/ETL)建议关闭即时刷新,改为按索引刷新间隔或在批次末尾手动refresh
。 - 分页:小页用
from/size
,深分页换search_after
,别把from
拉得过大。 - 命中总数:如果不需要精确总数,别设
track_total_hits: true
;可用阈值(如1000
)折中性能。 - 超时与取消:SDK 有
requestTimeout
;查询体里还能设"timeout": "2s"
(分片级收集时间)。前者防整体阻塞,后者防尾延迟。 - 可观测:用
client.diagnostic
订阅请求/响应/错误事件,输出 took、状态码与错误信息,记得脱敏 header。 - 批量写:大规模写入用
helpers.bulk
(内置分批、重试),吞吐友好且代码少。 - 类型:用 TypeScript 会舒服很多:自动补全、错误提前暴露。
五、常见扩展片段
Bulk 写入(高效吞吐):
const { helpers } = require('@elastic/elasticsearch')async function bulkIndex(client, docs) {await helpers.bulk(client, {datasource: docs, // 数组或 async iterableonDocument: (doc) => ({ index: { _index: 'my_index', _id: doc.id } }),flushBytes: 5 * 1024 * 1024, // 每批 ~5MBretries: 3})
}
ES|QL(可读性更好的查询语法):
await client.esql.query({query: `FROM my_index | WHERE foo == "bar" | LIMIT 10`
})
Async Search(长查询不阻塞):
const sub = await client.asyncSearch.submit({index: 'my_index',query: { match_all: {} },wait_for_completion_timeout: '2s',keep_on_completion: true
})
const res = await client.asyncSearch.get({ id: sub.id, wait_for_completion_timeout: '1s' })
六、目录结构建议(小项目)
.
├─ .env
├─ es.js # 客户端实例
├─ demo.js # 教程脚本(CRUD/搜索演示)
└─ package.json
七、总结
- 安装客户端、用 API Key 连上集群,是跑通一切的前提。
- CRUD 与搜索 API 与 REST 一一对应,上手非常快。
- 生产里关注 刷新/分页/超时/批量/可观测 这些“隐藏成本”,性能和稳定性会好很多。
如果你告诉我运行环境(Cloud / 自建)和实际用例(搜索、日志、ETL、RAG),我可以把上面的示例打包成一个可直接运行的最小项目,顺便配好 ESLint/TS/脚本和简单的性能基准。