基于Reactor模式的高性能C++仿Muduo库:Server服务器模块实现
本文将介绍如何实现一个基于Reactor模式的高性能C++服务器模块,这是仿照Muduo网络库设计的核心组件。
设计思路
我们的Server模块采用经典的Reactor模式,主要包含以下核心组件:
- EventLoop: 事件循环,负责事件分发
- Acceptor: 连接接收器,处理新连接
- Connection: 连接对象,管理单个TCP连接
- ThreadPool: 线程池,实现多线程处理(代码中对应 EventLoopThreadPool)
- Server: 服务器主类,整合所有组件
核心组件实现
1. EventLoop 事件循环
cpp
// EventLoop.h
#ifndef EVENTLOOP_H
#define EVENTLOOP_H

#include <atomic>
#include <cassert>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

#include "Epoll.h"
#include "Channel.h"

/// Event loop bound to the thread that constructs it.
///
/// loop() repeatedly polls the Epoll instance, dispatches the returned
/// channels, and then runs callbacks queued through runInLoop()/queueInLoop().
/// Other threads interact with the loop only through quit(), runInLoop() and
/// queueInLoop(); those wake the loop by writing to an eventfd.
class EventLoop {
public:
    EventLoop();
    ~EventLoop();

    // The loop owns a file descriptor and is tied to one thread: not copyable.
    EventLoop(const EventLoop&) = delete;
    EventLoop& operator=(const EventLoop&) = delete;

    /// Runs the loop until quit() is called. Must be called on the owning thread.
    void loop();

    /// Requests loop() to return; wakes the loop when called from another thread.
    void quit();

    /// Runs `cb` immediately when called on the owning thread, otherwise queues it.
    void runInLoop(std::function<void()> cb);

    /// Queues `cb` to run on the owning thread after the current dispatch round.
    void queueInLoop(std::function<void()> cb);

    /// Forwards a channel registration change to the poller (owning thread only).
    void updateChannel(Channel* channel);

    /// Removes a channel from the poller (owning thread only).
    void removeChannel(Channel* channel);

    /// True when the caller runs on the thread that constructed this loop.
    bool isInLoopThread() const { return threadId_ == std::this_thread::get_id(); }

    /// Debug-build check that the caller is on the owning thread.
    /// Acceptor, Connection and Server call this on their loops.
    void assertInLoopThread() const { assert(isInLoopThread()); }

private:
    void wakeup();             // writes to wakeupFd_ so a blocked poll returns
    void handleWakeup();       // drains wakeupFd_ after a wakeup
    void doPendingFunctors();  // runs the callbacks queued by queueInLoop()

    bool looping_;
    std::atomic<bool> quit_;   // written by quit() from any thread, read by loop()
    bool callingPendingFunctors_;
    const std::thread::id threadId_;
    std::unique_ptr<Epoll> epoller_;
    int wakeupFd_;
    std::unique_ptr<Channel> wakeupChannel_;
    std::vector<std::function<void()>> pendingFunctors_;  // guarded by mutex_
    std::mutex mutex_;
};

#endif
cpp
// EventLoop.cpp
#include "EventLoop.h"
#include <sys/eventfd.h>
#include <unistd.h>
#include <cassert>
#include <cerrno>
#include <cstdint>
#include <cstdio>
#include <system_error>
#include <thread>

// Builds the poller and the eventfd-backed wakeup channel.
// Throws std::system_error when the eventfd cannot be created, because the
// loop could never be woken from another thread without it.
EventLoop::EventLoop()
    : looping_(false),
      quit_(false),
      callingPendingFunctors_(false),
      threadId_(std::this_thread::get_id()),
      epoller_(std::make_unique<Epoll>()),
      wakeupFd_(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC)) {
    if (wakeupFd_ < 0) {
        throw std::system_error(errno, std::generic_category(),
                                "EventLoop: eventfd failed");
    }
    wakeupChannel_ = std::make_unique<Channel>(this, wakeupFd_);
    wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleWakeup, this));
    wakeupChannel_->enableReading();
}

