WebRTC 定时任务Process Module
一、作用:
在日常开发过程中,我们经常会要执行一个定时任务,往往我们会想着单独整一个线程去定时循环,或者给定时器注册个回调,被定时循环调用。WebRTC当中通过模块化的实现,根据定时任务的特点做了抽象,和自己本来的Thread或者TaskQueue配合起来,实现了一套完美的机制,可以快速实现各种场景的定时任务。
二、Module重要方法:
Module 是一个接口类,定义在 rtc_base/task_utils/repeating_task.h 或相关模块中,核心方法包括 Process 和 ProcessStart 等。Module 的设计目的是让模块能够被 WebRTC 的线程模型(通常是 TaskQueue 或 Thread)管理和调度。
- Module 接口:
- Module 接口通常包含以下关键方法(以下是简化的伪代码表示,实际定义可能在不同 WebRTC 版本中略有变化):
class Module {public:virtual void Process() = 0; // 核心处理函数,模块需要实现virtual int64_t TimeUntilNextProcess() = 0; // 返回下次调用 Process 的时间间隔virtual void ProcessStart() {} // 可选,模块初始化时调用virtual ~Module() = default;
};
-
Process 方法的作用:
- Process 是模块的核心逻辑执行入口,用于处理模块的周期性任务(如定时检查状态、处理数据、触发事件等)。
- 模块通过实现 TimeUntilNextProcess 方法,告诉 WebRTC 事件循环下一次调用 Process 的时间间隔(通常以毫秒为单位)。
- WebRTC 的线程模型(如 rtc::Thread 或 TaskQueue)会根据 TimeUntilNextProcess 的返回值,定时调用 Process 方法。
-
自动调用的关键:
- WebRTC 的线程模型(通常是 rtc::Thread 或 TaskQueueBase)维护了一个事件循环,负责调度所有注册的 Module 实例。Module 实例通过注册到线程或任务队列,自动被事件循环管理,Process 方法会在合适的时机被调用。
三、派生类实现Process:
为了说明一个继承自 Module 的类如何自动让 Process 被调用,我们可以通过一个简化的示例代码来展示。示例代码:实现一个简单的 Module假设我们实现一个简单的模块 MyModule,它继承自 Module,并在 Process 方法中打印日志,同时通过 TimeUntilNextProcess 指定每 100ms 调用一次。
#include <iostream>
#include "api/task_queue/task_queue_base.h"
#include "rtc_base/thread.h"class MyModule : public webrtc::Module {public:MyModule() : counter_(0) {}// 实现 Process 方法void Process() override {counter_++;std::cout << "MyModule Process called, counter: " << counter_ << std::endl;}// 控制 Process 的调用间隔int64_t TimeUntilNextProcess() override {return 100; // 每 100ms 调用一次 Process}// 可选的初始化方法void ProcessStart() override {std::cout << "MyModule ProcessStart called" << std::endl;}private:int counter_;
};// 主函数:将 MyModule 注册到 WebRTC 线程
int main() {// 创建 WebRTC 线程std::unique_ptr<rtc::Thread> thread = rtc::Thread::Create();thread->Start(); // 启动线程的事件循环// 创建 MyModule 实例auto module = std::make_unique<MyModule>();// 将模块注册到线程thread->PostTask([module = module.get()]() {// 模拟模块注册逻辑,实际 WebRTC 可能通过其他方式管理module->ProcessStart();});// 模拟线程运行一段时间thread->Run();return 0;
}
代码说明:
- MyModule 类:
- 继承自 webrtc::Module,实现了 Process 和 TimeUntilNextProcess 方法。
- Process 方法每次被调用时,增加计数器并打印日志。
- TimeUntilNextProcess 返回 100ms,表示每 100ms 调用一次 Process。
- 线程管理:
- WebRTC 的 rtc::Thread 是一个事件循环线程,负责调度任务。
- 通过 PostTask,我们可以将模块的初始化或处理逻辑注册到线程中。
- 实际 WebRTC 实现中,Module 通常通过 rtc::Thread::AddModule 或类似机制注册到线程,线程会根据 TimeUntilNextProcess 的返回值,定期调用 Process。
- 自动调用机制:
- rtc::Thread 的事件循环会检查所有注册的 Module 实例,调用 TimeUntilNextProcess 获取下次执行的时间。
- 当时间到达时,线程通过内部调度机制(如定时器或任务队列)调用 Process 方法。
- 这个过程是自动的,开发者无需手动调用 Process,只需实现 Module 接口并将实例注册到线程。
四、如何自动调用 Process:
WebRTC 的线程模型是 Process 方法自动调用的核心。以下是其内部工作原理的简要概述:
-
模块注册:
- 模块实例通过 rtc::Thread::AddModule 或类似方法注册到 WebRTC 的线程(如 SignalingThread、WorkerThread 等)。
- 注册后,线程会将模块加入其内部管理列表(如 std::vector<Module*>)。
-
事件循环调度:
- WebRTC 的线程(如 rtc::Thread)运行一个事件循环,定期检查所有注册的 Module 实例。
- 对于每个模块,线程调用 TimeUntilNextProcess 获取下次执行的时间间隔。
- 如果时间间隔 <= 0(或当前时间已到达),线程调用该模块的 Process 方法。
-
定时器或任务队列:
- WebRTC 使用定时器(基于 TaskQueue 或 libuv 等)来跟踪模块的执行时间。
- 当某个模块的 TimeUntilNextProcess 返回的时间到达时,线程通过任务队列触发 Process 方法。
-
实际实现:
-
在 WebRTC 源码中,rtc::Thread 的实现(位于 rtc_base/thread.cc)包含一个 ProcessMessages 方法,负责处理所有模块的任务。
-
伪代码大致如下:
void Thread::ProcessMessages(int timeout) {for (Module* module : modules_) {int64_t next_run = module->TimeUntilNextProcess();if (next_run <= 0) {module->Process();}}// 继续处理其他任务或等待 }
-
五、一个继承自 Module 的类如何自动触发 Process:
当你实现一个继承自 Module 的类并将其注册到 WebRTC 的线程或任务队列时,以下步骤确保 Process 方法被自动调用:
-
实现 Module 接口:
- 确保类实现了 Process 和 TimeUntilNextProcess 方法。
- 可选实现 ProcessStart 用于初始化。
-
注册到线程或任务队列:
-
通过 rtc::Thread::AddModule 或 TaskQueueBase::PostTask 将模块实例注册到 WebRTC 的线程或任务队列。
-
例如:
rtc::Thread* thread = rtc::Thread::Current(); thread->AddModule(my_module.get());
-
-
线程事件循环自动调度:
- WebRTC 线程的事件循环会定期检查模块的 TimeUntilNextProcess 返回值。
- 当时间到达时,线程自动调用 Process 方法,无需开发者干预。
-
注意事项:
- 确保 TimeUntilNextProcess 返回合理的时间间隔(单位:毫秒)。如果返回负值或 0,Process 可能被立即调用。
- 如果模块不再需要处理,TimeUntilNextProcess 可以返回一个较大的值(如 std::numeric_limits<int64_t>::max())以暂停调度。
- 模块的生命周期需要由开发者管理(如通过 std::unique_ptr 或其他方式),避免在线程销毁前模块被销毁。
六、WebRTC当中实例:
比如PacedSender模块:
class PacedSender : public Module {
public:PacedSender(Clock* clock,PacketRouter* packet_router,RtcEventLog* event_log,const WebRtcKeyValueConfig* field_trials = nullptr,ProcessThread* process_thread = nullptr);
private:int64_t TimeUntilNextProcess() override;void ProcessThreadAttached(ProcessThread* process_thread) override;
public:void Process() override;private:// Private implementation of Module to not expose those implementation details// publicly and control when the class is registered/deregistered.class ModuleProxy : public Module {public:explicit ModuleProxy(PacedSender* delegate) : delegate_(delegate) {}private:int64_t TimeUntilNextProcess() override {return delegate_->TimeUntilNextProcess();}void Process() override {return delegate_->Process();}void ProcessThreadAttached(ProcessThread* process_thread) override {return delegate_->ProcessThreadAttached(process_thread);}PacedSender* const delegate_;} module_proxy_{this};
};// 实现如下:
PacedSender::PacedSender(Clock* clock,PacketRouter* packet_router,RtcEventLog* event_log,const WebRtcKeyValueConfig* field_trials,ProcessThread* process_thread): process_mode_((field_trials != nullptr && field_trials->Lookup("WebRTC-Pacer-DynamicProcess").find("Enabled") == 0)? PacingController::ProcessMode::kDynamic: PacingController::ProcessMode::kPeriodic),pacing_controller_(clock, static_cast<PacingController::PacketSender*>(this), event_log, field_trials, process_mode_),clock_(clock),packet_router_(packet_router),process_thread_(process_thread) {if (process_thread_) {process_thread_->RegisterModule(&module_proxy_, RTC_FROM_HERE);}
}
PacedSender::~PacedSender() {if (process_thread_) {process_thread_->DeRegisterModule(&module_proxy_);}
}
int64_t PacedSender::TimeUntilNextProcess() {rtc::CritScope cs(&critsect_);Timestamp next_send_time = pacing_controller_.NextSendTime();TimeDelta sleep_time = std::max(TimeDelta::Zero(), next_send_time - clock_->CurrentTime());if (process_mode_ == PacingController::ProcessMode::kDynamic) {return std::max(sleep_time, PacingController::kMinSleepTime).ms();}return sleep_time.ms();
}void PacedSender::Process() {rtc::CritScope cs(&critsect_);pacing_controller_.ProcessPackets();
}void PacedSender::ProcessThreadAttached(ProcessThread* process_thread) {RTC_LOG(LS_INFO) << "ProcessThreadAttached 0x" << process_thread;RTC_DCHECK(!process_thread || process_thread == process_thread_);
}
PacedSender 继承自 Module , 实现了 Module 接口的以下方法:
- TimeUntilNextProcess:计算下一次调用 Process 的时间间隔。
- Process:处理发送队列中的数据包。
- ProcessThreadAttached:在模块与 ProcessThread 关联或解除关联时调用。
Process 方法:
- PacedSender::Process 调用 pacing_controller_.ProcessPackets(),处理待发送的 RTP 数据包队列。
- 它通过 PacingController 决定哪些数据包可以发送,并调用 PacketRouter::SendPacket 执行实际发送。
自动调用机制:
- PacedSender 的 Process 方法由 WebRTC 的 ProcessThread 定时调用。
- 关键在于 PacedSender 在构造时与 ProcessThread 关联,ProcessThread 的事件循环会根据 TimeUntilNextProcess 的返回值定期调用 Process。
PacedSender 如何与 ProcessThread 关联:
-
PacedSender 的构造函数接受一个 ProcessThread* process_thread 参数,表示要关联的线程。
-
如果 process_thread 不为空,构造函数调用 process_thread_->RegisterModule(&module_proxy_, RTC_FROM_HERE),将 PacedSender 的 module_proxy_ 注册到 ProcessThread。
-
module_proxy_ 是 PacedSender 的内部类 ModuleProxy,它实现了 Module 接口,并将 TimeUntilNextProcess 和 Process 的调用委托给 PacedSender 本身。
-
ModuleProxy 的作用:
- ModuleProxy 是一个代理类,用于封装 PacedSender 的 Module 接口实现,防止外部直接调用 PacedSender 的 Module 方法。
- 它确保 PacedSender 的 Process 和 TimeUntilNextProcess 方法通过代理被 ProcessThread 调用。
-
析构时的解除关联:
在 PacedSender 的析构函数中:
PacedSender::~PacedSender() {if (process_thread_) {process_thread_->DeRegisterModule(&module_proxy_);} }
- 关键点:
- 析构时,PacedSender 调用 process_thread_->DeRegisterModule(&module_proxy_),将 module_proxy_ 从 ProcessThread 中移除。
- 这确保了在 PacedSender 销毁后,ProcessThread 不会继续调用其 Process 方法。
- 关键点: