当前位置: 首页 > news >正文

muduo源码解析

 1.对类进行禁止拷贝

class noncopyable
{public:noncopyable(const noncopyable&) = delete;void operator=(const noncopyable&) = delete;protected:noncopyable() = default;~noncopyable() = default;
};

2.日志

使用枚举定义日志等级

enum LogLevel{TRACE,DEBUG,INFO,WARN,ERROR,FATAL,NUM_LOG_LEVELS,};

1.获取日志实例对象

Logger& instance();

2.设置日志级别

void setLogLevel(int level);

3.写日志

void log(std::string& msg);
#define LOG_info(fmt, ...) \do { \fprintf(stderr, "[INFO] %s:%d: " fmt "\n", \__FILE__, __LINE__, ##__VA_ARGS__); \} while(0)

1.C/C++ 的预处理器规定,一个完整的宏定义必须位于同一逻辑行。但为了代码可读性,需要用 \ 显式声明换行。

2.为什么用 do { ... } while(0),避免宏展开后的语法问题

3. LOG_info(fmt, ...)

  • fmt:格式化字符串参数(如 "Value: %d"

  • ...:可变参数(变长参数列表),表示可以传递任意数量的额外参数。

  • ##__VA_ARGS__:GNU 扩展语法,处理可变参数

#ifndef LOG_H
#define LOG_H#include <cstdio>
#include <ctime>
#include <cstdarg>// 定义日志级别
enum LogLevel
{TRACE = 0,DEBUG,INFO,WARN,ERROR,FATAL,NUM_LOG_LEVELS
};// 设置默认日志级别
extern LogLevel g_logLevel;  // 在别的地方定义这个全局变量// 宏定义日志级别
#define LOG_TRACE(fmt, ...) \do { \if (g_logLevel <= TRACE) log_message("TRACE", __FILE__, __LINE__, fmt, ##__VA_ARGS__); \} while (0)#define LOG_DEBUG(fmt, ...) \do { \if (g_logLevel <= DEBUG) log_message("DEBUG", __FILE__, __LINE__, fmt, ##__VA_ARGS__); \} while (0)#define LOG_INFO(fmt, ...) \do { \if (g_logLevel <= INFO) log_message("INFO", __FILE__, __LINE__, fmt, ##__VA_ARGS__); \} while (0)#define LOG_WARN(fmt, ...) \do { \if (g_logLevel <= WARN) log_message("WARN", __FILE__, __LINE__, fmt, ##__VA_ARGS__); \} while (0)#define LOG_ERROR(fmt, ...) \do { \if (g_logLevel <= ERROR) log_message("ERROR", __FILE__, __LINE__, fmt, ##__VA_ARGS__); \} while (0)#define LOG_FATAL(fmt, ...) \do { \if (g_logLevel <= FATAL) log_message("FATAL", __FILE__, __LINE__, fmt, ##__VA_ARGS__); \} while (0)inline void log_message(const char* level, const char* file, int line, const char* fmt, ...)
{// 获取当前时间char timeBuf[20];std::time_t now = std::time(nullptr);std::strftime(timeBuf, sizeof(timeBuf), "%Y-%m-%d %H:%M:%S", std::localtime(&now));// 打印日志fprintf(stderr, "[%s] [%s] %s:%d: ", timeBuf, level, file, line);va_list args;va_start(args, fmt);vfprintf(stderr, fmt, args);va_end(args);fprintf(stderr, "\n");
}#endif // LOG_H

 3.TimeStamp

#include <chrono>
#include <string>
#include <ctime>
#include <iomanip>
#include <sstream>class Timestamp {
public:// 构造函数Timestamp() : tp_(std::chrono::system_clock::now()) {}// 从时间点构造explicit Timestamp(std::chrono::system_clock::time_point tp) : tp_(tp) {}// 获取当前时间戳 (静态工厂方法)static Timestamp now() {return Timestamp();}// 转换为字符串 (默认格式)std::string to_string() const {return format("%Y-%m-%d %H:%M:%S");}   private:std::chrono::system_clock::time_point tp_;
};

4.InetAddress

# include<netinet/in.h> // 定义了Internet地址族// 封装socket地址
class InetAdress{
public:explicit InetAddress(std::string& ip, uint16_t port, bool ipv6 = false);explicit InetAddress(const sockaddr_in &addr): addr_(addr);std::string toIp() const;std::string toIpPort() const;uint16_t port() const;
private:socketaddr_in addr_;
}

struct sockaddr_in {short            sin_family;   // 地址族unsigned short   sin_port;     // 端口号struct in_addr   sin_addr;     // IP 地址char             sin_zero[8];  // 填充字节,用于对齐
};
InetAddress::InetAddress(StringArg ip, uint16_t port, bool ipv6)
{if (ipv6 || strchr(ip.c_str(), ':')){// ...}else{memset(&addr_, 0, sizeof addr_);addr_->sin_family = AF_INET;addr_->sin_port = htons(port);struct in_addr addr;if (inet_pton(AF_INET, ip, &addr) <= 0) {printf("Invalid IPv4 address\n");} else {printf("IPv4 address converted successfully\n");}addr_->sin_addr = addr;}
}string InetAddress::toIpPort() const
{char buf[64] = "";inet_ntop(AF_INET, &addr_.sin_addr, buf, sizeof buf);size_t end  = strlen(buf);uint16_t port = ntohs(addr_.sin_port);sprintf(buf + end, ":%u", port);return buf;
}

5.EventLoop

1.获取和管理线程的标识(TID)、线程名称、线程堆栈信息

namespace muduo
{
namespace CurrentThread
{// internalextern __thread int t_cachedTid;extern __thread char t_tidString[32];extern __thread int t_tidStringLength;extern __thread const char* t_threadName;void cacheTid();inline int tid(){if (__builtin_expect(t_cachedTid == 0, 0)){cacheTid();}return t_cachedTid;}inline const char* tidString() // for logging{return t_tidString;}inline int tidStringLength() // for logging{return t_tidStringLength;}inline const char* name(){return t_threadName;}bool isMainThread();void sleepUsec(int64_t usec);  // for testingstring stackTrace(bool demangle);
}  // namespace CurrentThread
}  // namespace muduo
  • extern 关键字用于声明变量或函数的存储位置 
  • __thread 是 GCC/Clang 提供的关键字,用于声明线程局部存储变量(Thread-Local Storage, TLS)。每个线程都有自己独立的变量副本。 
  • __builtin_expect 是 GCC/Clang 提供的内置函数,用于向编译器提供分支预测的提示 
static_cast<pid_t>(::syscall(SYS_gettid));

 2.EventLoop:

1. EventLoop 干什么?(职责)

EventLoop事件循环的核心类,负责管理和调度 I/O 事件和异步任务。
主要职责:

  1. 事件循环管理: 不断循环等待和处理事件。

  2. 事件分发: 将活跃事件交给合适的回调处理。

  3. 任务调度: 支持在事件循环线程内安全地执行异步任务。

  4. 线程唤醒: 当其他线程提交任务时能够唤醒阻塞的事件循环。

class EvenLoop : noncopyable{
public:using Functor = std::function<viod()>;EvenLoop();~EvenLoop();void loop();void quit();//状态查询Timestamp pollReturnTime() const { return pollReturnTime_; }//回调管理void runInLoop(Functor cb);// 在事件循环线程中立即执行回调 cbvoid queueInLoop(Functor cb);//将回调 cb 排队,稍后在事件循环线程中执行size_t queueSize() const;//通道管理void wakeup();void updateChannel(Channel* channel);void removeChannel(Channel* channel);bool hasChannel(Channel* channel);//线程检查bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }private:using ChannelList = std::vector<Channel*>;bool looping_;/* atomic *///是否正在进行事件循环std::atomic_bool quit_;//是否退出时间循环bool eventHandling_; /* atomic */ //表示是否正在处理事件bool callingPendingFunctors_; /* atomic *///表示是否正在执行待处理回调const pid_t threadId_;//当前循环tid// PollerTimestamp pollReturnTime_;//Poller返回时间std::unique_ptr<Poller> poller_;//Eventloop管理的PollerChannelList activeChannels_;// 存储当前Poller 返回的活动通道列表// Channelint wakeupFd_;//eventfd,用于跨线程唤醒事件循环std::unique_ptr<Channel> wakeupChannel_;//封装 wakeupFd_,绑定到 Poller(如 epoll),监控读事件mutable MutexLock mutex_;std::vector<Functor> pendingFunctors_ GUARDED_BY(mutex_);// 存储其他线程提交的任务void doPendingFunctors();
}

1.在 muduo 网络库中,EventLoop 类的构造函数初始化了一系列 bool 类型的标志变量(如 looping_quit_eventHandling_callingPendingFunctors_),这些变量用于 ​​精确控制事件循环的状态和线程安全​

looping_ 的限制​:​禁止重复启动事件循环

 eventHandling_的限制:不能在处理事件时析构channel

callingPendingFunctors_ :确保跨线程任务提交或事件循环忙时能够及时唤醒 EventLoop

if (!isInLoopThread() || callingPendingFunctors_){wakeup();}

 在 Linux 系统中,获取当前线程的唯一 ID(TID)可以通过 syscall(SYS_gettid) 

//防止一个线程创建多个EventLoop
__thread EventLoop* t_loopInThisThread = 0;

2.在 Linux 系统中,::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC) 用于创建一个 ​​事件文件描述符(eventfd)​​,通常用于线程间或进程间的异步事件通知(例如唤醒事件循环) 

