当前位置: 首页 > news >正文

今日内容总结

一、今天在干什么:整体框架回顾

今天主要围绕这几个点展开:

  1. 把自己的 Reactor 类完善:

    • epoll + eventfd 唤醒

    • 事件循环 loop

    • addFd / modFd / delFd

  2. Server 类写完整:

    • 监听端口、接受连接

    • 管理每个 Connection 的收发

    • 和 Reactor 配合,用回调 onEvent 驱动逻辑

  3. 弄清楚一堆底层关键点:

    • eventfd 是干嘛的

    • epoll_wait 的事件是怎么来的

    • static_cast 和 reinterpret_cast 区别

    • atomic running_ 怎么控制循环退出

    • listen(backlog) 参数是什么意思

    • git push 能不能写注释

下面按模块细讲。


二、Reactor 模块详细笔记

1. Reactor 的定位

 Reactor 负责:

  • 创建 epoll 实例,统一管理所有 fd 的事件

  • 持有一个 eventfd,用来跨线程唤醒 epoll_wait

  • loop() 中调用 epoll_wait 获取就绪事件,然后分发给上层(Server)的 dispatcher 回调

  • 提供 addFd / modFd / delFd 接口给 Server 使用

  • 提供 stop() 让外部线程可以优雅地让 loop 退出

整体流程可以理解为一句话:

Reactor 在一个线程里死循环等 epoll_wait,
有事件了就调用 dispatcher_(fd, events, user)。

2. 构造函数里的关键初始化

构造函数大致做了三件事:

epfd_ = epoll_create1(EPOLL_CLOEXEC);
evfd_ = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);// 把 evfd_ 加入 epoll,关注 EPOLLIN
epoll_event ev{};
ev.data.fd = evfd_;
ev.events = EPOLLIN;
epoll_ctl(epfd_, EPOLL_CTL_ADD, evfd_, &ev);

要点:

  • epfd_:epoll 的“管理句柄”,所有 epoll_ctl、epoll_wait 都用它

  • evfd_:eventfd,内核里是一个 64 位无符号计数器

  • 把 evfd_ 加入 epoll,是为了后面别的线程写 eventfd 时,可以唤醒 epoll_wait

3. running_ 和事件循环 loop()

loop() 里控制变量是:

std::atomic<bool> running_;

在 loop 开始时:

running_.store(true, std::memory_order_release);while (running_.load(std::memory_order_acquire)) {int n = epoll_wait(epfd_, eventList_.data(), eventList_.size(), -1);...
}

解释:

  • store(true, release):告诉其他线程“我已经开始跑 loop 了”,并且保证之前的初始化动作对其他线程可见

  • while 里用 load(acquire):其他线程把 running_ 改为 false 时,loop 能立即看到,并且看到的内存状态是正确的

配合 stop() 使用:

void reactor::stop() {bool expected = true;if (running_.compare_exchange_strong(expected, false, std::memory_order_acq_rel)) {wakeup();}
}
  • compare_exchange_strong:只有在 running_ 原来是 true 的时候,才会改成 false,并返回成功

  • 成功时调用 wakeup(),唤醒 epoll_wait,让循环跳出 while

4. wakeup() 和 eventfd 的作用

wakeup() 的实现:

void reactor::wakeup() {uint64_t one = 1;write(evfd_, &one, sizeof(one));
}

意思就是:往 eventfd 写一个 1。

eventfd 内核行为:

  • 内部有一个 64 位整数 counter

  • 每写一次 8 字节无符号整数,counter += 写入的值

  • 当 counter > 0 时,这个 fd 会被视为“可读”,epoll_wait 会返回对应的事件

因此:

任何线程调用 wakeup(),
实际效果是:写 eventfd,让 epoll_wait 立即返回。

5. epoll_wait 里如何处理 eventfd

loop() 中的事件处理逻辑大致是:

