使用Java多线程和POI进行Elasticsearch大批量数据导出
近期遇到一个需求,要求导出elasticsearch 8.x中的数据到Excel,这个需求存在一个比较难搞的问题,就是数据量大了,导出会超时,最好的做法是异步导出,然后通知前端去下载,前端可以异步的去查询导出进度或者后台轮询,等待服务端导出成功后,直接下载文件。笔者这边由于前端排期较满,所以才用了同步的方式,采用同步的方式导出,前端需要增加同步等待时间,而且会一直阻塞在导出页面,为了加快导出速度,笔者采用了多线程去做优化,最终将五万条记录数据的导出限制在40秒左右,这里的五万是elasticsearch 中的五万条记录,换算成Excel中的行数大概35万行左右,这里需要注意下,如果服务器的硬件配置(
主要是核心数和内存大小
)有限,建议还是采用异步导出,然后再下载的方式,避免OOM
。下面展开说明:
读取Elasticsearch数据
分页查询方式
Elasticsearch 的查询结果返回上限默认受 max_result_window
参数限制,其默认值为 10,000 条。但需根据不同的查询方式区分具体行为:
1. 默认分页查询(from + size
)的 10,000 条限制
-
机制:
当使用传统的分页方式(from
和size
参数)时,from + size
的总和不能超过max_result_window
的默认值 10,000。
例如:GET /your_index/_search { "from": 9000, "size": 1000, "query": { ... } }
此时
from + size = 10,000
,查询会成功;但若from + size > 10,000
,Elasticsearch 会抛出异常。 -
是否分页均受限制:
- 即使不显式分页(例如
from=0, size=15000
),只要size
超过max_result_window
,也会触发限制。 - 不指定
size
时:默认返回 10 条结果(与max_result_window
无关)。
- 即使不显式分页(例如
2. 绕过默认限制的其他查询方式
以下方法不受 max_result_window
限制,但需注意适用场景:
(1) Scroll API
- 用途:适合离线导出大数据(如全量数据迁移或批量处理)。
- 机制:创建快照式游标,分批次拉取数据。
- 限制:
- 不支持实时性要求高的场景(数据可能过期)。
- 需要手动清理 Scroll 上下文。
(2) Search After
- 用途:实时分页(如无限滚动列表)。
- 机制:基于上一页的排序值(如时间戳、唯一 ID)定位下一页。
- 限制:
- 需要指定唯一排序字段(确保分页顺序稳定)。
- 不支持跳页(只能连续翻页)。
3. 关键区别总结
查询方式 | 是否受 max_result_window 限制 | 适用场景 | 性能影响 |
---|---|---|---|
from + size | ✅ 是(默认 10,000) | 浅分页(前几百页) | 深度分页时性能极差 |
Scroll API | ❌ 否 | 大数据离线导出 | 资源占用高,需手动清理 |
Search After | ❌ 否 | 实时深分页(如无限滚动) | 高效,依赖排序字段 |
4. 是否需要调整 max_result_window
?
-
若必须使用
from + size
:
需通过以下方式修改索引设置:PUT /your_index/_settings { "index": { "max_result_window": 100000 # 调整为更大的值 } }
但需注意:
- 深度分页(如
from=99999
)会导致性能骤降(每个分片需遍历所有匹配文档)。 - 可能触发内存溢出(OOM),尤其是查询结果包含大字段时。
- 深度分页(如
-
推荐替代方案:
优先使用 Search After 或 Scroll API,避免直接修改max_result_window
。例如:// Search After 示例 GET /your_index/_search { "size": 1000, "query": { ... }, "sort": [ { "timestamp": "desc" }, { "_id": "asc" } // 确保排序唯一性 ], "search_after": [ "2023-10-01T00:00:00", "abc123" ] // 上一页最后一条的排序值 }
5. 在 Spring Boot 中的配置建议
- 使用
RestHighLevelClient
或ElasticsearchTemplate
:
直接调用 Scroll 或 Search After 接口,而非依赖from + size
。例如:// Search After 示例(Spring Data Elasticsearch) NativeSearchQuery searchQuery = new NativeSearchQueryBuilder() .withQuery(QueryBuilders.matchAllQuery()) .withSort(SortBuilders.fieldSort("timestamp").order(SortOrder.DESC)) .withSort(SortBuilders.fieldSort("_id").order(SortOrder.ASC)) .withPageable(Pageable.unpaged()) // 禁用传统分页 .build(); SearchHits<YourDocument> hits = elasticsearchTemplate.search(searchQuery, YourDocument.class); Object[] lastSortValues = hits.getSearchHit(hits.size() - 1).getSortValues(); // 下一次查询时传入 search_after searchQuery.setSearchAfter(lastSortValues);
小结
- 默认限制:Elasticsearch 的
from + size
分页查询默认最多返回 10,000 条,无论是否显式分页。 - 绕过限制:使用 Scroll API 或 Search After 可突破此限制,但需根据场景选择合适方案。
- 性能优先:避免盲目调大
max_result_window
,优先优化查询逻辑或使用高效分页机制。
游标查询( Scroll API)方式
在 Elasticsearch 8.x 及更高版本中,官方推荐使用新的 Java API Client(elasticsearch-java
库)替代旧的 RestHighLevelClient
。以下是基于新客户端(ElasticsearchClient
)使用 Scroll API 的完整代码示例和步骤:
1. 添加依赖
确保 pom.xml
或 build.gradle
中包含最新版本的 Elasticsearch Java 客户端:
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>8.12.0</version> <!-- 检查最新版本 -->
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version> <!-- 匹配 Elasticsearch 版本 -->
</dependency>
2. 使用 Scroll API 的完整代码
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.json.JsonData;
import java.io.IOException;
import java.util.List;
public class ScrollApiExample {
public void scrollData(ElasticsearchClient client, String indexName) throws IOException {
// 1. 初始化 Scroll 请求
SearchRequest searchRequest = SearchRequest.of(s -> s
.index(indexName)
.query(q -> q.matchAll(m -> m)) // 查询所有文档
.size(1000) // 每批次拉取 1000 条
.scroll(sb -> sb.time("1m")) // Scroll 有效期 1 分钟
);
// 2. 发送初始请求,获取 scrollId 和第一批数据
SearchResponse<Object> response = client.search(searchRequest, Object.class);
String scrollId = response.scrollId();
List<Hit<Object>> hits = response.hits().hits();
// 处理第一批数据
processHits(hits);
// 3. 循环拉取后续批次
while (hits != null && !hits.isEmpty()) {
// 构建 Scroll 请求
ScrollRequest scrollRequest = ScrollRequest.of(s -> s
.scrollId(scrollId)
.scroll(sb -> sb.time("1m")) // 续期 Scroll 有效期
);
// 发送请求获取下一批数据
SearchResponse<Object> scrollResponse = client.scroll(scrollRequest, Object.class);
scrollId = scrollResponse.scrollId(); // 更新 scrollId
hits = scrollResponse.hits().hits();
// 处理当前批次数据
processHits(hits);
}
// 4. 清理 Scroll 上下文
if (scrollId != null) {
ClearScrollRequest clearRequest = ClearScrollRequest.of(c -> c.scrollId(scrollId));
client.clearScroll(clearRequest);
}
}
private void processHits(List<Hit<Object>> hits) {
for (Hit<Object> hit : hits) {
Object source = hit.source(); // 获取文档内容(类型需与实际数据匹配)
System.out.println("Document: " + source);
}
}
}
3. 关键参数说明
参数/方法 | 说明 |
---|---|
.size(1000) | 每批次拉取的文档数(默认 10 )。 |
.scroll(s -> s.time("1m")) | 设置 Scroll 上下文的存活时间(如 1m 表示 1 分钟)。 |
client.search() | 发送初始搜索请求,返回第一批数据和 scrollId 。 |
client.scroll() | 根据 scrollId 获取下一批数据。 |
client.clearScroll() | 清理 Scroll 上下文,释放资源。 |
4. 注意事项
-
数据类型匹配:
示例中使用了Object.class
泛型,实际应根据索引文档的 Java 类型替换(如User.class
)。SearchResponse<User> response = client.search(searchRequest, User.class);
-
错误处理:
- 添加
try-catch
块处理IOException
或 Elasticsearch 异常。 - 确保在异常时仍清理 Scroll 上下文(避免资源泄漏)。
- 添加
-
性能优化:
- 根据数据量调整
size
(如5000
),但需权衡内存消耗。 - 避免在 Scroll 存活时间内处理过慢,导致上下文过期。
- 根据数据量调整
-
实时性限制:
Scroll API 基于数据快照,后续写入可能不会反映在结果中。若需实时遍历,改用 Search After。
5. 结合 Search After 实现高效分页
若需要实时分页,可改用 Search After(需指定唯一排序字段):
SearchRequest searchRequest = SearchRequest.of(s -> s
.index(indexName)
.query(q -> q.matchAll(m -> m))
.size(1000)
.sort(so -> so.field(f -> f.field("timestamp").order(SortOrder.Desc))) // 排序字段
.sort(so -> so.field(f -> f.field("_id").order(SortOrder.Asc))) // 确保唯一性
);
SearchResponse<User> response = client.search(searchRequest, User.class);
List<Hit<User>> hits = response.hits().hits();
// 获取最后一行的排序值
List<JsonData> lastSort = hits.get(hits.size() - 1).sort();
// 下次查询时传入 search_after
SearchRequest nextPageRequest = SearchRequest.of(s -> s
.index(indexName)
.query(q -> q.matchAll(m -> m))
.size(1000)
.searchAfter(lastSort) // 指定 search_after
.sort(so -> so.field(f -> f.field("timestamp").order(SortOrder.Desc)))
.sort(so -> so.field(f -> f.field("_id").order(SortOrder.Asc)))
);
小结
- Scroll API:适合离线大数据导出,但需手动管理上下文和内存。
- Search After:适合实时深分页,依赖唯一排序字段。
- 新客户端特性:Elasticsearch Java API Client 提供类型安全的 DSL,代码更简洁。
多线程处理ES数据
笔者这里采用了生产消费模式
,采用游标查询(Scroll API)
的方式,从ES中拉取数据放入阻塞队列供数据处理线程即消费者去处理,处理完成后放入线程安全的集合进行数据合并。
//每轮拉取的数据条数
int pullSize = 2000;
BoolQuery.Builder boolQueryBuilder = new BoolQuery.Builder();
if (!StringUtils.isEmpty(queryCondition.getFieldCondition())) {
if (!StringUtils.isEmpty(queryCondition.getFieldCondition().getId())) {
boolQueryBuilder.must(new Query.Builder()
.term(new TermQuery.Builder()
.field("Id")
.value(queryCondition.getFieldCondition().getId())
.build())
.build());
}
if ((!StringUtils.isEmpty(queryCondition.getTimeRangeStart())) && (!StringUtils.isEmpty(queryCondition.getTimeRangeEnd()))) {
//查询时间范围
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
//sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
//查询开始日期时间戳
long startTimeStamp = 0;
//查询结束日期时间戳
ZonedDateTime endTime = null;
try {
startTimeStamp = sdf.parse(queryCondition.getTimeRangeStart()).getTime();//0点
endTime = sdf.parse(queryCondition.getTimeRangeEnd())
.toInstant()
.atZone(ZoneId.of("Asia/Shanghai"))
.plusDays(1);
} catch (ParseException e) {
log.error("Time parse error :" + e.getMessage());
//throw new BusiException(e.getMessage());
}
long endTimeStamp = Date.from(endTime.toInstant()).getTime();
//查询时间范围
boolQueryBuilder.must(new Query.Builder().range(
new RangeQuery.Builder()
.field("timestamp")
.gte(JsonData.of(startTimeStamp))
.lt(JsonData.of(endTimeStamp))
.build())
.build());
}
}
//加入状态为 "END"
boolQueryBuilder.must(new Query.Builder()
.term(new TermQuery.Builder()
.field("status")
.value(Constant.END)
.build())
.build());
SearchRequest request = new SearchRequest.Builder()
.index(elasticProperties.getEsIndex())
.query(new Query.Builder()
.bool(boolQueryBuilder.build())
.build())
.searchType(SearchType.DfsQueryThenFetch)
.scroll(s -> s.time("1m"))//Scroll有效期
.size(pullSize)//每批次拉取2000条记录
.build();
//消费线程数
int consumerThreads = 8;
// 创建一个有界队列用于生产者和消费者之间的数据传递
BlockingQueue<List<Hit<Dialog>>> hitsBatchQueue = new LinkedBlockingQueue<>(consumerThreads * 2);
// 停止标志
AtomicBoolean done = new AtomicBoolean(false);
CountDownLatch producerLatch = new CountDownLatch(1);
CountDownLatch consumersLatch = new CountDownLatch(consumerThreads);
// 创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(consumerThreads + 1);
// 启动生产者线程 - 负责执行scroll请求获取数据
executorService.submit(() -> {
String scrollId = null;
int batchesProduced = 0;
int totalHits = 0;
try {
SearchResponse<Dialog> searchResponse = elasticsearchClient.search(request, Dialog.class);
if (searchResponse != null) {
scrollId = searchResponse.scrollId();
log.debug(searchResponse.toString());
List<Hit<Dialog>> hits = searchResponse.hits().hits();
//第一批数据放入消费者队列
if ((hits != null) && (!hits.isEmpty())) {
//放入生产队列
hitsBatchQueue.put(hits);
totalHits += hits.size();
batchesProduced++;
log.info("生产者: 初始批次,获取记录数:{} ", hits.size());
}
// 继续scroll直到没有更多数据或达到最大记录数
while ((scrollId != null) && (hits != null) && (!hits.isEmpty()) && (totalHits < maxExportSize)) {
//构建Scroll请求
final String currentScrollId = scrollId;
ScrollRequest scrollRequest = ScrollRequest.of(s -> s.scrollId(currentScrollId).scroll(st -> st.time("1m")));// 续期 Scroll 有效期
ScrollResponse<AimcDialog> scrollResponse = elasticsearchClient.scroll(scrollRequest, AimcDialog.class);
log.debug(scrollResponse.toString());
// 更新scrollId
scrollId = scrollResponse.scrollId();
if ((scrollId != null) && (!scrollId.equals(currentScrollId))) {
clearScroll(currentScrollId);
}
hits = scrollResponse.hits().hits();
if ((hits != null) && (!hits.isEmpty())) {
//放入生产队列
hitsBatchQueue.put(hits);
totalHits += hits.size();
batchesProduced++;
//if (batchesProduced % 10 == 0) {
log.info("生产者: 已生产 " + batchesProduced + " 批次,总记录数: " + totalHits);
//}
} else {
break;
}
// 检查是否已达到最大记录数
if (totalHits >= maxExportSize) {
log.info("已达到最大记录数限制,停止生产: " + totalHits);
break;
}
}
// 清理 Scroll 上下文
if (scrollId != null) {
String currentScrollId = scrollId;
ClearScrollRequest clearRequest = ClearScrollRequest.of(c -> c.scrollId(currentScrollId));
elasticsearchClient.clearScroll(clearRequest);
}
}
} catch (Exception e) {
log.error("生产者线程出错: " + e.getMessage());
} finally {
// 清理scroll上下文
if (scrollId != null) {
clearScroll(scrollId);
}
// 标记生产完成
done.set(true);
producerLatch.countDown();
}
});
//用来维护id和名字的映射
ConcurrentHashMap<String, String> names = new ConcurrentHashMap<>();
//线程锁
ConcurrentHashMap<String, ReentrantLock> botLocks = new ConcurrentHashMap<>();
//总共处理了多少查询结果
final AtomicInteger totalFetched = new AtomicInteger(0);
//用于返回的结果
List<Log> logs = Collections.synchronizedList(new ArrayList<>());
// 启动消费者线程 - 负责处理队列中的数据
for (int i = 0; i < consumerThreads; i++) {
final int consumerId = i;
final List<Log> consumerResults = new ArrayList<>();
executorService.submit(() -> {
try {
int processedCount = 0;
while (!done.get() || !hitsBatchQueue.isEmpty()) {
List<Hit<Dialog>> hits = hitsBatchQueue.poll(500, TimeUnit.MILLISECONDS);
if (hits != null) {
List<Log> logList = processHits(hits, names, botLocks);
consumerResults.addAll(logList);
processedCount += logList.size();
totalFetched.addAndGet(logList.size());
if (processedCount % pullSize == 0) {
log.info("消费者 " + consumerId + ": 已处理 " + processedCount + " 条记录");
}
}
}
log.info("消费者 " + consumerId + " 完成,共处理 " + processedCount + " 条记录");
// 同步添加结果
synchronized (logs) {
logCalls.addAll(consumerResults);
}
} catch (Exception e) {
log.error("消费者 " + consumerId + " 出错: " + e.getMessage());
e.printStackTrace();
} finally {
consumersLatch.countDown();
}
});
}
try {// 等待生产者完成
boolean producerCompleted = producerLatch.await(10, TimeUnit.MINUTES);
if (!producerCompleted) {
log.error("生产者线程未在超时时间内完成");
}
// 等待所有消费者完成
boolean consumersCompleted = consumersLatch.await(10, TimeUnit.MINUTES);
if (!consumersCompleted) {
log.error("部分消费者线程未在超时时间内完成");
}
// 关闭线程池
executorService.shutdown();
boolean b = executorService.awaitTermination(1, TimeUnit.MINUTES);
log.info("数据拉取完成,总获取记录数: " + totalFetched.get());
} catch (Exception e) {
log.error(e.getMessage());
e.printStackTrace();
}
之所以采用生产消费模式,是因为从ES中批量拉取数据速度很快,但是对于每批次的数据处理是一个耗时操作,同时,数据处理过程中可能还会涉及远程调用,而且对于多个批次的数据可以多线程并行处理,这样耗时会大大减少。接下来讲一讲已经处理好的数据进行多线程导出到Excel。
导出数据到Excel
首先引入依赖
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
<version>5.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>5.1.0</version>
</dependency>
另外EasyExcel
的性能好像优于POI,感兴趣的可以尝试下
<!-- EasyExcel -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>easyexcel</artifactId>
<version>3.3.2</version>
</dependency>
导出Excel
Excel工具类
工具类中将从ES批量拉取的数据做分割处理,按照设定的大小分成若干个sheet,多个sheet采用多线程并行执行导出操作
public void downloadExcel(HttpServletRequest req, HttpServletResponse resp, String filename, Workbook workbook) {
try {
beforeDownload(req, resp, filename);
OutputStream out = resp.getOutputStream();
workbook.write(out);
out.flush();
out.close();
} catch (IOException e) {
throw new BusiException(e.getMessage());
} finally {
if (workbook instanceof SXSSFWorkbook) {
((SXSSFWorkbook) workbook).dispose(); // 清理临时文件
}
}
}
public void beforeDownload(HttpServletRequest req, HttpServletResponse resp, String filename) throws UnsupportedEncodingException {
filename = URLEncoder.encode(filename, "UTF-8");
resp.setContentType("application/octet-stream;charset=UTF-8");
resp.setHeader("Content-Disposition", "attachment;filename=" + filename);
resp.addHeader("Pargam", "no-cache");
resp.addHeader("Cache-Control", "no-cache");
}
public <C, M> Workbook createExcel(List<C> calls, Class<C> callClass, Class<M> commuClass) throws Exception {
// 使用 SXSSFWorkbook 替代 HSSFWorkbook
SXSSFWorkbook workbook = new SXSSFWorkbook(1000); // 缓存 100 行
if (calls == null || calls.isEmpty()) {
workbook.createSheet();
return workbook;
}
// 分片
List<List<C>> splitLists = splistList(calls, 5000, maxExportSize);
ExecutorService executor = Executors.newFixedThreadPool(Math.min(splitLists.size(), Runtime.getRuntime().availableProcessors()));
List<Future<Sheet>> futures = new ArrayList<>(splitLists.size());
// 缓存表头和 getter
List<FieldInfo> callFields = getFieldInfos(callClass);
List<FieldInfo> commuFields = getFieldInfos(commuClass);
List<String> headers = callFields.stream().map(f -> f.header).collect(Collectors.toList());
List<String> subHeaders = commuFields.stream().map(f -> f.header).collect(Collectors.toList());
// 样式
CellStyle headerStyle = createHeaderStyle(workbook);
CellStyle dataStyle = createDataStyle(workbook);
CellStyle subHeaderStyle = createHeaderStyle(workbook);
subHeaderStyle.setFillForegroundColor(HSSFColor.HSSFColorPredefined.SKY_BLUE.getIndex());
// 多线程并行处理每个分片
for (int i = 0; i < splitLists.size(); i++) {
final List<C> subList = splitLists.get(i);
final int finalI = i;
futures.add(executor.submit(() -> {
Sheet sheet;
synchronized (workbook) {
sheet = workbook.createSheet("Sheet" + (finalI + 1));
}
processSheet(sheet, subList, callClass, commuClass, headers, subHeaders, headerStyle, dataStyle, subHeaderStyle);
log.info("{}导出完成", sheet.getSheetName());
return sheet;
}));
}
// 等待所有任务完成
for (Future<Sheet> future : futures) {
try {
future.get();
} catch (Exception e) {
log.error("当前线程失败:{}", future);
e.printStackTrace();
}
}
executor.shutdown();
boolean b = executor.awaitTermination(10, TimeUnit.MINUTES);
return workbook;
}
//分割数据集合
public <T> List<List<T>> splistList(List<T> list, int subNum, int maxExportSize) {
List<List<T>> tNewList = new ArrayList<>();
int size = Math.min(list.size(), maxExportSize);
for (int i = 0; i < size; i += subNum) {
int end = Math.min(i + subNum, size);
tNewList.add(list.subList(i, end));
}
return tNewList;
}
调用Excel工具类创建Excel
调用工具类创建工作簿,并传输到前端页面进行下载。
List<Log> logs = DialogDao4Es.exportLogCalls(queryCondition);
Workbook workbook = null;
try {
workbook = excelExportUtil.createExcel(logs, Log.class, LogCommu.class);
} catch (Exception e) {
log.error("工作簿创建失败:{}", e.getMessage());
e.printStackTrace();
}
excelExportUtil.downloadExcel(req, resp, "日志.xlsx", workbook);
到这里分享就结束了,欢迎大家一起交流,有什么问题 可以评论区留言。