3. std::atomic_bool 是 C++ 标准库(<atomic> 头文件)提供的一种原子类型,专门用于布尔值的原子操作。它是 std::atomic<bool> 的特化版本,允许在多线程环境中安全地读写布尔值,而无需显式使用锁(如 std::mutex)。std::atomic_bool 确保操作的原子性,避免数据竞争(data race),适用于需要高效、无锁同步的场景。 

4.assertInLoopThread();当前线程是否是loop所在线程

  • EventLoop::loop()

  • EventLoop::updateChannel(Channel* channel)

  • EventLoop::removeChannel(Channel* channel)

  • bool EventLoop::hasChannel(Channel* channel)

  • void TcpConnection::sendInLoop(const void* data, size_t len)

  • void TcpConnection::shutdownInLoop()

  • void TcpConnection::startReadInLoop() void TcpConnection::stopReadInLoop()

  • void TcpConnection::connectEstablished() void TcpConnection::connectDestroyed()

  • void TcpConnection::handleRead(Timestamp receiveTime)

  • void TcpConnection::handleWrite()

EventLoop::updateChannel(Channel* channel)|
poller_->updateChannel(channel);

对channel进行操作确保: 当前线程是loop所在线程,当前线程是channel所在线程

void EventLoop::loop()
{looping_ = true;quit_ = false;while (!quit_){activeChannels_.clear();// 等待事件发生poller_->poll(kPollTimeMs, &activeChannels_);// 遍历所有活跃通道,逐个调用其事件处理函数for (Channel* channel : activeChannels_){channel->handleEvent();}// 执行异步任务doPendingFunctors();}looping_ = false;
}

void EventLoop::doPendingFunctors()
{std::vector<Functor> functors;           // 定义一个局部任务列表callingPendingFunctors_ = true;          // 标记正在执行任务{MutexLockGuard lock(mutex_);         // 上锁,保护任务队列functors.swap(pendingFunctors_);     // 将任务队列中的任务移到本地变量}// 逐个执行任务for (const Functor& functor : functors){functor();                          // 执行任务回调}callingPendingFunctors_ = false;         // 执行完毕,重置标记
}

 用 swap()任务队列局部变量互换,而不是逐个取出任务:降低锁竞争,只加锁一次,把所有任务交换到局部变量中,后续执行不需要加锁。

2. 为什么需要任务队列?
特性handleEvent()doPendingFunctors()
作用处理I/O事件处理异步任务
触发时机poll()返回后,有活跃事件时触发每轮事件循环末尾,I/O事件处理完毕后
优先级高,实时性要求高低,非实时任务
来源网络 I/O、定时器、信号等事件其他线程提交的任务、延迟任务
典型场景读写网络数据、关闭连接线程间任务提交、定时任务、异步回调
线程安全性需要确保 Channel 所在线程安全通过加锁保证线程安全
执行频率每个活跃事件都调用每次事件循环结束后调用一次

在高性能服务器中,muduo 采用单线程 Reactor 模式,一个 EventLoop 对象只能在一个线程中运行。
但是,多个线程可能需要向同一个事件循环线程提交任务,比如:

  1. 工作线程向主线程提交任务。

  2. 异步回调需要在事件循环线程中执行。

  3. 事件处理函数中异步添加任务。

 当需要将任务提交到事件循环时,调用 queueInLoop()

​
void EventLoop::runInLoop(Functor cb)
{if (isInLoopThread()){cb();}else{queueInLoop(std::move(cb));}
}void EventLoop::queueInLoop(Functor cb)
{{MutexLockGuard lock(mutex_);pendingFunctors_.push_back(std::move(cb));}if (!isInLoopThread() || callingPendingFunctors_){wakeup();}
}​

 callingPendingFunctors_ :

  • 如果当前正在执行回调队列中的回调函数(callingPendingFunctors_true),并且有新的回调函数被加入到队列中,需要通过 wakeup() 方法唤醒事件循环线程,以确保新的回调函数能够被处理。

  • 如果不检查 callingPendingFunctors_,可能会导致事件循环线程被重复唤醒,从而增加不必要的开销

int createEventfd()
{int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);if (evtfd < 0){LOG_SYSERR << "Failed in eventfd";abort();}return evtfd;
}

 eventfd 文件描述符(FD),这是一个 Linux 特定的机制,用于线程间通信或事件通知。eventfd 常用于高效的无锁同步。

EventLoop::EventLoop(): looping_(false),quit_(false),eventHandling_(false),callingPendingFunctors_(false),iteration_(0),threadId_(CurrentThread::tid()),poller_(Poller::newDefaultPoller(this)),timerQueue_(new TimerQueue(this)),wakeupFd_(createEventfd()),wakeupChannel_(new Channel(this, wakeupFd_)),currentActiveChannel_(NULL)
{LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_;if (t_loopInThisThread){LOG_FATAL << "Another EventLoop " << t_loopInThisThread<< " exists in this thread " << threadId_;}else{t_loopInThisThread = this;}wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));// we are always reading the wakeupfdwakeupChannel_->enableReading();
}
void EventLoop::wakeup()
{uint64_t one = 1;ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);if (n != sizeof one){LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";}
}

