NebulaChat 框架学习笔记:深入理解 Reactor 与多线程同步机制
今天主要整理了 Reactor 框架中几个核心机制,包括 epoll、eventfd、atomic、vector.data() 的使用,还有多线程同步中 cv.wait() 的底层逻辑。
这些知识看似细节,实则是写高性能 C++ 网络程序的地基。
一、Reactor::loop() 的事件派发流程
loop() 是 Reactor 的“心脏”,它通过 epoll_wait() 等待内核事件,并把每个事件分发给上层 Server 或 Connection 对象。
void Reactor::loop() {if (!dispatcher_) return;running_.store(true, std::memory_order_release);while (running_.load(std::memory_order_acquire)) {int n = epoll_wait(epfd_, evlist_.data(), evlist_.size(), -1);for (int i = 0; i < n; ++i) {int fd = evlist_[i].data.fd;uint32_t ev = evlist_[i].events;if (fd == evfd_) { DrainEventfd(evfd_); continue; }void* user = nullptr;{std::lock_guard<std::mutex> lk(users_mtx_);auto it = users_.find(fd);if (it != users_.end()) user = it->second;}dispatcher_(fd, ev, user);}}
}
参数传递逻辑
epoll_wait()会返回一个就绪事件数组。每个元素
evlist_[i]包含:data.fd→ 哪个 socket 触发了;events→ 是可读、可写还是出错;
Reactor 会根据这个 fd 从
users_查出对应对象指针(Server*或Connection*),并调用dispatcher_(fd, ev, user)交给上层处理。
这就是事件从内核 → Reactor → Server 的传递链。
二、eventfd 与 epfd 的区别与关系
| 名称 | 作用 | 谁创建 | 是否加入 epoll |
|---|---|---|---|
| epfd_ | epoll 实例,用来监听所有 fd 事件 | Reactor | 是 |
| evfd_ | 唤醒用的“信号 fd” | Reactor | 是 |
evfd_ 是 Reactor 自己注册的一个 eventfd 文件描述符,
当其他线程想唤醒 Reactor 时,只需要:
uint64_t one = 1;
write(evfd_, &one, 8);
这时:
内核把
evfd_标记为“可读”;epoll_wait()立即被唤醒;loop()发现fd == evfd_,就知道是“唤醒信号”;调用
DrainEventfd()清空计数,继续循环。
三、为什么要清空 eventfd?
eventfd 内部维护一个 64 位计数器:
write()会加一;read()会清零。
如果不 read(),它会一直被标记为“可读”,
而 epoll 在水平触发模式下会认为它永远就绪,
导致 epoll_wait() 每次都立刻返回、CPU 100% 空转。
static void DrainEventfd(int evfd) {uint64_t cnt;while (read(evfd, &cnt, sizeof(cnt)) == sizeof(cnt)) {}
}
总结一句话:
如果不清空 eventfd,它的“门铃灯”会一直亮着,
epoll_wait 每次都会被立即唤醒,形成死循环。
四、epoll_wait 的逻辑本质
epoll_wait() 的作用是:
从成千上万个注册的 fd 中,筛选出当前真正有事件的那些。
如果有 10 万个连接,但只有 2 个有数据:
epoll_wait()只返回那 2 个;你只遍历它们,不用再检查其他 99998 个。
这一点是 epoll 能支持高并发的根本原因。
即使你写了:
for (int i = 0; i < n; ++i)
那也是在遍历“活跃 fd 列表”,不是在轮询所有连接。
五、atomic 的 store/load 与内存序
std::atomic<bool> running_{false};
store()、load() 用来在线程间安全读写标志:
running_.store(true, std::memory_order_release);
while (running_.load(std::memory_order_acquire)) { ... }
含义:
store:写入true,并发布写入前的所有操作;load:读取running_,确保读取的是最新的状态;多线程下保证 stop() 设置的
false会被另一个线程立即看到。
而不是像普通 bool 那样因为 CPU cache 而“看不见更新”。
六、为什么用 vector.data()?
evlist_ 是一个 std::vector<epoll_event>。
epoll_wait() 要求传入的是裸指针:
epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
所以我们用:
evlist_.data() // 返回底层数组的首地址 (epoll_event*)
这块内存在 vector 创建时就一次性分配好了:
std::vector<epoll_event> evlist_(maxEvents);
只要不扩容(不 resize),它的地址在整个 loop 期间都是固定有效的。
七、为什么判断 if (fd == evfd_)?
因为 Reactor 在 epoll 里注册了自己的 eventfd 唤醒源:
epoll_ctl(epfd_, EPOLL_CTL_ADD, evfd_, &ev);
所以 epoll_wait 可能返回两种事件:
普通 socket:说明客户端有 I/O;
eventfd:说明有线程调用了
wakeup()。
而:
if (fd == evfd_) {DrainEventfd(evfd_);continue;
}
这句就是在区分“网络事件”和“唤醒事件”。
八、cv.wait 的真实工作机制
std::condition_variable 是 C++ 的线程同步原语。
用法:
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, []{ return ready; });
它做了三件事:
自动释放锁(让别的线程修改条件);
挂起当前线程(不占 CPU);
被唤醒后重新上锁,再检查条件。
唤醒不等于立刻拥有锁,多个线程被 notify_all() 唤醒后还要重新竞争锁。
只有抢到锁的线程,wait 才会真正返回。
伪代码解释:
// wait 内部行为
unlock(mtx);
sleep until notified;
lock(mtx);
所以:
cv.wait()并不会永久持锁等待,而是“放锁→睡眠→醒来再上锁”,
这样生产者才能改共享变量,否则会死锁。
九、唤醒时的锁竞争与条件检查
被 notify_one() 唤醒的线程会:
从内核等待队列中被唤醒;
尝试重新锁住 mutex;
拿到锁后返回 wait;
检查条件是否真的满足(可能虚假唤醒);
条件为真再继续执行。
这就是为什么标准建议用:
cv.wait(lock, []{ return 条件; });
而不是直接 cv.wait(lock)。
十.小结:今天的知识体系
| 模块 | 核心概念 | 关键机制 |
|---|---|---|
| Reactor::loop | epoll_wait 分发事件 | 从内核获取就绪 fd 并派发 |
| eventfd | 唤醒机制 | 跨线程通知 Reactor |
| DrainEventfd | 清空计数防止死循环 | 保证下一轮 epoll 阻塞正常 |
| atomic | 线程间可见性 | 控制 loop 的安全退出 |
| vector.data() | 取底层指针 | 兼容 C 风格 epoll 接口 |
| fd == evfd_ | 区分唤醒与普通事件 | 内核事件源类型判断 |
| cv.wait | 条件等待 | 解锁→睡眠→唤醒→重新加锁 |
| notify_one | 唤醒机制 | 通知等待线程竞争锁继续执行 |
