[simdjson] document_stream | iterate_many() | batch_size | 线程加速 | 轻量handle
第七章:文档流
欢迎回来
在前面的章节中,我们学习了如何使用解析器结合填充字符串获取表示JSON根节点的文档,并通过按需API(On-Demand API)遍历值、对象和数组,同时使用simdjson_result
进行错误处理。
到目前为止,我们专注于逐个解析单个JSON文档。
但如果需要处理包含多个连续JSON文档的大型文件或网络流呢?
常见格式如NDJSON(换行符分隔的JSON)或JSON Lines,其中每行都是完整且有效的JSON文档。
例如:
{"user":"Alice", "id":1}
{"user":"Bob", "id":2}
{"user":"Charlie", "id":3}
...(可能有数百万行)
将整个文件加载到内存逐个解析会非常低效且消耗大量内存。这正是文档流抽象要解决的问题。
什么是文档流?
simdjson::ondemand::document_stream
(DOM API对应simdjson::dom::document_stream
,但我们聚焦按需API的iterate_many
)旨在高效处理包含多个JSON文档的大型输入。
-
不同于逐个提供文档字符串,
iterate_many
(DOM API用parse_many
)一次性接收整个缓冲区或文件内容。 -
iterate_many
不会立即解析所有内容,而是建立内部机制,在迭代流
时逐个查找并解析文档。
想象成传送带:将整个大型容器装载到传送带(iterate_many
),然后逐个处理到达面前的物品(循环遍历document_stream
)。无需同时存储所有物品,只需处理当前项。
这种方法有两大优势:
- 内存高效:无需同时加载整个GB级流到内存(尽管需要初始大缓冲区)。解析器为每个文档
复用
内部缓冲区。 - 高性能:Simdjson可用高速方法处理流块。按需API甚至能用
后台线程
在您处理当前文档时查找下一个文档!
获取文档流
通过simdjson::ondemand::parser
实例的iterate_many()
方法获取document_stream
。
链接:https://github.com/simdjson/simdjson/blob/master/doc/iterate_many.md
iterate_many()
参数通常包括:
- 填充JSON数据缓冲区指针(
const uint8_t*
或padded_string
) - 数据长度(
size_t
) - 可选的
batch_size
(后续讨论) - 可选的允许逗号分隔文档标志(NDJSON/JSONL不常用)
与其他simdjson操作类似,iterate_many()
返回simdjson_result<simdjson::ondemand::document_stream>
。使用流前必须检查结果是否有错误。
以下为示例NDJSON数据处理:
#include <simdjson.h>
#include <iostream>int main() {// 假设从大文件读取的NDJSON数据simdjson::padded_string ndjson_data = R"({"user":"Alice", "id":1}{"user":"Bob", "id":2}{"user":"Charlie", "id":3})"_padded; // _padded确保填充simdjson::ondemand::parser parser;// 获取文档流simdjson::simdjson_result<simdjson::ondemand::document_stream> stream_result =parser.iterate_many(ndjson_data);// 检查流设置是否成功if (stream_result.error()) {std::cerr << "文档流设置错误: " << stream_result.error() << std::endl;return EXIT_FAILURE;}// 从结果中获取document_stream对象simdjson::ondemand::document_stream doc_stream = std::move(stream_result.value());std::cout << "成功建立文档流。" << std::endl;// 现在可以迭代流...(见下节)return EXIT_SUCCESS;
}
注意iterate_many
接收包含所有JSON文档的整个填充缓冲区。
迭代文档流
simdjson::ondemand::document_stream
设计用于基于范围的for循环。循环中每个项是simdjson_result<simdjson::ondemand::document_reference>
。
为什么用
document_reference
而非document
?
因为流为高效性复用解析器内部单个document
对象。document_reference
是轻量句柄,指向此内部可复用文档对象。每次循环移动时,内部文档对象更新为流中下一个JSON文档。
#include <simdjson.h>
#include <iostream>int main() {simdjson::padded_string ndjson_data = R"({"user":"Alice", "id":1}{"user":"Bob", "id":2}{"user":"Charlie", "id":3})"_padded;simdjson::ondemand::parser parser;auto stream_result = parser.iterate_many(ndjson_data);if (stream_result.error()) {std::cerr << "文档流设置错误: " << stream_result.error() << std::endl;return EXIT_FAILURE;}simdjson::ondemand::document_stream doc_stream = std::move(stream_result.value());// 遍历流中每个文档int doc_count = 0;for (auto doc_result : doc_stream) {// 每个'doc_result'是simdjson_result<simdjson::ondemand::document_reference>// 访问文档前检查错误if (doc_result.error()) {std::cerr << "解析文档" << doc_count << "错误: " << doc_result.error() << std::endl;// 可选择继续或停止continue; // 跳过损坏文档}// 获取文档引用simdjson::ondemand::document_reference doc = doc_result.value();std::cout << "处理文档 " << doc_count << ":" << std::endl;// 可像常规文档一样使用'doc'对象访问字段auto user_result = doc["user"].get_string();if (!user_result.error()) {std::cout << " 用户: " << user_result.value() << std::endl;} else {std::cerr << " 获取用户字段错误: " << user_result.error() << std::endl;}auto id_result = doc["id"].get_int64();if (!id_result.error()) {std::cout << " ID: " << id_result.value() << std::endl;} else {std::cerr << " 获取ID字段错误: " << id_result.error() << std::endl;}doc_count++;}std::cout << "已完成处理 " << doc_count << " 个文档。" << std::endl;// 'parser'和'ndjson_data'在循环期间需保持有效return EXIT_SUCCESS;
}
输出:
此模式是使用
document_stream
的核心。遍历流时获取当前JSON文档句柄(document_reference
),然后用标准按需API方法(get_object()
、[]
、get_string()
等)处理。
错误处理至关重要。错误可能发生在流设置(stream_result.error()
)或循环内解析单个文档时(doc_result.error()
),均需检查。
批处理大小与内存
调用iterate_many(buffer, len, batch_size)
时,batch_size
参数很重要。它定义simdjson初始结构分析(阶段1)时处理的输入缓冲区块大小。
batch_size
需大于流中最大单个JSON文档。若文档大于批处理大小,simdjson无法在单批次正确解析。- 更大
batch_size
可能通过减少批次切换开销提升性能,但需解析器分配足够内存。 - 合理默认值通常为1MB,但可根据文档大小调整。解析器容量需通过
parser.allocate()
设置足够大或自动扩展。
Simdjson读取batch_size
字节(或剩余字节),在阶段1查找块内所有文档边界,迭代时从该块提供文档。消耗完块内文档后加载下一批次。
线程加速
iterate_many
的强大功能之一(当simdjson启用线程时)是使用后台线程重叠工作。
主线程处理当前批次文档时(循环调用document_reference
方法),工作线程可同时在后台对下一批次数据执行阶段1分析。
这种重叠意味着后续批次阶段1的时间被"隐藏"在当前批次处理时间内。若单文档处理时间足够长,下批次阶段1可能已完成,使阶段1成本趋近于零!
(类似于 生产消费者模型的思想)
无需修改循环代码即可启用此功能。若simdjson编译时启用线程(SIMDJSON_THREADS_ENABLED
),iterate_many
会自动在适当时使用后台线程。
底层机制(简化版)
document_stream
对象管理输入缓冲区、batch_size
及跨文档和批次的迭代状态。包含:
- 原始输入缓冲区指针及长度
- 主
simdjson::ondemand::parser
指针 - 当前处理批次信息(起始位置
batch_start
) - 跟踪批次内当前文档的内部状态
- 启用线程时管理
stage1_worker
线程对象和辅助parser
实例(stage1_thread_parser
)
调用iterate_many(buffer, len, batch_size)
时:
- 构建
document_stream
对象,存储缓冲区/长度/批次大小和解析器指针 - 启动循环(
stream.begin()
)触发初始处理:- 主解析器内部缓冲区可能调整以适应
batch_size
- 用主解析器对输入缓冲区第一个
batch_size
字节执行阶段1 - 设置初始
document
句柄指向第一个文档 - 启用线程且有更多数据时,启动工作线程对下一批次执行阶段1
- 主解析器内部缓冲区可能调整以适应
- 循环内(
for (auto doc_result : doc_stream)
)document_reference
包装主解析器内部文档状态。对doc
的操作(get_object()
、[]
等)执行当前文档阶段2解析 - 循环
++
运算符移动至下一文档时:- 主解析器内部迭代器跳过当前文档
- 若迭代器到达当前批次末尾,检查工作线程是否完成下一批次阶段1
- 若下一批次阶段1就绪,快速交换主解析器和工作解析器内部状态。工作解析器成为新主解析器
- 禁用线程或工作线程未就绪时,主线程自行执行下一批次阶段1
- 加载新批次后更新内部文档句柄指向首文档
- 无更多批次时结束流迭代
关键在于document_stream
管理缓冲区块和后台阶段1处理,通过迭代器接口逐个产出文档。
高级:位置与截断
如需更多控制或调试,document_stream::iterator
(范围for循环隐式使用)提供额外方法:
iterator::current_index() const
:返回当前文档在原始缓冲区的字节偏移,用于记录错误或跟踪大文件进度iterator::source() const
:返回指向当前文档原始字节的std::string_view
(可能含周围空白)iterator::error() const
:返回当前文档error_code
流迭代完成后,document_stream
对象提供:
document_stream::truncated_bytes() const
:返回缓冲区末尾无法解析为完整JSON文档的字节数,用于检测不完整输入或尾部垃圾
逗号分隔文档
默认iterate_many
期望JSON空白分隔文档。
若文档用逗号分隔(如1, "hello", true, {}
),可在iterate_many
传入allow_comma_separated
参数为true
。但此模式需整个输入作为单批次处理(忽略batch_size
)且禁用线程,不适合大规模流式处理。
总结
处理多JSON文档大型文件(如NDJSON/JSON Lines)需内存高效方案。simdjson::ondemand::document_stream
专为此设计:
- 通过
parser.iterate_many(padded_data, ...)
获取文档流 - 用范围for循环迭代流:
for (auto doc_result : stream)
- 循环项是表示流中文档的
simdjson_result<document_reference>
,访问前需检查错误 batch_size
参数控制阶段1处理块大小,应不小于最大文档- 启用线程时,
iterate_many
可重叠下批次阶段1与当前批次处理,极大提升吞吐 - 循环中
document_reference
是轻量句柄,指向解析器复用内存
(通过指针移动,减少拷贝开销和零碎内存)
使用document_stream
可高效解析多文档输入,使simdjson成为处理流式JSON数据的利器。
掌握解析、数据处理、导航、错误处理和流处理后,我们已建立坚实基础。下一章将揭秘simdjson通过实现/CPU派发机制实现高性能的奥秘。
下一章:实现/CPU派发
补充:
轻量句柄
轻量句柄是内存中对象的简易引用,不直接存储数据,仅指向已存在的资源(如解析后的文档)。
循环中的document_reference
作为轻量句柄,避免了重复解析,直接复用内存中的结果,节省计算开销。
类似书签标记书页位置,书签(轻量句柄)本身不是
书页内容,但能快速定位
到已存在的页面(内存数据)。
核心特点
- 低开销:不复制数据,仅保存指向内存的指针或标识符。
- 高效复用:通过
引用共享已解析内容
,避免重复处理。 - 无所有权:轻量句柄不管理资源生命周期,
需确保
目标资源有效。
(通过指针移动,减少拷贝开销和零碎内存)