当前位置: 首页 > news >正文

C++网络编程 5.TCP套接字(socket)通信进阶-基于多线程的TCP多客户端通信

基于多线程的TCP多客户端通信示例详解

这个示例实现了一个能同时处理多个客户端连接的TCP服务器,以及一个能创建多个连接的客户端程序。核心通过C++11的std::thread库实现多线程并发处理,下面分服务器和客户端两部分详细讲解。
先附上完整代码:

multi_conn_threads_server.cpp
#include <iostream>
#include <sys/socket.h> // 套接字API
#include <netinet/in.h> //
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <thread>
#include <vector>
#include <atomic> // 原子操作(线程安全标志)
#include <functional> // 函数对象支持// 处理客户端连接的函数
void handleClient(int client_sock, const std::string& client_ip, int client_port) {std::cout << "新客户端连接: " << client_ip << ":" << client_port << std::endl;char buffer[1024];while (true) {// 接收客户端消息memset(buffer, 0, sizeof(buffer));int recv_size = read(client_sock, buffer, sizeof(buffer));if (recv_size <= 0) {// 客户端断开连接或出错if (recv_size == 0) {std::cout << "客户端 " << client_ip << ":" << client_port << " 断开连接" << std::endl;} else {std::cerr << "接收数据出错: " << strerror(errno) << std::endl;}break;}std::cout << "从 " << client_ip << ":" << client_port << " 接收: " << buffer;// 发送响应std::string response = "服务器已收到: " + std::string(buffer);write(client_sock, response.c_str(), response.size());}// 关闭客户端套接字close(client_sock);
}int main() {// 创建监听套接字int listen_sock = socket(AF_INET, SOCK_STREAM, 0);if (listen_sock == -1) {std::cerr << "创建套接字失败: " << strerror(errno) << std::endl;return 1;}// 设置端口复用,避免程序重启时端口被占用int opt = 1;if (setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1) {std::cerr << "设置套接字选项失败: " << strerror(errno) << std::endl;close(listen_sock);return 1;}// 绑定IP地址和端口struct sockaddr_in server_addr;memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET;server_addr.sin_addr.s_addr = INADDR_ANY; // 监听所有IPserver_addr.sin_port = htons(9888); // 端口号if (bind(listen_sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {std::cerr << "绑定失败: " << strerror(errno) << std::endl;close(listen_sock);return 1;}// 开始监听if (listen(listen_sock, 5) == -1) {std::cerr << "监听失败: " << strerror(errno) << std::endl;close(listen_sock);return 1;}std::cout << "服务器启动成功,监听在 0.0.0.0:9888" << std::endl;std::vector<std::thread> client_threads;std::atomic<bool> running(true);// 主循环接受客户端连接while (running) {struct sockaddr_in client_addr;socklen_t client_addr_len = sizeof(client_addr);int client_sock = accept(listen_sock, (struct sockaddr*)&client_addr, &client_addr_len);if (client_sock == -1) {if (running) { // 只有在运行中时才视为错误std::cerr << "接受连接失败: " << strerror(errno) << std::endl;}continue;}// 获取客户端IP和端口std::string client_ip = inet_ntoa(client_addr.sin_addr);int client_port = ntohs(client_addr.sin_port);// 创建新线程处理客户端连接client_threads.emplace_back(handleClient, client_sock, client_ip, client_port);// 分离线程,避免主线程等待client_threads.back().detach();// 限制同时存在的线程数量(可选)if (client_threads.size() > 100) {client_threads.erase(client_threads.begin());}}// 关闭监听套接字close(listen_sock);std::cout << "服务器已停止" << std::endl;return 0;
}
multi_conn_threads_client.cpp
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <thread>
#include <vector>
#include <string>
#include <chrono>// 服务器地址和端口(需与服务器保持一致)
const std::string SERVER_IP = "127.0.0.1";
const int SERVER_PORT = 9888;
const int NUM_CONNECTIONS = 5; // 同时建立的连接数量// 单个连接的处理函数
void handleConnection(int conn_id) {// 1. 创建客户端套接字int client_sock = socket(AF_INET, SOCK_STREAM, 0);if (client_sock == -1) {std::cerr << "连接 " << conn_id << " 创建套接字失败: " << strerror(errno) << std::endl;return;}// 2. 配置服务器地址struct sockaddr_in server_addr;memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET;server_addr.sin_port = htons(SERVER_PORT);// 将服务器IP从字符串转换为网络字节序if (inet_pton(AF_INET, SERVER_IP.c_str(), &server_addr.sin_addr) <= 0) {std::cerr << "连接 " << conn_id << " IP地址转换失败" << std::endl;close(client_sock);return;}// 3. 连接服务器if (connect(client_sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {std::cerr << "连接 " << conn_id << " 连接服务器失败: " << strerror(errno) << std::endl;close(client_sock);return;}std::cout << "连接 " << conn_id << " 成功连接到服务器 " << SERVER_IP << ":" << SERVER_PORT << std::endl;try {// 4. 数据交互(每个连接发送3条消息)for (int i = 0; i < 3; ++i) {// 发送消息(包含连接ID和序号,方便服务器区分)std::string msg = "客户端连接 " + std::to_string(conn_id) + " 的消息 " + std::to_string(i+1);ssize_t send_size = send(client_sock, msg.c_str(), msg.size(), 0);if (send_size == -1) {std::cerr << "连接 " << conn_id << " 发送消息失败: " << strerror(errno) << std::endl;break;}std::cout << "连接 " << conn_id << " 发送: " << msg << std::endl;// 接收服务器响应char buffer[1024] = {0};ssize_t recv_size = recv(client_sock, buffer, sizeof(buffer)-1, 0);if (recv_size <= 0) {if (recv_size == 0) {std::cout << "连接 " << conn_id << " 服务器已断开" << std::endl;} else {std::cerr << "连接 " << conn_id << " 接收响应失败: " << strerror(errno) << std::endl;}break;}std::cout << "连接 " << conn_id << " 收到响应: " << buffer << std::endl;// 模拟间隔,避免消息发送过快std::this_thread::sleep_for(std::chrono::milliseconds(500));}} catch (const std::exception& e) {std::cerr << "连接 " << conn_id << " 异常: " << e.what() << std::endl;}// 5. 关闭连接close(client_sock);std::cout << "连接 " << conn_id << " 已关闭" << std::endl;
}int main() {std::cout << "多连接TCP客户端启动,将创建 " << NUM_CONNECTIONS << " 个连接" << std::endl;// 存储所有连接的线程std::vector<std::thread> connection_threads;// 创建多个连接(每个连接一个线程)for (int i = 0; i < NUM_CONNECTIONS; ++i) {connection_threads.emplace_back(handleConnection, i+1); // 连接ID从1开始// 稍微延迟,避免同时建立连接导致服务器压力集中std::this_thread::sleep_for(std::chrono::milliseconds(100));}// 等待所有连接线程完成for (auto& t : connection_threads) {if (t.joinable()) {t.join();}}std::cout << "所有连接已处理完成,客户端退出" << std::endl;return 0;
}

一、多线程TCP服务器程序详解

服务器的核心目标是:同时接受并处理多个客户端的连接请求,每个客户端的通信逻辑在独立线程中执行,避免单线程阻塞导致其他客户端无法连接。

1. 核心功能概述

  • 启动服务器并监听指定端口(9888);
  • 主循环持续接受新的客户端连接;
  • 每接收到一个客户端连接,创建独立线程处理该客户端的消息收发;
  • 支持客户端正常断开或异常断开的处理;
  • 避免端口占用问题(通过SO_REUSEADDR选项)。

2. 关键代码解析

2.1 初始化与监听套接字
// 创建监听套接字(TCP类型)
int listen_sock = socket(AF_INET, SOCK_STREAM, 0);// 设置端口复用(解决服务器重启时端口被TIME_WAIT状态占用的问题)
int opt = 1;
setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));// 绑定IP和端口(与基础示例一致)
struct sockaddr_in server_addr;
// ... 填充地址信息(INADDR_ANY绑定所有IP,端口9888)
bind(listen_sock, (struct sockaddr*)&server_addr, sizeof(server_addr));// 开始监听(backlog=5,表示未完成连接队列最大长度)
listen(listen_sock, 5);
  • 端口复用(SO_REUSEADDR):服务器关闭后,端口可能处于TIME_WAIT状态(确保最后一个数据包被接收),此时直接重启服务器会绑定失败。setsockopt设置该选项后,允许端口在TIME_WAIT状态下被重新绑定,方便开发调试。
