TcpClinet
1. 时序图(Mermaid)
正常连接流程
连接失败重试流程
断开连接流程
2. 详细函数调用说明
流程 A:成功连接
Step 1: TcpClient::connect()
void TcpClient::connect()
{LOG_INFO << "TcpClient::connect[" << name_ << "] - connecting to "<< connector_->serverAddress().toIpPort();connect_ = true; // 标记为"要连接"connector_->start(); // 启动连接器
}
作用:设置标志位,告诉 Connector 开始尝试连接。
Step 2: Connector::start()
void Connector::start()
{connect_ = true;loop_->runInLoop(std::bind(&Connector::startInLoop, this));
}
作用:
- 设置
connect_标志为true(允许连接) - 将
startInLoop()任务放入EventLoop的队列,确保在 IO 线程中执行
Step 3: Connector::startInLoop()(在 EventLoop 线程中执行)
void Connector::startInLoop()
{loop_->assertInLoopThread();assert(state_ == kDisconnected);if (connect_){connect(); // 真正开始连接}else{LOG_DEBUG << "do not connect";}
}
作用:检查 connect_ 标志,如果为真则调用 connect()。
Step 4: Connector::connect()
void Connector::connect()
{int sockfd = sockets::createNonblockingOrDie(serverAddr_.family());// 创建非阻塞 socketint ret = sockets::connect(sockfd, serverAddr_.getSockAddr());// 尝试连接(会立即返回,因为是非阻塞)int savedErrno = (ret == 0) ? 0 : errno;switch (savedErrno){case 0:case EINPROGRESS: // 连接正在进行case EINTR:case EISCONN: // 已连接connecting(sockfd); // 进入"连接中"状态break;case EAGAIN:case ECONNREFUSED: // 连接被拒绝// ...retry(sockfd); // 重试break;case EACCES:case EPERM:// ... 严重错误LOG_SYSERR << "connect error";sockets::close(sockfd);break;}
}
作用:
- 创建非阻塞 socket
- 尝试连接到服务器
- 根据返回值(errno)决定下一步:
EINPROGRESS:连接正在进行 → 进入connecting()等待可写- 连接失败(如
ECONNREFUSED)→ 进入retry()重试 - 严重错误 → 关闭 socket
Step 5: Connector::connecting(int sockfd)
void Connector::connecting(int sockfd)
{setState(kConnecting); // 状态改为"连接中"assert(!channel_);channel_.reset(new Channel(loop_, sockfd));// 设置回调:当 socket 可写时,调用 handleWritechannel_->setWriteCallback(std::bind(&Connector::handleWrite, this));channel_->setErrorCallback(std::bind(&Connector::handleError, this));// 开启"可写"事件监听channel_->enableWriting();
}
作用:
- 创建
Channel对象来管理这个 socket 的事件 - 注册 socket 的写事件监听(当 socket 可写时,表示连接已建立或失败)
EventLoop的 poller(epoll/poll)会监控这个 socket
Step 6: Channel::handleEvent()(当 socket 可写时触发)
这个函数由 EventLoop 的 poller 调用,最终会调用:
Step 7: Connector::handleWrite()
void Connector::handleWrite()
{LOG_TRACE << "Connector::handleWrite " << state_;if (state_ == kConnecting){int sockfd = removeAndResetChannel();// 获取 socket 的错误状态int err = sockets::getSocketError(sockfd);if (err){// 连接失败LOG_WARN << "Connector::handleWrite - SO_ERROR = " << err;retry(sockfd); // 重试连接}else{// 连接成功!setState(kConnected);// 调用上层的回调newConnectionCallback_(sockfd);}}else{LOG_ERROR << "Connector::handleWrite - state error";}
}
作用:
- 检查 socket 的连接状态(通过
SO_ERROR) - 如果成功:状态改为
kConnected,调用newConnectionCallback_(sockfd) - 如果失败:调用
retry(sockfd)进行重试
Step 8: TcpClient::newConnection(int sockfd)(回调函数)
void TcpClient::newConnection(int sockfd)
{loop_->assertInLoopThread();// 获取对端地址InetAddress peerAddr(sockets::getPeerAddr(sockfd));char buf[32];snprintf(buf, sizeof buf, ":%s#%d", peerAddr.toIpPort().c_str(), nextConnId_);++nextConnId_;string connName = name_ + buf;InetAddress localAddr(sockets::getLocalAddr(sockfd));// **关键:创建 TcpConnection 对象来管理这个连接**TcpConnectionPtr conn(new TcpConnection(loop_,connName,sockfd,localAddr,peerAddr));// 将用户设置的各种回调注册到 TcpConnectionconn->setConnectionCallback(connectionCallback_);conn->setMessageCallback(messageCallback_);conn->setWriteCompleteCallback(writeCompleteCallback_);conn->setCloseCallback(std::bind(&TcpClient::removeConnection, this, _1));{MutexLockGuard lock(mutex_);connection_ = conn; // 保存连接}// 通知 TcpConnection:连接已建立conn->connectEstablished();
}
作用:
- 创建
TcpConnection对象来管理这个连接 - 将用户的回调(如
connectionCallback_)注册到TcpConnection - 调用
connectEstablished()来初始化连接并触发用户的connectionCallback_
流程 B:连接失败与重试
Connector::retry(int sockfd)
void Connector::retry(int sockfd)
{sockets::close(sockfd); // 关闭失败的 socketsetState(kDisconnected); // 状态改回"未连接"if (connect_) // 如果仍然要连接{LOG_INFO << "Connector::retry - Retry connecting to " << serverAddr_.toIpPort()<< " in " << retryDelayMs_ << " milliseconds.";// **关键:延迟后再次尝试连接**loop_->runAfter(retryDelayMs_ / 1000.0,std::bind(&Connector::startInLoop, shared_from_this()));// **backoff:重试间隔逐倍增长(0.5s -> 1s -> 2s -> ... -> 30s)**retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs);}
}
作用:
- 关闭失败的 socket
- 如果
connect_标志仍为 true,就使用runAfter()延迟后再次调用startInLoop() - 每次重试前增加延迟时间(指数退避)
流程 C:停止与断开
TcpClient::stop()
void TcpClient::stop()
{connect_ = false; // 标记为"不要连接"connector_->stop(); // 停止连接器
}
Connector::stop()
void Connector::stop()
{connect_ = false; // 设置标志为 falseloop_->queueInLoop(std::bind(&Connector::stopInLoop, this));// 在 EventLoop 中执行 stopInLoop
}
Connector::stopInLoop()
void Connector::stopInLoop()
{loop_->assertInLoopThread();if (state_ == kConnecting){setState(kDisconnected);int sockfd = removeAndResetChannel(); // 移除 Channel,关闭 socketretry(sockfd); // 尝试调用 retry// 但因为 connect_ 已经是 false,retry 中不会再安排新的连接}
}
流程 D:连接建立后的断开
TcpClient::disconnect()
void TcpClient::disconnect()
{connect_ = false;{MutexLockGuard lock(mutex_);if (connection_){connection_->shutdown(); // 优雅关闭连接}}
}
TcpClient::removeConnection(const TcpConnectionPtr& conn)
void TcpClient::removeConnection(const TcpConnectionPtr& conn)
{loop_->assertInLoopThread();assert(loop_ == conn->getLoop());{MutexLockGuard lock(mutex_);assert(connection_ == conn);connection_.reset(); // 清空连接指针}// 异步销毁连接loop_->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn));// 如果启用了自动重连,则重新连接if (retry_ && connect_){LOG_INFO << "TcpClient::connect[" << name_ << "] - Reconnecting to "<< connector_->serverAddress().toIpPort();connector_->restart(); // 重新启动连接器}
}
作用:
- 移除已保存的连接指针
- 异步销毁
TcpConnection - 如果启用自动重连(
retry_ = true),则自动重新连接
总结
| 阶段 | 关键函数 | 状态 | 说明 |
|---|---|---|---|
| 1 | TcpClient::connect() | connect_=true | 用户发起连接请求 |
| 2 | Connector::start() | - | 将 startInLoop 放入任务队列 |
| 3 | Connector::startInLoop() | state_=kConnecting | 真正开始连接 |
| 4 | Connector::connect() | state_=kConnecting | 创建 socket,发起非阻塞连接 |
| 5 | Connector::connecting() | state_=kConnecting | 将 socket 的写事件加入 poller |
| 6 | Connector::handleWrite() | state_=kConnected | 连接成功,调用上层回调 |
| 7 | TcpClient::newConnection() | - | 创建 TcpConnection 对象 |
| 8 | TcpConnection::connectEstablished() | - | 触发用户的 connectionCallback_ |