事件循环线程持有 wakeupFd_,并监听其读端。当其他线程调用 wakeup() 方法时,事件循环线程会从阻塞状态中唤醒。

  • 交换后,pendingFunctors_ 变为空向量,新任务可继续写入。

  • 局部变量 functors 在栈上析构时自动清理已执行的任务

MutexLockGuard 是一个 RAII(Resource Acquisition Is Initialization,资源获取即初始化)风格的锁管理器。它的作用是确保在作用域结束时自动释放锁,从而避免忘记手动释放锁导致的死锁问题。 

RAII 的核心思想是将资源的获取(如分配内存、打开文件、锁定互斥锁等)与对象的构造绑定在一起,将资源的释放与对象的析构绑定在一起,确保资源在对象生命周期结束时自动释放,从而避免资源泄漏和其他资源管理错误。

3.Thread

封装了对 POSIX 线程(pthread)的创建、管理和操作

class Thread : nocopyable{
public:using ThreadFunc = std::function<void()>;explicit Thread(ThreadFunc, const std::string &name = string());~Thread();void start();void join();bool started() const { return started_; }pthread_t pthreadId() const { return pthreadId_; }pid_t tid() const { return tid_; }const string& name() const { return name_; }
pruvate:bool       started_;bool       joined_;std::shared_ptr<std::thread> thread_;pid_t      tid_;ThreadFunc func_;string     name_;static std::atomic_int32 numCreated;
}
void Thread::start() {assert(!started_);started_ = true;if (pthread_create(&pthreadId_, nullptr, &startThread, this) != 0) {started_ = false;throw std::runtime_error("Failed to create thread");}latch_.wait();  // 等待线程真正启动
}void* startThread(void* obj)
{ThreadData* data = static_cast<ThreadData*>(obj);data->runInThread();delete data;return NULL;
}
int Thread::join() {assert(started_);assert(!joined_);joined_ = true;return pthread_join(pthreadId_, nullptr);
}

 4.EventLoopThread:它将事件循环和线程进行绑定,用于在一个独立线程中运行 EventLoop,从而避免在主线程中阻塞 I/O 事件处理

class EventLoopThread : noncopable(){
public:using ThreadInitCallback = std::functional<void(EventLoop*)>;EventLoopThread(const ThreadInitCallback& cb = ThreadInitCallback(),const string& name = string());~EventLoopThread();EventLoop* startLoop();private:EventLoop* loop_;               // 事件循环指针bool exiting_;                  // 标志退出Thread thread_;                 // 封装的线程对象std::mutex mutex_;               // 互斥锁,保证线程安全Condition cond_;                // 条件变量,同步线程启}
EventLoop* EventLoopThread::startLoop() {assert(!thread_.started());thread_.start();  // 启动线程EventLoop* loop = nullptr;{MutexLockGuard lock(mutex_);while (loop_ == nullptr) {  // 等待子线程初始化cond_.wait();}loop = loop_;}return loop;
}

 为什么要在循环中检查?

(1)条件变量的常见问题:

  1. 虚假唤醒(Spurious Wakeup):

    • 条件变量在没有通知的情况下,wait() 可能意外返回

    • 这通常由操作系统调度中断信号引起。

    • 如果仅使用 cond_.wait();,即使条件未满足,程序也可能继续执行,导致逻辑错误

  2. 多线程竞争:

    • 可能存在多个线程等待同一条件,一旦被唤醒,不一定是期望的线程

    • 没有循环检查,程序容易进入非预期状态

void EventLoopThread::threadFunc() {EventLoop loop;  // 创建事件循环对象{MutexLockGuard lock(mutex_);loop_ = &loop;       // 将loop_指向刚创建的EventLoop对象cond_.notify();      // 通知主线程:loop_已初始化}loop.loop();  // 启动事件循环
}

为什么要使用条件变量?

  • EventLoop 对象是在子线程中创建的,但主线程需要获取它的指针。

  • 如果没有同步机制,主线程有可能在子线程创建之前就访问 loop_,导致空指针

  • 通过互斥锁和条件变量,让主线程在 loop_ 初始化之前阻塞等待,直到子线程创建完成并通知。

