【中间件】bthread_基础_TaskControl
TaskControl
- 1 Definition
- 2 Introduce
- **核心职责**
- 3 成员解析
- **3.1 数据结构与线程管理**
- **3.2 任务调度与负载均衡**
- **3.3 线程停放与唤醒(ParkingLot)**
- **3.4 统计与监控**
- 4 **工作流程**
- 5 **设计亮点**
- 6 **使用场景示例**
- 7 **总结**
- 8 学习过程中的疑问
- 8.1 init函数为什么不在构造函数中调用
1 Definition
class TaskControl {friend class TaskGroup; // 友元类friend void wait_for_butex(void*); // 友元函数
#ifdef BRPC_BTHREAD_TRACERfriend bthread_t init_for_pthread_stack_trace(); // 友元函数
#endif // BRPC_BTHREAD_TRACERpublic:TaskControl();~TaskControl();// Must be called before using. `nconcurrency' is # of worker pthreads.int init(int nconcurrency);// Create a TaskGroup in this control.TaskGroup* create_group(bthread_tag_t tag);// Steal a task from a "random" group.bool steal_task(bthread_t* tid, size_t* seed, size_t offset);// Tell other groups that `n' tasks was just added to caller's runqueuevoid signal_task(int num_task, bthread_tag_t tag);// Stop and join worker threads in TaskControl.void stop_and_join();// Get # of worker threads.int concurrency() const { return _concurrency.load(butil::memory_order_acquire); }int concurrency(bthread_tag_t tag) const { return _tagged_ngroup[tag].load(butil::memory_order_acquire); }void print_rq_sizes(std::ostream& os);double get_cumulated_worker_time();double get_cumulated_worker_time_with_tag(bthread_tag_t tag);int64_t get_cumulated_switch_count();int64_t get_cumulated_signal_count();// [Not thread safe] Add more worker threads.// Return the number of workers actually added, which may be less than |num|int add_workers(int num, bthread_tag_t tag);// Choose one TaskGroup (randomly right now).// If this method is called after init(), it never returns NULL.TaskGroup* choose_one_group(bthread_tag_t tag);#ifdef BRPC_BTHREAD_TRACER// A stacktrace of bthread can be helpful in debugging.void stack_trace(std::ostream& os, bthread_t tid);std::string stack_trace(bthread_t tid);
#endif // BRPC_BTHREAD_TRACERprivate:typedef std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY> TaggedGroups;static const int PARKING_LOT_NUM = 4;typedef std::array<ParkingLot, PARKING_LOT_NUM> TaggedParkingLot;// Add/Remove a TaskGroup.// Returns 0 on success, -1 otherwise.int _add_group(TaskGroup*, bthread_tag_t tag);int _destroy_group(TaskGroup*);// Tag groupTaggedGroups& tag_group(bthread_tag_t tag) { return _tagged_groups[tag]; }// Tag ngroupbutil::atomic<size_t>& tag_ngroup(int tag) { return _tagged_ngroup[tag]; }// Tag parking slotTaggedParkingLot& tag_pl(bthread_tag_t tag) { return _pl[tag]; }static void delete_task_group(void* arg);static void* worker_thread(void* task_control);template <typename F>void for_each_task_group(F const& f);bvar::LatencyRecorder& exposed_pending_time();bvar::LatencyRecorder* create_exposed_pending_time();bvar::Adder<int64_t>& tag_nworkers(bthread_tag_t tag);bvar::Adder<int64_t>& tag_nbthreads(bthread_tag_t tag);std::vector<butil::atomic<size_t>> _tagged_ngroup;std::vector<TaggedGroups> _tagged_groups;butil::Mutex _modify_group_mutex;butil::atomic<bool> _init; // if not init, bvar will case coredumpbool _stop;butil::atomic<int> _concurrency;std::vector<pthread_t> _workers;butil::atomic<int> _next_worker_id;bvar::Adder<int64_t> _nworkers;butil::Mutex _pending_time_mutex;butil::atomic<bvar::LatencyRecorder*> _pending_time;bvar::PassiveStatus<double> _cumulated_worker_time;bvar::PerSecond<bvar::PassiveStatus<double> > _worker_usage_second;bvar::PassiveStatus<int64_t> _cumulated_switch_count;bvar::PerSecond<bvar::PassiveStatus<int64_t> > _switch_per_second;bvar::PassiveStatus<int64_t> _cumulated_signal_count;bvar::PerSecond<bvar::PassiveStatus<int64_t> > _signal_per_second;bvar::PassiveStatus<std::string> _status;bvar::Adder<int64_t> _nbthreads;std::vector<bvar::Adder<int64_t>*> _tagged_nworkers;std::vector<bvar::PassiveStatus<double>*> _tagged_cumulated_worker_time;std::vector<bvar::PerSecond<bvar::PassiveStatus<double>>*> _tagged_worker_usage_second;std::vector<bvar::Adder<int64_t>*> _tagged_nbthreads;std::vector<TaggedParkingLot> _pl;#ifdef BRPC_BTHREAD_TRACERTaskTracer _task_tracer;
#endif // BRPC_BTHREAD_TRACER};
2 Introduce
TaskControl作为任务调度控制中心,管理多个任务组(TaskGroup
)并协调工作线程的高效运作,适用于BRPC的bthread协程库:
核心职责
- 任务组管理:创建、销毁任务组,支持按标签(
bthread_tag_t
)分类管理。 - 线程池调度:动态调整工作线程数量,实现任务窃取(Work Stealing)以平衡负载。
- 同步与唤醒:通过停放区(
ParkingLot
)管理线程的休眠与唤醒。 - 性能监控:集成统计模块(
bvar
)跟踪任务处理时间、切换次数等指标。
3 成员解析
3.1 数据结构与线程管理
-
_tagged_groups
类型:std::vector<TaggedGroups>
作用:按标签存储任务组指针数组,每个标签对应一个TaggedGroups
(固定大小为BTHREAD_MAX_CONCURRENCY
的数组)。
示例:标签可用于区分不同业务优先级或租户的任务。 -
_tagged_ngroup
类型:std::vector<butil::atomic<size_t>>
作用:记录每个标签下的任务组数量,原子操作保证线程安全。 -
_workers
类型:std::vector<pthread_t>
作用:保存所有工作线程的ID,用于线程生命周期管理(启动、停止、回收)。 -
_concurrency
类型:butil::atomic<int>
作用:总工作线程数,支持原子读写,动态调整并发度。
3.2 任务调度与负载均衡
-
steal_task(bthread_t* tid, size_t* seed, size_t offset)
作用:从其他任务组窃取任务,避免工作线程空闲。
实现:- 使用
seed
随机选择目标组,结合offset
避免多个线程竞争同一队列。 - 窃取成功返回
true
,任务ID存入tid
。
- 使用
-
signal_task(int num_task, bthread_tag_t tag)
作用:通知任务组有新任务加入,触发唤醒机制。
场景:当任务被添加到队列时,调用此方法唤醒可能休眠的线程。 -
choose_one_group(bthread_tag_t tag)
作用:根据标签选择一个任务组,用于任务分发或负载均衡。
策略:可能采用轮询或随机算法选择组,确保任务均匀分配。
3.3 线程停放与唤醒(ParkingLot)
_pl
类型:std::vector<TaggedParkingLot>
作用:按标签管理的停放区数组,每个标签对应PARKING_LOT_NUM
个停放区。
机制:- 工作线程无任务时进入停放区等待,减少CPU空转。
- 新任务到达时,通过停放区唤醒线程,降低延迟。
3.4 统计与监控
-
bvar
集成
关键指标:_cumulated_worker_time
:累计任务处理时间。_cumulated_switch_count
:上下文切换次数。_signal_per_second
:每秒任务唤醒次数。
作用:通过BRPC的bvar
库暴露性能指标,方便实时监控与调优。
-
标签化统计
成员如_tagged_nworkers
、_tagged_cumulated_worker_time
等,按标签细分统计,支持多维分析。
4 工作流程
-
初始化
- 调用
init(nconcurrency)
创建指定数量工作线程,每个线程执行worker_thread
函数。 - 工作线程通过
choose_one_group
选择任务组,执行任务循环。
- 调用
-
任务执行
- 线程从本地任务组获取任务,若队列为空,尝试从其他组窃取(
steal_task
)。 - 无任务可执行时,进入停放区(
ParkingLot
)休眠。
- 线程从本地任务组获取任务,若队列为空,尝试从其他组窃取(
-
任务通知
- 添加新任务时,调用
signal_task
递增信号计数器,唤醒停放区线程。
- 添加新任务时,调用
-
动态扩缩容
add_workers
动态增加指定标签的工作线程,适应负载变化。
-
停止与清理
stop_and_join
设置_stop
标志,通知所有线程退出,并回收资源。
5 设计亮点
- 标签化分组
支持多维度任务分类,适用于混合部署场景(如不同服务优先级)。 - 任务窃取
避免工作线程闲置,提升CPU利用率,降低任务处理延迟。 - 高效同步
结合原子操作与停放区,减少锁竞争,保证高吞吐量。 - 精细化监控
通过bvar
提供详尽的运行时指标,助力性能分析与优化。
6 使用场景示例
高并发服务:
- Web服务器接收请求后,封装为bthread任务,按业务类型打标签。
TaskControl
根据标签分发任务到不同组,保证关键业务优先调度。- 工作线程动态扩展应对流量高峰,空闲时自动缩减节省资源。
- 监控指标实时反馈系统负载,辅助容量规划。
7 总结
TaskControl
是BRPC bthread调度系统的核心,通过高效的任务组管理、工作线程调度及细粒度监控,实现了高并发、低延迟的协程任务处理。其设计充分考虑了扩展性、性能与可观测性,是构建高性能C++服务的基石组件。
8 学习过程中的疑问
8.1 init函数为什么不在构造函数中调用
疑问:init函数在注释中声明需要在使用前调用,为什么不能将其放在构造函数中直接调用呢?
回答:
-
可能原因:
-
- 初始化可能失败,需要错误处理
- 构造函数没有返回值,若在构造函数中执行可能的失败操作(eg. 创建线程、分配资源),只能通过异常或设置内部状态标记错误。brpc代码风格倾向于避免异常,习惯于返回错误码。参考代码8-1
-
- 需要依赖外部参数
- 构造时参数可能不完整,TaskControl的初始化需要并发线程数等参数,可能在运行时动态确定,无法在编译器硬编码。
- 更复杂的重载,如果后续需要扩展初始化参数(eg. 增加timeout/policy等配置),显式init()更容易扩展,而构造函数重载会膨胀。
- 3 支持对象的复用
- 销毁后重新初始化,某些场景下,用户可能希望销毁TaskControl后重新初始化(eg. 动态调整线程池的大小)。若初始化逻辑在构造函数中,则需要先析构对象再重新构造,而显式init/stop_and_join允许复用同一对象。参考代码8-2
- 4 明确的二段式生命周期
- 分离资源分配与初始化,二段式设计(构造+init())将对喜爱嗯的内存分配和资源初始化解耦:
- 构造阶段:仅进行内存布局、简单成员初始化;
- 初始化阶段:执行重量级操作(eg. 创建线程、连接资源);
- 更符合RAII的变体模式,尤其是在需要延迟初始化时。
- 分离资源分配与初始化,二段式设计(构造+init())将对喜爱嗯的内存分配和资源初始化解耦:
- 5 避免隐藏的副作用
- 隐式初始化可能引入意外行为,若构造函数自动初始化,用户可能在不知情的情况下触发资源分配(eg. 线程创建)。显式init()强制用户主动控制初始化时机,避免副作用。
- 6 与brpc其他组件的设计一致性
- 统一风格,brpc很多组件(eg. Channel / Server)均采用类似的二段式模式(先构造,再调用init() / start()),保持代码风格统一,降低用户学习成本。
-
-
什么情况下应在构造函数中初始化?
- 轻量级且无失败可能的操作,eg. 设置默认参数、初始化原子计数器等。
- 强制一次性初始化,若对象必须在构造时完全初始化,且不允许重新初始化。
-
两种方式对比,见代码8-3
-
二段式的作用
- 清晰的错误处理:通过返回int明确传递错误
- 参数灵活性:允许运行时动态决定初始化参数
- 对象复用:支持重新初始化而不重新构造
- 代码一致性:符合BRPC设计惯例
代码8-1:
// 当前使用方法
TaskControl ctl;
if (ctl.init(32) != 0) {// 处理初始化失败
}// 如果放在构造函数中
TaskControl ctl(32);
if (!ctl.is_initialized()) {// 处理错误
}
代码8-2
TaskControl ctl;
ctl.init(16);
ctl.stop_and_join();
ctl.init(32);
代码8-3
// 二段式
class TaskControl {
public:TaskControl(); // 轻量构造~TaskControl();int init(int nconcurrency); // 显式初始化// ...
};// 使用方式
TaskControl ctl;
if (ctl.init(32) != 0) {LOG(ERROR) << "Failed to initialize TaskControl";return -1;
}// 合并到构造方式
class TaskControl {
public:explicit TaskControl(int nconcurrency); // 可能抛出异常~TaskControl();bool is_initialized() const; // 需额外状态检查// ...
};// 使用方式
try {TaskControl ctl(32);
} catch (const std::exception& e) {LOG(ERROR) << "Construction failed: " << e.what();
}
if (!ctl.is_initialized()) { // 需要额外检查// 处理错误
}