2.2 主循环:接受客户端连接并创建线程
std::vector<std::thread> client_threads;  // 存储客户端线程
std::atomic<bool> running(true);          // 原子变量:服务器运行标志(线程安全)while (running) {// 接受客户端连接(阻塞等待新连接)struct sockaddr_in client_addr;socklen_t client_addr_len = sizeof(client_addr);int client_sock = accept(listen_sock, (struct sockaddr*)&client_addr, &client_addr_len);if (client_sock == -1) {if (running) std::cerr << "接受连接失败" << std::endl;continue;}// 获取客户端IP和端口(转换为可读性字符串)std::string client_ip = inet_ntoa(client_addr.sin_addr);  // 二进制IP→点分十进制int client_port = ntohs(client_addr.sin_port);            // 网络字节序→主机字节序// 创建线程处理该客户端,传入通信套接字、IP、端口client_threads.emplace_back(handleClient, client_sock, client_ip, client_port);// 分离线程:线程独立运行,结束后自动释放资源(无需主线程join)client_threads.back().detach();// 可选:限制线程容器大小,避免内存占用过高if (client_threads.size() > 100) {client_threads.erase(client_threads.begin());}
}
  • 线程分离(detach()):服务器不需要等待每个客户端线程结束(否则主线程会被阻塞),通过detach()让线程在后台独立运行,完成后自动回收资源。
  • 原子变量running:用于线程安全地控制服务器是否继续运行(后续可通过信号处理设置running=false实现优雅关闭)。
