今日内容总结
一、今天在干什么:整体框架回顾
今天主要围绕这几个点展开:
把自己的
Reactor类完善:epoll + eventfd 唤醒
事件循环 loop
addFd / modFd / delFd
把
Server类写完整:监听端口、接受连接
管理每个 Connection 的收发
和 Reactor 配合,用回调 onEvent 驱动逻辑
弄清楚一堆底层关键点:
eventfd 是干嘛的
epoll_wait 的事件是怎么来的
static_cast 和 reinterpret_cast 区别
atomic running_ 怎么控制循环退出
listen(backlog) 参数是什么意思
git push 能不能写注释
下面按模块细讲。
二、Reactor 模块详细笔记
1. Reactor 的定位
Reactor 负责:
创建 epoll 实例,统一管理所有 fd 的事件
持有一个 eventfd,用来跨线程唤醒 epoll_wait
loop() 中调用 epoll_wait 获取就绪事件,然后分发给上层(Server)的 dispatcher 回调
提供 addFd / modFd / delFd 接口给 Server 使用
提供 stop() 让外部线程可以优雅地让 loop 退出
整体流程可以理解为一句话:
Reactor 在一个线程里死循环等 epoll_wait,
有事件了就调用 dispatcher_(fd, events, user)。
2. 构造函数里的关键初始化
构造函数大致做了三件事:
epfd_ = epoll_create1(EPOLL_CLOEXEC);
evfd_ = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);// 把 evfd_ 加入 epoll,关注 EPOLLIN
epoll_event ev{};
ev.data.fd = evfd_;
ev.events = EPOLLIN;
epoll_ctl(epfd_, EPOLL_CTL_ADD, evfd_, &ev);
要点:
epfd_:epoll 的“管理句柄”,所有 epoll_ctl、epoll_wait 都用它
evfd_:eventfd,内核里是一个 64 位无符号计数器
把 evfd_ 加入 epoll,是为了后面别的线程写 eventfd 时,可以唤醒 epoll_wait
3. running_ 和事件循环 loop()
loop() 里控制变量是:
std::atomic<bool> running_;
在 loop 开始时:
running_.store(true, std::memory_order_release);while (running_.load(std::memory_order_acquire)) {int n = epoll_wait(epfd_, eventList_.data(), eventList_.size(), -1);...
}
解释:
store(true, release):告诉其他线程“我已经开始跑 loop 了”,并且保证之前的初始化动作对其他线程可见
while 里用 load(acquire):其他线程把 running_ 改为 false 时,loop 能立即看到,并且看到的内存状态是正确的
配合 stop() 使用:
void reactor::stop() {bool expected = true;if (running_.compare_exchange_strong(expected, false, std::memory_order_acq_rel)) {wakeup();}
}
compare_exchange_strong:只有在 running_ 原来是 true 的时候,才会改成 false,并返回成功
成功时调用 wakeup(),唤醒 epoll_wait,让循环跳出 while
4. wakeup() 和 eventfd 的作用
wakeup() 的实现:
void reactor::wakeup() {uint64_t one = 1;write(evfd_, &one, sizeof(one));
}
意思就是:往 eventfd 写一个 1。
eventfd 内核行为:
内部有一个 64 位整数 counter
每写一次 8 字节无符号整数,counter += 写入的值
当 counter > 0 时,这个 fd 会被视为“可读”,epoll_wait 会返回对应的事件
因此:
任何线程调用 wakeup(),
实际效果是:写 eventfd,让 epoll_wait 立即返回。
5. epoll_wait 里如何处理 eventfd
loop() 中的事件处理逻辑大致是:
int n = epoll_wait(epfd_, eventList_.data(), eventList_.size(), -1);for (int i = 0; i < n; ++i) {int fd = eventList_[i].data.fd;uint32_t events = eventList_[i].events;if (fd == evfd_) {DrainEvent(evfd_);continue;}void* user = nullptr;{std::lock_guard<std::mutex> lock(user_mtx_);auto it = users_.find(fd);if (it != users_.end()) user = it->second;}dispatcher_(fd, events, user);
}
解释:
如果是 evfd_,说明这是一个“唤醒信号”,比如 stop 调用、线程池调用 wakeup
对 evfd_ 调用 DrainEvent,把内核计数器读掉,避免 epoll_wait 一直为可读导致死循环
其他 fd,从 users_ 里取出 user 指针(通常是 Connection* 或 this 指针)
最后调用 dispatcher_,由上层 Server 的 onEvent 接管
6. DrainEvent 的意义
一个简化实现可以像这样:
static void DrainEvent(int evfd) {uint64_t counter;read(evfd, &counter, sizeof(counter));
}
作用:
读取 eventfd 的计数,让 counter 变回 0
如果不读,eventfd 会一直处于“可读状态”,epoll_wait 每次都会立即返回,导致 CPU 飙满
你还写了注释讨论“信号量模式”和“普通模式”的区别:
信号量模式:一次 read 只减去 1,适合多个线程轮流抢任务
普通模式(默认):一次 read 把全部计数读出来,用于简单的“唤醒一次”的场景
你这个 Reactor 用的是“唤醒一次就够”的逻辑,普通模式即可。
7. addFd / modFd / delFd
模板:
bool reactor::addFd(int fd, uint32_t events, void* user) {epoll_event ev{};ev.data.fd = fd;ev.events = events | (useET ? EPOLLET : 0);epoll_ctl(epfd_, EPOLL_CTL_ADD, fd, &ev);users_[fd] = user;
}bool reactor::modFd(int fd, uint32_t events, void* user) {epoll_event ev{};ev.data.fd = fd;ev.events = events | (useET ? EPOLLET : 0);epoll_ctl(epfd_, EPOLL_CTL_MOD, fd, &ev);if (user) users_[fd] = user;
}bool reactor::delFd(int fd) {epoll_ctl(epfd_, EPOLL_CTL_DEL, fd, nullptr);users_.erase(fd);
}
两点非常关键:
epoll_ctl 第一个参数一定是 epfd_,不能传 fd
user 指针由 Reactor 保存,loop 时再取出来,把它传给 dispatcher_
8. 析构函数中的资源释放
reactor::~reactor() {stop();if (evfd_ != -1) close(evfd_);if (epfd_ != -1) close(epfd_);
}
stop() 负责停循环,析构负责真正关闭 epoll 和 eventfd。
不要在 stop() 里关 epfd_ 和 evfd_,否则 loop 里还在用时会炸。
三、Server 模块详细笔记
1. Server 的职责
与 Reactor 绑定
创建监听 socket,bind + listen
在 onAccept 中接受新连接并创建 Connection 对象
在 onConnRead 中处理收到的数据
在 onConnWrite 里发送数据
管理所有连接的生命周期
提供 stop(),关闭所有连接和监听
2. Server 的 start() 流程
精简后的逻辑:
bool Server::start() {bool expected = false;if (!running_.compare_exchange_strong(expected, true)) {return true; // 已经启动过了}bool ok = false;do {listenFd_ = socket(AF_INET, SOCK_STREAM, 0);setsockopt(listenFd_, SOL_SOCKET, SO_REUSEADDR, ...);
#ifdef SO_REUSEPORTsetsockopt(listenFd_, SOL_SOCKET, SO_REUSEPORT, ...);
#endifsetNonBlock(listenFd_);sockaddr_in addr{};addr.sin_family = AF_INET;addr.sin_port = htons(port_);addr.sin_addr.s_addr = htonl(INADDR_ANY);bind(listenFd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));listen(listenFd_, 128);reactor_.setDispatcher([this](int fd, uint32_t events, void* user){this->onEvent(fd, events, user);});reactor_.addFd(listenFd_, EPOLLIN, this);ok = true;} while (false);if (!ok) {if (listenFd_ != -1) { close(listenFd_); listenFd_ = -1; }running_.store(false);}return ok;
}
关键点:
running_ 使用 compare_exchange_strong 控制“只启动一次”
出错时要记得把 running_ 还原为 false,不然 Server 对象会处于“假启动状态”
SO_REUSEADDR 和 SO_REUSEPORT 防止端口重用问题,要小心宏名和 SOL_SOCKET 拼写
3. stop() 流程
void Server::stop() {if (!running_.exchange(false)) return;if (listenFd_ != -1) {reactor_.delFd(listenFd_);close(listenFd_);listenFd_ = -1;}std::lock_guard<std::mutex> lk(conns_mtx_);for (auto& kv : conns_) {int fd = kv.first;reactor_.delFd(fd);close(fd);}conns_.clear();
}
running_.exchange(false):只在第一次 stop 生效
删除 listenFd_,再关闭
遍历所有连接,删除事件并关闭 fd,然后清空 map
4. onEvent:Reactor 回调入口
void Server::onEvent(int fd, uint32_t events, void* user) {if (events & (EPOLLERR | EPOLLHUP)) {closeConn(fd);return;}if (fd == listenFd_) {if (events & EPOLLIN) onAccept();return;}std::unique_ptr<Connection>* p_holder = nullptr;{std::lock_guard<std::mutex> lk(conns_mtx_);auto it = conns_.find(fd);if (it == conns_.end()) return;p_holder = &it->second;}Connection& c = *p_holder->get();if (events & EPOLLIN) onConnRead(c);if (events & EPOLLOUT) onConnWrite(c);
}
优先处理错误事件,避免继续读写坏掉的 fd。
5. onAccept:接受新连接
void Server::onAccept() {for (;;) {sockaddr_in cli{};socklen_t len = sizeof(cli);int cfd = accept(listenFd_, reinterpret_cast<sockaddr*>(&cli), &len);if (cfd < 0) {if (errno == EAGAIN || errno == EWOULDBLOCK) break;perror("accept");break;}if (!setNonBlock(cfd)) { close(cfd); continue; }setTcpNoDelay(cfd);auto conn = std::make_unique<Connection>();conn->fd = cfd;Connection* user_ptr = conn.get();{std::lock_guard<std::mutex> lk(conns_mtx_);conns_.emplace(cfd, std::move(conn));}if (!reactor_.addFd(cfd, EPOLLIN, user_ptr)) {std::lock_guard<std::mutex> lk(conns_mtx_);conns_.erase(cfd);close(cfd);continue;}}
}
要点:
非阻塞 accept 循环,一直 accept 到出现 EAGAIN 或 EWOULDBLOCK 表示“当前没有更多连接”
新连接统一 setNonBlock + setTcpNoDelay
Connection 对象放在 map 里管理
user_ptr 传给 Reactor,让它在回调时交还给你
6. onConnRead:读数据 + 拆包 + 分发业务
void Server::onConnRead(Connection& c) {char buf[4096];for (;;) {ssize_t n = read(c.fd, buf, sizeof(buf));if (n > 0) {c.inbuf.append(buf, n);continue;}if (n == 0) {closeConn(c.fd);return;}if (errno == EAGAIN || errno == EWOULDBLOCK) break;perror("read");closeConn(c.fd);return;}size_t pos = 0;for (;;) {auto nl = c.inbuf.find('\n', pos);if (nl == std::string::npos) {c.inbuf.erase(0, pos);break;}std::string line = c.inbuf.substr(pos, nl - pos);if (!line.empty() && line.back() == '\r') line.pop_back();pos = nl + 1;auto out = processLine(line);postWrite(c.fd, std::move(out));}
}
说明:
非阻塞读循环:把当前内核缓冲区能读的数据都读完
0 表示对端关闭连接,调用 closeConn
错误处理:除了 EAGAIN/EWOULDBLOCK 之外都算致命
就地按行拆包(协议:一行一条消息,行末是 \n,可能有 \r)
7. onConnWrite:把缓冲区的数据发出去
void Server::onConnWrite(Connection& c) {for (;;) {if (c.outbuf.empty()) break;ssize_t n = write(c.fd, c.outbuf.data(), c.outbuf.size());if (n > 0) {c.outbuf.erase(0, n);continue;}if (errno == EAGAIN || errno == EWOULDBLOCK) break;perror("write");closeConn(c.fd);return;}if (c.outbuf.empty() && c.wantWrite) {c.wantWrite = false;reactor_.modFd(c.fd, EPOLLIN, nullptr);}
}
outbuf 用来存待发送的数据
写完或写到 EAGAIN 为止
当 outbuf 清空时,把 fd 的关注事件从 EPOLLIN | EPOLLOUT 改回 EPOLLIN,只保留读事件
8. closeConn:关闭连接的统一入口
void Server::closeConn(int fd) {{std::lock_guard<std::mutex> lk(conns_mtx_);auto it = conns_.find(fd);if (it == conns_.end()) return;}reactor_.delFd(fd);close(fd);{std::lock_guard<std::mutex> lk(conns_mtx_);conns_.erase(fd);}
}
顺序:
先确认这个 fd 还在 conns_ 里
从 Reactor 中删除 fd 的事件
close(fd)
从 conns_ 中 erase
9. processLine:业务逻辑示例
当前只是一个简单的 echo:
std::string Server::processLine(const std::string& line) {if (line == "quit" || line == "exit") {return "bye\n";}return "echo: " + line + "\n";
}
后面可以按协议来改成真实业务,比如 JSON、命令、聊天室等。
10. postWrite:线程安全地投递写操作
void Server::postWrite(int fd, std::string data) {std::unique_lock<std::mutex> lk(conns_mtx_);auto it = conns_.find(fd);if (it == conns_.end()) return;Connection& c = *it->second;c.outbuf.append(data);if (!c.wantWrite) {c.wantWrite = true;reactor_.modFd(fd, EPOLLIN | EPOLLOUT, &c);reactor_.wakeup();}
}
要点:
线程安全地访问 conns_,用锁保护
把要发的数据追加到 outbuf
如果之前没关注写事件(wantWrite=false),需要:
把 fd 的事件修改为 EPOLLIN | EPOLLOUT
调用 wakeup() 唤醒 epoll_wait,让 Reactor 立即处理这些更新
非常关键的一点:顺序是“先 modFd,再 wakeup”。
四、C++ 类型转换总结
1. static_cast
特点:
有类型检查
用于“本来就能转换”的情况
例子:void* 转成 Connection*,int 转 double 等
在你的项目中:
Connection* c = static_cast<Connection*>(user);
这里 user 原来就是 Connection*,只是通过 void* 存了一次,转回来用 static_cast 是合理、安全的。
2. reinterpret_cast
特点:
不检查类型是否有关系
只是按字节重新解释
用在 C 风格 API 上比较多
典型用法:
sockaddr_in addr{};
bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
原因:
sockaddr_in 和 sockaddr 没有继承关系
static_cast 不允许这种转换
C 的 socket API 要的就是 sockaddr*,只能用 reinterpret_cast 强行当成 sockaddr 来用
规则可以记成:
能用 static_cast 的地方不要用 reinterpret_cast
只有面对 C 接口、需要把具体结构体看成“某个通用类型”时才用 reinterpret_cast
五、atomic 和 running_ 控制退出
你的运行控制主要是这一对:
Reactor 里有一个
std::atomic<bool> running_Server 里也有一个
std::atomic<bool> running_
用法:
start() 时用 compare_exchange_strong 把 false 改成 true,防止重复启动
loop() 里用 running_.load 控制 while 循环
stop() 里用 compare_exchange_strong 或 exchange 把 running_ 改为 false,再唤醒 eventfd
配合 memory_order:
store(true, release) / load(acquire):保证跨线程的可见性
compare_exchange_strong(..., acq_rel):既有 acquire 又有 release,适合“把状态改掉”的场景
简单理解:
普通 bool 在多线程下可能看不到最新值,atomic 配合 memory_order 可以让“设置状态”和“读状态”变得可靠。
六、listen(backlog) 的真实含义
你今天问的:
listen(listenFd_, 128);
这里的 128 不是“最多 128 个客户端”,而是:
“处于已完成握手、等待 accept 的连接队列的最大长度”。
如果短时间内有很多 connect 请求,而服务器忙着处理现有连接,
队列满了以后,新的连接就可能失败或被重置。
实际最大队列长度还会受内核参数限制,例如 somaxconn。
七、Git:push 能不能写注释
答案:不能。
注释写在 commit 上,用
git commit -m "你的说明"git push 只是“把已有 commit 推到远程”,没有 message 参数
常用流程:
git add .
git commit -m "修复 Reactor 的 eventfd 唤醒逻辑,完善 Server 代码结构"
git push
如果只是想提示自己,可以在 shell 里用 echo 打印信息,但那不算 Git 注释。
八、今天容易踩坑的地方总结
wakeup 写错 fd(应该写 evfd_)
epoll_ctl 的第一个参数必须是 epfd_
SO_REUSEPORT 的宏使用要小心,第二个参数是 SOL_SOCKET
start() 失败时必须把 running_ 还原为 false
onAccept 里不要多次加锁,直接在插入前拿到 conn.get() 即可
postWrite 的操作顺序是:先 modFd,再 wakeup
DrainEvent 的实现和注释要统一,确保理解 eventfd 的计数行为
