当前位置: 首页 > news >正文

Search After+PIT 解决ES深度分页问题

1.深分页和search after 原理 

| 对比项 | 深分页 (from/size) | search_after |
| --- | --- | --- |
| 数据定位 | 全局排序后跳过前 N 条 | 基于上一页最后一条的排序值定位 |
| 排序开销 | 每次请求重新全局排序 (O(N)) | 仅首次全局排序,后续游标跳转 (O(1)) |
| 内存消耗 | 堆内存存储完整结果集 (高风险OOM) | 无堆内存累积 (安全) |
| 分页深度限制 | from + size ≤ 10000 (默认限制) | 无深度限制 |

2. 性能对比 

| 分页深度 | 深分页响应时间 | search_after响应时间 |
| --- | --- | --- |
| 1 | 100ms | 100ms |
| 100 | 300ms | 110ms |
| 1000 | 1500ms | 120ms |
| 10000 | 超时/报错 | 130ms |

3. 适用场景
| 对比项 | 深分页 | search_after |
| --- | --- | --- |
| 典型场景 | 小数据量随机跳页 | 大数据量连续翻页(如日志流) |
| 排序要求 | 任意排序字段 | 必须指定唯一排序字段(如时间戳+ID) |
| 跳页能力 | 支持任意页跳转 | 仅支持顺序翻页 |

4. Maven依赖配置

<dependency>
    <groupId>co.elastic.clients</groupId>
    <artifactId>elasticsearch-java</artifactId>
    <version>8.12.0</version>
</dependency>

5.ES分页服务类

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.SortOrder;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.json.JsonData;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;@Slf4j
@Service
public class EsSearchAfterService {private final ElasticsearchClient esClient;// 初始化ES客户端(通过构造函数注入)public EsSearchAfterService(ElasticsearchClient esClient) {this.esClient = esClient;}/*** 分页查询(支持深度分页)* @param indexName 索引名称* @param query 查询条件* @param sortField 主排序字段* @param pageSize 每页大小* @param pitId 时间点ID(首次查询传null)* @param searchAfter 分页游标(首次查询传null)* @return 分页结果(包含数据、下次分页游标、新的pitId)*/public PageResult<Object> searchWithPagination(String indexName,Query query,String sortField,int pageSize,String pitId,List<JsonData> searchAfter) throws IOException {// 1. 创建或延续PIT上下文String currentPitId = pitId;if (currentPitId == null) {CreatePitResponse pitResponse = esClient.createPit(c -> c.index(indexName).keepAlive(a -> a.time("30m")));currentPitId = pitResponse.id();log.info("Created new PIT: {}", currentPitId);}try {// 2. 构建SearchRequestSearchRequest.Builder searchBuilder = new SearchRequest.Builder().size(pageSize).query(query).pit(p -> p.id(currentPitId).keepAlive(a -> a.time("30m"))).sort(s -> s.field(f -> f.field(sortField).order(SortOrder.Desc))).sort(s -> s.field(f -> f.field("_id").order(SortOrder.Asc)));if (searchAfter != null && !searchAfter.isEmpty()) {searchBuilder.searchAfter(searchAfter);}// 3. 执行查询SearchResponse<Object> response = esClient.search(searchBuilder.build(), Object.class);// 4. 
解析结果List<Object> data = new ArrayList<>();List<JsonData> nextSearchAfter = Collections.emptyList();if (response.hits().hits() != null && !response.hits().hits().isEmpty()) {List<Hit<Object>> hits = response.hits().hits();for (Hit<Object> hit : hits) {data.add(hit.source());}// 获取最后一个排序值nextSearchAfter = hits.get(hits.size() - 1).sort();}return new PageResult<>(data, nextSearchAfter, currentPitId);} catch (Exception e) {// 清理无效PITif (currentPitId != null && !currentPitId.equals(pitId)) {esClient.deletePit(d -> d.id(currentPitId));}throw new RuntimeException("ES查询失败", e);}}/*** 关闭PIT上下文*/public void closePit(String pitId) throws IOException {if (pitId != null && !pitId.isEmpty()) {DeletePitResponse response = esClient.deletePit(d -> d.id(pitId));log.info("Closed PIT {}: {}", pitId, response.succeeded());}}// 分页结果封装类public record PageResult<T>(List<T> data,List<JsonData> nextSearchAfter,String pitId) {}
}

6. 业务层使用示例

@Service
@RequiredArgsConstructor
public class OrderQueryService {private final EsSearchAfterService esService;public PaginatedOrders queryOrders(int pageSize, String lastPitId, List<JsonData> lastSearchAfter) {try {// 1. 构建查询条件Query query = Query.of(q -> q.bool(b -> b.must(m -> m.term(t -> t.field("status").value("paid")))));// 2. 执行分页查询PageResult<Object> result = esService.searchWithPagination("orders",query,"order_time",pageSize,lastPitId,lastSearchAfter);// 3. 转换为业务DTOList<OrderDTO> orders = convertToDTO(result.data());return new PaginatedOrders(orders, result.nextSearchAfter(), result.pitId());} catch (IOException e) {throw new BusinessException("订单查询失败", e);}}// DTO转换逻辑(示例)private List<OrderDTO> convertToDTO(List<Object> esSources) {// 实现实际的转换逻辑return Collections.emptyList();}// 分页结果DTOpublic record PaginatedOrders(List<OrderDTO> orders,List<JsonData> nextSearchAfter,String pitId) {}
}

7. 控制器层实现

@RestController
@RequestMapping("/api/orders")
@RequiredArgsConstructor
public class OrderController {private final OrderQueryService orderService;@GetMappingpublic ResponseEntity<?> getOrders(@RequestParam(defaultValue = "20") int size,@RequestParam(required = false) String pitId,@RequestParam(required = false) List<String> searchAfter) {try {// 转换前端传来的searchAfter参数List<JsonData> searchAfterParams = Optional.ofNullable(searchAfter).orElse(Collections.emptyList()).stream().map(JsonData::of).toList();PaginatedOrders result = orderService.queryOrders(size, pitId, searchAfterParams);return ResponseEntity.ok().header("X-PIT-ID", result.pitId()).body(Map.of("data", result.orders(),"next_search_after", result.nextSearchAfter()));} catch (BusinessException e) {return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(Map.of("error", e.getMessage()));}}@DeleteMapping("/pit/{pitId}")public ResponseEntity<?> closePitContext(@PathVariable String pitId) {try {orderService.closePit(pitId);return ResponseEntity.ok().build();} catch (Exception e) {return ResponseEntity.internalServerError().body(Map.of("error", "PIT关闭失败"));}}
}

8. 生产环境关键配置

elasticsearch:
  hosts: localhost:9200
  username: elastic
  password: your_password
  connection-timeout: 30s
  socket-timeout: 60s

9.ES客户端配置类

@Configuration
public class EsConfig {@Value("${elasticsearch.hosts}")private String hosts;@Beanpublic RestClient restClient() {return RestClient.builder(HttpHost.create(hosts)).setRequestConfigCallback(builder ->builder.setConnectTimeout(30000).setSocketTimeout(60000)).build();}@Beanpublic ElasticsearchClient elasticsearchClient(RestClient restClient) {ElasticsearchTransport transport = new RestClientTransport(restClient,new JacksonJsonpMapper());return new ElasticsearchClient(transport);}
}

10. 前端交互示例

无限滚动实现(React)

import React, { useState, useEffect } from 'react';const OrderList = () => {const [orders, setOrders] = useState([]);const [pitId, setPitId] = useState(null);const [searchAfter, setSearchAfter] = useState(null);const [loading, setLoading] = useState(false);const loadMore = async () => {setLoading(true);try {const params = new URLSearchParams({size: 20,...(pitId && { pitId }),...(searchAfter && { searchAfter: JSON.stringify(searchAfter) })});const response = await fetch(`/api/orders?${params}`);const { data, next_search_after } = await response.json();setOrders(prev => [...prev, ...data]);setSearchAfter(next_search_after);setPitId(response.headers.get('X-PIT-ID'));} finally {setLoading(false);}};// 组件卸载时清理PITuseEffect(() => {return () => {if (pitId) {fetch(`/api/orders/pit/${pitId}`, { method: 'DELETE' });}};}, [pitId]);return (<div>{/* 订单列表渲染 */}<button onClick={loadMore} disabled={loading}>{loading ? '加载中...' : '加载更多'}</button></div>);
};

相关文章:

  • GoFly企业版框架升级2.6.6版本说明(框架在2025-05-06发布了)
  • 不同大模型对提示词和问题的符号标识
  • 深入解析华为交换机中的VRRP原理
  • Linux 安装交叉编译器后丢失 `<asm/errno.h>` 的问题及解决方案
  • .idea和__pycache__文件夹分别是什么意思
  • Spark-Core(双Value类型)
  • 边缘计算,运维架构从传统的集中式向分布式转变
  • 亿级流量系统架构设计与实战(五)
  • Python 识别图片上标点位置
  • NVM完全指南:安装、配置与最佳实践
  • stm32常见错误
  • 网站网页经常 400 错误,清缓存后就好了的原因剖析
  • Python赋能自动驾驶:如何优化路径规划,让AI驾驶更聪明?
  • AI驱动的Kubernetes管理:kubectl-ai 如何简化你的云原生运维
  • SpringBoot3集成Mybatis
  • iPhone 和 Android 在日期格式方面的区别
  • 报表的那些事:四部演进史——架构视角下的技术跃迁与实战思考
  • java中try..catch如何捕捉超时的情况
  • LeetCode:对称二叉树
  • 编程日志4.27
  • “行人相撞案”现场视频公布,法院:表述不当造成误导
  • 巴西总统卢拉将访华
  • 国家主席习近平在莫斯科出席红场阅兵式
  • 蔡达峰:推动食品安全法全面有效实施,为维护人民群众身体健康提供有力法治保障
  • “上海之帆”巡展在日本大阪开幕,松江区组织企业集体出展
  • 教育部、国家发改委联合启动实施教师教育能力提升工程