int n = epoll_wait(epfd_, eventList_.data(), eventList_.size(), -1);for (int i = 0; i < n; ++i) {int fd = eventList_[i].data.fd;uint32_t events = eventList_[i].events;if (fd == evfd_) {DrainEvent(evfd_);continue;}void* user = nullptr;{std::lock_guard<std::mutex> lock(user_mtx_);auto it = users_.find(fd);if (it != users_.end()) user = it->second;}dispatcher_(fd, events, user);
}

解释:

  • 如果是 evfd_,说明这是一个“唤醒信号”,比如 stop 调用、线程池调用 wakeup

  • 对 evfd_ 调用 DrainEvent,把内核计数器读掉,避免 epoll_wait 一直为可读导致死循环

  • 其他 fd,从 users_ 里取出 user 指针(通常是 Connection* 或 this 指针)

  • 最后调用 dispatcher_,由上层 Server 的 onEvent 接管

6. DrainEvent 的意义

一个简化实现可以像这样:

static void DrainEvent(int evfd) {uint64_t counter;read(evfd, &counter, sizeof(counter));
}

作用:

  • 读取 eventfd 的计数,让 counter 变回 0

  • 如果不读,eventfd 会一直处于“可读状态”,epoll_wait 每次都会立即返回,导致 CPU 飙满

你还写了注释讨论“信号量模式”和“普通模式”的区别:

  • 信号量模式:一次 read 只减去 1,适合多个线程轮流抢任务

  • 普通模式(默认):一次 read 把全部计数读出来,用于简单的“唤醒一次”的场景

你这个 Reactor 用的是“唤醒一次就够”的逻辑,普通模式即可。

7. addFd / modFd / delFd

模板:

bool reactor::addFd(int fd, uint32_t events, void* user) {epoll_event ev{};ev.data.fd = fd;ev.events = events | (useET ? EPOLLET : 0);epoll_ctl(epfd_, EPOLL_CTL_ADD, fd, &ev);users_[fd] = user;
}bool reactor::modFd(int fd, uint32_t events, void* user) {epoll_event ev{};ev.data.fd = fd;ev.events = events | (useET ? EPOLLET : 0);epoll_ctl(epfd_, EPOLL_CTL_MOD, fd, &ev);if (user) users_[fd] = user;
}bool reactor::delFd(int fd) {epoll_ctl(epfd_, EPOLL_CTL_DEL, fd, nullptr);users_.erase(fd);
}

两点非常关键:

  1. epoll_ctl 第一个参数一定是 epfd_,不能传 fd

  2. user 指针由 Reactor 保存,loop 时再取出来,把它传给 dispatcher_

8. 析构函数中的资源释放

reactor::~reactor() {stop();if (evfd_ != -1) close(evfd_);if (epfd_ != -1) close(epfd_);
}

stop() 负责停循环,析构负责真正关闭 epoll 和 eventfd。
不要在 stop() 里关 epfd_ 和 evfd_,否则 loop 里还在用时会炸。


三、Server 模块详细笔记

1. Server 的职责

  • 与 Reactor 绑定

  • 创建监听 socket,bind + listen

  • 在 onAccept 中接受新连接并创建 Connection 对象

  • 在 onConnRead 中处理收到的数据

  • 在 onConnWrite 里发送数据

  • 管理所有连接的生命周期

  • 提供 stop(),关闭所有连接和监听

2. Server 的 start() 流程

精简后的逻辑:

bool Server::start() {bool expected = false;if (!running_.compare_exchange_strong(expected, true)) {return true; // 已经启动过了}bool ok = false;do {listenFd_ = socket(AF_INET, SOCK_STREAM, 0);setsockopt(listenFd_, SOL_SOCKET, SO_REUSEADDR, ...);
#ifdef SO_REUSEPORTsetsockopt(listenFd_, SOL_SOCKET, SO_REUSEPORT, ...);
#endifsetNonBlock(listenFd_);sockaddr_in addr{};addr.sin_family = AF_INET;addr.sin_port   = htons(port_);addr.sin_addr.s_addr = htonl(INADDR_ANY);bind(listenFd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));listen(listenFd_, 128);reactor_.setDispatcher([this](int fd, uint32_t events, void* user){this->onEvent(fd, events, user);});reactor_.addFd(listenFd_, EPOLLIN, this);ok = true;} while (false);if (!ok) {if (listenFd_ != -1) { close(listenFd_); listenFd_ = -1; }running_.store(false);}return ok;
}

关键点:

  • running_ 使用 compare_exchange_strong 控制“只启动一次”

  • 出错时要记得把 running_ 还原为 false,不然 Server 对象会处于“假启动状态”

  • SO_REUSEADDR 和 SO_REUSEPORT 防止端口重用问题,要小心宏名和 SOL_SOCKET 拼写

3. stop() 流程

void Server::stop() {if (!running_.exchange(false)) return;if (listenFd_ != -1) {reactor_.delFd(listenFd_);close(listenFd_);listenFd_ = -1;}std::lock_guard<std::mutex> lk(conns_mtx_);for (auto& kv : conns_) {int fd = kv.first;reactor_.delFd(fd);close(fd);}conns_.clear();
}
  • running_.exchange(false):只在第一次 stop 生效

  • 删除 listenFd_,再关闭

  • 遍历所有连接,删除事件并关闭 fd,然后清空 map

4. onEvent:Reactor 回调入口

void Server::onEvent(int fd, uint32_t events, void* user) {if (events & (EPOLLERR | EPOLLHUP)) {closeConn(fd);return;}if (fd == listenFd_) {if (events & EPOLLIN) onAccept();return;}std::unique_ptr<Connection>* p_holder = nullptr;{std::lock_guard<std::mutex> lk(conns_mtx_);auto it = conns_.find(fd);if (it == conns_.end()) return;p_holder = &it->second;}Connection& c = *p_holder->get();if (events & EPOLLIN)  onConnRead(c);if (events & EPOLLOUT) onConnWrite(c);
}

优先处理错误事件,避免继续读写坏掉的 fd。

5. onAccept:接受新连接

void Server::onAccept() {for (;;) {sockaddr_in cli{};socklen_t len = sizeof(cli);int cfd = accept(listenFd_, reinterpret_cast<sockaddr*>(&cli), &len);if (cfd < 0) {if (errno == EAGAIN || errno == EWOULDBLOCK) break;perror("accept");break;}if (!setNonBlock(cfd)) { close(cfd); continue; }setTcpNoDelay(cfd);auto conn = std::make_unique<Connection>();conn->fd = cfd;Connection* user_ptr = conn.get();{std::lock_guard<std::mutex> lk(conns_mtx_);conns_.emplace(cfd, std::move(conn));}if (!reactor_.addFd(cfd, EPOLLIN, user_ptr)) {std::lock_guard<std::mutex> lk(conns_mtx_);conns_.erase(cfd);close(cfd);continue;}}
}

要点:

  • 非阻塞 accept 循环,一直 accept 到出现 EAGAIN 或 EWOULDBLOCK 表示“当前没有更多连接”

  • 新连接统一 setNonBlock + setTcpNoDelay

  • Connection 对象放在 map 里管理

  • user_ptr 传给 Reactor,让它在回调时交还给你

6. onConnRead:读数据 + 拆包 + 分发业务

void Server::onConnRead(Connection& c) {char buf[4096];for (;;) {ssize_t n = read(c.fd, buf, sizeof(buf));if (n > 0) {c.inbuf.append(buf, n);continue;}if (n == 0) {closeConn(c.fd);return;}if (errno == EAGAIN || errno == EWOULDBLOCK) break;perror("read");closeConn(c.fd);return;}size_t pos = 0;for (;;) {auto nl = c.inbuf.find('\n', pos);if (nl == std::string::npos) {c.inbuf.erase(0, pos);break;}std::string line = c.inbuf.substr(pos, nl - pos);if (!line.empty() && line.back() == '\r') line.pop_back();pos = nl + 1;auto out = processLine(line);postWrite(c.fd, std::move(out));}
}

说明:

  • 非阻塞读循环:把当前内核缓冲区能读的数据都读完

  • 0 表示对端关闭连接,调用 closeConn

  • 错误处理:除了 EAGAIN/EWOULDBLOCK 之外都算致命

  • 就地按行拆包(协议:一行一条消息,行末是 \n,可能有 \r)

7. onConnWrite:把缓冲区的数据发出去

void Server::onConnWrite(Connection& c) {for (;;) {if (c.outbuf.empty()) break;ssize_t n = write(c.fd, c.outbuf.data(), c.outbuf.size());if (n > 0) {c.outbuf.erase(0, n);continue;}if (errno == EAGAIN || errno == EWOULDBLOCK) break;perror("write");closeConn(c.fd);return;}if (c.outbuf.empty() && c.wantWrite) {c.wantWrite = false;reactor_.modFd(c.fd, EPOLLIN, nullptr);}
}
  • outbuf 用来存待发送的数据

  • 写完或写到 EAGAIN 为止

  • 当 outbuf 清空时,把 fd 的关注事件从 EPOLLIN | EPOLLOUT 改回 EPOLLIN,只保留读事件

8. closeConn:关闭连接的统一入口

void Server::closeConn(int fd) {{std::lock_guard<std::mutex> lk(conns_mtx_);auto it = conns_.find(fd);if (it == conns_.end()) return;}reactor_.delFd(fd);close(fd);{std::lock_guard<std::mutex> lk(conns_mtx_);conns_.erase(fd);}
}

顺序:

  1. 先确认这个 fd 还在 conns_ 里

  2. 从 Reactor 中删除 fd 的事件

  3. close(fd)

  4. 从 conns_ 中 erase

9. processLine:业务逻辑示例

当前只是一个简单的 echo:

std::string Server::processLine(const std::string& line) {if (line == "quit" || line == "exit") {return "bye\n";}return "echo: " + line + "\n";
}

后面可以按协议来改成真实业务,比如 JSON、命令、聊天室等。

10. postWrite:线程安全地投递写操作

void Server::postWrite(int fd, std::string data) {std::unique_lock<std::mutex> lk(conns_mtx_);auto it = conns_.find(fd);if (it == conns_.end()) return;Connection& c = *it->second;c.outbuf.append(data);if (!c.wantWrite) {c.wantWrite = true;reactor_.modFd(fd, EPOLLIN | EPOLLOUT, &c);reactor_.wakeup();}
}

要点:

  • 线程安全地访问 conns_,用锁保护

  • 把要发的数据追加到 outbuf

  • 如果之前没关注写事件(wantWrite=false),需要:

    • 把 fd 的事件修改为 EPOLLIN | EPOLLOUT

    • 调用 wakeup() 唤醒 epoll_wait,让 Reactor 立即处理这些更新

非常关键的一点:顺序是“先 modFd,再 wakeup”。


四、C++ 类型转换总结

1. static_cast

特点:

  • 有类型检查

  • 用于“本来就能转换”的情况

  • 例子:void* 转成 Connection*,int 转 double 等

在你的项目中:

Connection* c = static_cast<Connection*>(user);

这里 user 原来就是 Connection*,只是通过 void* 存了一次,转回来用 static_cast 是合理、安全的。

2. reinterpret_cast

特点:

  • 不检查类型是否有关系

  • 只是按字节重新解释

  • 用在 C 风格 API 上比较多

典型用法:

sockaddr_in addr{};
bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));

