当前位置: 首页 > news >正文

【PostgreSQL内核学习:哈希聚合(HashAgg)执行流程与函数调用关系分析】

PostgreSQL内核学习:HashAgg

  • 引言
  • 简单案例描述:使用哈希聚合处理 DISTINCT 查询
    • 背景
    • SQL 查询和数据集
    • 执行计划分解
    • 哈希聚合的工作原理总结
    • 关键观察点:为什么选择 HashAgg?
  • 源码解读
    • create_agg_plan 函数
      • 查询计划与函数的对应关系
      • 总结
    • ExecInitAgg 函数
      • `ExecInitAgg` 函数的大致流程
      • `ExecInitAgg` 函数的细化流程
      • Mermaid 流程图
    • ExecAgg 函数
      • `ExecAgg` 函数的细化流程
      • 示例查询关联
      • Mermaid 流程图(细化分支逻辑)
    • agg_fill_hash_table 函数
      • 示例查询关联
      • Mermaid 流程图(细化分支逻辑)
    • lookup_hash_entries 函数
      • 示例查询关联
      • Mermaid 流程图(原始逻辑)
    • hashagg_finish_initial_spills 函数
      • 示例查询关联
      • Mermaid 流程图(原始逻辑)
    • agg_retrieve_hash_table 函数
      • 示例查询关联
      • Mermaid 流程图(原始逻辑)
    • agg_retrieve_hash_table_in_memory 函数
      • 示例查询关联
      • Mermaid 流程图(原始逻辑)
    • agg_retrieve_direct 函数
      • 示例查询关联
      • Mermaid 流程图(原始逻辑)
  • 总结
    • 函数名称与中文操作解释
    • 示例查询关联(`SELECT DISTINCT a FROM t1 LIMIT 10`)

声明:本文的部分内容参考了他人的文章。在编写过程中,我们尊重他人的知识产权和学术成果,力求遵循合理使用原则,并在适用的情况下注明引用来源。
本文主要参考了 postgresql-18 beta2 的开源代码和《PostgresSQL数据库内核分析》一书

引言

  在关系型数据库的世界中,高效处理涉及分组聚合的查询对于性能至关重要,尤其是在处理大规模数据集时。PostgreSQL 作为一个强大的开源数据库,提供了多种执行聚合查询的策略,其中之一就是哈希聚合(Hash Aggregation,通常称为 HashAgg)。这种机制通过使用内存中的哈希表来处理 GROUP BYDISTINCT 以及聚合函数(如 COUNTSUMAVG 等),在特定场景下展现出卓越的性能。
  哈希聚合在分组数量相对较少、输入数据量较大的情况下表现尤为出色,相比其他策略(如基于排序的 GroupAggregate),它在某些场景下能够显著提升性能。然而,其效率受到内存可用性、数据分布以及查询特性的影响。深入理解哈希聚合的工作原理、内部实现以及优缺点,对于数据库开发者、管理员以及希望优化查询性能的用户来说至关重要。
  本文档旨在为 PostgreSQL 的哈希聚合提供一个全面的介绍。我们将探讨其核心概念、PostgreSQL 源代码中的实现细节、执行流程以及调优和优化的实际建议。无论你是 PostgreSQL 的贡献开发者、优化查询的数据库管理员,还是对数据库内部机制感兴趣的学习者,本文档都将为你提供深入理解和有效利用 HashAgg 的知识。

简单案例描述:使用哈希聚合处理 DISTINCT 查询

背景

  在 PostgreSQL 中,SELECT DISTINCT 是一种常见的查询操作,用于从数据集中提取唯一值。当查询涉及 DISTINCT 且不包含 ORDER BY 或复杂的聚合函数时,PostgreSQL 优化器通常会选择哈希聚合(HashAgg) 策略,通过构建内存中的哈希表来高效去除重复值。本案例将通过一个具体的 SQL 查询,展示哈希聚合的工作原理,并分析其查询计划。

SQL 查询和数据集

DROP TABLE IF EXISTS t1;
CREATE TABLE t1 (a int, b text);INSERT INTO t1
SELECT (i % 100), md5(i::text)
FROM generate_series(1, 1000) g(i);
ANALYZE t1;-- 查询 SELECT DISTINCT a FROM t1 LIMIT 10 的目标是从 t1 表中提取 a 列的唯一值,并限制返回前 10 个结果。
-- 预期结果:返回 10 个不同的 a 值(例如 0 到 9),具体顺序可能因哈希聚合的无序性而变化。
EXPLAIN (ANALYZE, BUFFERS, VERBOSE)
SELECT DISTINCT a FROM t1 LIMIT 10;
Limit  (cost=21.50..21.60 rows=10 width=4) (actual time=0.723..0.728 rows=10 loops=1)Output: aBuffers: shared hit=9->  HashAggregate  (cost=21.50..22.50 rows=100 width=4) (actual time=0.721..0.725 rows=10 loops=1)Output: aGroup Key: t1.aBatches: 1  Memory Usage: 24kBBuffers: shared hit=9->  Seq Scan on public.t1  (cost=0.00..19.00 rows=1000 width=4) (actual time=0.018..0.198 rows=1000 loops=1)Output: a, bBuffers: shared hit=9Planning:Buffers: shared hit=5Planning Time: 0.131 msExecution Time: 0.767 ms

执行计划分解

  1. 顶层节点:Limit

    • 作用:限制输出结果为前 10 行。
    • 成本:估计成本为 21.50..21.60,实际执行时间为 0.723..0.728 ms,返回 10 行。
    • 说明Limit 节点从其子节点(HashAggregate)获取数据,一旦收集到 10 行就停止处理,体现了 PostgreSQL 的优化能力(避免处理不必要的数据)。
  2. 中间节点HashAggregate

    • 作用:执行 DISTINCT 操作,通过哈希聚合去除 a 列的重复值。
    • 成本:估计成本为 21.50..22.50,实际执行时间为 0.721..0.725 ms
    • 关键信息
      • Group Key: t1.a:聚合基于 a 列进行,哈希表以 a 的值作为键。
      • Batches: 1:表示所有数据都在内存中处理,未发生磁盘溢出(spill to disk)。
      • Memory Usage: 24kB:哈希表使用了 24kB 的内存,远低于默认的 work_mem(通常为 4MB),说明数据量较小,内存足以容纳所有唯一值。
    • 说明HashAggregate 构建一个哈希表,将 a 列的每个唯一值存储为键。由于 Limit 节点限制了输出,HashAggregate 在收集到 10 个唯一值后可能提前终止(如果优化器支持早停优化)。
  3. 底层节点Seq Scan

    • 作用:对表 t1 执行顺序扫描,读取所有 1000 行数据。
    • 成本:估计成本为 0.00..19.00,实际执行时间为 0.018..0.198 ms
    • 缓冲区Buffers: shared hit=9 表示读取了 9 个缓冲块(每个块 8kB),数据可能已缓存。
    • 说明:顺序扫描适合小表或无索引的场景,这里从 t1 读取所有行并传递给 HashAggregate
  4. 规划和执行时间

    • Planning Time: 0.131 ms:查询规划时间短,表明优化器快速选择了合适的计划。
    • Execution Time: 0.767 ms:总执行时间非常短,适合小规模数据。

哈希聚合的工作原理总结

  哈希聚合(HashAgg)是 PostgreSQL 处理 GROUP BYDISTINCT 查询的一种高效策略,通过在内存中构建哈希表来实现快速分组和去重。以示例查询 SELECT DISTINCT a FROM t1 LIMIT 10 为例,HashAgg 的工作机制包括以下步骤:首先,底层 Seq Scan 节点顺序读取表 t11000 行数据,提取 a 列值;然后HashAggregate 节点为 a 列的每个值计算哈希值,插入到一个以 a 值作为键的哈希表中,若值已存在则跳过,从而实现去重;最后Limit 节点从哈希表中获取前 10 个唯一值后停止处理。本例中,哈希表仅占用 24kB 内存(远低于 work_mem),无需磁盘溢出,执行时间仅为 0.725 毫秒,展现了哈希聚合在小规模唯一值场景下的高效性。

关键观察点:为什么选择 HashAgg?

  优化器选择 HashAgg 的原因在于其对 DISTINCT 查询的高效支持和本例查询特性的契合。首先,查询使用 DISTINCT 且无 ORDER BY,适合用哈希表快速去重,而非基于排序的 GroupAggregate其次,统计信息(通过 ANALYZE t1)显示 a 列有约 100 个唯一值,数量较小,哈希表内存占用低(24kB),无需磁盘溢出此外Limit 10 允许提前终止,进一步减少处理开销。相比排序策略,HashAgg 在无序输出和小规模分组场景下具有更低的计算复杂度和内存需求,因此成为优化器的首选。

源码解读

create_agg_plan 函数

  以下是为 create_agg_plan 函数添加中文注释的代码版本,并结合示例查询 EXPLAIN (ANALYZE, BUFFERS, VERBOSE) SELECT DISTINCT a FROM t1 LIMIT 10; 解释每行代码的作用。注释将清晰说明函数的逻辑,并与查询计划关联,帮助大家来理解哈希聚合(HashAgg)计划的生成过程。

/** create_agg_plan** 创建一个 Agg 计划节点,用于实现最佳路径(best_path)及其子路径的递归计划。* 参数:*   root - 规划器的全局信息,包含查询上下文和统计信息。*   best_path - 优化器选择的最佳聚合路径(AggPath)。* 返回值:*   Agg* - 创建的聚合计划节点。* *   作用:create_agg_plan 是优化器中生成聚合计划的入口函数。root 包含查询的全局上下文(如 t1 表的统计信息),best_path 是优化器选择的最佳路径(本例中为 AggPath,指定了 HashAggregate 策略)。*   查询关联:对于 SELECT DISTINCT a FROM t1 LIMIT 10,优化器分析查询后确定使用 HashAggregate(因为 DISTINCT 无 ORDER BY),并通过 AggPath 传递聚合策略和子路径(Seq Scan)。*/
static Agg *
create_agg_plan(PlannerInfo *root, AggPath *best_path)
{Agg		   *plan;          // 聚合计划节点Plan	   *subplan;       // 子计划(底层扫描或连接计划)List	   *tlist;         // 目标列表(输出列)List	   *quals;         // 限定条件(WHERE 或 HAVING 子句)/** Agg 节点可以投影输出,因此对子计划的目标列表要求不严格,* 但需要确保分组列(grouping columns)在子计划中可用。*/subplan = create_plan_recurse(root, best_path->subpath, CP_LABEL_TLIST);/** 构建 Agg 计划节点的目标列表,基于最佳路径的输出列。*/tlist = build_path_tlist(root, &best_path->path);/** 对限定条件(qual)进行排序和优化,确保执行时高效。*/quals = order_qual_clauses(root, best_path->qual);/** 调用 make_agg 创建 Agg 计划节点,设置聚合策略、分组列等属性。* 参数说明:*   tlist - 目标列表,定义输出列。*   quals - 限定条件(如 HAVING)。*   aggstrategy - 聚合策略(例如 AGG_HASHED 或 AGG_SORTED)。*   aggsplit - 聚合拆分级别(例如普通聚合或最终聚合)。*   list_length(best_path->groupClause) - 分组列数量。*   extract_grouping_cols - 提取分组列的索引。*   extract_grouping_ops - 提取分组操作符(如 =)。*   extract_grouping_collations - 提取分组列的排序规则。*   NIL - 分组集(grouping sets),本例中为空。*   NIL - 链式聚合节点(chained agg nodes),本例中为空。*   numGroups - 估计的分组数量。*   transitionSpace - 聚合过渡状态的内存需求。*   subplan - 子计划(例如 Seq Scan)。*/plan = make_agg(tlist, quals,best_path->aggstrategy,best_path->aggsplit,list_length(best_path->groupClause),extract_grouping_cols(best_path->groupClause,subplan->targetlist),extract_grouping_ops(best_path->groupClause),extract_grouping_collations(best_path->groupClause,subplan->targetlist),NIL,NIL,best_path->numGroups,best_path->transitionSpace,subplan);/** 将最佳路径的通用信息(如成本、行数估计)复制到 Agg 计划节点。*/copy_generic_path_info(&plan->plan, (Path *) best_path);/** 返回创建的 Agg 计划节点。*/return plan;
}

  打印堆栈信息:

