Connector
Connector 类详细介绍
一、类的功能概述
Connector 是网络库中用于管理客户端连接的关键类。它负责:
- 发起非阻塞连接到服务器
- 处理连接成功/失败的情况
- 在连接失败时自动重试,重试间隔指数增长
- 通过回调机制通知上层代码连接结果
二、主要 API 接口
2.1 公共方法
Connector(EventLoop* loop, const InetAddress& serverAddr)
- 功能:构造函数,初始化连接器
- 参数:
loop:事件循环对象serverAddr:目标服务器地址
- 初始状态:
kDisconnected,重试延迟为 500ms
void setNewConnectionCallback(const NewConnectionCallback& cb)
- 功能:设置连接成功时的回调函数
- 参数:
cb是std::function<void (int sockfd)>类型的回调 - 调用线程:任意线程
void start()
- 功能:开始连接操作
- 调用线程:任意线程
- 原理:将
startInLoop()任务提交到事件循环线程 - 关键标志:设置
connect_ = true
void restart()
- 功能:重启连接操作(重置重试延迟和状态)
- 调用线程:只能在事件循环线程中调用
- 作用:将重试延迟重置为初始值 500ms
void stop()
- 功能:停止连接操作
- 调用线程:任意线程
- 原理:设置
connect_ = false,并在事件循环中执行stopInLoop()
const InetAddress& serverAddress() const
- 功能:返回目标服务器地址
2.2 私有方法
void startInLoop()
- 在事件循环线程中执行
- 检查状态是否为
kDisconnected - 如果
connect_ == true,调用connect()发起连接
void stopInLoop()
- 在事件循环线程中执行
- 如果当前正在连接中(
kConnecting),则关闭连接
void connect()
- 核心连接逻辑
- 创建非阻塞 socket
- 调用
connect()系统调用 - 根据返回值和 errno 判断连接结果:
- 成功情况(errno=0, EINPROGRESS, EINTR, EISCONN):调用
connecting() - 暂时失败(EAGAIN, ECONNREFUSED 等):调用
retry() - 严重错误(EACCES, EPERM 等):直接关闭 socket
- 成功情况(errno=0, EINPROGRESS, EINTR, EISCONN):调用
void connecting(int sockfd)
- 处理连接中的状态
- 设置状态为
kConnecting - 创建
Channel对象 - 设置写事件回调(连接成功时会触发写事件)
- 设置错误事件回调
- 启用写事件监听
- 设置状态为
void handleWrite()
- 处理写事件(连接结果)
- 检查 socket 错误码(
SO_ERROR) - 如果无错误,检查是否为自连接
- 如果一切正常,设置状态为
kConnected,调用newConnectionCallback_ - 如果有错误,调用
retry()重试
- 检查 socket 错误码(
void handleError()
- 处理错误事件
- 获取 socket 错误码并调用
retry()
- 获取 socket 错误码并调用
void retry(int sockfd)
- 连接失败重试逻辑
- 关闭当前 socket
- 设置状态为
kDisconnected - 如果
connect_ == true:- 使用定时器在
retryDelayMs_毫秒后再次调用startInLoop() - 重试延迟翻倍:
retryDelayMs_ = min(retryDelayMs_ * 2, kMaxRetryDelayMs)
- 使用定时器在
- 最大重试延迟为 30 秒
int removeAndResetChannel()
- 禁用 channel 的所有事件
- 从 EventLoop 中移除 channel
- 返回 socket fd
void resetChannel()
- 重置
channel_智能指针
三、状态机
kDisconnected ──start()/restart()──→ kConnecting ──连接成功──→ kConnected↑ │└─────────────connect失败/retry()────┘
四、完整调用流程图
Connector 完整调用流程图
基于源代码,以下是 Connector 的详细时间调用流程图:
一、完整时序图(Sequence Diagram)
二、状态转移时序图
详细流程图(完整版)
代码执行路径示例
连接成功路径:
start() → runInLoop(startInLoop)→ startInLoop()→ connect()→ connecting()→ enableWriting()→ [等待写事件]→ handleWrite()→ removeAndResetChannel()→ newConnectionCallback_(sockfd) ✓
连接失败且重试路径:
start()→ runInLoop(startInLoop)→ startInLoop()→ connect()→ retry()→ close(sockfd)→ runAfter(500ms, startInLoop)→ [等待500ms]→ startInLoop() [第2次尝试]→ connect()→ retry()→ close(sockfd)→ runAfter(1000ms, startInLoop)→ [等待1000ms]→ ...以此类推
五、关键特性说明
5.1 非阻塞连接
- 使用
sockets::createNonblockingOrDie()创建非阻塞 socket - 通过
EINPROGRESS错误判断连接正在进行
5.2 指数退避重试策略
- 初始重试延迟:500ms
- 每次重试延迟翻倍:500ms → 1s → 2s → 4s → … → 30s(最大)
- 使用
loop_->runAfter()实现定时重试
5.3 自连接检测
- 在
handleWrite()中调用sockets::isSelfConnect()检测 - 防止客户端连接到自己的监听端口
5.4 线程安全
start()和stop()可以在任意线程调用- 内部使用
loop_->runInLoop()和loop_->queueInLoop()确保操作在事件循环线程中执行 - 使用原子操作
connect_标志控制连接逻辑
// 创建连接器
EventLoop loop;
InetAddress serverAddr("127.0.0.1", 8888);
auto connector = std::make_shared<Connector>(&loop, serverAddr);// 设置连接成功回调
connector->setNewConnectionCallback([](int sockfd) {LOG_INFO << "Connected successfully, sockfd = " << sockfd;// 使用 sockfd 创建 TcpConnection
});// 开始连接
connector->start();// 在事件循环中运行
loop.loop();// 停止连接
connector->stop();
七、总结
Connector 通过精心设计的状态机、事件驱动机制和指数退避重试策略,实现了高效的客户端连接管理。它充分利用非阻塞 I/O 的特性,避免了阻塞等待,使整个网络编程框架能够高效处理大量并发连接。
Connector.cc 详细解析
一、核心概念说明
1. 非阻塞 connect() 的返回码
在非阻塞模式下,connect() 系统调用的返回值和 errno 有特殊含义:
| errno | 含义 | 处理方法 |
|---|---|---|
| 0 | 连接立即成功 | 调用 connecting() |
| EINPROGRESS | 连接正在进行 | 调用 connecting(),等待写事件 |
| EINTR/EISCONN | 中断或已连接 | 调用 connecting() |
| EAGAIN/EADDRINUSE | 本地端口暂时用完 | 关闭socket,延期重试 ⚠️ |
| ECONNREFUSED | 对端拒绝连接 | 关闭socket,延期重试 |
| EACCES/EPERM | 权限不足(严重错误) | 关闭socket,不重试 ❌ |
2. 为什么 EAGAIN 需要重试?
case EAGAIN: // 本地ephemeral port暂时用完
case EADDRINUSE: // 地址已被使用
case EADDRNOTAVAIL: // 地址不可用
case ECONNREFUSED: // 对端拒绝
case ENETUNREACH: // 网络不可达retry(sockfd); // 关闭当前socket,延迟后重试break;
这与 accept(2) 不同:
- accept() 的 EAGAIN 不是真正的错误,只是"暂时没有连接",继续监听即可
- connect() 的 EAGAIN 是真正的错误,表示本机的 ephemeral port(临时端口)暂时用完,需要关闭当前 socket 后延迟重试
二、handleWrite() 的三重检查机制
即使 socket 变为可写,也不能直接确认连接成功。需要三层验证:
void Connector::handleWrite()
{LOG_TRACE << "Connector::handleWrite " << state_;if (state_ == kConnecting){int sockfd = removeAndResetChannel();// 第一层检查:SO_ERRORint err = sockets::getSocketError(sockfd);if (err){LOG_WARN << "Connector::handleWrite - SO_ERROR = "<< err << " " << strerror_tl(err);retry(sockfd); // ❌ 连接失败,重试return;}// 第二层检查:自连接检测if (sockets::isSelfConnect(sockfd)){LOG_WARN << "Connector::handleWrite - Self connect";retry(sockfd); // ❌ 自连接,重试return;}// 第三层检查:都通过,连接成功setState(kConnected);if (connect_){newConnectionCallback_(sockfd); // ✓ 回调上层}else{sockets::close(sockfd);}}
}
为什么需要 SO_ERROR 检查?
int err = sockets::getSocketError(sockfd);
// 这个系统调用是必要的!
原因:即使 epoll_wait() 返回写事件就绪,也可能连接失败(例如对端主动拒绝、网络不可达等)。此时 SO_ERROR 会被设置为相应的错误码。
示例:
// 场景:connect() 返回 EINPROGRESS
// -> 等待写事件
// -> 对端发送 RST 包(拒绝连接)
// -> epoll 返回写事件就绪
// -> 但 SO_ERROR = ECONNREFUSEDint err = sockets::getSocketError(sockfd); // 获取真实错误
if (err) {retry(sockfd); // 必须重试
}
三、重试机制:指数退避
void Connector::retry(int sockfd)
{sockets::close(sockfd); // 1. 关闭当前socketsetState(kDisconnected); // 2. 状态回到断开if (connect_){LOG_INFO << "Connector::retry - Retry connecting to " << serverAddr_.toIpPort()<< " in " << retryDelayMs_ << " milliseconds. ";// 3. 延迟后重新连接loop_->runAfter(retryDelayMs_ / 1000.0,std::bind(&Connector::startInLoop, shared_from_this()));// 4. 指数退避:延迟时间翻倍,但不超过30秒retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs);}else{LOG_DEBUG << "do not connect";}
}
重试延迟时间线:
第1次:500ms 失败
第2次:1000ms 失败
第3次:2000ms 失败
第4次:4000ms 失败
第5次:8000ms 失败
...
第N次:30000ms(最大值)
七、总结对比
| 方面 | accept(2) | connect(2) |
|---|---|---|
| EAGAIN | 假错误,继续监听 | 真错误,需要关闭并重试 |
| 可写事件 | 表示有新连接 | 可能表示连接失败,需要 SO_ERROR 检查 |
| 重试策略 | 无需重试 | 需要指数退避重试 |
| 多线程安全 | 通过 runInLoop | 通过 runInLoop/queueInLoop |
核心要点:
- ✅ EAGAIN 需要重试(ephemeral port 用完)
- ✅ 可写事件后必须检查 SO_ERROR(可能连接失败)
- ✅ 需要检查自连接(防止客户端连接到自己的服务端口)
- ✅ 使用指数退避重试(避免频繁尝试消耗资源)
