当前位置: 首页 > news >正文

基于Reactor模式的高性能C++仿Muduo库:Server服务器模块实现

本文将介绍如何实现一个基于Reactor模式的高性能C++服务器模块,这是仿照Muduo网络库设计的核心组件。

设计思路

我们的Server模块采用经典的Reactor模式,主要包含以下核心组件:

  • EventLoop: 事件循环,负责事件分发

  • Acceptor: 连接接收器,处理新连接

  • Connection: 连接对象,管理单个TCP连接

  • EventLoopThreadPool: IO线程池,每个线程运行一个EventLoop,实现多线程处理(本文未给出其实现)

  • Server: 服务器主类,整合所有组件

核心组件实现

1. EventLoop 事件循环

cpp

// EventLoop.h
#ifndef EVENTLOOP_H
#define EVENTLOOP_H

#include <atomic>
#include <cassert>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

#include "Epoll.h"
#include "Channel.h"

/**
 * @brief Event loop bound to the thread that constructs it.
 *
 * The loop repeatedly polls its Epoll object, dispatches every active
 * Channel and then runs the callbacks that were queued via queueInLoop().
 * Other threads hand work to the loop through runInLoop()/queueInLoop();
 * a write to the internal eventfd wakes the loop up so the queued work
 * is processed.
 */
class EventLoop {
public:
    /// Records the constructing thread as the loop thread and registers the wakeup eventfd.
    EventLoop();
    ~EventLoop();

    EventLoop(const EventLoop&) = delete;
    EventLoop& operator=(const EventLoop&) = delete;

    /// Runs the poll/dispatch cycle until quit() is called. Must be called on the loop thread.
    void loop();

    /// Asks loop() to return; wakes the loop when called from another thread.
    void quit();

    /// Runs @p cb immediately when called on the loop thread, otherwise queues it.
    void runInLoop(std::function<void()> cb);

    /// Appends @p cb to the pending list; it runs after the current dispatch round.
    void queueInLoop(std::function<void()> cb);

    /// Forwards the channel registration change to the poller (loop thread only).
    void updateChannel(Channel* channel);

    /// Forwards the channel removal to the poller (loop thread only).
    void removeChannel(Channel* channel);

    /// @return true when the caller is the thread that constructed this loop.
    bool isInLoopThread() const { return threadId_ == std::this_thread::get_id(); }

    /// Debug-build check that the caller is the loop thread.
    void assertInLoopThread() const { assert(isInLoopThread()); }

private:
    void wakeup();             ///< Writes to wakeupFd_ so that a blocked poll returns.
    void handleWakeup();       ///< Drains wakeupFd_ after a wakeup.
    void doPendingFunctors();  ///< Runs everything queued through queueInLoop().

    bool looping_;
    // quit() may be called from a thread other than the loop thread, so the
    // flag is atomic instead of a plain bool.
    std::atomic<bool> quit_;
    bool callingPendingFunctors_;
    const std::thread::id threadId_;
    std::unique_ptr<Epoll> epoller_;
    int wakeupFd_;
    std::unique_ptr<Channel> wakeupChannel_;
    std::vector<std::function<void()>> pendingFunctors_;  // guarded by mutex_
    std::mutex mutex_;
};

#endif  // EVENTLOOP_H

cpp

// EventLoop.cpp
#include "EventLoop.h"

#include <sys/eventfd.h>
#include <unistd.h>

#include <cassert>
#include <cerrno>
#include <cstdint>
#include <system_error>
#include <thread>
#include <utility>

/// Creates the poller and the eventfd used to wake the loop from other threads.
/// @throws std::system_error when the eventfd cannot be created.
EventLoop::EventLoop()
    : looping_(false),
      quit_(false),
      callingPendingFunctors_(false),
      threadId_(std::this_thread::get_id()),
      epoller_(std::make_unique<Epoll>()),
      wakeupFd_(::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC)) {
    if (wakeupFd_ < 0) {
        // Without a wakeup fd the loop could never be woken by other threads,
        // so fail loudly instead of registering fd -1 with the poller.
        throw std::system_error(errno, std::generic_category(), "eventfd");
    }
    wakeupChannel_ = std::make_unique<Channel>(this, wakeupFd_);
    wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleWakeup, this));
    wakeupChannel_->enableReading();
}

