C++每日训练 Day 15:构建线程池支持的事件中心 SignalHub
📘 本篇在前几日协程 + 信号 + 调度器的基础上,构建一个支持多线程调度、事件分发和协程 resume 的通用事件中心:SignalHub。通过接入线程池和事件标识,我们实现一个真正可以落地于大型应用(如服务端、UI 框架、游戏引擎)的异步事件调度系统。篇末提供问题与答案帮助巩固理解。
🔁 回顾 Day 14:信号调度器 + 协程 resume
组件名称 | 作用 |
---|---|
Dispatcher | 可执行任务调度接口 |
ThreadDispatcher | 线程任务队列,支持 submit + 主动 run |
AwaiterWithDispatcher | 协程 resume 时使用 dispatcher 进行线程切换 |
AsyncSignalWithDispatcher | emit 时自动 resume 挂起协程到指定线程 |
🎯 今日目标:
模块 | 作用说明 |
---|---|
ThreadPool | 支持多线程任务提交与执行 |
SignalHub | 注册不同类型事件 + 标识符,统一事件中心接口 |
AwaitableHub | 支持协程挂起 + resume 到线程池 |
泛型接口 | 支持任意参数/类型事件统一封装 |
✅ 一、构建线程池(简易版)
class ThreadPool : public Dispatcher {
public:ThreadPool(size_t count = std::thread::hardware_concurrency()) : running(true) {for (size_t i = 0; i < count; ++i) {workers.emplace_back([this]() {while (true) {std::function<void()> task;{std::unique_lock<std::mutex> lock(mtx);cv.wait(lock, [&]() { return !tasks.empty() || !running; });if (!running && tasks.empty()) return;task = std::move(tasks.front());tasks.pop();}task();}});}}void dispatch(std::function<void()> task) override {{std::lock_guard<std::mutex> lock(mtx);tasks.push(std::move(task));}cv.notify_one();}void stop() {running = false;cv.notify_all();for (auto& t : workers) t.join();}private:std::vector<std::thread> workers;std::queue<std::function<void()>> tasks;std::mutex mtx;std::condition_variable cv;bool running;
};
✅ 二、事件中心 SignalHub 结构
template<typename Key, typename T>
class SignalHub {
public:using Callback = std::function<void(const T&)>;void subscribe(const Key& key, Callback cb) {observers[key].push_back(std::move(cb));}void emit(const Key& key, const T& value) {auto it = observers.find(key);if (it != observers.end()) {for (auto& cb : it->second) cb(value);}}private:std::unordered_map<Key, std::vector<Callback>> observers;
};
✅ 三、接入 Awaiter + resume to ThreadPool
template<typename T>
struct HubAwaiter {std::optional<T> result;std::coroutine_handle<> handle;Dispatcher* dispatcher;bool await_ready() const noexcept { return false; }void await_suspend(std::coroutine_handle<> h) { handle = h; }T await_resume() { return *result; }void resume(T value) {result = std::move(value);dispatcher->dispatch([h = handle]() { h.resume(); });}
};
✅ 四、构建 AwaitableSignalHub
template<typename Key, typename T>
class AwaitableSignalHub {
public:AwaitableSignalHub(Dispatcher* d) : dispatcher(d) {}void emit(const Key& key, const T& value) {auto it = waiters.find(key);if (it != waiters.end()) {for (auto* w : it->second) w->resume(value);waiters.erase(it);}}HubAwaiter<T>* wait(const Key& key) {auto* w = new HubAwaiter<T>();w->dispatcher = dispatcher;waiters[key].push_back(w);return w;}private:Dispatcher* dispatcher;std::unordered_map<Key, std::vector<HubAwaiter<T>*>> waiters;
};
✅ 五、实战示例:登录事件中心
Task<void> waitLogin(AwaitableSignalHub<std::string, std::string>& hub) {std::string user = co_await *hub.wait("login");std::cout << "用户登录成功: " << user << std::endl;co_return;
}
int main() {ThreadPool pool(2);AwaitableSignalHub<std::string, std::string> hub(&pool);auto task = waitLogin(hub); // 协程开始等待登录事件std::this_thread::sleep_for(std::chrono::milliseconds(200));hub.emit("login", "Jerry"); // 发出登录事件std::this_thread::sleep_for(std::chrono::seconds(1));pool.stop();return 0;
}
输出:
用户登录成功: Jerry
📘 巩固练习题(附答案)
Q1:SignalHub 和 AwaitableSignalHub 的区别?
A:SignalHub 是同步事件派发,AwaitableSignalHub 是协程挂起后由 emit 异步 resume。
Q2:Dispatcher 的作用?
A:控制协程在哪个线程 resume,是实现线程调度 resume 的关键。
Q3:为什么使用 optional 而不是直接变量?
A:optional 能很好表达“结果未设置”状态,避免未初始化访问。
Q4:emit 后协程 resume 会在哪里运行?
A:取决于 awaiter 中 dispatcher 设置的线程,一般是主线程或线程池。
Q5:是否支持多个事件 key?
A:是的,每种 key 类型对应一组 waiter 或 callback,完全解耦。
🔭 下一步预告:构建 GUI + 事件驱动架构系统(Day 16)
- 将信号中心 + 协程结合 GUI 主线程(事件循环)
- 构建响应式组件(按钮、输入框)事件信号
- 支持主线程 resume + 业务线程分发
📌 若你有 UI 框架、游戏引擎、微服务等场景,我可以帮助你落地完整架构 💡