// Unregisters the wakeup channel before closing its descriptor, mirroring the
// disableAll()/remove() pair Acceptor uses for its own channel.
EventLoop::~EventLoop() {
    wakeupChannel_->disableAll();
    wakeupChannel_->remove();
    ::close(wakeupFd_);
}

// Poll -> dispatch -> run queued callbacks, until quit() flips quit_.
void EventLoop::loop() {
    assert(!looping_);
    assert(isInLoopThread());
    looping_ = true;
    quit_ = false;
    while (!quit_) {
        auto activeChannels = epoller_->poll();
        for (auto& channel : activeChannels) {
            channel->handleEvent();
        }
        doPendingFunctors();
    }
    looping_ = false;
}

// Sets the quit flag; a foreign thread must also wake the loop, which may be
// blocked inside poll().
void EventLoop::quit() {
    quit_ = true;
    if (!isInLoopThread()) {
        wakeup();
    }
}

// Runs cb inline on the loop thread, otherwise hands it to queueInLoop().
void EventLoop::runInLoop(std::function<void()> cb) {
    if (isInLoopThread()) {
        cb();
    } else {
        queueInLoop(std::move(cb));
    }
}

// Appends cb to the pending list under the mutex.
void EventLoop::queueInLoop(std::function<void()> cb) {
    {
        std::lock_guard<std::mutex> lock(mutex_);
        pendingFunctors_.push_back(std::move(cb));
    }
    // Wake the loop when the caller is another thread, or when the loop thread
    // is currently draining the pending list (the new entry landed in the
    // fresh list and would otherwise wait for the next I/O event).
    if (!isInLoopThread() || callingPendingFunctors_) {
        wakeup();
    }
}

void EventLoop::updateChannel(Channel* channel) {
    assert(channel->ownerLoop() == this);
    assert(isInLoopThread());
    epoller_->updateChannel(channel);
}

void EventLoop::removeChannel(Channel* channel) {
    assert(channel->ownerLoop() == this);
    assert(isInLoopThread());
    epoller_->removeChannel(channel);
}

// Adds 1 to the eventfd counter so the wakeup channel becomes readable.
void EventLoop::wakeup() {
    uint64_t one = 1;
    const ssize_t n = ::write(wakeupFd_, &one, sizeof(one));
    // EAGAIN means the counter is saturated, i.e. a wakeup is already pending.
    if (n != static_cast<ssize_t>(sizeof(one)) && errno != EAGAIN) {
        std::perror("EventLoop::wakeup: write");
    }
}

// Resets the eventfd counter so the descriptor stops reporting readable.
void EventLoop::handleWakeup() {
    uint64_t one = 0;
    const ssize_t n = ::read(wakeupFd_, &one, sizeof(one));
    // EAGAIN means the counter was already zero; nothing to drain.
    if (n != static_cast<ssize_t>(sizeof(one)) && errno != EAGAIN) {
        std::perror("EventLoop::handleWakeup: read");
    }
}

// Swaps the pending list out under the lock and runs it unlocked, so callbacks
// may call queueInLoop() again without deadlocking.
void EventLoop::doPendingFunctors() {
    std::vector<std::function<void()>> functors;
    callingPendingFunctors_ = true;
    {
        std::lock_guard<std::mutex> lock(mutex_);
        functors.swap(pendingFunctors_);
    }
    for (const auto& functor : functors) {
        functor();
    }
    callingPendingFunctors_ = false;
}

2. Acceptor 连接接收器
cpp
// Acceptor.h
#ifndef ACCEPTOR_H
#define ACCEPTOR_H

#include <functional>
#include "Socket.h"
#include "Channel.h"
#include "InetAddress.h"

class EventLoop;

/// Owns the listening socket and hands every accepted descriptor to a
/// user-supplied callback.
class Acceptor {
public:
    using NewConnectionCallback = std::function<void(int sockfd, const InetAddress&)>;

