当前位置: 首页 > news >正文

vulkanscenegraph显示倾斜模型(6.4)-多线程下的记录与提交

前言

        上章深入分析了帧循环中呈现阶段的具体实现。本章将分析多线程下的记录与提交,进一步剖析vsg帧循环过程中的同步机制,并揭露信号量(VkSemaphore)和围栏(VkFence)以及vsg::FrameBlock与vsg::Barrier在其中的作用。


目录

  • 1 信号量(VkSemaphore)、栅栏(VkFence)、vsg::FrameBlock与vsg::Barrier
  • 2 多线程记录与提交

1 信号量(VkSemaphore)、围栏(VkFence)与vsg::FrameBlock、vsg::Barrier

        vsg::Semaphore封装了VkSaphore,用于将vulkan命令的完成与其他vulkan命令提交的开始同步,为GPU内部的同步;vsg::Fence封装了vkFence,用于同步Vulkan命令提交到队列的完成情况,用于应用程序(CPU端)与Vulkan命令提交到队列的完成情况(GPU端)的同步;vsg::FrameBlock提供了一种机制,用于同步等待新帧开始的线程;vsg::Barrier提供了一种同步多个线程的方法,一旦指定数量的线程加入Barrier,这些线程就会一起释放。

1.1 vsg::Semaphore

Semaphore::Semaphore(Device* device, VkPipelineStageFlags pipelineStageFlags, void* pNextCreateInfo) :_pipelineStageFlags(pipelineStageFlags),_device(device)
{VkSemaphoreCreateInfo semaphoreInfo = {};semaphoreInfo.sType = VK_STRUCTURE_TYPE_SEMAPHORE_CREATE_INFO;semaphoreInfo.pNext = pNextCreateInfo;VkResult result = vkCreateSemaphore(*device, &semaphoreInfo, _device->getAllocationCallbacks(), &_semaphore);if (result != VK_SUCCESS){throw Exception{"Error: Failed to create semaphore.", result};}
}

        vsg::Semaphore构造函数使用vkCreateSemaphore创建信号量VkSemaphore,信号量的创建与某一逻辑设备绑定,即信号量可用于GPU内部同一队列或同一逻辑设备的不同队列间的同步。

Semaphore::~Semaphore()
{if (_semaphore){vkDestroySemaphore(*_device, _semaphore, _device->getAllocationCallbacks());}
}

        vsg::Semaphore析构函数使用vkDestroySemaphore释放信号量VkSemaphore。

1.2 vsg::Fence

Fence::Fence(Device* device, VkFenceCreateFlags flags) :_device(device)
{VkFenceCreateInfo createFenceInfo = {};createFenceInfo.sType = VK_STRUCTURE_TYPE_FENCE_CREATE_INFO;createFenceInfo.flags = flags;createFenceInfo.pNext = nullptr;if (VkResult result = vkCreateFence(*device, &createFenceInfo, _device->getAllocationCallbacks(), &_vkFence); result != VK_SUCCESS){throw Exception{"Error: Failed to create Fence.", result};}
}

        vsg::Fence构造函数使用vkCreateFence创建VkFence,围栏的创建与某一逻辑设备绑定。

Fence::~Fence()
{if (_vkFence){vkDestroyFence(*_device, _vkFence, _device->getAllocationCallbacks());}
}

        vsg::Fence析构函数使用vkDestroyFence释放围栏。

VkResult Fence::wait(uint64_t timeout) const
{return vkWaitForFences(*_device, 1, &_vkFence, VK_TRUE, timeout);
}

        vsg::Fence在应用层(CPU端)的使用通过调用wait函数,其通过封装vkWaitForFences实现。

VkResult Fence::reset() const
{return vkResetFences(*_device, 1, &_vkFence);
}

     vsg::Fence在GPU端使用时,需重置为无信号状态,否则可能会导致应用层调用vkWaitForFences卡死。

