当前位置: 首页 > wzjs >正文

做网站 就上宝华建站如何做网络营销

做网站 就上宝华建站,如何做网络营销,2017年政府网站建设的讲话,网站 多语言深入理解 PostgreSQL 中的 tuplesort_performsort 函数 函数概述函数源码函数流程图函数签名核心功能相关函数简介 代码结构与逻辑分析1. 内存上下文切换2. 调试跟踪(可选)3. 状态机逻辑(switch 分支)4. 调试跟踪(完成…

深入理解 PostgreSQL 中的 tuplesort_performsort 函数

  • 函数概述
    • 函数源码
    • 函数流程图
    • 函数签名
    • 核心功能
    • 相关函数简介
  • 代码结构与逻辑分析
    • 1. 内存上下文切换
    • 2. 调试跟踪(可选)
    • 3. 状态机逻辑(switch 分支)
    • 4. 调试跟踪(完成时)
    • 5. 恢复内存上下文
  • 关键特性与优化
    • 1. 状态机设计
    • 2. 状态转换图
    • 3. 混合排序策略
    • 4. 资源管理哲学
    • 5. 异常处理机制
  • 相关函数介绍
    • tuplesort_sort_memtuples 函数
      • 优化策略与特性
      • 应用场景
    • inittapes 函数
    • dumptuples 函数
    • leader_takeover_tapes 函数
    • mergeruns 函数

声明:本文的部分内容参考了他人的文章。在编写过程中,我们尊重他人的知识产权和学术成果,力求遵循合理使用原则,并在适用的情况下注明引用来源。
本文主要参考了 postgresql-15.0 的开源代码和《PostgresSQL数据库内核分析》一书

  在 PostgreSQL 的元组排序(tuplesort)模块中,tuplesort_performsort 是一个核心函数,负责执行排序的主要逻辑。它根据当前的排序状态(state->status选择不同的排序策略,处理串行并行排序场景,并确保内存管理的高效性。本文将详细分析该函数的实现细节,探讨其作用、逻辑结构、状态转换以及相关优化策略,以帮助读者更好地理解 PostgreSQL 的排序机制。其中,tuplesort_performsort函数的源码流程可见【PostgreSQL内核学习 —— (sort算子)】。

函数概述

函数源码

/** 执行排序的主入口函数,处理不同排序阶段的逻辑*/
void
tuplesort_performsort(Tuplesortstate *state)
{// 切换到排序专用的内存上下文,确保内存分配在正确环境中进行MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);#ifdef TRACE_SORT  // 调试跟踪模块if (trace_sort)elog(LOG, "performsort of worker %d starting: %s",state->worker, pg_rusage_show(&state->ru_start));  // 记录排序开始时的资源使用情况
#endif// 根据当前排序状态选择处理逻辑switch (state->status){case TSS_INITIAL:  // 初始状态(未开始排序)if (SERIAL(state)) {  // 串行模式/* 单线程快速排序 */tuplesort_sort_memtuples(state);  // 对内存中的元组数组执行快速排序state->status = TSS_SORTEDINMEM;  // 更新状态为"内存已排序"} else if (WORKER(state)) {  // 并行工作线程模式/* * 并行Worker将数据写入磁带:* 1. 初始化磁带存储(inittapes)* 2. 将内存数据转储到磁带(dumptuples)* 3. 标记不需要合并运行(worker_nomergeruns)*/inittapes(state, false);          // 初始化磁带文件dumptuples(state, true);          // 将内存元组写入磁带worker_nomergeruns(state);        // 设置Worker不参与合并state->status = TSS_SORTEDONTAPE;  // 更新状态为"磁带已排序"} else {  // 并行Leader模式/* * Leader接管Worker的磁带并执行多路归并:* 1. 接管磁带资源(leader_takeover_tapes)* 2. 执行多路归并排序(mergeruns)*/leader_takeover_tapes(state);     // 接管Worker的磁带资源mergeruns(state);                 // 执行多路归并排序}// 重置迭代器状态state->current = 0;                   // 重置当前读取位置state->eof_reached = false;           // 清除EOF标记state->markpos_block = 0L;            // 重置标记位置块号state->markpos_offset = 0;            // 重置标记位置偏移量state->markpos_eof = false;           // 清除标记EOF状态break;case TSS_BOUNDED:  // 有界堆排序状态/* * 转换有界堆为有序数组:* 1. 调用sort_bounded_heap处理堆结构* 2. 更新状态为"内存已排序"*/sort_bounded_heap(state);             // 将有界堆转换为有序数组state->current = 0;state->eof_reached = false;state->markpos_offset = 0;state->markpos_eof = false;state->status = TSS_SORTEDINMEM;      // 更新状态break;case TSS_BUILDRUNS:  // 磁带构建运行状态/** 完成磁带排序:* 1. 将内存剩余元组写入磁带(dumptuples)* 2. 执行多路归并(mergeruns)*/dumptuples(state, true);              // 最终转储内存数据mergeruns(state);                     // 执行磁带合并state->eof_reached = false;state->markpos_block = 0L;state->markpos_offset = 0;state->markpos_eof = false;break;default:  // 无效状态处理elog(ERROR, "invalid tuplesort state");  // 抛出错误break;}#ifdef TRACE_SORT  // 调试跟踪模块if (trace_sort) {if (state->status == TSS_FINALMERGE)  // 最终合并阶段日志elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",state->worker, state->nInputTapes, pg_rusage_show(&state->ru_start));else  // 常规完成日志elog(LOG, "performsort of worker %d done: %s",state->worker, pg_rusage_show(&state->ru_start));}
#endif// 恢复原始内存上下文MemoryContextSwitchTo(oldcontext);
}

函数流程图

在这里插入图片描述

函数签名

void tuplesort_performsort(Tuplesortstate *state);
  • 参数Tuplesortstate *state 是一个指向排序状态结构的指针,包含了排序过程中的所有关键信息,如当前状态、内存数据、磁带文件等。
  • 作用:作为排序执行的入口点,tuplesort_performsort 根据状态机的设计,处理不同的排序阶段,包括内存排序、磁带写入和多路归并等。

核心功能

  • 根据 state->status 的值,决定是执行内存快速排序将数据写入磁带,还是进行多路归并
  • 支持串行模式并行模式(包括 WorkerLeader 角色)。
  • 管理内存上下文,确保资源分配和释放的正确性。
  • 重置迭代器状态,为后续读取操作做好准备。

相关函数简介

  • tuplesort_sort_memtuples:内存中的快速排序实现。
  • inittapes:初始化磁带文件结构。
  • dumptuples:将内存元组写入磁带。
  • worker_nomergeruns:标记 Worker 无需参与合并。
  • leader_takeover_tapesLeader 接管 Worker 的磁带资源。
  • mergeruns:执行多路归并排序。
  • sort_bounded_heap:将有界堆转换为有序数组。

这些函数共同构成了 tuplesort 模块的功能链。

代码结构与逻辑分析

  以下是 tuplesort_performsort 的完整代码分解与分析:

1. 内存上下文切换

MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
  • 目的:将当前内存上下文切换到排序专用的 state->sortcontext,确保排序过程中分配的内存都在这个上下文中,便于统一管理和释放。
  • 重要性PostgreSQL 使用内存上下文来隔离不同操作的内存分配,防止内存泄漏或冲突。

在函数结束时,会通过 MemoryContextSwitchTo(oldcontext) 恢复原始上下文。

2. 调试跟踪(可选)

#ifdef TRACE_SORT
if (trace_sort)elog(LOG, "performsort of worker %d starting: %s",state->worker, pg_rusage_show(&state->ru_start));
#endif
  • 作用:如果启用了 TRACE_SORT 宏,记录排序开始时的资源使用情况(如 CPU 和内存)。
  • 工具pg_rusage_show 用于显示资源使用统计,便于性能分析和调试。

类似的调试日志也会在排序完成后记录,区分是否处于最终合并阶段。

3. 状态机逻辑(switch 分支)

  tuplesort_performsort 的核心逻辑基于 state->status 的值,通过 switch-case 结构分支执行:
TSS_INITIAL:初始状态

case TSS_INITIAL:if (SERIAL(state)) {  // 串行模式tuplesort_sort_memtuples(state);  // 内存快速排序state->status = TSS_SORTEDINMEM;  // 更新状态} else if (WORKER(state)) {  // 并行 Worker 模式inittapes(state, false);          // 初始化磁带dumptuples(state, true);          // 写入磁带worker_nomergeruns(state);        // 标记无需合并state->status = TSS_SORTEDONTAPE; // 更新状态} else {  // 并行 Leader 模式leader_takeover_tapes(state);     // 接管磁带mergeruns(state);                 // 多路归并}// 重置迭代器状态state->current = 0;state->eof_reached = false;state->markpos_block = 0L;state->markpos_offset = 0;state->markpos_eof = false;break;

串行模式 (SERIAL(state)):

调用 tuplesort_sort_memtuples 对内存中的元组数组执行快速排序。
状态更新为 TSS_SORTEDINMEM,表示内存已排序完成。

并行 Worker 模式 (WORKER(state)):

初始化磁带文件(inittapes)。
将内存中的元组写入磁带(dumptuples)。
调用 worker_nomergeruns 标记 Worker 不参与后续合并。
状态更新为 TSS_SORTEDONTAPE,表示数据已排序并存储在磁带上。

并行 Leader 模式:

调用 leader_takeover_tapes 接管 Worker 的磁带资源。
执行 mergeruns 进行多路归并排序。

迭代器重置:无论哪种模式,都会重置读取相关的状态变量(如 currenteof_reached)。


TSS_BOUNDED:有界堆排序状态

case TSS_BOUNDED:sort_bounded_heap(state);             // 转换堆为有序数组state->current = 0;state->eof_reached = false;state->markpos_offset = 0;state->markpos_eof = false;state->status = TSS_SORTEDINMEM;      // 更新状态break;
  • 逻辑

    • 调用 sort_bounded_heap 将有界堆转换为有序数组
    • 更新状态为 TSS_SORTEDINMEM
  • 适用场景:用于需要限制内存使用的排序(如 Top-N 查询)。


TSS_BUILDRUNS:构建运行状态

case TSS_BUILDRUNS:dumptuples(state, true);              // 写入剩余元组mergeruns(state);                     // 多路归并state->eof_reached = false;state->markpos_block = 0L;state->markpos_offset = 0;state->markpos_eof = false;break;
  • 逻辑

    • 将内存中剩余的元组写入磁带(dumptuples)。
    • 执行多路归并(mergeruns)完成排序。
  • 适用场景:处理大数据集时,内存不足以容纳所有元组,需分阶段写入磁带。


默认分支:无效状态

default:elog(ERROR, "invalid tuplesort state");break;
  • 作用:如果 state->status 不匹配任何已知状态,抛出错误,确保程序健壮性。

4. 调试跟踪(完成时)

#ifdef TRACE_SORT
if (trace_sort) {if (state->status == TSS_FINALMERGE)elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",state->worker, state->nInputTapes, pg_rusage_show(&state->ru_start));elseelog(LOG, "performsort of worker %d done: %s",state->worker, pg_rusage_show(&state->ru_start));
}
#endif
  • 作用:记录排序完成时的资源使用情况,特别区分是否处于最终合并阶段(TSS_FINALMERGE)。

5. 恢复内存上下文

MemoryContextSwitchTo(oldcontext);
  • 目的:恢复调用前的内存上下文,确保不影响外部环境。

关键特性与优化

1. 状态机设计

Tuplesortstate 中的 status 字段定义了排序的不同阶段:

  • TSS_INITIAL:初始状态,未开始排序。
  • TSS_BOUNDED:有界堆排序状态,用于内存受限场景。
  • TSS_BUILDRUNS:构建运行状态,处理大数据集。
  • TSS_SORTEDINMEM:内存已排序完成。
  • TSS_SORTEDONTAPE:磁带已排序完成。
  • TSS_FINALMERGE:最终合并阶段(调试日志中提及)。

这种状态机设计使得函数逻辑清晰,能够灵活应对不同场景。

2. 状态转换图

  以下是 tuplesort_performsort 中状态转换的简化流程:

[TSS_INITIAL]├─SERIAL → 快速排序 → [TSS_SORTEDINMEM]├─WORKER → 磁带写入 → [TSS_SORTEDONTAPE] └─LEADER → 多路归并 → [TSS_FINALMERGE]TSS_BOUNDED → TSS_SORTEDINMEMTSS_BUILDRUNS → (完成合并后,可能进入 TSS_SORTEDONTAPE 或 TSS_FINALMERGE)

3. 混合排序策略

  根据数据规模动态选择算法:

数据量 < work_mem → 快速排序
work_mem < 数据量 < 10*work_mem → 堆排序
数据量 > 10*work_mem → 多阶段归并

4. 资源管理哲学

  • 内存优先:最大限度利用可用内存
  • 渐进式落盘:按需写入临时文件
  • 并行流水线:计算与I/O重叠

5. 异常处理机制

  • 内存超限:触发"soft"错误可恢复
  • 磁盘空间不足:硬错误终止
  • 并行协调:通过共享内存检测Worker状态

相关函数介绍

tuplesort_sort_memtuples 函数

  本函数是PostgreSQL内存排序的核心实现,根据不同的数据类型特征选择最优化的快速排序策略。主要执行流程为:
  tuplesort_sort_memtuplesPostgreSQL tuplesort 机制中的核心函数之一负责对 memtuples 数组中的元组执行快速排序 (quicksort)。首先,它会检查 LEADER(state) 以确保当前不是 Leader 线程,然后判断 memtupcount 是否大于 1,如果只有一个或没有元组,则无需排序,直接返回。
  接下来,它会根据数据特征选择最优的排序策略。如果 haveDatum1 标志为 true,且 sortKeys 存在,表示可以使用专门优化的比较器。针对不同的 comparator,例如 ssup_datum_unsigned_cmpssup_datum_signed_cmp(仅适用于 64 位系统)或 ssup_datum_int32_cmp,分别调用 qsort_tuple_unsignedqsort_tuple_signedqsort_tuple_int32 进行排序,以提升性能。
  如果不能使用这些优化路径,则会检查 onlyKey 是否非空,若为 true,说明只涉及单列排序,调用 qsort_ssup 进行排序;否则,调用 qsort_tuple,使用 comparetup 作为比较函数,执行多列排序。这样,tuplesort_sort_memtuples 在不同数据类型和排序需求下,都能高效地选择最优排序方式,提高整体查询性能。
  函数源码如下:

/* * 对内存中的元组数组进行快速排序* 根据排序键类型选择最优化的排序实现*/
static void
tuplesort_sort_memtuples(Tuplesortstate *state)
{/* 断言确认当前不是并行排序的Leader进程 */Assert(!LEADER(state));/* 仅当内存元组数量>1时才需要排序 */if (state->memtupcount > 1){/** 检查是否存在可优化的排序条件:* 1. 首列数据存储在datum1字段中* 2. 存在有效的排序键定义*/if (state->base.haveDatum1 && state->base.sortKeys){/* 针对无符号整型比较器的优化路径 */if (state->base.sortKeys[0].comparator == ssup_datum_unsigned_cmp){/* 调用无符号整型特化快速排序 */qsort_tuple_unsigned(state->memtuples,state->memtupcount,state);return; // 优化路径直接返回}
#if SIZEOF_DATUM >= 8  // 64位系统专属优化/* 针对有符号整型比较器的优化路径 */else if (state->base.sortKeys[0].comparator == ssup_datum_signed_cmp){/* 调用有符号整型特化快速排序 */qsort_tuple_signed(state->memtuples,state->memtupcount,state);return;}
#endif/* 针对32位整型比较器的优化路径 */else if (state->base.sortKeys[0].comparator == ssup_datum_int32_cmp){/* 调用32位整型特化快速排序 */qsort_tuple_int32(state->memtuples,state->memtupcount,state);return;}}/* 单排序键的通用处理路径 */if (state->base.onlyKey != NULL){/* 使用标准单键快速排序实现 */qsort_ssup(state->memtuples, state->memtupcount,state->base.onlyKey);}/* 多键/复杂排序场景 */else{/* 使用元组通用快速排序实现 */qsort_tuple(state->memtuples,state->memtupcount,state->base.comparetup,  // 自定义元组比较函数state);                  // 传递状态上下文}}
}

优化策略与特性

  • 数据类型特化: 通过识别排序键的数据类型(如无符号整数有符号整数32 位整数),选择专用 qsort 变体,提升性能。
  • 单键优化: 单键场景下使用 qsort_ssup,避免多键比较的复杂性。
  • 条件编译: #if SIZEOF_DATUM >= 8 确保有符号整数优化仅在 64 位系统启用,增强兼容性。
  • 惰性执行: 仅在元组数量大于 1 时排序,减少无用操作。

应用场景

  • 内存中小数据集: 快速完成排序。
  • 外部排序: 在外部排序的运行中,对内存中的元组进行预排序后写入磁带。
  • 并行排序: Worker 线程在提交结果前对本地元组排序。

inittapes 函数

  inittapes 函数是 PostgreSQL 元组排序模块的核心函数之一,专门用于在内存不足以完成排序时初始化外部磁带排序的环境
  它首先检查 mergeruns 参数,判断是否需要合并多个排序运行runs)。如果需要,则根据可用内存调用 tuplesort_merge_order 计算最优的输入磁带数量,以优化归并性能;如果不需要(通常发生在 Worker 线程中),则将磁带数量设为最小值,以简化处理流程。
  接着,函数会记录切换到外部排序的日志(若启用了调试跟踪),并调用 inittapestateLogicalTapeSetCreate 来创建和初始化逻辑磁带集。该逻辑磁带集可以支持与共享文件集关联,使得并行排序任务能够协调进行。
  随后,inittapes 还会初始化运行编号、输入和输出磁带数组,以及相关的计数器等状态变量,确保磁带资源管理井然有序。
  最后,函数将排序状态更新为 TSS_BUILDRUNS,并选择一个新的磁带,以准备数据写入。通过合理分配磁带资源并设置必要的状态inittapes 为后续的大规模外部排序奠定了坚实基础,确保 PostgreSQL 在处理超出内存容量的大规模数据排序时依然能够高效运行。
  函数源码如下:

/** inittapes - initialize for tape sorting.* 初始化磁带排序的函数。** This is called only if we have found we won't sort in memory.* 仅在确定无法在内存中完成排序时调用此函数。*/
static void
inittapes(Tuplesortstate *state, bool mergeruns)  // 定义静态函数 inittapes,参数包括排序状态 state 和是否需要合并运行的布尔值 mergeruns
{Assert(!LEADER(state));  // 断言当前线程不是 Leader 线程,因为 Leader 线程通常不直接处理磁带初始化,而是协调其他线程if (mergeruns)  // 如果 mergeruns 为真,表示需要进行多运行的合并排序{/* Compute number of input tapes to use when merging *//* 计算在合并时需要使用的输入磁带数量 */state->maxTapes = tuplesort_merge_order(state->allowedMem);  // 调用 tuplesort_merge_order 函数,根据允许的内存大小计算合适的输入磁带数量}else  // 如果 mergeruns 为假,表示无需合并,通常发生在 Worker 线程生成单个运行的情况{/* Workers can sometimes produce single run, output without merge *//* Worker 线程有时可以生成单个运行并直接输出,无需合并 */Assert(WORKER(state));  // 断言当前线程是 Worker 线程,确保逻辑正确state->maxTapes = MINORDER;  // 将磁带数量设置为最小值 MINORDER(通常为 1),因为只需处理单个运行}if (trace_sort)  // 如果启用了排序跟踪功能(trace_sort 为真)elog(LOG, "worker %d switching to external sort with %d tapes: %s",  // 调用 elog 记录日志state->worker, state->maxTapes, pg_rusage_show(&state->ru_start));  // 日志内容包括 Worker ID、使用磁带数量及资源使用情况/* Create the tape set *//* 创建磁带集 */inittapestate(state, state->maxTapes);  // 调用 inittapestate 函数初始化磁带状态,准备管理磁带文件和资源state->tapeset =  // 为状态中的 tapeset 赋值,创建逻辑磁带集LogicalTapeSetCreate(false,  // 调用 LogicalTapeSetCreate 创建逻辑磁带集,第一个参数为 false 表示不使用随机访问state->shared ? &state->shared->fileset : NULL,  // 如果 state->shared 存在,则使用共享文件集,否则为 NULLstate->worker);  // 传递 Worker ID,用于标识该磁带集的所有者state->currentRun = 0;  // 初始化当前运行编号为 0,表示从第一个运行开始/** Initialize logical tape arrays.* 初始化逻辑磁带数组。*/state->inputTapes = NULL;  // 将输入磁带数组初始化为空,表示当前没有输入磁带state->nInputTapes = 0;    // 初始化输入磁带数量为 0state->nInputRuns = 0;     // 初始化输入运行数量为 0state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *));  // 使用 palloc0 分配输出磁带数组内存,大小为 maxTapes 个指针,并初始化为 0state->nOutputTapes = 0;   // 初始化输出磁带数量为 0state->nOutputRuns = 0;    // 初始化输出运行数量为 0state->status = TSS_BUILDRUNS;  // 更新排序状态为 TSS_BUILDRUNS,表示当前处于构建运行的阶段selectnewtape(state);  // 调用 selectnewtape 函数选择一个新的磁带,用于后续元组的写入操作
}

dumptuples 函数

  该函数是PostgreSQL外部排序的核心例程,负责将内存中的排序结果持久化到磁带文件(实际为临时文件),形成可用于多路归并的初始有序段。主要实现以下关键功能:

​1. 内存管控逻辑

  • 动态决策转储时机:通过LACKMEM判断内存压力,仅在内存不足或强制模式时触发转储
  • 空转储预防机制:避免生成无效空运行段(Worker进程例外)
  • 运行次数安全墙:防止INT_MAX溢出导致逻辑错误

​2. 磁带管理

  • 多磁带轮转策略:通过selectnewtape实现Round-robin磁带选择
  • 运行段边界标记markrunend写入特殊标识区分不同数据段
  • 并行Worker支持:维护独立的运行计数器及磁带分配

​3. 排序与持久化

  • 内存排序优化:调用特化的tuplesort_sort_memtuples进行快速排序
  • 批量写入优化:采用顺序写模式最大化I/O效率
  • 元组内存管理:重置内存上下文避免碎片,精确统计内存使用

​4. 资源监控与调试

  • 资源使用跟踪:通过pg_rusage_show记录CPU/内存/I/O数据
  • 多级日志输出:记录排序开始、结束及磁带写入的关键节点
  • 运行编号追踪:维护currentRun实现多阶段运行管理

  该过程在外部排序中循环执行,直至所有数据处理完成,形成多个有序的磁带运行段,为后续的多阶段归并排序奠定基础。
  函数源码如下:

/** 将内存中的元组转储到磁带,形成初始归并段* * @param state 排序状态机* @param alltuples 是否强制转储所有内存元组*                  (true表示输入数据已结束,必须全部转储)*/
static void
dumptuples(Tuplesortstate *state, bool alltuples)
{int         memtupwrite;  // 待写入的元组总数int         i;            // 循环索引/** 内存未耗尽且非强制转储时提前返回:* 1. 内存元组未达容量上限* 2. 仍有可用内存空间* 3. 非最终强制转储模式*/if (state->memtupcount < state->memtupsize && !LACKMEM(state) &&!alltuples)return;/** 处理空转储的特殊情况:* 1. 内存中无待处理元组* 2. 当前运行轮次>0(避免创建全空初始运行)* 例外:Worker进程必须生成至少一个磁带*/if (state->memtupcount == 0 && state->currentRun > 0)return;// 验证当前处于构建运行阶段Assert(state->status == TSS_BUILDRUNS);/** 运行次数安全校验:* 防止currentRun溢出INT_MAX导致不可预测行为* 实际场景几乎不可能触发,但作为防御性编程措施*/if (state->currentRun == INT_MAX)ereport(ERROR,  // 抛出致命错误(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),errmsg("cannot have more than %d runs for an external sort",INT_MAX)));/* 新运行轮次需要切换输出磁带 */if (state->currentRun > 0)selectnewtape(state);  // 选择新的磁带进行写入state->currentRun++;  // 递增运行计数器/* 调试日志:记录排序开始信息 */if (trace_sort)elog(LOG, "worker %d starting quicksort of run %d: %s",state->worker, state->currentRun,pg_rusage_show(&state->ru_start));  // 显示资源使用情况/** 核心排序操作:* 对当前内存中的元组执行快速排序* 使用特化排序函数提升性能*/tuplesort_sort_memtuples(state);/* 调试日志:记录排序完成信息 */if (trace_sort)elog(LOG, "worker %d finished quicksort of run %d: %s",state->worker, state->currentRun,pg_rusage_show(&state->ru_start));/* 准备写入操作 */memtupwrite = state->memtupcount;  // 获取待写入元组总数for (i = 0; i < memtupwrite; i++)  // 遍历排序后的元组{SortTuple  *stup = &state->memtuples[i];  // 获取当前元组指针WRITETUP(state, state->destTape, stup);  // 将元组写入目标磁带}state->memtupcount = 0;  // 重置内存元组计数器/** 重置元组内存上下文:* 1. 释放所有已分配的元组内存* 2. 防止内存碎片化,特别是元组大小差异较大时* 3. 对于有界排序,避免AllocSetFree按大小分类导致的碎片*/MemoryContextReset(state->base.tuplecontext);/** 更新内存统计:* 1. 释放元组占用的内存配额* 2. 重置内存使用计数器*/FREEMEM(state, state->tupleMem);state->tupleMem = 0;/* 标记当前磁带运行结束 */markrunend(state->destTape);/* 调试日志:记录磁带写入完成 */if (trace_sort)elog(LOG, "worker %d finished writing run %d to tape %d: %s",state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1,pg_rusage_show(&state->ru_start));
}

leader_takeover_tapes 函数

  本函数是PostgreSQL并行外部排序的核心协调函数,主要实现Leader进程对Worker排序结果的统一接管和归并准备。其核心工作流程可分为四个阶段

  1. 状态安全校验
    • 通过双断言确保执行环境正确(Leader身份、至少1Worker
    • 采用自旋锁(SpinLock)安全读取共享内存中的Worker完成状态
    • 完整性检查防止数据丢失(确保所有Worker完成任务)
  2. ​磁带系统初始化
    • 复用Worker生成的共享文件集合(shared->fileset
    • 创建逻辑磁带集合(LogicalTapeSet),建立虚拟磁带管理系统
    • 初始化运行计数器为Worker数量,建立归并轮次基础
  3. ​数据结构重置
    • 清空输入磁带列表,准备构建多路归并的输入源
    • 分配输出磁带数组,容量等于Worker数量
    • 设置输出参数反映待处理的并行运行段
  4. 磁带资源整合
    • 循环遍历每个Worker,通过LogicalTapeImport将其物理文件抽象为逻辑磁带
    • Worker的输出磁带挂载到Leader的磁带管理系统
    • 最终状态切换至TSS_BUILDRUNS,标志初始归并段构建完成
        函数源码如下:
/** Leader进程接管所有Worker生成的排序磁带,构建归并所需的磁带集合* 使Leader进入与串行外部排序相同的状态,准备执行多路归并*/
static void
leader_takeover_tapes(Tuplesortstate *state)
{Sharedsort *shared = state->shared;         // 获取共享内存控制结构int         nParticipants = state->nParticipants; // 参与的Worker总数int         workersFinished;               // 实际完成的Worker计数器int         j;                             // 循环索引/* 核心断言校验 */Assert(LEADER(state));                     // 确保当前是Leader进程Assert(nParticipants >= 1);                // 至少存在1个Worker参与/* 安全获取Worker完成状态 */SpinLockAcquire(&shared->mutex);           // 获取自旋锁保护共享内存workersFinished = shared->workersFinished; // 读取已完成的Worker数量SpinLockRelease(&shared->mutex);           // 释放自旋锁/* 完整性检查:所有Worker必须完成排序 */if (nParticipants != workersFinished)elog(ERROR, "cannot take over tapes before all workers finish");/** 初始化磁带系统:* 1. 创建包含Worker磁带的逻辑磁带集合* 2. 参数说明:*    - false: 表示不立即创建物理文件*    - &shared->fileset: 复用Worker的共享文件集合*    - -1: 临时文件使用默认编号*/inittapestate(state, nParticipants);       // 初始化磁带管理状态state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1);/* * 设置currentRun为Worker数量:* 此值后续用于控制归并轮次,每个Worker对应一个初始运行段*/state->currentRun = nParticipants;/* * 重置输入输出状态,模拟串行排序的初始运行完成状态:* - inputTapes: 归并输入磁带数组(初始为空)* - nInputTapes/Runs: 输入磁带/运行数清零* - outputTapes: 分配Worker数量大小的输出磁带数组* - nOutputTapes/Runs: 设置为Worker数量,表示待处理运行段*/state->inputTapes = NULL;state->nInputTapes = 0;state->nInputRuns = 0;state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *)); // 清零分配state->nOutputTapes = nParticipants;       // 输出磁带数=Worker数state->nOutputRuns = nParticipants;        // 每个Worker对应1个运行段/* 循环导入每个Worker的磁带 */for (j = 0; j < nParticipants; j++){/* * 将Worker的磁带导入Leader的磁带集合:* - j: Worker的编号标识* - &shared->tapes[j]: 对应Worker的磁带元数据*/state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);}/* 更新状态机至构建运行完成阶段 */state->status = TSS_BUILDRUNS;
}

mergeruns 函数

  该函数是PostgreSQL外部排序的核心归并引擎,负责将多个已排序的初始运行(磁带上的有序数据段)通过多轮平衡K路归并,最终生成完全有序的结果。主要功能包括:

​1. 预处理阶段

  • 禁用缩写键优化,确保使用完整比较器
  • 重构内存管理系统,切换为slab分配器优化小对象分配
  • 初始化K路归并堆结构,分配每个磁带的元组槽位

​2. 多阶段归并

  • 磁带轮转机制:将上一轮的输出磁带作为下一轮的输入
  • 动态缓冲区分配:根据剩余内存计算最优I/O缓冲区大小
  • 归并策略选择:自动判断是否进入最终单次归并阶段
  • 平衡归并执行:每次从N个输入磁带选取最小元素写入新运行

​3. 终止与收尾

  • 结果磁带冻结:阻止后续修改,准备读取
  • 资源清理:释放缓冲区内存,关闭磁带句柄
  • 状态更新:标记排序完成状态

  函数源码如下:

/** 执行多阶段归并排序,将多个有序运行段合并为最终有序结果* 实现平衡K路归并算法,处理可能的多轮归并过程*/
static void
mergeruns(Tuplesortstate *state)
{int         tapenum;  // 磁带编号迭代变量/* 校验当前状态:必须处于构建运行完成阶段且内存无剩余元组 */Assert(state->status == TSS_BUILDRUNS);Assert(state->memtupcount == 0);/* 禁用缩写键优化:磁盘数据不存储缩写键,需使用完整比较器 */if (state->base.sortKeys != NULL && state->base.sortKeys->abbrev_converter != NULL){state->base.sortKeys->abbrev_converter = NULL;  // 清空缩写转换器state->base.sortKeys->comparator = state->base.sortKeys->abbrev_full_comparator; // 切换完整比较器/* 清理相关字段保持结构整洁 */state->base.sortKeys->abbrev_abort = NULL;state->base.sortKeys->abbrev_full_comparator = NULL;}/* 重置元组内存上下文:释放所有已分配元组,切换slab分配策略 */MemoryContextResetOnly(state->base.tuplecontext);/* 释放原始大内存数组,后续使用更紧凑的堆结构 */FREEMEM(state, GetMemoryChunkSpace(state->memtuples));pfree(state->memtuples);state->memtuples = NULL;/* 初始化slab分配器:每个输入磁带分配1槽 + 结果保留槽 */if (state->base.tuples)init_slab_allocator(state, state->nOutputTapes + 1); // 元组类型需要内存管理elseinit_slab_allocator(state, 0); // 值类型无需额外分配/* 初始化归并堆:分配N路归并所需的元组槽 */state->memtupsize = state->nOutputTapes; // 堆容量等于输入磁带数state->memtuples = (SortTuple *) MemoryContextAlloc(state->base.maincontext,state->nOutputTapes * sizeof(SortTuple));USEMEM(state, GetMemoryChunkSpace(state->memtuples)); // 更新内存统计/* 分配磁带缓冲区内存:使用全部剩余可用内存 */state->tape_buffer_mem = state->availMem; USEMEM(state, state->tape_buffer_mem);if (trace_sort)  // 调试日志:记录内存分配elog(LOG, "worker %d using %zu KB of memory for tape buffers",state->worker, state->tape_buffer_mem / 1024);/* 多阶段归并主循环 */for (;;){/* 新归并轮次初始化:当输入运行耗尽时重新配置磁带 */if (state->nInputRuns == 0){int64       input_buffer_size;/* 清理上一轮输入磁带 */if (state->nInputTapes > 0){for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)LogicalTapeClose(state->inputTapes[tapenum]); // 关闭磁带句柄pfree(state->inputTapes); // 释放输入磁带数组}/* 轮转磁带角色:上轮输出变为本轮输入 */state->inputTapes = state->outputTapes;state->nInputTapes = state->nOutputTapes;state->nInputRuns = state->nOutputRuns;/* 初始化新输出磁带集合 */state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *));state->nOutputTapes = 0;state->nOutputRuns = 0;/* 计算输入缓冲区大小(平衡I/O与内存使用) */input_buffer_size = merge_read_buffer_size(state->tape_buffer_mem,state->nInputTapes,state->nInputRuns,state->maxTapes);/* 调试日志:记录归并轮次开始 */if (trace_sort)elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB for input: %s",state->nInputRuns, state->nInputTapes, input_buffer_size / 1024,pg_rusage_show(&state->ru_start));/* 准备输入磁带:倒带并设置读取缓冲区 */for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size);/* 最终合并条件判断:单轮归并可结束 */if ((state->base.sortopt & TUPLESORT_RANDOMACCESS) == 0  // 无需随机访问&& state->nInputRuns <= state->nInputTapes  // 运行数<=磁带数&& !WORKER(state))  // 非工作进程{LogicalTapeSetForgetFreeSpace(state->tapeset); // 停止空间回收beginmerge(state);         // 初始化最终归并state->status = TSS_FINALMERGE; // 更新状态return;  // 进入最终归并阶段}}/* 选择输出磁带并执行单次归并 */selectnewtape(state);   // 分配新输出磁带mergeonerun(state);     // 合并一个完整运行/* 终止条件:输入耗尽且输出单一运行 */if (state->nInputRuns == 0 && state->nOutputRuns <= 1)break;}/* 归并完成处理 */state->result_tape = state->outputTapes[0];  // 设置结果磁带if (!WORKER(state))LogicalTapeFreeze(state->result_tape, NULL);  // 主进程冻结磁带elseworker_freeze_result_tape(state);  // 工作进程特殊处理state->status = TSS_SORTEDONTAPE;  // 更新为磁带已排序状态/* 资源清理:关闭所有输入磁带释放缓冲区 */for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)LogicalTapeClose(state->inputTapes[tapenum]);
}
http://www.dtcms.com/wzjs/31997.html

相关文章:

  • 杭州怎么做网站网站建站哪家公司好
  • 邯郸网站开发定制百度起诉seo公司
  • 做塑胶网站需要什么上海百度推广公司
  • 电白建设局网站海外独立站
  • wordpress建站公司长春网站提升排名
  • 学用mvc做网站优化网站平台
  • 惠州seo排名公司福州百度快速优化
  • 网站建设m.cnzran.com优化网站排名解析推广
  • 万脑网站建设网络营销策略研究论文
  • 手机在线设计成都百度网站排名优化
  • 深圳网站建设培训班关键词排名霸屏代做
  • 网站开发的源码会计培训班需要学多长时间
  • 前程无忧怎么做网站微信营销典型案例
  • 做ae好的网站有哪些长沙官网seo服务
  • wordpress拖曳式建站关键词排名优化工具
  • 体育设施建设网站哪里能买精准客户电话
  • 企业网站推广的目的百度seo技术
  • 凡客诚品的衣服质量怎么样360seo排名优化服务
  • 如何查看网站蜘蛛苏州网站排名推广
  • 百度收录什么网站吗深圳推广公司推荐
  • 允许个人做动漫网站吗广告公司收费价格表
  • 人民日报客户端和人民网的区别郑州seo优化外包公司
  • 百度快照 网站描述 更新谷歌浏览器下载安装(手机安卓版)
  • 湖南智能网站建设推荐手机百度免费下载
  • 河北网站制作公司报价微营销平台有哪些
  • 比分网站建设今日百度小说排行榜风云榜
  • 花瓣网设计网站网页百度
  • 湖南网站建设推荐推广自己产品的文案
  • 太平洋手机报价大全长沙网站优化指导
  • 有没有做企业网站的种子搜索