2.3 客户端处理函数(handleClient)

每个客户端连接的核心逻辑,在独立线程中执行:

void handleClient(int client_sock, const std::string& client_ip, int client_port) {std::cout << "新客户端连接: " << client_ip << ":" << client_port << std::endl;char buffer[1024];while (true) {// 接收客户端消息memset(buffer, 0, sizeof(buffer));int recv_size = read(client_sock, buffer, sizeof(buffer));if (recv_size <= 0) {// 客户端断开(recv_size=0)或出错(recv_size=-1)std::cout << "客户端 " << client_ip << ":" << client_port << " 断开连接" << std::endl;break;}// 打印消息并回复std::cout << "从 " << client_ip << ":" << client_port << " 接收: " << buffer;std::string response = "服务器已收到: " + std::string(buffer);write(client_sock, response.c_str(), response.size());}close(client_sock);  // 关闭通信套接字
}
  • 循环接收消息:线程进入循环,持续从客户端读取消息,直到客户端断开连接(recv_size=0)或出错。
  • 资源释放:客户端断开后,关闭通信套接字,线程自动结束。

3. 服务器核心技术点

3.1 多线程并发处理
  • 每个客户端连接对应一个独立线程,线程间通过client_sock区分,互不干扰,实现“同时处理多个客户端”。
  • 对比单线程服务器:单线程需处理完一个客户端才能接受下一个,多线程服务器在主线程接受连接后,立即将处理逻辑交给子线程,主线程继续接受新连接,显著提升并发能力。
3.2 线程管理与安全
  • 线程分离(detach()):避免主线程调用join()等待子线程,否则主线程会被阻塞,无法接受新连接。但detach()后主线程无法控制子线程,需确保子线程能正常结束。
  • 原子变量std::atomic<bool> running保证多线程环境下对running的读写操作是原子的(无数据竞争),后续可通过设置running=false优雅关闭服务器(需配合信号处理)。
3.3 潜在问题与优化方向
  • 线程数量无限制:频繁创建线程会消耗系统资源(内存、CPU),高并发下可能导致服务器性能下降。实际开发中应使用线程池(预先创建固定数量线程,循环处理连接)。
  • 输出线程不安全:多个线程同时调用std::cout可能导致输出错乱(cout不是线程安全的),需通过std::mutex加锁保护。
  • 未处理粘包:若客户端发送大数据或连续消息,服务器可能无法区分消息边界(需用“长度前缀法”优化)。

二、多连接TCP客户端程序详解

客户端的核心目标是:模拟多个客户端同时连接服务器,每个连接在独立线程中与服务器交互,验证服务器的多客户端处理能力。

1. 核心功能概述

  • 创建指定数量(NUM_CONNECTIONS=5)的客户端连接;
  • 每个连接在独立线程中执行:连接服务器→发送多条消息→接收响应→关闭连接;
  • 主线程等待所有连接线程完成后退出。

2. 关键代码解析

2.1 多连接线程创建
const int NUM_CONNECTIONS = 5;  // 同时创建5个连接
std::vector<std::thread> connection_threads;// 为每个连接创建线程
for (int i = 0; i < NUM_CONNECTIONS; ++i) {connection_threads.emplace_back(handleConnection, i+1);  // 传入连接ID(1~5)std::this_thread::sleep_for(std::chrono::milliseconds(100));  // 错开连接时间
}// 等待所有线程完成
for (auto& t : connection_threads) {if (t.joinable()) t.join();
}
  • std::vector<std::thread>存储所有连接线程,主线程通过join()等待所有线程执行完毕,确保所有连接都完成交互。
2.2 单个连接处理函数(handleConnection)

每个线程执行的核心逻辑,模拟客户端与服务器的交互:

void handleConnection(int conn_id) {// 1. 创建客户端套接字(与基础客户端一致)int client_sock = socket(AF_INET, SOCK_STREAM, 0);if (client_sock == -1) { /* 错误处理 */ }// 2. 配置服务器地址并连接struct sockaddr_in server_addr;server_addr.sin_family = AF_INET;server_addr.sin_port = htons(SERVER_PORT);inet_pton(AF_INET, SERVER_IP.c_str(), &server_addr.sin_addr);  // IP转换(现代用法)if (connect(client_sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {/* 错误处理 */}// 3. 数据交互:发送3条消息并接收响应for (int i = 0; i < 3; ++i) {std::string msg = "客户端连接 " + std::to_string(conn_id) + " 的消息 " + std::to_string(i+1);send(client_sock, msg.c_str(), msg.size(), 0);  // 发送消息char buffer[1024] = {0};recv(client_sock, buffer, sizeof(buffer)-1, 0);  // 接收响应std::cout << "连接 " << conn_id << " 收到响应: " << buffer << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(500));  // 模拟延迟}// 4. 关闭连接close(client_sock);
}

3. 客户端核心技术点

3.1 多连接模拟

通过创建多个线程,每个线程模拟一个客户端,验证服务器能否同时处理多个连接。线程间通过conn_id区分,方便观察输出。

3.2 连接控制
  • 连接创建时添加小延迟(sleep_for(100ms)),避免所有连接同时发起,减轻服务器瞬间压力(模拟真实场景中客户端陆续连接)。
  • 主线程通过join()等待所有连接线程完成,确保程序正常退出。

三、程序运行流程与预期输出

1. 运行步骤

  1. 编译服务器和客户端:
    g++ server.cpp -o server -lpthread  # 链接线程库
    g++ client.cpp -o client -lpthread
    
  2. 启动服务器:
    ./server  # 输出:服务器启动成功,监听在 0.0.0.0:9888
    
  3. 启动客户端:
    ./client  # 输出:多连接TCP客户端启动,将创建5个连接
    

2. 预期输出

服务器输出
服务器启动成功,监听在 0.0.0.0:9888
新客户端连接: 127.0.0.1:54321
从 127.0.0.1:54321 接收: 客户端连接 1 的消息 1
从 127.0.0.1:54321 接收: 客户端连接 1 的消息 2
新客户端连接: 127.0.0.1:54322
从 127.0.0.1:54322 接收: 客户端连接 2 的消息 1
...(其他客户端连接和消息)
客户端 127.0.0.1:54321 断开连接
...(所有客户端处理完毕后断开)
客户端输出
多连接TCP客户端启动,将创建 5 个连接
连接 1 成功连接到服务器 127.0.0.1:9888
连接 1 发送: 客户端连接 1 的消息 1
连接 1 收到响应: 服务器已收到: 客户端连接 1 的消息 1
连接 2 成功连接到服务器 127.0.0.1:9888
...(其他连接的发送和响应)
连接 1 已关闭
...(所有连接关闭后)
所有连接已处理完成,客户端退出

四、总结与扩展

这个示例通过多线程实现了TCP服务器的多客户端并发处理,核心亮点是:

  • 服务器用线程分离实现“接受连接”与“处理消息”的并发;
  • 客户端用多线程模拟真实多用户场景;
  • 包含实用技术(端口复用、原子变量、线程管理)。

扩展优化方向

  1. 线程池替代多线程:避免频繁创建销毁线程,用固定数量线程循环处理连接,提升性能。
  2. 添加粘包处理:服务器和客户端用“长度前缀法”解析消息边界。
  3. 优雅关闭机制:通过信号(如SIGINT)捕获Ctrl+C,设置running=false,关闭监听套接字并等待线程结束。
  4. 线程安全输出:用std::mutex保护std::cout,避免多线程输出错乱。

掌握这些内容后,可进一步学习IO复用(select/poll/epoll)等更高效的并发模型,应对高并发场景。

补充:优化后的多线程TCP客户端代码及完整讲解

针对原代码的不足(线程管理低效、输出错乱、无粘包处理、无优雅关闭等),以下是优化后的代码及详细讲解,重点优化了线程池管理、线程安全、粘包处理、优雅关闭四大核心问题。

一、优化后的服务器代码(server.cpp)

#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <thread>
#include <vector>
#include <atomic>
#include <mutex>
#include <queue>
#include <condition_variable>
#include <csignal>
#include <functional>
#include <errno.h>// 线程安全的输出工具
std::mutex g_cout_mutex;
#define LOG(msg) do { \std::lock_guard<std::mutex> lock(g_cout_mutex); \std::cout << msg << std::endl; \
} while(0)// 全局运行标志(原子变量确保线程安全)
std::atomic<bool> g_running(true);
// 监听套接字(全局以便信号处理函数关闭)
int g_listen_sock = -1;// 线程池类:预先创建固定数量线程,循环处理任务
class ThreadPool {
private:std::vector<std::thread> workers;       // 工作线程std::queue<std::function<void()>> tasks;// 任务队列std::mutex queue_mutex;                 // 保护任务队列的互斥锁std::condition_variable condition;      // 唤醒线程的条件变量bool stop;                              // 线程池停止标志public:// 构造函数:创建n个工作线程ThreadPool(size_t n) : stop(false) {for (size_t i = 0; i < n; ++i) {workers.emplace_back([this] {while (!stop) {std::function<void()> task;// 从队列取任务(加锁){std::unique_lock<std::mutex> lock(this->queue_mutex);// 等待任务或停止信号this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });if (this->stop && this->tasks.empty()) return;task = std::move(this->tasks.front());this->tasks.pop();}// 执行任务task();}});}}// 析构函数:停止所有线程~ThreadPool() {{std::unique_lock<std::mutex> lock(queue_mutex);stop = true;}condition.notify_all();  // 唤醒所有等待的线程for (std::thread& worker : workers) {if (worker.joinable()) {worker.join();  // 等待线程结束}}}// 添加任务到队列void enqueue(std::function<void()> task) {{std::unique_lock<std::mutex> lock(queue_mutex);if (stop) {throw std::runtime_error("enqueue on stopped ThreadPool");}tasks.emplace(std::move(task));}condition.notify_one();  // 唤醒一个等待的线程}
};// 粘包处理:发送数据(先发送4字节长度,再发送数据)
bool sendData(int sock, const std::string& data) {// 计算数据长度(转换为网络字节序)uint32_t data_len = htonl(data.size());// 先发送长度if (send(sock, &data_len, sizeof(data_len), 0) != sizeof(data_len)) {LOG("发送长度失败: " << strerror(errno));return false;}// 再发送数据if (send(sock, data.c_str(), data.size(), 0) != (ssize_t)data.size()) {LOG("发送数据失败: " << strerror(errno));return false;}return true;
}// 粘包处理:接收数据(先接收4字节长度,再接收对应长度的数据)
std::string recvData(int sock) {// 先接收长度uint32_t data_len;ssize_t recv_len = recv(sock, &data_len, sizeof(data_len), 0);if (recv_len <= 0) {if (recv_len < 0) LOG("接收长度失败: " << strerror(errno));return "";  // 连接断开或错误}data_len = ntohl(data_len);  // 转换为主机字节序// 接收数据std::string data;data.resize(data_len);size_t total_recv = 0;while (total_recv < data_len) {recv_len = recv(sock, &data[total_recv], data_len - total_recv, 0);if (recv_len <= 0) {if (recv_len < 0) LOG("接收数据失败: " << strerror(errno));return "";  // 连接断开或错误}total_recv += recv_len;}return data;
}// 处理客户端连接的任务函数
void handleClient(int client_sock, const std::string& client_ip, int client_port) {LOG("新客户端连接: " << client_ip << ":" << client_port);while (g_running) {// 接收客户端消息(带粘包处理)std::string recv_msg = recvData(client_sock);if (recv_msg.empty()) {LOG("客户端 " << client_ip << ":" << client_port << " 断开连接");break;}LOG("从 " << client_ip << ":" << client_port << " 接收: " << recv_msg);// 发送响应(带粘包处理)std::string response = "服务器已收到: " + recv_msg;if (!sendData(client_sock, response)) {break;}}// 关闭客户端套接字close(client_sock);LOG("客户端 " << client_ip << ":" << client_port << " 连接已关闭");
}// 信号处理函数:捕获Ctrl+C,设置优雅关闭标志
void signalHandler(int signum) {if (signum == SIGINT) {LOG("\n收到停止信号,正在优雅关闭服务器...");g_running = false;// 关闭监听套接字,唤醒accept阻塞if (g_listen_sock != -1) {close(g_listen_sock);}}
}int main() {// 注册信号处理函数(捕获Ctrl+C)signal(SIGINT, signalHandler);// 创建监听套接字g_listen_sock = socket(AF_INET, SOCK_STREAM, 0);if (g_listen_sock == -1) {LOG("创建套接字失败: " << strerror(errno));return 1;}// 设置端口复用int opt = 1;if (setsockopt(g_listen_sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1) {LOG("设置端口复用失败: " << strerror(errno));close(g_listen_sock);return 1;}// 绑定IP和端口struct sockaddr_in server_addr;memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET;server_addr.sin_addr.s_addr = INADDR_ANY;server_addr.sin_port = htons(9888);if (bind(g_listen_sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {LOG("绑定失败: " << strerror(errno));close(g_listen_sock);return 1;}// 开始监听if (listen(g_listen_sock, 5) == -1) {LOG("监听失败: " << strerror(errno));close(g_listen_sock);return 1;}LOG("服务器启动成功,监听在 0.0.0.0:9888(线程池大小: 4)");// 创建线程池(4个工作线程)ThreadPool thread_pool(4);// 主循环接受客户端连接while (g_running) {struct sockaddr_in client_addr;socklen_t client_addr_len = sizeof(client_addr);// accept可能被信号中断,需要处理EINTR错误int client_sock = accept(g_listen_sock, (struct sockaddr*)&client_addr, &client_addr_len);if (client_sock == -1) {if (g_running && errno != EINTR) {  // EINTR是正常中断(信号导致)LOG("接受连接失败: " << strerror(errno));}continue;}// 获取客户端IP和端口std::string client_ip = inet_ntoa(client_addr.sin_addr);int client_port = ntohs(client_addr.sin_port);// 将客户端处理任务添加到线程池thread_pool.enqueue([client_sock, client_ip, client_port]() {handleClient(client_sock, client_ip, client_port);});}// 关闭监听套接字close(g_listen_sock);LOG("服务器已停止监听");// 线程池析构时会等待所有任务完成LOG("服务器已优雅关闭");return 0;
}

二、优化后的客户端代码(client.cpp)

#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <thread>
#include <vector>
#include <string>
#include <chrono>
#include <mutex>
#include <cstdint>// 线程安全的输出工具(避免多线程输出错乱)
std::mutex g_cout_mutex;
#define LOG(msg) do { \std::lock_guard<std::mutex> lock(g_cout_mutex); \std::cout << msg << std::endl; \
} while(0)// 服务器配置
const std::string SERVER_IP = "127.0.0.1";
const int SERVER_PORT = 9888;
const int NUM_CONNECTIONS = 5;  // 同时建立的连接数量// 粘包处理:发送数据(与服务器一致,先发送4字节长度,再发送数据)
bool sendData(int sock, const std::string& data) {uint32_t data_len = htonl(data.size());  // 长度转为网络字节序// 先发送长度if (send(sock, &data_len, sizeof(data_len), 0) != sizeof(data_len)) {LOG("连接发送长度失败: " << strerror(errno));return false;}// 再发送数据if (send(sock, data.c_str(), data.size(), 0) != (ssize_t)data.size()) {LOG("连接发送数据失败: " << strerror(errno));return false;}return true;
}// 粘包处理:接收数据(与服务器一致,先接收长度,再接收数据)
std::string recvData(int sock) {uint32_t data_len;ssize_t recv_len = recv(sock, &data_len, sizeof(data_len), 0);if (recv_len <= 0) {if (recv_len < 0) LOG("连接接收长度失败: " << strerror(errno));return "";  // 连接断开或错误}data_len = ntohl(data_len);  // 转为主机字节序std::string data;data.resize(data_len);size_t total_recv = 0;// 循环接收完整数据(处理数据分片)while (total_recv < data_len) {recv_len = recv(sock, &data[total_recv], data_len - total_recv, 0);if (recv_len <= 0) {if (recv_len < 0) LOG("连接接收数据失败: " << strerror(errno));return "";}total_recv += recv_len;}return data;
}// 单个连接的处理函数
void handleConnection(int conn_id) {// 创建客户端套接字int client_sock = socket(AF_INET, SOCK_STREAM, 0);if (client_sock == -1) {LOG("连接 " << conn_id << " 创建套接字失败: " << strerror(errno));return;}// 配置服务器地址struct sockaddr_in server_addr;memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET;server_addr.sin_port = htons(SERVER_PORT);// 转换服务器IP为网络字节序(现代用法,支持IPv6扩展)if (inet_pton(AF_INET, SERVER_IP.c_str(), &server_addr.sin_addr) <= 0) {LOG("连接 " << conn_id << " IP地址转换失败");close(client_sock);return;}// 连接服务器if (connect(client_sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {LOG("连接 " << conn_id << " 连接服务器失败: " << strerror(errno));close(client_sock);return;}LOG("连接 " << conn_id << " 成功连接到服务器 " << SERVER_IP << ":" << SERVER_PORT);try {// 数据交互:每个连接发送3条消息for (int i = 0; i < 3; ++i) {std::string msg = "客户端连接 " + std::to_string(conn_id) + " 的消息 " + std::to_string(i+1);if (!sendData(client_sock, msg)) {break;}LOG("连接 " << conn_id << " 发送: " << msg);// 接收服务器响应std::string response = recvData(client_sock);if (response.empty()) {LOG("连接 " << conn_id << " 接收响应失败或服务器断开");break;}LOG("连接 " << conn_id << " 收到响应: " << response);// 模拟业务处理延迟std::this_thread::sleep_for(std::chrono::milliseconds(500));}} catch (const std::exception& e) {LOG("连接 " << conn_id << " 异常: " << e.what());}// 关闭连接close(client_sock);LOG("连接 " << conn_id << " 已关闭");
}int main() {LOG("多连接TCP客户端启动,将创建 " << NUM_CONNECTIONS << " 个连接");std::vector<std::thread> connection_threads;// 创建多个连接(每个连接一个线程)for (int i = 0; i < NUM_CONNECTIONS; ++i) {connection_threads.emplace_back(handleConnection, i + 1);  // 连接ID从1开始// 错开连接创建时间,避免服务器瞬间压力过大std::this_thread::sleep_for(std::chrono::milliseconds(100));}// 等待所有连接线程完成for (auto& t : connection_threads) {if (t.joinable()) {t.join();}}LOG("所有连接已处理完成,客户端退出");return 0;
}

三、核心优化点详解(服务器+客户端)

1. 线程管理优化:从“动态创建线程”到“线程池”

原代码问题:

原服务器每接一个连接就动态创建线程,线程创建/销毁开销大,高并发下系统资源易耗尽(线程栈内存、CPU调度压力)。

优化方案:

服务器引入线程池ThreadPool类),预先创建固定数量线程(示例中4个),通过任务队列循环处理客户端连接,避免频繁创建线程:

// 线程池核心逻辑
class ThreadPool {
private:std::vector<std::thread> workers;  // 固定数量的工作线程std::queue<std::function<void()>> tasks;  // 任务队列std::mutex queue_mutex;  // 保护任务队列的互斥锁std::condition_variable condition;  // 唤醒线程的条件变量bool stop;  // 停止标志
public:ThreadPool(size_t n) : stop(false) {// 预先创建n个工作线程,循环取任务执行for (size_t i = 0; i < n; ++i) {workers.emplace_back([this] {while (!stop) {std::function<void()> task;// 加锁取任务{std::unique_lock<std::mutex> lock(queue_mutex);condition.wait(lock, [this] { return stop || !tasks.empty(); });if (stop && tasks.empty()) return;task = std::move(tasks.front());tasks.pop();}task();  // 执行任务(客户端处理逻辑)}});}}// 添加任务到队列void enqueue(std::function<void()> task) { /* ... */ }
};
优势:
  • 线程复用:工作线程长期存在,循环处理任务,减少线程创建/销毁开销;
  • 资源控制:通过线程池大小(如4个)限制并发线程数,避免资源耗尽。

2. 线程安全优化:解决多线程输出错乱

原代码问题:

多个线程同时调用std::cout,导致输出内容错乱(cout非线程安全,多个线程输出可能交叉)。

优化方案:

使用std::mutex为输出加锁,定义线程安全的LOG宏:

std::mutex g_cout_mutex;  // 全局输出互斥锁
#define LOG(msg) do { \std::lock_guard<std::mutex> lock(g_cout_mutex);  // 自动加锁/解锁 \std::cout << msg << std::endl; \
} while(0)
效果:

所有线程的输出操作通过互斥锁串行执行,确保日志清晰可读,无交叉错乱。

3. 粘包处理:解决TCP数据边界问题

原代码问题:

TCP是流式传输,若客户端连续发送小消息,服务器可能将多个消息合并为一个“大数据块”,无法区分边界(粘包)。

优化方案:

采用“长度前缀法”明确消息边界,发送数据时先传4字节长度(网络字节序),再传实际数据:

// 服务器/客户端发送数据(统一逻辑)
bool sendData(int sock, const std::string& data) {uint32_t data_len = htonl(data.size());  // 长度转为网络字节序send(sock, &data_len, sizeof(data_len), 0);  // 先发送长度send(sock, data.c_str(), data.size(), 0);    // 再发送数据return true;
}// 服务器/客户端接收数据(统一逻辑)
std::string recvData(int sock) {uint32_t data_len;recv(sock, &data_len, sizeof(data_len), 0);  // 先接收长度data_len = ntohl(data_len);  // 转为主机字节序std::string data(data_len, '\0');recv(sock, &data[0], data_len, 0);  // 接收对应长度的数据return data;
}
效果:

无论消息大小或发送频率如何,接收方都能通过“先读长度,再读数据”准确解析每个消息,彻底解决粘包问题。

4. 优雅关闭:确保资源正确释放

原代码问题:

原服务器若通过Ctrl+C强制终止,可能导致线程未完成、套接字未关闭,资源泄漏。

优化方案:
  1. 信号处理:捕获SIGINT信号(Ctrl+C),设置g_running=false标志;
  2. 唤醒阻塞:关闭监听套接字,唤醒accept阻塞;
  3. 线程池等待:线程池析构时等待所有工作线程完成当前任务:
// 信号处理函数
void signalHandler(int signum) {if (signum == SIGINT) {LOG("\n收到停止信号,正在优雅关闭服务器...");g_running = false;  // 设置全局停止标志close(g_listen_sock);  // 唤醒accept阻塞}
}// main函数中注册信号处理
signal(SIGINT, signalHandler);// 线程池析构时自动等待所有线程完成
~ThreadPool() {{std::unique_lock<std::mutex> lock(queue_mutex);stop = true;  // 通知线程停止}condition.notify_all();  // 唤醒所有工作线程for (auto& worker : workers) worker.join();  // 等待线程结束
}
效果:

服务器收到停止信号后,会完成当前客户端的消息处理,关闭所有套接字,释放线程资源,避免泄漏。

5. 错误处理与鲁棒性优化

原代码问题:

对系统调用错误(如accept被信号中断、send/recv失败)处理简单,可能导致程序异常退出。

优化方案:
  1. 处理EINTR错误accept等系统调用可能被信号中断(返回-1errno=EINTR),这是正常情况,应忽略而非报错;
  2. 完整的发送/接收循环recv可能只接收部分数据(尤其大数据),通过循环确保接收完整;
  3. 资源释放兜底:所有套接字在close前检查有效性,线程任务结束后确保关闭通信套接字。

三、运行效果与总结

运行步骤:

  1. 编译(需链接线程库):
    g++ server.cpp -o server -lpthread
    g++ client.cpp -o client -lpthread
    
  2. 启动服务器,再启动客户端,可观察到:
    • 服务器日志清晰,无输出错乱;
    • 多个客户端连接同时被处理,无阻塞;
    • 消息收发准确,无粘包;
    • Ctrl+C停止服务器时,会优雅关闭,无资源泄漏。

核心优化价值:

  • 性能:线程池减少线程开销,支持更高并发;
  • 可靠性:粘包处理确保数据正确解析,错误处理避免异常退出;
  • 可维护性:模块化设计(线程池、粘包工具),逻辑清晰;
  • 安全性:线程安全输出、优雅关闭机制确保资源正确管理。

这些优化使代码从“演示级”提升为“生产可用级”,可作为多线程TCP服务器的基础框架扩展。

http://www.dtcms.com/a/285158.html

相关文章:

  • 鸿蒙状态栏操作
  • 能碳管理平台:企业碳减排解决方案绿色工厂达标工具
  • Trae IDE:打造完美Java开发环境的实战指南
  • 基于深度学习的电信号分类识别与混淆矩阵分析
  • AI 总结工作报告
  • 【人工智能agent】--dify版本更新(通用)
  • 错误经验一:计算两个整数a和b的和
  • Paimon 动态分桶
  • 如何优雅处理 Flowable 工作流的 TaskAlreadyClaimedException?
  • SpringBoot02-application配置文件
  • 行业研究 | 2025金融可观测性实践与趋势洞察报告重磅发布!
  • 数据结构自学Day9: 二叉树的遍历
  • 克鲁斯焊接机器人保护气省气方案
  • JS - - - - - 数组乱序排序「进阶版」
  • c++:类型转换函数
  • mongodb-org-mongos : Depends: libssl1.1 (>= 1.1.1) but it is not installable
  • 基于LSTM的机场天气分析及模型预测
  • .NET Core EFCore零基础快速入门简单使用
  • 微信远程控制系统2.0
  • 二叉树的总结
  • mysql 字符集不一致导致索引失效问题
  • 为何“白名单媒体”是性价比之选?
  • 2025年视频超高清技术应用全景介绍
  • CSS Grid布局:构建现代网页的强大网格系统
  • Doris 物化视图:原理、使用及常见问题处理
  • Python类型转换,深浅拷贝
  • python的旧时光咖啡厅数据分析管理系统
  • 深入解析Linux进程创建与fork机制
  • Dify:在MacOS系统下Dify的本地部署与使用
  • Android Jetpack 系列(四)DataStore 全面解析与实践