 5.EventLoopThreadPool:单个线程的事件循环(EventLoop无法应对大量并发请求。
为了提升性能,通常使用
多线程事件循环
,即线程池

  • 主线程(I/O 线程): 负责监听新连接。

  • 工作线程(计算/事件线程): 负责数据读写和计算。

    class EventLoopThreadPool : noncopyable
    {public:typedef std::function<void(EventLoop*)> ThreadInitCallback;EventLoopThreadPool(EventLoop* baseLoop, const string& nameArg);~EventLoopThreadPool();void setThreadNum(int numThreads) { numThreads_ = numThreads; }void start(const ThreadInitCallback& cb = ThreadInitCallback());/// round-robinEventLoop* getNextLoop();/// with the same hash code, it will always return the same EventLoopEventLoop* getLoopForHash(size_t hashCode);std::vector<EventLoop*> getAllLoops();bool started() const{ return started_; }const string& name() const{ return name_; }private:EventLoop* baseLoop_;                           // 主线程中的EventLoopbool started_;                                  // 是否启动int numThreads_;                                // 线程数量int next_;                                      // 下一个分配的EventLoop下标std::vector<std::unique_ptr<EventLoopThread>> threads_;  // 线程列表std::vector<EventLoop*> loops_;                  // 事件循环列表};
    //启动线程池
    void EventLoopThreadPool::start() {assert(!started_);baseLoop_->assertInLoopThread();started_ = true;for (int i = 0; i < numThreads_; ++i) {auto thread = std::make_unique<EventLoopThread>();loops_.push_back(thread->startLoop());  // 启动并获取事件循环threads_.push_back(std::move(thread));  // 存储线程对象}// 如果没有设置线程,主线程直接作为唯一的EventLoopif (numThreads_ == 0) {loops_.push_back(baseLoop_);}
    }
    
    //获取下一个事件循环
    EventLoop* EventLoopThreadPool::getNextLoop() {baseLoop_->assertInLoopThread();EventLoop* loop = baseLoop_;if (!loops_.empty()) {loop = loops_[next_];next_ = (next_ + 1) % loops_.size();  // 轮询选择下一个Loop}return loop;
    }
    

6.channel  

作为 Reactor 模式中的事件处理器,封装了文件描述符的事件管理和回调处理

1.关键的成员变量

变量类型作用
fd_const int管理的文件描述符(socket/eventfd/timerfd 等),Channel 只管理但不拥有它
events_int关注的事件类型EPOLLIN/EPOLLOUT 等),通过 enableReading()/enableWriting() 设置
revents_int实际触发的事件类型(由 Poller/Epoll 返回)
loop_EventLoop*所属的事件循环,所有操作必须在对应 EventLoop 线程中执行
readCallback_ReadEventCallback读事件回调(带时间戳参数,如收到数据时调用)
writeCallback_EventCallback写事件回调(如可写时调用)
closeCallback_EventCallback连接关闭回调
errorCallback_EventCallback错误处理回调
tie_std::weak_ptr<void>生命周期绑定(防止 Channel 在处理事件期间被意外销毁)

2.关键的成员函数

1. 事件管理
函数作用
enableReading()注册读事件(`events_= kReadEvent`)
disableReading()取消读事件
enableWriting()注册写事件(`events_= kWriteEvent`)
disableWriting()取消写事件
disableAll()取消所有事件
update()通知 EventLoop 更新关注的事件(内部调用 EventLoop::updateChannel()
2. 事件处理核心
函数作用
handleEvent(Timestamp)处理事件的入口(根据 revents_ 调用对应的回调)
handleEventWithGuard(Timestamp)实际处理事件(加生命周期保护)
setReadCallback()/setWriteCallback()设置读/写/关闭/错误的回调函数
3. 生命周期控制
函数作用
tie(const std::shared_ptr<void>&)绑定共享指针,防止回调执行期间对象被销毁
remove()从 EventLoop 中移除 Channel

3.核心设计思想

  1. 事件与回调分离

    • events_ 和 revents_ 分离,由 Poller 监听 events_,返回 revents_

    • 通过 setXXXCallback() 设置业务逻辑,Channel 只负责事件分发。

  2. 线程安全

    • 所有操作必须通过 EventLoop 在 IO 线程执行(通过 loop_ 指针保证)。

  3. 资源管理

    • 使用 tie_ 绑定共享指针(如 TcpConnection),避免处理事件时对象被析构。

  4. 高效事件过滤

    • isReading()/isWriting() 快速检查事件状态,避免无效回调。

class Channel : noncopyable{
public:using EventCallback = std::function<void()>;using ReadEventCallback = std::function<void(Timestamp)>;Channel(EventLoop* loop, int fd);~Channel();// poller通知后,处理事件
// 是否绑定对象,来决定是否需要使用智能指针 guard 来保护这个对象的生命周期void handleEvent(Timestamp receiveTime);//设置回调函数对象:movevoid setReadCallback(ReadEventCallback cb){ readCallback_ = std::move(cb); }void setWriteCallback(EventCallback cb){ writeCallback_ = std::move(cb); }void setCloseCallback(EventCallback cb){ closeCallback_ = std::move(cb); }void setErrorCallback(EventCallback cb){ errorCallback_ = std::move(cb); }// 防止channel执行回调时被删除void tie(const std::shared_ptr<void>&);// 事件管理void enableReading() { events_ |= kReadEvent; update(); }void disableReading() { events_ &= ~kReadEvent; update(); }void enableWriting() { events_ |= kWriteEvent; update(); }void disableWriting() { events_ &= ~kWriteEvent; update(); }void disableAll() { events_ = kNoneEvent; update(); }bool isWriting() const { return events_ & kWriteEvent; }bool isReading() const { return events_ & kReadEvent; }bool isNoneEvent() const { return events_ == kNoneEvent; }int fd() const { return fd_; }int events() const { return events_; }void set_revents(int revt) { revents_ = revt; }EventLoop* ownerLoop() { return loop_; }// for Pollerint index() { return index_; }void set_index(int idx) { index_ = idx; }void remove();private:void update();void handleEventWithGuard(Timestamp receiveTime);static const int kNoneEvent;static const int kReadEvent;static const int kWriteEvent;EventLoop* loop_;const int  fd_;int        events_;int        revents_; // it's the received event types of epoll or pollint        index_;   // used by Poller.(channel的状态:new/added/deleted)std::weak_ptr<void> tie_;//回调函数ReadEventCallback readCallback_;EventCallback writeCallback_;EventCallback closeCallback_;EventCallback errorCallback_;
}

​1. guard 和 tie 的作用(防止对象生命周期问题)​​​

在事件驱动模型中,Channel 对象通常与某个资源(如 TCP 连接 TcpConnection)关联。当事件触发时,Channel 会调用用户注册的回调函数(如 readCallback_)。
​风险​​:在回调执行期间,关联的资源(如 TcpConnection)可能被其他线程销毁,导致回调访问野指针,引发崩溃。

在 handleEvent 中尝试将 tie_ 提升为 shared_ptrguard = tie_.lock()

在 muduo 的 Channel 类中,使用 std::weak_ptr<void> 而非 std::shared_ptr<void> 来管理关联对象的生命周期,主要基于以下设计考量:


​1. 避免循环引用(核心原因)​

  • ​问题场景​​:
    如果 Channel 直接持有 shared_ptr<void>,而关联对象(如 TcpConnection)又持有 Channel 的 shared_ptr,会导致循环引用

  • ​解决方案​​:
    weak_ptr 是弱引用,不会增加引用计数,打破循环依赖。
    只有通过 lock() 临时提升为 shared_ptr 时才增加计数,确保安全访问。


​2. 明确所有权关系​

  • Channel 不拥有资源​​:
    Channel 只是事件处理器,其关联对象(如 TcpConnection)应由更高层(如 EventLoop 或用户代码)管理生命周期。

    • 使用 weak_ptr 表明 Channel 仅“观察”资源,不参与所有权管理。
    • 资源销毁时,tie_.lock() 会返回 nullptrChannel 自动跳过回调。
  • ​对比 shared_ptr​:
    如果 Channel 持有 shared_ptr,会模糊所有权边界,增加资源意外存活的风险。

std::weak_ptr 是 C++ 标准库中的一种智能指针类型,用于解决 std::shared_ptr 的循环引用问题。它允许一个对象安全地引用另一个对象,而不会增加引用计数。std::weak_ptr 通常用于观察(但不拥有)一个由 std::shared_ptr 管理的对象。lock() 方法,尝试将弱引用提升为强引用(std::shared_ptr

循环引用:假设我们有两个类 AB,它们相互引用。如果不使用 std::weak_ptr,会导致循环引用。

2.setXXXCallback() 函数使用 std::move 来设置回调函数:避免不必要的拷贝,提高性能

3.std::function 是 C++ 标准库中的一个模板类,用于封装可调用对象 

7.poller

1.poller

封装操作系统的 I/O 多路复用机制(如 poll, epoll),提供一个统一接口让 EventLoop 能获取到就绪的 I/O 事件(如可读、可写、出错等),并通知对应的 Channel。 EventLoop 的核心组件之一

功能描述
抽象封装抽象 I/O 多路复用(poll/epoll/...)
事件驱动核心EventLoop 协作,驱动整个网络库
管理 fd → Channel 映射负责让系统知道我们关心哪些事件、什么时候触发

class Poller : noncopyable{
Public:using ChannelList = std::vector<Channel*>;Poller(EventLoop *loop);virtual ~Poller();// 虚函数接口(抽象方法)// 执行一次 I/O 事件轮询(超时等待 timeoutMs 毫秒),将活跃的 Channel 填入virtual TimeStamp poll(int timeoutMs, ChannelList* activeChannels) = 0;// 当 Channel 感兴趣的事件发生变化时(如从监听读改为监听写),调用该函数virtual void updataChannel(Channel* channel) = 0;//从 Poller 中移除某个 Channelvirtual void removeChannel(Channel* channel) = 0;virtual bool hasChannel(Channel* channel) const;//工厂方法static Poller* newDefaultPoller(EventLoop* loop);protected:using ChannelMap = std::unordered<int, Channel*>;ChannelMap channels_;
private:EventLoop *ownweLoop_;
}

static Poller* newDefaultPoller(EventLoop* loop);是工厂方法,工厂方法是将对象的创建“推迟”到子类或函数中,“返回基类指针”是工厂方法模式最常见、最有用的实践形式。实现“抽象创建 + 多态调用”的关键方式  

2.EPollPoller

EPollPoller 是对 epoll 的封装,实现了 Poller 接口,核心作用是“等待就绪事件并通知 EventLoop

class EPollPoller : public Poller{
public:EPollEpoller(EventLoop *loop);~EPollPoller() override;Timestamp poll(int timeoutMs, ChannelList* activeChannels) override;// epoll_waitvoid updateChannel(Channel* channel) override;    //epoll_ctl :add/modvoid removeChannel(Channel* channel) override;    //epoll_ctl :delprivate:void fillActiveChannels(int numEvents, ChannelList* activeChannels) const;// 将 epoll_event 转成 Channelvoid update(int operation, Channel* channel);// 封装 epoll_ctlint epollfd_;EventList events_;//就绪的IO事件
}

8.Socket:提供对底层 socket 操作的封装(如设置选项、绑定、连接、关闭等)

class Socket : noncopyable {public:explicit Socket(int sockfd) : sockfd_(sockfd) {}~Socket();int fd() const { return sockfd_; }void bindAddress(const InetAddress& localaddr);void listen();int accept(InetAddress* peeraddr);void shutdownWrite();void setTcpNoDelay(bool on);void setReuseAddr(bool on);void setReusePort(bool on);void setKeepAlive(bool on);private:const int sockfd_;
};

1. TCP 选项总览:

TCP 选项对应方法作用
禁用 Nagle 算法setTcpNoDelay(bool on)减少小包延迟,提高实时性
地址复用(SO_REUSEADDR)setReuseAddr(bool on)解决端口占用,快速重启服务
端口复用(SO_REUSEPORT)setReusePort(bool on)多进程监听同一端口,提高并发能力
TCP 保活setKeepAlive(bool on)检测长时间空闲连接,防止假死

2. TCP_NODELAY:禁用 Nagle 算法

  • Nagle算法:

    • 为了减少网络中小数据包的数量,TCP 默认会将小包合并后再发送。

    • 只有当前一个数据包得到ACK 确认后,才发送下一个小包。

  • 问题:

    • 在实时通信(如即时消息、游戏)中,小包积累和延迟会影响性能。

  • 低延迟要求高的应用:即时通信、实时游戏

大量小数据包传输:流媒体、物联网设备

void Socket::setTcpNoDelay(bool on) {int optval = on ? 1 : 0;::setsockopt(sockfd_, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval));
}
  • IPPROTO_TCP 指定协议类型。

  • TCP_NODELAY 禁用 Nagle 算法。

  • optval 1 为禁用,0 为启用。

3. SO_REUSEADDR:地址复用

  • 场景:

    • 当服务器意外崩溃或重启后,端口可能仍处于 TIME_WAIT 状态,导致端口被占用

  • 问题:

    • 无法立即重新绑定相同端口,导致服务重启失败。

  • 服务器重启: 服务器崩溃后端口快速重用。

  • 端口复用: 多个进程监听相同端口(配合 SO_REUSEPORT)。

void Socket::setReuseAddr(bool on) {int optval = on ? 1 : 0;::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
}
  • SOL_SOCKET 套接字级别的选项。

  • SO_REUSEADDR 地址复用。

  • optval 1 为启用,0 为禁用。

4. SO_REUSEPORT:端口复用

  • 场景:

    • 多核服务器上,为了充分利用 CPU 资源,通常使用多个进程监听相同端口

  • 问题:

    • 未开启时,多个进程无法绑定同一端口,导致服务扩展性差

  • 多核服务器: 高并发服务器提高吞吐量。

  • 负载均衡: 允许多个进程共享一个监听端口。

void Socket::setReusePort(bool on) {
#ifdef SO_REUSEPORTint optval = on ? 1 : 0;::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval));
#elseif (on) {LOG_SYSERR << "SO_REUSEPORT is not supported.";}
#endif
}

5. SO_KEEPALIVE:TCP 保活

  • 场景:

    • 长时间空闲连接,如客户端掉线或网络中断。

  • 问题:

    • 服务器长时间无法检测到客户端断开

  • 长连接服务: 保持连接状态,防止假死。

  • 防止资源浪费: 检测空闲连接,及时释放。

void Socket::setKeepAlive(bool on) {int optval = on ? 1 : 0;::setsockopt(sockfd_, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval));
}

TIME_WAIT 产生的原因

主动关闭连接的一方发送 FIN 并接收到对方的 ACK 后,进入 TIME_WAIT 状态:

  1. 防止旧数据包干扰新连接:

    • 如果立即释放端口,新连接可能收到上一个连接的残留数据包

  2. 确保对方收到 ACK:

    • 最后一个 ACK 可能丢失,对方会重新发送 FIN,TIME_WAIT 可以重新应答。

3. TIME_WAIT 的问题

在高并发服务器中,TIME_WAIT 数量过多会导致以下问题:

  • 端口耗尽:

    • 短时间内建立大量连接,导致端口不足

  • 重启服务失败:

    • 上次监听的端口仍处于 TIME_WAIT,导致新进程无法重新绑定相同端口。

  • 资源占用:

    • 系统中有大量处于 TIME_WAIT 状态的连接,浪费内存和 CPU。

SO_REUSEADDR 是一个套接字选项,允许服务器在端口处于 TIME_WAIT快速重启

  • 允许端口复用:

    • 即使处于 TIME_WAIT,也能重新绑定相同的端口。

  • 避免端口占用:

    • 当服务器崩溃重启时,避免地址已被占用的错误。

 使用示例:TCP 服务器

InetAddress serverAddr(8080);
Socket listenSock(::socket(AF_INET, SOCK_STREAM, 0));  // 创建套接字listenSock.setReuseAddr(true);       // 设置地址复用
listenSock.bindAddress(serverAddr);  // 绑定端口
listenSock.listen();                 // 监听InetAddress clientAddr;
int connfd = listenSock.accept(&clientAddr);  // 接受连接if (connfd >= 0) {printf("New connection from %s\n", clientAddr.toIpPort().c_str());Socket connSocket(connfd);      // 管理新连接connSocket.setTcpNoDelay(true); // 禁用 Nagle 算法connSocket.shutdownWrite();     // 关闭写操作
}

9.Acceptor:监听客户端连接接收新连接

class Acceptor : noncopyable {public:typedef std::function<void (int sockfd, const InetAddress&)> NewConnectionCallback;Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport);void setNewConnectionCallback(const NewConnectionCallback& cb) {newConnectionCallback_ = cb;}bool listening() const { return listening_; }void listen();private:void handleRead();EventLoop* loop_;Socket acceptSocket_;             // 封装监听套接字Channel acceptChannel_;           // 封装监听事件NewConnectionCallback newConnectionCallback_;  // 新连接回调bool listening_;
};

//构造函数
Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport): loop_(loop),acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())),acceptChannel_(loop, acceptSocket_.fd()),listening_(false) {acceptSocket_.setReuseAddr(true);acceptSocket_.setReusePort(reuseport);acceptSocket_.bindAddress(listenAddr);acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
}
void Acceptor::handleRead() {InetAddress peerAddr;int connfd = acceptSocket_.accept(&peerAddr);if (connfd >= 0) {if (newConnectionCallback_) {newConnectionCallback_(connfd, peerAddr);} else {sockets::close(connfd);}}
}