/// Unregisters the wakeup channel and closes the eventfd.
EventLoop::~EventLoop() {
    wakeupChannel_->disableAll();
    // Same teardown sequence Acceptor uses for its channel: disable, then remove.
    wakeupChannel_->remove();
    ::close(wakeupFd_);
}

/// Polls and dispatches until quit_ becomes true; then clears looping_.
void EventLoop::loop() {
    assert(!looping_);
    assertInLoopThread();
    looping_ = true;
    quit_ = false;
    while (!quit_) {
        auto activeChannels = epoller_->poll();
        for (auto& channel : activeChannels) {
            channel->handleEvent();
        }
        // Queued cross-thread work runs once per round, after I/O dispatch.
        doPendingFunctors();
    }
    looping_ = false;
}

/// Sets the quit flag; a caller on another thread also wakes the loop.
void EventLoop::quit() {
    quit_ = true;
    if (!isInLoopThread()) {
        wakeup();
    }
}

/// Executes @p cb directly on the loop thread, otherwise queues it.
void EventLoop::runInLoop(std::function<void()> cb) {
    if (isInLoopThread()) {
        cb();
    } else {
        queueInLoop(std::move(cb));
    }
}

/// Queues @p cb under the mutex and wakes the loop when needed.
void EventLoop::queueInLoop(std::function<void()> cb) {
    {
        std::lock_guard<std::mutex> lock(mutex_);
        pendingFunctors_.push_back(std::move(cb));
    }
    // Wake up when called from another thread, or when the loop thread is
    // currently draining functors (the new one was queued after the swap).
    if (!isInLoopThread() || callingPendingFunctors_) {
        wakeup();
    }
}

void EventLoop::updateChannel(Channel* channel) {
    assert(channel->ownerLoop() == this);
    assertInLoopThread();
    epoller_->updateChannel(channel);
}

void EventLoop::removeChannel(Channel* channel) {
    assert(channel->ownerLoop() == this);
    assertInLoopThread();
    epoller_->removeChannel(channel);
}

/// Writes an 8-byte counter increment to the eventfd.
void EventLoop::wakeup() {
    uint64_t one = 1;
    ssize_t n = ::write(wakeupFd_, &one, sizeof(one));
    if (n != sizeof(one)) {
        // error handling (left as a placeholder by the article)
    }
}

/// Reads the 8-byte eventfd counter so the fd stops being readable.
void EventLoop::handleWakeup() {
    uint64_t one;
    ssize_t n = ::read(wakeupFd_, &one, sizeof(one));
    if (n != sizeof(one)) {
        // error handling (left as a placeholder by the article)
    }
}

/// Swaps the pending list out under the lock, then runs it without the lock.
void EventLoop::doPendingFunctors() {
    std::vector<std::function<void()>> functors;
    callingPendingFunctors_ = true;
    {
        std::lock_guard<std::mutex> lock(mutex_);
        functors.swap(pendingFunctors_);
    }
    for (const auto& functor : functors) {
        functor();
    }
    callingPendingFunctors_ = false;
}

2. Acceptor 连接接收器

cpp

// Acceptor.h
#ifndef ACCEPTOR_H
#define ACCEPTOR_H

#include <functional>

#include "Socket.h"
#include "Channel.h"

class EventLoop;
class InetAddress;

/**
 * @brief Owns the listening socket and reports accepted connections.
 *
 * The accept socket is bound in the constructor; listen() starts listening
 * and enables read events on the channel. Each accepted descriptor is
 * passed to the NewConnectionCallback, or closed when no callback is set.
 */
class Acceptor {
public:
    /// Receives the accepted descriptor and the peer address.
    using NewConnectionCallback = std::function<void(int sockfd, const InetAddress&)>;

    /// @param loop       loop that owns the accept channel
    /// @param listenAddr address the accept socket is bound to
    /// @param reusePort  forwarded to Socket::setReusePort
    Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reusePort = true);
    ~Acceptor();

    void setNewConnectionCallback(const NewConnectionCallback& cb) {
        newConnectionCallback_ = cb;
    }

    /// @return true once listen() has been called.
    bool listening() const { return listening_; }

    /// Starts listening and enables read events (loop thread only).
    void listen();

