muduo源码解析
1.对类进行禁止拷贝
class noncopyable
{public:noncopyable(const noncopyable&) = delete;void operator=(const noncopyable&) = delete;protected:noncopyable() = default;~noncopyable() = default;
};
2.日志
使用枚举定义日志等级
enum LogLevel{TRACE,DEBUG,INFO,WARN,ERROR,FATAL,NUM_LOG_LEVELS,};
1.获取日志实例对象
Logger& instance();
2.设置日志级别
void setLogLevel(int level);
3.写日志
void log(std::string& msg);
#define LOG_info(fmt, ...) \do { \fprintf(stderr, "[INFO] %s:%d: " fmt "\n", \__FILE__, __LINE__, ##__VA_ARGS__); \} while(0)
1.C/C++ 的预处理器规定,一个完整的宏定义必须位于同一逻辑行。但为了代码可读性,需要用 \
显式声明换行。
2.为什么用 do { ... } while(0),
避免宏展开后的语法问题
3. LOG_info(fmt, ...)
-
fmt
:格式化字符串参数(如"Value: %d"
) -
...
:可变参数(变长参数列表),表示可以传递任意数量的额外参数。 -
##__VA_ARGS__:
GNU 扩展语法,处理可变参数
#ifndef LOG_H
#define LOG_H#include <cstdio>
#include <ctime>
#include <cstdarg>// 定义日志级别
enum LogLevel
{TRACE = 0,DEBUG,INFO,WARN,ERROR,FATAL,NUM_LOG_LEVELS
};// 设置默认日志级别
extern LogLevel g_logLevel; // 在别的地方定义这个全局变量// 宏定义日志级别
#define LOG_TRACE(fmt, ...) \do { \if (g_logLevel <= TRACE) log_message("TRACE", __FILE__, __LINE__, fmt, ##__VA_ARGS__); \} while (0)#define LOG_DEBUG(fmt, ...) \do { \if (g_logLevel <= DEBUG) log_message("DEBUG", __FILE__, __LINE__, fmt, ##__VA_ARGS__); \} while (0)#define LOG_INFO(fmt, ...) \do { \if (g_logLevel <= INFO) log_message("INFO", __FILE__, __LINE__, fmt, ##__VA_ARGS__); \} while (0)#define LOG_WARN(fmt, ...) \do { \if (g_logLevel <= WARN) log_message("WARN", __FILE__, __LINE__, fmt, ##__VA_ARGS__); \} while (0)#define LOG_ERROR(fmt, ...) \do { \if (g_logLevel <= ERROR) log_message("ERROR", __FILE__, __LINE__, fmt, ##__VA_ARGS__); \} while (0)#define LOG_FATAL(fmt, ...) \do { \if (g_logLevel <= FATAL) log_message("FATAL", __FILE__, __LINE__, fmt, ##__VA_ARGS__); \} while (0)inline void log_message(const char* level, const char* file, int line, const char* fmt, ...)
{// 获取当前时间char timeBuf[20];std::time_t now = std::time(nullptr);std::strftime(timeBuf, sizeof(timeBuf), "%Y-%m-%d %H:%M:%S", std::localtime(&now));// 打印日志fprintf(stderr, "[%s] [%s] %s:%d: ", timeBuf, level, file, line);va_list args;va_start(args, fmt);vfprintf(stderr, fmt, args);va_end(args);fprintf(stderr, "\n");
}#endif // LOG_H
3.TimeStamp
#include <chrono>
#include <string>
#include <ctime>
#include <iomanip>
#include <sstream>class Timestamp {
public:// 构造函数Timestamp() : tp_(std::chrono::system_clock::now()) {}// 从时间点构造explicit Timestamp(std::chrono::system_clock::time_point tp) : tp_(tp) {}// 获取当前时间戳 (静态工厂方法)static Timestamp now() {return Timestamp();}// 转换为字符串 (默认格式)std::string to_string() const {return format("%Y-%m-%d %H:%M:%S");} private:std::chrono::system_clock::time_point tp_;
};
4.InetAddress
# include<netinet/in.h> // 定义了Internet地址族// 封装socket地址
class InetAdress{
public:explicit InetAddress(std::string& ip, uint16_t port, bool ipv6 = false);explicit InetAddress(const sockaddr_in &addr): addr_(addr);std::string toIp() const;std::string toIpPort() const;uint16_t port() const;
private:socketaddr_in addr_;
}
struct sockaddr_in {short sin_family; // 地址族unsigned short sin_port; // 端口号struct in_addr sin_addr; // IP 地址char sin_zero[8]; // 填充字节,用于对齐
};
InetAddress::InetAddress(StringArg ip, uint16_t port, bool ipv6)
{if (ipv6 || strchr(ip.c_str(), ':')){// ...}else{memset(&addr_, 0, sizeof addr_);addr_->sin_family = AF_INET;addr_->sin_port = htons(port);struct in_addr addr;if (inet_pton(AF_INET, ip, &addr) <= 0) {printf("Invalid IPv4 address\n");} else {printf("IPv4 address converted successfully\n");}addr_->sin_addr = addr;}
}string InetAddress::toIpPort() const
{char buf[64] = "";inet_ntop(AF_INET, &addr_.sin_addr, buf, sizeof buf);size_t end = strlen(buf);uint16_t port = ntohs(addr_.sin_port);sprintf(buf + end, ":%u", port);return buf;
}
5.EventLoop
1.获取和管理线程的标识(TID)、线程名称、线程堆栈信息
namespace muduo
{
namespace CurrentThread
{// internalextern __thread int t_cachedTid;extern __thread char t_tidString[32];extern __thread int t_tidStringLength;extern __thread const char* t_threadName;void cacheTid();inline int tid(){if (__builtin_expect(t_cachedTid == 0, 0)){cacheTid();}return t_cachedTid;}inline const char* tidString() // for logging{return t_tidString;}inline int tidStringLength() // for logging{return t_tidStringLength;}inline const char* name(){return t_threadName;}bool isMainThread();void sleepUsec(int64_t usec); // for testingstring stackTrace(bool demangle);
} // namespace CurrentThread
} // namespace muduo
extern
关键字用于声明变量或函数的存储位置- __thread 是 GCC/Clang 提供的关键字,用于声明线程局部存储变量(Thread-Local Storage, TLS)。每个线程都有自己独立的变量副本。
- __builtin_expect 是 GCC/Clang 提供的内置函数,用于向编译器提供分支预测的提示
static_cast<pid_t>(::syscall(SYS_gettid));
2.EventLoop:
1. EventLoop 干什么?(职责)
EventLoop
是事件循环的核心类,负责管理和调度 I/O 事件和异步任务。
主要职责:
-
事件循环管理: 不断循环等待和处理事件。
-
事件分发: 将活跃事件交给合适的回调处理。
-
任务调度: 支持在事件循环线程内安全地执行异步任务。
-
线程唤醒: 当其他线程提交任务时能够唤醒阻塞的事件循环。
class EvenLoop : noncopyable{
public:using Functor = std::function<viod()>;EvenLoop();~EvenLoop();void loop();void quit();//状态查询Timestamp pollReturnTime() const { return pollReturnTime_; }//回调管理void runInLoop(Functor cb);// 在事件循环线程中立即执行回调 cbvoid queueInLoop(Functor cb);//将回调 cb 排队,稍后在事件循环线程中执行size_t queueSize() const;//通道管理void wakeup();void updateChannel(Channel* channel);void removeChannel(Channel* channel);bool hasChannel(Channel* channel);//线程检查bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }private:using ChannelList = std::vector<Channel*>;bool looping_;/* atomic *///是否正在进行事件循环std::atomic_bool quit_;//是否退出时间循环bool eventHandling_; /* atomic */ //表示是否正在处理事件bool callingPendingFunctors_; /* atomic *///表示是否正在执行待处理回调const pid_t threadId_;//当前循环tid// PollerTimestamp pollReturnTime_;//Poller返回时间std::unique_ptr<Poller> poller_;//Eventloop管理的PollerChannelList activeChannels_;// 存储当前Poller 返回的活动通道列表// Channelint wakeupFd_;//eventfd,用于跨线程唤醒事件循环std::unique_ptr<Channel> wakeupChannel_;//封装 wakeupFd_,绑定到 Poller(如 epoll),监控读事件mutable MutexLock mutex_;std::vector<Functor> pendingFunctors_ GUARDED_BY(mutex_);// 存储其他线程提交的任务void doPendingFunctors();
}
1.在 muduo 网络库中,EventLoop
类的构造函数初始化了一系列 bool
类型的标志变量(如 looping_
、quit_
、eventHandling_
、callingPendingFunctors_
),这些变量用于 精确控制事件循环的状态和线程安全
looping_
的限制:禁止重复启动事件循环
eventHandling_的限制:
不能在处理事件时析构channel
callingPendingFunctors_
:确保跨线程任务提交或事件循环忙时能够及时唤醒 EventLoop
if (!isInLoopThread() || callingPendingFunctors_){wakeup();}
在 Linux 系统中,获取当前线程的唯一 ID(TID)可以通过 syscall(SYS_gettid)
//防止一个线程创建多个EventLoop
__thread EventLoop* t_loopInThisThread = 0;
2.在 Linux 系统中,::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC)
用于创建一个 事件文件描述符(eventfd),通常用于线程间或进程间的异步事件通知(例如唤醒事件循环)
3. std::atomic_bool 是 C++ 标准库(<atomic> 头文件)提供的一种原子类型,专门用于布尔值的原子操作。它是 std::atomic<bool> 的特化版本,允许在多线程环境中安全地读写布尔值,而无需显式使用锁(如 std::mutex)。std::atomic_bool 确保操作的原子性,避免数据竞争(data race),适用于需要高效、无锁同步的场景。
4.assertInLoopThread();当前线程是否是loop所在线程
-
EventLoop::loop()
-
EventLoop::updateChannel(Channel* channel)
-
EventLoop::removeChannel(Channel* channel)
-
bool EventLoop::hasChannel(Channel* channel)
-
void TcpConnection::sendInLoop(const void* data, size_t len)
-
void TcpConnection::shutdownInLoop()
-
void TcpConnection::startReadInLoop() void TcpConnection::stopReadInLoop()
-
void TcpConnection::connectEstablished() void TcpConnection::connectDestroyed()
-
void TcpConnection::handleRead(Timestamp receiveTime)
-
void TcpConnection::handleWrite()
EventLoop::updateChannel(Channel* channel)|
poller_->updateChannel(channel);
对channel进行操作确保: 当前线程是loop所在线程,当前线程是channel所在线程
void EventLoop::loop()
{looping_ = true;quit_ = false;while (!quit_){activeChannels_.clear();// 等待事件发生poller_->poll(kPollTimeMs, &activeChannels_);// 遍历所有活跃通道,逐个调用其事件处理函数for (Channel* channel : activeChannels_){channel->handleEvent();}// 执行异步任务doPendingFunctors();}looping_ = false;
}
void EventLoop::doPendingFunctors()
{std::vector<Functor> functors; // 定义一个局部任务列表callingPendingFunctors_ = true; // 标记正在执行任务{MutexLockGuard lock(mutex_); // 上锁,保护任务队列functors.swap(pendingFunctors_); // 将任务队列中的任务移到本地变量}// 逐个执行任务for (const Functor& functor : functors){functor(); // 执行任务回调}callingPendingFunctors_ = false; // 执行完毕,重置标记
}
用 swap()
将任务队列和局部变量互换,而不是逐个取出任务:降低锁竞争,只加锁一次,把所有任务交换到局部变量中,后续执行不需要加锁。
2. 为什么需要任务队列?
特性 | handleEvent() | doPendingFunctors() |
---|---|---|
作用 | 处理I/O事件 | 处理异步任务 |
触发时机 | poll()返回后,有活跃事件时触发 | 每轮事件循环末尾,I/O事件处理完毕后 |
优先级 | 高,实时性要求高 | 低,非实时任务 |
来源 | 网络 I/O、定时器、信号等事件 | 其他线程提交的任务、延迟任务 |
典型场景 | 读写网络数据、关闭连接 | 线程间任务提交、定时任务、异步回调 |
线程安全性 | 需要确保 Channel 所在线程安全 | 通过加锁保证线程安全 |
执行频率 | 每个活跃事件都调用 | 每次事件循环结束后调用一次 |
在高性能服务器中,muduo
采用单线程 Reactor 模式,一个 EventLoop
对象只能在一个线程中运行。
但是,多个线程可能需要向同一个事件循环线程提交任务,比如:
-
工作线程向主线程提交任务。
-
异步回调需要在事件循环线程中执行。
-
事件处理函数中异步添加任务。
当需要将任务提交到事件循环时,调用 queueInLoop()
:
void EventLoop::runInLoop(Functor cb)
{if (isInLoopThread()){cb();}else{queueInLoop(std::move(cb));}
}void EventLoop::queueInLoop(Functor cb)
{{MutexLockGuard lock(mutex_);pendingFunctors_.push_back(std::move(cb));}if (!isInLoopThread() || callingPendingFunctors_){wakeup();}
}
callingPendingFunctors_
:
-
如果当前正在执行回调队列中的回调函数(
callingPendingFunctors_
为true
),并且有新的回调函数被加入到队列中,需要通过wakeup()
方法唤醒事件循环线程,以确保新的回调函数能够被处理。 -
如果不检查
callingPendingFunctors_
,可能会导致事件循环线程被重复唤醒,从而增加不必要的开销
int createEventfd()
{int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);if (evtfd < 0){LOG_SYSERR << "Failed in eventfd";abort();}return evtfd;
}
eventfd 文件描述符(FD),这是一个 Linux 特定的机制,用于线程间通信或事件通知。eventfd 常用于高效的无锁同步。
EventLoop::EventLoop(): looping_(false),quit_(false),eventHandling_(false),callingPendingFunctors_(false),iteration_(0),threadId_(CurrentThread::tid()),poller_(Poller::newDefaultPoller(this)),timerQueue_(new TimerQueue(this)),wakeupFd_(createEventfd()),wakeupChannel_(new Channel(this, wakeupFd_)),currentActiveChannel_(NULL)
{LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_;if (t_loopInThisThread){LOG_FATAL << "Another EventLoop " << t_loopInThisThread<< " exists in this thread " << threadId_;}else{t_loopInThisThread = this;}wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));// we are always reading the wakeupfdwakeupChannel_->enableReading();
}
void EventLoop::wakeup()
{uint64_t one = 1;ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);if (n != sizeof one){LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";}
}
事件循环线程持有 wakeupFd_
,并监听其读端。当其他线程调用 wakeup()
方法时,事件循环线程会从阻塞状态中唤醒。
-
交换后,
pendingFunctors_
变为空向量,新任务可继续写入。 -
局部变量
functors
在栈上析构时自动清理已执行的任务
MutexLockGuard
是一个 RAII(Resource Acquisition Is Initialization,资源获取即初始化)风格的锁管理器。它的作用是确保在作用域结束时自动释放锁,从而避免忘记手动释放锁导致的死锁问题。
RAII 的核心思想是将资源的获取(如分配内存、打开文件、锁定互斥锁等)与对象的构造绑定在一起,将资源的释放与对象的析构绑定在一起,确保资源在对象生命周期结束时自动释放,从而避免资源泄漏和其他资源管理错误。
3.Thread
封装了对 POSIX 线程(pthread)的创建、管理和操作
class Thread : nocopyable{
public:using ThreadFunc = std::function<void()>;explicit Thread(ThreadFunc, const std::string &name = string());~Thread();void start();void join();bool started() const { return started_; }pthread_t pthreadId() const { return pthreadId_; }pid_t tid() const { return tid_; }const string& name() const { return name_; }
pruvate:bool started_;bool joined_;std::shared_ptr<std::thread> thread_;pid_t tid_;ThreadFunc func_;string name_;static std::atomic_int32 numCreated;
}
void Thread::start() {assert(!started_);started_ = true;if (pthread_create(&pthreadId_, nullptr, &startThread, this) != 0) {started_ = false;throw std::runtime_error("Failed to create thread");}latch_.wait(); // 等待线程真正启动
}void* startThread(void* obj)
{ThreadData* data = static_cast<ThreadData*>(obj);data->runInThread();delete data;return NULL;
}
int Thread::join() {assert(started_);assert(!joined_);joined_ = true;return pthread_join(pthreadId_, nullptr);
}
4.EventLoopThread:它将事件循环和线程进行绑定,用于在一个独立线程中运行 EventLoop
,从而避免在主线程中阻塞 I/O 事件处理
class EventLoopThread : noncopable(){
public:using ThreadInitCallback = std::functional<void(EventLoop*)>;EventLoopThread(const ThreadInitCallback& cb = ThreadInitCallback(),const string& name = string());~EventLoopThread();EventLoop* startLoop();private:EventLoop* loop_; // 事件循环指针bool exiting_; // 标志退出Thread thread_; // 封装的线程对象std::mutex mutex_; // 互斥锁,保证线程安全Condition cond_; // 条件变量,同步线程启}
EventLoop* EventLoopThread::startLoop() {assert(!thread_.started());thread_.start(); // 启动线程EventLoop* loop = nullptr;{MutexLockGuard lock(mutex_);while (loop_ == nullptr) { // 等待子线程初始化cond_.wait();}loop = loop_;}return loop;
}
为什么要在循环中检查?
(1)条件变量的常见问题:
-
虚假唤醒(Spurious Wakeup):
-
条件变量在没有通知的情况下,
wait()
可能意外返回。 -
这通常由操作系统调度或中断信号引起。
-
如果仅使用
cond_.wait();
,即使条件未满足,程序也可能继续执行,导致逻辑错误。
-
-
多线程竞争:
-
可能存在多个线程等待同一条件,一旦被唤醒,不一定是期望的线程。
-
没有循环检查,程序容易进入非预期状态
-
void EventLoopThread::threadFunc() {EventLoop loop; // 创建事件循环对象{MutexLockGuard lock(mutex_);loop_ = &loop; // 将loop_指向刚创建的EventLoop对象cond_.notify(); // 通知主线程:loop_已初始化}loop.loop(); // 启动事件循环
}
为什么要使用条件变量?
-
EventLoop
对象是在子线程中创建的,但主线程需要获取它的指针。 -
如果没有同步机制,主线程有可能在子线程创建之前就访问 loop_,导致空指针。
-
通过互斥锁和条件变量,让主线程在
loop_
初始化之前阻塞等待,直到子线程创建完成并通知。
5.EventLoopThreadPool:单个线程的事件循环(EventLoop
)无法应对大量并发请求。
为了提升性能,通常使用多线程事件循环,即线程池:
-
主线程(I/O 线程): 负责监听新连接。
-
工作线程(计算/事件线程): 负责数据读写和计算。
class EventLoopThreadPool : noncopyable {public:typedef std::function<void(EventLoop*)> ThreadInitCallback;EventLoopThreadPool(EventLoop* baseLoop, const string& nameArg);~EventLoopThreadPool();void setThreadNum(int numThreads) { numThreads_ = numThreads; }void start(const ThreadInitCallback& cb = ThreadInitCallback());/// round-robinEventLoop* getNextLoop();/// with the same hash code, it will always return the same EventLoopEventLoop* getLoopForHash(size_t hashCode);std::vector<EventLoop*> getAllLoops();bool started() const{ return started_; }const string& name() const{ return name_; }private:EventLoop* baseLoop_; // 主线程中的EventLoopbool started_; // 是否启动int numThreads_; // 线程数量int next_; // 下一个分配的EventLoop下标std::vector<std::unique_ptr<EventLoopThread>> threads_; // 线程列表std::vector<EventLoop*> loops_; // 事件循环列表};
//启动线程池 void EventLoopThreadPool::start() {assert(!started_);baseLoop_->assertInLoopThread();started_ = true;for (int i = 0; i < numThreads_; ++i) {auto thread = std::make_unique<EventLoopThread>();loops_.push_back(thread->startLoop()); // 启动并获取事件循环threads_.push_back(std::move(thread)); // 存储线程对象}// 如果没有设置线程,主线程直接作为唯一的EventLoopif (numThreads_ == 0) {loops_.push_back(baseLoop_);} }
//获取下一个事件循环 EventLoop* EventLoopThreadPool::getNextLoop() {baseLoop_->assertInLoopThread();EventLoop* loop = baseLoop_;if (!loops_.empty()) {loop = loops_[next_];next_ = (next_ + 1) % loops_.size(); // 轮询选择下一个Loop}return loop; }
6.channel
作为 Reactor 模式中的事件处理器,封装了文件描述符的事件管理和回调处理
1.关键的成员变量
变量 | 类型 | 作用 |
---|---|---|
fd_ | const int | 管理的文件描述符(socket/eventfd/timerfd 等),Channel 只管理但不拥有它 |
events_ | int | 关注的事件类型(EPOLLIN /EPOLLOUT 等),通过 enableReading() /enableWriting() 设置 |
revents_ | int | 实际触发的事件类型(由 Poller/Epoll 返回) |
loop_ | EventLoop* | 所属的事件循环,所有操作必须在对应 EventLoop 线程中执行 |
readCallback_ | ReadEventCallback | 读事件回调(带时间戳参数,如收到数据时调用) |
writeCallback_ | EventCallback | 写事件回调(如可写时调用) |
closeCallback_ | EventCallback | 连接关闭回调 |
errorCallback_ | EventCallback | 错误处理回调 |
tie_ | std::weak_ptr<void> | 生命周期绑定(防止 Channel 在处理事件期间被意外销毁) |
2.关键的成员函数
1. 事件管理
函数 | 作用 | |
---|---|---|
enableReading() | 注册读事件(`events_ | = kReadEvent`) |
disableReading() | 取消读事件 | |
enableWriting() | 注册写事件(`events_ | = kWriteEvent`) |
disableWriting() | 取消写事件 | |
disableAll() | 取消所有事件 | |
update() | 通知 EventLoop 更新关注的事件(内部调用 EventLoop::updateChannel() ) |
2. 事件处理核心
函数 | 作用 |
---|---|
handleEvent(Timestamp) | 处理事件的入口(根据 revents_ 调用对应的回调) |
handleEventWithGuard(Timestamp) | 实际处理事件(加生命周期保护) |
setReadCallback() /setWriteCallback() | 设置读/写/关闭/错误的回调函数 |
3. 生命周期控制
函数 | 作用 |
---|---|
tie(const std::shared_ptr<void>&) | 绑定共享指针,防止回调执行期间对象被销毁 |
remove() | 从 EventLoop 中移除 Channel |
3.核心设计思想
-
事件与回调分离
-
events_
和revents_
分离,由 Poller 监听events_
,返回revents_
。 -
通过
setXXXCallback()
设置业务逻辑,Channel 只负责事件分发。
-
-
线程安全
-
所有操作必须通过
EventLoop
在 IO 线程执行(通过loop_
指针保证)。
-
-
资源管理
-
使用
tie_
绑定共享指针(如TcpConnection
),避免处理事件时对象被析构。
-
-
高效事件过滤
-
isReading()
/isWriting()
快速检查事件状态,避免无效回调。
-
class Channel : noncopyable{
public:using EventCallback = std::function<void()>;using ReadEventCallback = std::function<void(Timestamp)>;Channel(EventLoop* loop, int fd);~Channel();// poller通知后,处理事件
// 是否绑定对象,来决定是否需要使用智能指针 guard 来保护这个对象的生命周期void handleEvent(Timestamp receiveTime);//设置回调函数对象:movevoid setReadCallback(ReadEventCallback cb){ readCallback_ = std::move(cb); }void setWriteCallback(EventCallback cb){ writeCallback_ = std::move(cb); }void setCloseCallback(EventCallback cb){ closeCallback_ = std::move(cb); }void setErrorCallback(EventCallback cb){ errorCallback_ = std::move(cb); }// 防止channel执行回调时被删除void tie(const std::shared_ptr<void>&);// 事件管理void enableReading() { events_ |= kReadEvent; update(); }void disableReading() { events_ &= ~kReadEvent; update(); }void enableWriting() { events_ |= kWriteEvent; update(); }void disableWriting() { events_ &= ~kWriteEvent; update(); }void disableAll() { events_ = kNoneEvent; update(); }bool isWriting() const { return events_ & kWriteEvent; }bool isReading() const { return events_ & kReadEvent; }bool isNoneEvent() const { return events_ == kNoneEvent; }int fd() const { return fd_; }int events() const { return events_; }void set_revents(int revt) { revents_ = revt; }EventLoop* ownerLoop() { return loop_; }// for Pollerint index() { return index_; }void set_index(int idx) { index_ = idx; }void remove();private:void update();void handleEventWithGuard(Timestamp receiveTime);static const int kNoneEvent;static const int kReadEvent;static const int kWriteEvent;EventLoop* loop_;const int fd_;int events_;int revents_; // it's the received event types of epoll or pollint index_; // used by Poller.(channel的状态:new/added/deleted)std::weak_ptr<void> tie_;//回调函数ReadEventCallback readCallback_;EventCallback writeCallback_;EventCallback closeCallback_;EventCallback errorCallback_;
}
1. guard
和 tie
的作用(防止对象生命周期问题)
在事件驱动模型中,Channel
对象通常与某个资源(如 TCP 连接 TcpConnection
)关联。当事件触发时,Channel
会调用用户注册的回调函数(如 readCallback_
)。
风险:在回调执行期间,关联的资源(如 TcpConnection
)可能被其他线程销毁,导致回调访问野指针,引发崩溃。
在 handleEvent
中尝试将 tie_
提升为 shared_ptr
(guard = tie_.lock()
)
在 muduo 的 Channel
类中,使用 std::weak_ptr<void>
而非 std::shared_ptr<void>
来管理关联对象的生命周期,主要基于以下设计考量:
1. 避免循环引用(核心原因)
-
问题场景:
如果Channel
直接持有shared_ptr<void>
,而关联对象(如TcpConnection
)又持有Channel
的shared_ptr
,会导致循环引用 -
解决方案:
weak_ptr
是弱引用,不会增加引用计数,打破循环依赖。
只有通过lock()
临时提升为shared_ptr
时才增加计数,确保安全访问。
2. 明确所有权关系
-
Channel
不拥有资源:
Channel
只是事件处理器,其关联对象(如TcpConnection
)应由更高层(如EventLoop
或用户代码)管理生命周期。- 使用
weak_ptr
表明Channel
仅“观察”资源,不参与所有权管理。 - 资源销毁时,
tie_.lock()
会返回nullptr
,Channel
自动跳过回调。
- 使用
-
对比
shared_ptr
:
如果Channel
持有shared_ptr
,会模糊所有权边界,增加资源意外存活的风险。
std::weak_ptr
是 C++ 标准库中的一种智能指针类型,用于解决 std::shared_ptr
的循环引用问题。它允许一个对象安全地引用另一个对象,而不会增加引用计数。std::weak_ptr
通常用于观察(但不拥有)一个由 std::shared_ptr
管理的对象。lock()
方法,尝试将弱引用提升为强引用(std::shared_ptr
)
循环引用:假设我们有两个类 A
和 B
,它们相互引用。如果不使用 std::weak_ptr
,会导致循环引用。
2.setXXXCallback()
函数使用 std::move
来设置回调函数:避免不必要的拷贝,提高性能
3.std::function
是 C++ 标准库中的一个模板类,用于封装可调用对象
7.poller
1.poller
封装操作系统的 I/O 多路复用机制(如 poll
, epoll
),提供一个统一接口让 EventLoop
能获取到就绪的 I/O 事件(如可读、可写、出错等),并通知对应的 Channel
。 EventLoop
的核心组件之一
功能 | 描述 |
---|
抽象封装 | 抽象 I/O 多路复用(poll/epoll/...) |
事件驱动核心 | 与 EventLoop 协作,驱动整个网络库 |
管理 fd → Channel 映射 | 负责让系统知道我们关心哪些事件、什么时候触发 |
class Poller : noncopyable{
Public:using ChannelList = std::vector<Channel*>;Poller(EventLoop *loop);virtual ~Poller();// 虚函数接口(抽象方法)// 执行一次 I/O 事件轮询(超时等待 timeoutMs 毫秒),将活跃的 Channel 填入virtual TimeStamp poll(int timeoutMs, ChannelList* activeChannels) = 0;// 当 Channel 感兴趣的事件发生变化时(如从监听读改为监听写),调用该函数virtual void updataChannel(Channel* channel) = 0;//从 Poller 中移除某个 Channelvirtual void removeChannel(Channel* channel) = 0;virtual bool hasChannel(Channel* channel) const;//工厂方法static Poller* newDefaultPoller(EventLoop* loop);protected:using ChannelMap = std::unordered<int, Channel*>;ChannelMap channels_;
private:EventLoop *ownweLoop_;
}
static Poller* newDefaultPoller(EventLoop* loop);是工厂方法,工厂方法是将对象的创建“推迟”到子类或函数中,“返回基类指针”是工厂方法模式最常见、最有用的实践形式。实现“抽象创建 + 多态调用”的关键方式
2.EPollPoller
EPollPoller
是对 epoll
的封装,实现了 Poller
接口,核心作用是“等待就绪事件并通知 EventLoop”
class EPollPoller : public Poller{
public:EPollEpoller(EventLoop *loop);~EPollPoller() override;Timestamp poll(int timeoutMs, ChannelList* activeChannels) override;// epoll_waitvoid updateChannel(Channel* channel) override; //epoll_ctl :add/modvoid removeChannel(Channel* channel) override; //epoll_ctl :delprivate:void fillActiveChannels(int numEvents, ChannelList* activeChannels) const;// 将 epoll_event 转成 Channelvoid update(int operation, Channel* channel);// 封装 epoll_ctlint epollfd_;EventList events_;//就绪的IO事件
}
8.Socket:提供对底层 socket 操作的封装(如设置选项、绑定、连接、关闭等)
class Socket : noncopyable {public:explicit Socket(int sockfd) : sockfd_(sockfd) {}~Socket();int fd() const { return sockfd_; }void bindAddress(const InetAddress& localaddr);void listen();int accept(InetAddress* peeraddr);void shutdownWrite();void setTcpNoDelay(bool on);void setReuseAddr(bool on);void setReusePort(bool on);void setKeepAlive(bool on);private:const int sockfd_;
};
1. TCP 选项总览:
TCP 选项 | 对应方法 | 作用 |
---|---|---|
禁用 Nagle 算法 | setTcpNoDelay(bool on) | 减少小包延迟,提高实时性 |
地址复用(SO_REUSEADDR) | setReuseAddr(bool on) | 解决端口占用,快速重启服务 |
端口复用(SO_REUSEPORT) | setReusePort(bool on) | 多进程监听同一端口,提高并发能力 |
TCP 保活 | setKeepAlive(bool on) | 检测长时间空闲连接,防止假死 |
2. TCP_NODELAY:禁用 Nagle 算法
-
Nagle算法:
-
为了减少网络中小数据包的数量,TCP 默认会将小包合并后再发送。
-
只有当前一个数据包得到ACK 确认后,才发送下一个小包。
-
-
问题:
-
在实时通信(如即时消息、游戏)中,小包积累和延迟会影响性能。
-
-
低延迟要求高的应用: 如即时通信、实时游戏。
大量小数据包传输: 如流媒体、物联网设备。
void Socket::setTcpNoDelay(bool on) {int optval = on ? 1 : 0;::setsockopt(sockfd_, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval));
}
-
IPPROTO_TCP
: 指定协议类型。 -
TCP_NODELAY
: 禁用 Nagle 算法。 -
optval
: 1 为禁用,0 为启用。
3. SO_REUSEADDR:地址复用
-
场景:
-
当服务器意外崩溃或重启后,端口可能仍处于 TIME_WAIT 状态,导致端口被占用。
-
-
问题:
-
无法立即重新绑定相同端口,导致服务重启失败。
-
-
服务器重启: 服务器崩溃后端口快速重用。
-
端口复用: 多个进程监听相同端口(配合 SO_REUSEPORT)。
void Socket::setReuseAddr(bool on) {int optval = on ? 1 : 0;::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
}
-
SOL_SOCKET
: 套接字级别的选项。 -
SO_REUSEADDR
: 地址复用。 -
optval
: 1 为启用,0 为禁用。
4. SO_REUSEPORT:端口复用
-
场景:
-
多核服务器上,为了充分利用 CPU 资源,通常使用多个进程监听相同端口。
-
-
问题:
-
未开启时,多个进程无法绑定同一端口,导致服务扩展性差。
-
-
多核服务器: 高并发服务器提高吞吐量。
-
负载均衡: 允许多个进程共享一个监听端口。
void Socket::setReusePort(bool on) {
#ifdef SO_REUSEPORTint optval = on ? 1 : 0;::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval));
#elseif (on) {LOG_SYSERR << "SO_REUSEPORT is not supported.";}
#endif
}
5. SO_KEEPALIVE:TCP 保活
-
场景:
-
长时间空闲连接,如客户端掉线或网络中断。
-
-
问题:
-
服务器长时间无法检测到客户端断开。
-
-
长连接服务: 保持连接状态,防止假死。
-
防止资源浪费: 检测空闲连接,及时释放。
void Socket::setKeepAlive(bool on) {int optval = on ? 1 : 0;::setsockopt(sockfd_, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval));
}
TIME_WAIT 产生的原因
当主动关闭连接的一方发送 FIN
并接收到对方的 ACK
后,进入 TIME_WAIT
状态:
-
防止旧数据包干扰新连接:
-
如果立即释放端口,新连接可能收到上一个连接的残留数据包。
-
-
确保对方收到 ACK:
-
最后一个 ACK 可能丢失,对方会重新发送 FIN,TIME_WAIT 可以重新应答。
-
3. TIME_WAIT 的问题
在高并发服务器中,TIME_WAIT 数量过多会导致以下问题:
-
端口耗尽:
-
短时间内建立大量连接,导致端口不足。
-
-
重启服务失败:
-
上次监听的端口仍处于
TIME_WAIT
,导致新进程无法重新绑定相同端口。
-
-
资源占用:
-
系统中有大量处于
TIME_WAIT
状态的连接,浪费内存和 CPU。
-
SO_REUSEADDR 是一个套接字选项,允许服务器在端口处于 TIME_WAIT 时快速重启。
-
允许端口复用:
-
即使处于
TIME_WAIT
,也能重新绑定相同的端口。
-
-
避免端口占用:
-
当服务器崩溃重启时,避免地址已被占用的错误。
-
使用示例:TCP 服务器
InetAddress serverAddr(8080);
Socket listenSock(::socket(AF_INET, SOCK_STREAM, 0)); // 创建套接字listenSock.setReuseAddr(true); // 设置地址复用
listenSock.bindAddress(serverAddr); // 绑定端口
listenSock.listen(); // 监听InetAddress clientAddr;
int connfd = listenSock.accept(&clientAddr); // 接受连接if (connfd >= 0) {printf("New connection from %s\n", clientAddr.toIpPort().c_str());Socket connSocket(connfd); // 管理新连接connSocket.setTcpNoDelay(true); // 禁用 Nagle 算法connSocket.shutdownWrite(); // 关闭写操作
}
9.Acceptor:监听客户端连接并接收新连接
class Acceptor : noncopyable {public:typedef std::function<void (int sockfd, const InetAddress&)> NewConnectionCallback;Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport);void setNewConnectionCallback(const NewConnectionCallback& cb) {newConnectionCallback_ = cb;}bool listening() const { return listening_; }void listen();private:void handleRead();EventLoop* loop_;Socket acceptSocket_; // 封装监听套接字Channel acceptChannel_; // 封装监听事件NewConnectionCallback newConnectionCallback_; // 新连接回调bool listening_;
};
//构造函数
Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport): loop_(loop),acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())),acceptChannel_(loop, acceptSocket_.fd()),listening_(false) {acceptSocket_.setReuseAddr(true);acceptSocket_.setReusePort(reuseport);acceptSocket_.bindAddress(listenAddr);acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
}
void Acceptor::handleRead() {InetAddress peerAddr;int connfd = acceptSocket_.accept(&peerAddr);if (connfd >= 0) {if (newConnectionCallback_) {newConnectionCallback_(connfd, peerAddr);} else {sockets::close(connfd);}}
}
10. buffer:是存储从套接字读入的数据或待发送的数据,并进行高效的读写操作
+-------------------+------------------+------------------+
| prependable bytes | readable bytes | writable bytes |
| | (数据区域) | |
+-------------------+------------------+------------------+
0 readerIndex_ writerIndex_ buffer_.size()
class Buffer : copable{
public:static const size_t kCheapPrepend = 8;static const size_t kInitialSize = 1024;explicit Buffer(size_t initialSize = kInitialSize): buffer_(kCheapPrepend + initialSize),readerIndex_(kCheapPrepend),writerIndex_(kCheapPrepend){}size_t readableBytes() const{ return writerIndex_ - readerIndex_; }size_t writableBytes() const{ return buffer_.size() - writerIndex_; }size_t prependableBytes() const{ return readerIndex_; }const char* peek() const{ return begin() + readerIndex_; }//可读数据起始地址void retrieveAll()//读写指针复位{readerIndex_ = kCheapPrepend;writerIndex_ = kCheapPrepend;}void retrieve(size_t len)//读指针偏移{assert(len <= readableBytes());if (len < readableBytes()){readerIndex_ += len;}else{retrieveAll();}}string retrieveAllAsString(){return retrieveAsString(readableBytes());//将所有可读数据转成字符串}string retrieveAsString(size_t len)//将可读数据转成字符串{assert(len <= readableBytes());string result(peek(), len);retrieve(len);return result;}void makeSpace(size_t len)//扩容{if (writableBytes() + prependableBytes() < len + kCheapPrepend){// FIXME: move readable databuffer_.resize(writerIndex_+len);}else{// 将可读数据前移,覆盖已读部分assert(kCheapPrepend < readerIndex_);size_t readable = readableBytes();std::copy(begin()+readerIndex_,begin()+writerIndex_,begin()+kCheapPrepend);readerIndex_ = kCheapPrepend;writerIndex_ = readerIndex_ + readable;assert(readable == readableBytes());}}void ensureWritableBytes(size_t len)//判断是否有足够空间可写{if (writableBytes() < len){makeSpace(len);}assert(writableBytes() >= len);}void append(const StringPiece& str)//追加写内容{append(str.data(), str.size());}void append(const char* /*restrict*/ data, size_t len){ensureWritableBytes(len);std::copy(data, data+len, beginWrite());hasWritten(len);}ssize_t readFd(int fd, int* savedErrno);//从文件描述符读数据private:char* begin(){ return &*buffer_.begin(); }//数组首元素地址std::vector<char> buffer_; // 底层存储容器size_t readerIndex_; // 读起始位置size_t writerIndex_; // 写起始位置}
ssize_t readFd(int fd, int* savedErrno) {char extraBuf[65536]; // 栈上空间struct iovec vec[2];const size_t writable = writableBytes();vec[0].iov_base = begin() + writerIndex_;vec[0].iov_len = writable;vec[1].iov_base = extraBuf;vec[1].iov_len = sizeof(extraBuf);const int iovcnt = (writable < sizeof(extraBuf)) ? 2 : 1;const ssize_t n = ::readv(fd, vec, iovcnt);if (n < 0) {*savedErrno = errno;} else if (implicit_cast<size_t>(n) <= writable) {writerIndex_ += n;} else {writerIndex_ = buffer_.size();append(extrabuf, n - writable);}return n;
}
readv
和 writev
是POSIX 提供的系统调用,“分散读”和"聚集写"操作,它们用于一次性从文件描述符读取多个缓冲区的数据,将多个缓冲区数据写入文件描述符
#include <sys/uio.h>ssize_t readv(int fd, const struct iovec *iov, int iovcnt);
从 fd
顺序读取数据,依次填充到 iov[0]
、iovec[1]
... 直到数据读完或所有缓冲区填满
struct iovec {void *iov_base; // 缓冲区起始地址size_t iov_len; // 缓冲区长度
};
iovec
是 readv
的核心参数,用于指定数据读取的目标缓冲区。这里设置了 两个缓冲区:
(1) 主缓冲区(vec[0]
)
vec[0].iov_base = begin() + writerIndex_; // 指向当前可写位置的起始地址 vec[0].iov_len = writable; // 可写入的最大字节数
-
目的:
优先将数据直接读取到 内部主缓冲区 的剩余空间(writerIndex_
之后的部分)。 -
优点:
-
避免内存拷贝:如果数据能完全放入主缓冲区,无需额外处理。
-
减少内存碎片:复用已有的缓冲区空间。
-
(2) 备用栈缓冲区(vec[1]
)
vec[1].iov_base = extraBuf; // 栈上的临时缓冲区 vec[1].iov_len = sizeof(extraBuf); // 固定大小(64KB)
-
目的:
如果主缓冲区剩余空间不足(writable
太小),则 超额数据 会暂存到栈上的extraBuf
。 -
优点:
-
避免内存浪费:主缓冲区空间不足时,临时用栈空间兜底(栈分配速度快,且函数退出后自动释放)。
-
防止丢包:即使主缓冲区满,也能保证数据不丢失(暂存到
extraBuf
后再处理)
-
11.TcpServer
1.TcpServer 的作用
-
管理连接:
-
负责接受客户端连接,并生成相应的 TcpConnection 对象。
-
-
线程分配:
-
使用 EventLoopThreadPool 进行多线程管理,保证高并发。
-
-
消息处理:
通过回调机制(如连接建立、消息接收、连接关闭)实现事件响应。
class TcpServer : noncopyable {public:typedef std::function<void(EventLoop*)> ThreadInitCallback;enum Option{kNoReusePort,kReusePort,};TcpServer(EventLoop* loop,const InetAddress& listenAddr,const string& nameArg,Option option = kNoReusePort);~TcpServer();void setConnectionCallback(const ConnectionCallback& cb){ connectionCallback_ = cb; }void setMessageCallback(const MessageCallback& cb){ messageCallback_ = cb; }void start(); // 启动服务器private:void newConnection(int sockfd, const InetAddress& peerAddr);void removeConnection(const TcpConnectionPtr& conn);void removeConnectionInLoop(const TcpConnectionPtr& conn);EventLoop* loop_; // 主线程的事件循环const string ipPort_;const string name_;std::unique_ptr<Acceptor> acceptor_;std::shared_ptr<EventLoopThreadPool> threadPool_;ConnectionCallback connectionCallback_;MessageCallback messageCallback_;WriteCompleteCallback writeCompleteCallback_;ThreadInitCallback threadInitCallback_; //线程初始化回调std::atomic started_;int nextConnId_;std::unordered_map<string, TcpConnectionPtr> connections_; // 连接管理
};
枚举(enum
)是一种用户自定义的数据类型,它允许为一组整数值赋予有意义的名称
2. 常见的回调类型
回调名称 | 类型 | 触发时机 |
---|---|---|
ConnectionCallback | void(const TcpConnectionPtr&) | 连接建立或断开时 |
MessageCallback | void(const TcpConnectionPtr&, Buffer*, Timestamp) | 收到数据时 |
WriteCompleteCallback | void(const TcpConnectionPtr&) | 数据发送完成时 |
HighWaterMarkCallback | void(const TcpConnectionPtr&, size_t) | 缓冲区到达高水位线时 |
CloseCallback | void(const TcpConnectionPtr&) | 连接关闭时 |
TcpServer::TcpServer(EventLoop* loop,const InetAddress& listenAddr,const string& nameArg,Option option): loop_(loop),ipPort_(listenAddr.toIpPort()),name_(nameArg),acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),threadPool_(new EventLoopThreadPool(loop, name_)),nextConnId_(1) {acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this, _1, _2));
}
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) {EventLoop* ioLoop = threadPool_->getNextLoop();string connName = name_ + to_string(nextConnId_++);TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, peerAddr));connections_[connName] = conn;conn->setConnectionCallback(connectionCallback_);conn->setMessageCallback(messageCallback_);conn->setWriteCompleteCallback(writeCompleteCallback_);conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, _1));ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}
//初始化连接并注册回调
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) {loop_->assertInLoopThread();EventLoop* ioLoop = threadPool_->getNextLoop(); // 获取下一个 IO 线程char buf[64];snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);++nextConnId_;std::string connName = name_ + buf;LOG_INFO << "TcpServer::newConnection [" << name_<< "] - new connection [" << connName<< "] from " << peerAddr.toIpPort();InetAddress localAddr(sockets::getLocalAddr(sockfd));// 创建 TcpConnection 对象TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));connections_[connName] = conn; // 添加到连接管理表// 设置各种回调conn->setConnectionCallback(connectionCallback_);conn->setMessageCallback(messageCallback_);conn->setWriteCompleteCallback(writeCompleteCallback_);// 设置关闭回调,当连接关闭时会自动移除conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, std::placeholders::_1));// 在 IO 线程中调用 connectEstablished()ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}
//getLocalAddr 来获取本地监听地址
struct sockaddr_in6 sockets::getLocalAddr(int sockfd)
{struct sockaddr_in6 localaddr;memZero(&localaddr, sizeof localaddr);socklen_t addrlen = static_cast<socklen_t>(sizeof localaddr);if (::getsockname(sockfd, sockaddr_cast(&localaddr), &addrlen) < 0){LOG_SYSERR << "sockets::getLocalAddr";}return localaddr;
}
TcpServer
类的 removeConnection
和 removeConnectionInLoop
函数用于在连接关闭时从服务器的连接管理中移除相应的 TcpConnection
对象
void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{// FIXME: unsafeloop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{loop_->assertInLoopThread();LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_<< "] - connection " << conn->name();size_t n = connections_.erase(conn->name());(void)n;assert(n == 1);EventLoop* ioLoop = conn->getLoop();ioLoop->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
}
TcpServer::~TcpServer()
{loop_->assertInLoopThread();LOG_TRACE << "TcpServer::~TcpServer [" << name_ << "] destructing";for (auto& item : connections_){TcpConnectionPtr conn(item.second);item.second.reset();conn->getLoop()->runInLoop(std::bind(&TcpConnection::connectDestroyed, conn));}
}
1. 连接绑定线程的原则
每个 TcpConnection
都绑定在一个特定的**事件循环(I/O 线程)**中。
核心思想:
-
一个连接只能在所属的 I/O 线程中操作,不能跨线程直接操作连接对象。
-
原因:
-
事件循环是单线程运行的,如果其他线程直接操作连接,会破坏线程安全,导致数据竞争或崩溃。
-
2. 为什么不能直接在析构函数中销毁?
连接的生命周期与服务器不完全一致:
-
服务器关闭时,可能还有活跃连接未完成数据传输。
-
直接销毁可能导致正在传输的数据被中断,或者导致未完成的回调直接崩溃。
跨线程问题:
-
假设:
-
服务器(主线程)正在析构,直接调用
TcpConnection::connectDestroyed()
。 -
但是该连接实际上是在其他 I/O 线程中活跃,直接操作会破坏线程隔离。
-
-
后果:
-
如果直接调用销毁函数,容易导致访问未释放或无效内存,导致程序崩溃。
-
12.TcpConnection
-
连接管理: 负责连接的建立和关闭。
-
数据收发: 提供异步发送和接收数据的接口。
回调类型 | 描述 | 典型设置方式 |
---|---|---|
连接回调 | 当连接建立或断开时触发 | setConnectionCallback() |
消息回调 | 当有消息到达时触发 | setMessageCallback() |
写完成回调 | 发送缓冲区中的数据完全发送时触发 | setWriteCompleteCallback() |
高水位回调 | 发送缓冲区数据量超过高水位时触发 | setHighWaterMarkCallback() |
关闭回调 | 连接关闭时触发 | setCloseCallback() |
class TcpConnection : public std::enable_shared_from_this<TcpConnection>, noncopyable{
public:TcpConnection(EventLoop* loop, const string& name, int sockfd, const InetAddress& localAddr, const InetAddress& peerAddr);~TcpConnection();const std::string& name() const { return name_; } // 获取连接名称const InetAddress& localAddress() const { return localAddr_; } // 获取本地地址const InetAddress& peerAddress() const { return peerAddr_; } // 获取对端地址bool connected() const { return state_ == kConnected; } // 检查是否已连接void setConnectionCallback(const ConnectionCallback& cb);void setMessageCallback(const MessageCallback& cb);void setWriteCompleteCallback(const WriteCompleteCallback& cb);void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark);void setCloseCallback(const CloseCallback& cb);void send(Buffer* buf); // 发送缓冲区内容void shutdown(); // 关闭写端void connectEstablished(); // 连接建立时调用void connectDestroyed(); // 连接销毁时调用private:void handleRead(Timestamp receiveTime); // 读取数据事件void handleWrite(); // 写入数据事件void handleClose(); // 关闭事件void handleError(); // 错误事件void sendInLoop(const void* message, size_t len);void shutdownInLoop();enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };EventLoop* loop_; // 事件循环指针const string name_; // 连接名称StateE state_; // 连接状态(连接、断开、正在连接)bool reading_; // 是否正在读取std::unique_ptr<Socket> socket_; // 封装的 TCP 套接字std::unique_ptr<Channel> channel_; // 事件分发器const InetAddress localAddr_; // 本地地址const InetAddress peerAddr_; // 对端地址Buffer inputBuffer_; // 接收缓冲区Buffer outputBuffer_; // 发送缓冲区ConnectionCallback connectionCallback_; // 连接回调MessageCallback messageCallback_; // 消息回调WriteCompleteCallback writeCompleteCallback_; // 写完成回调HighWaterMarkCallback highWaterMarkCallback_; // 高水位回调CloseCallback closeCallback_; // 关闭回调}
1.std::enable_shared_from_this
std::enable_shared_from_this
是一个辅助类模板,允许在类的成员函数中安全地获取当前对象的 shared_ptr
std::enable_shared_from_this
的工作原理
-
当
TcpConnection
对象作为shared_ptr
被创建时,enable_shared_from_this
会在内部保存一个弱引用。 -
当需要获取自身的
shared_ptr
时,调用shared_from_this()
方法即可。 -
这样可以确保即使在类的成员函数中,也不会错误地生成一个新的
shared_ptr
,而是与原来的共享控制块关联。
假设你在类方法中直接这样写:
std::shared_ptr<MyClass> ptr(this); // 错误示范
出现问题的原因:
-
引用计数错误:
-
当对象通过
new
创建并使用std::shared_ptr<T>(this)
包装时,会生成一个新的控制块,与已有的shared_ptr
无关。 -
这样做使得同一个对象有两个独立的控制块,各自管理引用计数。
-
-
双重释放:
-
当第一个
shared_ptr
析构时,引用计数减为零,释放内存。 -
第二个
shared_ptr
析构时,再次释放已经无效的内存,导致崩溃。
-
2.使用智能指针 std::unique_ptr
来管理 Socket 和 Channel 对象
1.资源独占性:保证 Socket 和 Channel 的唯一所有权
2.自动释放资源:防止内存泄漏
//处理读事件,当有数据到达时触发。
void TcpConnection::handleRead(Timestamp receiveTime) {int savedErrno = 0;ssize_t n = inputBuffer_.readFd(socket_->fd(), &savedErrno);if (n > 0) {messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); // 调用消息回调} else if (n == 0) {handleClose(); // 对端关闭} else {errno = savedErrno;handleError(); // 读取出错}
}
void TcpConnection::handleClose() {LOG_INFO << "TcpConnection::handleClose - fd = " << channel_->fd();state_ = kDisconnected;channel_->disableAll(); // 停止监听所有事件TcpConnectionPtr guardThis(shared_from_this());connectionCallback_(guardThis); // 通知上层连接已关闭closeCallback_(guardThis); // 触发关闭回调
}
void TcpConnection::send(const std::string &buf)
{if (state_ == kConnected){if (loop_->isInLoopThread()){sendInLoop(message);}else{void (TcpConnection::*fp)(const StringPiece& message) = &TcpConnection::sendInLoop;loop_->runInLoop(std::bind(fp,this, // FIXMEmessage.as_string()));//std::forward<string>(message)));}}
}
sendInLoop()
在网络编程中用于发送数据,但它是在事件循环线程中执行的。
-
将数据发送到网络连接上(通过 socket 写入)。
-
如果无法一次性写完,则将剩余数据存入输出缓冲区,等待下次可写时继续发送。
void TcpConnection::sendInLoop(const void* data, size_t len)
{loop_->assertInLoopThread(); // 确保在事件循环线程中ssize_t nwrote = 0;size_t remaining = len;bool faultError = false;if (state_ == kDisconnected){LOG_WARN << "disconnected, give up writing";return;}// Step 1: 直接写入数据(如果可能)if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0){nwrote = sockets::write(channel_->fd(), data, len);if (nwrote >= 0){remaining = len - nwrote;if (remaining == 0 && writeCompleteCallback_){loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));}}else {nwrote = 0;if (errno != EWOULDBLOCK){LOG_SYSERR << "TcpConnection::sendInLoop";if (errno == EPIPE || errno == ECONNRESET){faultError = true;}}}}// Step 2: 如果没有写完,则保存剩余数据到缓冲区if (!faultError && remaining > 0){size_t oldLen = outputBuffer_.readableBytes();if (oldLen + remaining >= highWaterMark_ && oldLen < highWaterMark_ && highWaterMarkCallback_){loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));}outputBuffer_.append(static_cast<const char*>(data) + nwrote, remaining);if (!channel_->isWriting()){//可写事件监听:表示套接字缓冲区有空闲空间,可以继续写入数据。channel_->enableWriting();}}
}
void TcpConnection::handleWrite()
{loop_->assertInLoopThread();if (channel_->isWriting()){ssize_t n = sockets::write(channel_->fd(),outputBuffer_.peek(),outputBuffer_.readableBytes());if (n > 0){outputBuffer_.retrieve(n);if (outputBuffer_.readableBytes() == 0){channel_->disableWriting();if (writeCompleteCallback_){loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));}if (state_ == kDisconnecting){shutdownInLoop();}}}else{LOG_SYSERR << "TcpConnection::handleWrite";// if (state_ == kDisconnecting)// {// shutdownInLoop();// }}}else{LOG_TRACE << "Connection fd = " << channel_->fd()<< " is down, no more writing";}
}
TcpConnection
数据发送流程总结
-
直接发送
- 当 输出缓冲区为空 且 socket 不可写 时,直接调用
write()
尝试发送数据。 - 如果全部发送成功,触发
writeCompleteCallback_
。
- 当 输出缓冲区为空 且 socket 不可写 时,直接调用
-
缓冲剩余数据
- 如果未完全发送(或
write()
返回EAGAIN
),剩余数据存入outputBuffer_
,并注册EPOLLOUT
事件。
- 如果未完全发送(或
-
处理可写事件
- 当
EPOLLOUT
触发时,handleWrite()
从outputBuffer_
读取数据继续发送。 - 如果缓冲区清空,取消
EPOLLOUT
监听,避免忙等待。
- 当
将当前对象(TcpConnection
)传递给回调函数,以便在回调函数中正确访问和操作该对象
-
使用
shared_from_this()
获取当前对象的智能指针(std::shared_ptr
),而不是裸指针。 -
这样可以在回调函数中持有对象的生命周期,防止对象在回调执行之前被销毁。
void TcpConnection::connectEstablished() {loop_->assertInLoopThread();assert(state_ == kConnecting);setState(kConnected);channel_->tie(shared_from_this()); // 防止channel中使用悬空指针// 注册读事件channel_->enableReading();// 调用连接建立回调,通知上层用户连接已经建立connectionCallback_(shared_from_this());
}
void TcpConnection::connectDestroyed() {loop_->assertInLoopThread();if (state_ == kConnected) {setState(kDisconnected);channel_->disableAll(); // 取消所有事件监听// 调用连接关闭回调,通知上层用户连接已关闭connectionCallback_(shared_from_this());}channel_->remove(); // 从 Poller 中移除
}
void TcpConnection::shutdown()
{// FIXME: use compare and swapif (state_ == kConnected){setState(kDisconnecting);// FIXME: shared_from_this()?loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));}
}void TcpConnection::shutdownInLoop()
{loop_->assertInLoopThread();if (!channel_->isWriting()) //没有数据发送(写关闭){socket_->shutdownWrite();}
}