    /// Creates a non-blocking socket, sets SO_REUSEADDR (and SO_REUSEPORT when
    /// `reusePort` is true) and binds it to `listenAddr`. Does not listen yet.
    Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reusePort = true);
    ~Acceptor();

    void setNewConnectionCallback(const NewConnectionCallback& cb) {
        newConnectionCallback_ = cb;
    }

    bool listening() const { return listening_; }

    /// Address passed to the constructor (the listen address). This is not a
    /// per-connection getsockname() result.
    const InetAddress& getLocalAddress() const { return localAddr_; }

    /// Starts listening and registers for read events (loop thread only).
    void listen();

private:
    void handleRead();  // accepts one pending connection

    EventLoop* loop_;
    Socket acceptSocket_;
    Channel acceptChannel_;
    NewConnectionCallback newConnectionCallback_;
    bool listening_;
    InetAddress localAddr_;
};

#endif
cpp
// Acceptor.cpp
#include "Acceptor.h"
#include "EventLoop.h"
#include "InetAddress.h"
#include <sys/socket.h>
#include <unistd.h>
#include <cerrno>
#include <cstdio>

Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reusePort)
    : loop_(loop),
      acceptSocket_(Socket::createNonblocking()),
      acceptChannel_(loop, acceptSocket_.fd()),
      listening_(false),
      localAddr_(listenAddr) {
    acceptSocket_.setReuseAddr(true);
    acceptSocket_.setReusePort(reusePort);
    acceptSocket_.bindAddress(listenAddr);
    acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
}

// Unregisters the listening channel from its loop.
Acceptor::~Acceptor() {
    acceptChannel_.disableAll();
    acceptChannel_.remove();
}

void Acceptor::listen() {
    loop_->assertInLoopThread();
    listening_ = true;
    acceptSocket_.listen();
    acceptChannel_.enableReading();
}

// Read-event handler of the listening socket: accepts one connection and
// passes it to the callback, or closes it when no callback is installed.
void Acceptor::handleRead() {
    loop_->assertInLoopThread();
    InetAddress peerAddr;
    const int connfd = acceptSocket_.accept(&peerAddr);
    if (connfd < 0) {
        // NOTE(review): assumes Socket::accept leaves errno from accept(2)
        // intact - confirm. EAGAIN/EINTR are expected on a non-blocking
        // listener and are not reported.
        if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
            std::perror("Acceptor::handleRead: accept");
        }
        return;
    }
    if (newConnectionCallback_) {
        newConnectionCallback_(connfd, peerAddr);
    } else {
        ::close(connfd);
    }
}

3. Connection 连接管理
cpp
// Connection.h
#ifndef CONNECTION_H
#define CONNECTION_H

#include <atomic>
#include <functional>
#include <memory>
#include <string>

#include "Socket.h"
#include "Channel.h"
#include "Buffer.h"
#include "InetAddress.h"  // InetAddress is stored by value, so the full type is required

class EventLoop;

/// One accepted TCP connection: its socket, its channel on an I/O loop, and
/// its input/output buffers. Instances must be owned by std::shared_ptr,
/// because several members call shared_from_this().
class Connection : public std::enable_shared_from_this<Connection> {
public:
    using ConnectionCallback = std::function<void(const std::shared_ptr<Connection>&)>;
    using MessageCallback = std::function<void(const std::shared_ptr<Connection>&, Buffer*)>;
    using CloseCallback = std::function<void(const std::shared_ptr<Connection>&)>;

    Connection(EventLoop* loop, int sockfd, const InetAddress& localAddr,
               const InetAddress& peerAddr);
    ~Connection();

    EventLoop* getLoop() const { return loop_; }
    const InetAddress& localAddress() const { return localAddr_; }
    const InetAddress& peerAddress() const { return peerAddr_; }
    bool connected() const { return state_ == kConnected; }

    /// Read-only access to the underlying socket; Server uses it as the key
    /// of its connection map.
    /// NOTE(review): callers invoke fd() on the result, which assumes
    /// Socket::fd() is const-qualified - confirm.
    const Socket& socket() const { return socket_; }