private:
    /// Read handler of the accept channel: accepts one connection.
    void handleRead();

    EventLoop* loop_;
    Socket acceptSocket_;
    Channel acceptChannel_;  // constructed from acceptSocket_.fd(), so it must follow it
    NewConnectionCallback newConnectionCallback_;
    bool listening_;
};

#endif  // ACCEPTOR_H

cpp

// Acceptor.cpp
#include "Acceptor.h"
#include "EventLoop.h"
#include "InetAddress.h"

#include <sys/socket.h>
#include <unistd.h>

#include <functional>

/// Creates a non-blocking socket, applies the reuse options, binds it to
/// @p listenAddr and installs handleRead() as the channel's read callback.
Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reusePort)
    : loop_(loop),
      acceptSocket_(Socket::createNonblocking()),
      acceptChannel_(loop, acceptSocket_.fd()),
      listening_(false) {
    acceptSocket_.setReuseAddr(true);
    acceptSocket_.setReusePort(reusePort);
    acceptSocket_.bindAddress(listenAddr);
    acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
}

/// Disables and removes the accept channel.
Acceptor::~Acceptor() {
    acceptChannel_.disableAll();
    acceptChannel_.remove();
}

/// Marks the acceptor as listening, starts listening on the socket and
/// enables read events so that handleRead() is invoked.
void Acceptor::listen() {
    loop_->assertInLoopThread();
    listening_ = true;
    acceptSocket_.listen();
    acceptChannel_.enableReading();
}

/// Accepts one connection and hands it to the callback.
void Acceptor::handleRead() {
    loop_->assertInLoopThread();
    InetAddress peerAddr;
    int connfd = acceptSocket_.accept(&peerAddr);
    if (connfd >= 0) {
        if (newConnectionCallback_) {
            newConnectionCallback_(connfd, peerAddr);
        } else {
            // Nobody takes ownership of the descriptor, so close it here.
            ::close(connfd);
        }
    } else {
        // error handling (left as a placeholder by the article)
    }
}

3. Connection 连接管理

cpp

// Connection.h
#ifndef CONNECTION_H
#define CONNECTION_H

#include <atomic>
#include <functional>
#include <memory>
#include <string>

#include "Socket.h"
#include "Channel.h"
#include "Buffer.h"
// InetAddress is stored by value below, so the full definition is required.
#include "InetAddress.h"

class EventLoop;

/**
 * @brief One TCP connection: socket, channel, buffers and user callbacks.
 *
 * Instances are expected to be owned by std::shared_ptr, because the
 * implementation calls shared_from_this() when invoking callbacks.
 */
class Connection : public std::enable_shared_from_this<Connection> {
public:
    using ConnectionCallback = std::function<void(const std::shared_ptr<Connection>&)>;
    using MessageCallback = std::function<void(const std::shared_ptr<Connection>&, Buffer*)>;
    using CloseCallback = std::function<void(const std::shared_ptr<Connection>&)>;

    /// @param loop      loop that owns this connection's channel
    /// @param sockfd    connected socket descriptor
    /// @param localAddr local end of the connection
    /// @param peerAddr  remote end of the connection
    Connection(EventLoop* loop, int sockfd, const InetAddress& localAddr, const InetAddress& peerAddr);
    ~Connection();

    EventLoop* getLoop() const { return loop_; }
    const InetAddress& localAddress() const { return localAddr_; }
    const InetAddress& peerAddress() const { return peerAddr_; }
    /// Read-only access to the socket; Server uses its fd as the connection-map key.
    const Socket& socket() const { return socket_; }
    bool connected() const { return state_ == kConnected; }

    void setConnectionCallback(const ConnectionCallback& cb) { connectionCallback_ = cb; }
    void setMessageCallback(const MessageCallback& cb) { messageCallback_ = cb; }
    void setCloseCallback(const CloseCallback& cb) { closeCallback_ = cb; }

    /// Moves the connection to kConnected and enables reading (loop thread only).
    void connectEstablished();
    /// Final teardown: removes the channel from the loop (loop thread only).
    void connectDestroyed();