(gdb) bt
#0  create_agg_plan (root=0x1bed5e8, best_path=0x1cea0e0) at createplan.c:2313
#1  0x000000000088f61f in create_plan_recurse (root=0x1bed5e8, best_path=0x1cea0e0, flags=1) at createplan.c
:510
#2  0x00000000008939d8 in create_limit_plan (root=0x1bed5e8, best_path=0x1cea470, flags=1) at createplan.c:2
914
#3  0x000000000088f6cd in create_plan_recurse (root=0x1bed5e8, best_path=0x1cea470, flags=1) at createplan.c
:537
#4  0x000000000088f277 in create_plan (root=0x1bed5e8, best_path=0x1cea470) at createplan.c:350
#5  0x00000000008a2f2b in standard_planner (parse=0x1becbd8, query_string=0x1bebc30 "SELECT DISTINCT a FROM
t1 LIMIT 10;", cursorOptions=2048, boundParams=0x0) at planner.c:421
#6  0x00000000008a2c84 in planner (parse=0x1becbd8, query_string=0x1bebc30 "SELECT DISTINCT a FROM t1 LIMIT
10;", cursorOptions=2048, boundParams=0x0) at planner.c:282
#7  0x00000000009e3cfb in pg_plan_query (querytree=0x1becbd8, query_string=0x1bebc30 "SELECT DISTINCT a FROMt1 LIMIT 10;", cursorOptions=2048, boundParams=0x0) at postgres.c:912
#8  0x00000000009e3e8e in pg_plan_queries (querytrees=0x1bed598, query_string=0x1bebc30 "SELECT DISTINCT a F
ROM t1 LIMIT 10;", cursorOptions=2048, boundParams=0x0) at postgres.c:1006
#9  0x00000000009e420d in exec_simple_query (query_string=0x1bebc30 "SELECT DISTINCT a FROM t1 LIMIT 10;") a
t postgres.c:1203
#10 0x00000000009e8cb9 in PostgresMain (dbname=0x1c24050 "postgres", username=0x1c24038 "kuchiki") at postgr
es.c:4766
#11 0x00000000009e089e in BackendMain (startup_data=0x7fff47d0585c "", startup_data_len=4) at backend_startu
p.c:107
#12 0x000000000090d681 in postmaster_child_launch (child_type=B_BACKEND, startup_data=0x7fff47d0585c "", sta
rtup_data_len=4, client_sock=0x7fff47d05880) at launch_backend.c:274
#13 0x0000000000912a76 in BackendStartup (client_sock=0x7fff47d05880) at postmaster.c:3414
#14 0x0000000000910549 in ServerLoop () at postmaster.c:1648
#15 0x000000000090ff12 in PostmasterMain (argc=3, argv=0x1be67e0) at postmaster.c:1346
#16 0x00000000007d1de4 in main (argc=3, argv=0x1be67e0) at main.c:197

  以下是 create_agg_plan 函数中每行代码如何与该查询的计划生成相关联的详细解释:

  1. 函数签名和参数

    static Agg *create_agg_plan(PlannerInfo *root, AggPath *best_path)
    
    • 作用create_agg_plan 是优化器中生成聚合计划的入口函数。root 包含查询的全局上下文(如 t1 表的统计信息),best_path 是优化器选择的最佳路径(本例中为 AggPath,指定了 HashAggregate 策略)。
    • 查询关联:对于 SELECT DISTINCT a FROM t1 LIMIT 10,优化器分析查询后确定使用 HashAggregate(因为 DISTINCTORDER BY),并通过 AggPath 传递聚合策略和子路径(Seq Scan)。
  2. 变量声明

    Agg *plan; Plan *subplan; List *tlist; List *quals;
    
    • 作用:声明变量用于存储聚合计划节点 (plan)、子计划 (subplan)、目标列表 (tlist) 和限定条件 (quals)。
    • 查询关联plan 将成为 HashAggregate 节点,subplan 对应 Seq Scan on public.t1tlist 包含输出列 aquals 本例中为空(无 HAVINGWHERE)。
  3. 创建子计划

    subplan = create_plan_recurse(root, best_path->subpath, CP_LABEL_TLIST);
    
    • 作用:递归调用 create_plan_recurse 为子路径(best_path->subpath)生成计划。CP_LABEL_TLIST 确保子计划的目标列表包含分组所需的列(本例中为 t1.a)。
    • 查询关联:子路径是一个 Seq Scan 路径,生成 Seq Scan on public.t1 节点,读取表 t11000 行数据(rows=1000),输出列 ab(尽管 b 未在 DISTINCT 中使用)。
  4. 构建目标列表

    tlist = build_path_tlist(root, &best_path->path);
    
    • 作用:基于 best_path 的路径信息,构建 Agg 计划节点的目标列表,定义输出列。
    • 查询关联:目标列表包含 t1.aOutput: a),因为查询仅需要 DISTINCT a 的结果。build_path_tlist 确保输出符合 SELECT DISTINCT a 的要求。
  5. 优化限定条件

    quals = order_qual_clauses(root, best_path->qual);
    
    • 作用:对限定条件(best_path->qual)进行排序和优化,确保执行时高效。本例中 qual 为空(无 WHEREHAVING)。
    • 查询关联:查询没有限定条件,因此 quals 为空,HashAggregate 直接处理 Seq Scan 的输出。
  6. 创建 Agg 节点

    plan = make_agg(tlist, quals,best_path->aggstrategy,best_path->aggsplit,list_length(best_path->groupClause),extract_grouping_cols(best_path->groupClause, subplan->targetlist),extract_grouping_ops(best_path->groupClause),extract_grouping_collations(best_path->groupClause, subplan->targetlist),NIL,NIL,best_path->numGroups,best_path->transitionSpace,subplan);
    
    • 作用:调用 make_agg 创建 Agg 计划节点,设置聚合策略、分组列、操作符、排序规则、分组数量、过渡空间等属性。
    • 查询关联
      • tlist:包含 t1.a
      • quals:为空。
      • aggstrategyAGG_HASHED(对应 HashAggregate)。
      • aggsplit:普通聚合(无拆分)。
      • list_length(best_path->groupClause):分组列数量为 1(t1.a)。
      • extract_grouping_cols:提取 t1.a 的列索引(Group Key: t1.a)。
      • extract_grouping_ops:分组操作符为 =
      • extract_grouping_collations:排序规则(本例中为默认)。
      • NIL:无分组集或链式聚合。
      • numGroups:估计 100 个唯一值(rows=100)。
      • transitionSpace:聚合过渡状态的内存需求(本例中较小)。
      • subplanSeq Scan 节点。
    • 结果:生成 HashAggregate 节点,计划输出 100 行(rows=100),实际因 Limit 10 仅输出 10 行。
  7. 复制路径信息

    copy_generic_path_info(&plan->plan, (Path *) best_path);
    
    • 作用:将 best_path 的通用信息(如成本、行数估计)复制到 Agg 计划节点,确保计划的成本和行数预测准确。
    • 查询关联HashAggregate 的成本为 21.50..22.50,行数估计为 100,与 best_path 的估计一致。
  8. 返回计划节点

    return plan;
    
    • 作用:返回创建的 Agg 计划节点,供上层节点(如 Limit)使用。
    • 查询关联HashAggregate 节点被 Limit 节点包裹,最终形成查询计划。

查询计划与函数的对应关系

  1. 整体流程

    • create_agg_plan 是优化器中生成 HashAggregate 节点的核心函数。示例查询的 DISTINCT a 触发了 AggPath 的选择(AGG_HASHED 策略),create_agg_plan 将其转换为可执行的 Agg 计划节点。
    • 查询计划的 HashAggregate 节点由 make_agg 创建,Seq Scan 节点由 create_plan_recurse 生成。
  2. 关键点

    • 子计划生成create_plan_recurse 生成了 Seq Scan on public.t1,读取 1000 行数据。
    • 目标列表tlist 确保 HashAggregate 输出 t1.a,符合 SELECT DISTINCT a
    • 聚合策略best_path->aggstrategy = AGG_HASHED 决定了使用哈希表去重,计划显示 Batches: 1, Memory Usage: 24kB

总结

  通过为 create_agg_plan 添加中文注释,并结合示例查询 SELECT DISTINCT a FROM t1 LIMIT 10,我们可以清晰地看到 PostgreSQL 如何将优化器的 AggPath 转换为可执行的 HashAggregate 计划节点。

ExecInitAgg 函数

  ExecInitAggPostgreSQL 中用于初始化聚合节点(Agg)运行时状态(AggState)的核心函数,负责为聚合操作(如 GROUP BYDISTINCT 或聚合函数)设置执行环境。它处理哈希聚合(HashAgg)、排序聚合(SortedAgg)或混合模式(MixedAgg),并初始化子计划、表达式上下文、哈希表和聚合函数的运行时数据。以下是为 ExecInitAgg 函数添加中文注释的代码版本