    /// Invoked when the connection is established and again when it goes down.
    void setConnectionCallback(const ConnectionCallback& cb) { connectionCallback_ = cb; }
    /// Invoked after new bytes were read into the input buffer.
    void setMessageCallback(const MessageCallback& cb) { messageCallback_ = cb; }
    /// Invoked from handleClose(); Server uses it to drop its reference.
    void setCloseCallback(const CloseCallback& cb) { closeCallback_ = cb; }

    /// Moves kConnecting -> kConnected and starts reading (loop thread only).
    void connectEstablished();
    /// Final teardown step: removes the channel from its loop (loop thread only).
    void connectDestroyed();

    /// Sends `message`; may be called from any thread.
    void send(const std::string& message);
    /// Shuts down the write side once pending output is flushed.
    void shutdown();

private:
    enum State { kDisconnected, kConnecting, kConnected, kDisconnecting };

    void setState(State s) { state_ = s; }
    void handleRead();
    void handleWrite();
    void handleClose();
    void handleError();
    void sendInLoop(const std::string& message);
    void shutdownInLoop();

    EventLoop* loop_;
    Socket socket_;
    Channel channel_;
    InetAddress localAddr_;
    InetAddress peerAddr_;
    ConnectionCallback connectionCallback_;
    MessageCallback messageCallback_;
    CloseCallback closeCallback_;
    // Read by send()/shutdown()/connected() from arbitrary threads and written
    // on the loop thread, hence atomic.
    std::atomic<State> state_;
    Buffer inputBuffer_;
    Buffer outputBuffer_;
};

#endif
cpp
// Connection.cpp
#include "Connection.h"
#include "EventLoop.h"
#include "InetAddress.h"
#include <sys/socket.h>
#include <unistd.h>
#include <cassert>
#include <cerrno>
#include <cstdio>
#include <cstring>
#include <string>

// Wires the four channel event handlers; the connection stays in kConnecting
// until connectEstablished() runs on the owning loop.
Connection::Connection(EventLoop* loop, int sockfd, const InetAddress& localAddr,
                       const InetAddress& peerAddr)
    : loop_(loop),
      socket_(sockfd),
      channel_(loop, sockfd),
      localAddr_(localAddr),
      peerAddr_(peerAddr),
      state_(kConnecting) {
    channel_.setReadCallback(std::bind(&Connection::handleRead, this));
    channel_.setWriteCallback(std::bind(&Connection::handleWrite, this));
    channel_.setCloseCallback(std::bind(&Connection::handleClose, this));
    channel_.setErrorCallback(std::bind(&Connection::handleError, this));
}

Connection::~Connection() {
    assert(state_ == kDisconnected);
}

void Connection::connectEstablished() {
    loop_->assertInLoopThread();
    assert(state_ == kConnecting);
    setState(kConnected);
    channel_.enableReading();
    if (connectionCallback_) {
        connectionCallback_(shared_from_this());
    }
}

// Last step of teardown. When handleClose() has not run yet (state is still
// kConnected) the "connection down" notification is delivered here.
void Connection::connectDestroyed() {
    loop_->assertInLoopThread();
    if (state_ == kConnected) {
        setState(kDisconnected);
        channel_.disableAll();
        if (connectionCallback_) {
            connectionCallback_(shared_from_this());
        }
    }
    channel_.remove();
}

// Thread-safe entry point. Off-loop callers copy the message into a queued
// task that also holds a shared_ptr, so the connection cannot be destroyed
// before the task runs.
void Connection::send(const std::string& message) {
    if (state_ != kConnected) {
        return;
    }
    if (loop_->isInLoopThread()) {
        sendInLoop(message);
    } else {
        auto self = shared_from_this();
        loop_->runInLoop([self, message] { self->sendInLoop(message); });
    }
}