    /// Sends @p message; when called off the loop thread the write is queued to the loop.
    void send(const std::string& message);
    /// Starts closing the write side once pending output has been written.
    void shutdown();

private:
    enum State { kDisconnected, kConnecting, kConnected, kDisconnecting };

    void setState(State s) { state_ = s; }
    void handleRead();
    void handleWrite();
    void handleClose();
    void handleError();
    void sendInLoop(const std::string& message);
    void shutdownInLoop();

    EventLoop* loop_;
    Socket socket_;
    Channel channel_;
    InetAddress localAddr_;
    InetAddress peerAddr_;
    ConnectionCallback connectionCallback_;
    MessageCallback messageCallback_;
    CloseCallback closeCallback_;
    // send()/shutdown() read the state before hopping to the loop thread,
    // so it is atomic rather than a plain enum.
    std::atomic<State> state_;
    Buffer inputBuffer_;
    Buffer outputBuffer_;
};

#endif  // CONNECTION_H

cpp

// Connection.cpp
#include "Connection.h"
#include "EventLoop.h"
#include "InetAddress.h"

#include <sys/socket.h>
#include <unistd.h>

#include <cassert>
#include <cerrno>
#include <functional>
#include <memory>
#include <string>

/// Wires the channel's read/write/close/error events to the member handlers.
/// The connection starts in kConnecting until connectEstablished() runs.
Connection::Connection(EventLoop* loop, int sockfd, const InetAddress& localAddr, const InetAddress& peerAddr)
    : loop_(loop),
      socket_(sockfd),
      channel_(loop, sockfd),
      localAddr_(localAddr),
      peerAddr_(peerAddr),
      state_(kConnecting) {
    channel_.setReadCallback(std::bind(&Connection::handleRead, this));
    channel_.setWriteCallback(std::bind(&Connection::handleWrite, this));
    channel_.setCloseCallback(std::bind(&Connection::handleClose, this));
    channel_.setErrorCallback(std::bind(&Connection::handleError, this));
}

Connection::~Connection() {
    assert(state_ == kDisconnected);
}

/// Switches to kConnected, enables read events and notifies the user.
void Connection::connectEstablished() {
    loop_->assertInLoopThread();
    assert(state_ == kConnecting);
    setState(kConnected);
    channel_.enableReading();
    if (connectionCallback_) {
        connectionCallback_(shared_from_this());
    }
}

/// Last step of teardown. When handleClose() has not already run, this also
/// performs the state change and the user notification; the channel is
/// removed from the loop in every case.
void Connection::connectDestroyed() {
    loop_->assertInLoopThread();
    if (state_ == kConnected) {
        setState(kDisconnected);
        channel_.disableAll();
        if (connectionCallback_) {
            connectionCallback_(shared_from_this());
        }
    }
    channel_.remove();
}

/// Sends @p message when connected; off the loop thread the write is queued.
void Connection::send(const std::string& message) {
    if (state_ != kConnected) {
        return;
    }
    if (loop_->isInLoopThread()) {
        sendInLoop(message);
    } else {
        // Capture a shared_ptr, not raw `this`: the queued task may run after
        // every other owner has dropped the connection.
        loop_->runInLoop([self = shared_from_this(), message] { self->sendInLoop(message); });
    }
}

/// Moves to kDisconnecting and asks the loop to shut down the write side.
void Connection::shutdown() {
    if (state_ == kConnected) {
        setState(kDisconnecting);
        // Keep the connection alive until the queued task has run.
        loop_->runInLoop([self = shared_from_this()] { self->shutdownInLoop(); });
    }
}

/// Reads into the input buffer; >0 bytes go to the message callback,
/// 0 means the peer closed, <0 is reported through handleError().
void Connection::handleRead() {
    loop_->assertInLoopThread();
    int savedErrno = 0;
    ssize_t n = inputBuffer_.readFd(channel_.fd(), &savedErrno);
    if (n > 0) {
        if (messageCallback_) {
            messageCallback_(shared_from_this(), &inputBuffer_);
        }
    } else if (n == 0) {
        handleClose();
    } else {
        errno = savedErrno;
        handleError();
    }
}