原因:

  • sockaddr_in 和 sockaddr 没有继承关系

  • static_cast 不允许这种转换

  • C 的 socket API 要的就是 sockaddr*,只能用 reinterpret_cast 强行当成 sockaddr 来用

规则可以记成:

  • 能用 static_cast 的地方不要用 reinterpret_cast

  • 只有面对 C 接口、需要把具体结构体看成“某个通用类型”时才用 reinterpret_cast


五、atomic 和 running_ 控制退出

你的运行控制主要是这一对:

  • Reactor 里有一个 std::atomic<bool> running_

  • Server 里也有一个 std::atomic<bool> running_

用法:

  1. start() 时用 compare_exchange_strong 把 false 改成 true,防止重复启动

  2. loop() 里用 running_.load 控制 while 循环

  3. stop() 里用 compare_exchange_strong 或 exchange 把 running_ 改为 false,再唤醒 eventfd

配合 memory_order:

  • store(true, release) / load(acquire):保证跨线程的可见性

  • compare_exchange_strong(..., acq_rel):既有 acquire 又有 release,适合“把状态改掉”的场景

简单理解:
普通 bool 在多线程下可能看不到最新值,atomic 配合 memory_order 可以让“设置状态”和“读状态”变得可靠。


六、listen(backlog) 的真实含义

