elasticsearch8.1.0 中聚合功能的执行链路
在 Elasticsearch 8.1.0 中,聚合(Aggregations)功能的执行链路可以分为“请求解析 → 分片级聚合 → 协调节点汇总 → 结果返回”四个阶段。下面按时间顺序给出**一次 terms 桶聚合**在源码层面的关键调用路径(涉及的重要类与方法),方便你快速定位代码。所有类均位于 `org.elasticsearch.search.aggregations` 或其子包下。
--------------------------------
1. 请求解析与 DSL 构建
- `SearchModule.registerAggregations()` – 启动时将内置聚合解析器注册到 NamedXContentRegistry。
- `AggregationsAggregationBuilder.parse()` – Rest 层解析 `aggs` 段,生成 `TermsAggregationBuilder`。
- `SearchSourceBuilder#aggregation(AB)` – 把聚合构建器挂到 search request 上。
--------------------------------
2. Coordinator(接收节点)准备阶段
- `TransportSearchAction.doExecute()` – 判断是否需要跨分片,创建 `SearchTask`。
- `SearchService#parseSource()` → `AggregatorFactories.Builder.build()` – 将 `TermsAggregationBuilder` 编译为 `AggregatorFactory`(真正执行对象)。
- `SearchService.createContext()` – 构建 `SearchContext`,内部持有 `AggregatorFactories`。
--------------------------------
3. 分片级聚合(Shard request)
- `SearchService.executeQueryPhase()` – 对每个分片启动 `QueryPhase`。
- `QueryPhase#execute()` – 先走 `Lucene.search()` 拿到 `LeafReaderContext` 集合,再进入聚合链路:
– `AggregationPhase.preProcess()` – 创建 `Aggregator[]` 数组,调用 `AggregatorFactory.create()` 得到 `TermsAggregator`。
– `AggregatorBase.getLeafCollector()` – 为每个 segment 生成 `LeafBucketCollector`。
– `LeafBucketCollector.collect()` – 每收集一条 doc,就执行 `TermsAggregator.collectExistingBucket()`,更新 `LongHash`(bucketOrds)计数。
- 分片聚合完成后,调用 `Aggregator.buildTopLevel()` → `InternalTerms.buildEmpty()` 得到该分片的 `InternalAggregation` 结果。
--------------------------------
4. 协调节点汇总(Reduce)
- `TransportSearchAction$AsyncSearchAction.onShardResponse()` – 收集各分片返回的 `QuerySearchResult`。
- `SearchPhaseController#merge()` – 把多个 `InternalTerms` 放进 `ReduceContext`。
- `InternalTerms.reduce()` – 按 key 合并桶、累加 `doc_count`,并递归调用子聚合的 `reduce()`。
- 最终生成全局的 `InternalTerms`(已排序、裁剪 size),塞进 `SearchResponse.aggregations`。
--------------------------------
5. 结果返回
- `RestSearchAction.buildResponse()` – 将 `InternalAggregation` 序列化为 XContent,通过 REST 返回给用户。
--------------------------------
常用调试点速查
- 想看分片级计数:`TermsAggregator.collectExistingBucket()`
- 想看桶合并逻辑:`InternalTerms.reduce()`
- 想看桶排序/裁剪:`TermsAggregator.buildResult()` → `buildBucket()` → `OrderedTopNBuilder`
通过以上链路,即可从 DSL 入口一路跟踪到单分片聚合、跨分片 reduce 及最终返回的完整流程。