C++网络编程 5.TCP套接字(socket)通信进阶-基于多线程的TCP多客户端通信
基于多线程的TCP多客户端通信示例详解
这个示例实现了一个能同时处理多个客户端连接的TCP服务器,以及一个能创建多个连接的客户端程序。核心通过C++11的std::thread
库实现多线程并发处理,下面分服务器和客户端两部分详细讲解。
先附上完整代码:
multi_conn_threads_server.cpp
#include <iostream>
#include <sys/socket.h> // 套接字API
#include <netinet/in.h> //
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <thread>
#include <vector>
#include <atomic> // 原子操作(线程安全标志)
#include <functional> // 函数对象支持// 处理客户端连接的函数
void handleClient(int client_sock, const std::string& client_ip, int client_port) {std::cout << "新客户端连接: " << client_ip << ":" << client_port << std::endl;char buffer[1024];while (true) {// 接收客户端消息memset(buffer, 0, sizeof(buffer));int recv_size = read(client_sock, buffer, sizeof(buffer));if (recv_size <= 0) {// 客户端断开连接或出错if (recv_size == 0) {std::cout << "客户端 " << client_ip << ":" << client_port << " 断开连接" << std::endl;} else {std::cerr << "接收数据出错: " << strerror(errno) << std::endl;}break;}std::cout << "从 " << client_ip << ":" << client_port << " 接收: " << buffer;// 发送响应std::string response = "服务器已收到: " + std::string(buffer);write(client_sock, response.c_str(), response.size());}// 关闭客户端套接字close(client_sock);
}int main() {// 创建监听套接字int listen_sock = socket(AF_INET, SOCK_STREAM, 0);if (listen_sock == -1) {std::cerr << "创建套接字失败: " << strerror(errno) << std::endl;return 1;}// 设置端口复用,避免程序重启时端口被占用int opt = 1;if (setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1) {std::cerr << "设置套接字选项失败: " << strerror(errno) << std::endl;close(listen_sock);return 1;}// 绑定IP地址和端口struct sockaddr_in server_addr;memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET;server_addr.sin_addr.s_addr = INADDR_ANY; // 监听所有IPserver_addr.sin_port = htons(9888); // 端口号if (bind(listen_sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {std::cerr << "绑定失败: " << strerror(errno) << std::endl;close(listen_sock);return 1;}// 开始监听if (listen(listen_sock, 5) == -1) {std::cerr << "监听失败: " << strerror(errno) << std::endl;close(listen_sock);return 1;}std::cout << "服务器启动成功,监听在 0.0.0.0:9888" << std::endl;std::vector<std::thread> client_threads;std::atomic<bool> running(true);// 主循环接受客户端连接while (running) {struct sockaddr_in client_addr;socklen_t client_addr_len = sizeof(client_addr);int client_sock = accept(listen_sock, (struct sockaddr*)&client_addr, &client_addr_len);if (client_sock == -1) {if (running) { // 只有在运行中时才视为错误std::cerr << "接受连接失败: " << strerror(errno) << std::endl;}continue;}// 获取客户端IP和端口std::string client_ip = inet_ntoa(client_addr.sin_addr);int client_port = ntohs(client_addr.sin_port);// 创建新线程处理客户端连接client_threads.emplace_back(handleClient, client_sock, client_ip, client_port);// 分离线程,避免主线程等待client_threads.back().detach();// 限制同时存在的线程数量(可选)if (client_threads.size() > 100) {client_threads.erase(client_threads.begin());}}// 关闭监听套接字close(listen_sock);std::cout << "服务器已停止" << std::endl;return 0;
}
multi_conn_threads_client.cpp
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <thread>
#include <vector>
#include <string>
#include <chrono>// 服务器地址和端口(需与服务器保持一致)
const std::string SERVER_IP = "127.0.0.1";
const int SERVER_PORT = 9888;
const int NUM_CONNECTIONS = 5; // 同时建立的连接数量// 单个连接的处理函数
void handleConnection(int conn_id) {// 1. 创建客户端套接字int client_sock = socket(AF_INET, SOCK_STREAM, 0);if (client_sock == -1) {std::cerr << "连接 " << conn_id << " 创建套接字失败: " << strerror(errno) << std::endl;return;}// 2. 配置服务器地址struct sockaddr_in server_addr;memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET;server_addr.sin_port = htons(SERVER_PORT);// 将服务器IP从字符串转换为网络字节序if (inet_pton(AF_INET, SERVER_IP.c_str(), &server_addr.sin_addr) <= 0) {std::cerr << "连接 " << conn_id << " IP地址转换失败" << std::endl;close(client_sock);return;}// 3. 连接服务器if (connect(client_sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {std::cerr << "连接 " << conn_id << " 连接服务器失败: " << strerror(errno) << std::endl;close(client_sock);return;}std::cout << "连接 " << conn_id << " 成功连接到服务器 " << SERVER_IP << ":" << SERVER_PORT << std::endl;try {// 4. 数据交互(每个连接发送3条消息)for (int i = 0; i < 3; ++i) {// 发送消息(包含连接ID和序号,方便服务器区分)std::string msg = "客户端连接 " + std::to_string(conn_id) + " 的消息 " + std::to_string(i+1);ssize_t send_size = send(client_sock, msg.c_str(), msg.size(), 0);if (send_size == -1) {std::cerr << "连接 " << conn_id << " 发送消息失败: " << strerror(errno) << std::endl;break;}std::cout << "连接 " << conn_id << " 发送: " << msg << std::endl;// 接收服务器响应char buffer[1024] = {0};ssize_t recv_size = recv(client_sock, buffer, sizeof(buffer)-1, 0);if (recv_size <= 0) {if (recv_size == 0) {std::cout << "连接 " << conn_id << " 服务器已断开" << std::endl;} else {std::cerr << "连接 " << conn_id << " 接收响应失败: " << strerror(errno) << std::endl;}break;}std::cout << "连接 " << conn_id << " 收到响应: " << buffer << std::endl;// 模拟间隔,避免消息发送过快std::this_thread::sleep_for(std::chrono::milliseconds(500));}} catch (const std::exception& e) {std::cerr << "连接 " << conn_id << " 异常: " << e.what() << std::endl;}// 5. 关闭连接close(client_sock);std::cout << "连接 " << conn_id << " 已关闭" << std::endl;
}int main() {std::cout << "多连接TCP客户端启动,将创建 " << NUM_CONNECTIONS << " 个连接" << std::endl;// 存储所有连接的线程std::vector<std::thread> connection_threads;// 创建多个连接(每个连接一个线程)for (int i = 0; i < NUM_CONNECTIONS; ++i) {connection_threads.emplace_back(handleConnection, i+1); // 连接ID从1开始// 稍微延迟,避免同时建立连接导致服务器压力集中std::this_thread::sleep_for(std::chrono::milliseconds(100));}// 等待所有连接线程完成for (auto& t : connection_threads) {if (t.joinable()) {t.join();}}std::cout << "所有连接已处理完成,客户端退出" << std::endl;return 0;
}
一、多线程TCP服务器程序详解
服务器的核心目标是:同时接受并处理多个客户端的连接请求,每个客户端的通信逻辑在独立线程中执行,避免单线程阻塞导致其他客户端无法连接。
1. 核心功能概述
- 启动服务器并监听指定端口(9888);
- 主循环持续接受新的客户端连接;
- 每接收到一个客户端连接,创建独立线程处理该客户端的消息收发;
- 支持客户端正常断开或异常断开的处理;
- 避免端口占用问题(通过
SO_REUSEADDR
选项)。
2. 关键代码解析
2.1 初始化与监听套接字
// 创建监听套接字(TCP类型)
int listen_sock = socket(AF_INET, SOCK_STREAM, 0);// 设置端口复用(解决服务器重启时端口被TIME_WAIT状态占用的问题)
int opt = 1;
setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));// 绑定IP和端口(与基础示例一致)
struct sockaddr_in server_addr;
// ... 填充地址信息(INADDR_ANY绑定所有IP,端口9888)
bind(listen_sock, (struct sockaddr*)&server_addr, sizeof(server_addr));// 开始监听(backlog=5,表示未完成连接队列最大长度)
listen(listen_sock, 5);
- 端口复用(SO_REUSEADDR):服务器关闭后,端口可能处于
TIME_WAIT
状态(确保最后一个数据包被接收),此时直接重启服务器会绑定失败。setsockopt
设置该选项后,允许端口在TIME_WAIT
状态下被重新绑定,方便开发调试。
2.2 主循环:接受客户端连接并创建线程
std::vector<std::thread> client_threads; // 存储客户端线程
std::atomic<bool> running(true); // 原子变量:服务器运行标志(线程安全)while (running) {// 接受客户端连接(阻塞等待新连接)struct sockaddr_in client_addr;socklen_t client_addr_len = sizeof(client_addr);int client_sock = accept(listen_sock, (struct sockaddr*)&client_addr, &client_addr_len);if (client_sock == -1) {if (running) std::cerr << "接受连接失败" << std::endl;continue;}// 获取客户端IP和端口(转换为可读性字符串)std::string client_ip = inet_ntoa(client_addr.sin_addr); // 二进制IP→点分十进制int client_port = ntohs(client_addr.sin_port); // 网络字节序→主机字节序// 创建线程处理该客户端,传入通信套接字、IP、端口client_threads.emplace_back(handleClient, client_sock, client_ip, client_port);// 分离线程:线程独立运行,结束后自动释放资源(无需主线程join)client_threads.back().detach();// 可选:限制线程容器大小,避免内存占用过高if (client_threads.size() > 100) {client_threads.erase(client_threads.begin());}
}
- 线程分离(detach()):服务器不需要等待每个客户端线程结束(否则主线程会被阻塞),通过
detach()
让线程在后台独立运行,完成后自动回收资源。 - 原子变量
running
:用于线程安全地控制服务器是否继续运行(后续可通过信号处理设置running=false
实现优雅关闭)。
2.3 客户端处理函数(handleClient)
每个客户端连接的核心逻辑,在独立线程中执行:
void handleClient(int client_sock, const std::string& client_ip, int client_port) {std::cout << "新客户端连接: " << client_ip << ":" << client_port << std::endl;char buffer[1024];while (true) {// 接收客户端消息memset(buffer, 0, sizeof(buffer));int recv_size = read(client_sock, buffer, sizeof(buffer));if (recv_size <= 0) {// 客户端断开(recv_size=0)或出错(recv_size=-1)std::cout << "客户端 " << client_ip << ":" << client_port << " 断开连接" << std::endl;break;}// 打印消息并回复std::cout << "从 " << client_ip << ":" << client_port << " 接收: " << buffer;std::string response = "服务器已收到: " + std::string(buffer);write(client_sock, response.c_str(), response.size());}close(client_sock); // 关闭通信套接字
}
- 循环接收消息:线程进入循环,持续从客户端读取消息,直到客户端断开连接(
recv_size=0
)或出错。 - 资源释放:客户端断开后,关闭通信套接字,线程自动结束。
3. 服务器核心技术点
3.1 多线程并发处理
- 每个客户端连接对应一个独立线程,线程间通过
client_sock
区分,互不干扰,实现“同时处理多个客户端”。 - 对比单线程服务器:单线程需处理完一个客户端才能接受下一个,多线程服务器在主线程接受连接后,立即将处理逻辑交给子线程,主线程继续接受新连接,显著提升并发能力。
3.2 线程管理与安全
- 线程分离(detach()):避免主线程调用
join()
等待子线程,否则主线程会被阻塞,无法接受新连接。但detach()
后主线程无法控制子线程,需确保子线程能正常结束。 - 原子变量:
std::atomic<bool> running
保证多线程环境下对running
的读写操作是原子的(无数据竞争),后续可通过设置running=false
优雅关闭服务器(需配合信号处理)。
3.3 潜在问题与优化方向
- 线程数量无限制:频繁创建线程会消耗系统资源(内存、CPU),高并发下可能导致服务器性能下降。实际开发中应使用线程池(预先创建固定数量线程,循环处理连接)。
- 输出线程不安全:多个线程同时调用
std::cout
可能导致输出错乱(cout
不是线程安全的),需通过std::mutex
加锁保护。 - 未处理粘包:若客户端发送大数据或连续消息,服务器可能无法区分消息边界(需用“长度前缀法”优化)。
二、多连接TCP客户端程序详解
客户端的核心目标是:模拟多个客户端同时连接服务器,每个连接在独立线程中与服务器交互,验证服务器的多客户端处理能力。
1. 核心功能概述
- 创建指定数量(
NUM_CONNECTIONS=5
)的客户端连接; - 每个连接在独立线程中执行:连接服务器→发送多条消息→接收响应→关闭连接;
- 主线程等待所有连接线程完成后退出。
2. 关键代码解析
2.1 多连接线程创建
const int NUM_CONNECTIONS = 5; // 同时创建5个连接
std::vector<std::thread> connection_threads;// 为每个连接创建线程
for (int i = 0; i < NUM_CONNECTIONS; ++i) {connection_threads.emplace_back(handleConnection, i+1); // 传入连接ID(1~5)std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 错开连接时间
}// 等待所有线程完成
for (auto& t : connection_threads) {if (t.joinable()) t.join();
}
- 用
std::vector<std::thread>
存储所有连接线程,主线程通过join()
等待所有线程执行完毕,确保所有连接都完成交互。
2.2 单个连接处理函数(handleConnection)
每个线程执行的核心逻辑,模拟客户端与服务器的交互:
void handleConnection(int conn_id) {// 1. 创建客户端套接字(与基础客户端一致)int client_sock = socket(AF_INET, SOCK_STREAM, 0);if (client_sock == -1) { /* 错误处理 */ }// 2. 配置服务器地址并连接struct sockaddr_in server_addr;server_addr.sin_family = AF_INET;server_addr.sin_port = htons(SERVER_PORT);inet_pton(AF_INET, SERVER_IP.c_str(), &server_addr.sin_addr); // IP转换(现代用法)if (connect(client_sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {/* 错误处理 */}// 3. 数据交互:发送3条消息并接收响应for (int i = 0; i < 3; ++i) {std::string msg = "客户端连接 " + std::to_string(conn_id) + " 的消息 " + std::to_string(i+1);send(client_sock, msg.c_str(), msg.size(), 0); // 发送消息char buffer[1024] = {0};recv(client_sock, buffer, sizeof(buffer)-1, 0); // 接收响应std::cout << "连接 " << conn_id << " 收到响应: " << buffer << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(500)); // 模拟延迟}// 4. 关闭连接close(client_sock);
}
3. 客户端核心技术点
3.1 多连接模拟
通过创建多个线程,每个线程模拟一个客户端,验证服务器能否同时处理多个连接。线程间通过conn_id
区分,方便观察输出。
3.2 连接控制
- 连接创建时添加小延迟(
sleep_for(100ms)
),避免所有连接同时发起,减轻服务器瞬间压力(模拟真实场景中客户端陆续连接)。 - 主线程通过
join()
等待所有连接线程完成,确保程序正常退出。
三、程序运行流程与预期输出
1. 运行步骤
- 编译服务器和客户端:
g++ server.cpp -o server -lpthread # 链接线程库 g++ client.cpp -o client -lpthread
- 启动服务器:
./server # 输出:服务器启动成功,监听在 0.0.0.0:9888
- 启动客户端:
./client # 输出:多连接TCP客户端启动,将创建5个连接
2. 预期输出
服务器输出
服务器启动成功,监听在 0.0.0.0:9888
新客户端连接: 127.0.0.1:54321
从 127.0.0.1:54321 接收: 客户端连接 1 的消息 1
从 127.0.0.1:54321 接收: 客户端连接 1 的消息 2
新客户端连接: 127.0.0.1:54322
从 127.0.0.1:54322 接收: 客户端连接 2 的消息 1
...(其他客户端连接和消息)
客户端 127.0.0.1:54321 断开连接
...(所有客户端处理完毕后断开)
客户端输出
多连接TCP客户端启动,将创建 5 个连接
连接 1 成功连接到服务器 127.0.0.1:9888
连接 1 发送: 客户端连接 1 的消息 1
连接 1 收到响应: 服务器已收到: 客户端连接 1 的消息 1
连接 2 成功连接到服务器 127.0.0.1:9888
...(其他连接的发送和响应)
连接 1 已关闭
...(所有连接关闭后)
所有连接已处理完成,客户端退出
四、总结与扩展
这个示例通过多线程实现了TCP服务器的多客户端并发处理,核心亮点是:
- 服务器用线程分离实现“接受连接”与“处理消息”的并发;
- 客户端用多线程模拟真实多用户场景;
- 包含实用技术(端口复用、原子变量、线程管理)。
扩展优化方向
- 线程池替代多线程:避免频繁创建销毁线程,用固定数量线程循环处理连接,提升性能。
- 添加粘包处理:服务器和客户端用“长度前缀法”解析消息边界。
- 优雅关闭机制:通过信号(如
SIGINT
)捕获Ctrl+C
,设置running=false
,关闭监听套接字并等待线程结束。 - 线程安全输出:用
std::mutex
保护std::cout
,避免多线程输出错乱。
掌握这些内容后,可进一步学习IO复用(select/poll/epoll
)等更高效的并发模型,应对高并发场景。
补充:优化后的多线程TCP客户端代码及完整讲解
针对原代码的不足(线程管理低效、输出错乱、无粘包处理、无优雅关闭等),以下是优化后的代码及详细讲解,重点优化了线程池管理、线程安全、粘包处理、优雅关闭四大核心问题。
一、优化后的服务器代码(server.cpp)
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <thread>
#include <vector>
#include <atomic>
#include <mutex>
#include <queue>
#include <condition_variable>
#include <csignal>
#include <functional>
#include <errno.h>// 线程安全的输出工具
std::mutex g_cout_mutex;
#define LOG(msg) do { \std::lock_guard<std::mutex> lock(g_cout_mutex); \std::cout << msg << std::endl; \
} while(0)// 全局运行标志(原子变量确保线程安全)
std::atomic<bool> g_running(true);
// 监听套接字(全局以便信号处理函数关闭)
int g_listen_sock = -1;// 线程池类:预先创建固定数量线程,循环处理任务
class ThreadPool {
private:std::vector<std::thread> workers; // 工作线程std::queue<std::function<void()>> tasks;// 任务队列std::mutex queue_mutex; // 保护任务队列的互斥锁std::condition_variable condition; // 唤醒线程的条件变量bool stop; // 线程池停止标志public:// 构造函数:创建n个工作线程ThreadPool(size_t n) : stop(false) {for (size_t i = 0; i < n; ++i) {workers.emplace_back([this] {while (!stop) {std::function<void()> task;// 从队列取任务(加锁){std::unique_lock<std::mutex> lock(this->queue_mutex);// 等待任务或停止信号this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });if (this->stop && this->tasks.empty()) return;task = std::move(this->tasks.front());this->tasks.pop();}// 执行任务task();}});}}// 析构函数:停止所有线程~ThreadPool() {{std::unique_lock<std::mutex> lock(queue_mutex);stop = true;}condition.notify_all(); // 唤醒所有等待的线程for (std::thread& worker : workers) {if (worker.joinable()) {worker.join(); // 等待线程结束}}}// 添加任务到队列void enqueue(std::function<void()> task) {{std::unique_lock<std::mutex> lock(queue_mutex);if (stop) {throw std::runtime_error("enqueue on stopped ThreadPool");}tasks.emplace(std::move(task));}condition.notify_one(); // 唤醒一个等待的线程}
};// 粘包处理:发送数据(先发送4字节长度,再发送数据)
bool sendData(int sock, const std::string& data) {// 计算数据长度(转换为网络字节序)uint32_t data_len = htonl(data.size());// 先发送长度if (send(sock, &data_len, sizeof(data_len), 0) != sizeof(data_len)) {LOG("发送长度失败: " << strerror(errno));return false;}// 再发送数据if (send(sock, data.c_str(), data.size(), 0) != (ssize_t)data.size()) {LOG("发送数据失败: " << strerror(errno));return false;}return true;
}// 粘包处理:接收数据(先接收4字节长度,再接收对应长度的数据)
std::string recvData(int sock) {// 先接收长度uint32_t data_len;ssize_t recv_len = recv(sock, &data_len, sizeof(data_len), 0);if (recv_len <= 0) {if (recv_len < 0) LOG("接收长度失败: " << strerror(errno));return ""; // 连接断开或错误}data_len = ntohl(data_len); // 转换为主机字节序// 接收数据std::string data;data.resize(data_len);size_t total_recv = 0;while (total_recv < data_len) {recv_len = recv(sock, &data[total_recv], data_len - total_recv, 0);if (recv_len <= 0) {if (recv_len < 0) LOG("接收数据失败: " << strerror(errno));return ""; // 连接断开或错误}total_recv += recv_len;}return data;
}// 处理客户端连接的任务函数
void handleClient(int client_sock, const std::string& client_ip, int client_port) {LOG("新客户端连接: " << client_ip << ":" << client_port);while (g_running) {// 接收客户端消息(带粘包处理)std::string recv_msg = recvData(client_sock);if (recv_msg.empty()) {LOG("客户端 " << client_ip << ":" << client_port << " 断开连接");break;}LOG("从 " << client_ip << ":" << client_port << " 接收: " << recv_msg);// 发送响应(带粘包处理)std::string response = "服务器已收到: " + recv_msg;if (!sendData(client_sock, response)) {break;}}// 关闭客户端套接字close(client_sock);LOG("客户端 " << client_ip << ":" << client_port << " 连接已关闭");
}// 信号处理函数:捕获Ctrl+C,设置优雅关闭标志
void signalHandler(int signum) {if (signum == SIGINT) {LOG("\n收到停止信号,正在优雅关闭服务器...");g_running = false;// 关闭监听套接字,唤醒accept阻塞if (g_listen_sock != -1) {close(g_listen_sock);}}
}int main() {// 注册信号处理函数(捕获Ctrl+C)signal(SIGINT, signalHandler);// 创建监听套接字g_listen_sock = socket(AF_INET, SOCK_STREAM, 0);if (g_listen_sock == -1) {LOG("创建套接字失败: " << strerror(errno));return 1;}// 设置端口复用int opt = 1;if (setsockopt(g_listen_sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1) {LOG("设置端口复用失败: " << strerror(errno));close(g_listen_sock);return 1;}// 绑定IP和端口struct sockaddr_in server_addr;memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET;server_addr.sin_addr.s_addr = INADDR_ANY;server_addr.sin_port = htons(9888);if (bind(g_listen_sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {LOG("绑定失败: " << strerror(errno));close(g_listen_sock);return 1;}// 开始监听if (listen(g_listen_sock, 5) == -1) {LOG("监听失败: " << strerror(errno));close(g_listen_sock);return 1;}LOG("服务器启动成功,监听在 0.0.0.0:9888(线程池大小: 4)");// 创建线程池(4个工作线程)ThreadPool thread_pool(4);// 主循环接受客户端连接while (g_running) {struct sockaddr_in client_addr;socklen_t client_addr_len = sizeof(client_addr);// accept可能被信号中断,需要处理EINTR错误int client_sock = accept(g_listen_sock, (struct sockaddr*)&client_addr, &client_addr_len);if (client_sock == -1) {if (g_running && errno != EINTR) { // EINTR是正常中断(信号导致)LOG("接受连接失败: " << strerror(errno));}continue;}// 获取客户端IP和端口std::string client_ip = inet_ntoa(client_addr.sin_addr);int client_port = ntohs(client_addr.sin_port);// 将客户端处理任务添加到线程池thread_pool.enqueue([client_sock, client_ip, client_port]() {handleClient(client_sock, client_ip, client_port);});}// 关闭监听套接字close(g_listen_sock);LOG("服务器已停止监听");// 线程池析构时会等待所有任务完成LOG("服务器已优雅关闭");return 0;
}
二、优化后的客户端代码(client.cpp)
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <thread>
#include <vector>
#include <string>
#include <chrono>
#include <mutex>
#include <cstdint>// 线程安全的输出工具(避免多线程输出错乱)
std::mutex g_cout_mutex;
#define LOG(msg) do { \std::lock_guard<std::mutex> lock(g_cout_mutex); \std::cout << msg << std::endl; \
} while(0)// 服务器配置
const std::string SERVER_IP = "127.0.0.1";
const int SERVER_PORT = 9888;
const int NUM_CONNECTIONS = 5; // 同时建立的连接数量// 粘包处理:发送数据(与服务器一致,先发送4字节长度,再发送数据)
bool sendData(int sock, const std::string& data) {uint32_t data_len = htonl(data.size()); // 长度转为网络字节序// 先发送长度if (send(sock, &data_len, sizeof(data_len), 0) != sizeof(data_len)) {LOG("连接发送长度失败: " << strerror(errno));return false;}// 再发送数据if (send(sock, data.c_str(), data.size(), 0) != (ssize_t)data.size()) {LOG("连接发送数据失败: " << strerror(errno));return false;}return true;
}// 粘包处理:接收数据(与服务器一致,先接收长度,再接收数据)
std::string recvData(int sock) {uint32_t data_len;ssize_t recv_len = recv(sock, &data_len, sizeof(data_len), 0);if (recv_len <= 0) {if (recv_len < 0) LOG("连接接收长度失败: " << strerror(errno));return ""; // 连接断开或错误}data_len = ntohl(data_len); // 转为主机字节序std::string data;data.resize(data_len);size_t total_recv = 0;// 循环接收完整数据(处理数据分片)while (total_recv < data_len) {recv_len = recv(sock, &data[total_recv], data_len - total_recv, 0);if (recv_len <= 0) {if (recv_len < 0) LOG("连接接收数据失败: " << strerror(errno));return "";}total_recv += recv_len;}return data;
}// 单个连接的处理函数
void handleConnection(int conn_id) {// 创建客户端套接字int client_sock = socket(AF_INET, SOCK_STREAM, 0);if (client_sock == -1) {LOG("连接 " << conn_id << " 创建套接字失败: " << strerror(errno));return;}// 配置服务器地址struct sockaddr_in server_addr;memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET;server_addr.sin_port = htons(SERVER_PORT);// 转换服务器IP为网络字节序(现代用法,支持IPv6扩展)if (inet_pton(AF_INET, SERVER_IP.c_str(), &server_addr.sin_addr) <= 0) {LOG("连接 " << conn_id << " IP地址转换失败");close(client_sock);return;}// 连接服务器if (connect(client_sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {LOG("连接 " << conn_id << " 连接服务器失败: " << strerror(errno));close(client_sock);return;}LOG("连接 " << conn_id << " 成功连接到服务器 " << SERVER_IP << ":" << SERVER_PORT);try {// 数据交互:每个连接发送3条消息for (int i = 0; i < 3; ++i) {std::string msg = "客户端连接 " + std::to_string(conn_id) + " 的消息 " + std::to_string(i+1);if (!sendData(client_sock, msg)) {break;}LOG("连接 " << conn_id << " 发送: " << msg);// 接收服务器响应std::string response = recvData(client_sock);if (response.empty()) {LOG("连接 " << conn_id << " 接收响应失败或服务器断开");break;}LOG("连接 " << conn_id << " 收到响应: " << response);// 模拟业务处理延迟std::this_thread::sleep_for(std::chrono::milliseconds(500));}} catch (const std::exception& e) {LOG("连接 " << conn_id << " 异常: " << e.what());}// 关闭连接close(client_sock);LOG("连接 " << conn_id << " 已关闭");
}int main() {LOG("多连接TCP客户端启动,将创建 " << NUM_CONNECTIONS << " 个连接");std::vector<std::thread> connection_threads;// 创建多个连接(每个连接一个线程)for (int i = 0; i < NUM_CONNECTIONS; ++i) {connection_threads.emplace_back(handleConnection, i + 1); // 连接ID从1开始// 错开连接创建时间,避免服务器瞬间压力过大std::this_thread::sleep_for(std::chrono::milliseconds(100));}// 等待所有连接线程完成for (auto& t : connection_threads) {if (t.joinable()) {t.join();}}LOG("所有连接已处理完成,客户端退出");return 0;
}
三、核心优化点详解(服务器+客户端)
1. 线程管理优化:从“动态创建线程”到“线程池”
原代码问题:
原服务器每接一个连接就动态创建线程,线程创建/销毁开销大,高并发下系统资源易耗尽(线程栈内存、CPU调度压力)。
优化方案:
服务器引入线程池(ThreadPool
类),预先创建固定数量线程(示例中4个),通过任务队列循环处理客户端连接,避免频繁创建线程:
// 线程池核心逻辑
class ThreadPool {
private:std::vector<std::thread> workers; // 固定数量的工作线程std::queue<std::function<void()>> tasks; // 任务队列std::mutex queue_mutex; // 保护任务队列的互斥锁std::condition_variable condition; // 唤醒线程的条件变量bool stop; // 停止标志
public:ThreadPool(size_t n) : stop(false) {// 预先创建n个工作线程,循环取任务执行for (size_t i = 0; i < n; ++i) {workers.emplace_back([this] {while (!stop) {std::function<void()> task;// 加锁取任务{std::unique_lock<std::mutex> lock(queue_mutex);condition.wait(lock, [this] { return stop || !tasks.empty(); });if (stop && tasks.empty()) return;task = std::move(tasks.front());tasks.pop();}task(); // 执行任务(客户端处理逻辑)}});}}// 添加任务到队列void enqueue(std::function<void()> task) { /* ... */ }
};
优势:
- 线程复用:工作线程长期存在,循环处理任务,减少线程创建/销毁开销;
- 资源控制:通过线程池大小(如4个)限制并发线程数,避免资源耗尽。
2. 线程安全优化:解决多线程输出错乱
原代码问题:
多个线程同时调用std::cout
,导致输出内容错乱(cout
非线程安全,多个线程输出可能交叉)。
优化方案:
使用std::mutex
为输出加锁,定义线程安全的LOG
宏:
std::mutex g_cout_mutex; // 全局输出互斥锁
#define LOG(msg) do { \std::lock_guard<std::mutex> lock(g_cout_mutex); // 自动加锁/解锁 \std::cout << msg << std::endl; \
} while(0)
效果:
所有线程的输出操作通过互斥锁串行执行,确保日志清晰可读,无交叉错乱。
3. 粘包处理:解决TCP数据边界问题
原代码问题:
TCP是流式传输,若客户端连续发送小消息,服务器可能将多个消息合并为一个“大数据块”,无法区分边界(粘包)。
优化方案:
采用“长度前缀法”明确消息边界,发送数据时先传4字节长度(网络字节序),再传实际数据:
// 服务器/客户端发送数据(统一逻辑)
bool sendData(int sock, const std::string& data) {uint32_t data_len = htonl(data.size()); // 长度转为网络字节序send(sock, &data_len, sizeof(data_len), 0); // 先发送长度send(sock, data.c_str(), data.size(), 0); // 再发送数据return true;
}// 服务器/客户端接收数据(统一逻辑)
std::string recvData(int sock) {uint32_t data_len;recv(sock, &data_len, sizeof(data_len), 0); // 先接收长度data_len = ntohl(data_len); // 转为主机字节序std::string data(data_len, '\0');recv(sock, &data[0], data_len, 0); // 接收对应长度的数据return data;
}
效果:
无论消息大小或发送频率如何,接收方都能通过“先读长度,再读数据”准确解析每个消息,彻底解决粘包问题。
4. 优雅关闭:确保资源正确释放
原代码问题:
原服务器若通过Ctrl+C
强制终止,可能导致线程未完成、套接字未关闭,资源泄漏。
优化方案:
- 信号处理:捕获
SIGINT
信号(Ctrl+C
),设置g_running=false
标志; - 唤醒阻塞:关闭监听套接字,唤醒
accept
阻塞; - 线程池等待:线程池析构时等待所有工作线程完成当前任务:
// 信号处理函数
void signalHandler(int signum) {if (signum == SIGINT) {LOG("\n收到停止信号,正在优雅关闭服务器...");g_running = false; // 设置全局停止标志close(g_listen_sock); // 唤醒accept阻塞}
}// main函数中注册信号处理
signal(SIGINT, signalHandler);// 线程池析构时自动等待所有线程完成
~ThreadPool() {{std::unique_lock<std::mutex> lock(queue_mutex);stop = true; // 通知线程停止}condition.notify_all(); // 唤醒所有工作线程for (auto& worker : workers) worker.join(); // 等待线程结束
}
效果:
服务器收到停止信号后,会完成当前客户端的消息处理,关闭所有套接字,释放线程资源,避免泄漏。
5. 错误处理与鲁棒性优化
原代码问题:
对系统调用错误(如accept
被信号中断、send/recv
失败)处理简单,可能导致程序异常退出。
优化方案:
- 处理
EINTR
错误:accept
等系统调用可能被信号中断(返回-1
,errno=EINTR
),这是正常情况,应忽略而非报错; - 完整的发送/接收循环:
recv
可能只接收部分数据(尤其大数据),通过循环确保接收完整; - 资源释放兜底:所有套接字在
close
前检查有效性,线程任务结束后确保关闭通信套接字。
三、运行效果与总结
运行步骤:
- 编译(需链接线程库):
g++ server.cpp -o server -lpthread g++ client.cpp -o client -lpthread
- 启动服务器,再启动客户端,可观察到:
- 服务器日志清晰,无输出错乱;
- 多个客户端连接同时被处理,无阻塞;
- 消息收发准确,无粘包;
Ctrl+C
停止服务器时,会优雅关闭,无资源泄漏。
核心优化价值:
- 性能:线程池减少线程开销,支持更高并发;
- 可靠性:粘包处理确保数据正确解析,错误处理避免异常退出;
- 可维护性:模块化设计(线程池、粘包工具),逻辑清晰;
- 安全性:线程安全输出、优雅关闭机制确保资源正确管理。
这些优化使代码从“演示级”提升为“生产可用级”,可作为多线程TCP服务器的基础框架扩展。