1.3 vsg::FrameBlock

        vsg::FrameBlock提供了一种机制,用于同步等待新帧开始的线程。

        std::mutex _mutex;std::condition_variable _cv;ref_ptr<FrameStamp> _value;ref_ptr<ActivityStatus> _status;

        上述代码为vsg::FrameBlock的成员变量,其通过对std::mutex和std::condition_variable的封装实现了一种针对vsg::FrameStamp是否变化的阻塞能力,即同步所有等待新帧开始的线程,而变量_status(vsg::ActivityStatus类型)用于标记vsg::FrameBlock的阻塞能力是否有效。

        bool wait_for_change(ref_ptr<FrameStamp>& value){std::unique_lock lock(_mutex);while (_value == value && _status->active()){_cv.wait(lock);}value = _value;return _status->active();}

        通过调用wait_for_change接口,当传入的vsg::FrameStamp对象与已有的一致时,阻塞应用程序所在线程。

        void set(ref_ptr<FrameStamp> frameStamp){std::scoped_lock lock(_mutex);_value = frameStamp;_cv.notify_all();}

        当设置vsg::FrameStamp对象后,则通知所有阻塞线程解除阻塞。

1.4 vsg::Barrier

        vsg::Barrier提供了一种同步多个线程的方法,一旦指定数量的线程加入Barrier,这些线程就会一起释放。

        const uint32_t _num_threads;uint32_t _num_arrived;uint32_t _phase;std::mutex _mutex;std::condition_variable _cv;

        vsg::Barrier同样是封装std::mutex和std::condition_variable实现,辅以_num_theads(同步的线程数)和_num_arrived(到达的线程数)实现多线程同步。

        void arrive_and_wait(){std::unique_lock lock(_mutex);if (++_num_arrived == _num_threads){_release();}else{auto my_phase = _phase;_cv.wait(lock, [this, my_phase]() { return this->_phase != my_phase; });}}

        如上代码为arrive_and_wait函数实现,当到达的线程数与总线程数一致时,则释放所有线程,否则阻塞且记录当前阶段my_phase。

        void _release(){_num_arrived = 0;++_phase;_cv.notify_all();}

        释放所有线程的代码如上所示,同时更新当前的阶段(++_phase),将当前到达的线程数_num_arrived置为0。

        void arrive_and_drop(){std::unique_lock lock(_mutex);if (++_num_arrived == _num_threads){_release();}}

        arrive_and_drop会更新当前到达的线程数,同时判断,当到达线程数等于所有线程数时,则释放所有线程,但不阻塞当前线程。

2 多线程记录与提交

