当前位置: 首页 > news >正文

Elasticsearch批量写入50万数据

🚀 Elasticsearch 异步批量写入优化实践

—— 使用 BulkProcessor 提升 ES 写入性能

📖 一、背景介绍

在日志采集、行为分析等系统中,往往需要将大量数据高效写入 Elasticsearch。
传统同步写入方式(逐条 index())在高并发、大数据量场景下会严重拖慢性能。

为解决此问题,我们采用 BulkProcessor 异步批量写入机制,结合索引参数优化,实现 高效、稳定、可控 的数据入库流程。

⚙️ 二、核心思路

批量写入:将多条数据组装为一个 Bulk 请求,一次性提交。

异步执行:写入任务由线程池异步处理,不阻塞主线程。

自动 Flush:根据条数、数据大小或时间间隔自动触发提交。

索引调优:写入前临时关闭副本和刷新机制,写完再恢复。

🧩 三、核心工具类:AsyncEsWriter

public class AsyncEsWriter{private static final Logger log = LoggerFactory.getLogger(AsyncOriginalLogEsWriter.class);private final RestHighLevelClient client;private BulkProcessor bulkProcessor;private final String indexName;public AsyncOriginalLogEsWriter(RestHighLevelClient client, String indexName) {this.client = client;this.indexName = indexName;}/*** 初始化 BulkProcessor** @param batchSize          每批条数* @param concurrentRequests 并发请求数* @param bulkSizeMB         每批最大字节数*/public void initBulkProcessor(int batchSize, int concurrentRequests, int bulkSizeMB) throws Exception {// -------------------------------// 临时关闭刷新和副本UpdateSettingsRequest disableSettings = new UpdateSettingsRequest(indexName);disableSettings.settings(Settings.builder().put("index.refresh_interval", "-1").put("index.number_of_replicas", 0));client.indices().putSettings(disableSettings, RequestOptions.DEFAULT);log.info("索引 {} 设置已临时修改:refresh=-1, replicas=0", indexName);// -------------------------------// 构建 BulkProcessorbulkProcessor = BulkProcessor.builder((request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),new BulkProcessor.Listener() {@Overridepublic void beforeBulk(long executionId, BulkRequest request) {log.info("准备写入 {} 条数据", request.numberOfActions());}@Overridepublic void afterBulk(long executionId, BulkRequest request, BulkResponse response) {if (response.hasFailures()) {log.error("批量写入部分失败: {}", response.buildFailureMessage());} else {log.info("批量写入成功, 条数: {}, 耗时: {} ms",request.numberOfActions(), response.getTook().getMillis());}}@Overridepublic void afterBulk(long executionId, BulkRequest request, Throwable failure) {log.error("批量写入异常: ", failure);}}).setBulkActions(batchSize) // 每批条数.setBulkSize(new ByteSizeValue(bulkSizeMB, ByteSizeUnit.MB)) // 每批最大字节.setConcurrentRequests(concurrentRequests) // 并发请求数.setFlushInterval(TimeValue.timeValueSeconds(5)) // 定时 flush.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1), 3)) // 失败重试.build();}/*** 异步写入数据*/public void write(List<OriginalLog> originalLogList) {if (originalLogList == null || originalLogList.isEmpty()) return;for (OriginalLog ol : originalLogList) {IndexRequest indexRequest = new IndexRequest(indexName).id(ol.getId()).source(JsonUtils.toJson(ol), XContentType.JSON);bulkProcessor.add(indexRequest);}}/*** 关闭 BulkProcessor 并恢复索引设置*/public void close() {try {if (bulkProcessor != null) {bulkProcessor.awaitClose(60, TimeUnit.SECONDS);}} catch (InterruptedException e) {log.error("关闭 BulkProcessor 被中断", e);Thread.currentThread().interrupt();}try {// 恢复刷新和副本UpdateSettingsRequest restoreSettings = new UpdateSettingsRequest(indexName);restoreSettings.settings(Settings.builder().put("index.refresh_interval", "1s").put("index.number_of_replicas", 1));client.indices().putSettings(restoreSettings, RequestOptions.DEFAULT);log.info("索引 {} 设置恢复完成:refresh=1s, replicas=1", indexName);} catch (Exception ex) {log.error("恢复索引设置失败", ex);}}
}

🔍 主要职责

初始化 BulkProcessor

批量异步写入数据

临时修改索引配置(提高写入速度)

写入结束后恢复索引状态

💡 四、代码详解

