Bulk-Writing 500,000 Records into Elasticsearch
🚀 Elasticsearch Asynchronous Bulk Write Optimization in Practice
—— Using BulkProcessor to Improve ES Write Performance
📖 1. Background
In log collection, behavior analysis, and similar systems, large volumes of data often have to be written into Elasticsearch efficiently.
The traditional synchronous approach, calling index() one document at a time, becomes a severe bottleneck under high concurrency and large data volumes.
To address this, we use the BulkProcessor asynchronous bulk-write mechanism together with index-setting tuning to build an efficient, stable, and controllable ingestion pipeline.
⚙️ 2. Core Ideas
Bulk writes: assemble many documents into a single Bulk request and submit them in one round trip (a synchronous one-shot version is sketched right after this list).
Async execution: write tasks are handled by a thread pool and never block the calling thread.
Auto flush: batches are submitted automatically when a document count, payload size, or time threshold is reached.
Index tuning: temporarily disable replicas and refresh before the load, and restore them once writing finishes.
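For contrast with the bulleted approach, this is what a plain one-shot synchronous bulk call looks like with the high-level REST client. It is a minimal sketch: `MyDoc`, `docs`, and the index name are placeholders, and `JsonUtils.toJson` is the same project JSON helper used later in this article.

```java
// One-shot synchronous bulk: simple, but it blocks the caller and has
// none of BulkProcessor's automatic flushing, sizing, or retry logic.
BulkRequest bulkRequest = new BulkRequest();
for (MyDoc doc : docs) {
    bulkRequest.add(new IndexRequest("my-index")
            .id(doc.getId())
            .source(JsonUtils.toJson(doc), XContentType.JSON));
}
BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
if (response.hasFailures()) {
    log.error("Bulk write partially failed: {}", response.buildFailureMessage());
}
```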
🧩 3. Core Utility Class: AsyncEsWriter
```java
// Imports assume the Elasticsearch 7.x high-level REST client;
// exact package paths vary slightly across minor versions.
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class AsyncEsWriter {

    private static final Logger log = LoggerFactory.getLogger(AsyncEsWriter.class);

    private final RestHighLevelClient client;
    private final String indexName;
    private BulkProcessor bulkProcessor;

    public AsyncEsWriter(RestHighLevelClient client, String indexName) {
        this.client = client;
        this.indexName = indexName;
    }

    /**
     * Initialize the BulkProcessor.
     *
     * @param batchSize          max documents per batch
     * @param concurrentRequests number of concurrent bulk requests
     * @param bulkSizeMB         max bytes per batch, in MB
     */
    public void initBulkProcessor(int batchSize, int concurrentRequests, int bulkSizeMB) throws Exception {
        // -------------------------------
        // Temporarily disable refresh and replicas
        UpdateSettingsRequest disableSettings = new UpdateSettingsRequest(indexName);
        disableSettings.settings(Settings.builder()
                .put("index.refresh_interval", "-1")
                .put("index.number_of_replicas", 0));
        client.indices().putSettings(disableSettings, RequestOptions.DEFAULT);
        log.info("Index {} settings temporarily changed: refresh=-1, replicas=0", indexName);

        // -------------------------------
        // Build the BulkProcessor
        bulkProcessor = BulkProcessor.builder(
                (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                new BulkProcessor.Listener() {
                    @Override
                    public void beforeBulk(long executionId, BulkRequest request) {
                        log.info("About to write {} documents", request.numberOfActions());
                    }

                    @Override
                    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                        if (response.hasFailures()) {
                            log.error("Bulk write partially failed: {}", response.buildFailureMessage());
                        } else {
                            log.info("Bulk write succeeded, count: {}, took: {} ms",
                                    request.numberOfActions(), response.getTook().getMillis());
                        }
                    }

                    @Override
                    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                        log.error("Bulk write failed: ", failure);
                    }
                })
                .setBulkActions(batchSize)                                   // documents per batch
                .setBulkSize(new ByteSizeValue(bulkSizeMB, ByteSizeUnit.MB)) // max bytes per batch
                .setConcurrentRequests(concurrentRequests)                   // concurrent requests
                .setFlushInterval(TimeValue.timeValueSeconds(5))             // time-based flush
                .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1), 3)) // retry on failure
                .build();
    }

    /**
     * Queue documents for asynchronous writing.
     *
     * @param docs        the entities to index
     * @param idExtractor pulls the document id from each entity
     */
    public <T> void write(List<T> docs, Function<T, String> idExtractor) {
        if (docs == null || docs.isEmpty()) return;
        for (T doc : docs) {
            // JsonUtils is the project's own JSON serialization helper
            IndexRequest indexRequest = new IndexRequest(indexName)
                    .id(idExtractor.apply(doc))
                    .source(JsonUtils.toJson(doc), XContentType.JSON);
            bulkProcessor.add(indexRequest);
        }
    }

    /**
     * Close the BulkProcessor and restore the index settings.
     */
    public void close() {
        try {
            if (bulkProcessor != null) {
                bulkProcessor.awaitClose(60, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            log.error("Interrupted while closing BulkProcessor", e);
            Thread.currentThread().interrupt();
        }
        try {
            // Restore refresh and replicas
            UpdateSettingsRequest restoreSettings = new UpdateSettingsRequest(indexName);
            restoreSettings.settings(Settings.builder()
                    .put("index.refresh_interval", "1s")
                    .put("index.number_of_replicas", 1));
            client.indices().putSettings(restoreSettings, RequestOptions.DEFAULT);
            log.info("Index {} settings restored: refresh=1s, replicas=1", indexName);
        } catch (Exception ex) {
            log.error("Failed to restore index settings", ex);
        }
    }
}
```
🔍 Main responsibilities
Initialize the BulkProcessor
Write data asynchronously in batches
Temporarily adjust index settings to speed up ingestion
Restore the index settings once writing finishes
💡 4. Code Walkthrough
- Initializing the BulkProcessor
```java
public void initBulkProcessor(int batchSize, int concurrentRequests, int bulkSizeMB) throws Exception {
    // Temporarily disable refresh and replicas to speed up the load
    UpdateSettingsRequest disableSettings = new UpdateSettingsRequest(indexName);
    disableSettings.settings(Settings.builder()
            .put("index.refresh_interval", "-1")
            .put("index.number_of_replicas", 0));
    client.indices().putSettings(disableSettings, RequestOptions.DEFAULT);
    log.info("Index {} settings temporarily changed: refresh=-1, replicas=0", indexName);

    // Build the BulkProcessor
    bulkProcessor = BulkProcessor.builder(
            (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
            new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long id, BulkRequest req) {
                    log.info("About to write {} documents", req.numberOfActions());
                }

                @Override
                public void afterBulk(long id, BulkRequest req, BulkResponse res) {
                    if (res.hasFailures()) {
                        log.error("Bulk write partially failed: {}", res.buildFailureMessage());
                    } else {
                        log.info("Bulk write succeeded, count: {}, took: {} ms",
                                req.numberOfActions(), res.getTook().getMillis());
                    }
                }

                @Override
                public void afterBulk(long id, BulkRequest req, Throwable t) {
                    log.error("Bulk write failed: ", t);
                }
            })
            .setBulkActions(batchSize)                                   // documents per batch
            .setBulkSize(new ByteSizeValue(bulkSizeMB, ByteSizeUnit.MB)) // max bytes per batch
            .setConcurrentRequests(concurrentRequests)                   // concurrent batches
            .setFlushInterval(TimeValue.timeValueSeconds(5))             // time-based flush
            .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1), 3)) // retry policy
            .build();
}
```
Parameter reference:

| Parameter | Meaning | Example |
| --- | --- | --- |
| batchSize | Max documents per batch | 5000 |
| concurrentRequests | Number of batches executed concurrently | 4 |
| bulkSizeMB | Max bytes per batch | 50 MB |
| flushInterval | Interval for time-based auto-flush | 5s |
| BackoffPolicy | Retry policy on failure | 3 retries, 1 s apart |
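Everything above assumes an already-constructed RestHighLevelClient. A minimal sketch of building one against a single node follows; the host, port, and the index name "xxxlog" are placeholders.

```java
// Build a client against a single node (host/port are placeholders)
RestHighLevelClient restHighLevelClient = new RestHighLevelClient(
        RestClient.builder(new HttpHost("localhost", 9200, "http")));

AsyncEsWriter writer = new AsyncEsWriter(restHighLevelClient, "xxxlog");
writer.initBulkProcessor(5000, 4, 50); // 5000 docs, 4 concurrent batches, 50 MB cap
```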
- The asynchronous write method

```java
public <T> void write(List<T> docs, Function<T, String> idExtractor) {
    if (docs == null || docs.isEmpty()) return;
    for (T doc : docs) {
        IndexRequest request = new IndexRequest(indexName)
                .id(idExtractor.apply(doc))
                .source(JsonUtils.toJson(doc), XContentType.JSON);
        bulkProcessor.add(request);
    }
}
```
Each document is added to the BulkProcessor, which batches and submits them automatically once a threshold is reached. The method is generic, taking an id extractor, so the same writer can ingest both OriginalLog and Flow entities (see the usage example in section 5).
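BulkProcessor decides when to send on its own, but if queued documents need to go out immediately, for example before taking a checkpoint, it also exposes a manual flush:

```java
// Push out whatever is buffered now, without waiting for the
// count, size, or interval thresholds to trip.
bulkProcessor.flush();
```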
- Closing and restoring

```java
public void close() {
    try {
        if (bulkProcessor != null) {
            bulkProcessor.awaitClose(60, TimeUnit.SECONDS);
        }
    } catch (InterruptedException e) {
        log.error("Interrupted while closing BulkProcessor", e);
        Thread.currentThread().interrupt();
    }

    // Restore the original index settings
    try {
        UpdateSettingsRequest restore = new UpdateSettingsRequest(indexName);
        restore.settings(Settings.builder()
                .put("index.refresh_interval", "1s")
                .put("index.number_of_replicas", 1));
        client.indices().putSettings(restore, RequestOptions.DEFAULT);
        log.info("Index {} settings restored: refresh=1s, replicas=1", indexName);
    } catch (Exception ex) {
        log.error("Failed to restore index settings", ex);
    }
}
```
✅ Note: call awaitClose() when shutting down so that every in-flight bulk request completes before the client is released.
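One caveat: close() restores hardcoded values (refresh=1s, one replica), which may differ from what the index actually used before the load. A hedged variant reads the current values first and restores those. This is a sketch against the 7.x high-level client, meant to slot into initBulkProcessor() and close():

```java
// In initBulkProcessor(): capture the index's current settings first
GetSettingsRequest getReq = new GetSettingsRequest().indices(indexName);
GetSettingsResponse getRes = client.indices().getSettings(getReq, RequestOptions.DEFAULT);
// Both return null when the setting was never set explicitly
String originalRefresh  = getRes.getSetting(indexName, "index.refresh_interval");
String originalReplicas = getRes.getSetting(indexName, "index.number_of_replicas");

// In close(): restore what was actually there, falling back to sane defaults
UpdateSettingsRequest restore = new UpdateSettingsRequest(indexName);
restore.settings(Settings.builder()
        .put("index.refresh_interval", originalRefresh != null ? originalRefresh : "1s")
        .put("index.number_of_replicas", originalReplicas != null ? originalReplicas : "1"));
client.indices().putSettings(restore, RequestOptions.DEFAULT);
```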
🧪 5. Usage Example
```java
private void asynchronousProcessingOfInsertedData(List<Flow> flowList, List<OriginalLog> originalList) {
    if (!originalList.isEmpty()) {
        try {
            AsyncEsWriter writer = new AsyncEsWriter(restHighLevelClient, "xxxlog");
            writer.initBulkProcessor(5000, 4, 50);
            writer.write(originalList, OriginalLog::getId);
            writer.close();
        } catch (Exception e) {
            log.error("originalLog bulk insert failed", e);
        }
    }
    if (!flowList.isEmpty()) {
        try {
            AsyncEsWriter writer = new AsyncEsWriter(restHighLevelClient, "xxxlog");
            writer.initBulkProcessor(5000, 4, 50);
            writer.write(flowList, Flow::getId);
            writer.close();
        } catch (Exception e) {
            log.error("hk_flow_behavior bulk insert failed", e);
        }
    }
}
```
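One robustness note on the example: if write() throws, close() never runs and the index is left with refresh disabled. Wrapping the writer in try/finally avoids that; a minimal variant of the first branch:

```java
AsyncEsWriter writer = new AsyncEsWriter(restHighLevelClient, "xxxlog");
writer.initBulkProcessor(5000, 4, 50);
try {
    writer.write(originalList, OriginalLog::getId);
} finally {
    writer.close(); // always drain the queue and restore index settings
}
```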
🚦 6. Performance Tuning Suggestions

| Setting | Suggested value | Notes |
| --- | --- | --- |
| index.refresh_interval | -1 during write, 1s after | Disables near-real-time refresh to cut write overhead |
| index.number_of_replicas | 0 during write, 1 after | Only primary shards are written during the load |
| batchSize | 3000~5000 | Tune to the average document size |
| bulkSize | 50~100 MB | Caps the payload of a single bulk request |
| concurrentRequests | 2~8 | Scale with the CPU count of the ES nodes |
| flushInterval | 3~10s | Keeps queued documents from piling up |
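Once the load completes and close() has re-enabled refresh, a quick sanity check is to refresh the index and count what is searchable. A small sketch reusing the same client; the index name "xxxlog" is the placeholder from the example above:

```java
// Make everything just written searchable, then count it
client.indices().refresh(new RefreshRequest("xxxlog"), RequestOptions.DEFAULT);
long total = client.count(new CountRequest("xxxlog"), RequestOptions.DEFAULT).getCount();
log.info("Documents searchable in xxxlog: {}", total);
```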