/// Flushes as much of the output buffer as the socket accepts. When the
/// buffer drains, write events are disabled and a pending shutdown proceeds.
void Connection::handleWrite() {
    loop_->assertInLoopThread();
    if (!channel_.isWriting()) {
        return;
    }
    ssize_t n = ::write(channel_.fd(), outputBuffer_.peek(), outputBuffer_.readableBytes());
    if (n > 0) {
        outputBuffer_.retrieve(n);
        if (outputBuffer_.readableBytes() == 0) {
            channel_.disableWriting();
            if (state_ == kDisconnecting) {
                shutdownInLoop();
            }
        }
    } else {
        // error handling (left as a placeholder by the article)
    }
}

/// Marks the connection closed, disables its events and notifies the
/// connection callback followed by the close callback.
void Connection::handleClose() {
    loop_->assertInLoopThread();
    setState(kDisconnected);
    channel_.disableAll();
    // Hold a reference so the object outlives both callbacks.
    std::shared_ptr<Connection> guardThis(shared_from_this());
    if (connectionCallback_) {
        connectionCallback_(guardThis);
    }
    if (closeCallback_) {
        closeCallback_(guardThis);
    }
}

void Connection::handleError() {
    // error handling (left as a placeholder by the article)
}

/// Writes @p message directly when nothing is queued; whatever could not be
/// written is appended to the output buffer and write events are enabled.
void Connection::sendInLoop(const std::string& message) {
    loop_->assertInLoopThread();
    if (state_ == kDisconnected) {
        // The connection was closed between queueing and running this task.
        return;
    }
    size_t written = 0;
    if (!channel_.isWriting() && outputBuffer_.readableBytes() == 0) {
        const ssize_t nwrote = ::write(channel_.fd(), message.data(), message.size());
        if (nwrote >= 0) {
            written = static_cast<size_t>(nwrote);
        } else if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
            // A real failure: report it and give up on this message.
            handleError();
            return;
        }
        // EAGAIN/EWOULDBLOCK/EINTR: nothing was written, but the data must not
        // be dropped; fall through and buffer the whole message.
    }
    if (written < message.size()) {
        outputBuffer_.append(message.data() + written, message.size() - written);
        if (!channel_.isWriting()) {
            channel_.enableWriting();
        }
    }
}

/// Shuts down the write side unless output is still pending; in that case
/// handleWrite() calls this again once the buffer is empty.
void Connection::shutdownInLoop() {
    loop_->assertInLoopThread();
    if (!channel_.isWriting()) {
        socket_.shutdownWrite();
    }
}

4. Server 主类

cpp

// Server.h
#ifndef SERVER_H
#define SERVER_H

#include <atomic>
#include <functional>
#include <map>
#include <memory>
#include <string>

#include "Acceptor.h"
#include "Connection.h"
#include "EventLoop.h"
#include "EventLoopThreadPool.h"

class InetAddress;

/**
 * @brief TCP server: accepts connections on the base loop and hands each
 *        one to a loop obtained from the thread pool.
 */
class Server {
public:
    using ThreadInitCallback = std::function<void(EventLoop*)>;

    /// @param loop       base loop; the acceptor runs on it
    /// @param listenAddr address to listen on
    /// @param name       server name
    Server(EventLoop* loop, const InetAddress& listenAddr, const std::string& name = "");
    ~Server();

    /// Forwards the thread count to the thread pool.
    void setThreadNum(int numThreads);
    void setThreadInitCallback(const ThreadInitCallback& cb) { threadInitCallback_ = cb; }

    /// Starts the thread pool and the acceptor; only the first call has an effect.
    void start();

    void setConnectionCallback(const Connection::ConnectionCallback& cb) { connectionCallback_ = cb; }
    void setMessageCallback(const Connection::MessageCallback& cb) { messageCallback_ = cb; }

private:
    /// Acceptor callback: wraps the accepted fd in a Connection.
    void newConnection(int sockfd, const InetAddress& peerAddr);
    /// Close callback of a connection; forwards to the base loop.
    void removeConnection(const std::shared_ptr<Connection>& conn);
    /// Erases the connection from the map and schedules its teardown.
    void removeConnectionInLoop(const std::shared_ptr<Connection>& conn);

    /// Live connections keyed by socket descriptor.
    using ConnectionMap = std::map<int, std::shared_ptr<Connection>>;