void Connection::shutdown() {
    if (state_ == kConnected) {
        setState(kDisconnecting);
        auto self = shared_from_this();  // keep the connection alive for the queued task
        loop_->runInLoop([self] { self->shutdownInLoop(); });
    }
}

// Read-event handler: >0 bytes -> message callback, 0 -> peer closed,
// <0 -> error path.
void Connection::handleRead() {
    loop_->assertInLoopThread();
    int savedErrno = 0;
    const ssize_t n = inputBuffer_.readFd(channel_.fd(), &savedErrno);
    if (n > 0) {
        if (messageCallback_) {
            messageCallback_(shared_from_this(), &inputBuffer_);
        }
    } else if (n == 0) {
        handleClose();
    } else {
        // NOTE(review): assumes Buffer::readFd stores errno in savedErrno - confirm.
        if (savedErrno != EAGAIN && savedErrno != EWOULDBLOCK && savedErrno != EINTR) {
            std::fprintf(stderr, "Connection::handleRead: %s\n", std::strerror(savedErrno));
            handleError();
        }
    }
}

// Write-event handler: flushes buffered output; once the buffer is empty it
// stops watching for writability and completes a pending shutdown().
void Connection::handleWrite() {
    loop_->assertInLoopThread();
    if (!channel_.isWriting()) {
        return;
    }
    const ssize_t n = ::write(channel_.fd(), outputBuffer_.peek(), outputBuffer_.readableBytes());
    if (n > 0) {
        outputBuffer_.retrieve(n);
        if (outputBuffer_.readableBytes() == 0) {
            channel_.disableWriting();
            if (state_ == kDisconnecting) {
                shutdownInLoop();
            }
        }
    } else if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
        std::perror("Connection::handleWrite: write");
    }
}

void Connection::handleClose() {
    loop_->assertInLoopThread();
    setState(kDisconnected);
    channel_.disableAll();
    // Hold a reference for the duration of the callbacks: closeCallback_ may
    // drop the owner's reference to this connection.
    std::shared_ptr<Connection> guardThis(shared_from_this());
    if (connectionCallback_) {
        connectionCallback_(guardThis);
    }
    if (closeCallback_) {
        closeCallback_(guardThis);
    }
}

// Reports the pending socket error (SO_ERROR) on stderr.
void Connection::handleError() {
    int err = 0;
    socklen_t len = static_cast<socklen_t>(sizeof(err));
    if (::getsockopt(channel_.fd(), SOL_SOCKET, SO_ERROR, &err, &len) < 0) {
        err = errno;
    }
    std::fprintf(stderr, "Connection::handleError: SO_ERROR = %d (%s)\n", err,
                 std::strerror(err));
}

// Tries a direct write first; whatever could not be written is appended to
// the output buffer and flushed later by handleWrite().
void Connection::sendInLoop(const std::string& message) {
    loop_->assertInLoopThread();
    if (state_ == kDisconnected) {
        return;  // a queued send may arrive after the connection went down
    }
    size_t written = 0;
    if (!channel_.isWriting() && outputBuffer_.readableBytes() == 0) {
        const ssize_t n = ::write(channel_.fd(), message.data(), message.size());
        if (n >= 0) {
            written = static_cast<size_t>(n);
        } else if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
            // Hard failure (e.g. EPIPE/ECONNRESET): nothing can be sent.
            std::perror("Connection::sendInLoop: write");
            return;
        }
        // EAGAIN/EWOULDBLOCK/EINTR: the socket is just not writable right now;
        // fall through with written == 0 so the whole message gets buffered
        // instead of being dropped.
    }
    if (written < message.size()) {
        outputBuffer_.append(message.data() + written, message.size() - written);
        if (!channel_.isWriting()) {
            channel_.enableWriting();
        }
    }
}

// Closes the write side only when no output is pending; otherwise
// handleWrite() calls this again after the buffer drains.
void Connection::shutdownInLoop() {
    loop_->assertInLoopThread();
    if (!channel_.isWriting()) {
        socket_.shutdownWrite();
    }
}

