当前位置: 首页 > wzjs >正文

WordPress 网站小图标深圳门户网站开发

WordPress 网站小图标,深圳门户网站开发,wordpress修改导航,渭南免费做网站目录 Tcp模块的流程 测试代码 TcpServer初始化阶段详解 TcpServer构造函数执行流程 EventLoop创建详细步骤: Channel与Poller的关联 设置Acceptor回调函数 启动监听 启动服务器完整流程 连接建立流程详解 新连接到达流程 连接初始化详解 连接就绪详解 …

目录

Tcp模块的流程 

测试代码

TcpServer初始化阶段详解

 TcpServer构造函数执行流程

EventLoop创建详细步骤:

Channel与Poller的关联

设置Acceptor回调函数

启动监听

启动服务器完整流程

连接建立流程详解

新连接到达流程

连接初始化详解

连接就绪详解

数据收发流程详解

数据接收流程

 数据发送流程

关于Http模块的流程

HTTP请求的接收流程

HTTP响应的发送流程

模块初始化都干了什么事情

Poller对象的创建

EventLoop对象的创建

 Channel对象的创建

Socket对象的创建

Acceptor对象的创建

TcpServer对象的创建

​编辑LoopThreadPool对象的创建

LoopThread对象的创建

Timer_wheel对象的创建

Connection对象的创建

1. 连接触发监听套接字的可读事件

2. Poller检测到事件

3. EventLoop处理就绪事件

4. Acceptor的HandleRead方法被调用

5. TcpServer::NewConnection方法被调用

6. Connection对象创建

7. Connection::Established方法执行

8. Connection::EstablishedInLoop方法执行

9. 启动读事件监控

10. 更新Poller中的事件监控

11. 调用连接建立回调函数

12. 连接就绪,等待数据交互


Tcp模块的流程 

EventLoop模块

构造函数 