/** ExecInitAgg** 创建聚合节点的运行时状态(AggState),并初始化其子计划树。* 参数:*   node - 规划器生成的聚合计划节点(Agg)*   estate - 执行状态,包含查询的全局上下文*   eflags - 执行标志,控制节点行为* 返回值:*   AggState* - 初始化的聚合节点运行时状态*/
AggState *
ExecInitAgg(Agg *node, EState *estate, int eflags)
{AggState   *aggstate;          // 聚合节点的运行时状态AggStatePerAgg peraggs;        // 每个聚合函数的状态数组AggStatePerTrans pertransstates; // 每个转换状态的状态数组AggStatePerGroup *pergroups;   // 每个分组的状态数组Plan	   *outerPlan;         // 子计划(外层计划)ExprContext *econtext;         // 表达式上下文TupleDesc	scanDesc;          // 输入元组的描述符int			max_aggno;         // 最大聚合函数编号int			max_transno;       // 最大转换状态编号int			numaggrefs;        // 聚合函数引用总数int			numaggs;           // 唯一聚合函数数量int			numtrans;          // 唯一转换状态数量int			phase;             // 当前阶段int			phaseidx;          // 阶段索引ListCell   *l;                 // 列表迭代器Bitmapset  *all_grouped_cols = NULL; // 所有分组列的位图集int			numGroupingSets = 1; // 分组集数量,默认为 1int			numPhases;         // 阶段总数int			numHashes;         // 哈希聚合阶段数量int			i = 0;             // 循环计数器int			j = 0;             // 循环计数器bool		use_hashing = (node->aggstrategy == AGG_HASHED ||node->aggstrategy == AGG_MIXED); // 是否使用哈希聚合/* 检查不支持的执行标志 */Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));/** 创建聚合状态结构*/aggstate = makeNode(AggState); // 分配并初始化 AggState 节点aggstate->ss.ps.plan = (Plan *) node; // 设置计划节点aggstate->ss.ps.state = estate; // 设置执行状态aggstate->ss.ps.ExecProcNode = ExecAgg; // 设置执行函数为 ExecAggaggstate->aggs = NIL;          // 初始化聚合函数列表为空aggstate->numaggs = 0;         // 初始化聚合函数数量为 0aggstate->numtrans = 0;        // 初始化转换状态数量为 0aggstate->aggstrategy = node->aggstrategy; // 设置聚合策略(如 AGG_HASHED)aggstate->aggsplit = node->aggsplit; // 设置聚合拆分模式aggstate->maxsets = 0;         // 初始化最大分组集数量aggstate->projected_set = -1;  // 初始化当前投影分组集aggstate->current_set = 0;     // 初始化当前分组集aggstate->peragg = NULL;       // 初始化每个聚合函数的状态aggstate->pertrans = NULL;     // 初始化每个转换状态aggstate->curperagg = NULL;    // 初始化当前聚合函数aggstate->curpertrans = NULL;  // 初始化当前转换状态aggstate->input_done = false;  // 初始化输入未完成aggstate->agg_done = false;    // 初始化聚合未完成aggstate->pergroups = NULL;    // 初始化分组状态数组aggstate->grp_firstTuple = NULL; // 初始化分组的首元组aggstate->sort_in = NULL;      // 初始化排序输入aggstate->sort_out = NULL;     // 初始化排序输出/** phases[0] 在排序/普通模式下为虚拟阶段,哈希模式下为实际阶段*/numPhases = (use_hashing ? 1 : 2); // 哈希模式为 1 阶段,排序模式为 2 阶段numHashes = (use_hashing ? 1 : 0); // 哈希模式为 1 个哈希阶段,否则为 0/** 计算最大分组集数量,决定内存分配大小,并计算阶段总数。* 哈希/混合模式的节点只贡献一个阶段。*/if (node->groupingSets){numGroupingSets = list_length(node->groupingSets); // 获取分组集数量foreach(l, node->chain){Agg *agg = lfirst(l); // 获取链式聚合节点numGroupingSets = Max(numGroupingSets,list_length(agg->groupingSets)); // 更新最大分组集数量/** 额外的 AGG_HASHED 节点属于阶段 0,其他节点增加一个阶段*/if (agg->aggstrategy != AGG_HASHED)++numPhases; // 非哈希策略增加阶段else++numHashes; // 哈希策略增加哈希阶段}}aggstate->maxsets = numGroupingSets; // 设置最大分组集数量aggstate->numphases = numPhases;     // 设置阶段总数aggstate->aggcontexts = (ExprContext **)palloc0(sizeof(ExprContext *) * numGroupingSets); // 为每个分组集分配表达式上下文数组/** 创建表达式上下文:包括输入元组处理、输出元组处理、哈希表和每个分组集的上下文。* 使用 ExecAssignExprContext 统一创建。*/ExecAssignExprContext(estate, &aggstate->ss.ps); // 创建主表达式上下文aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext; // 保存临时上下文for (i = 0; i < numGroupingSets; ++i){ExecAssignExprContext(estate, &aggstate->ss.ps); // 为每个分组集创建上下文aggstate->aggcontexts[i] = aggstate->ss.ps.ps_ExprContext; // 存储上下文}if (use_hashing)aggstate->hashcontext = CreateWorkExprContext(estate); // 为哈希聚合创建专用上下文ExecAssignExprContext(estate, &aggstate->ss.ps); // 创建额外的表达式上下文/** 初始化子节点。* 若为哈希聚合,子计划无需高效支持 REWIND(重置)。*/if (node->aggstrategy == AGG_HASHED)eflags &= ~EXEC_FLAG_REWIND; // 禁用 REWIND 标志outerPlan = outerPlan(node); // 获取外层计划outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags); // 初始化子计划/** 初始化输入元组类型*/aggstate->ss.ps.outerops =ExecGetResultSlotOps(outerPlanState(&aggstate->ss),&aggstate->ss.ps.outeropsfixed); // 获取子计划的结果槽操作aggstate->ss.ps.outeropsset = true; // 标记结果槽操作已设置ExecCreateScanSlotFromOuterPlan(estate, &aggstate->ss,aggstate->ss.ps.outerops); // 创建扫描槽scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor; // 获取输入元组描述符/** 如果阶段数超过 2(包含虚拟阶段 0),需要排序元组,分配排序槽*/if (numPhases > 2){aggstate->sort_slot = ExecInitExtraTupleSlot(estate, scanDesc,&TTSOpsMinimalTuple); // 创建排序用的额外元组槽/** 子计划输出可能与排序输出槽类型不同,调整 outerops 设置*/if (aggstate->ss.ps.outeropsfixed &&aggstate->ss.ps.outerops != &TTSOpsMinimalTuple)aggstate->ss.ps.outeropsfixed = false; // 禁用固定操作标志}/** 初始化结果类型、槽和投影信息*/ExecInitResultTupleSlotTL(&aggstate->ss.ps, &TTSOpsVirtual); // 初始化结果元组槽ExecAssignProjectionInfo(&aggstate->ss.ps, NULL); // 设置投影信息/** 初始化子表达式。* 确保没有嵌套聚合函数(由解析器检查),并查找限定条件中的 Aggref。*/aggstate->ss.ps.qual =ExecInitQual(node->plan.qual, (PlanState *) aggstate); // 初始化限定条件/** 计算唯一聚合函数和转换状态数量*/numaggrefs = list_length(aggstate->aggs); // 获取聚合函数引用总数max_aggno = -1; // 初始化最大聚合函数编号max_transno = -1; // 初始化最大转换状态编号foreach(l, aggstate->aggs){Aggref *aggref = (Aggref *) lfirst(l); // 获取聚合函数引用max_aggno = Max(max_aggno, aggref->aggno); // 更新最大聚合函数编号max_transno = Max(max_transno, aggref->aggtransno); // 更新最大转换状态编号}numaggs = max_aggno + 1; // 计算唯一聚合函数数量numtrans = max_transno + 1; // 计算唯一转换状态数量/** 为每个阶段准备分组集数据和比较函数的查找数据,同时累积分组列*/aggstate->phases = palloc0(numPhases * sizeof(AggStatePerPhaseData)); // 分配阶段数据数组aggstate->num_hashes = numHashes; // 设置哈希阶段数量if (numHashes){aggstate->perhash = palloc0(sizeof(AggStatePerHashData) * numHashes); // 分配哈希阶段数据aggstate->phases[0].numsets = 0; // 初始化阶段 0 的分组集数量aggstate->phases[0].gset_lengths = palloc(numHashes * sizeof(int)); // 分配分组集长度数组aggstate->phases[0].grouped_cols = palloc(numHashes * sizeof(Bitmapset *)); // 分配分组列位图数组}phase = 0; // 初始化阶段索引for (phaseidx = 0; phaseidx <= list_length(node->chain); ++phaseidx){Agg *aggnode; // 当前聚合节点Sort *sortnode; // 排序节点(若有)if (phaseidx > 0){aggnode = list_nth_node(Agg, node->chain, phaseidx - 1); // 获取链式聚合节点sortnode = castNode(Sort, outerPlan(aggnode)); // 获取排序节点}else{aggnode = node; // 使用当前聚合节点sortnode = NULL; // 阶段 0 无排序}Assert(phase <= 1 || sortnode); // 确保非阶段 0 有排序节点if (aggnode->aggstrategy == AGG_HASHED || aggnode->aggstrategy == AGG_MIXED){AggStatePerPhase phasedata = &aggstate->phases[0]; // 阶段 0 数据AggStatePerHash perhash; // 哈希阶段数据Bitmapset *cols = NULL; // 分组列位图Assert(phase == 0); // 哈希聚合只在阶段 0i = phasedata->numsets++; // 增加分组集计数perhash = &aggstate->perhash[i]; // 获取哈希阶段数据phasedata->aggnode = node; // 设置阶段 0 的聚合节点phasedata->aggstrategy = node->aggstrategy; // 设置聚合策略perhash->aggnode = aggnode; // 保存实际聚合节点phasedata->gset_lengths[i] = perhash->numCols = aggnode->numCols; // 设置分组列数量for (j = 0; j < aggnode->numCols; ++j)cols = bms_add_member(cols, aggnode->grpColIdx[j]); // 添加分组列phasedata->grouped_cols[i] = cols; // 保存分组列位图all_grouped_cols = bms_add_members(all_grouped_cols, cols); // 累积所有分组列continue;}else{AggStatePerPhase phasedata = &aggstate->phases[++phase]; // 下一个阶段数据int num_sets; // 分组集数量phasedata->numsets = num_sets = list_length(aggnode->groupingSets); // 设置分组集数量if (num_sets){phasedata->gset_lengths = palloc(num_sets * sizeof(int)); // 分配分组集长度数组phasedata->grouped_cols = palloc(num_sets * sizeof(Bitmapset *)); // 分配分组列位图数组i = 0;foreach(l, aggnode->groupingSets){int current_length = list_length(lfirst(l)); // 当前分组集的列数Bitmapset *cols = NULL; // 分组列位图for (j = 0; j < current_length; ++j)cols = bms_add_member(cols, aggnode->grpColIdx[j]); // 添加分组列phasedata->grouped_cols[i] = cols; // 保存分组列位图phasedata->gset_lengths[i] = current_length; // 保存分组集长度++i;}all_grouped_cols = bms_add_members(all_grouped_cols,phasedata->grouped_cols[0]); // 累积分组列}else{Assert(phaseidx == 0); // 阶段 0 无分组集phasedata->gset_lengths = NULL; // 无分组集长度phasedata->grouped_cols = NULL; // 无分组列位图}/** 为排序聚合预计算比较函数的查找数据*/if (aggnode->aggstrategy == AGG_SORTED){phasedata->eqfunctions =(ExprState **) palloc0(aggnode->numCols * sizeof(ExprState *)); // 分配比较函数数组for (int k = 0; k < phasedata->numsets; k++){int length = phasedata->gset_lengths[k]; // 分组集列数if (length == 0)continue; // 空分组集跳过if (phasedata->eqfunctions[length - 1] != NULL)continue; // 已存在的长度跳过phasedata->eqfunctions[length - 1] =execTuplesMatchPrepare(scanDesc,length,aggnode->grpColIdx,aggnode->grpOperators,aggnode->grpCollations,(PlanState *) aggstate); // 准备比较函数}if (aggnode->numCols > 0 &&phasedata->eqfunctions[aggnode->numCols - 1] == NULL){phasedata->eqfunctions[aggnode->numCols - 1] =execTuplesMatchPrepare(scanDesc,aggnode->numCols,aggnode->grpColIdx,aggnode->grpOperators,aggnode->grpCollations,(PlanState *) aggstate); // 为所有分组列准备比较函数}}phasedata->aggnode = aggnode; // 设置阶段的聚合节点phasedata->aggstrategy = aggnode->aggstrategy; // 设置聚合策略phasedata->sortnode = sortnode; // 设置排序节点}}/** 将所有分组列转换为降序列表*/i = -1;while ((i = bms_next_member(all_grouped_cols, i)) >= 0)aggstate->all_grouped_cols = lcons_int(i, aggstate->all_grouped_cols); // 添加分组列到列表/** 设置聚合结果存储和每个聚合函数的工作存储*/econtext = aggstate->ss.ps.ps_ExprContext; // 获取表达式上下文econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numaggs); // 分配聚合值存储econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs); // 分配聚合空值标志peraggs = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs); // 分配每个聚合函数状态pertransstates = (AggStatePerTrans) palloc0(sizeof(AggStatePerTransData) * numtrans); // 分配每个转换状态aggstate->peragg = peraggs; // 设置聚合函数状态数组aggstate->pertrans = pertransstates; // 设置转换状态数组aggstate->all_pergroups =(AggStatePerGroup *) palloc0(sizeof(AggStatePerGroup)* (numGroupingSets + numHashes)); // 分配所有分组状态pergroups = aggstate->all_pergroups; // 设置分组状态指针if (node->aggstrategy != AGG_HASHED){for (i = 0; i < numGroupingSets; i++){pergroups[i] = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData)* numaggs); // 为每个分组集分配状态}aggstate->pergroups = pergroups; // 设置分组状态数组pergroups += numGroupingSets; // 更新指针}/** 为哈希聚合初始化哈希表相关结构*/if (use_hashing){Plan *outerplan = outerPlan(node); // 获取外层计划uint64 totalGroups = 0; // 总分组数aggstate->hash_metacxt = AllocSetContextCreate(aggstate->ss.ps.state->es_query_cxt,"HashAgg meta context",ALLOCSET_DEFAULT_SIZES); // 创建哈希聚合元上下文aggstate->hash_spill_rslot = ExecInitExtraTupleSlot(estate, scanDesc,&TTSOpsMinimalTuple); // 创建溢出读取槽aggstate->hash_spill_wslot = ExecInitExtraTupleSlot(estate, scanDesc,&TTSOpsVirtual); // 创建溢出写入槽aggstate->hash_pergroup = pergroups; // 设置哈希分组状态aggstate->hashentrysize = hash_agg_entry_size(aggstate->numtrans,outerplan->plan_width,node->transitionSpace); // 计算哈希表条目大小/** 综合所有分组集设置内存限制和分区估计*/for (int k = 0; k < aggstate->num_hashes; k++)totalGroups += aggstate->perhash[k].aggnode->numGroups; // 累积分组数hash_agg_set_limits(aggstate->hashentrysize, totalGroups, 0,&aggstate->hash_mem_limit,&aggstate->hash_ngroups_limit,&aggstate->hash_planned_partitions); // 设置哈希表限制find_hash_columns(aggstate); // 查找哈希列/* 仅在非 EXPLAIN 模式下分配哈希表 */if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY))build_hash_tables(aggstate); // 构建哈希表aggstate->table_filled = false; // 标记哈希表未填充aggstate->hash_batches_used = 1; // 初始化批次为 1(未溢出)}/** 初始化当前阶段,哈希模式为阶段 0,排序模式为阶段 1*/if (node->aggstrategy == AGG_HASHED){aggstate->current_phase = 0; // 设置哈希模式为阶段 0initialize_phase(aggstate, 0); // 初始化阶段 0select_current_set(aggstate, 0, true); // 选择当前分组集}else{aggstate->current_phase = 1; // 设置排序模式为阶段 1initialize_phase(aggstate, 1); // 初始化阶段 1select_current_set(aggstate, 0, false); // 选择当前分组集}/** 查找聚合函数信息,初始化每个聚合和转换状态的不变字段*/foreach(l, aggstate->aggs){Aggref *aggref = lfirst(l); // 获取聚合函数引用AggStatePerAgg peragg; // 当前聚合函数状态AggStatePerTrans pertrans; // 当前转换状态Oid aggTransFnInputTypes[FUNC_MAX_ARGS]; // 聚合输入类型int numAggTransFnArgs; // 转换函数参数数量int numDirectArgs; // 直接参数数量HeapTuple aggTuple; // 聚合函数元组Form_pg_aggregate aggform; // 聚合函数元数据AclResult aclresult; // 权限检查结果Oid finalfn_oid; // 最终函数 OIDOid serialfn_oid, deserialfn_oid; // 序列化/反序列化函数 OIDOid aggOwner; // 聚合函数所有者Expr *finalfnexpr; // 最终函数表达式Oid aggtranstype; // 转换状态类型/* 规划器应确保聚合在正确级别 */Assert(aggref->agglevelsup == 0);/* 拆分模式应匹配 */Assert(aggref->aggsplit == aggstate->aggsplit);peragg = &peraggs[aggref->aggno]; // 获取聚合函数状态/* 检查是否已初始化该聚合状态 */if (peragg->aggref != NULL)continue;peragg->aggref = aggref; // 设置聚合函数引用peragg->transno = aggref->aggtransno; // 设置转换状态编号/* 获取 pg_aggregate 元数据 */aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(aggref->aggfnoid));if (!HeapTupleIsValid(aggTuple))elog(ERROR, "cache lookup failed for aggregate %u", aggref->aggfnoid);aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple); // 获取聚合元数据/* 检查调用聚合函数的权限 */aclresult = object_aclcheck(ProcedureRelationId, aggref->aggfnoid, GetUserId(),ACL_EXECUTE);if (aclresult != ACLCHECK_OK)aclcheck_error(aclresult, OBJECT_AGGREGATE, get_func_name(aggref->aggfnoid));InvokeFunctionExecuteHook(aggref->aggfnoid); // 调用执行钩子/* 从 Aggref 获取转换状态类型 */aggtranstype = aggref->aggtranstype;Assert(OidIsValid(aggtranstype));/* 如果不需要最终化,则无需最终函数 */if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))peragg->finalfn_oid = finalfn_oid = InvalidOid;elseperagg->finalfn_oid = finalfn_oid = aggform->aggfinalfn; // 设置最终函数serialfn_oid = InvalidOid; // 初始化序列化函数deserialfn_oid = InvalidOid; // 初始化反序列化函数/** 检查是否需要序列化/反序列化,仅对 INTERNAL 类型有效*/if (aggtranstype == INTERNALOID){if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit)){Assert(DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit)); // 序列化时不应最终化if (!OidIsValid(aggform->aggserialfn))elog(ERROR, "serialfunc not provided for serialization aggregation");serialfn_oid = aggform->aggserialfn; // 设置序列化函数}if (DO_AGGSPLIT_DESERIALIZE(aggstate->aggsplit)){Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit)); // 反序列化时需组合if (!OidIsValid(aggform->aggdeserialfn))elog(ERROR, "deserialfunc not provided for deserialization aggregation");deserialfn_oid = aggform->aggdeserialfn; // 设置反序列化函数}}/* 检查聚合函数所有者对组件函数的权限 */{HeapTuple procTuple; // 函数元组procTuple = SearchSysCache1(PROCOID, ObjectIdGetDatum(aggref->aggfnoid));if (!HeapTupleIsValid(procTuple))elog(ERROR, "cache lookup failed for function %u", aggref->aggfnoid);aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner; // 获取函数所有者ReleaseSysCache(procTuple);if (OidIsValid(finalfn_oid)){aclresult = object_aclcheck(ProcedureRelationId, finalfn_oid, aggOwner,ACL_EXECUTE);if (aclresult != ACLCHECK_OK)aclcheck_error(aclresult, OBJECT_FUNCTION, get_func_name(finalfn_oid));InvokeFunctionExecuteHook(finalfn_oid);}if (OidIsValid(serialfn_oid)){aclresult = object_aclcheck(ProcedureRelationId, serialfn_oid, aggOwner,ACL_EXECUTE);if (aclresult != ACLCHECK_OK)aclcheck_error(aclresult, OBJECT_FUNCTION, get_func_name(serialfn_oid));InvokeFunctionExecuteHook(serialfn_oid);}if (OidIsValid(deserialfn_oid)){aclresult = object_aclcheck(ProcedureRelationId, deserialfn_oid, aggOwner,ACL_EXECUTE);if (aclresult != ACLCHECK_OK)aclcheck_error(aclresult, OBJECT_FUNCTION, get_func_name(deserialfn_oid));InvokeFunctionExecuteHook(deserialfn_oid);}}/** 获取聚合函数的实际输入类型,可能与声明类型不同*/numAggTransFnArgs = get_aggregate_argtypes(aggref, aggTransFnInputTypes);/* 计算直接参数数量 */numDirectArgs = list_length(aggref->aggdirectargs);/* 计算最终函数的参数数量 */if (aggform->aggfinalextra)peragg->numFinalArgs = numAggTransFnArgs + 1;elseperagg->numFinalArgs = numDirectArgs + 1;/* 初始化直接参数表达式 */peragg->aggdirectargs = ExecInitExprList(aggref->aggdirectargs,(PlanState *) aggstate);/** 为最终函数构建表达式树(如果需要)*/if (OidIsValid(finalfn_oid)){build_aggregate_finalfn_expr(aggTransFnInputTypes,peragg->numFinalArgs,aggtranstype,aggref->aggtype,aggref->inputcollid,finalfn_oid,&finalfnexpr);fmgr_info(finalfn_oid, &peragg->finalfn); // 设置最终函数信息fmgr_info_set_expr((Node *) finalfnexpr, &peragg->finalfn); // 设置最终函数表达式}/* 获取输出值的数据类型信息 */get_typlenbyval(aggref->aggtype,&peragg->resulttypeLen,&peragg->resulttypeByVal);/** 为转换函数构建工作状态(如果尚未初始化)*/pertrans = &pertransstates[aggref->aggtransno];if (pertrans->aggref == NULL){Datum textInitVal; // 初始值文本Datum initValue; // 初始值bool initValueIsNull; // 初始值是否为空Oid transfn_oid; // 转换函数 OID/* 根据聚合模式选择转换或组合函数 */if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit)){transfn_oid = aggform->aggcombinefn;if (!OidIsValid(transfn_oid))elog(ERROR, "combinefn not set for aggregate function");}elsetransfn_oid = aggform->aggtransfn;/* 检查转换函数权限 */aclresult = object_aclcheck(ProcedureRelationId, transfn_oid, aggOwner, ACL_EXECUTE);if (aclresult != ACLCHECK_OK)aclcheck_error(aclresult, OBJECT_FUNCTION, get_func_name(transfn_oid));InvokeFunctionExecuteHook(transfn_oid);/* 获取初始值 */textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,Anum_pg_aggregate_agginitval,&initValueIsNull);if (initValueIsNull)initValue = (Datum) 0;elseinitValue = GetAggInitVal(textInitVal, aggtranstype);if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit)){Oid combineFnInputTypes[] = {aggtranstype, aggtranstype}; // 组合函数输入类型pertrans->numTransInputs = 1; // 组合模式下仅一个输入/* 构建组合函数状态 */build_pertrans_for_aggref(pertrans, aggstate, estate,aggref, transfn_oid, aggtranstype,serialfn_oid, deserialfn_oid,initValue, initValueIsNull,combineFnInputTypes, 2);/* 确保 INTERNAL 类型的组合函数非严格 */if (pertrans->transfn.fn_strict && aggtranstype == INTERNALOID)ereport(ERROR,(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),errmsg("combine function with transition type %s must not be declared STRICT",format_type_be(aggtranstype))));}else{/* 计算转换函数参数数量 */if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))pertrans->numTransInputs = list_length(aggref->args);elsepertrans->numTransInputs = numAggTransFnArgs;/* 构建转换函数状态 */build_pertrans_for_aggref(pertrans, aggstate, estate,aggref, transfn_oid, aggtranstype,serialfn_oid, deserialfn_oid,initValue, initValueIsNull,aggTransFnInputTypes,numAggTransFnArgs);/* 检查严格函数和初始值的兼容性 */if (pertrans->transfn.fn_strict && pertrans->initValueIsNull){if (numAggTransFnArgs <= numDirectArgs ||!IsBinaryCoercible(aggTransFnInputTypes[numDirectArgs],aggtranstype))ereport(ERROR,(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),errmsg("aggregate %u needs to have compatible input type and transition type",aggref->aggfnoid)));}}}elsepertrans->aggshared = true; // 标记共享转换状态ReleaseSysCache(aggTuple); // 释放元数据缓存}/** 更新唯一聚合函数和转换状态数量*/aggstate->numaggs = numaggs;aggstate->numtrans = numtrans;/** 检查是否存在嵌套聚合函数(非法情况)*/if (numaggrefs != list_length(aggstate->aggs))ereport(ERROR,(errcode(ERRCODE_GROUPING_ERROR),errmsg("aggregate function calls cannot be nested")));/** 为每个阶段构建转换函数表达式*/for (phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++){AggStatePerPhase phase = &aggstate->phases[phaseidx]; // 当前阶段数据bool dohash = false; // 是否执行哈希聚合bool dosort = false; // 是否执行排序聚合if (!phase->aggnode)continue; // 跳过不存在的阶段 0if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 1){dohash = true; // 混合模式阶段 1 执行哈希和排序dosort = true;}else if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 0){continue; // 混合模式阶段 0 无需转换函数}else if (phase->aggstrategy == AGG_PLAIN || phase->aggstrategy == AGG_SORTED){dohash = false; // 普通或排序模式dosort = true;}else if (phase->aggstrategy == AGG_HASHED){dohash = true; // 哈希模式dosort = false;}elseAssert(false); // 非法策略phase->evaltrans = ExecBuildAggTrans(aggstate, phase, dosort, dohash, false); // 构建转换函数表达式phase->evaltrans_cache[0][0] = phase->evaltrans; // 缓存转换表达式}return aggstate; // 返回初始化的聚合状态
}