10. buffer:是存储从套接字读入的数据待发送的数据,并进行高效的读写操作

+-------------------+------------------+------------------+
| prependable bytes |  readable bytes   |  writable bytes   |
|                  |  (数据区域)        |                   |
+-------------------+------------------+------------------+
0                readerIndex_       writerIndex_      buffer_.size()

class Buffer : copable{
public:static const size_t kCheapPrepend = 8;static const size_t kInitialSize = 1024;explicit Buffer(size_t initialSize = kInitialSize): buffer_(kCheapPrepend + initialSize),readerIndex_(kCheapPrepend),writerIndex_(kCheapPrepend){}size_t readableBytes() const{ return writerIndex_ - readerIndex_; }size_t writableBytes() const{ return buffer_.size() - writerIndex_; }size_t prependableBytes() const{ return readerIndex_; }const char* peek() const{ return begin() + readerIndex_; }//可读数据起始地址void retrieveAll()//读写指针复位{readerIndex_ = kCheapPrepend;writerIndex_ = kCheapPrepend;}void retrieve(size_t len)//读指针偏移{assert(len <= readableBytes());if (len < readableBytes()){readerIndex_ += len;}else{retrieveAll();}}string retrieveAllAsString(){return retrieveAsString(readableBytes());//将所有可读数据转成字符串}string retrieveAsString(size_t len)//将可读数据转成字符串{assert(len <= readableBytes());string result(peek(), len);retrieve(len);return result;}void makeSpace(size_t len)//扩容{if (writableBytes() + prependableBytes() < len + kCheapPrepend){// FIXME: move readable databuffer_.resize(writerIndex_+len);}else{// 将可读数据前移,覆盖已读部分assert(kCheapPrepend < readerIndex_);size_t readable = readableBytes();std::copy(begin()+readerIndex_,begin()+writerIndex_,begin()+kCheapPrepend);readerIndex_ = kCheapPrepend;writerIndex_ = readerIndex_ + readable;assert(readable == readableBytes());}}void ensureWritableBytes(size_t len)//判断是否有足够空间可写{if (writableBytes() < len){makeSpace(len);}assert(writableBytes() >= len);}void append(const StringPiece& str)//追加写内容{append(str.data(), str.size());}void append(const char* /*restrict*/ data, size_t len){ensureWritableBytes(len);std::copy(data, data+len, beginWrite());hasWritten(len);}ssize_t readFd(int fd, int* savedErrno);//从文件描述符读数据private:char* begin(){ return &*buffer_.begin(); }//数组首元素地址std::vector<char> buffer_;  // 底层存储容器size_t readerIndex_;        // 读起始位置size_t writerIndex_;        // 写起始位置}
ssize_t readFd(int fd, int* savedErrno) {char extraBuf[65536];  // 栈上空间struct iovec vec[2];const size_t writable = writableBytes();vec[0].iov_base = begin() + writerIndex_;vec[0].iov_len = writable;vec[1].iov_base = extraBuf;vec[1].iov_len = sizeof(extraBuf);const int iovcnt = (writable < sizeof(extraBuf)) ? 2 : 1;const ssize_t n = ::readv(fd, vec, iovcnt);if (n < 0) {*savedErrno = errno;} else if (implicit_cast<size_t>(n) <= writable) {writerIndex_ += n;} else {writerIndex_ = buffer_.size();append(extrabuf, n - writable);}return n;
}

