当前位置: 首页 > news >正文

C++每日训练 Day 15:构建线程池支持的事件中心 SignalHub

📘 本篇在前几日协程 + 信号 + 调度器的基础上,构建一个支持多线程调度、事件分发和协程 resume 的通用事件中心:SignalHub。通过接入线程池和事件标识,我们实现一个真正可以落地于大型应用(如服务端、UI 框架、游戏引擎)的异步事件调度系统。篇末提供问题与答案帮助巩固理解。


🔁 回顾 Day 14:信号调度器 + 协程 resume

组件名称作用
Dispatcher可执行任务调度接口
ThreadDispatcher线程任务队列,支持 submit + 主动 run
AwaiterWithDispatcher协程 resume 时使用 dispatcher 进行线程切换
AsyncSignalWithDispatcheremit 时自动 resume 挂起协程到指定线程

🎯 今日目标:

模块作用说明
ThreadPool支持多线程任务提交与执行
SignalHub注册不同类型事件 + 标识符,统一事件中心接口
AwaitableHub支持协程挂起 + resume 到线程池
泛型接口支持任意参数/类型事件统一封装

在这里插入图片描述

✅ 一、构建线程池(简易版)

class ThreadPool : public Dispatcher {
public:ThreadPool(size_t count = std::thread::hardware_concurrency()) : running(true) {for (size_t i = 0; i < count; ++i) {workers.emplace_back([this]() {while (true) {std::function<void()> task;{std::unique_lock<std::mutex> lock(mtx);cv.wait(lock, [&]() { return !tasks.empty() || !running; });if (!running && tasks.empty()) return;task = std::move(tasks.front());tasks.pop();}task();}});}}void dispatch(std::function<void()> task) override {{std::lock_guard<std::mutex> lock(mtx);tasks.push(std::move(task));}cv.notify_one();}void stop() {running = false;cv.notify_all();for (auto& t : workers) t.join();}private:std::vector<std::thread> workers;std::queue<std::function<void()>> tasks;std::mutex mtx;std::condition_variable cv;bool running;
};

✅ 二、事件中心 SignalHub 结构

template<typename Key, typename T>
class SignalHub {
public:using Callback = std::function<void(const T&)>;void subscribe(const Key& key, Callback cb) {observers[key].push_back(std::move(cb));}void emit(const Key& key, const T& value) {auto it = observers.find(key);if (it != observers.end()) {for (auto& cb : it->second) cb(value);}}private:std::unordered_map<Key, std::vector<Callback>> observers;
};

✅ 三、接入 Awaiter + resume to ThreadPool

template<typename T>
struct HubAwaiter {std::optional<T> result;std::coroutine_handle<> handle;Dispatcher* dispatcher;bool await_ready() const noexcept { return false; }void await_suspend(std::coroutine_handle<> h) { handle = h; }T await_resume() { return *result; }void resume(T value) {result = std::move(value);dispatcher->dispatch([h = handle]() { h.resume(); });}
};

✅ 四、构建 AwaitableSignalHub

template<typename Key, typename T>
class AwaitableSignalHub {
public:AwaitableSignalHub(Dispatcher* d) : dispatcher(d) {}void emit(const Key& key, const T& value) {auto it = waiters.find(key);if (it != waiters.end()) {for (auto* w : it->second) w->resume(value);waiters.erase(it);}}HubAwaiter<T>* wait(const Key& key) {auto* w = new HubAwaiter<T>();w->dispatcher = dispatcher;waiters[key].push_back(w);return w;}private:Dispatcher* dispatcher;std::unordered_map<Key, std::vector<HubAwaiter<T>*>> waiters;
};

✅ 五、实战示例:登录事件中心

Task<void> waitLogin(AwaitableSignalHub<std::string, std::string>& hub) {std::string user = co_await *hub.wait("login");std::cout << "用户登录成功: " << user << std::endl;co_return;
}
int main() {ThreadPool pool(2);AwaitableSignalHub<std::string, std::string> hub(&pool);auto task = waitLogin(hub); // 协程开始等待登录事件std::this_thread::sleep_for(std::chrono::milliseconds(200));hub.emit("login", "Jerry"); // 发出登录事件std::this_thread::sleep_for(std::chrono::seconds(1));pool.stop();return 0;
}

输出:

用户登录成功: Jerry

📘 巩固练习题(附答案)

Q1:SignalHub 和 AwaitableSignalHub 的区别?
A:SignalHub 是同步事件派发,AwaitableSignalHub 是协程挂起后由 emit 异步 resume。

Q2:Dispatcher 的作用?
A:控制协程在哪个线程 resume,是实现线程调度 resume 的关键。

Q3:为什么使用 optional 而不是直接变量?
A:optional 能很好表达“结果未设置”状态,避免未初始化访问。

Q4:emit 后协程 resume 会在哪里运行?
A:取决于 awaiter 中 dispatcher 设置的线程,一般是主线程或线程池。

Q5:是否支持多个事件 key?
A:是的,每种 key 类型对应一组 waiter 或 callback,完全解耦。


🔭 下一步预告:构建 GUI + 事件驱动架构系统(Day 16)

  • 将信号中心 + 协程结合 GUI 主线程(事件循环)
  • 构建响应式组件(按钮、输入框)事件信号
  • 支持主线程 resume + 业务线程分发

📌 若你有 UI 框架、游戏引擎、微服务等场景,我可以帮助你落地完整架构 💡

相关文章:

  • 计算机系统---烤机(性能测评)
  • 修改SpringBoot生成的jar文件后重新打包
  • 揭秘大数据 | 21、软件定义计算
  • 嵌入式WebRTC轻量化SDK压缩至500K-800K ,为嵌入式设备节省Flash资源
  • kali的wifi工具使用
  • 如何获取Google Chrome的官方最新下载链接【获取教程】
  • 【补题】The 2024 ICPC Kunming Invitational Contest I. Left Shifting 2
  • STM32
  • 17:00开始面试,17:08就出来了,问的问题有点变态。。。
  • 使用CubeMX新建EXTI外部中断工程——不使用回调函数
  • 豆瓣图书数据采集与可视化分析
  • 【家政平台开发(48)】家政平台安全“攻防战”:渗透测试全解析
  • 【BUG】Redis RDB快照持久化及写操作禁止问题排查与解决
  • OpenCV的详细介绍与安装(一)
  • 【Spring框架】
  • 01 位运算
  • LoadableTransportInfo函数分析之RPCRT4!LOADABLE_TRANSPORT::LOADABLE_TRANSPORT初始化过程
  • Cube IDE常用快捷键
  • Java使用ANTLR4解析IDL文件
  • OpenCV 图形API(35)图像滤波-----中值模糊函数medianBlur()
  • 2024年境内酒店住宿行业指标同比下滑:酒店行业传统增长模式面临挑战
  • 美国季度GDP时隔三年再现负增长,特朗普政府关税政策对美国经济负面影响或将持续
  • 十二届上海市委第六轮巡视全面进驻,巡视组联系方式公布
  • 外交部:中美双方并未就关税问题进行磋商或谈判
  • 百年传承,再启新程,参天中国迎来2.0时代
  • 体重管理门诊来了,瘦不下来的我们有救了?|健康有方FM