使用socket实现TCP服务端
功能实现说明:
1.TCPServer跟随程序启动一起启动,一直检测,客户端是否有新的链接请求
2.127.0.0.1和非127.0.0.1同时请求连接,非127.0.0.1优先
3.每次仅且只有一个客户端跟服务端连接
4.客户端发送ping心跳,服务端恢复pong
#pragma once
#include "namespace.h"
#include "lcdcompanion_global.h"
#include <string>
#include <memory>
#include <functional>
#include <thread>
#include <atomic>#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
typedef SOCKET socket_t;
#else
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
typedef int socket_t;
#define INVALID_SOCKET -1
#define SOCKET_ERROR -1
#endifLCDCOMPANION_NAMESPACE_BEGINclass LCDCOMPANION_EXPORT TcpServer
{
public:explicit TcpServer();~TcpServer();/*** @brief 启动TCP服务器* @param port 监听端口* @return 是否启动成功*/bool start(uint16_t port = 17100);/*** @brief 停止TCP服务器*/void stop();/*** @brief 设置接收到数据时的回调函数* @param callback 回调函数,参数为接收到的数据*/void setDataReceivedCallback(std::function<void(const std::string&)> callback);/*** @brief 向当前连接的客户端发送数据* @param data 要发送的数据* @return 是否发送成功*/bool sendData(const std::string& data);private:void acceptThread();void receiveThread(); // 单一的数据接收线程void closeSocket(socket_t socket);void shutdownSocket(socket_t socket);bool isLocalhost(const std::string& ip);private:class Impl;std::unique_ptr<Impl> _impl;
};LCDCOMPANION_NAMESPACE_END// ALL_CODE_LINE: 502
// AI_CODE_LINE: 415
#include "tcpserver.h"
#include "namespace.h"
#include <iostream>
#include <cstring>
#include <mutex>
#include <thread>
#include <chrono>
#ifdef _WIN32
#include <winsock2.h>
#else
#include <errno.h>
#endif#ifdef _WIN32
#pragma comment(lib, "ws2_32.lib")
#endifconst int msgSize = 65535;
LCDCOMPANION_NAMESPACE_BEGINclass TcpServer::Impl
{
public:Impl(): _listenSocket(INVALID_SOCKET), _clientSocket(INVALID_SOCKET), _isRunning(false), _port(0){
#ifdef _WIN32WSADATA wsaData_;WSAStartup(MAKEWORD(2, 2), &wsaData_);
#endif}~Impl(){// 清理资源_isRunning = false;// 关闭监听socketif (_listenSocket != INVALID_SOCKET){
#ifdef _WIN32closesocket(_listenSocket);
#elseclose(_listenSocket);
#endif_listenSocket = INVALID_SOCKET;}// 关闭客户端socket{std::lock_guard<std::mutex> lock_(_clientMutex);if (_clientSocket != INVALID_SOCKET){
#ifdef _WIN32closesocket(_clientSocket);
#elseclose(_clientSocket);
#endif_clientSocket = INVALID_SOCKET;}}// 等待线程结束if (_acceptThread.joinable()){_acceptThread.join();}if (_receiveThread.joinable()){_receiveThread.join();}#ifdef _WIN32WSACleanup();
#endif}public:socket_t _listenSocket;socket_t _clientSocket;std::atomic<bool> _isRunning;uint16_t _port = 17100;std::string _clientIp;std::thread _acceptThread;std::thread _receiveThread; // 单一的数据接收线程std::mutex _clientMutex;std::function<void(const std::string&)> _dataReceivedCallback;
};////////////////////////////////////////////////////////////////////////////////TcpServer::TcpServer(): _impl(std::make_unique<Impl>())
{
}TcpServer::~TcpServer()
{stop();
}bool TcpServer::start(uint16_t port)
{if (_impl->_isRunning.load()){return false;}_impl->_port = port;// 创建监听socket_impl->_listenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);if (_impl->_listenSocket == INVALID_SOCKET){return false;}// 设置socket选项,允许地址重用
#ifdef _WIN32int opt_ = 1;setsockopt(_impl->_listenSocket, SOL_SOCKET, SO_REUSEADDR, (char*)&opt_, sizeof(opt_));
#elseint opt_ = 1;setsockopt(_impl->_listenSocket, SOL_SOCKET, SO_REUSEADDR, &opt_, sizeof(opt_));
#endif// 绑定地址和端口sockaddr_in serverAddr_{};serverAddr_.sin_family = AF_INET;serverAddr_.sin_addr.s_addr = INADDR_ANY;serverAddr_.sin_port = htons(port);if (bind(_impl->_listenSocket, (sockaddr*)&serverAddr_, sizeof(serverAddr_)) == SOCKET_ERROR){closeSocket(_impl->_listenSocket);_impl->_listenSocket = INVALID_SOCKET;return false;}// 开始监听if (listen(_impl->_listenSocket, 5) == SOCKET_ERROR){closeSocket(_impl->_listenSocket);_impl->_listenSocket = INVALID_SOCKET;return false;}_impl->_isRunning = true;// 启动接受连接线程_impl->_acceptThread = std::thread(&TcpServer::acceptThread, this);// 启动数据接收线程(单一线程,一直运行)_impl->_receiveThread = std::thread(&TcpServer::receiveThread, this);return true;
}void TcpServer::stop()
{if (!_impl->_isRunning.load()){return;}_impl->_isRunning = false;// 关闭监听socketif (_impl->_listenSocket != INVALID_SOCKET){closeSocket(_impl->_listenSocket);_impl->_listenSocket = INVALID_SOCKET;}// 关闭客户端socket{std::lock_guard<std::mutex> lock_(_impl->_clientMutex);if (_impl->_clientSocket != INVALID_SOCKET){closeSocket(_impl->_clientSocket);_impl->_clientSocket = INVALID_SOCKET;}}// 等待线程结束if (_impl->_acceptThread.joinable()){_impl->_acceptThread.join();}if (_impl->_receiveThread.joinable()){_impl->_receiveThread.join();}
}void TcpServer::acceptThread()
{// 一直循环检测是否有客户端请求连接,只有程序退出时才退出while (_impl->_isRunning.load()){// 检查监听socket是否有效,如果无效则延迟后继续尝试if (_impl->_listenSocket == INVALID_SOCKET){std::this_thread::sleep_for(std::chrono::milliseconds(100));continue;}sockaddr_in clientAddr_{};
#ifdef _WIN32int clientAddrLen_ = sizeof(clientAddr_);
#elsesocklen_t clientAddrLen_ = sizeof(clientAddr_);
#endif// 阻塞等待客户端连接请求socket_t newClientSocket_ = accept(_impl->_listenSocket, (sockaddr*)&clientAddr_, &clientAddrLen_);if (newClientSocket_ == INVALID_SOCKET){// 只有程序退出时才退出循环if (!_impl->_isRunning.load()){break;}// 对于所有错误,都继续循环等待,只添加延迟避免CPU空转
#ifdef _WIN32int error_ = WSAGetLastError();// WSAEINTR 和 WSAEWOULDBLOCK 是正常情况,不需要延迟if (error_ != WSAEINTR && error_ != WSAEWOULDBLOCK){std::this_thread::sleep_for(std::chrono::milliseconds(10));}
#elseint error_ = errno;// EINTR 是正常情况(被信号中断),不需要延迟if (error_ != EINTR){std::this_thread::sleep_for(std::chrono::milliseconds(10));}
#endifcontinue;}// 获取客户端IP地址std::string clientIp_ = inet_ntoa(clientAddr_.sin_addr);bool newIsLocalhost_ = isLocalhost(clientIp_);// 连接管理逻辑(需要加锁保护){std::lock_guard<std::mutex> lock_(_impl->_clientMutex);bool shouldAccept_ = false;// 情况1:当前没有连接,直接接受新连接if (_impl->_clientSocket == INVALID_SOCKET){shouldAccept_ = true;}// 情况2:当前连接的是127.0.0.1客户端else if (isLocalhost(_impl->_clientIp)){if (newIsLocalhost_){// 新请求也是127.0.0.1,不接受连接,继续检测shouldAccept_ = false;}else{// 新请求是非127.0.0.1客户端,断开当前127.0.0.1连接,接受新连接shouldAccept_ = true;// 断开旧连接的步骤:// 1. 保存旧的socketsocket_t oldSocket_ = _impl->_clientSocket;// 2. shutdown socket,让recv立即返回shutdownSocket(oldSocket_);// 3. 立即清空_clientSocket,让receiveThread检查时能切换到新socket_impl->_clientSocket = INVALID_SOCKET;_impl->_clientIp.clear();// 4. 关闭socketcloseSocket(oldSocket_);}}// 情况3:当前连接的是非127.0.0.1客户端else{if (newIsLocalhost_){// 新请求是127.0.0.1客户端,不接受连接,继续检测shouldAccept_ = false;}else{// 新请求是非127.0.0.1客户端if (_impl->_clientIp == clientIp_){// 如果是同一个客户端,不接受连接shouldAccept_ = false;}else{// 不同的非127.0.0.1客户端,断开当前连接,接受新连接shouldAccept_ = true;// 断开旧连接的步骤:// 1. 保存旧的socketsocket_t oldSocket_ = _impl->_clientSocket;// 2. shutdown socket,让recv立即返回shutdownSocket(oldSocket_);// 3. 立即清空_clientSocket,让receiveThread检查时能切换到新socket_impl->_clientSocket = INVALID_SOCKET;_impl->_clientIp.clear();// 4. 关闭socketcloseSocket(oldSocket_);}}}// 如果需要接受连接if (shouldAccept_){// 设置新的客户端连接信息(receiveThread会自动从新socket接收数据)_impl->_clientSocket = newClientSocket_;_impl->_clientIp = clientIp_;}else{// 拒绝连接,关闭socketcloseSocket(newClientSocket_);}}// 继续循环检测是否有新的客户端请求连接}
}void TcpServer::receiveThread()
{// 单一的数据接收线程,一直运行,从当前的_clientSocket接收数据char buffer_[msgSize];const std::string pingMsg_ = "ping";const std::string pongMsg_ = "pong\n";while (_impl->_isRunning.load()){socket_t currentSocket_ = INVALID_SOCKET;std::string currentIp_;// 获取当前连接的socket(需要加锁){std::lock_guard<std::mutex> lock_(_impl->_clientMutex);currentSocket_ = _impl->_clientSocket;currentIp_ = _impl->_clientIp;}// 如果没有客户端连接,等待一段时间后继续检查if (currentSocket_ == INVALID_SOCKET){std::this_thread::sleep_for(std::chrono::milliseconds(10));continue;}// 接收数据(如果socket被shutdown,recv会立即返回0或错误)int bytesReceived_ = recv(currentSocket_, buffer_, sizeof(buffer_) - 1, 0);// 检查socket是否仍然有效(可能被acceptThread切换了){std::lock_guard<std::mutex> lock_(_impl->_clientMutex);if (_impl->_clientSocket != currentSocket_){// socket已被切换,继续循环从新socket接收continue;}}if (bytesReceived_ <= 0){// 连接断开或socket被关闭{std::lock_guard<std::mutex> lock_(_impl->_clientMutex);if (_impl->_clientSocket == currentSocket_){// 确实是当前socket断开,清理状态_impl->_clientSocket = INVALID_SOCKET;_impl->_clientIp.clear();// 关闭socketcloseSocket(currentSocket_);}}continue;}buffer_[bytesReceived_] = '\0';std::string data_ = std::string(buffer_, bytesReceived_);// 处理心跳机制:收到客户端发送来的ping消息,立马回复pong// 去除前后空白字符后比较std::string trimmedData_ = data_;while (!trimmedData_.empty() && (trimmedData_.back() == '\n' || trimmedData_.back() == '\r' || trimmedData_.back() == ' ' || trimmedData_.back() == '\t')){trimmedData_.pop_back();}while (!trimmedData_.empty() && (trimmedData_.front() == ' ' || trimmedData_.front() == '\t')){trimmedData_.erase(0, 1);}if (trimmedData_ == pingMsg_){// 发送pong前再次检查socket是否仍然有效{std::lock_guard<std::mutex> lock_(_impl->_clientMutex);if (_impl->_clientSocket != currentSocket_){// socket已被切换,继续循环continue;}}// 发送pong回复,检查返回值int bytesSent_ = send(currentSocket_, pongMsg_.c_str(), (int)pongMsg_.length(), 0);if (bytesSent_ <= 0){// 发送失败,连接可能已断开,清理状态{std::lock_guard<std::mutex> lock_(_impl->_clientMutex);if (_impl->_clientSocket == currentSocket_){_impl->_clientSocket = INVALID_SOCKET;_impl->_clientIp.clear();closeSocket(currentSocket_);}}}continue;}// 调用数据接收回调if (_impl->_dataReceivedCallback){_impl->_dataReceivedCallback(data_);}}
}void TcpServer::closeSocket(socket_t socket)
{if (socket != INVALID_SOCKET){
#ifdef _WIN32closesocket(socket);
#elseclose(socket);
#endif}
}void TcpServer::shutdownSocket(socket_t socket)
{if (socket != INVALID_SOCKET){
#ifdef _WIN32shutdown(socket, SD_BOTH);
#elseshutdown(socket, SHUT_RDWR);
#endif}
}bool TcpServer::isLocalhost(const std::string& ip)
{return ip == "127.0.0.1" || ip == "::1" || ip == "localhost";
}void TcpServer::setDataReceivedCallback(std::function<void(const std::string&)> callback)
{_impl->_dataReceivedCallback = callback;
}bool TcpServer::sendData(const std::string& data)
{std::lock_guard<std::mutex> lock_(_impl->_clientMutex);if (_impl->_clientSocket != INVALID_SOCKET){int bytesSent_ = send(_impl->_clientSocket, data.c_str(), (int)data.length(), 0);return bytesSent_ > 0;}return false;
}LCDCOMPANION_NAMESPACE_END