  1. 初始化 BulkProcessor
public void initBulkProcessor(int batchSize, int concurrentRequests, int bulkSizeMB) throws Exception {// 临时关闭刷新和副本,加速写入UpdateSettingsRequest disableSettings = new UpdateSettingsRequest(indexName);disableSettings.settings(Settings.builder().put("index.refresh_interval", "-1").put("index.number_of_replicas", 0));client.indices().putSettings(disableSettings, RequestOptions.DEFAULT);log.info("索引 {} 设置已临时修改:refresh=-1, replicas=0", indexName);// 构建 BulkProcessorbulkProcessor = BulkProcessor.builder((request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),new BulkProcessor.Listener() {@Overridepublic void beforeBulk(long id, BulkRequest req) {log.info("准备写入 {} 条数据", req.numberOfActions());}@Overridepublic void afterBulk(long id, BulkRequest req, BulkResponse res) {if (res.hasFailures()) {log.error("批量写入部分失败: {}", res.buildFailureMessage());} else {log.info("批量写入成功, 条数: {}, 耗时: {} ms",req.numberOfActions(), res.getTook().getMillis());}}@Overridepublic void afterBulk(long id, BulkRequest req, Throwable t) {log.error("批量写入异常: ", t);}}).setBulkActions(batchSize)                                   // 每批条数.setBulkSize(new ByteSizeValue(bulkSizeMB, ByteSizeUnit.MB)) // 每批最大字节.setConcurrentRequests(concurrentRequests)                   // 并发批次数.setFlushInterval(TimeValue.timeValueSeconds(5))             // 定时 flush.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1), 3)) // 失败重试策略.build();
}

参数说明:

参数 含义 示例

batchSize	每批最大条数	5000
concurrentRequests	并发执行批次数	4
bulkSizeMB	每批最大字节数	50MB
flushInterval	定时自动提交间隔	5s
BackoffPolicy	重试策略	每1秒重试3
  1. 异步写入方法
public void write(List<OriginalLog> originalLogList) {if (originalLogList == null || originalLogList.isEmpty()) return;for (OriginalLog ol : originalLogList) {IndexRequest request = new IndexRequest(indexName).id(ol.getId()).source(JsonUtils.toJson(ol), XContentType.JSON);bulkProcessor.add(request);}
}

每条数据都添加进 BulkProcessor,由其自动批量触发写入。

  1. 关闭与恢复
public void close() {try {if (bulkProcessor != null) {bulkProcessor.awaitClose(60, TimeUnit.SECONDS);}} catch (InterruptedException e) {log.error("关闭 BulkProcessor 被中断", e);Thread.currentThread().interrupt();}// 恢复原有索引设置try {UpdateSettingsRequest restore = new UpdateSettingsRequest(indexName);restore.settings(Settings.builder().put("index.refresh_interval", "1s").put("index.number_of_replicas", 1));client.indices().putSettings(restore, RequestOptions.DEFAULT);log.info("索引 {} 设置恢复完成:refresh=1s, replicas=1", indexName);} catch (Exception ex) {log.error("恢复索引设置失败", ex);}
}

✅ 注意:关闭时调用 awaitClose(),确保所有批量请求都已执行完毕。

🧪 五、调用示例

private void asynchronousProcessingOfInsertedData(List<Flow> flowList, List<Origina> originalList) {if (!originalList.isEmpty()) {try {AsyncOriginalLogEsWriter writer = new AsyncOriginalLogEsWriter(restHighLevelClient, "xxxlog");writer.initBulkProcessor(5000, 4, 50);writer.write(originalList);writer.close();} catch (Exception e) {log.error("originalLog 批量插入失败", e);}}if (!flowList.isEmpty()) {try {AsyncOriginalLogEsWriter writer = new AsyncOriginalLogEsWriter(restHighLevelClient, "xxxlog");writer.initBulkProcessor(5000, 4, 50);writer.write(flowList);writer.close();} catch (Exception e) {log.error("hk_flow_behavior 批量插入失败", e);}}
}

🚦 六、性能优化建议
优化项 建议值 说明

index.refresh_interval	-1 during write, 1s after	关闭实时刷新,减少写入压力
index.number_of_replicas	0 during write, 1 after	写入时只写主分片
batchSize	3000~5000	根据单条数据大小调整
bulkSize	50~100 MB	控制单批数据包大小
concurrentRequests	2~8	根据 ES 节点 CPU 数决定
flushInterval	3~10s	防止数据堆积
http://www.dtcms.com/a/507611.html

相关文章:

  • 爬取GitHub开源项目信息并生成词云:从数据抓取到可视化实践
  • 做阀门的网站域名有了怎么建设网站
  • 西安交大Nat. Commun:749.276 cm²认证效率19.50%,通过IEC测试迈向产线
  • 百度站长平台登录网站图片自动轮换怎么做的
  • KuiklyUI 科普:UI 如何映射到 Android View 并完成渲染
  • 【2025-系统规划与管理师】第11章:信息系统治理
  • Python中如何实现数据库迁移
  • 第6部分:使用Netty的常见坑与注意事项
  • 广东企业品牌网站建设价格免费做网站的方法
  • 家政小程序系统开发:打造便捷高效的家政服务平台
  • CVE-2025-57833研究分析
  • 基于西门子proneta软件的网络设备台账自动管理软件
  • 深入大模型-12-Python虚拟环境的管理venv和uv和conda
  • DINOv2分类网络onnxruntime和tensorrt部署
  • 医疗网站建设网站wordpress别名时间戳
  • YOLOv3 深度解析:网络架构、核心改进与目标检测实践
  • 数据防泄露(DLP)综合指南:从基础到实践
  • 福鼎网站开发深圳市工程交易服务网
  • 电厂VR安全事故体验系统:让着火体验从 “看见” 变 “亲历”
  • 万网建设网站wordpress伪静态 page
  • 大模型训练显存优化全方案:ZeRO、Offload与重计算技术对比
  • 推客小程序系统开发:从0技术架构与实现细节深度解析
  • YOLOv4 知识点总结
  • 常用的建站工具有哪些体育台球直播
  • 什么网站可以找试卷做备案 个人网站建设方案书
  • okx欧易注册与量化设置
  • 飞牛os上的docker容器安装MySQL
  • 时序数据库选型指南:从大数据视角看Apache IoTDB的核心优势
  • UART串口通讯协议
  • 深入解析 YOLOv4:兼顾速度与精度的目标检测王者