 readvwritevPOSIX 提供的系统调用,“分散读”"聚集写"操作,它们用于一次性从文件描述符读取多个缓冲区的数据,将多个缓冲区数据写入文件描述符

#include <sys/uio.h>ssize_t readv(int fd, const struct iovec *iov, int iovcnt);

 从 fd 顺序读取数据,依次填充到 iov[0]iovec[1]... 直到数据读完或所有缓冲区填满

struct iovec {void  *iov_base;  // 缓冲区起始地址size_t iov_len;   // 缓冲区长度
};

iovec 是 readv 的核心参数,用于指定数据读取的目标缓冲区。这里设置了 两个缓冲区

(1) 主缓冲区(vec[0]

vec[0].iov_base = begin() + writerIndex_;  // 指向当前可写位置的起始地址
vec[0].iov_len = writable;                 // 可写入的最大字节数
  • 目的
    优先将数据直接读取到 内部主缓冲区 的剩余空间(writerIndex_ 之后的部分)。

  • 优点

    • 避免内存拷贝:如果数据能完全放入主缓冲区,无需额外处理。

    • 减少内存碎片:复用已有的缓冲区空间。

(2) 备用栈缓冲区(vec[1]

vec[1].iov_base = extraBuf;                // 栈上的临时缓冲区
vec[1].iov_len = sizeof(extraBuf);         // 固定大小(64KB)
  • 目的
    如果主缓冲区剩余空间不足(writable 太小),则 超额数据 会暂存到栈上的 extraBuf

  • 优点

    • 避免内存浪费:主缓冲区空间不足时,临时用栈空间兜底(栈分配速度快,且函数退出后自动释放)。

    • 防止丢包:即使主缓冲区满,也能保证数据不丢失(暂存到 extraBuf 后再处理)

11.TcpServer

1.TcpServer 的作用

  • 管理连接:

    • 负责接受客户端连接,并生成相应的 TcpConnection 对象。

  • 线程分配:

    • 使用 EventLoopThreadPool 进行多线程管理,保证高并发。

  • 消息处理:

通过回调机制(如连接建立、消息接收、连接关闭)实现事件响应。

class TcpServer : noncopyable {public:typedef std::function<void(EventLoop*)> ThreadInitCallback;enum Option{kNoReusePort,kReusePort,};TcpServer(EventLoop* loop,const InetAddress& listenAddr,const string& nameArg,Option option = kNoReusePort);~TcpServer();void setConnectionCallback(const ConnectionCallback& cb){ connectionCallback_ = cb; }void setMessageCallback(const MessageCallback& cb){ messageCallback_ = cb; }void start();  // 启动服务器private:void newConnection(int sockfd, const InetAddress& peerAddr);void removeConnection(const TcpConnectionPtr& conn);void removeConnectionInLoop(const TcpConnectionPtr& conn);EventLoop* loop_;  // 主线程的事件循环const string ipPort_;const string name_;std::unique_ptr<Acceptor> acceptor_;std::shared_ptr<EventLoopThreadPool> threadPool_;ConnectionCallback connectionCallback_;MessageCallback messageCallback_;WriteCompleteCallback writeCompleteCallback_;ThreadInitCallback threadInitCallback_; //线程初始化回调std::atomic started_;int nextConnId_;std::unordered_map<string, TcpConnectionPtr> connections_;  // 连接管理
};

枚举(enum)是一种用户自定义的数据类型,它允许为一组整数值赋予有意义的名称

2. 常见的回调类型

回调名称类型触发时机
ConnectionCallbackvoid(const TcpConnectionPtr&)连接建立或断开时
MessageCallbackvoid(const TcpConnectionPtr&, Buffer*, Timestamp)收到数据时
WriteCompleteCallbackvoid(const TcpConnectionPtr&)数据发送完成时
HighWaterMarkCallbackvoid(const TcpConnectionPtr&, size_t)缓冲区到达高水位线时
CloseCallbackvoid(const TcpConnectionPtr&)连接关闭时
TcpServer::TcpServer(EventLoop* loop,const InetAddress& listenAddr,const string& nameArg,Option option): loop_(loop),ipPort_(listenAddr.toIpPort()),name_(nameArg),acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),threadPool_(new EventLoopThreadPool(loop, name_)),nextConnId_(1) {acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this, _1, _2));
}
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) {EventLoop* ioLoop = threadPool_->getNextLoop();string connName = name_ + to_string(nextConnId_++);TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, peerAddr));connections_[connName] = conn;conn->setConnectionCallback(connectionCallback_);conn->setMessageCallback(messageCallback_);conn->setWriteCompleteCallback(writeCompleteCallback_);conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, _1));ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}
//初始化连接并注册回调
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) {loop_->assertInLoopThread();EventLoop* ioLoop = threadPool_->getNextLoop();  // 获取下一个 IO 线程char buf[64];snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);++nextConnId_;std::string connName = name_ + buf;LOG_INFO << "TcpServer::newConnection [" << name_<< "] - new connection [" << connName<< "] from " << peerAddr.toIpPort();InetAddress localAddr(sockets::getLocalAddr(sockfd));// 创建 TcpConnection 对象TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));connections_[connName] = conn;  // 添加到连接管理表// 设置各种回调conn->setConnectionCallback(connectionCallback_);conn->setMessageCallback(messageCallback_);conn->setWriteCompleteCallback(writeCompleteCallback_);// 设置关闭回调,当连接关闭时会自动移除conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, std::placeholders::_1));// 在 IO 线程中调用 connectEstablished()ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}

//getLocalAddr 来获取本地监听地址
struct sockaddr_in6 sockets::getLocalAddr(int sockfd)
{struct sockaddr_in6 localaddr;memZero(&localaddr, sizeof localaddr);socklen_t addrlen = static_cast<socklen_t>(sizeof localaddr);if (::getsockname(sockfd, sockaddr_cast(&localaddr), &addrlen) < 0){LOG_SYSERR << "sockets::getLocalAddr";}return localaddr;
}

TcpServer 类的 removeConnectionremoveConnectionInLoop 函数用于在连接关闭时从服务器的连接管理中移除相应的 TcpConnection 对象 

void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{// FIXME: unsafeloop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{loop_->assertInLoopThread();LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_<< "] - connection " << conn->name();size_t n = connections_.erase(conn->name());(void)n;assert(n == 1);EventLoop* ioLoop = conn->getLoop();ioLoop->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
}

TcpServer::~TcpServer()
{loop_->assertInLoopThread();LOG_TRACE << "TcpServer::~TcpServer [" << name_ << "] destructing";for (auto& item : connections_){TcpConnectionPtr conn(item.second);item.second.reset();conn->getLoop()->runInLoop(std::bind(&TcpConnection::connectDestroyed, conn));}
}

1. 连接绑定线程的原则

每个 TcpConnection 都绑定在一个特定的**事件循环(I/O 线程)**中。
核心思想:

  • 一个连接只能在所属的 I/O 线程中操作,不能跨线程直接操作连接对象。

  • 原因:

    • 事件循环是单线程运行的,如果其他线程直接操作连接,会破坏线程安全,导致数据竞争崩溃

2. 为什么不能直接在析构函数中销毁?

连接的生命周期与服务器不完全一致:

  • 服务器关闭时,可能还有活跃连接未完成数据传输。

  • 直接销毁可能导致正在传输的数据被中断,或者导致未完成的回调直接崩溃。

跨线程问题:
  • 假设:

    • 服务器(主线程)正在析构,直接调用 TcpConnection::connectDestroyed()

    • 但是该连接实际上是在其他 I/O 线程中活跃,直接操作会破坏线程隔离

  • 后果:

    • 如果直接调用销毁函数,容易导致访问未释放或无效内存,导致程序崩溃。

12.TcpConnection

  • 连接管理: 负责连接的建立和关闭。

  • 数据收发: 提供异步发送和接收数据的接口。

回调类型描述典型设置方式
连接回调当连接建立或断开时触发setConnectionCallback()
消息回调当有消息到达时触发setMessageCallback()
写完成回调发送缓冲区中的数据完全发送时触发setWriteCompleteCallback()
高水位回调发送缓冲区数据量超过高水位时触发setHighWaterMarkCallback()
关闭回调连接关闭时触发setCloseCallback()
class TcpConnection : public std::enable_shared_from_this<TcpConnection>, noncopyable{
public:TcpConnection(EventLoop* loop, const string& name, int sockfd, const InetAddress& localAddr, const InetAddress& peerAddr);~TcpConnection();const std::string& name() const { return name_; }        // 获取连接名称const InetAddress& localAddress() const { return localAddr_; } // 获取本地地址const InetAddress& peerAddress() const { return peerAddr_; }   // 获取对端地址bool connected() const { return state_ == kConnected; }  // 检查是否已连接void setConnectionCallback(const ConnectionCallback& cb);void setMessageCallback(const MessageCallback& cb);void setWriteCompleteCallback(const WriteCompleteCallback& cb);void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark);void setCloseCallback(const CloseCallback& cb);void send(Buffer* buf);                    // 发送缓冲区内容void shutdown();                           // 关闭写端void connectEstablished();    // 连接建立时调用void connectDestroyed();      // 连接销毁时调用private:void handleRead(Timestamp receiveTime);     // 读取数据事件void handleWrite();                        // 写入数据事件void handleClose();                        // 关闭事件void handleError();                        // 错误事件void sendInLoop(const void* message, size_t len);void shutdownInLoop();enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };EventLoop* loop_;                  // 事件循环指针const string name_;                // 连接名称StateE state_;                     // 连接状态(连接、断开、正在连接)bool reading_;                     // 是否正在读取std::unique_ptr<Socket> socket_;   // 封装的 TCP 套接字std::unique_ptr<Channel> channel_; // 事件分发器const InetAddress localAddr_;      // 本地地址const InetAddress peerAddr_;       // 对端地址Buffer inputBuffer_;               // 接收缓冲区Buffer outputBuffer_;              // 发送缓冲区ConnectionCallback connectionCallback_;     // 连接回调MessageCallback messageCallback_;           // 消息回调WriteCompleteCallback writeCompleteCallback_; // 写完成回调HighWaterMarkCallback highWaterMarkCallback_; // 高水位回调CloseCallback closeCallback_;               // 关闭回调}

1.std::enable_shared_from_this

std::enable_shared_from_this 是一个辅助类模板,允许在类的成员函数中安全地获取当前对象的 shared_ptr

std::enable_shared_from_this 的工作原理

  1. TcpConnection 对象作为 shared_ptr 被创建时,enable_shared_from_this 会在内部保存一个弱引用。

  2. 当需要获取自身的 shared_ptr 时,调用 shared_from_this() 方法即可。

  3. 这样可以确保即使在类的成员函数中,也不会错误地生成一个新的 shared_ptr,而是与原来的共享控制块关联。

假设你在类方法中直接这样写:

std::shared_ptr<MyClass> ptr(this); // 错误示范

出现问题的原因:

  1. 引用计数错误:

    • 当对象通过 new 创建并使用 std::shared_ptr<T>(this) 包装时,会生成一个新的控制块,与已有的 shared_ptr 无关。

    • 这样做使得同一个对象有两个独立的控制块,各自管理引用计数。

  2. 双重释放:

    • 当第一个 shared_ptr 析构时,引用计数减为零,释放内存。

    • 第二个 shared_ptr 析构时,再次释放已经无效的内存,导致崩溃

2.使用智能指针 std::unique_ptr 来管理 SocketChannel 对象

1.资源独占性:保证 Socket 和 Channel 的唯一所有权

2.自动释放资源:防止内存泄漏

//处理读事件,当有数据到达时触发。
void TcpConnection::handleRead(Timestamp receiveTime) {int savedErrno = 0;ssize_t n = inputBuffer_.readFd(socket_->fd(), &savedErrno);if (n > 0) {messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);  // 调用消息回调} else if (n == 0) {handleClose();  // 对端关闭} else {errno = savedErrno;handleError();  // 读取出错}
}

void TcpConnection::handleClose() {LOG_INFO << "TcpConnection::handleClose - fd = " << channel_->fd();state_ = kDisconnected;channel_->disableAll();  // 停止监听所有事件TcpConnectionPtr guardThis(shared_from_this());connectionCallback_(guardThis);  // 通知上层连接已关闭closeCallback_(guardThis);       // 触发关闭回调
}
void TcpConnection::send(const std::string &buf)
{if (state_ == kConnected){if (loop_->isInLoopThread()){sendInLoop(message);}else{void (TcpConnection::*fp)(const StringPiece& message) = &TcpConnection::sendInLoop;loop_->runInLoop(std::bind(fp,this,     // FIXMEmessage.as_string()));//std::forward<string>(message)));}}
}

sendInLoop() 在网络编程中用于发送数据,但它是在事件循环线程中执行的。

  • 将数据发送到网络连接上(通过 socket 写入)。

  • 如果无法一次性写完,则将剩余数据存入输出缓冲区,等待下次可写时继续发送。

void TcpConnection::sendInLoop(const void* data, size_t len)
{loop_->assertInLoopThread();  // 确保在事件循环线程中ssize_t nwrote = 0;size_t remaining = len;bool faultError = false;if (state_ == kDisconnected){LOG_WARN << "disconnected, give up writing";return;}// Step 1: 直接写入数据(如果可能)if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0){nwrote = sockets::write(channel_->fd(), data, len);if (nwrote >= 0){remaining = len - nwrote;if (remaining == 0 && writeCompleteCallback_){loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));}}else {nwrote = 0;if (errno != EWOULDBLOCK){LOG_SYSERR << "TcpConnection::sendInLoop";if (errno == EPIPE || errno == ECONNRESET){faultError = true;}}}}// Step 2: 如果没有写完,则保存剩余数据到缓冲区if (!faultError && remaining > 0){size_t oldLen = outputBuffer_.readableBytes();if (oldLen + remaining >= highWaterMark_ && oldLen < highWaterMark_ && highWaterMarkCallback_){loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));}outputBuffer_.append(static_cast<const char*>(data) + nwrote, remaining);if (!channel_->isWriting()){//可写事件监听:表示套接字缓冲区有空闲空间,可以继续写入数据。channel_->enableWriting();}}
}

void TcpConnection::handleWrite()
{loop_->assertInLoopThread();if (channel_->isWriting()){ssize_t n = sockets::write(channel_->fd(),outputBuffer_.peek(),outputBuffer_.readableBytes());if (n > 0){outputBuffer_.retrieve(n);if (outputBuffer_.readableBytes() == 0){channel_->disableWriting();if (writeCompleteCallback_){loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));}if (state_ == kDisconnecting){shutdownInLoop();}}}else{LOG_SYSERR << "TcpConnection::handleWrite";// if (state_ == kDisconnecting)// {//   shutdownInLoop();// }}}else{LOG_TRACE << "Connection fd = " << channel_->fd()<< " is down, no more writing";}
}

TcpConnection 数据发送流程总结​ 

  1. ​直接发送​

    • 当 ​​输出缓冲区为空​​ 且 ​​socket 不可写​​ 时,直接调用 write() 尝试发送数据。
    • 如果全部发送成功,触发 writeCompleteCallback_
  2. ​缓冲剩余数据​

    • 如果未完全发送(或 write() 返回 EAGAIN),剩余数据存入 outputBuffer_,并注册 EPOLLOUT 事件。
  3. ​处理可写事件​

    • 当 EPOLLOUT 触发时,handleWrite() 从 outputBuffer_ 读取数据继续发送。
    • 如果缓冲区清空,取消 EPOLLOUT 监听,避免忙等待。

 将当前对象(TcpConnection)传递给回调函数,以便在回调函数中正确访问和操作该对象

  • 使用 shared_from_this() 获取当前对象的智能指针(std::shared_ptr,而不是裸指针。

  • 这样可以在回调函数中持有对象的生命周期,防止对象在回调执行之前被销毁。

void TcpConnection::connectEstablished() {loop_->assertInLoopThread();assert(state_ == kConnecting);setState(kConnected);channel_->tie(shared_from_this());  // 防止channel中使用悬空指针// 注册读事件channel_->enableReading();// 调用连接建立回调,通知上层用户连接已经建立connectionCallback_(shared_from_this());
}

void TcpConnection::connectDestroyed() {loop_->assertInLoopThread();if (state_ == kConnected) {setState(kDisconnected);channel_->disableAll();  // 取消所有事件监听// 调用连接关闭回调,通知上层用户连接已关闭connectionCallback_(shared_from_this());}channel_->remove();  // 从 Poller 中移除
}
void TcpConnection::shutdown()
{// FIXME: use compare and swapif (state_ == kConnected){setState(kDisconnecting);// FIXME: shared_from_this()?loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));}
}void TcpConnection::shutdownInLoop()
{loop_->assertInLoopThread();if (!channel_->isWriting())    //没有数据发送(写关闭){socket_->shutdownWrite();}
}

相关文章:

  • Bitacora:基因组组件中基因家族识别和注释的综合工具
  • PPO近端策略优化算法
  • 《Python星球日记》 第54天:卷积神经网络进阶
  • SQL注入问题
  • 用jsp简单实现C语言标准化测试系统
  • 2505d,d的借用检查器
  • 【Redis】string 字符串
  • Kubernetes 生产实战(十五):生产环境敏感信息纳入Secret管理指南
  • DB4S:一个开源跨平台的SQLite数据库管理工具
  • ThreadPoolExecutor源码阅读以及手写简单线程池 —— JDK17
  • @Transactional注解失效
  • 用c语言实现——一个交互式的中序线索二叉树系统,支持用户动态构建、线索化、遍历和查询功能
  • 超详细Kokoro-82M本地部署教程
  • 自定义类型-结构体(二)
  • 本地大模型工具深度评测:LM Studio vs Ollama,开发者选型指南
  • Java多线程(超详细版!!)
  • C++STL——priority_queue
  • 【Redis】基础命令数据结构
  • 【C++】string类
  • Linux进程间通信(四)之补充【日志】
  • 西藏日喀则市拉孜县发生5.5级地震,震感明显部分人被晃醒
  • 侧记|青年为何来沪创新创业?从这一天寻找答案
  • 成就彼此,照亮世界:“中欧建交50周年论坛”在沪成功举行
  • 宇树科技王兴兴:第一桶金来自上海,欢迎上海的年轻人加入
  • 央行:货币与物价的关系受多重因素影响,提振物价的关键在于扩大有效需求
  • 蔡达峰:推动食品安全法全面有效实施,为维护人民群众身体健康提供有力法治保障