4. Server 主类
cpp
// Server.h
#ifndef SERVER_H
#define SERVER_H

#include <atomic>
#include <functional>
#include <map>
#include <memory>
#include <string>

#include "Acceptor.h"
#include "Connection.h"
#include "EventLoop.h"
#include "EventLoopThreadPool.h"

class InetAddress;

/// TCP server facade: accepts connections on the base loop and assigns each
/// one to a loop obtained from the EventLoopThreadPool.
class Server {
public:
    using ThreadInitCallback = std::function<void(EventLoop*)>;
    // Local aliases so the member declarations below resolve; the public
    // setters keep their original Connection::-qualified parameter types.
    using ConnectionCallback = Connection::ConnectionCallback;
    using MessageCallback = Connection::MessageCallback;

    Server(EventLoop* loop, const InetAddress& listenAddr, const std::string& name = "");
    ~Server();

    /// Number of I/O threads handed to the pool; call before start().
    void setThreadNum(int numThreads);

    void setThreadInitCallback(const ThreadInitCallback& cb) {
        threadInitCallback_ = cb;
    }

    /// Starts the thread pool and begins listening; repeated calls are no-ops.
    void start();

    void setConnectionCallback(const Connection::ConnectionCallback& cb) {
        connectionCallback_ = cb;
    }

    void setMessageCallback(const Connection::MessageCallback& cb) {
        messageCallback_ = cb;
    }

private:
    void newConnection(int sockfd, const InetAddress& peerAddr);
    void removeConnection(const std::shared_ptr<Connection>& conn);
    void removeConnectionInLoop(const std::shared_ptr<Connection>& conn);

    // Keyed by the connection's socket descriptor.
    using ConnectionMap = std::map<int, std::shared_ptr<Connection>>;

    EventLoop* baseLoop_;
    const std::string name_;
    std::unique_ptr<Acceptor> acceptor_;
    std::unique_ptr<EventLoopThreadPool> threadPool_;
    ConnectionCallback connectionCallback_;
    MessageCallback messageCallback_;
    ThreadInitCallback threadInitCallback_;
    std::atomic<bool> started_;
    int nextConnId_;
    ConnectionMap connections_;  // accessed on the base loop thread only
};

#endif
cpp
// Server.cpp
#include "Server.h"
#include "InetAddress.h"
#include "Connection.h"
#include <cstdio>
#include <functional>
#include <memory>
#include <string>

Server::Server(EventLoop* loop, const InetAddress& listenAddr, const std::string& name)
    : baseLoop_(loop),
      name_(name),
      acceptor_(std::make_unique<Acceptor>(loop, listenAddr)),
      threadPool_(std::make_unique<EventLoopThreadPool>(loop)),
      started_(false),
      nextConnId_(1) {
    acceptor_->setNewConnectionCallback(
        std::bind(&Server::newConnection, this, std::placeholders::_1, std::placeholders::_2));
}

// Releases every remaining connection. Each one is torn down on its own I/O
// loop via connectDestroyed(), which moves it to kDisconnected so that
// ~Connection's state assertion holds.
Server::~Server() {
    baseLoop_->assertInLoopThread();
    for (auto& entry : connections_) {
        std::shared_ptr<Connection> conn(entry.second);
        entry.second.reset();  // the queued task now holds the last server-side reference
        conn->getLoop()->runInLoop([conn] { conn->connectDestroyed(); });
    }
}

void Server::setThreadNum(int numThreads) {
    threadPool_->setThreadNum(numThreads);
}

// The atomic exchange guarantees the body runs once even when start() is
// called repeatedly.
void Server::start() {
    if (!started_.exchange(true)) {
        threadPool_->start(threadInitCallback_);
        baseLoop_->runInLoop(std::bind(&Acceptor::listen, acceptor_.get()));
    }
}

