当前位置: 首页 > news >正文

《Muduo网络库:实现Acceptor类》

Acceptor类是运行在mainLoop上,负责监听新用户连接的,即监听listenfd的读事件。

首先实现封装TCP socket操作的类Socket,通过封装系统调用,简化socket编程复杂度。

Socket.h

#pragma once#include "noncopyable.h"class InetAddress;class Socket : noncopyable
{
public:explicit Socket(int sockfd): sockfd_(sockfd){}~Socket();int fd() const { return sockfd_; }void bindAddress(const InetAddress &localaddr);void listen();int accept(InetAddress *peeraddr);void shutdownWrite();// Socket选项设置void setTcpNoDelay(bool on);void setReuseAddr(bool on);void setReusePort(bool on);void setKeepAlive(bool on);private:const int sockfd_;
};

Socket.cc

#include "Socket.h"
#include "Logger.h"
#include "InetAddress.h"#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/tcp.h>
#include <strings.h>Socket::~Socket()
{close(sockfd_);
}void Socket::bindAddress(const InetAddress &localaddr)
{if (0 != ::bind(sockfd_, (sockaddr *)localaddr.getSockAddr(), sizeof(sockaddr_in))){LOG_FATAL("bind sockfd:%d fail\n", sockfd_);}
}void Socket::listen()
{if (0 != ::listen(sockfd_, 1024)){LOG_FATAL("listen sockfd:%d fail\n", sockfd_);}
}// 返回accept获取的连接fd,并将远端的连接的地址信息填入peeraddr
int Socket::accept(InetAddress *peeraddr)
{sockaddr_in addr;socklen_t len;bzero(&addr, 0);int connfd = ::accept(sockfd_, (sockaddr *)&addr, &len);if (connfd >= 0){peeraddr->setSockAddr(addr);}return connfd;
}// 关闭写端
void Socket::shutdownWrite()
{if (::shutdown(sockfd_, SHUT_WR) < 0){LOG_ERROR("shutdownWrite error");}
}// 是否禁用Nagle算法
void Socket::setTcpNoDelay(bool on)
{int optval = on ? 1 : 0;::setsockopt(sockfd_, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof optval);
}// 是否允许重用本地地址
void Socket::setReuseAddr(bool on)
{int optval = on ? 1 : 0;::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
}// 是否允许重用端口
void Socket::setReusePort(bool on)
{int optval = on ? 1 : 0;::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval));
}// 是否启用TCP保活机制
void Socket::setKeepAlive(bool on)
{int optval = on ? 1 : 0;::setsockopt(sockfd_, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval));
}

Acceptor.h

#pragma once#include "noncopyable.h"
#include "Socket.h"
#include "Channel.h"class EventLoop;
class InetAddress;class Acceptor : noncopyable
{
public:using NewConnectionCallback = std::function<void(int sockfd, InetAddress &)>;Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool resueport);~Acceptor();void setNewConnectionCallback(const NewConnectionCallback &cb){newConnectionCallback_ = cb;}bool listenning() const { return listenning_; }void listen();private:void handleRead();EventLoop *loop_; // 绑定的事件循环,Acceptor用的就是用户定义的那个baseLoop,也称作mainLoopSocket acceptSocket_;Channel acceptChannel_;                       // 将listenfd的读事件与回调绑定NewConnectionCallback newConnectionCallback_; //新连接的回调函数,由TcpServer设置bool listenning_;
};

Acceptor.cc

