rknn优化教程(一)
文章目录
- 1. 前述
- 2. 优化思想
- 2.1 实时帧率
- 2.2 多线程处理
- 2.2.1 排序
- 2.2.2 批量处理
- 2.2.3 队列
- 2.3 进一步优化
- 3. 代码
1. 前述
OK,铺垫了很久的rknn优化,终于开始写了。为什么要优化呢?当然是我们的使用遇到了瓶颈,要么使用的时候达不到实时帧率,要么就是根本没有将硬件利用起来,那么这篇博客就是围绕两个核心的问题来进行说明。那么博客的内容主要就是如下的一些部分:
- 如何解决
rknn_model_zoo
给的demo中无法达到实时帧率的问题 - 如何能将
rk3588
上的硬件资源全部利用起来 - 如何使用一些成熟的库来替代和优化
rknn_model_zoo
的demo中给出的部分函数 - 如何使用
xmake
构建这个优化工程
2. 优化思想
针对前面提出的4个部分,博客将逐一给出解决方案。
注意,博客将以detect
进行说明,至于其他的比如segment
和pose
等等,都是一通百通了。
2.1 实时帧率
如果正常的使用yolo11
或者yolov8
的nano
模型在rk3588
上跑,大致能跑15~20fps
就差不多了。但如果是small
模型呢?如果标签数量很多呢?可能连15fps都达不到。当然这些都是以rknn_model_zoo
中的demo来说的。
这里不得不吐槽一下rknn_model_zoo的代码水平,我是觉得那个
c/c++
写的不伦不类,用c
的思路写c++
不说了,而且各种东西穿插使用,看的特别割裂。而且还有一些c/c++
入门的新手写代码的一些非常低级的用法,这个东西不好用,写这个库和demo的人至少背80%的锅。一个很简单的东西,比如我现在有两个
float
类型的数组A和B,如果要将A中的元素全部拷贝到B中去,这个里面的写法是:// 这里的count可能是 640*640*3这样的大小 for (int index = 0; index < count; ++index) {B[index] = A[index]; }
我就不说优化的思路,或者说经常写
c/c++
的人应该怎么写了,大家也可以给出自己的想法。
为了解决实时帧率:既然单线程只能跑15~20fps,那我是不是可以开2个线程?是不是可以开3个?甚至9个?
OK,那我们已经找到了一个很有用的解决方法,就是使用多线程来解决实时帧率的问题,那么我们解决了实时帧率的问题,但是又带来了新的问题:
- 如何在多线程的情况下保证帧的顺序?
2.2 多线程处理
多线程可以帮助我们达到实时帧率的处理,但是我们需要一个优雅的方法来处理帧的顺序问题。
比如我们有一个连续的帧1 2 3,然后放入到三个线程中进行处理:
当然,如果按照顺序输出 1 2 3的结果肯定是好的,但是我们根本无法保证这个输出的顺序,而且输出的顺序 3 2 1这个的概率不低,那我们就还得给输出进行排序,这个就是多线程下必须解决的问题。
那我们有几个思路:
- 对每一个数据帧都绑定一个数值,然后输出的时候进行排序?
- 每次都是执行线程数量的数据帧,每次输入输出,然后再进行下一批数量的输入输出,按照图示,就是每次送3帧数据,然后都处理完了,再送入三帧。
- 队列处理?
那我们就对这三种思路分别分析一下:
2.2.1 排序
纯粹的排序听起来是最简单的方法,非常容易,每次调用一下std::sort
似乎就可以了,但是没办法解决几个问题:
- 每次
std::sort
非常耗费时间和资源 - 数据连续处理的时候,针对图示的线程数量,可能存在第6帧的结果比第2帧的结果还要先得到,虽然概率极小,但是也是存在的,那怎么排序呢?怎么确认你得到了序号最小的那一个数据?
所以这个排序可以实现,但不稳定、不好用
。
2.2.2 批量处理
这个很容易解决,只要我们将三个线程都是用std::future
就可以了,每次等待三帧的处理,然后三个结果那也很好处理序号的问题,代码写起来那也是非常easy了,但是有一个非常严重的问题:
- 每次都必须延迟3帧的时间
虽然看起来处理的速度很快,但是要导致3帧的延迟,这对于实时系统来说,可能无法接受,尤其是:
假设你现在传入的是1fps的视频进行处理,就必须延迟3s左右才能获得结果,这就没法玩了……
2.2.3 队列
那就必须要考虑到我们在处理时序问题时候经常使用到的方法:队列
OK,知道要使用队列了,但是我怎么处理队列呢?还是没法避免排序的问题啊?
这里我们首先要想到,我线程的数量肯定不是无止境的,就rk3588
这个板子,开10个线程跑detect
就不错了,再开多了,搞不好还速度下降了。当然这里就是线程的数量处理问题了,经常进行并发编程的是知道的,线程并不是越多越好。
那我们来看这样一个队列处理:
这里的1st 2nd 3rd
都是表示得到结果的顺序,可以快速在输出的队列上进行对应填写结果,那么:
当得到3的结果的时候,先在输出的队列索引3上写入结果,然后发现1和2还是没有得到,不进行输出
当得到2的结果的时候,先在输出的队列索引2上写入结果,然后发现1还是没有得到,不进行输出
当得到1的结果的时候,先在输出的队列索引1上写入结果,进行输出,然后发现2和3的结果也有了,那就一起输出
那么我们如何构造这样一个队列呢?
首先,我们注意到uin8_t
这个类型,取值是0~255
,并且当其为255的时候,自增会回到0,利用自身的溢出功能,能帮我们构建天然的索引闭环!
那我们直接将输出队列使用大小256的数组进行表示,是不是就可以实现了一个自动闭环的队列索引了?然后控制输入输出的索引处理,连排序都省去了!
256的大小完全远远超出我们的使用极限了,线程就是开了20个,那最差的情况差不多就是19个数据帧等待一个数据帧的处理结束,256完全满足了缓冲的条件了。
这只是我们假想的一个极特殊的情况,等待个线程数量的数据帧已经很极限了,正常的时候等两三帧就已经差不多了
那么用代码怎么实现呢:
// 假设的输入数据用这个结构体存储
struct TaskInput
{cv::Mat m_img;uint8_t m_taskId;... // 其他的额外数据
};// 假设输出的结果用这个一个结构体进行存储
struct TaskResult
{TaskInput m_inputData;bool m_isGet; // 是否得到了结果... // 结果的数据存储
};// 然后我们有一个线程池的类
class TaskPool
{
public:// 添加一个处理的任务void add(cv::Mat&& img);// 线程处理的函数,每一个线程都是调用这个函数进行识别的处理void run(int thread_id);// 这是所有线程处理完了之后都调用这个函数来进行结果的处理void dealResult(TaskInput&& task_data, ...); // 这里的 ... 只是表示一个结果的类型,暂时不写明具体的形参类型
private:bool m_isRun;std::mutex m_mutexInput;std::queue<TaskInput> m_taskInputs;uint8_t m_indexInput = 0; // 初始化为0std::mutex m_mutexOutput;uint8_t m_indexOutput = 0; // 初始化为0std::array<TaskResult, 256> m_taskResults;
}
那我们在add
中就需要处理好输入!
void TaskPool::add(cv::Mat&& img)
{// 使用线程锁保护一下lock_guard<mutex> lg(m_mutexInput);// 将输入添加到任务队列中TaskInput input;input.m_img = std::move(img);input.m_taskId = m_indexInput++; // 这里就打上了序号了……// 其他可能的操作...;
}
在run
函数中做好数据的提取:
void TaskPool::run(int thread_id)
{while (m_isRun){TaskInput task_data;{unique_lock<mutex> lock(m_mutexInput);m_cvTask.wait(lock, [&]{ return !m_taskInputs.empty() || !m_isRun; });if (!m_isRun){return;}// 减少内存拷贝task_data = std::move(m_taskInputs.front());m_taskDatas.pop();}// 这里就是根据数据进行识别的处理了,包括前处理、推理和后处理等等realWork(thread_id, std::move(task_data));// 注意,结果是通过回调处理的!}
}
那么获得了结果了,那么这里就需要在dealResult
进行处理:
void TaskPool::dealResult(TaskInput&& task_data, ...)
{// 这里只是表示通过task_data来构建resultTaskResult result(std::move(task_data), ...);result.m_isGet = true;lock_guard<mutex> lg(m_mutexOutput);m_taskResults[result.m_inputData.m_taskId] = std::move(result);while (true){if (m_taskResults[m_outputIndex].m_isGet){auto &res_data = m_taskResults[m_outputIndex];// 这里就可以去处理结果了callback(std::move(res_data));// 处理完成了,这个又要变成没有得到结果的状态了!res_data.m_isGet = false;++m_outputIndex;}else{break; // 如果最前方的数据还没有得到结果,那就直接退出……}}
}
这样的处理是不是很优雅了……
2.3 进一步优化
上述的处理其实已经能提高不少了,只需要将rknn_model_zoo
中的代码变换成多线程处理就可以了。
当然优化嘛,是需要精益求精的,将rknn_model_zoo
中的那些低水平的代码更换为我们高水平的代码,优化他们的前处理,后处理,使用opencv
和eigen
等库来进行优化函数处理,并且使用我们更好的程序设计,避免一些代码的冗余等等……
3. 代码
这些优化和设计有一个可用的代码库,先整理一下,后续的博客再进行分享了。