当前位置: 首页 > news >正文

C++仿Muduo库Server服务器模块实现:基于Reactor模式的高性能C++服务器模块

C++仿Muduo库:Server服务器模块实现(上)

本文将介绍如何实现一个基于Reactor模式的高性能C++服务器模块,这是仿照Muduo网络库设计的核心组件。

设计思路

我们的Server模块采用经典的Reactor模式,主要包含以下核心组件:

  • EventLoop: 事件循环,负责事件分发

  • Acceptor: 连接接收器,处理新连接

  • Connection: 连接对象,管理单个TCP连接

  • EventLoopThreadPool: 事件循环线程池,实现多线程处理(本文只使用其接口,未给出实现)

  • Server: 服务器主类,整合所有组件

核心组件实现

1. EventLoop 事件循环

cpp

// EventLoop.h
#ifndef EVENTLOOP_H
#define EVENTLOOP_H

#include <atomic>
#include <cassert>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

#include "Epoll.h"
#include "Channel.h"class EventLoop {
public:EventLoop();~EventLoop();void loop();void quit();void runInLoop(std::function<void()> cb);void queueInLoop(std::function<void()> cb);void updateChannel(Channel* channel);void removeChannel(Channel* channel);bool isInLoopThread() const { return threadId_ == std::this_thread::get_id(); }private:void wakeup();void handleWakeup();void doPendingFunctors();bool looping_;bool quit_;bool callingPendingFunctors_;const std::thread::id threadId_;std::unique_ptr<Epoll> epoller_;int wakeupFd_;std::unique_ptr<Channel> wakeupChannel_;std::vector<std::function<void()>> pendingFunctors_;std::mutex mutex_;
};#endif

cpp