你今天问的:

listen(listenFd_, 128);

这里的 128 不是“最多 128 个客户端”,而是:

“处于已完成握手、等待 accept 的连接队列的最大长度”。

如果短时间内有很多 connect 请求,而服务器忙着处理现有连接,
队列满了以后,新的连接就可能失败或被重置。

实际最大队列长度还会受内核参数限制,例如 somaxconn。


七、Git:push 能不能写注释

答案:不能。

  • 注释写在 commit 上,用 git commit -m "你的说明"

  • git push 只是“把已有 commit 推到远程”,没有 message 参数

常用流程:

git add .
git commit -m "修复 Reactor 的 eventfd 唤醒逻辑,完善 Server 代码结构"
git push

如果只是想提示自己,可以在 shell 里用 echo 打印信息,但那不算 Git 注释。


八、今天容易踩坑的地方总结

  1. wakeup 写错 fd(应该写 evfd_)

  2. epoll_ctl 的第一个参数必须是 epfd_

  3. SO_REUSEPORT 的宏使用要小心,第二个参数是 SOL_SOCKET

  4. start() 失败时必须把 running_ 还原为 false

  5. onAccept 里不要多次加锁,直接在插入前拿到 conn.get() 即可

  6. postWrite 的操作顺序是:先 modFd,再 wakeup

  7. DrainEvent 的实现和注释要统一,确保理解 eventfd 的计数行为

http://www.dtcms.com/a/607272.html

相关文章:

  • 除了PubMed,还有哪些稳定好用的查找医学文献的平台?
  • 网站登记模板网站免费建设
  • 网站建设在电子商务中的作用的看法外贸网站开发营销
  • bug【celery】
  • 常用个人网站襄县网站建设
  • MiniGPT-4:解锁 LLM 驱动的高级视觉语言能力
  • 网站设计常用软件都有哪些台州超值营销型网站建设地址
  • Rust入门:基础语法应用
  • 中国建设银行广西分行网站首页肥西县建设局网站
  • 遥感论文学习
  • 镇江市住房城乡建设局网站qq免费的推广引流软件
  • github下载repo中的单独文件
  • 营销网页wordpress数据库优化
  • 网站轮播图居中代码怎么写工作室建设规划
  • 青岛市建设安全监督站网站外贸网站建设原则
  • 大作设计网站官网登录微赞直播平台
  • 宿州网站建设公司哪家好html电子商务网站模板下载
  • 做搜狗手机网站排名软2345官网
  • 烟台网站排名seowordpress主题字体
  • 深圳提供网站建设制作wordpress收费会员
  • [科普] 卫星共视授时原理
  • 个人建设网站流程图温州市微网站制作电话
  • 做直播小视频在线观看网站龙岩个人小程序开发
  • Spring Boot 从 2.7.x 升级到 3.3注意事项
  • YOLO11-MSAM:印尼传统蜡染图案智能识别系统实现
  • 台州千寻网站建设公司好看网电影网站模板免费下载
  • 百度收录什么网站吗网页设计ppt模板
  • Mac怎么搭建网站开发环境jsp网站开发制作
  • 珠海网站建设公司哪家好网站开发工具需求
  • IDA反编译成C语言 | 深入解析反编译技术与应用