#if 1if (_threading)
#else// The following is a workaround for an odd "Possible data race during write of size 1" warning that valgrind tool=helgrind reports// on the first call to vkBeginCommandBuffer despite them being done on independent command buffers.  This could well be a driver bug or a false positive.// If you want to quieten this warning then change the #if above to #if 0 as rendering the first three frames single threaded avoids the warning.if (_threading && _frameStamp->frameCount > 2)
#endif{_frameBlock->set(_frameStamp);_submissionCompleted->arrive_and_wait();}else{for (auto& recordAndSubmitTask : recordAndSubmitTasks){recordAndSubmitTask->submit(_frameStamp);}}

        上述代码为Viewer.cpp中的821-838行,当标记_threading为true时,执行多线程提交。首先更新当前帧(上述代码第9行),接着等待提交的完成(上述代码第10行)。其中_frameBlock和_submissionComplete分别为vsg::FrameBlock和vsg::Barrier对象,其初始化在vsg::Viewer::setupThreading函数中完成。vsg::Viewer::setupThreading的执行可分为多线程同步变量初始化、创建多线程两部分。

    uint32_t numValidTasks = 0;for (const auto& task : recordAndSubmitTasks){if (!task->commandGraphs.empty()){++numValidTasks;}}// check if there is any point in setting up threadingif (numValidTasks == 0){return;}status->set(true);_threading = true;_frameBlock = FrameBlock::create(status);_submissionCompleted = Barrier::create(1 + numValidTasks);

        上述代码首先统计有效的提交任务数,接着创建vsg::FrameBlock和vsg::Barrier对象,其中vsg::Barrier对象_submissionCompleted传入的线程数为有效任务数+1,其中'+1'代表主线程,即主线程调用其arrive_and_wait方法并阻塞,当所有提交线程完成时,则释放主线程。

        创建多线程部分以vsg::RecordAndSubmitTask为粒度创建,vsg::RecordAndSubmitTask与vsg::CommandGraph和vsg::TransferTask关系如下:

        线程的创建分两种情况,当vsg::RecordAndSubmitTask对象中包含的CommandGraph数组数量为1且vsg::TransferTask对象为空时,则仅创建一个任务提交线程,否则需同时创建数据传输线程。

        if (task->commandGraphs.size() == 1 && !task->transferTask){// task only contains a single CommandGraph so keep thread simpleauto run = [](ref_ptr<RecordAndSubmitTask> viewer_task, ref_ptr<FrameBlock> viewer_frameBlock, ref_ptr<Barrier> submissionCompleted, const std::string& threadName) {auto local_instrumentation = shareOrDuplicateForThreadSafety(viewer_task->instrumentation);if (local_instrumentation) local_instrumentation->setThreadName(threadName);auto frameStamp = viewer_frameBlock->initial_value;// wait for this frame to be signaledwhile (viewer_frameBlock->wait_for_change(frameStamp)){CPU_INSTRUMENTATION_L1_NC(local_instrumentation, "Viewer run", COLOR_RECORD);viewer_task->submit(frameStamp);submissionCompleted->arrive_and_drop();}};threads.emplace_back(run, task, _frameBlock, _submissionCompleted, make_string("Viewer run thread"));}

       上述代码为,当vsg::RecordAndSubmitTask对象中包含的CommandGraph数组数量为1且vsg::TransferTask对象为空时,仅创建一个提交线程,线程中主要调用vsg::RecordAndSubmitTask的submit方法执行提交任务。其中线程为std::thread。

else if (!task->commandGraphs.empty())
{// we have multiple CommandGraphs in a single Task so set up a thread per CommandGraphstruct SharedData : public Inherit<Object, SharedData>{SharedData(ref_ptr<RecordAndSubmitTask> in_task, ref_ptr<FrameBlock> in_frameBlock, ref_ptr<Barrier> in_submissionCompleted, uint32_t numThreads) :task(in_task),frameBlock(in_frameBlock),submissionCompletedBarrier(in_submissionCompleted){recordedCommandBuffers = RecordedCommandBuffers::create();recordStartBarrier = Barrier::create(numThreads);recordCompletedBarrier = Barrier::create(numThreads);}// shared between all threadsref_ptr<RecordAndSubmitTask> task;ref_ptr<FrameBlock> frameBlock;ref_ptr<Barrier> submissionCompletedBarrier;// shared between threads associated with each taskref_ptr<RecordedCommandBuffers> recordedCommandBuffers;ref_ptr<Barrier> recordStartBarrier;ref_ptr<Barrier> recordCompletedBarrier;};uint32_t numThreads = static_cast<uint32_t>(task->commandGraphs.size());if (task->transferTask) ++numThreads;ref_ptr<SharedData> sharedData = SharedData::create(task, _frameBlock, _submissionCompleted, numThreads);auto run_primary = [](ref_ptr<SharedData> data, ref_ptr<CommandGraph> commandGraph, const std::string& threadName) {auto local_instrumentation = shareOrDuplicateForThreadSafety(data->task->instrumentation);if (local_instrumentation) local_instrumentation->setThreadName(threadName);auto frameStamp = data->frameBlock->initial_value;// wait for this frame to be signaledwhile (data->frameBlock->wait_for_change(frameStamp)){CPU_INSTRUMENTATION_L1_NC(local_instrumentation, "Viewer primary", COLOR_RECORD);// primary thread starts the taskdata->task->start();data->recordStartBarrier->arrive_and_wait();//vsg::info("run_primary");commandGraph->record(data->recordedCommandBuffers, frameStamp, data->task->databasePager);data->recordCompletedBarrier->arrive_and_wait();// primary thread finishes the task, submitting all the command buffers recorded by the primary and all secondary threads to its queuedata->task->finish(data->recordedCommandBuffers);data->recordedCommandBuffers->clear();data->submissionCompletedBarrier->arrive_and_wait();}};auto run_secondary = [](ref_ptr<SharedData> data, ref_ptr<CommandGraph> commandGraph, const std::string& threadName) {auto local_instrumentation = shareOrDuplicateForThreadSafety(data->task->instrumentation);if (local_instrumentation) local_instrumentation->setThreadName(threadName);auto frameStamp = data->frameBlock->initial_value;// wait for this frame to be signaledwhile (data->frameBlock->wait_for_change(frameStamp)){CPU_INSTRUMENTATION_L1_NC(local_instrumentation, "Viewer secondary", COLOR_RECORD);data->recordStartBarrier->arrive_and_wait();commandGraph->record(data->recordedCommandBuffers, frameStamp, data->task->databasePager);data->recordCompletedBarrier->arrive_and_wait();}};auto run_transfer = [](ref_ptr<SharedData> data, ref_ptr<TransferTask> transferTask, TransferTask::TransferMask transferMask, const std::string& threadName) {auto local_instrumentation = shareOrDuplicateForThreadSafety(data->task->instrumentation);if (local_instrumentation) local_instrumentation->setThreadName(threadName);auto frameStamp = data->frameBlock->initial_value;// wait for this frame to be signaledwhile (data->frameBlock->wait_for_change(frameStamp)){CPU_INSTRUMENTATION_L1_NC(local_instrumentation, "Viewer transfer", COLOR_RECORD);data->recordStartBarrier->arrive_and_wait();//vsg::info("run_transfer");if (auto transfer = transferTask->transferData(transferMask); transfer.result == VK_SUCCESS){if (transfer.dataTransferredSemaphore){data->task->earlyDataTransferredSemaphore = transfer.dataTransferredSemaphore;}}data->recordCompletedBarrier->arrive_and_wait();}};for (uint32_t i = 0; i < task->commandGraphs.size(); ++i){if (i == 0)threads.emplace_back(run_primary, sharedData, task->commandGraphs[i], make_string("Viewer primary thread"));elsethreads.emplace_back(run_secondary, sharedData, task->commandGraphs[i], make_string("Viewer seconary thread ", i));}if (task->transferTask){threads.emplace_back(run_transfer, sharedData, task->transferTask, TransferTask::TRANSFER_BEFORE_RECORD_TRAVERSAL, make_string("Viewer early transferTask thread"));}
}

       其它情况创建的线程,需针对vsg::TransferTask对象创建传输线程,当存在多个CommandGraph时,创建的提交线程的方式需区分。线程使用std::thread,通过lambda表达式封装线程的执行函数。

                SharedData(ref_ptr<RecordAndSubmitTask> in_task, ref_ptr<FrameBlock> in_frameBlock, ref_ptr<Barrier> in_submissionCompleted, uint32_t numThreads) :task(in_task),frameBlock(in_frameBlock),submissionCompletedBarrier(in_submissionCompleted){recordedCommandBuffers = RecordedCommandBuffers::create();recordStartBarrier = Barrier::create(numThreads);recordCompletedBarrier = Barrier::create(numThreads);}

       上述代码为SharedData的构造函数,提交线程和数据传输线程的同步通过上述recordStartBarrier和recordCompletedBarrier两个vsg::Barrier对象实现。

            auto run_primary = [](ref_ptr<SharedData> data, ref_ptr<CommandGraph> commandGraph, const std::string& threadName) {auto local_instrumentation = shareOrDuplicateForThreadSafety(data->task->instrumentation);if (local_instrumentation) local_instrumentation->setThreadName(threadName);auto frameStamp = data->frameBlock->initial_value;// wait for this frame to be signaledwhile (data->frameBlock->wait_for_change(frameStamp)){CPU_INSTRUMENTATION_L1_NC(local_instrumentation, "Viewer primary", COLOR_RECORD);// primary thread starts the taskdata->task->start();data->recordStartBarrier->arrive_and_wait();//vsg::info("run_primary");commandGraph->record(data->recordedCommandBuffers, frameStamp, data->task->databasePager);data->recordCompletedBarrier->arrive_and_wait();// primary thread finishes the task, submitting all the command buffers recorded by the primary and all secondary threads to its queuedata->task->finish(data->recordedCommandBuffers);data->recordedCommandBuffers->clear();data->submissionCompletedBarrier->arrive_and_wait();}};auto run_secondary = [](ref_ptr<SharedData> data, ref_ptr<CommandGraph> commandGraph, const std::string& threadName) {auto local_instrumentation = shareOrDuplicateForThreadSafety(data->task->instrumentation);if (local_instrumentation) local_instrumentation->setThreadName(threadName);auto frameStamp = data->frameBlock->initial_value;// wait for this frame to be signaledwhile (data->frameBlock->wait_for_change(frameStamp)){CPU_INSTRUMENTATION_L1_NC(local_instrumentation, "Viewer secondary", COLOR_RECORD);data->recordStartBarrier->arrive_and_wait();commandGraph->record(data->recordedCommandBuffers, frameStamp, data->task->databasePager);data->recordCompletedBarrier->arrive_and_wait();}};auto run_transfer = [](ref_ptr<SharedData> data, ref_ptr<TransferTask> transferTask, TransferTask::TransferMask transferMask, const std::string& threadName) {auto local_instrumentation = shareOrDuplicateForThreadSafety(data->task->instrumentation);if (local_instrumentation) local_instrumentation->setThreadName(threadName);auto frameStamp = data->frameBlock->initial_value;// wait for this frame to be signaledwhile (data->frameBlock->wait_for_change(frameStamp)){CPU_INSTRUMENTATION_L1_NC(local_instrumentation, "Viewer transfer", COLOR_RECORD);data->recordStartBarrier->arrive_and_wait();//vsg::info("run_transfer");if (auto transfer = transferTask->transferData(transferMask); transfer.result == VK_SUCCESS){if (transfer.dataTransferredSemaphore){data->task->earlyDataTransferredSemaphore = transfer.dataTransferredSemaphore;}}data->recordCompletedBarrier->arrive_and_wait();}};

       如上代码为三个lambda函数,run_primary、run_secondary、run_transfer,分别对应第一个提交线程、其它提交线程、数据传输线程的执行函数。其将主体执行内容放置在recordStartBarrier->arrive_and_wait() 和 recordCompletedBarrier->arrive_and_wait()之间,实现线程间的同步,采用如下的模式实现线程和主线程的同步:

                while (data->frameBlock->wait_for_change(frameStamp)){//执行内容data->submissionCompletedBarrier->arrive_and_wait();}

       vulkanscenegraph显示倾斜模型(6.2)-记录与提交-CSDN博客中将任务提交的具体实现分为开始、recordTraversal前的数据传输、record、完成四个部分,而run_primary独自负责任务的开始、recordTraversal前的数据传输、完成三个部分,run_primary与run_secondary共同负责record部分。通过run_primary函数的实现可看出,当前帧所有数据传输完成、命令录制完成,最后调用finish方法提交任务到队列。

文末:本章深入分析了帧循环中多线程下的记录与提交,首先深入剖析了vsg中与多线程同步相关的封装:vsg::Semaphore、vsg::Fence、vsg::FrameBlock、vsg::Barrier,接着进一步分析了记录与提交过程中的多线程机制。下章将分析vsg::DatabasePager在更新场景树过程中的作用。

待分析项:vsg::DatabasePager在更新场景树过程中的作用。

相关文章:

  • Temp Mail 1.7.0 | 创建和管理临时邮箱,防止垃圾邮件骚扰,保护隐私安全
  • Javase 基础加强 —— 04 集合2.0
  • MIT 6.S081 2020 Lab2 system calls 个人全流程
  • 运维--计划任务
  • 深入理解Java垃圾回收机制
  • chrome 浏览器怎么不自动提示是否翻译网站
  • 「一针见血能力」的终极训练手册
  • PATHWAYS: 用于机器学习的异步分布式数据流
  • 广东省考备考(第一天5.4)—判断(对称)
  • 【AI提示词】 复利效应教育专家
  • USB Type-C是不是全方位优于其他USB接口?
  • 什么是JDBC
  • Oracle OCP认证考试考点详解083系列05
  • PISI:眼图1:眼图相关基本概念
  • PCB实战篇
  • 一格一格“翻地毯”找单词——用深度优先搜索搞定单词搜索
  • MVP架构梳理
  • 使用Mathematica绘制Peano Curve
  • Linux 入门:操作系统进程详解
  • C++惯用法:In-Place Construction 和placement new
  • 抚州一原副县长拉拢公职人员组建“吃喝圈”,长期接受打牌掼蛋等“保姆式”服务
  • 专访|刘伟强:在《水饺皇后》里,我放进儿时全家福照片
  • 日本来信|劳动者的书信④
  • 重庆渝中警方:男子点燃摩托车欲寻衅滋事,被民警和群众合力制服
  • 浙江“胖都来”开业多位明星祝贺,“胖东来”称已取证投诉,律师:碰瓷侵权
  • 中国驻旧金山总领馆:领区发生旅行交通事故,有中国公民伤亡