class EventLoop {private:using Functor = std::function<void()>;std::thread::id _thread_id;//线程IDint _event_fd;//eventfd唤醒IO事件监控有可能导致的阻塞std::unique_ptr<Channel> _event_channel;Poller _poller;//进行所有描述符的事件监控std::vector<Functor> _tasks;//任务池std::mutex _mutex;//实现任务池操作的线程安全TimerWheel _timer_wheel;//定时器模块public:EventLoop():_thread_id(std::this_thread::get_id()), _event_fd(CreateEventFd()), _event_channel(new Channel(this, _event_fd)),_timer_wheel(this) {//给eventfd添加可读事件回调函数,读取eventfd事件通知次数_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));//启动eventfd的读事件监控_event_channel->EnableRead();}
};

 Start接口

        void Start() {while(1) {//1. 事件监控, std::vector<Channel *> actives;_poller.Poll(&actives);//2. 事件处理。 for (auto &channel : actives) {channel->HandleEvent();}//3. 执行任务RunAllTask();}}

 TcpServer模块

class TcpServer {    private:uint64_t _next_id;      //这是一个自动增长的连接ID,int _port;int _timeout;           //这是非活跃连接的统计时间---多长时间无通信就是非活跃连接bool _enable_inactive_release;//是否启动了非活跃连接超时销毁的判断标志EventLoop _baseloop;    //这是主线程的EventLoop对象,负责监听事件的处理Acceptor _acceptor;    //这是监听套接字的管理对象LoopThreadPool _pool;   //这是从属EventLoop线程池std::unordered_map<uint64_t, PtrConnection> _conns;//保存管理所有连接对应的shared_ptr对象using ConnectedCallback = std::function<void(const PtrConnection&)>;using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection&)>;using AnyEventCallback = std::function<void(const PtrConnection&)>;using Functor = std::function<void()>;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;    public:TcpServer(int port):_port(port), _next_id(0), _enable_inactive_release(false), _acceptor(&_baseloop, port),_pool(&_baseloop) {_acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));_acceptor.Listen();//将监听套接字挂到baseloop上}void SetThreadCount(int count) { return _pool.SetThreadCount(count); }void SetConnectedCallback(const ConnectedCallback&cb) { _connected_callback = cb; }void SetMessageCallback(const MessageCallback&cb) { _message_callback = cb; }void SetClosedCallback(const ClosedCallback&cb) { _closed_callback = cb; }void SetAnyEventCallback(const AnyEventCallback&cb) { _event_callback = cb; }void EnableInactiveRelease(int timeout) { _timeout = timeout; _enable_inactive_release = true; }//用于添加一个定时任务void RunAfter(const Functor &task, int delay) {_baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop, this, task, delay));}void Start() { _pool.Create();  _baseloop.Start(); }
};

Poller模块

class Poller {private:int _epfd;struct epoll_event _evs[MAX_EPOLLEVENTS];std::unordered_map<int, Channel *> _channels;public:Poller() {_epfd = epoll_create(MAX_EPOLLEVENTS);if (_epfd < 0) {ERR_LOG("EPOLL CREATE FAILED!!");abort();//退出程序}}
};

 Acceptor模块

class Acceptor {private:Socket _socket;//用于创建监听套接字EventLoop *_loop; //用于对监听套接字进行事件监控Channel _channel; //用于对监听套接字进行事件管理using AcceptCallback = std::function<void(int)>;AcceptCallback _accept_callback;public:/*不能将启动读事件监控,放到构造函数中,必须在设置回调函数后,再去启动*//*否则有可能造成启动监控后,立即有事件,处理的时候,回调函数还没设置:新连接得不到处理,且资源泄漏*/Acceptor(EventLoop *loop, int port): _socket(CreateServer(port)), _loop(loop), _channel(loop, _socket.Fd()) {_channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this));}void SetAcceptCallback(const AcceptCallback &cb) { _accept_callback = cb; }void Listen() { _channel.EnableRead(); }void HandleRead() {int newfd = _socket.Accept();if (newfd < 0) {return ;}if (_accept_callback) _accept_callback(newfd);}
};

 Channel模块

构造函数 

class Channel {private:int _fd;EventLoop *_loop;uint32_t _events;  // 当前需要监控的事件uint32_t _revents; // 当前连接触发的事件using EventCallback = std::function<void()>;EventCallback _read_callback;   //可读事件被触发的回调函数EventCallback _write_callback;  //可写事件被触发的回调函数EventCallback _error_callback;  //错误事件被触发的回调函数EventCallback _close_callback;  //连接断开事件被触发的回调函数EventCallback _event_callback;  //任意事件被触发的回调函数public:Channel(EventLoop *loop, int fd):_fd(fd), _events(0), _revents(0), _loop(loop) {}
};

LoopThreadPool模块 

构造函数 

class LoopThreadPool {private:int _thread_count;int _next_idx;EventLoop *_baseloop;std::vector<LoopThread*> _threads;std::vector<EventLoop *> _loops;public:LoopThreadPool(EventLoop *baseloop):_thread_count(0), _next_idx(0), _baseloop(baseloop) {}
};

Create接口 

        void Create() {if (_thread_count > 0) {_threads.resize(_thread_count);_loops.resize(_thread_count);for (int i = 0; i < _thread_count; i++) {_threads[i] = new LoopThread();_loops[i] = _threads[i]->GetLoop();}}return ;}

Connection模块

class Connection;
using PtrConnection = std::shared_ptr<Connection>;
class Connection : public std::enable_shared_from_this<Connection> {private:uint64_t _conn_id;  // 连接的唯一ID,便于连接的管理和查找//uint64_t _timer_id;   //定时器ID,必须是唯一的,这块为了简化操作使用conn_id作为定时器IDint _sockfd;        // 连接关联的文件描述符bool _enable_inactive_release;  // 连接是否启动非活跃销毁的判断标志,默认为falseEventLoop *_loop;   // 连接所关联的一个EventLoopConnStatu _statu;   // 连接状态Socket _socket;     // 套接字操作管理Channel _channel;   // 连接的事件管理Buffer _in_buffer;  // 输入缓冲区---存放从socket中读取到的数据Buffer _out_buffer; // 输出缓冲区---存放要发送给对端的数据Any _context;       // 请求的接收处理上下文/*这四个回调函数,是让服务器模块来设置的(其实服务器模块的处理回调也是组件使用者设置的)*//*换句话说,这几个回调都是组件使用者使用的*/using ConnectedCallback = std::function<void(const PtrConnection&)>;using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection&)>;using AnyEventCallback = std::function<void(const PtrConnection&)>;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;/*组件内的连接关闭回调--组件内设置的,因为服务器组件内会把所有的连接管理起来,一旦某个连接要关闭*//*就应该从管理的地方移除掉自己的信息*/ClosedCallback _server_closed_callback;public:Connection(EventLoop *loop, uint64_t conn_id, int sockfd):_conn_id(conn_id), _sockfd(sockfd),_enable_inactive_release(false), _loop(loop), _statu(CONNECTING), _socket(_sockfd),_channel(loop, _sockfd) {_channel.SetCloseCallback(std::bind(&Connection::HandleClose, this));_channel.SetEventCallback(std::bind(&Connection::HandleEvent, this));_channel.SetReadCallback(std::bind(&Connection::HandleRead, this));_channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this));_channel.SetErrorCallback(std::bind(&Connection::HandleError, this));}
};

一个Muduo网络库的流程:

在main线程中首先创建一个TcpServer对象_server,

这个_server在构造的时候创建main_loop,从属线程池_pool,_acceptor,设置非活跃连接时间,非活跃连接关闭为false,并且把从属线程池也注册到main_loop上,把acceptor注册到main_loop上,然后给_acceptor绑定新连接到来的回调函数(连接ID递增,增加连接计数器,确保每个连接有唯一ID,创建连接对象:PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd)),创建新的Connection智能指针对象,通过线程池获取下一个可用的EventLoop (_pool.NextLoop()),传入新的连接ID和套接字文件描述符,设置回调函数,启用非活跃连接超时检测,初始化连接:conn->Established() 将连接标记为已建立,可能包括:更改连接状态为CONNECTED,启用读事件监听,调用连接建立回调函数,保存连接:将新连接添加到连接管理容器中,使用连接ID作为键,这样服务器可以通过ID快速查找和管理连接),然后调用Listen接口用来监听新连接的到来

这个main_loop在构造的时候创建了_event_channel(使用智能指针管理),_poller,_event_fd,_thread_id(初始化为当前线程的ID,),_task,_mutex,_timer_wheel,然后给_event_channel绑定可读事件回调函数,并且启动eventfd的读事件监控

这个_poller在构造的时候创建了_epfd存储由epoll_create()返回的epoll文件描述符的整数,_evs是一个epoll_event结构体数组,用于存储事件,_channels是一个哈希表,将文件描述符(int)与Channel指针关联起来,使用epoll_create()创建一个epoll实例

这个_pool在构造的时候创建了 _thread_count;线程池中IO线程的数量, _next_idx用于轮询分配线程的下标(实现负载均衡),EventLoop *_baseloop指向主线程的EventLoop,std::vector<LoopThread*> _threads保存所有IO线程对象的指针,std::vector<EventLoop *> _loops保存所有IO线程对应的EventLoop指针

这个_acceptor在构造的时候创建了_socket用于创建监听套接字,EventLoop *_loop用于对监听套接字进行事件监控,Channel _channel用于对监听套接字进行事件管理 _socket(CreateServer(port))创建监听socket并绑定端口, _loop(loop)保存主事件循环指针,_channel(loop, _socket.Fd())创建Channel,管理监听socket的事件,并且 _channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this))它使用 std::bind 将 Acceptor 类的 HandleRead 成员函数绑定为当通道上有数据可读时将被调用的回调函数。