    EventLoop* baseLoop_;
    const std::string name_;
    std::unique_ptr<Acceptor> acceptor_;
    std::unique_ptr<EventLoopThreadPool> threadPool_;
    // The callback aliases live in Connection, so they must be qualified here.
    Connection::ConnectionCallback connectionCallback_;
    Connection::MessageCallback messageCallback_;
    ThreadInitCallback threadInitCallback_;
    std::atomic<bool> started_;
    int nextConnId_;
    ConnectionMap connections_;
};

#endif  // SERVER_H

cpp

// Server.cpp
#include "Server.h"
#include "InetAddress.h"
#include "Connection.h"

#include <functional>
#include <memory>
#include <utility>

/// Builds the acceptor and the thread pool on @p loop and routes accepted
/// connections to newConnection().
Server::Server(EventLoop* loop, const InetAddress& listenAddr, const std::string& name)
    : baseLoop_(loop),
      name_(name),
      acceptor_(std::make_unique<Acceptor>(loop, listenAddr)),
      threadPool_(std::make_unique<EventLoopThreadPool>(loop)),
      started_(false),
      nextConnId_(1) {
    acceptor_->setNewConnectionCallback(
        std::bind(&Server::newConnection, this, std::placeholders::_1, std::placeholders::_2));
}

/// Tears down every connection that is still registered.
Server::~Server() {
    baseLoop_->assertInLoopThread();
    for (auto& entry : connections_) {
        // Take the map's reference so the bound task below holds the last one
        // owned by the server; the connection is destroyed on its own loop.
        std::shared_ptr<Connection> conn(std::move(entry.second));
        entry.second.reset();
        conn->getLoop()->runInLoop(std::bind(&Connection::connectDestroyed, conn));
    }
}

void Server::setThreadNum(int numThreads) {
    threadPool_->setThreadNum(numThreads);
}

/// Starts the pool and schedules Acceptor::listen on the base loop.
/// The atomic exchange makes repeated calls no-ops.
void Server::start() {
    if (!started_.exchange(true)) {
        threadPool_->start(threadInitCallback_);
        baseLoop_->runInLoop(std::bind(&Acceptor::listen, acceptor_.get()));
    }
}

/// Creates a Connection for @p sockfd on the next pool loop, registers it in
/// the map, installs the callbacks and completes setup on the I/O loop.
void Server::newConnection(int sockfd, const InetAddress& peerAddr) {
    baseLoop_->assertInLoopThread();
    EventLoop* ioLoop = threadPool_->getNextLoop();
    // Connection takes no name argument, so only the id counter is advanced.
    ++nextConnId_;
    // NOTE(review): relies on Acceptor::getLocalAddress(), which is not among
    // the members the Acceptor header in this article declares; confirm it exists.
    auto conn = std::make_shared<Connection>(ioLoop, sockfd, acceptor_->getLocalAddress(), peerAddr);
    connections_[sockfd] = conn;
    conn->setConnectionCallback(connectionCallback_);
    conn->setMessageCallback(messageCallback_);
    conn->setCloseCallback(std::bind(&Server::removeConnection, this, std::placeholders::_1));
    ioLoop->runInLoop(std::bind(&Connection::connectEstablished, conn));
}

/// Invoked as the connection's close callback; the map is only touched on
/// the base loop, so the removal is forwarded there.
void Server::removeConnection(const std::shared_ptr<Connection>& conn) {
    baseLoop_->runInLoop(std::bind(&Server::removeConnectionInLoop, this, conn));
}

/// Erases the connection from the map and queues connectDestroyed() on the
/// connection's own loop; the bound shared_ptr keeps it alive until then.
void Server::removeConnectionInLoop(const std::shared_ptr<Connection>& conn) {
    baseLoop_->assertInLoopThread();
    size_t n = connections_.erase(conn->socket().fd());
    if (n != 1) {
        // error handling (left as a placeholder by the article)
    }
    EventLoop* ioLoop = conn->getLoop();
    ioLoop->queueInLoop(std::bind(&Connection::connectDestroyed, conn));
}

关键特性说明