// EventLoop.cpp
#include "EventLoop.h"
#include <sys/eventfd.h>
#include <unistd.h>
#include <cassert>EventLoop::EventLoop() : looping_(false),quit_(false),callingPendingFunctors_(false),threadId_(std::this_thread::get_id()),epoller_(std::make_unique<Epoll>()),wakeupFd_(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC)) {if (wakeupFd_ < 0) {// 错误处理}wakeupChannel_ = std::make_unique<Channel>(this, wakeupFd_);wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleWakeup, this));wakeupChannel_->enableReading();
}EventLoop::~EventLoop() {wakeupChannel_->disableAll();::close(wakeupFd_);
}void EventLoop::loop() {assert(!looping_);assertInLoopThread();looping_ = true;quit_ = false;while (!quit_) {auto activeChannels = epoller_->poll();for (auto& channel : activeChannels) {channel->handleEvent();}doPendingFunctors();}looping_ = false;
}void EventLoop::quit() {quit_ = true;if (!isInLoopThread()) {wakeup();}
}void EventLoop::runInLoop(std::function<void()> cb) {if (isInLoopThread()) {cb();} else {queueInLoop(std::move(cb));}
}void EventLoop::queueInLoop(std::function<void()> cb) {{std::lock_guard<std::mutex> lock(mutex_);pendingFunctors_.push_back(std::move(cb));}if (!isInLoopThread() || callingPendingFunctors_) {wakeup();}
}void EventLoop::updateChannel(Channel* channel) {assert(channel->ownerLoop() == this);assertInLoopThread();epoller_->updateChannel(channel);
}void EventLoop::removeChannel(Channel* channel) {assert(channel->ownerLoop() == this);assertInLoopThread();epoller_->removeChannel(channel);
}void EventLoop::wakeup() {uint64_t one = 1;ssize_t n = ::write(wakeupFd_, &one, sizeof(one));if (n != sizeof(one)) {// 错误处理}
}void EventLoop::handleWakeup() {uint64_t one;ssize_t n = ::read(wakeupFd_, &one, sizeof(one));if (n != sizeof(one)) {// 错误处理}
}void EventLoop::doPendingFunctors() {std::vector<std::function<void()>> functors;callingPendingFunctors_ = true;{std::lock_guard<std::mutex> lock(mutex_);functors.swap(pendingFunctors_);}for (const auto& functor : functors) {functor();}callingPendingFunctors_ = false;
}

2. Acceptor 连接接收器

cpp

// Acceptor.h
#ifndef ACCEPTOR_H
#define ACCEPTOR_H#include <functional>
#include "Socket.h"
#include "Channel.h"class EventLoop;
class InetAddress;class Acceptor {
public:using NewConnectionCallback = std::function<void(int sockfd, const InetAddress&)>;Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reusePort = true);~Acceptor();void setNewConnectionCallback(const NewConnectionCallback& cb) {newConnectionCallback_ = cb;}bool listening() const { return listening_; }void listen();private:void handleRead();EventLoop* loop_;Socket acceptSocket_;Channel acceptChannel_;NewConnectionCallback newConnectionCallback_;bool listening_;
};#endif

cpp

// Acceptor.cpp
#include "Acceptor.h"
#include "EventLoop.h"
#include "InetAddress.h"
#include <sys/socket.h>
#include <unistd.h>Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reusePort): loop_(loop),acceptSocket_(Socket::createNonblocking()),acceptChannel_(loop, acceptSocket_.fd()),listening_(false) {acceptSocket_.setReuseAddr(true);acceptSocket_.setReusePort(reusePort);acceptSocket_.bindAddress(listenAddr);acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
}Acceptor::~Acceptor() {acceptChannel_.disableAll();acceptChannel_.remove();
}void Acceptor::listen() {loop_->assertInLoopThread();listening_ = true;acceptSocket_.listen();acceptChannel_.enableReading();
}void Acceptor::handleRead() {loop_->assertInLoopThread();InetAddress peerAddr;int connfd = acceptSocket_.accept(&peerAddr);if (connfd >= 0) {if (newConnectionCallback_) {newConnectionCallback_(connfd, peerAddr);} else {::close(connfd);}} else {// 错误处理}
}

3. Connection 连接管理

cpp

// Connection.h
#ifndef CONNECTION_H
#define CONNECTION_H#include <memory>
#include <functional>
#include "Socket.h"
#include "Channel.h"
#include "Buffer.h"class EventLoop;
class InetAddress;class Connection : public std::enable_shared_from_this<Connection> {
public:using ConnectionCallback = std::function<void(const std::shared_ptr<Connection>&)>;using MessageCallback = std::function<void(const std::shared_ptr<Connection>&, Buffer*)>;using CloseCallback = std::function<void(const std::shared_ptr<Connection>&)>;Connection(EventLoop* loop, int sockfd, const InetAddress& localAddr, const InetAddress& peerAddr);~Connection();EventLoop* getLoop() const { return loop_; }const InetAddress& localAddress() const { return localAddr_; }const InetAddress& peerAddress() const { return peerAddr_; }bool connected() const { return state_ == kConnected; }void setConnectionCallback(const ConnectionCallback& cb) { connectionCallback_ = cb; }void setMessageCallback(const MessageCallback& cb) { messageCallback_ = cb; }void setCloseCallback(const CloseCallback& cb) { closeCallback_ = cb; }void connectEstablished();void connectDestroyed();void send(const std::string& message);void shutdown();private:enum State { kDisconnected, kConnecting, kConnected, kDisconnecting };void setState(State s) { state_ = s; }void handleRead();void handleWrite();void handleClose();void handleError();void sendInLoop(const std::string& message);void shutdownInLoop();EventLoop* loop_;Socket socket_;Channel channel_;InetAddress localAddr_;InetAddress peerAddr_;ConnectionCallback connectionCallback_;MessageCallback messageCallback_;CloseCallback closeCallback_;State state_;Buffer inputBuffer_;Buffer outputBuffer_;
};#endif

cpp

// Connection.cpp
#include "Connection.h"
#include "EventLoop.h"
#include "InetAddress.h"
#include <sys/socket.h>
#include <unistd.h>
#include <cassert>Connection::Connection(EventLoop* loop, int sockfd, const InetAddress& localAddr, const InetAddress& peerAddr): loop_(loop),socket_(sockfd),channel_(loop, sockfd),localAddr_(localAddr),peerAddr_(peerAddr),state_(kConnecting) {channel_.setReadCallback(std::bind(&Connection::handleRead, this));channel_.setWriteCallback(std::bind(&Connection::handleWrite, this));channel_.setCloseCallback(std::bind(&Connection::handleClose, this));channel_.setErrorCallback(std::bind(&Connection::handleError, this));}Connection::~Connection() {assert(state_ == kDisconnected);
}void Connection::connectEstablished() {loop_->assertInLoopThread();assert(state_ == kConnecting);setState(kConnected);channel_.enableReading();if (connectionCallback_) {connectionCallback_(shared_from_this());}
}void Connection::connectDestroyed() {loop_->assertInLoopThread();if (state_ == kConnected) {setState(kDisconnected);channel_.disableAll();if (connectionCallback_) {connectionCallback_(shared_from_this());}}channel_.remove();
}void Connection::send(const std::string& message) {if (state_ == kConnected) {if (loop_->isInLoopThread()) {sendInLoop(message);} else {loop_->runInLoop(std::bind(&Connection::sendInLoop, this, message));}}
}void Connection::shutdown() {if (state_ == kConnected) {setState(kDisconnecting);loop_->runInLoop(std::bind(&Connection::shutdownInLoop, this));}
}void Connection::handleRead() {loop_->assertInLoopThread();int savedErrno = 0;ssize_t n = inputBuffer_.readFd(channel_.fd(), &savedErrno);if (n > 0) {if (messageCallback_) {messageCallback_(shared_from_this(), &inputBuffer_);}} else if (n == 0) {handleClose();} else {// 错误处理}
}void Connection::handleWrite() {loop_->assertInLoopThread();if (channel_.isWriting()) {ssize_t n = ::write(channel_.fd(), outputBuffer_.peek(), outputBuffer_.readableBytes());if (n > 0) {outputBuffer_.retrieve(n);if (outputBuffer_.readableBytes() == 0) {channel_.disableWriting();if (state_ == kDisconnecting) {shutdownInLoop();}}} else {// 错误处理}}
}void Connection::handleClose() {loop_->assertInLoopThread();setState(kDisconnected);channel_.disableAll();std::shared_ptr<Connection> guardThis(shared_from_this());if (connectionCallback_) {connectionCallback_(guardThis);}if (closeCallback_) {closeCallback_(guardThis);}
}void Connection::handleError() {// 错误处理
}void Connection::sendInLoop(const std::string& message) {loop_->assertInLoopThread();ssize_t nwrote = 0;if (!channel_.isWriting() && outputBuffer_.readableBytes() == 0) {nwrote = ::write(channel_.fd(), message.data(), message.size());if (nwrote >= 0) {if (static_cast<size_t>(nwrote) < message.size()) {// 剩余数据添加到输出缓冲区}} else {// 错误处理return;}}if (static_cast<size_t>(nwrote) < message.size()) {outputBuffer_.append(message.data() + nwrote, message.size() - nwrote);if (!channel_.isWriting()) {channel_.enableWriting();}}
}void Connection::shutdownInLoop() {loop_->assertInLoopThread();if (!channel_.isWriting()) {socket_.shutdownWrite();}
}

4. Server 主类

cpp

// Server.h
#ifndef SERVER_H
#define SERVER_H#include <memory>
#include <map>
#include <atomic>
#include "Acceptor.h"
#include "EventLoop.h"
#include "EventLoopThreadPool.h"class InetAddress;class Server {
public:using ThreadInitCallback = std::function<void(EventLoop*)>;Server(EventLoop* loop, const InetAddress& listenAddr, const std::string& name = "");~Server();void setThreadNum(int numThreads);void setThreadInitCallback(const ThreadInitCallback& cb) {threadInitCallback_ = cb;}void start();void setConnectionCallback(const Connection::ConnectionCallback& cb) {connectionCallback_ = cb;}void setMessageCallback(const Connection::MessageCallback& cb) {messageCallback_ = cb;}private:void newConnection(int sockfd, const InetAddress& peerAddr);void removeConnection(const std::shared_ptr<Connection>& conn);void removeConnectionInLoop(const std::shared_ptr<Connection>& conn);using ConnectionMap = std::map<int, std::shared_ptr<Connection>>;EventLoop* baseLoop_;const std::string name_;std::unique_ptr<Acceptor> acceptor_;std::unique_ptr<EventLoopThreadPool> threadPool_;ConnectionCallback connectionCallback_;MessageCallback messageCallback_;ThreadInitCallback threadInitCallback_;std::atomic<bool> started_;int nextConnId_;ConnectionMap connections_;
};#endif

cpp

// Server.cpp
#include "Server.h"
#include "InetAddress.h"
#include "Connection.h"
#include <functional>Server::Server(EventLoop* loop, const InetAddress& listenAddr, const std::string& name): baseLoop_(loop),name_(name),acceptor_(std::make_unique<Acceptor>(loop, listenAddr)),threadPool_(std::make_unique<EventLoopThreadPool>(loop)),started_(false),nextConnId_(1) {acceptor_->setNewConnectionCallback(std::bind(&Server::newConnection, this, std::placeholders::_1, std::placeholders::_2));
}Server::~Server() {baseLoop_->assertInLoopThread();// 清理所有连接
}void Server::setThreadNum(int numThreads) {threadPool_->setThreadNum(numThreads);
}void Server::start() {if (!started_.exchange(true)) {threadPool_->start(threadInitCallback_);baseLoop_->runInLoop(std::bind(&Acceptor::listen, acceptor_.get()));}
}void Server::newConnection(int sockfd, const InetAddress& peerAddr) {baseLoop_->assertInLoopThread();EventLoop* ioLoop = threadPool_->getNextLoop();std::string connName = name_ + "-" + std::to_string(nextConnId_++);auto conn = std::make_shared<Connection>(ioLoop, sockfd, acceptor_->getLocalAddress(), peerAddr);connections_[sockfd] = conn;conn->setConnectionCallback(connectionCallback_);conn->setMessageCallback(messageCallback_);conn->setCloseCallback(std::bind(&Server::removeConnection, this, std::placeholders::_1));ioLoop->runInLoop(std::bind(&Connection::connectEstablished, conn));
}void Server::removeConnection(const std::shared_ptr<Connection>& conn) {baseLoop_->runInLoop(std::bind(&Server::removeConnectionInLoop, this, conn));
}void Server::removeConnectionInLoop(const std::shared_ptr<Connection>& conn) {baseLoop_->assertInLoopThread();size_t n = connections_.erase(conn->socket().fd());if (n != 1) {// 错误处理}EventLoop* ioLoop = conn->getLoop();ioLoop->queueInLoop(std::bind(&Connection::connectDestroyed, conn));
}

关键特性说明

1. 线程安全的事件循环

  • 每个EventLoop绑定一个线程

  • 跨线程调用通过wakeup fd和pending functors实现

  • 确保所有IO操作在正确的线程中执行

2. 高效的缓冲区设计(Buffer 类的实现未在本文给出,以下为其设计目标)

  • 使用readv/scatter-gather IO提高读取效率

  • 自动扩容机制避免频繁内存分配

  • 零拷贝优化减少数据移动

3. 智能的资源管理

  • 使用shared_ptr管理Connection生命周期

  • RAII模式确保资源正确释放

  • 防止悬空指针和内存泄漏

4. 灵活的回调机制

  • 支持用户自定义各种事件回调

  • 类型安全的函数包装

  • 便于扩展和定制

使用示例

cpp

#include "Server.h"
#include "EventLoop.h"
#include "InetAddress.h"
#include <iostream>void onConnection(const std::shared_ptr<Connection>& conn) {if (conn->connected()) {std::cout << "New connection from " << conn->peerAddress().toIpPort() << std::endl;} else {std::cout << "Connection closed" << std::endl;}
}void onMessage(const std::shared_ptr<Connection>& conn, Buffer* buf) {std::string msg(buf->retrieveAllAsString());std::cout << "Received: " << msg << std::endl;conn->send("Echo: " + msg);
}int main() {EventLoop loop;InetAddress listenAddr(8888);Server server(&loop, listenAddr, "TestServer");server.setConnectionCallback(onConnection);server.setMessageCallback(onMessage);server.setThreadNum(4);server.start();loop.loop();return 0;
}

总结

本文实现了Server模块的核心组件,包括:

  • 线程安全的事件循环机制

  • 高效的连接接收和管理

  • 多线程支持

  • 完整的生命周期管理

在下一篇文章中,我们将继续完善这个服务器模块,添加定时器、日志记录、HTTP协议支持等高级功能。

http://www.dtcms.com/a/536870.html

相关文章:

  • 对IDC(数据中心)运维了解
  • Hyperopt 强大的分布式参数优化框架全解析
  • 网站都必须要备案吗建设一个视频网站首页
  • 前端页面连接后端fastapi实现模型本地部署和open ai接入
  • 中国空间站设计在轨飞行几年旅游网站建设ppt模板下载
  • HR4985微特步进电机驱动器:便捷与高效的完美融合
  • 广州外贸网站制作报名小程序怎么制作
  • 采用 Trie 树结合 RoaringBitmap 技术,构建高效的子串倒排索引
  • 网站建设分工明细表北京快三是官方的吗
  • JMeter:一个简单的测试计划怎么做?
  • VR仿真工业培训软件怎么用?燃气管道仿真实训案例解析
  • wordpress菜单分列顺义网站优化
  • 免费域名的网站九洲建设app
  • 效率工具(小黄鸟Reqable)批量提取小程序商城商品名称价格等信息
  • Shell脚本判断服务器SSH免密是否配置完成
  • MySQL查看服务器/客户端版本
  • express脚手架express-generator
  • 服务器受到网络攻击该怎么办
  • 跨平台渲染不再难_瑞云渲染跨平台转移+克隆双功能上线,效率升级
  • 网站后台添加新闻wordpress获取指定分类的描述
  • 免费制作永久网站邯郸中国建设银行网站
  • 中断服务程序(ISR)与主循环共享变量时,访问冲突(数据竞争)如何解决
  • 西部数码网站流量怎么充简易网站开发时长
  • FFmpeg 基本数据结构 AVFrame分析
  • Kafka 相关内容总结
  • 霍邱网站设计10000ip网站怎么做
  • C++ 从零实现Json-Rpc 框架
  • 29. Makefile 创建和使用变量
  • Docker 安装和配置 Redis 完整指南
  • 高效对象池设计:提升Unity性能的关键