#include "Acceptor.h"
#include "Logger.h"
#include "InetAddress.h"#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>
#include <unistd.h>// 创建非阻塞socket fd
static int createNonblocking()
{int sockfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);if (sockfd < 0){LOG_FATAL("%s:%s:%d listen socket create err:%d \n", __FILE__, __FUNCTION__, __LINE__, errno);}
}Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool resueport):loop_(loop),acceptSocket_(createNonblocking())       // 创建非阻塞listen fd,acceptChannel_(loop,acceptSocket_.fd())  // 绑定listenfd到channel,listenning_(false)
{acceptSocket_.setReuseAddr(true);      // 允许端口复用acceptSocket_.setReusePort(true);      // 允许多进程/线程复用端口acceptSocket_.bindAddress(listenAddr); // 绑定监听地址// TcpServer::start() Acceptor.listen() 如果有新用户连接,要执行一个回调(connfd => channel => subLoop)// baseLoop => acceptChannel_(listenfd) => Acceptor::handleRead()// 绑定读事件回调,新连接到来时触发handleReadacceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
}Acceptor::~Acceptor()
{acceptChannel_.disableAll();acceptChannel_.remove();
}void Acceptor::listen()
{listenning_ = true;acceptSocket_.listen();         // 启动listen,开始监听连接请求acceptChannel_.enableReading(); // acceptChannel_ => Poller 将listenfd的读事件注册到EventLoop的Poller上
}// 当Poller检测到listenfd有读事件发生,即就是有新用户连接了,触发此回调
void Acceptor::handleRead()
{InetAddress peerAddr;int connfd = acceptSocket_.accept(&peerAddr);  // 接收新连接if (connfd >= 0){if (newConnectionCallback_)  // 若上层设置了回调,传递connfd和新连接的客户端地址{newConnectionCallback_(connfd, peerAddr); // 回调要做的事情:轮询找到subLoop,唤醒,分发当前的新客户端的Channel}else{::close(connfd);}}else{LOG_ERROR("%s:%s:%d accept err:%d \n", __FILE__, __FUNCTION__, __LINE__, errno);if (errno == EMFILE)  // 进程打开的文件描述符上限{LOG_ERROR("%s:%s:%d sockfd reached limit! \n", __FILE__, __FUNCTION__, __LINE__);}}
}

核心逻辑

1、Acceptor初始化时创建非阻塞的listenfd,并且将该listenfd和loop绑定到channel,绑定listenfd地址信息,调用acceptChannel_.setReadCallback()将读事件与handleRead函数绑定。

2、启动监听,底层执行系统调用listen()监听客户端连接请求,同时将listenfd的读事件(新连接到来时触发)注册到mainLoop的Poller中。

3、当Poller监听到channel上有事件发生时,即listenfd上有读事件发生时,会自动触发该acceptChannel_上绑定的回调函数handleRead()。

4、回调函数handleRead()被执行,开始处理新连接,底层调用accept()获取新连接connfd和客户端地址peerAddr。

5、如果上层设置了newConnectionCallback_新连接回调,执行该回调,该连接可用于后续处理该新连接,比如将该连接轮询分配给subLoop继续监听。做到了回调解耦,即“接收连接”与“处理连接”分离,上层模块TcpServer可灵活决定新连接connfd的后续处理。

6、Acceptor析构时,禁用该acceptChannel_所有事件,并且将该channel从Poller中移除。

http://www.dtcms.com/a/524818.html

相关文章:

  • 第十三篇《TCP的可靠性:三次握手与四次挥手全解析》
  • SSE 流式响应实战:如何在 JavaScript 中处理 DeepSeek 流式 API
  • 在线阅读网站开发教程品牌建设促进会是什么工作
  • 一站式服务门户网站充值支付宝收款怎么做
  • 网站建设超速云免费小程序源码php
  • 如何裁剪u-boot,保留其必要功能,使体积尽可能小
  • 借助智能 GitHub Copilot 副驾驶 的 Agent Mode 升级 Java 项目
  • 广州市网站建设 乾图信息科技在哪里建网站
  • Flutter---自定义日期选择对话框
  • 怎么代码放到网站上网站建设需要的公司
  • k8s部署容器化应用-tomcat
  • AI开发工具实战解析:如何实现企业数据处理流程自动化
  • asp装饰公司网站源码黑龙江 俄语网站制作
  • 网站设计公司无锡网站初期建设的成本来源
  • 通过API接口创建1688订单,一键采购指南
  • UniGetUI下载安装图文教程(附安装包,适合新手)
  • 网站开发怎么自动获取位置wordpress免费博客主题
  • thinkphp做的网站源码怎么做网页啊
  • 备份一体机:数据同步困局突围指南:毫秒级实时同步如何根治80%业务痛点
  • nexus上传jar包图文步骤
  • 【2025 最新】ArcGIS for JS TileLayer/FeatureLayer/ImageLayer 用法对比
  • AntV X6实战:实现节点四边自动连接与自定义箭头的完整配置
  • 黄骅市网站建设广州市官方网站
  • 华为OD机考双机位A卷 - 最长的密码 / 寻找密码 (C++ Python JAVA JS GO)
  • ELK日志分析系统完整部署与应用指南
  • 浦江做网站茂名网站建设公司
  • 【CVE-2025-40778】通过未经请求的答复记录进行 BIND 9 缓存中毒(内含复现步骤)
  • 架构权衡与实践:基于“约束大于规范”的缓存组件封装
  • 【实战经验】飞牛云 如何使用 SSD 缓存加速?
  • 数据结构--顺序表与链表