1. 线程安全的事件循环

  • 每个EventLoop绑定一个线程

  • 跨线程调用通过wakeup fd和pending functors实现

  • 确保所有IO操作在正确的线程中执行

2. 高效的缓冲区设计

  • 使用readv/scatter-gather IO提高读取效率

  • 自动扩容机制避免频繁内存分配

  • 减少不必要的数据拷贝:输出缓冲区为空时直接写入socket,只有未写完的部分才进入输出缓冲区

3. 智能的资源管理

  • 使用shared_ptr管理Connection生命周期

  • RAII模式确保资源正确释放

  • 防止悬空指针和内存泄漏

4. 灵活的回调机制

  • 支持用户自定义各种事件回调

  • 类型安全的函数包装

  • 便于扩展和定制

使用示例

cpp

#include "Server.h"
#include "Connection.h"
#include "Buffer.h"
#include "EventLoop.h"
#include "InetAddress.h"

#include <iostream>
#include <memory>
#include <string>

/// Connection callback: invoked on establishment and on close;
/// connected() tells the two cases apart.
void onConnection(const std::shared_ptr<Connection>& conn) {
    if (conn->connected()) {
        std::cout << "New connection from " << conn->peerAddress().toIpPort() << std::endl;
    } else {
        std::cout << "Connection closed" << std::endl;
    }
}

/// Message callback: takes everything from the buffer, prints it and sends
/// it back prefixed with "Echo: ".
void onMessage(const std::shared_ptr<Connection>& conn, Buffer* buf) {
    std::string msg(buf->retrieveAllAsString());
    std::cout << "Received: " << msg << std::endl;
    conn->send("Echo: " + msg);
}

/// Echo server on port 8888 with four pool threads.
int main() {
    EventLoop loop;
    InetAddress listenAddr(8888);
    Server server(&loop, listenAddr, "TestServer");
    server.setConnectionCallback(onConnection);
    server.setMessageCallback(onMessage);
    server.setThreadNum(4);
    server.start();
    loop.loop();  // blocks until EventLoop::quit() is called
    return 0;
}

总结

本文实现了Server模块的核心组件,包括:

  • 线程安全的事件循环机制

  • 高效的连接接收和管理

  • 多线程支持

  • 完整的生命周期管理

在下一篇文章中,我们将继续完善这个服务器模块,添加定时器、日志记录、HTTP协议支持等高级功能。

http://www.dtcms.com/a/531798.html

相关文章:

  • 常州市网站建设设计公众号开发和小程序开发哪个简单
  • 【Android】DrawerLayout实现侧边导航栏
  • 缓存查询逻辑及问题解决
  • 襄阳网站seo公司江津网站建设口碑
  • 【中望3D 2025】配置【vs2022】开发环境
  • 基于定制开发开源AI智能名片S2B2C商城小程序的全方位种草模式研究
  • 实现Callable接口(了解即可)
  • 从入门到实操:贝叶斯分析完整技术步骤与核心R包指南
  • 做理财的网站有哪些内容长春一般建一个网站需要多少钱
  • C#开发后端:API 控制器(Controller)
  • 建湖人才网招工湛江怎么做网站关键词优化
  • 深入理解 Flink SQL 状态:原理、应用与优化
  • Product Hunt 每日热榜 | 2025-10-26
  • Java的语法与Python进行对比学习
  • 【MCAL实战】CanTrcv模块配置实践
  • coco 可视化 txt版
  • idea字体的问题(idea应用本身的字体问题)
  • 计算机操作系统 — 链接
  • 网站图片加altwordpress前端库加速
  • 在linux上使用docker搭建ELK日志框架
  • Docker 应该如何学习 分四个阶段
  • 面试过程中的扣分项,你踩过几个?
  • 中牟高端网站建设专做户外装备测评视频网站
  • CSS属性(二)
  • 2011年下半年试题四:论软件需求获取技术及应用
  • Mujoco 仿真 PPO 强化学习机械臂末端路径规划到达指定位置(代码讲解)
  • 【C#】EventHandler的使用
  • C++ 实际应用系列(第六部分):并发系统的性能优化与工程实践(完)
  • 上市公司网站建设分析wordpress 转 app
  • Prometheus+Grafana 智能监控告警系统(服务器指标采集、mysql指标采集)