ExecInitAgg 函数的大致流程

  1. 创建并初始化 AggState:分配 AggState 结构,设置计划节点、执行状态和聚合策略(如 AGG_HASHED)。初始化状态变量(如 numaggsearly_stop_limit)。
  2. 确定阶段和分组集:根据聚合策略(哈希或排序)和分组集(groupingSets)计算阶段数量(numPhases)和哈希阶段数量(numHashes)。
  3. 分配表达式上下文:为输入、输出、哈希表和分组集创建表达式上下文(aggcontexts),用于存储临时数据和聚合结果。
  4. 初始化子计划:调用 ExecInitNode 初始化子计划(如 Seq Scan),并设置输入元组的类型和扫描槽。
  5. 处理多阶段和排序:若阶段数超过 2(排序模式),分配排序槽;为哈希聚合初始化哈希表相关结构(如 hashcontexthash_pergroup)。
  6. 初始化结果槽和投影:设置结果元组槽和投影信息,确保输出格式正确。
  7. 初始化表达式和聚合函数:初始化限定条件(qual)和聚合函数(Aggref),检查权限并提取聚合函数元数据(如转换函数、最终函数)。
  8. 处理分组集和哈希表:为每个阶段和分组集分配分组列和比较函数,初始化哈希表(包括内存限制和分区估计)。
  9. 设置阶段和转换函数:根据聚合策略(哈希或排序)初始化当前阶段,构建转换函数表达式(evaltrans)并缓存。
  10. 返回 AggState:返回完全初始化的 AggState,供执行阶段使用。

ExecInitAgg 函数的细化流程

  以下是对 ExecInitAgg 函数的更细化流程描述,深入剖析其分支逻辑,并结合查询 SELECT DISTINCT a FROM t1 LIMIT 10 的上下文,进一步细化条件分支和关键决策点。
  ExecInitAgg 函数负责初始化聚合节点的运行时状态(AggState),为 GROUP BYDISTINCT 或聚合函数的执行准备环境。它根据聚合策略(哈希、排序或混合)、分组集(groupingSets)和链式聚合(chain)进行复杂的分支处理。以下是细化的流程描述,重点突出分支逻辑:

  1. 创建并初始化 AggState

    • 分配 AggState 结构,设置计划节点(node)、执行状态(estate)和执行函数(ExecAgg)。
    • 初始化状态变量:聚合函数列表(aggs)、数量(numaggsnumtrans)、策略(aggstrategy)、拆分模式(aggsplit)、早停限制(early_stop_limit)等。
    • 分支:检查执行标志(eflags),确保不支持 BACKWARDMARK 扫描。
  2. 确定阶段和分组集数量

    • 判断是否使用哈希聚合(AGG_HASHEDAGG_MIXED),设置 numPhases(哈希模式为 1,排序模式为 2)和 numHashes
    • 分支:若存在 groupingSets,遍历 groupingSets 和链式聚合(chain):
      • 计算最大分组集数量(numGroupingSets)。
      • 对链式聚合节点,若策略为 AGG_HASHED,增加 numHashes;否则,增加 numPhases
    • 设置 maxsetsnumphases
  3. 分配表达式上下文

    • 创建主表达式上下文(ps_ExprContext)和临时上下文(tmpcontext)。
    • 为每个分组集分配上下文(aggcontexts)。
    • 分支:若为哈希聚合,额外创建哈希表上下文(hashcontext)。
    • 创建额外的表达式上下文用于输出处理。
  4. 初始化子计划

    • 获取外层计划(outerPlan)并调用 ExecInitNode 初始化。
    • 分支:若为 AGG_HASHED,禁用 REWIND 标志以优化重置。
    • 设置输入元组的操作(outerops)和扫描槽(ss_ScanTupleSlot)。
  5. 处理多阶段和排序槽

    • 分支:若 numPhases > 2(排序模式多阶段),分配排序槽(sort_slot)并调整 outerops
    • 否则,跳过排序槽分配。
  6. 初始化结果槽和投影

    • 初始化结果元组槽(TTSOpsVirtual)和投影信息,确保输出格式正确。
  7. 初始化表达式和聚合函数

    • 初始化限定条件(qual),查找其中的 Aggref
    • 遍历 aggs,计算唯一聚合函数(numaggs)和转换状态(numtrans)数量。
    • 分支:为每个 Aggref
      • 检查权限(ACL_EXECUTE)和聚合函数元数据(pg_aggregate)。
      • 子分支:根据 aggsplit 模式(SKIPFINALSERIALIZEDESERIALIZE):
        • 设置最终函数(finalfn)、序列化(serialfn)、反序列化(deserialfn)函数。
        • aggtranstype = INTERNALOID,验证序列化/反序列化函数。
      • 初始化直接参数(aggdirectargs)和最终函数表达式。
      • 子分支:根据 aggsplitCOMBINE 或普通模式):
        • 使用组合函数(aggcombinefn)或转换函数(aggtransfn)。
        • 设置初始值(initValue)和输入参数数量(numTransInputs)。
      • 检查严格函数(fn_strict)与初始值的兼容性。
  8. 处理分组集和比较函数

    • 为每个阶段分配 AggStatePerPhaseData
    • 分支:根据聚合策略:
      • 哈希模式(AGG_HASHEDAGG_MIXED
        • 设置阶段 0 数据,保存分组列(grouped_cols)和长度(gset_lengths)。
        • 累积所有分组列(all_grouped_cols)。
      • 排序模式(AGG_SORTEDAGG_PLAIN
        • 为每个分组集分配列和长度。
        • 子分支:若为排序聚合,预计算比较函数(eqfunctions)以优化元组比较。
    • all_grouped_cols 转换为降序列表。
  9. 初始化哈希表(仅哈希模式)

    • 分支:若 use_hashing
      • 创建哈希表元上下文(hash_metacxt)和溢出槽(hash_spill_rslothash_spill_wslot)。
      • 计算哈希表条目大小(hashentrysize)和总分组数(totalGroups)。
      • 调用 hash_agg_set_limits 设置内存限制(hash_mem_limit)和分区数(hash_planned_partitions)。
      • 调用 find_hash_columnsbuild_hash_tables(非 EXPLAIN 模式)初始化哈希表。
      • 设置初始状态(table_filled = falsehash_batches_used = 1)。
  10. 设置阶段和转换函数

    • 分支:根据 aggstrategy
      • AGG_HASHED:设置 current_phase = 0,初始化阶段 0
      • 其他:设置 current_phase = 1,初始化阶段 1
    • 调用 initialize_phaseselect_current_set 设置阶段和分组集。
    • 为每个阶段构建转换函数表达式(evaltrans)并缓存。
  11. 返回 AggState

    • 返回完全初始化的 AggState,供执行阶段(如 ExecAgg)使用。

Mermaid 流程图

  以下分别是 ExecInitAgg 函数的大致和细化流程图,使用 Mermaid 语法表示,清晰展示其主要步骤:

graph TDA[开始: ExecInitAgg] --> B[创建 AggState 结构<br>设置计划、执行状态、聚合策略]B --> C[确定阶段数和分组集数量<br>计算 numPhases 和 numHashes]C --> D[分配表达式上下文<br>为输入、输出、哈希表、分组集创建]D --> E[初始化子计划<br>调用 ExecInitNode 设置子计划和输入元组类型]E --> F{聚合策略}F -->|哈希模式| G[初始化哈希表<br>分配 hashcontext, hash_pergroup<br>设置内存限制和分区]F -->|排序/混合模式| H[若 numPhases > 2<br>分配排序槽]G --> I[初始化结果槽和投影<br>设置结果元组槽和投影信息]H --> II --> J[初始化限定条件和聚合函数<br>检查权限,提取元数据]J --> K[处理分组集和比较函数<br>为阶段和分组集分配分组列]K --> L[初始化阶段和转换函数<br>设置 current_phase, 构建 evaltrans]L --> M[返回初始化的 AggState]

在这里插入图片描述

graph TDA[开始: ExecInitAgg] --> B[创建 AggState<br>设置计划、执行状态、ExecAgg<br>初始化 aggstrategy, early_stop_limit]B --> C{检查 eflags}C -->|无 BACKWARD/MARK| D[确定阶段数<br>use_hashing = AGG_HASHED or AGG_MIXED?]D -->|| E[numPhases = 1<br>numHashes = 1]D -->|| F[numPhases = 2<br>numHashes = 0]E --> G{groupingSets 存在?}F --> GG -->|| H[遍历 groupingSets 和 chain<br>计算 numGroupingSets<br>AGG_HASHED: 增加 numHashes<br>其他: 增加 numPhases]G -->|| I[numGroupingSets = 1]H --> J[设置 maxsets, numphases]I --> JJ --> K[分配表达式上下文<br>创建主上下文、tmpcontext<br>为每个分组集分配 aggcontexts]K --> L{use_hashing?}L -->|| M[创建 hashcontext]L -->|| N[跳过 hashcontext]M --> O[初始化子计划<br>调用 ExecInitNode<br>AGG_HASHED: 禁用 REWIND]N --> OO --> P{numPhases > 2?}P -->|| Q[分配 sort_slot<br>调整 outerops]P -->|| R[跳过排序槽]Q --> S[初始化结果槽<br>设置 TTSOpsVirtual 和投影]R --> SS --> T[初始化 qual 和 Aggref<br>计算 numaggs, numtrans]T --> U{遍历 aggref}U -->|未初始化| V[检查权限<br>获取 pg_aggregate 元数据]V --> W{aggsplit 模式}W -->|SKIPFINAL| X[finalfn_oid = Invalid]W -->|其他| Y[finalfn_oid = aggfinalfn]X --> Z{aggtranstype = INTERNAL?}Y --> ZZ -->|| AA{aggsplit}AA -->|SERIALIZE| AB[设置 serialfn_oid]AA -->|DESERIALIZE| AC[设置 deserialfn_oid]AA -->|其他| AD[跳过序列化]Z -->|| ADAB --> AE[检查组件函数权限<br>初始化直接参数]AC --> AEAD --> AEAE --> AF{aggsplit = COMBINE?}AF -->|| AG[使用 aggcombinefn<br>numTransInputs = 1]AF -->|| AH{aggkind = ORDERED_SET?}AH -->|| AI[numTransInputs = args 长度]AH -->|| AJ[numTransInputs = numAggTransFnArgs]AI --> AK[构建 pertrans<br>设置 transfn, initValue<br>检查严格性兼容]AJ --> AKAG --> AKAK --> AL{pertrans 已初始化?}AL -->|| AM[标记 aggshared]AL -->|| UAM --> AN[更新 numaggs, numtrans]AN --> AO{检查嵌套 Aggref}AO -->|无嵌套| AP[为每个阶段构建 evaltrans]AP --> AQ{aggstrategy}AQ -->|AGG_MIXED, phase=1| AR[dohash = true<br>dosort = true]AQ -->|AGG_MIXED, phase=0| AS[跳过转换函数]AQ -->|AGG_PLAIN/SORTED| AT[dohash = false<br>dosort = true]AQ -->|AGG_HASHED| AU[dohash = true<br>dosort = false]AR --> AV[构建 evaltrans<br>缓存到 evaltrans_cache]AT --> AVAU --> AVAS --> AW{use_hashing?}AW -->|| AX[创建 hash_metacxt<br>分配 spill_rslot, spill_wslot<br>计算 hashentrysize<br>设置 hash_mem_limit, hash_planned_partitions<br>构建哈希表]AW -->|| AY[跳过哈希表]AX --> AZ{aggstrategy = AGG_HASHED?}AY --> AZAZ -->|| BA[current_phase = 0<br>初始化阶段 0]AZ -->|| BB[current_phase = 1<br>初始化阶段 1]BA --> BC[返回 AggState]BB --> BC

在这里插入图片描述
  打印堆栈信息:

(gdb) bt
#0  ExecInitAgg (node=0x1cea958, estate=0x1ce6e20, eflags=32) at nodeAgg.c:3200
#1  0x000000000075cf03 in ExecInitNode (node=0x1cea958, estate=0x1ce6e20, eflags=32) at execProcnode.c:341
#2  0x000000000078e4ff in ExecInitLimit (node=0x1ceaa68, estate=0x1ce6e20, eflags=32) at nodeLimit.c:477
#3  0x000000000075cff5 in ExecInitNode (node=0x1ceaa68, estate=0x1ce6e20, eflags=32) at execProcnode.c:381
#4  0x000000000075268b in InitPlan (queryDesc=0x1cbea90, eflags=32) at execMain.c:966
#5  0x000000000075152a in standard_ExecutorStart (queryDesc=0x1cbea90, eflags=32) at execMain.c:261
#6  0x0000000000751279 in ExecutorStart (queryDesc=0x1cbea90, eflags=0) at execMain.c:137
#7  0x00000000009ea524 in PortalStart (portal=0x1c6a1b0, params=0x0, eflags=0, snapshot=0x0) at pquery.c:517
#8  0x00000000009e429a in exec_simple_query (query_string=0x1bebc30 "SELECT DISTINCT a FROM t1 LIMIT 10;") a
t postgres.c:1245
#9  0x00000000009e8cb9 in PostgresMain (dbname=0x1c24050 "postgres", username=0x1c24038 "kuchiki") at postgr
es.c:4766
#10 0x00000000009e089e in BackendMain (startup_data=0x7fff47d0585c "", startup_data_len=4) at backend_startu
p.c:107
#11 0x000000000090d681 in postmaster_child_launch (child_type=B_BACKEND, startup_data=0x7fff47d0585c "", sta
rtup_data_len=4, client_sock=0x7fff47d05880) at launch_backend.c:274
#12 0x0000000000912a76 in BackendStartup (client_sock=0x7fff47d05880) at postmaster.c:3414
#13 0x0000000000910549 in ServerLoop () at postmaster.c:1648
#14 0x000000000090ff12 in PostmasterMain (argc=3, argv=0x1be67e0) at postmaster.c:1346
#15 0x00000000007d1de4 in main (argc=3, argv=0x1be67e0) at main.c:197

ExecAgg 函数

  ExecAggPostgreSQL 中执行聚合操作的核心函数,负责从外层子计划(如 Seq Scan)接收元组,根据聚合策略(AGG_HASHEDAGG_MIXEDAGG_PLAINAGG_SORTED)处理聚合,并返回结果元组槽。它在运行时状态(AggState)的指导下工作,处理 GROUP BYDISTINCT 或聚合函数的逻辑。
  以下是为 ExecAgg 函数添加中文注释的代码版本,并结合示例查询 SELECT DISTINCT a FROM t1 LIMIT 10 提供细化的流程描述和 Mermaid 流程图。注释清晰说明每行代码的作用,流程描述深入剖析函数逻辑和分支,流程图进一步细化策略分支,确保内容简洁且易于理解。

/** ExecAgg** 执行聚合操作,从外层子计划接收元组并按目标列表或限定条件中的聚合函数(Aggref 节点)进行聚合。* 分组聚合为每个分组生成一行结果;普通聚合为整个查询生成单行结果。* 聚合值存储在表达式上下文中,供 ExecProject 评估结果元组使用。* 参数:*   pstate - 计划状态(此处为 AggState)* 返回值:*   TupleTableSlot* - 聚合结果的元组槽,或 NULL(无更多结果)*/
static TupleTableSlot *
ExecAgg(PlanState *pstate)
{AggState   *node = castNode(AggState, pstate); // 将计划状态转换为 AggStateTupleTableSlot *result = NULL; // 初始化结果元组槽为 NULLCHECK_FOR_INTERRUPTS(); // 检查查询中断信号if (!node->agg_done) // 检查聚合是否完成{/* 根据聚合策略分派执行 */switch (node->phase->aggstrategy){case AGG_HASHED: // 哈希聚合策略if (!node->table_filled) // 如果哈希表未填充agg_fill_hash_table(node); // 填充哈希表/* FALLTHROUGH */case AGG_MIXED: // 混合聚合策略result = agg_retrieve_hash_table(node); // 从哈希表检索结果break;case AGG_PLAIN: // 普通聚合策略case AGG_SORTED: // 排序聚合策略result = agg_retrieve_direct(node); // 直接检索结果break;}if (!TupIsNull(result)) // 如果结果元组不为空return result; // 返回结果元组槽}return NULL; // 无更多结果,返回 NULL
}

ExecAgg 函数的细化流程

  1. 转换计划状态

    • 将输入的 PlanState 转换为 AggStatenode),获取运行时状态。
  2. 检查中断

    • 调用 CHECK_FOR_INTERRUPTS 检查是否有查询中断信号(如用户取消查询)。
  3. 检查聚合完成状态

    • 检查 node->agg_done,若为 true,表示聚合已完成,直接返回 NULL
    • 分支:若 agg_done = false,继续处理聚合。
  4. 根据聚合策略分派

    • 分支:AGG_HASHED
      • 子分支:检查 table_filled
        • false,调用 agg_fill_hash_table 填充哈希表,从子计划读取所有元组并构建哈希表。
        • 继续执行(FALLTHROUGHAGG_MIXED)。
      • 调用 agg_retrieve_hash_table 从哈希表中检索结果元组。
    • 分支:AGG_MIXED
      • 直接调用 agg_retrieve_hash_table,处理哈希表结果(混合模式可能包含哈希和排序的组合)。
    • 分支:AGG_PLAINAGG_SORTED
      • 调用 agg_retrieve_direct,直接从子计划读取元组并处理(普通聚合为单行,排序聚合按组处理)。
    • 子分支:若 early_stop_limit 已设置(如 LIMIT 10),可能提前终止(取决于优化实现)。
  5. 返回结果

    • 分支:若 result 不为空(!TupIsNull(result)),返回结果元组槽。
    • 否则,返回 NULL(无更多结果或聚合完成)。