// Acceptor callback (base loop): wraps the accepted descriptor in a
// Connection, records it, and finishes setup on the chosen I/O loop.
void Server::newConnection(int sockfd, const InetAddress& peerAddr) {
    baseLoop_->assertInLoopThread();
    EventLoop* ioLoop = threadPool_->getNextLoop();
    // Connection takes no name, so the former "<name>-<id>" string was never
    // used; only the id counter is kept.
    ++nextConnId_;
    // NOTE(review): relies on Acceptor exposing getLocalAddress() - confirm it is declared.
    auto conn = std::make_shared<Connection>(ioLoop, sockfd, acceptor_->getLocalAddress(), peerAddr);
    connections_[sockfd] = conn;
    conn->setConnectionCallback(connectionCallback_);
    conn->setMessageCallback(messageCallback_);
    conn->setCloseCallback(std::bind(&Server::removeConnection, this, std::placeholders::_1));
    ioLoop->runInLoop(std::bind(&Connection::connectEstablished, conn));
}

// Close callback; runs on the connection's I/O loop, so the map update is
// forwarded to the base loop.
void Server::removeConnection(const std::shared_ptr<Connection>& conn) {
    baseLoop_->runInLoop(std::bind(&Server::removeConnectionInLoop, this, conn));
}

void Server::removeConnectionInLoop(const std::shared_ptr<Connection>& conn) {
    baseLoop_->assertInLoopThread();
    // NOTE(review): relies on Connection exposing socket() - confirm it is declared.
    const int fd = conn->socket().fd();
    const size_t erased = connections_.erase(fd);
    if (erased != 1) {
        std::fprintf(stderr, "Server::removeConnectionInLoop: fd %d was not registered\n", fd);
    }
    // queueInLoop (not runInLoop) so the teardown runs after the I/O loop's
    // current dispatch round; the bound shared_ptr keeps conn alive until then.
    EventLoop* ioLoop = conn->getLoop();
    ioLoop->queueInLoop(std::bind(&Connection::connectDestroyed, conn));
}

关键特性说明
1. 线程安全的事件循环
每个EventLoop绑定一个线程
跨线程调用通过wakeup fd和pending functors实现
确保所有IO操作在正确的线程中执行
2. 高效的缓冲区设计
使用readv/scatter-gather IO提高读取效率
自动扩容机制避免频繁内存分配
零拷贝优化减少数据移动
3. 智能的资源管理
使用shared_ptr管理Connection生命周期
RAII模式确保资源正确释放
防止悬空指针和内存泄漏
4. 灵活的回调机制
支持用户自定义各种事件回调
类型安全的函数包装
便于扩展和定制
使用示例
cpp
#include "Server.h"
#include "Connection.h"
#include "Buffer.h"
#include "EventLoop.h"
#include "InetAddress.h"
#include <iostream>
#include <memory>
#include <string>

// Connection callback: invoked when a connection comes up and when it goes down.
void onConnection(const std::shared_ptr<Connection>& conn) {
    if (conn->connected()) {
        std::cout << "New connection from " << conn->peerAddress().toIpPort() << std::endl;
    } else {
        std::cout << "Connection closed" << std::endl;
    }
}

// Message callback: takes everything out of the input buffer and echoes it
// back with an "Echo: " prefix.
void onMessage(const std::shared_ptr<Connection>& conn, Buffer* buf) {
    std::string msg(buf->retrieveAllAsString());
    std::cout << "Received: " << msg << std::endl;
    conn->send("Echo: " + msg);
}

// Echo server on port 8888 with four I/O threads; loop.loop() runs until
// quit() is called on the loop.
int main() {
    EventLoop loop;
    InetAddress listenAddr(8888);
    Server server(&loop, listenAddr, "TestServer");
    server.setConnectionCallback(onConnection);
    server.setMessageCallback(onMessage);
    server.setThreadNum(4);
    server.start();
    loop.loop();
    return 0;
}

总结
本文实现了Server模块的核心组件,包括:
线程安全的事件循环机制
高效的连接接收和管理
多线程支持
完整的生命周期管理
在下一篇文章中,我们将继续完善这个服务器模块,添加定时器、日志记录、HTTP协议支持等高级功能。