这个socket在构造的时候会创一个文件描述符_sockfd,当调用CreateServer会创建套接字,绑定地址,开始监听,设置非阻塞,启动地址重用

这个_channel在构造的时候创建了个_fd,*_loop指针,需要监控的事件类型_events,已经触发的事件类型_revents,各种事件回调函数的设置,并且初始化_loop,将Channel与特定的事件循环(EventLoop)关联起来。以便这个loop能管理和调度该Channel上的事件。

这个conn在构造的时候,会把该conn注册到传入的loop中,同时也会创建一个channel,把这个channel注册到这个loop中,同时也需要把sockfd注册到channel上,因为Channel知道它要监听哪个文件描述符(sockfd),Channel知道它的事件应该由哪个事件循环(loop)处理,并且给这个channel绑定上读/写/错误/关闭的事件回调函数,然后会创建 _in_buffer用户态输入缓冲区---存放从socket中读取到的数据。_out_buffer用户态输出缓冲区---存放要发送给对端的数据。当客户端发送数据时,SubLoop检测到读事件,调用lcpconnection::handleRead进行数据的读取,操作系统内核接收数据,暂存在socket的接收缓冲区,然后再把数据被读入InputBuffer,调用用户注册的messageCallback,将数据传递给上层应用处理,上层应用根据自己的业务逻辑处理数据。然后服务器构建好了要把数据响应给上层应用调用TcpConnection::send()发送数据,数据被写入OutputBuffer,然后把OutputBuffer的数据尝试发送到内核缓冲区,如果不能完全发送,则:将剩余数据留在OutputBuffer中,向Poller注册该连接socket的写事件,当socket可写时,SubLoop检测到写事件,继续发送OutputBuffer中的数据,数据发送完毕后,取消写事件关注

测试代码

#include "m_server.h"// 简单的回显服务器测试
class EchoServer {
private:TcpServer _server;public:EchoServer(int port) : _server(port) {// 设置服务器回调函数_server.SetConnectedCallback(std::bind(&EchoServer::OnConnected, this, std::placeholders::_1));_server.SetMessageCallback(std::bind(&EchoServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2));_server.SetClosedCallback(std::bind(&EchoServer::OnClosed, this, std::placeholders::_1));// 启用非活跃连接超时释放(60秒)_server.EnableInactiveRelease(60);// 设置工作线程数量_server.SetThreadCount(4);}// 连接建立回调void OnConnected(const PtrConnection& conn) {INF_LOG("新连接建立: %d", conn->Id());// 发送欢迎消息std::string welcome = "欢迎连接到回显服务器!\n";conn->Send(welcome.c_str(), welcome.size());}// 消息处理回调void OnMessage(const PtrConnection& conn, Buffer* buf) {// 读取所有可读数据std::string msg = buf->ReadAsStringAndPop(buf->ReadAbleSize());INF_LOG("收到消息[%d]: %s", conn->Id(), msg.c_str());// 回显消息std::string echo_prefix = "回显: ";std::string response = echo_prefix + msg;conn->Send(response.c_str(), response.size());}// 连接关闭回调void OnClosed(const PtrConnection& conn) {INF_LOG("连接关闭: %d", conn->Id());}// 启动服务器void Start() {INF_LOG("回显服务器启动在端口: %d", 8080);_server.Start();}
};int main(int argc, char* argv[]) {int port = 8080;if (argc > 1) {port = atoi(argv[1]);}INF_LOG("启动回显服务器,端口: %d", port);// 创建并启动服务器EchoServer server(port);server.Start();return 0;
}

TcpServer初始化阶段详解

 TcpServer构造函数执行流程

private:uint64_t _next_id;      //这是一个自动增长的连接ID,int _port;int _timeout;           //这是非活跃连接的统计时间---多长时间无通信就是非活跃连接bool _enable_inactive_release;//是否启动了非活跃连接超时销毁的判断标志EventLoop _baseloop;    //这是主线程的EventLoop对象,负责监听事件的处理Acceptor _acceptor;    //这是监听套接字的管理对象LoopThreadPool _pool;   //这是从属EventLoop线程池std::unordered_map<uint64_t, PtrConnection> _conns;//保存管理所有连接对应的shared_ptr对象using ConnectedCallback = std::function<void(const PtrConnection&)>;using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>;using ClosedCallback = std::function<void(const PtrConnection&)>;using AnyEventCallback = std::function<void(const PtrConnection&)>;using Functor = std::function<void()>;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;
TcpServer(int port):_port(port),_next_id(0),_enable_inactive_release(false),_acceptor(&_baseloop, port),_pool(&_baseloop) {_acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));_acceptor.Listen();//将监听套接字挂到baseloop上
}

详细步骤:

成员初始化:

  • _port: 存储服务器监听端口
  • _next_id: 连接ID计数器初始化为0
  • _enable_inactive_release: 非活跃连接释放标志设为false

当TcpServer构造函数中初始化_baseloop成员变量时,会创建EventLoop对象:

EventLoop::EventLoop():_thread_id(std::this_thread::get_id()), _event_fd(CreateEventFd()), _event_channel(new Channel(this, _event_fd)),_poller(),  // 创建Poller对象_timer_wheel(this) {// 设置eventfd的读事件回调_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));// 启动eventfd的读事件监控_event_channel->EnableRead();
}

EventLoop创建详细步骤:

初始化线程ID: 

   _thread_id = std::this_thread::get_id()
  • 记录创建EventLoop的线程ID,用于后续线程安全检查 

创建eventfd:

   _event_fd = CreateEventFd()static int CreateEventFd() {int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);if (efd < 0) {ERR_LOG("CREATE EVENTFD FAILED!!");abort();}return efd;}
  • 创建一个eventfd用于线程间通信,设置为非阻塞模式