示例查询关联

示例查询:

SELECT DISTINCT a FROM t1 LIMIT 10;

查询计划:

Limit  (cost=21.50..21.60 rows=10 width=4) (actual time=0.723..0.728 rows=10 loops=1)Output: aBuffers: shared hit=9->  HashAggregate  (cost=21.50..22.50 rows=100 width=4) (actual time=0.721..0.725 rows=10 loops=1)Output: aGroup Key: t1.aBatches: 1  Memory Usage: 24kBBuffers: shared hit=9->  Seq Scan on public.t1  (cost=0.00..19.00 rows=1000 width=4) (actual time=0.018..0.198 rows=1000 loops=1)Output: a, bBuffers: shared hit=9
Planning Time: 0.131 ms
Execution Time: 0.767 ms

关联说明

  • 步骤 1pstate 转换为 AggStateaggstrategy = AGG_HASHED
  • 步骤 2:检查中断,无中断继续执行。
  • 步骤 3agg_done = false,进入策略分派。
  • 步骤 4
    • 进入 AGG_HASHED 分支。
    • 初次调用时,table_filled = false,调用 agg_fill_hash_table,从 Seq Scan 读取 1000 行,构建哈希表(Memory Usage: 24kBBatches: 1)。
    • 调用 agg_retrieve_hash_table,检索前 10 个唯一值(受 LIMIT 10 影响,可能触发早停优化)。
  • 步骤 5:返回包含 t1.a 的结果元组槽(10 行),或 NULL(完成后)。

Mermaid 流程图(细化分支逻辑)

  以下是细化的 Mermaid 流程图,详细展示 ExecAgg 的分支逻辑,涵盖聚合策略和处理路径:

AGG_HASHED
AGG_MIXED
AGG_PLAIN/AGG_SORTED
开始: ExecAgg
转换 PlanState 为 AggState
检查中断信号
CHECK_FOR_INTERRUPTS
agg_done?
返回 NULL
phase->aggstrategy
table_filled?
调用 agg_fill_hash_table
从子计划填充哈希表
调用 agg_retrieve_hash_table
检索哈希表结果
调用 agg_retrieve_direct
直接从子计划处理
result 不为空?
返回 result 元组槽

agg_fill_hash_table 函数

  agg_fill_hash_tablePostgreSQL 中哈希聚合(HashAgg)的核心函数之一,负责从外层子计划(如 Seq Scan)读取输入元组,构建哈希表以存储分组键和聚合状态,为后续结果检索(如 agg_retrieve_hash_table)做准备。

/** agg_fill_hash_table** 为哈希聚合读取输入元组并构建哈希表。* 参数:*   aggstate - 聚合节点的运行时状态(AggState)*/
static void
agg_fill_hash_table(AggState *aggstate)
{TupleTableSlot *outerslot; // 外层计划的输入元组槽ExprContext *tmpcontext = aggstate->tmpcontext; // 临时表达式上下文/** 循环处理外层计划的每个元组,直到耗尽输入。*/for (;;){outerslot = fetch_input_tuple(aggstate); // 从外层计划获取输入元组if (TupIsNull(outerslot)) // 如果元组为空(输入耗尽)break; // 退出循环/* 设置临时上下文的外部元组,用于哈希查找和聚合推进 */tmpcontext->ecxt_outertuple = outerslot;/* 查找或构建哈希表条目 */lookup_hash_entries(aggstate);/* 推进聚合函数或组合函数 */advance_aggregates(aggstate);/** 在每个元组处理后重置临时上下文(哈希查找已处理部分重置)*/ResetExprContext(aggstate->tmpcontext);}/* 完成所有溢出操作(如果有) */hashagg_finish_initial_spills(aggstate);aggstate->table_filled = true; // 标记哈希表已填充/* 初始化以遍历第一个哈希表 */select_current_set(aggstate, 0, true); // 选择当前分组集为哈希模式ResetTupleHashIterator(aggstate->perhash[0].hashtable,&aggstate->perhash[0].hashiter); // 重置哈希表迭代器
}

  以下是细化的流程描述,突出分支逻辑:

  1. 初始化变量

    • 获取 AggState 的临时表达式上下文(tmpcontext)和输入元组槽(outerslot)。
  2. 循环读取输入元组

    • 调用 fetch_input_tuple 从外层子计划获取元组。
    • 分支:检查元组是否为空(TupIsNull):
      • 若为空,表示输入耗尽,退出循环。
      • 若不为空,继续处理。
  3. 设置表达式上下文

    • 将当前元组设置为 tmpcontext->ecxt_outertuple,为哈希查找和聚合推进准备数据。
  4. 查找或构建哈希表条目

    • 调用 lookup_hash_entries,根据分组键(Group Key)计算哈希值:
      • 子分支:若键已存在,更新对应条目。
      • 若键不存在,创建新条目并初始化聚合状态。
  5. 推进聚合状态

    • 调用 advance_aggregates,更新聚合函数或组合函数的状态(如累加 COUNTSUM 或去重)。
  6. 重置临时上下文

    • 调用 ResetExprContext 重置 tmpcontext,释放临时内存(哈希查找可能已部分重置)。
  7. 完成溢出处理

    • 调用 hashagg_finish_initial_spills,处理可能发生的磁盘溢出(如哈希表超过 work_mem)。
  8. 标记哈希表完成

    • 设置 table_filled = true,表示哈希表构建完成。
  9. 初始化哈希表迭代

    • 调用 select_current_set 设置当前分组集(哈希模式)。
    • 调用 ResetTupleHashIterator 重置哈希表迭代器,准备后续结果检索。

示例查询关联

示例查询:

SELECT DISTINCT a FROM t1 LIMIT 10;

查询计划:

Limit  (cost=21.50..21.60 rows=10 width=4) (actual time=0.723..0.728 rows=10 loops=1)Output: aBuffers: shared hit=9->  HashAggregate  (cost=21.50..22.50 rows=100 width=4) (actual time=0.721..0.725 rows=10 loops=1)Output: aGroup Key: t1.aBatches: 1  Memory Usage: 24kBBuffers: shared hit=9->  Seq Scan on public.t1  (cost=0.00..19.00 rows=1000 width=4) (actual time=0.018..0.198 rows=1000 loops=1)Output: a, bBuffers: shared hit=9
Planning Time: 0.131 ms
Execution Time: 0.767 ms

关联说明

  • 步骤 1aggstate 包含 HashAgg 的运行时状态,tmpcontext 用于临时处理。
  • 步骤 2fetch_input_tupleSeq Scan 读取 1000 行(rows=1000),每次获取一个元组(t1.a 值)。
  • 步骤 3:设置 ecxt_outertuple 为当前元组的 t1.a 值。
  • 步骤 4lookup_hash_entries 计算 t1.a 的哈希值(099),插入或更新哈希表,存储 100 个唯一值(rows=100)。
  • 步骤 5advance_aggregates 处理去重逻辑(DISTINCT 无聚合函数,仅记录唯一键)。
  • 步骤 6:重置 tmpcontext,释放内存。
  • 步骤 7Batches: 1 表示无溢出(Memory Usage: 24kB 远低于 work_mem),无需调用 hashagg_finish_initial_spills
  • 步骤 8:设置 table_filled = true
  • 步骤 9:初始化哈希表迭代器,准备 agg_retrieve_hash_table 检索 10 个结果。

Mermaid 流程图(细化分支逻辑)

  以下是细化的 Mermaid 流程图,展示 agg_fill_hash_table 的详细逻辑和分支:

开始: agg_fill_hash_table
初始化变量
获取 tmpcontext, outerslot
循环: fetch_input_tuple
TupIsNull?
退出循环
设置 ecxt_outertuple = outerslot
调用 lookup_hash_entries
查找或构建哈希表条目
调用 advance_aggregates
推进聚合状态
重置 tmpcontext
调用 hashagg_finish_initial_spills
完成溢出处理
设置 table_filled = true
调用 select_current_set
设置当前分组集
调用 ResetTupleHashIterator
重置哈希表迭代器
结束

  打印堆栈信息:

