当前位置: 首页 > news >正文

[simdjson] document_stream | iterate_many() | batch_size | 线程加速 | 轻量handle

第七章:文档流

欢迎回来

在前面的章节中,我们学习了如何使用解析器结合填充字符串获取表示JSON根节点的文档,并通过按需API(On-Demand API)遍历值、对象和数组,同时使用simdjson_result进行错误处理。

到目前为止,我们专注于逐个解析单个JSON文档。

但如果需要处理包含多个连续JSON文档的大型文件或网络流呢?

常见格式如NDJSON(换行符分隔的JSON)或JSON Lines,其中每行都是完整且有效的JSON文档。

例如:

{"user":"Alice", "id":1}
{"user":"Bob", "id":2}
{"user":"Charlie", "id":3}
...(可能有数百万行)

将整个文件加载到内存逐个解析会非常低效且消耗大量内存。这正是文档流抽象要解决的问题。

什么是文档流?

simdjson::ondemand::document_stream(DOM API对应simdjson::dom::document_stream,但我们聚焦按需API的iterate_many)旨在高效处理包含多个JSON文档的大型输入。

  • 不同于逐个提供文档字符串,iterate_many(DOM API用parse_many一次性接收整个缓冲区或文件内容。

  • iterate_many不会立即解析所有内容,而是建立内部机制,在迭代流时逐个查找并解析文档。

想象成传送带:将整个大型容器装载到传送带(iterate_many),然后逐个处理到达面前的物品(循环遍历document_stream)。无需同时存储所有物品,只需处理当前项。

这种方法有两大优势:

  1. 内存高效:无需同时加载整个GB级流到内存(尽管需要初始大缓冲区)。解析器为每个文档复用内部缓冲区。
  2. 高性能:Simdjson可用高速方法处理流块。按需API甚至能用后台线程在您处理当前文档时查找下一个文档!

获取文档流

通过simdjson::ondemand::parser实例的iterate_many()方法获取document_stream

链接:https://github.com/simdjson/simdjson/blob/master/doc/iterate_many.md

iterate_many()参数通常包括:

  1. 填充JSON数据缓冲区指针(const uint8_t*padded_string
  2. 数据长度(size_t
  3. 可选的batch_size(后续讨论)
  4. 可选的允许逗号分隔文档标志(NDJSON/JSONL不常用)

与其他simdjson操作类似,iterate_many()返回simdjson_result<simdjson::ondemand::document_stream>。使用流前必须检查结果是否有错误。

以下为示例NDJSON数据处理:

#include <simdjson.h>
#include <iostream>int main() {// 假设从大文件读取的NDJSON数据simdjson::padded_string ndjson_data = R"({"user":"Alice", "id":1}{"user":"Bob", "id":2}{"user":"Charlie", "id":3})"_padded; // _padded确保填充simdjson::ondemand::parser parser;// 获取文档流simdjson::simdjson_result<simdjson::ondemand::document_stream> stream_result =parser.iterate_many(ndjson_data);// 检查流设置是否成功if (stream_result.error()) {std::cerr << "文档流设置错误: " << stream_result.error() << std::endl;return EXIT_FAILURE;}// 从结果中获取document_stream对象simdjson::ondemand::document_stream doc_stream = std::move(stream_result.value());std::cout << "成功建立文档流。" << std::endl;// 现在可以迭代流...(见下节)return EXIT_SUCCESS;
}

注意iterate_many接收包含所有JSON文档的整个填充缓冲区。

迭代文档流

simdjson::ondemand::document_stream设计用于基于范围的for循环。循环中每个项是simdjson_result<simdjson::ondemand::document_reference>

为什么用document_reference而非document

因为流为高效性复用解析器内部单个document对象。document_reference轻量句柄,指向此内部可复用文档对象。每次循环移动时,内部文档对象更新为流中下一个JSON文档。

#include <simdjson.h>
#include <iostream>int main() {simdjson::padded_string ndjson_data = R"({"user":"Alice", "id":1}{"user":"Bob", "id":2}{"user":"Charlie", "id":3})"_padded;simdjson::ondemand::parser parser;auto stream_result = parser.iterate_many(ndjson_data);if (stream_result.error()) {std::cerr << "文档流设置错误: " << stream_result.error() << std::endl;return EXIT_FAILURE;}simdjson::ondemand::document_stream doc_stream = std::move(stream_result.value());// 遍历流中每个文档int doc_count = 0;for (auto doc_result : doc_stream) {// 每个'doc_result'是simdjson_result<simdjson::ondemand::document_reference>// 访问文档前检查错误if (doc_result.error()) {std::cerr << "解析文档" << doc_count << "错误: " << doc_result.error() << std::endl;// 可选择继续或停止continue; // 跳过损坏文档}// 获取文档引用simdjson::ondemand::document_reference doc = doc_result.value();std::cout << "处理文档 " << doc_count << ":" << std::endl;// 可像常规文档一样使用'doc'对象访问字段auto user_result = doc["user"].get_string();if (!user_result.error()) {std::cout << "  用户: " << user_result.value() << std::endl;} else {std::cerr << "  获取用户字段错误: " << user_result.error() << std::endl;}auto id_result = doc["id"].get_int64();if (!id_result.error()) {std::cout << "  ID: " << id_result.value() << std::endl;} else {std::cerr << "  获取ID字段错误: " << id_result.error() << std::endl;}doc_count++;}std::cout << "已完成处理 " << doc_count << " 个文档。" << std::endl;// 'parser'和'ndjson_data'在循环期间需保持有效return EXIT_SUCCESS;
}

输出:

在这里插入图片描述

此模式是使用document_stream的核心。遍历流时获取当前JSON文档句柄(document_reference),然后用标准按需API方法(get_object()[]get_string()等)处理。

错误处理至关重要。错误可能发生在流设置(stream_result.error())或循环内解析单个文档时(doc_result.error()),均需检查。

批处理大小与内存

调用iterate_many(buffer, len, batch_size)时,batch_size参数很重要。它定义simdjson初始结构分析(阶段1)时处理的输入缓冲区块大小。

  • batch_size需大于流中最大单个JSON文档。若文档大于批处理大小,simdjson无法在单批次正确解析。
  • 更大batch_size可能通过减少批次切换开销提升性能,但需解析器分配足够内存。
  • 合理默认值通常为1MB,但可根据文档大小调整。解析器容量需通过parser.allocate()设置足够大或自动扩展。

在这里插入图片描述

Simdjson读取batch_size字节(或剩余字节),在阶段1查找块内所有文档边界,迭代时从该块提供文档。消耗完块内文档后加载下一批次

线程加速

iterate_many的强大功能之一(当simdjson启用线程时)是使用后台线程重叠工作。

主线程处理当前批次文档时(循环调用document_reference方法),工作线程可同时在后台对下一批次数据执行阶段1分析。

在这里插入图片描述

这种重叠意味着后续批次阶段1的时间被"隐藏"在当前批次处理时间内。若单文档处理时间足够长,下批次阶段1可能已完成,使阶段1成本趋近于零!

(类似于 生产消费者模型的思想)

无需修改循环代码即可启用此功能。若simdjson编译时启用线程(SIMDJSON_THREADS_ENABLED),iterate_many自动在适当时使用后台线程。

底层机制(简化版)

document_stream对象管理输入缓冲区batch_size及跨文档和批次的迭代状态。包含:

  • 原始输入缓冲区指针及长度
  • simdjson::ondemand::parser指针
  • 当前处理批次信息(起始位置batch_start
  • 跟踪批次内当前文档的内部状态
  • 启用线程时管理stage1_worker线程对象和辅助parser实例(stage1_thread_parser

调用iterate_many(buffer, len, batch_size)时:

  1. 构建document_stream对象,存储缓冲区/长度/批次大小和解析器指针
  2. 启动循环(stream.begin())触发初始处理:
    • 主解析器内部缓冲区可能调整以适应batch_size
    • 用主解析器对输入缓冲区第一个batch_size字节执行阶段1
    • 设置初始document句柄指向第一个文档
    • 启用线程且有更多数据时,启动工作线程对下一批次执行阶段1
  3. 循环内(for (auto doc_result : doc_stream)document_reference包装主解析器内部文档状态。对doc的操作(get_object()[]等)执行当前文档阶段2解析
  4. 循环++运算符移动至下一文档时:
    • 主解析器内部迭代器跳过当前文档
    • 若迭代器到达当前批次末尾,检查工作线程是否完成下一批次阶段1
    • 若下一批次阶段1就绪,快速交换主解析器和工作解析器内部状态。工作解析器成为新主解析器
    • 禁用线程或工作线程未就绪时,主线程自行执行下一批次阶段1
    • 加载新批次后更新内部文档句柄指向首文档
    • 无更多批次时结束流迭代

关键在于document_stream管理缓冲区块和后台阶段1处理,通过迭代器接口逐个产出文档。

高级:位置与截断

如需更多控制或调试,document_stream::iterator(范围for循环隐式使用)提供额外方法:

  • iterator::current_index() const返回当前文档在原始缓冲区的字节偏移,用于记录错误或跟踪大文件进度
  • iterator::source() const:返回指向当前文档原始字节的std::string_view(可能含周围空白)
  • iterator::error() const:返回当前文档error_code

流迭代完成后,document_stream对象提供:

  • document_stream::truncated_bytes() const:返回缓冲区末尾无法解析为完整JSON文档的字节数,用于检测不完整输入或尾部垃圾

逗号分隔文档

默认iterate_many期望JSON空白分隔文档。

若文档用逗号分隔(如1, "hello", true, {}),可在iterate_many传入allow_comma_separated参数为true。但此模式需整个输入作为单批次处理(忽略batch_size)且禁用线程,不适合大规模流式处理。

总结

处理多JSON文档大型文件(如NDJSON/JSON Lines)需内存高效方案。simdjson::ondemand::document_stream专为此设计:

  • 通过parser.iterate_many(padded_data, ...)获取文档流
  • 用范围for循环迭代流:for (auto doc_result : stream)
  • 循环项是表示流中文档的simdjson_result<document_reference>,访问前需检查错误
  • batch_size参数控制阶段1处理块大小,应不小于最大文档
  • 启用线程时,iterate_many可重叠下批次阶段1与当前批次处理,极大提升吞吐
  • 循环中document_reference是轻量句柄,指向解析器复用内存

(通过指针移动,减少拷贝开销和零碎内存)

使用document_stream可高效解析多文档输入,使simdjson成为处理流式JSON数据的利器。

掌握解析、数据处理、导航、错误处理和流处理后,我们已建立坚实基础。下一章将揭秘simdjson通过实现/CPU派发机制实现高性能的奥秘。

下一章:实现/CPU派发


补充:

轻量句柄

轻量句柄是内存中对象的简易引用,不直接存储数据,仅指向已存在的资源(如解析后的文档)

循环中的document_reference作为轻量句柄,避免了重复解析,直接复用内存中的结果,节省计算开销。

类似书签标记书页位置,书签(轻量句柄)本身不是书页内容,但能快速定位到已存在的页面(内存数据)。

核心特点

  • 低开销:不复制数据,仅保存指向内存的指针或标识符。
  • 高效复用:通过引用共享已解析内容,避免重复处理。
  • 无所有权:轻量句柄不管理资源生命周期,需确保目标资源有效。

(通过指针移动,减少拷贝开销和零碎内存)

http://www.dtcms.com/a/289331.html

相关文章:

  • Pycharm的Terminal打开后默认是python环境
  • 网工实验——路由器小项目
  • 每日面试题10:令牌桶
  • tidyverse-数据可视化 - 图形的分层语法
  • 论文分享(一)
  • C++ primer知识点总结
  • LVS-----TUN模式配置
  • Docker-compose-知识总结
  • 基于单片机倾角测量仪/角度测量/水平仪
  • 双8无碳小车“cad【17张】三维图+设计说名书
  • 【HarmonyOS】ArkUI - 自定义组件和结构重用
  • 【pandoc实践】如何将wordpress文章批量导出为Markdown格式
  • 神经网络:卷积层
  • 使用UV管理PyTorch项目
  • PyTorch常用的简单数学运算
  • Paimon INSERT OVERWRITE
  • 一维数组练题习~
  • PyTorch的基础概念和复杂模型的基本使用
  • 【软件测试】从软件测试到Bug评审:生命周期与管理技巧
  • ESXi6.7硬件传感器红色警示信息
  • ICT模拟零件测试方法--测量参数详解
  • ThinkPHP8极简上手指南:开启高效开发之旅
  • 基于机器视觉的迈克耳孙干涉环自动计数系统设计与实现
  • STM32CubeMX的一些操作步骤的作用
  • 拼写纠错模型Noisy Channel(下)
  • 机器学习理论基础 - 核心概念篇
  • 复杂度优先:基于推理链复杂性的提示工程新范式
  • Linux操作系统之线程(四):线程控制
  • 20250720-1-Kubernetes 调度-白话理解创建一个Pod的内部工作流_笔记
  • Qt的安装和环境配置