创建event_channel:

   _event_channel = new Channel(this, _event_fd)
  • 创建一个Channel对象管理eventfd的事件

创建Poller对象:

   _poller()  // 默认构造函数
  • 创建Poller对象用于事件监控

创建TimerWheel对象

   _timer_wheel(this)
  •  创建定时器管理对象

设置eventfd回调:

   _event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));
  •  设置eventfd的读事件回调函数

启用eventfd读事件监控:

   _event_channel->EnableRead();
  • 启用eventfd的读事件监控,这会将eventfd添加到Poller中 

 Poller创建过程

当EventLoop构造函数中初始化_poller成员变量时,会创建Poller对象:

Poller::Poller() {_epfd = epoll_create(MAX_EPOLLEVENTS);if (_epfd < 0) {ERR_LOG("EPOLL CREATE FAILED!!");abort();}
}

Channel与Poller的关联

当调用Channel::EnableRead()等方法时,会将Channel添加到Poller中:

      Acceptor创建过程:

      • 传入&_baseloop和port构造Acceptor
      • Acceptor构造函数内部用CreateServer(port)创建监听套接字:
      Acceptor(EventLoop *loop, int port): _socket(CreateServer(port)), _loop(loop), _channel(loop, _socket.Fd()) {_channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this));
      }
      • CreateServer 内部调用:
      int CreateServer(int port) {bool ret = _socket.CreateServer(port);assert(ret == true);return _socket.Fd();
      }
      • Socket::CreateServer 执行五个关键操作:
      bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool block_flag = false) {//1. 创建套接字if (Create() == false) return false;//2. 设置非阻塞(如果需要)if (block_flag) NonBlock();//3. 绑定地址if (Bind(ip, port) == false) return false;//4. 开始监听(默认最大连接队列1024)if (Listen() == false) return false;//5. 启动地址端口重用ReuseAddress();return true;
      }
      • 创建Channel对象管理监听套接字,设置读事件回调函数为 Acceptor::HandleRead

      Channel构造:

         Channel(EventLoop *loop, int fd): _fd(fd), _events(0), _revents(0), _loop(loop) {}
        • 将监听套接字的文件描述符与Channel关联
        • 初始化事件标志和关联的EventLoop
        • 此时还未设置任何监控事件(events = 0)

        设置回调函数

           _channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this));
        • 将Acceptor::HandleRead方法绑定为Channel的读事件回调
        • 当监听套接字可读时(有新连接到达),会触发此回调

         HandleRead实现

           void HandleRead() {int newfd = _socket.Accept();if (newfd < 0) {return;}if (_accept_callback) _accept_callback(newfd);}
        • 接受新连接,获取新连接的文件描述符
        • 调用预设的_accept_callback处理新连接(即TcpServer::NewConnection)

        线程池创建:

        • 构造 pool(&_baseloop),传入主事件循环指针
        • 此时线程池只是初始化,没有创建工作线程,等待后续 SetThreadCount 和 Create 调用

        设置Acceptor回调函数

        _acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));

        详细解析:

        回调绑定机制:

        • 使用 std::bind 创建函数对象,绑定 TcpServer::NewConnection 成员函数
        • this 指针作为第一个参数,确保回调能访问到TcpServer对象
        • std::placeholders::1 占位符表示将来自Acceptor的新连接fd传给NewConnection

        NewConnection函数职责:

        void NewConnection(int fd) {_next_id++;PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd));conn->SetMessageCallback(_message_callback);conn->SetClosedCallback(_closed_callback);conn->SetConnectedCallback(_connected_callback);conn->SetAnyEventCallback(_event_callback);conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));if (_enable_inactive_release) conn->EnableInactiveRelease(_timeout);conn->Established();_conns.insert(std::make_pair(_next_id, conn));
        }

        启动监听

        _acceptor.Listen();

        内部实现详解:

        Acceptor::Listen内部:

           void Listen() { _channel.EnableRead(); }
        • 调用Channel的EnableRead方法启动监听套接字的读事件监控

        Channel::EnableRead内部:

             void EnableRead() { _events |= EPOLLIN; Update(); }
        • 将EPOLLIN标志添加到_events中
        • 调用Update更新事件监控

        Channel::Update内部: 

             void Update() { return _loop->UpdateEvent(this); }
        • 调用EventLoop的UpdateEvent方法

        EventLoop::UpdateEvent内部:

        void UpdateEvent(Channel *channel) { return _poller.UpdateEvent(channel); }
        • 调用Poller的UpdateEvent方法

        Poller::UpdateEvent内部:

        void UpdateEvent(Channel *channel) {bool ret = HasChannel(channel);if (ret == false) {//不存在则添加_channels.insert(std::make_pair(channel->Fd(), channel));return Update(channel, EPOLL_CTL_ADD);}return Update(channel, EPOLL_CTL_MOD);
        }

         

        • 检查Channel是否已在监控集合中
        • 如果不存在, 添加到_channels映射并调用epoll_ctl添加监控
        • 如果存在, 调用epoll_ctl修改监控事件

        Poller::Update内部:

        void Update(Channel *channel, int op) {int fd = channel->Fd();struct epoll_event ev;ev.data.fd = fd;ev.events = channel->Events();int ret = epoll_ctl(_epfd, op, fd, &ev);if (ret < 0) {ERR_LOG("EPOLLCTL FAILED!");}return;
        }
        • 准备epoll_event结构
        • 调用epoll_ctl将监听套接字添加到epoll实例
        • 此时监听套接字的可读事件(有新连接)开始被监控

        启动服务器完整流程

        void Start() {_pool.Create();  // 创建线程池中的工作线程_baseloop.Start(); // 启动主事件循环
        }

         线程池创建:

        void Create() {if (_thread_count > 0) {_threads.resize(_thread_count);_loops.resize(_thread_count);for (int i = 0; i < _thread_count; i++) {_threads[i] = new LoopThread();_loops[i] = _threads[i]->GetLoop();}}return ;
        }
        • 线程池创建指定数量的LoopThread
        • 调用GetLoop获取并保存每个线程的EventLoop指针
        • GetLoop会等待线程创建并初始化EventLoop
        class LoopThread {
        private:std::mutex _mutex;          // 互斥锁std::condition_variable _cond;   // 条件变量EventLoop *_loop;       // EventLoop指针变量,这个对象需要在线程内实例化std::thread _thread;    // EventLoop对应的线程private:void ThreadEntry() {EventLoop loop;{std::unique_lock<std::mutex> lock(_mutex);//加锁_loop = &loop;_cond.notify_all();}loop.Start();}public:LoopThread():_loop(NULL), _thread(std::thread(&LoopThread::ThreadEntry, this)) {}EventLoop *GetLoop() {EventLoop *loop = NULL;{std::unique_lock<std::mutex> lock(_mutex);//加锁_cond.wait(lock, [&](){ return _loop != NULL; });//loop为NULL就一直阻塞loop = _loop;}return loop;}
        };

         线程创建:

           LoopThread():_loop(NULL), _thread(std::thread(&LoopThread::ThreadEntry, this)) {}
        • 构造函数初始化_loop为NULL
        • 创建线程,给这个线程绑定入口函数为ThreadEntry
        • 线程立即开始执行
           void ThreadEntry() {EventLoop loop;  // 在线程栈上创建EventLoop对象{std::unique_lock<std::mutex> lock(_mutex);  // 加锁_loop = &loop;  // 将指针指向线程栈上的EventLoop_cond.notify_all();  // 通知等待的线程}loop.Start();  // 开始事件循环}
        • 在线程内部创建EventLoop对象
        • 通过互斥锁保护设置_loop指针
        • 通知可能等待的GetLoop()调用
        • 启动事件循环

         获取EventLoop指针

           EventLoop *GetLoop() {EventLoop *loop = NULL;{std::unique_lock<std::mutex> lock(_mutex);  // 加锁_cond.wait(lock, [&](){ return _loop != NULL; });  // loop为NULL就一直阻塞loop = _loop;}return loop;}
        • 主线程调用此函数获取工作线程的EventLoop
        • 使用条件变量等待EventLoop创建完成
        • 返回EventLoop指针

        主事件循环启动:

        void Start() {while(1) {//1. 事件监控std::vector<Channel *> actives;_poller.Poll(&actives);//2. 事件处理for (auto &channel : actives) {channel->HandleEvent();}
        • 进入无限循环,监控事件、处理就绪事件、执行任务队列
        • 此时服务器完全启动,等待客户端连接 

        连接建立流程详解

        新连接到达流程

        新连接到达,监听套接字可读事件触发,

        Acceptor::HandleRead被调用

        void HandleRead() {int newfd = _socket.Accept();if (newfd < 0) {return;}if (_accept_callback) _accept_callback(newfd);
        }
        • Poller检测到监听套接字可读(有新连接请求)
        • EventLoop的事件循环调用Channel::HandleEvent
        • Channel根据事件类型调用Acceptor::HandleRead
        • 调用Socket::Accept获取新连接的文件描述符
        • 触发_accept_callback即TcpServer::NewConnection

        Socket::Accept实现:

        int Accept() {// int accept(int sockfd, struct sockaddr *addr, socklen_t *len);int newfd = accept(_sockfd, NULL, NULL);if (newfd < 0) {ERR_LOG("SOCKET ACCEPT FAILED!");return -1;}return newfd;
        }
        • 调用系统调用accept接受新连接
        • 返回新连接的文件描述符

        连接初始化详解

        1. TcpServer::NewConnection实现:
        void NewConnection(int fd) {_next_id++;PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd));conn->SetMessageCallback(_message_callback);conn->SetClosedCallback(_closed_callback);conn->SetConnectedCallback(_connected_callback);conn->SetAnyEventCallback(_event_callback);conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));if (_enable_inactive_release) conn->EnableInactiveRelease(_timeout);conn->Established();_conns.insert(std::make_pair(_next_id, conn));
        }
        1. 工作线程选择机制:
        EventLoop* NextLoop() {if (_thread_count == 0) {return _baseloop;}_next_idx = (_next_idx + 1) % _thread_count;return _loops[_next_idx];
        }
        • 如果没有工作线程,使用主线程的EventLoop
        • 否则采用简单的轮询算法选择一个工作线程
        • 确保连接均匀分布在各个线程中
        1. Connection构造过程:
        Connection(EventLoop *loop, uint64_t conn_id, int sockfd):_conn_id(conn_id), _sockfd(sockfd),_enable_inactive_release(false), _loop(loop), _statu(CONNECTING), _socket(_sockfd),_channel(loop, _sockfd) {_channel.SetCloseCallback(std::bind(&Connection::HandleClose, this));_channel.SetEventCallback(std::bind(&Connection::HandleEvent, this));_channel.SetReadCallback(std::bind(&Connection::HandleRead, this));_channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this));_channel.SetErrorCallback(std::bind(&Connection::HandleError, this));
        }
        • 初始化连接ID和状态
        • 创建Socket和Channel对象管理连接
        • 设置Channel的各种事件回调
        1. 非活跃连接释放机制:
        void EnableInactiveRelease(int sec) {_loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop, this, sec));
        }void EnableInactiveReleaseInLoop(int sec) {_enable_inactive_release = true;if (_loop->HasTimer(_conn_id)) {return _loop->TimerRefresh(_conn_id);}_loop->TimerAdd(_conn_id, sec, std::bind(&Connection::Release, this));
        }
        • 将操作封装为任务,确保在正确的线程中执行
        • 添加定时器任务,超时后自动释放连接
        • 使用连接ID作为定时器ID

        连接就绪详解

        1. Connection::Established实现:
        void Established() {_loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this));
        }
        • 使用RunInLoop确保在连接所属的EventLoop线程中执行
        1. Connection::EstablishedInLoop实现:
        void EstablishedInLoop() {assert(_statu == CONNECTING);_statu = CONNECTED;_channel.EnableRead();if (_connected_callback) _connected_callback(shared_from_this());
        }
        • 断言确保当前状态为CONNECTING
        • 将状态更新为CONNECTED
        • 调用Channel::EnableRead启动读事件监控:
          void EnableRead() { _events |= EPOLLIN; Update(); }
        • 最后调用用户设置的连接建立回调
        1. shared_from_this机制:
        _connected_callback(shared_from_this())
        • Connection继承自std::enable_shared_from_this
        • shared_from_this()返回管理当前对象的shared_ptr
        • 确保回调中使用的Connection对象生命周期安全
        1. 回调函数调用:
        • 回调函数是在连接分配到的工作线程中执行的
        • 这确保了每个连接的所有操作都在同一个线程中进行
        • 避免了多线程并发访问导致的竞态条件
        1. 连接保存到管理表:
        _conns.insert(std::make_pair(_next_id, conn));
        • 将连接保存到TcpServer的_conns哈希表中
        • 使用连接ID作为键,便于后续查找和管理

        数据收发流程详解

        数据接收流程

        1. 可读事件触发,Connection::HandleRead被调用

        当客户端发送数据到服务器时,连接对应的socket变为可读状态,触发事件处理:

        // Channel::HandleEvent内部逻辑(事件分发)
        void HandleEvent() {if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)) {if (_read_callback) _read_callback();}// ...其他事件处理
        }
        • Poller检测到连接socket可读
        • EventLoop调用对应Channel的HandleEvent
        • Channel根据事件类型调用Connection::HandleRead

        Connection::HandleRead实现详解

        void HandleRead() {// 1. 接收socket的数据,放到缓冲区char buf[65536];ssize_t ret = _socket.NonBlockRecv(buf, 65535);if (ret < 0) {// 出错了,不能直接关闭连接return ShutdownInLoop();}// 将数据放入输入缓冲区,写入之后顺便将写偏移向后移动_in_buffer.WriteAndPush(buf, ret);// 2. 调用message_callback进行业务处理if (_in_buffer.ReadAbleSize() > 0) {// shared_from_this--从当前对象自身获取自身的shared_ptr管理对象return _message_callback(shared_from_this(), &_in_buffer);}
        }

         Socket::NonBlockRecv实现

        ssize_t NonBlockRecv(void *buf, size_t len) {return Recv(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前接收为非阻塞
        }ssize_t Recv(void *buf, size_t len, int flag = 0) {// ssize_t recv(int sockfd, void *buf, size_t len, int flag);ssize_t ret = recv(_sockfd, buf, len, flag);if (ret <= 0) {// EAGAIN 当前socket的接收缓冲区中没有数据了,在非阻塞的情况下才会有这个错误// EINTR  表示当前socket的阻塞等待,被信号打断了if (errno == EAGAIN || errno == EINTR) {return 0; // 表示这次接收没有接收到数据}ERR_LOG("SOCKET RECV FAILED!!");return -1;}return ret; // 实际接收的数据长度
        }

         Buffer::WriteAndPush实现

        void WriteAndPush(const void *data, uint64_t len) {Write(data, len);MoveWriteOffset(len);
        }void Write(const void *data, uint64_t len) {// 1. 保证有足够空间,2. 拷贝数据进去if (len == 0) return;EnsureWriteSpace(len);const char *d = (const char *)data;std::copy(d, d + len, WritePosition());
        }

         调用用户消息回调

        _message_callback(shared_from_this(), &_in_buffer);
        • 传递连接管理对象和输入缓冲区
        • 在回显服务器示例中,对应EchoServer::OnMessage
        • 用户在回调中可以读取缓冲区数据并进行处理

         数据发送流程

        用户调用Connection::Send发送数据

        void Send(const char *data, size_t len) {Buffer buf;buf.WriteAndPush(data, len);_loop->RunInLoop(std::bind(&Connection::SendInLoop, this, std::move(buf)));
        }
        • 创建临时Buffer对象存储数据
        • 使用std::move优化性能,避免拷贝
        • 通过RunInLoop确保在正确的线程中执行SendInLoop

        Connection::SendInLoop实现

        void SendInLoop(Buffer &buf) {if (_statu == DISCONNECTED) return;_out_buffer.WriteBufferAndPush(buf);if (_channel.WriteAble() == false) {_channel.EnableWrite();}
        }
        • 检查连接状态,断开则不发送
        • 将数据写入输出缓冲区
        • 如果未启动写事件监控,则启动

        Channel::EnableWrite实现

        void EnableWrite() { _events |= EPOLLOUT; Update(); }
        • 添加EPOLLOUT事件到监控
        • 通过Update更新事件监控

        连接可写事件触发,Connection::HandleWrite被调用

        void HandleWrite() {// _out_buffer中保存的数据就是要发送的数据ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPosition(), _out_buffer.ReadAbleSize());if (ret < 0) {// 发送错误就该关闭连接了if (_in_buffer.ReadAbleSize() > 0) {_message_callback(shared_from_this(), &_in_buffer);}return Release(); // 这时候就是实际的关闭释放操作了}_out_buffer.MoveReadOffset(ret); // 千万不要忘了,将读偏移向后移动if (_out_buffer.ReadAbleSize() == 0) {_channel.DisableWrite(); // 没有数据待发送了,关闭写事件监控// 如果当前是连接待关闭状态,则有数据,发送完数据释放连接,没有数据则直接释放if (_statu == DISCONNECTING) {return Release();}}return;
        }

         Socket::NonBlockSend实现

        ssize_t NonBlockSend(void *buf, size_t len) {if (len == 0) return 0;return Send(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前发送为非阻塞
        }ssize_t Send(const void *buf, size_t len, int flag = 0) {// ssize_t send(int sockfd, void *data, size_t len, int flag);ssize_t ret = send(_sockfd, buf, len, flag);if (ret < 0) {if (errno == EAGAIN || errno == EINTR) {return 0;}ERR_LOG("SOCKET SEND FAILED!!");return -1;}return ret; // 实际发送的数据长度
        }

        处理发送结果

        • 如果发送出错(ret < 0):
        • 处理剩余输入数据
        • 调用Release关闭连接
        • 如果发送成功:
        • 更新缓冲区读偏移(MoveReadOffset(ret))
        • 如果缓冲区为空,关闭写事件监控(DisableWrite())
        • 如果连接状态为DISCONNECTING且所有数据已发送,释放连接

         Channel::DisableWrite实现

        void DisableWrite() { _events &= ~EPOLLOUT; Update(); }
        • 从事件标志中移除EPOLLOUT
        • 通过Update更新事件监控

        关于Http模块的流程

        HTTP请求的接收流程

        客户端发送HTTP请求

        • 数据通过TCP连接到达服务器。

        数据进入内核接收缓冲区

        • 操作系统内核接收数据,暂存在socket的接收缓冲区。

        muduo通过read()读取数据到in_buffer

        • muduo检测到socket可读事件,调用read()把数据读到in_buffer。

        HTTP解析器处理in_buffer

        • muduo的HTTP模块会不断检查in_buffer,尝试解析出完整的HTTP请求(包括请求行,请求头,请求体)。
        • 如果数据不完整(比如POST体还没收全),就继续等待数据到来。

        解析出完整请求后,调用用户回调

        • 一旦解析出完整的HTTP请求,muduo会调用你注册的HTTP请求处理回调(如onRequest),把HttpRequest对象传给你的业务代码。

        HTTP响应的发送流程

        你的业务代码生成HttpResponse

        • 你在回调里构造HttpResponse对象,设置响应内容、状态码、头部等。

        muduo把HttpResponse序列化到out_buffer

        • muduo会把HttpResponse对象序列化成HTTP协议格式的字符串,写入out_buffer。

        尝试write()到socket

        • muduo尝试把out_buffer的数据写入socket(内核发送缓冲区)。
        • 如果一次写不完,剩余数据继续留在out_buffer,等待下次socket可写时再发。

        客户端收到HTTP响应

        • 数据通过网络传输到客户端,客户端解析HTTP响应。

        首先是会创建个TCPserver对象_server,然后就是然后这个_server成员列表会创建个_baseloop对象,_acceptor对象,_pool对象,会先执行它们的默认构造函数,base_loop对象的成员列表中会创建poller对象,channel对象,timer_wheel对象

        模块初始化都干了什么事情

        Poller对象的创建

        EventLoop对象的创建

         Channel对象的创建

        Socket对象的创建

        Acceptor对象的创建

        TcpServer对象的创建

        LoopThreadPool对象的创建

        LoopThread对象的创建

        Timer_wheel对象的创建

        Connection对象的创建

        1. 连接触发监听套接字的可读事件

        客户端发起连接请求(connect),操作系统内核接收到SYN包,完成三次握手后,监听套接字变为可读状态(有新连接待接受)。

        2. Poller检测到事件

        在主事件循环(baseloop)中:

        - _poller.Poll(&actives)检测到监听套接字的可读事件

        • 将监听套接字对应的Channel添加到活跃Channel列表中
        3. EventLoop处理就绪事件

        在主事件循环的事件处理阶段:

        for (auto &channel : actives) {channel->HandleEvent();
        }
        • 调用监听套接字Channel的HandleEvent方法
        • Channel根据就绪事件类型调用相应的回调函数
        4. Acceptor的HandleRead方法被调用

        监听套接字的可读事件触发了Channel的读回调,即Acceptor::HandleRead:

        void Acceptor::HandleRead() {int newfd = _socket.Accept();  // 接受新连接if (newfd < 0) {return;}if (_accept_callback) _accept_callback(newfd);  // 调用接受连接回调
        }
        5. TcpServer::NewConnection方法被调用

        Acceptor的接受连接回调是TcpServer::NewConnection:

        void TcpServer::NewConnection(int fd) {_next_id++;  // 生成新的连接ID// 选择一个EventLoop来处理这个连接(负载均衡)EventLoop* loop = _pool.NextLoop();// 创建Connection对象管理这个连接PtrConnection conn(new Connection(loop, _next_id, fd));// 设置Connection的各种回调函数conn->SetMessageCallback(_message_callback);conn->SetClosedCallback(_closed_callback);conn->SetConnectedCallback(_connected_callback);conn->SetAnyEventCallback(_event_callback);conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));// 如果启用了非活跃连接超时释放,设置超时时间if (_enable_inactive_release) conn->EnableInactiveRelease(_timeout);// 完成连接初始化conn->Established();// 将连接添加到管理表_conns.insert(std::make_pair(_next_id, conn));
        }
        6. Connection对象创建

        当执行new Connection(loop, _next_id, fd)时:

        • 初始化成员变量:连接ID、文件描述符、EventLoop指针等
        • 创建Socket对象包装文件描述符
        • 创建Channel对象关联到EventLoop和连接套接字
        • 设置Channel的各种事件回调函数
        • 连接状态设置为CONNECTING
        7. Connection::Established方法执行
        void Connection::Established() {_loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this));
        }
        • 将EstablishedInLoop任务放入连接所属的EventLoop的任务队列
        • 这确保了连接的初始化在正确的线程中执行
        8. Connection::EstablishedInLoop方法执行

        在连接所属的EventLoop线程中:

        void Connection::EstablishedInLoop() {assert(_statu == CONNECTING);  // 确保当前状态是CONNECTING_statu = CONNECTED;  // 修改连接状态为CONNECTED// 启动读事件监控_channel.EnableRead();// 调用连接建立回调函数if (_connected_callback) _connected_callback(shared_from_this());
        }
        9. 启动读事件监控

        当执行_channel.EnableRead()时:

        void Channel::EnableRead() { _events |= EPOLLIN; Update(); 
        }void Channel::Update() {// 通过EventLoop更新事件监控_loop->UpdateEvent(this);
        }
        10. 更新Poller中的事件监控

        EventLoop::UpdateEvent调用Poller::UpdateEvent:

        void Poller::UpdateEvent(Channel *channel) {bool ret = HasChannel(channel);if (ret == false) {// 不存在则添加_channels.insert(std::make_pair(channel->Fd(), channel));return Update(channel, EPOLL_CTL_ADD);}return Update(channel, EPOLL_CTL_MOD);
        }void Update(Channel *channel, int op) {int fd = channel->Fd();struct epoll_event ev;ev.data.fd = fd;ev.events = channel->Events();int ret = epoll_ctl(_epfd, op, fd, &ev);if (ret < 0) {ERR_LOG("EPOLLCTL FAILED!!");}return;
        }
        • 将连接套接字添加到epoll的监控列表中
        • 设置监控的事件类型(EPOLLIN表示可读事件)
        11. 调用连接建立回调函数

        执行用户设置的连接建立回调函数:

        if (_connected_callback) _connected_callback(shared_from_this());
        • 将当前Connection的shared_ptr传递给回调函数
        • 用户可以在回调函数中处理连接建立事件
        12. 连接就绪,等待数据交互

        至此,新连接的处理流程完成:

        • 连接已被接受并分配了唯一ID
        • 连接被分配给了一个EventLoop进行管理
        • 连接的读事件监控已启动
        • 连接状态已设置为CONNECTED
        • 连接已添加到TcpServer的连接管理表中
        • 用户的连接建立回调函数已被调用

        连接现在处于就绪状态,可以进行数据收发。当有数据到达时,会触发连接套接字的可读事件,从而调用Connection::HandleRead方法处理数据。


        文章转载自:

        http://sqghtFpP.Lpbrp.cn
        http://LUCG3Ma7.Lpbrp.cn
        http://K6Kno36J.Lpbrp.cn
        http://l4Cy9u8f.Lpbrp.cn
        http://MNaHSFKR.Lpbrp.cn
        http://9lRrcpza.Lpbrp.cn
        http://7LoD192g.Lpbrp.cn
        http://1insf5lh.Lpbrp.cn
        http://C2Vu5ndA.Lpbrp.cn
        http://FqPaqINC.Lpbrp.cn
        http://cYEQUyuO.Lpbrp.cn
        http://s1hBy2W4.Lpbrp.cn
        http://6QPzoA8R.Lpbrp.cn
        http://xAI7rw1k.Lpbrp.cn
        http://bexrneBs.Lpbrp.cn
        http://PdB7eOxJ.Lpbrp.cn
        http://u3mopXDT.Lpbrp.cn
        http://ZbvK4T6N.Lpbrp.cn
        http://nCDmqGf2.Lpbrp.cn
        http://lEE7LcHa.Lpbrp.cn
        http://GGwpAaCS.Lpbrp.cn
        http://N7WmcVvL.Lpbrp.cn
        http://Fh8EjvUK.Lpbrp.cn
        http://XnyHr98u.Lpbrp.cn
        http://GaGw36ub.Lpbrp.cn
        http://gl3fppmG.Lpbrp.cn
        http://mkLQyTLa.Lpbrp.cn
        http://Hu1J2owa.Lpbrp.cn
        http://wWEtRNLM.Lpbrp.cn
        http://x11REQW7.Lpbrp.cn
        http://www.dtcms.com/wzjs/608829.html

        相关文章:

      1. 做网站为什么一定要留住用户一个前端页面多少钱
      2. 西安年网站建设小程序快速开发
      3. 广西网站建网站死链检测工具
      4. 信息企业网站建设的优势爱情链接
      5. 网站备案审核流程图网站建设來超速云建站
      6. 南江网站建设搜网站首页不见了seo
      7. 东莞优化网站关键词优化昆明市建设厅官方网站
      8. php网站开发实例pdfapp网站开发住房公积金
      9. 如何做网站排名优化学做网站要编程
      10. 企业网站模板源码wordpress负载均衡
      11. 做详情页不错的网站电子商务网站硬件需求
      12. 网站平台建设是什么安徽网站推广系统
      13. 在线看mv视频网站入口软件下载多种语言网站怎么做
      14. 一个网站开发流程图免费域名空间服务
      15. 在哪建网站磁力在线搜索引擎
      16. 网站开发 .net为企业做一个网站多少钱
      17. 网站怎么找百度培训
      18. 民制作网站哪家便宜建立个人网站代码
      19. 自己做公众号和小说网站推广荆门网站seo
      20. 如何选择网站营销公司网站上的验证码怎么做的
      21. 经营购物网站顺德品牌网站建设信息
      22. 兖州网站建设公司cod建站系统
      23. 徐州建筑网站做网站需要哪种工程师
      24. 苏州网络推广电话中山网站优化排名
      25. 匿名ip访问网站受限北京通州住房和城乡建设部网站
      26. 专业网站优化培训网站活动策划方案
      27. 行业网站定位专业网站开发服务
      28. 网站推广优化h1z1注册网站
      29. wordpress数据库版本号长沙百度网站推广优化
      30. 哪里查网站备案信息网站建设创新互联