Breakpoint 1, agg_fill_hash_table (aggstate=0x1ce7358) at nodeAgg.c:2543
(gdb) bt
#0  agg_fill_hash_table (aggstate=0x1ce7358) at nodeAgg.c:2543
#1  0x000000000076f4e2 in ExecAgg (pstate=0x1ce7358) at nodeAgg.c:2172
#2  0x000000000075d202 in ExecProcNodeFirst (node=0x1ce7358) at execProcnode.c:469
#3  0x000000000078d90b in ExecProcNode (node=0x1ce7358) at ../../../src/include/executor/executor.h:274
#4  0x000000000078db10 in ExecLimit (pstate=0x1ce7068) at nodeLimit.c:95
#5  0x000000000075d202 in ExecProcNodeFirst (node=0x1ce7068) at execProcnode.c:469
#6  0x00000000007510eb in ExecProcNode (node=0x1ce7068) at ../../../src/include/executor/executor.h:274
#7  0x0000000000753bde in ExecutePlan (estate=0x1ce6e20, planstate=0x1ce7068, use_parallel_mode=false, opera
tion=CMD_SELECT, sendTuples=true, numberTuples=0, direction=ForwardScanDirection, dest=0x1cf71b8, execute_on
ce=true) at execMain.c:1646
#8  0x000000000075177e in standard_ExecutorRun (queryDesc=0x1cbea90, direction=ForwardScanDirection, count=0
, execute_once=true) at execMain.c:363
#9  0x0000000000751592 in ExecutorRun (queryDesc=0x1cbea90, direction=ForwardScanDirection, count=0, execute
_once=true) at execMain.c:304
#10 0x00000000009eae31 in PortalRunSelect (portal=0x1c6a1b0, forward=true, count=0, dest=0x1cf71b8) at pquer
y.c:924
#11 0x00000000009eaaef in PortalRun (portal=0x1c6a1b0, count=9223372036854775807, isTopLevel=true, run_once=
true, dest=0x1cf71b8, altdest=0x1cf71b8, qc=0x7fff47d05550) at pquery.c:768
#12 0x00000000009e4386 in exec_simple_query (query_string=0x1bebc30 "SELECT DISTINCT a FROM t1 LIMIT 10;") a
t postgres.c:1284
#13 0x00000000009e8cb9 in PostgresMain (dbname=0x1c24050 "postgres", username=0x1c24038 "kuchiki") at postgr
es.c:4766
#14 0x00000000009e089e in BackendMain (startup_data=0x7fff47d0585c "", startup_data_len=4) at backend_startu
p.c:107
#15 0x000000000090d681 in postmaster_child_launch (child_type=B_BACKEND, startup_data=0x7fff47d0585c "", sta
rtup_data_len=4, client_sock=0x7fff47d05880) at launch_backend.c:274
#16 0x0000000000912a76 in BackendStartup (client_sock=0x7fff47d05880) at postmaster.c:3414
#17 0x0000000000910549 in ServerLoop () at postmaster.c:1648
#18 0x000000000090ff12 in PostmasterMain (argc=3, argv=0x1be67e0) at postmaster.c:1346
#19 0x00000000007d1de4 in main (argc=3, argv=0x1be67e0) at main.c:197

lookup_hash_entries 函数

  lookup_hash_entriesPostgreSQL 哈希聚合(HashAgg)的核心函数之一,负责为当前输入元组在所有哈希分组集(num_hashes)中查找或创建哈希表条目。它处理内存中的哈希表操作,并在溢出模式下将元组写入磁盘分区。

/** lookup_hash_entries** 为当前元组在所有哈希分组集中查找或创建哈希表条目。* 注意:本函数可能重置 tmpcontext。* 在“溢出模式”下,某些条目可能为 NULL。同一元组在不同分组集中可能属于不同组,* 某些分组集可能匹配内存中的组,而其他分组集可能需溢出到磁盘。* 即使多次溢出同一元组到不同分组集看似浪费,但通过为每个分组集分区,* 可提高哈希表重新填充的效率。* 参数:*   aggstate - 聚合节点的运行时状态(AggState)*/
static void
lookup_hash_entries(AggState *aggstate)
{AggStatePerGroup *pergroup = aggstate->hash_pergroup; // 哈希分组状态数组TupleTableSlot *outerslot = aggstate->tmpcontext->ecxt_outertuple; // 当前输入元组槽int			setno; // 分组集索引/* 遍历所有哈希分组集 */for (setno = 0; setno < aggstate->num_hashes; setno++){AggStatePerHash perhash = &aggstate->perhash[setno]; // 当前分组集的哈希数据TupleHashTable hashtable = perhash->hashtable; // 哈希表TupleTableSlot *hashslot = perhash->hashslot; // 哈希表槽TupleHashEntry entry; // 哈希表条目uint32		hash; // 哈希值bool		isnew = false; // 标记是否为新条目bool	   *p_isnew; // 新条目指针(溢出模式下为 NULL)/* 如果哈希表已溢出,不创建新条目 */p_isnew = aggstate->hash_spill_mode ? NULL : &isnew;select_current_set(aggstate, setno, true); // 选择当前分组集(哈希模式)prepare_hash_slot(perhash, outerslot, hashslot); // 准备哈希表槽,填充分组键/* 查找或创建哈希表条目 */entry = LookupTupleHashEntry(hashtable, hashslot, p_isnew, &hash);if (entry != NULL) // 如果找到或创建了条目{if (isnew) // 如果是新条目initialize_hash_entry(aggstate, hashtable, entry); // 初始化条目pergroup[setno] = entry->additional; // 保存分组状态}else // 如果未找到且需溢出{HashAggSpill *spill = &aggstate->hash_spills[setno]; // 溢出数据结构TupleTableSlot *slot = aggstate->tmpcontext->ecxt_outertuple; // 当前元组/* 初始化溢出分区(如果尚未初始化) */if (spill->partitions == NULL)hashagg_spill_init(spill, aggstate->hash_tapeset, 0,perhash->aggnode->numGroups,aggstate->hashentrysize);/* 将元组写入溢出分区 */hashagg_spill_tuple(aggstate, spill, slot, hash);pergroup[setno] = NULL; // 设置分组状态为 NULL}}
}

  以下是细化的流程描述,聚焦原始逻辑:

  1. 初始化变量

    • 获取 AggState 的哈希分组状态数组(pergroup)和当前输入元组(outerslot)。
  2. 遍历哈希分组集

    • 对每个分组集(setno0num_hashes-1):
      • 获取当前分组集的哈希数据(perhash)、哈希表(hashtable)和哈希表槽(hashslot)。
  3. 设置溢出模式标志

    • 检查 hash_spill_mode
      • 若为 true(溢出模式),设置 p_isnew = NULL,不创建新条目。
      • 若为 false,设置 p_isnew = &isnew,允许创建新条目。
  4. 选择分组集并准备哈希槽

    • 调用 select_current_set 设置当前分组集为哈希模式。
    • 调用 prepare_hash_slot,将输入元组(outerslot)的分组键(如 t1.a)填充到 hashslot
  5. 查找或创建哈希表条目

    • 调用 LookupTupleHashEntry,根据 hashslot 计算哈希值(hash)并查找条目:
      • 分支:若找到条目或创建新条目(entry != NULL):
        • isnew = true,调用 initialize_hash_entry 初始化新条目(设置初始聚合状态)。
        • 保存条目的分组状态到 pergroup[setno]
      • 分支:若未找到条目(entry = NULL,溢出模式):
        • 获取溢出数据结构(spill)。
        • spill->partitions 未初始化,调用 hashagg_spill_init 设置溢出分区。
        • 调用 hashagg_spill_tuple,将元组和哈希值写入溢出分区。
        • 设置 pergroup[setno] = NULL
  6. 继续处理下一分组集

    • 循环处理所有分组集,直至完成。

示例查询关联

示例查询:

SELECT DISTINCT a FROM t1 LIMIT 10;

查询计划:

Limit  (cost=21.50..21.60 rows=10 width=4) (actual time=0.723..0.728 rows=10 loops=1)Output: aBuffers: shared hit=9->  HashAggregate  (cost=21.50..22.50 rows=100 width=4) (actual time=0.721..0.725 rows=10 loops=1)Output: aGroup Key: t1.aBatches: 1  Memory Usage: 24kBBuffers: shared hit=9->  Seq Scan on public.t1  (cost=0.00..19.00 rows=1000 width=4) (actual time=0.018..0.198 rows=1000 loops=1)Output: a, bBuffers: shared hit=9
Planning Time: 0.131 ms
Execution Time: 0.767 ms

关联说明

  • 步骤 1pergroup 存储哈希分组状态,outerslot 包含 Seq Scan 返回的元组(t1.a 值)。
  • 步骤 2num_hashes = 1(单分组集,Group Key: t1.a),循环仅执行一次。
  • 步骤 3hash_spill_mode = falseBatches: 1Memory Usage: 24kB 小于 work_mem),p_isnew = &isnew
  • 步骤 4:调用 select_current_set(setno=0, true),设置分组集;prepare_hash_slott1.a 值填充到 hashslot
  • 步骤 5
    • LookupTupleHashEntry 计算 t1.a 的哈希值(099):
      • t1.a 已存在,更新条目。
      • 若新值,创建条目(isnew = true),调用 initialize_hash_entry 初始化(DISTINCT 无聚合函数,仅记录键)。
      • 保存条目状态到 pergroup[0]
    • 本例无溢出(Batches: 1),不执行溢出分支。
  • 步骤 6:单分组集,循环结束。

Mermaid 流程图(原始逻辑)

  以下是 Mermaid 流程图,展示 lookup_hash_entries 的原始逻辑,细化分支但不包含 early_stop_limit
在这里插入图片描述

hashagg_finish_initial_spills 函数

  hashagg_finish_initial_spillsPostgreSQL 哈希聚合(HashAgg)流程中的辅助函数,负责在初始哈希表构建完成后处理磁盘溢出(spill。它将溢出的分区转换为新的批次(batches),供后续处理,并清理溢出相关资源。

/** hashagg_finish_initial_spills** 在处理完哈希聚合批次(HashAggBatch)后,如果有元组溢出到磁盘,* 将溢出的分区转换为新的批次,供后续执行。* 参数:*   aggstate - 聚合节点的运行时状态(AggState)*/
static void
hashagg_finish_initial_spills(AggState *aggstate)
{int			setno; // 分组集索引int			total_npartitions = 0; // 总分区数/* 检查是否存在溢出数据结构 */if (aggstate->hash_spills != NULL){/* 遍历所有哈希分组集 */for (setno = 0; setno < aggstate->num_hashes; setno++){HashAggSpill *spill = &aggstate->hash_spills[setno]; // 当前分组集的溢出数据total_npartitions += spill->npartitions; // 累积分区数hashagg_spill_finish(aggstate, spill, setno); // 完成当前分组集的溢出处理}/** 不再处理外层计划的元组,仅处理溢出批次的元组。* 初始溢出结构不再需要,释放内存。*/pfree(aggstate->hash_spills); // 释放溢出数据结构aggstate->hash_spills = NULL; // 清空溢出指针}/* 更新哈希聚合的度量信息 */hash_agg_update_metrics(aggstate, false, total_npartitions);aggstate->hash_spill_mode = false; // 关闭溢出模式
}

  以下是细化的流程描述,聚焦原始逻辑:

  1. 初始化变量

    • 定义分组集索引(setno)和总分区计数(total_npartitions)。
  2. 检查溢出数据结构

    • 检查 aggstate->hash_spills 是否存在:
      • 若为空(无溢出),跳过循环,直接更新度量。
      • 若不为空,继续处理。
  3. 遍历哈希分组集

    • 对每个分组集(setno 从 0 到 num_hashes-1):
      • 获取当前分组集的溢出数据结构(spill)。
      • 累积分区数(spill->npartitions)。
      • 调用 hashagg_spill_finish,将溢出分区转换为新批次。
  4. 释放溢出资源

    • 释放 hash_spills 内存,设置 hash_spills = NULL,表示不再处理外层计划元组,仅处理溢出批次。
  5. 更新度量并关闭溢出模式

    • 调用 hash_agg_update_metrics,记录分区数等信息(spilling = false)。
    • 设置 hash_spill_mode = false,退出溢出模式。

示例查询关联

示例查询:

SELECT DISTINCT a FROM t1 LIMIT 10;

查询计划:

Limit  (cost=21.50..21.60 rows=10 width=4) (actual time=0.723..0.728 rows=10 loops=1)Output: aBuffers: shared hit=9->  HashAggregate  (cost=21.50..22.50 rows=100 width=4) (actual time=0.721..0.725 rows=10 loops=1)Output: aGroup Key: t1.aBatches: 1  Memory Usage: 24kBBuffers: shared hit=9->  Seq Scan on public.t1  (cost=0.00..19.00 rows=1000 width=4) (actual time=0.018..0.198 rows=1000 loops=1)Output: a, bBuffers: shared hit=9
Planning Time: 0.131 ms
Execution Time: 0.767 ms

关联说明

  • 步骤 1:初始化 setnototal_npartitions = 0
  • 步骤 2hash_spills = NULLBatches: 1Memory Usage: 24kB 小于 work_mem),无溢出,跳过循环。
  • 步骤 3:无分组集需要处理(num_hashes = 1,但无溢出)。
  • 步骤 4:跳过释放 hash_spills(已是 NULL)。
  • 步骤 5:调用 hash_agg_update_metrics,记录 total_npartitions = 0,设置 hash_spill_mode = false

Mermaid 流程图(原始逻辑)

  以下是 Mermaid 流程图,展示 hashagg_finish_initial_spills 的原始逻辑,细化分支:

结束循环
开始: hashagg_finish_initial_spills
初始化变量
setno, total_npartitions = 0
hash_spills != NULL?
循环: setno = 0 to num_hashes-1
获取 spill 结构
total_npartitions += spill->npartitions
调用 hashagg_spill_finish
将溢出分区转为新批次
继续下一分组集
释放 hash_spills
设置 hash_spills = NULL
跳过循环
调用 hash_agg_update_metrics
更新度量信息
设置 hash_spill_mode = false
结束

agg_retrieve_hash_table 函数

  agg_retrieve_hash_tablePostgreSQL 哈希聚合(HashAgg)的核心函数之一,负责从哈希表中检索分组结果。它首先处理内存中的哈希表条目,若内存条目耗尽,则尝试从磁盘溢出分区重新填充哈希表,直至所有分组处理完毕。

/** agg_retrieve_hash_table** 为哈希聚合从哈希表中检索分组结果。* 在内存中元组耗尽后,尝试使用先前溢出的元组重新填充哈希表。* 仅在内存和溢出元组全部耗尽后返回 NULL。* 参数:*   aggstate - 聚合节点的运行时状态(AggState)* 返回值:*   TupleTableSlot* - 包含分组结果的元组槽,或 NULL(全部耗尽)*/
static TupleTableSlot *
agg_retrieve_hash_table(AggState *aggstate)
{TupleTableSlot *result = NULL; // 初始化结果元组槽为 NULL/* 循环直到获取有效结果或全部耗尽 */while (result == NULL){/* 尝试从内存中的哈希表检索结果 */result = agg_retrieve_hash_table_in_memory(aggstate);if (result == NULL) // 如果内存中无更多结果{/* 尝试重新填充哈希表 */if (!agg_refill_hash_table(aggstate)){aggstate->agg_done = true; // 标记聚合完成break; // 退出循环}}}return result; // 返回结果元组槽或 NULL
}

  以下是细化的流程描述,聚焦原始逻辑:

  1. 初始化结果槽

    • 初始化 result 元组槽为 NULL
  2. 循环检索结果

    • 分支:当 resultNULL 时,继续循环:
      • 调用 agg_retrieve_hash_table_in_memory,从内存中的哈希表获取下一个分组结果:
        • 若返回有效元组槽(result != NULL),退出循环。
        • 若返回 NULL,内存中条目已耗尽。
      • 子分支:若内存中无结果,调用 agg_refill_hash_table
        • 若返回 true,表示成功从溢出分区重新填充哈希表,继续循环。
        • 若返回 false,表示无更多溢出元组,设置 agg_done = true 并退出循环。
  3. 返回结果

    • 返回 result(有效元组槽或 NULL)。

示例查询关联

示例查询:

SELECT DISTINCT a FROM t1 LIMIT 10;

查询计划:

Limit  (cost=21.50..21.60 rows=10 width=4) (actual time=0.723..0.728 rows=10 loops=1)Output: aBuffers: shared hit=9->  HashAggregate  (cost=21.50..22.50 rows=100 width=4) (actual time=0.721..0.725 rows=10 loops=1)Output: aGroup Key: t1.aBatches: 1  Memory Usage: 24kBBuffers: shared hit=9->  Seq Scan on public.t1  (cost=0.00..19.00 rows=1000 width=4) (actual time=0.018..0.198 rows=1000 loops=1)Output: a, bBuffers: shared hit=9
Planning Time: 0.131 ms
Execution Time: 0.767 ms

关联说明

  • 步骤 1result = NULL
  • 步骤 2
    • 调用 agg_retrieve_hash_table_in_memory,从哈希表(包含 100 个唯一 t1.a 值)获取分组结果:
      • 每次返回一个元组槽(t1.a 值),共返回 10 行(受 LIMIT 10 限制)。
      • Batches: 1 表示无溢出,内存中哈希表(24kB)包含所有分组。
    • 无需调用 agg_refill_hash_table(无溢出分区)。
  • 步骤 3:返回每个 result 元组槽(包含 t1.a),第 11 次调用返回 NULLagg_done = true)。

Mermaid 流程图(原始逻辑)

以下是 Mermaid 流程图,展示 agg_retrieve_hash_table 的原始逻辑,细化分支:

开始: agg_retrieve_hash_table
初始化 result = NULL
result == NULL?
调用 agg_retrieve_hash_table_in_memory
result != NULL?
退出循环
调用 agg_refill_hash_table
返回 true?
设置 agg_done = true
退出循环
返回 result
结束

agg_retrieve_hash_table_in_memory 函数

  agg_retrieve_hash_table_in_memoryPostgreSQL 哈希聚合(HashAgg)的核心函数之一,负责从内存中的哈希表检索分组结果,仅处理内存中的条目(不考虑溢出元组)。它循环扫描哈希表,转换分组数据,执行聚合函数,并生成结果元组。

/** agg_retrieve_hash_table_in_memory** 从内存中的哈希表检索分组结果,不考虑溢出到磁盘的元组。* 参数:*   aggstate - 聚合节点的运行时状态(AggState)* 返回值:*   TupleTableSlot* - 包含分组结果的元组槽,或 NULL(无更多分组)*/
static TupleTableSlot *
agg_retrieve_hash_table_in_memory(AggState *aggstate)
{ExprContext *econtext; // 输出元组的表达式上下文AggStatePerAgg peragg; // 聚合函数状态数组AggStatePerGroup pergroup; // 分组状态TupleHashEntryData *entry; // 哈希表条目TupleTableSlot *firstSlot; // 扫描元组槽TupleTableSlot *result; // 结果元组槽AggStatePerHash perhash; // 当前分组集的哈希数据/** 获取节点的状态信息。* econtext 是每个输出元组的表达式上下文。*/econtext = aggstate->ss.ps.ps_ExprContext; // 获取表达式上下文peragg = aggstate->peragg; // 获取聚合函数状态firstSlot = aggstate->ss.ss_ScanTupleSlot; // 获取扫描元组槽/** 注意:perhash(及其相关数据)可能在循环中变化,* 因为我们会在不同分组集之间切换。*/perhash = &aggstate->perhash[aggstate->current_set]; // 获取当前分组集的哈希数据/** 循环检索分组,直到找到满足 aggstate->ss.ps.qual 的分组*/for (;;){TupleTableSlot *hashslot = perhash->hashslot; // 哈希表槽int			i; // 循环计数器CHECK_FOR_INTERRUPTS(); // 检查查询中断信号/** 查找哈希表中的下一个条目*/entry = ScanTupleHashTable(perhash->hashtable, &perhash->hashiter); // 扫描哈希表if (entry == NULL) // 如果没有更多条目{int			nextset = aggstate->current_set + 1; // 下一个分组集索引if (nextset < aggstate->num_hashes) // 如果存在更多分组集{/** 切换到下一个分组集,重新初始化并重启循环*/select_current_set(aggstate, nextset, true); // 切换分组集perhash = &aggstate->perhash[aggstate->current_set]; // 更新哈希数据ResetTupleHashIterator(perhash->hashtable, &perhash->hashiter); // 重置迭代器continue; // 继续循环}else // 无更多分组集{return NULL; // 返回 NULL}}/** 为每个分组清空输出元组上下文。* 不使用 ReScanExprContext,因为可能有聚合函数的关闭回调尚未调用。*/ResetExprContext(econtext); // 重置表达式上下文/** 将代表性元组转换回具有正确列的格式*/ExecStoreMinimalTuple(entry->firstTuple, hashslot, false); // 存储最小元组slot_getallattrs(hashslot); // 获取哈希表槽的所有属性ExecClearTuple(firstSlot); // 清空扫描元组槽memset(firstSlot->tts_isnull, true,firstSlot->tts_tupleDescriptor->natts * sizeof(bool)); // 将所有属性标记为 NULL/* 复制分组列到扫描元组槽 */for (i = 0; i < perhash->numhashGrpCols; i++){int			varNumber = perhash->hashGrpColIdxInput[i] - 1; // 输入列索引firstSlot->tts_values[varNumber] = hashslot->tts_values[i]; // 复制值firstSlot->tts_isnull[varNumber] = hashslot->tts_isnull[i]; // 复制空值标志}ExecStoreVirtualTuple(firstSlot); // 存储虚拟元组pergroup = (AggStatePerGroup) entry->additional; // 获取分组状态/** 使用代表性输入元组处理限定条件和目标列表中的非聚合列引用*/econtext->ecxt_outertuple = firstSlot; // 设置外部元组prepare_projection_slot(aggstate,econtext->ecxt_outertuple,aggstate->current_set); // 准备投影槽finalize_aggregates(aggstate, peragg, pergroup); // 完成聚合函数计算result = project_aggregates(aggstate); // 投影聚合结果if (result) // 如果投影结果有效return result; // 返回结果元组槽}/* 无更多分组 */return NULL;
}

  以下是细化的流程描述,聚焦原始逻辑:

  1. 初始化变量

    • 获取表达式上下文(econtext)、聚合函数状态(peragg)、扫描元组槽(firstSlot)。
    • 获取当前分组集的哈希数据(perhash)。
  2. 循环检索分组

    • 调用 CHECK_FOR_INTERRUPTS 检查查询中断。
    • 调用 ScanTupleHashTable,获取哈希表中的下一个条目(entry):
      • 分支:若 entry = NULL(当前分组集耗尽):
        • 检查是否存在下一个分组集(nextset < num_hashes):
          • 若存在,调用 select_current_set 切换分组集,更新 perhash,调用 ResetTupleHashIterator 重置迭代器,继续循环。
          • 若无更多分组集,返回 NULL
      • 分支:若 entry != NULL,继续处理。
  3. 重置表达式上下文

    • 调用 ResetExprContext,清空 econtext,避免调用关闭回调(使用普通重置)。
  4. 转换代表性元组

    • 调用 ExecStoreMinimalTuple 将哈希表条目的 firstTuple 存储到 hashslot
    • 调用 slot_getallattrs 获取 hashslot 的属性。
    • 清空 firstSlot,将所有属性标记为 NULLtts_isnull)。
    • 复制分组列(numhashGrpCols)从 hashslotfirstSlot,存储为虚拟元组。
  5. 准备投影和聚合

    • 设置 econtext->ecxt_outertuple = firstSlot,用于非聚合列引用。
    • 调用 prepare_projection_slot,准备投影槽。
    • 调用 finalize_aggregates,完成聚合函数计算(peraggpergroup)。
    • 调用 project_aggregates,生成投影结果(result)。
  6. 返回结果

    • 分支:若 result 不为空,返回结果元组槽。
    • 否则,继续循环查找下一个分组。

示例查询关联

示例查询:

SELECT DISTINCT a FROM t1 LIMIT 10;

查询计划:

Limit  (cost=21.50..21.60 rows=10 width=4) (actual time=0.723..0.728 rows=10 loops=1)Output: aBuffers: shared hit=9->  HashAggregate  (cost=21.50..22.50 rows=100 width=4) (actual time=0.721..0.725 rows=10 loops=1)Output: aGroup Key: t1.aBatches: 1  Memory Usage: 24kBBuffers: shared hit=9->  Seq Scan on public.t1  (cost=0.00..19.00 rows=1000 width=4) (actual time=0.018..0.198 rows=1000 loops=1)Output: a, bBuffers: shared hit=9
Planning Time: 0.131 ms
Execution Time: 0.767 ms

关联说明

  • 步骤 1econtext 为输出上下文,peragg = NULLDISTINCT 无聚合函数),firstSlot 为扫描槽,perhash 指向哈希表(含 100 个唯一 t1.a 值)。
  • 步骤 2num_hashes = 1current_set = 0
    • ScanTupleHashTable 返回哈希表条目(每次一个 t1.a 值)。
    • 本例无其他分组集(nextset >= 1),耗尽后返回 NULL.
  • 步骤 3:重置 econtext
  • 步骤 4:将 entry->firstTuplet1.a 值)存储到 hashslot,复制到 firstSlot(仅 t1.a 列)。
  • 步骤 5:设置 ecxt_outertuple = firstSlot,调用 prepare_projection_slotproject_aggregatesfinalize_aggregates 为空操作,DISTINCT 无聚合),返回 t1.a 值。共返回 10 行(受 LIMIT 10 限制)。
  • 步骤 6:返回有效元组槽(10 次),第 11 次返回 NULL.

Mermaid 流程图(原始逻辑)

  以下是 Mermaid 流程图,展示 agg_retrieve_hash_table_in_memory 的原始逻辑,细化分支:

开始: agg_retrieve_hash_table_in_memory
初始化变量
获取 econtext, peragg, firstSlot, perhash
循环开始
检查中断信号
CHECK_FOR_INTERRUPTS
调用 ScanTupleHashTable
获取下一个条目
entry == NULL?
nextset < num_hashes?
调用 select_current_set
更新 perhash
重置迭代器
返回 NULL
重置 econtext
存储 entry->firstTuple 到 hashslot
获取属性
清空 firstSlot
复制分组列
设置 ecxt_outertuple = firstSlot
调用 prepare_projection_slot
调用 finalize_aggregates
调用 project_aggregates
result != NULL?
返回 result
结束

agg_retrieve_direct 函数

  agg_retrieve_directPostgreSQL 非哈希聚合(AGG_PLAINAGG_SORTED,可能包含 AGG_MIXED 的排序部分)的核心函数,负责从外层计划读取元组,处理分组,执行聚合函数,并返回结果元组。它支持分组集和多阶段处理,循环直到找到满足条件的有效分组或耗尽所有分组。

/** agg_retrieve_direct** 为非哈希聚合(普通或排序聚合)处理分组并返回结果元组。* 参数:*   aggstate - 聚合节点的运行时状态(AggState)* 返回值:*   TupleTableSlot* - 包含分组结果的元组槽,或 NULL(无更多分组)*/
static TupleTableSlot *
agg_retrieve_direct(AggState *aggstate)
{Agg		   *node = aggstate->phase->aggnode; // 聚合计划节点ExprContext *econtext; // 输出元组的表达式上下文ExprContext *tmpcontext; // 输入元组的表达式上下文AggStatePerAgg peragg; // 聚合函数状态数组AggStatePerGroup *pergroups; // 分组状态数组TupleTableSlot *outerslot; // 外层计划元组槽TupleTableSlot *firstSlot; // 扫描元组槽TupleTableSlot *result; // 结果元组槽bool		hasGroupingSets = aggstate->phase->numsets > 0; // 是否有分组集int			numGroupingSets = Max(aggstate->phase->numsets, 1); // 分组集数量int			currentSet; // 当前分组集索引int			nextSetSize; // 下一个分组集的列数int			numReset; // 需要重置的上下文数量int			i; // 循环计数器/** 获取节点的状态信息。* econtext 是每个输出元组的表达式上下文。* tmpcontext 是每个输入元组的表达式上下文。*/econtext = aggstate->ss.ps.ps_ExprContext; // 获取输出上下文tmpcontext = aggstate->tmpcontext; // 获取输入上下文peragg = aggstate->peragg; // 获取聚合函数状态pergroups = aggstate->pergroups; // 获取分组状态firstSlot = aggstate->ss.ss_ScanTupleSlot; // 获取扫描元组槽/** 循环检索分组,直到找到满足 aggstate->ss.ps.qual 的分组。* 对于分组集,projected_set 为 -1(初始调用)或已完成分组的索引。*/while (!aggstate->agg_done){/** 为每个分组清空输出上下文和聚合上下文(含引用传递的转换值)。* 使用 ReScanExprContext 调用关闭回调,确保清理非内存资源。*/ReScanExprContext(econtext);/** 确定在当前边界需要重置的分组集数量。*/if (aggstate->projected_set >= 0 &&aggstate->projected_set < numGroupingSets)numReset = aggstate->projected_set + 1;elsenumReset = numGroupingSets;/* 重置聚合上下文 */for (i = 0; i < numReset; i++){ReScanExprContext(aggstate->aggcontexts[i]);}/** 检查输入是否完成且当前阶段无更多分组集需要投影;* 若完成,切换到下一阶段或标记聚合完成。*/if (aggstate->input_done == true &&aggstate->projected_set >= (numGroupingSets - 1)){if (aggstate->current_phase < aggstate->numphases - 1){/* 切换到下一阶段 */initialize_phase(aggstate, aggstate->current_phase + 1);aggstate->input_done = false;aggstate->projected_set = -1;numGroupingSets = Max(aggstate->phase->numsets, 1);node = aggstate->phase->aggnode;numReset = numGroupingSets;}else if (aggstate->aggstrategy == AGG_MIXED){/** 混合模式:已输出所有分组数据,切换到哈希表输出。*/initialize_phase(aggstate, 0);aggstate->table_filled = true;ResetTupleHashIterator(aggstate->perhash[0].hashtable,&aggstate->perhash[0].hashiter);select_current_set(aggstate, 0, true);return agg_retrieve_hash_table(aggstate);}else{aggstate->agg_done = true;break;}}/** 获取下一个分组集的列数,用于检查是否到达分组边界。*/if (aggstate->projected_set >= 0 &&aggstate->projected_set < (numGroupingSets - 1))nextSetSize = aggstate->phase->gset_lengths[aggstate->projected_set + 1];elsenextSetSize = 0;/** 如果存在当前分组集的子组,投影它。* 新分组的条件:* - 输入耗尽但未投影所有分组集(已在上面检查)* 或* - 已投影的行不属于最后一个分组集* 且* - 下一个分组集至少有一个分组列* 且* - 前一行和当前行在下一个分组集的列上不同*/tmpcontext->ecxt_innertuple = econtext->ecxt_outertuple;if (aggstate->input_done ||(node->aggstrategy != AGG_PLAIN &&aggstate->projected_set != -1 &&aggstate->projected_set < (numGroupingSets - 1) &&nextSetSize > 0 &&!ExecQualAndReset(aggstate->phase->eqfunctions[nextSetSize - 1],tmpcontext))){aggstate->projected_set += 1;Assert(aggstate->projected_set < numGroupingSets);Assert(nextSetSize > 0 || aggstate->input_done);}else{/** 下一个投影将是第一个(或唯一)分组集。*/aggstate->projected_set = 0;/** 如果没有当前分组的第一个元组,从外层计划获取。*/if (aggstate->grp_firstTuple == NULL){outerslot = fetch_input_tuple(aggstate);if (!TupIsNull(outerslot)){/* 复制第一个输入元组,用于比较和投影 */aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);}else{/* 外层计划无元组 */if (hasGroupingSets){/** 无输入时,仅投影大小为 0 的分组集。*/aggstate->input_done = true;while (aggstate->phase->gset_lengths[aggstate->projected_set] > 0){aggstate->projected_set += 1;if (aggstate->projected_set >= numGroupingSets)break;}if (aggstate->projected_set >= numGroupingSets)continue;}else{aggstate->agg_done = true;if (node->aggstrategy != AGG_PLAIN)return NULL;}}}/* 初始化新输入元组组的工作状态 */initialize_aggregates(aggstate, pergroups, numReset);if (aggstate->grp_firstTuple != NULL){/* 存储第一个输入元组到扫描槽 */ExecForceStoreHeapTuple(aggstate->grp_firstTuple, firstSlot, true);aggstate->grp_firstTuple = NULL;/* 设置第一次 advance_aggregates 调用 */tmpcontext->ecxt_outertuple = firstSlot;/* 处理外层计划元组,直到耗尽或跨越分组边界 */for (;;){/* 混合模式阶段 1 需要更新哈希表 */if (aggstate->aggstrategy == AGG_MIXED &&aggstate->current_phase == 1){lookup_hash_entries(aggstate);}/* 推进聚合函数或组合函数 */advance_aggregates(aggstate);/* 重置每个输入元组的上下文 */ResetExprContext(tmpcontext);outerslot = fetch_input_tuple(aggstate);if (TupIsNull(outerslot)){/* 外层计划元组耗尽 */if (aggstate->aggstrategy == AGG_MIXED &&aggstate->current_phase == 1)hashagg_finish_initial_spills(aggstate);if (hasGroupingSets)aggstate->input_done = true;elseaggstate->agg_done = true;break;}/* 设置下一次 advance_aggregates 调用 */tmpcontext->ecxt_outertuple = outerslot;/* 如果分组,检查是否跨越分组边界 */if (node->aggstrategy != AGG_PLAIN && node->numCols > 0){tmpcontext->ecxt_innertuple = firstSlot;if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1],tmpcontext)){aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);break;}}}}/* 使用代表性输入元组处理非聚合列引用 */econtext->ecxt_outertuple = firstSlot;}Assert(aggstate->projected_set >= 0);currentSet = aggstate->projected_set;prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet);select_current_set(aggstate, currentSet, false);finalize_aggregates(aggstate, peragg, pergroups[currentSet]);/* 如果当前无行可投影,继续循环 */result = project_aggregates(aggstate);if (result)return result;}/* 无更多分组 */return NULL;
}

以下是细化的流程描述,聚焦原始逻辑:

  1. 初始化变量

    • 获取聚合节点(node)、表达式上下文(econtexttmpcontext)、聚合状态(peraggpergroups)、扫描槽(firstSlot)。
    • 计算分组集数量(numGroupingSets)和是否包含分组集(hasGroupingSets)。
  2. 循环检索分组

    • 分支:当 agg_done = false 时:
      • 调用 ReScanExprContext(econtext) 重置输出上下文,调用关闭回调清理资源。
      • 计算需要重置的上下文数量(numReset),重置 aggcontexts
      • 子分支:若 input_done = trueprojected_set 达到最后一个分组集:
        • 若存在更多阶段(current_phase < numphases - 1),调用 initialize_phase 切换阶段,重置 input_doneprojected_set
        • 若为 AGG_MIXED,初始化哈希表阶段,调用 agg_retrieve_hash_table
        • 否则,设置 agg_done = true,退出循环。
  3. 确定下一个分组集列数

    • projected_set 有效,获取下一个分组集的列数(nextSetSize);否则,设为 0。
  4. 检查新分组

    • 分支:若满足新分组条件(输入耗尽或分组列不同):
      • 增加 projected_set
    • 否则,设置 projected_set = 0
  5. 获取分组的第一个元组

    • 分支:若 grp_firstTuple = NULL
      • 调用 fetch_input_tuple 获取外层元组:
        • 若有效,复制到 grp_firstTuple
        • 若为空:
          • 若有分组集,跳过非空分组集,设置 input_done = true
          • 若无分组集,设置 agg_done = true(非 AGG_PLAIN 返回 NULL)。
  6. 初始化和处理分组

    • 调用 initialize_aggregates 初始化聚合状态。
    • grp_firstTuple != NULL
      • 存储到 firstSlot,清空 grp_firstTuple
      • 循环处理外层元组:
        • AGG_MIXEDcurrent_phase = 1,调用 lookup_hash_entries 更新哈希表。
        • 调用 advance_aggregates 推进聚合。
        • 重置 tmpcontext
        • 获取下一个元组,若为空,处理溢出(AGG_MIXED)并设置 input_doneagg_done
        • 若分组(非 AGG_PLAIN),检查分组边界,若不同,复制新元组到 grp_firstTuple,退出循环。
  7. 投影结果

    • 设置 econtext->ecxt_outertuple = firstSlot
    • 调用 prepare_projection_slotfinalize_aggregates
    • 调用 project_aggregates,若返回有效 result,返回;否则继续循环。
  8. 返回结果

    • 若无更多分组,返回 NULL

示例查询关联

示例查询:

SELECT DISTINCT a FROM t1 LIMIT 10;

查询计划(假设使用 AGG_SORTED 而非 HashAggregate):

Limit  (cost=21.50..21.60 rows=10 width=4) (actual time=0.723..0.728 rows=10 loops=1)Output: aBuffers: shared hit=9->  Sort  (cost=21.50..22.50 rows=100 width=4) (actual time=0.721..0.725 rows=10 loops=1)Output: aSort Key: t1.aBuffers: shared hit=9->  Seq Scan on public.t1  (cost=0.00..19.00 rows=1000 width=4) (actual time=0.018..0.198 rows=1000 loops=1)Output: a, bBuffers: shared hit=9
Planning Time: 0.131 ms
Execution Time: 0.767 ms

关联说明

  • 步骤 1aggstrategy = AGG_SORTEDnumGroupingSets = 1peragg = NULLDISTINCT 无聚合函数),firstSlot 为扫描槽。
  • 步骤 2agg_done = falseprojected_set = -1,重置 econtextnumReset = 1
  • 步骤 3nextSetSize = 0(单分组集)。
  • 步骤 4:设置 projected_set = 0
  • 步骤 5grp_firstTuple = NULL,调用 fetch_input_tuple 获取 1000 行(t1.a 值),复制第一个元组。
  • 步骤 6
    • 初始化聚合(空操作)。
    • 存储 grp_firstTuplefirstSlot,循环处理元组:
      • 调用 advance_aggregates(记录 t1.a 去重)。
      • 获取下一个元组,检查分组边界(t1.a 值不同时退出)。
      • 若输入耗尽,设置 agg_done = true.
  • 步骤 7:设置 ecxt_outertuple,调用 project_aggregates,返回 t1.a10 行,受 LIMIT 10 限制)。
  • 步骤 8:第 11 次返回 NULL.

Mermaid 流程图(原始逻辑)

  以下是 Mermaid 流程图,展示 agg_retrieve_direct 的原始逻辑,细化分支:

在这里插入图片描述

总结

  以下是基于 PostgreSQL HashAggregate 相关函数的精简调用关系流程图,使用 Mermaid 语法,仅列出函数名称和中文操作解释,突出调用关系,聚焦原始逻辑。
在这里插入图片描述

函数名称与中文操作解释

以下是将上述内容整理为表格形式的结果:

序号名称功能描述
1ExecAgg执行聚合,分派策略
2agg_fill_hash_table填充哈希表,处理输入元组
3fetch_input_tuple获取输入元组
4ExecProcNode执行子计划节点(如 Seq Scan
5lookup_hash_entries查找或创建分组
6select_current_set选择当前分组集
7prepare_hash_slot准备哈希槽位
8LookupTupleHashEntry查找或新建哈希条目
9initialize_hash_entry初始化哈希条目
10hashagg_spill_init初始化溢出分区
11hashagg_spill_tuple写入溢出数据
12advance_aggregates推进聚合函数计算
13hashagg_finish_initial_spills完成溢出处理
14hashagg_spill_finish转换溢出分区为批次
15hash_agg_update_metrics更新统计信息
16agg_retrieve_hash_table从哈希表取结果
17agg_retrieve_hash_table_in_memory从内存哈希表取分组
18ScanTupleHashTable扫描哈希表条目
19ResetTupleHashIterator重置哈希表迭代器
20prepare_projection_slot准备投影槽
21finalize_aggregates完成聚合计算
22project_aggregates生成结果元组
23agg_refill_hash_table重新填充哈希表
24tuplesort_gettupleslot从溢出批次或排序器取元组
25initialize_aggregates初始化聚合状态
26ExecQual检查分组边界

示例查询关联(SELECT DISTINCT a FROM t1 LIMIT 10

  • 计划Limit -> HashAggregate -> Seq Scan, Batches: 1, Memory Usage: 24kB.
  • 调用路径
    • ExecAggaggstrategy = AGG_HASHED,初次 table_filled = false,调用 agg_fill_hash_table,然后 agg_retrieve_hash_table
    • agg_fill_hash_table:循环调用 fetch_input_tuple(通过 ExecProcNode 获取 1000 行),lookup_hash_entries(调用 select_current_set, prepare_hash_slot, LookupTupleHashEntry, initialize_hash_entry 构建哈希表,100 个唯一 t1.a 值),advance_aggregates(去重),hashagg_finish_initial_spills(调用 hash_agg_update_metrics,无溢出,hash_spills = NULL)。
    • agg_retrieve_hash_table:调用 agg_retrieve_hash_table_in_memory(调用 ScanTupleHashTable, prepare_projection_slot, finalize_aggregates(空操作),project_aggregates),返回 10 行,无需 agg_refill_hash_table(无溢出)。
    • agg_retrieve_direct:未触发。
http://www.dtcms.com/a/419176.html

相关文章:

  • 选择邯郸网站建设电商网站如何备案
  • 易旅游网站建设东莞有哪些做推广的网站
  • [算法练习]第三天:定长滑动窗口
  • 山海织锦·时序成画——连云港城市旅游宣传片的策划、拍摄与制作全流程解构
  • Mariadb服务器
  • 现代Web存储技术(三):配额监控与自动化清理机制
  • 高并发系统的海量数据处理架构
  • 苹果群控系统游戏运营如何实现自动执行任务
  • NXP - 在MCUXpresso IDE中查看编译日志文件的方法
  • 荣耀官方网站郑州粒米seo外包
  • UI自动化框架之Selenium(一)
  • AI编程:自动化代码生成的实践
  • 网站免费建站ppa企业网站托管和网站建设服务商
  • LSTM自然语言处理情感分析项目(二)加载数据集
  • 自定义渲染管线 Custom Render Pipeline
  • 【循环神经网络3】门控循环单元GRU详解
  • 邯郸网站设计做网站的动态图片
  • 建网站要花钱吗网络建设推广
  • 【Java并发】揭秘Lock体系 -- 深入理解AbstractQueuedSynchronizer(AQS)
  • 3.8 数据链路层设备 (答案见原书 P122)
  • 轻松修复 WordPress 的“缺少临时文件夹”错误
  • PHP智能开发工具PhpStorm v2025.2全新上线——支持PHPUnit 12等
  • MySQL 事务和 Spring 事务
  • 怎样免费建立网站广州工商注册查询系统官网
  • 广州新建站wordpress 缩略图 oss
  • JVM 目录
  • Unity学习之常用的数据结构
  • 【C++实战(51)】C++11新特性实战:移动语义与右值引用,解锁性能密码
  • 做宠物的网站有哪些做任务 网站
  • python做网站缺点公司建设官方网站