Linux网络编程——TCP网络通信多线程处理
一、前言
上篇文章中我们实现了简单的TCP网络通信,有单进程版本的,多进程版本的。
但是它们都存在着许多的不足之处。如单进程版本当给多个客户端提供服务时会出现问题,从而实现了多进程的两个版本,分别利用忽略子进程的退出信号使得子进程被自动回收和创建孙子进程的两种方法实现。
但是对于多进程来说,这样的资源消耗会很大,所以接下来我们要考虑的是多线程版本。
二、多线程服务器
我们首先需要考虑的是在启动服务器,成功接收到客户端的消息之后,创建一个线程,让该线程代替当前进程去执行一个包含 to_upper() 函数的回调函数 threadRount()。回调函数(一个函数A给另一个函数B作为参数)。
但是此时如果threadRount()函数作为类中的普通成员函数含有一个隐含的 this指针,指向调用该成员函数的对象实例。
如果在 phread_creat() 中调用该回调函数时,我们第四个参数是回调函数的参数,但是里面含有隐含的this指针,所以没法传参。所以将回调函数变为静态成员函数。
而静态成员函数由于不属于类的任何实例,因此它们不能直接访问类的非静态成员变量或方法。这就导致了如果想要在静态成员函数内部访问类的实例数据,需要一种方式来传递该实例的指针。
所以我们先创建了一个结构体,包含了需要传给回调函数的参数,而且由于不能直接访问类的非静态成员变量或方法,所以还包含了一个指向 tcpServer 对象指针。如下
struct threadData{
threadData(uint16_t clientPort,string clientIP,int sock,tcpServer* ts)
:_clientPort(clientPort)
,_clientIP(clientIP)
,_sock(sock)
,_this(ts){}
uint16_t _clientPort;
string _clientIP;
int _sock;
tcpServer* _this;
};
回调函数:
static void* threadRount(void* args)
{
pthread_detach(pthread_self());//线程分离
threadData* tD=static_cast<threadData*>(args);
tD->_this->to_upper(tD->_sock,tD->_clientIP,tD->_clientPort);
delete tD;//线程在执行完任务结束后,需要delete掉tD
//不能在主线程delete,因为线程还没有使用
return nullptr;
}
TCP服务器启动
threadData* tD=new threadData(peerPort,peerIp,serviceSockFd,this);
pthread_t tid;
pthread_create(&tid, nullptr, threadRount,(void*)tD);
完整代码
#include"tcp.hpp"
using std::string;
class tcpServer;
struct threadData{
threadData(uint16_t clientPort,string clientIP,int sock,tcpServer* ts)
:_clientPort(clientPort)
,_clientIP(clientIP)
,_sock(sock)
,_this(ts){}
uint16_t _clientPort;
string _clientIP;
int _sock;
tcpServer* _this;
};
class tcpServer{
public:
tcpServer(uint16_t port,const string& ip="")
:_port(port)
,_ip(ip)
,_LiSockFd(-1){}
static void* threadRount(void* args)
{
pthread_detach(pthread_self());
threadData* tD=static_cast<threadData*>(args);
tD->_this->to_upper(tD->_sock,tD->_clientIP,tD->_clientPort);
delete tD;
return nullptr;
}
void Init()
{
_LiSockFd=socket(AF_INET,SOCK_STREAM,0);
if(_LiSockFd<0)
{
logMessage(FATAL,"socket() failed::%s : %d",strerror(errno),_LiSockFd);
exit(SOCKET_ERR);
}
logMessage(DEBUG,"socket() success::%d",_LiSockFd);
struct sockaddr_in local;
memset(&local,0,sizeof(local));
local.sin_family=AF_INET;
local.sin_port=htons(_port);
_ip.empty()?(local.sin_addr.s_addr=htonl(INADDR_ANY)):(inet_aton(_ip.c_str(),&local.sin_addr));
if(bind(_LiSockFd,(const struct sockaddr*)&local,sizeof(local))==-1)
{
logMessage(FATAL,"socket() failed::%s : %d",strerror(errno),_LiSockFd);
exit(BIND_ERR);
}
logMessage(DEBUG,"bind() success::%d",_LiSockFd);
if(listen(_LiSockFd,5)==-1)
{
logMessage(FATAL,"listen() failed::%s : %d",strerror(errno),_LiSockFd);
exit(LISTEN_ERR);
}
logMessage(DEBUG,"listen() success::%d",_LiSockFd);
}
void to_upper(int sock,const string& clientIP,const uint16_t& clientPort)
{
assert(sock>0);
assert(!clientIP.empty());
char inbuffer[BUFFER_SIZE];
while(true)
{
ssize_t s=read(sock,inbuffer,sizeof(inbuffer)-1);
if(s>0)
{
inbuffer[s]='\0';
if(strcasecmp(inbuffer,"quit")==0)
{
logMessage(DEBUG,"Client requests to quit: [%s: %d]",clientIP.c_str(),clientPort);
break;
}
logMessage(DEBUG,"to_upper before:[%s:%d]>> %s",clientIP.c_str(),clientPort,inbuffer);
for(int i=0;i<s;i++)
{
if(isalpha(inbuffer[i])&&islower(inbuffer[i]))
inbuffer[i]=toupper(inbuffer[i]);
}
logMessage(DEBUG,"to_upper after:[%s:%d]>>%s",clientIP.c_str(),clientPort,inbuffer);
write(sock,inbuffer,strlen(inbuffer));
}
else if(s==0)
{
logMessage(DEBUG,"Client has quited:[%s:%d]",clientIP.c_str(),clientPort);
break;
}
else{
logMessage(DEBUG,"Client [%s:%d] read::%s",clientIP.c_str(),clientPort,strerror(errno));
break;
}
}
close(sock);
logMessage(DEBUG,"Service close %d sockFd",sock);
}
void start()
{
while(true)
{
struct sockaddr_in peer;
socklen_t peerlen=sizeof(peer);
int serviceSockFd=accept(_LiSockFd,(struct sockaddr*)&peer,&peerlen);
if(serviceSockFd==-1)
{
logMessage(WARINING,"accept() failed::%s : %d",strerror(errno),_LiSockFd);
continue;
}
string peerIp=inet_ntoa(peer.sin_addr);
uint16_t peerPort=ntohs(peer.sin_port);
logMessage(DEBUG,"accept() success:: [%s: %d] | %d",peerIp.c_str(),peerPort,serviceSockFd);
threadData* tD=new threadData(peerPort,peerIp,serviceSockFd,this);
pthread_t tid;
pthread_create(&tid, nullptr, threadRount,(void*)tD);
}
}
private:
uint16_t _port;
string _ip;
int _LiSockFd;
};
三、线程池服务器
这里实现线程池是不太合理的,因为任务是死循环的,线程池会被占满的,这里只是简单试验一下。
//tcp.hpp
#pragma once
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstring>
#include <unistd.h>
#include <signal.h>
#include <pthread.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "logMessage.hpp"
#define SOCKET_ERR 1
#define BIND_ERR 2
#define LISTEN_ERR 3
#define USE_ERR 4
#define CONNECT_ERR 5
#define FORK_ERR 6
#define WAIT_ERR 7
#define BUFFER_SIZE 1024
//lock.hpp
#pragma once
#include<iostream>
#include<pthread.h>
class Mutex{
public:
Mutex()
{
pthread_mutex_init(&_lock,nullptr);
}
void lock(){
pthread_mutex_lock(&_lock);
}
void unlock()
{
pthread_mutex_unlock(&_lock);
}
~Mutex()
{
pthread_mutex_destroy(&_lock);
}
private:
pthread_mutex_t _lock;
};
class LockGuard{
public:
LockGuard(Mutex* mutex)
:_mutex(mutex){
_mutex->lock();
}
~LockGuard()
{
_mutex->unlock();
}
private:
Mutex* _mutex;
};
//taskId.hpp
#pragma once
#include<iostream>
#include<string>
#include<functional>
#include<pthread.h>
#include<unistd.h>
#include"logMessage.hpp"
using std::string;
// 此例中的任务, 是tcp服务器与客户端通信
// 需要知道 客户端的网络信息, 还需要知道线程需要执行的功能函数
// 因为 此任务是在线程池中让线程执行的. 会在线程的回调函数中 通过此任务类调用, 所以此类还需要知道功能函数
class Task{
public:
using callback_t=std::function<void(int,string,uint16_t)>;
Task()
:_sock(-1)
,_port(-1){}
Task(int sock,string ip,uint16_t port,callback_t func)
:_sock(sock)
,_ip(ip)
,_port(port)
,_func(func){}
void operator()(){
logMessage(DEBUG,"Thread[%p] has done %s:%d request ---start",pthread_self(),_ip.c_str(),_port);
_func(_sock,_ip,_port);
logMessage(DEBUG,"Thread[%p] has done %s:%d request ---end",pthread_self(),_ip.c_str(),_port);
}
void run(){
(*this)();
}
private:
int _sock;
string _ip;
uint16_t _port;
callback_t _func;
};
//logMessage.hpp
#pragma once
#include<cstdio>
#include<ctime>
#include<cstdarg>
#include<cstring>
#include<cassert>
#include<cerrno>
#include<cstdlib>
#define DEBUG 0
#define NOTICE 1
#define WARINING 2
#define FATAL 3
const char* log_level[]={"DEBUG","NOTICE","WARINING","FATAL"};
void logMessage(int level,const char* format,...)
{
assert(level>=DEBUG);
assert(level<=FATAL);
char* name=getenv("USER");
char buffer[1024];
va_list ap;
va_start(ap,format);
vsnprintf(buffer,sizeof(buffer)-1,format,ap);
va_end(ap);
FILE* out= (level==FATAL)?stderr:stdout;
time_t tm =time(nullptr);
struct tm* localTm=localtime(&tm);
char* localTmStr=asctime(localTm);
char* nc=strstr(localTmStr,"\n");
if(nc)
{
*nc='\0';
}
fprintf(out,"%s | %s |%s | %s\n",
log_level[level],
localTmStr,
name==nullptr?"unknow":name,
buffer);
}
//threadPool.hpp RAII思想实现的锁
#pragma once
#include<cstddef>
#include<iostream>
#include<ostream>
#include<queue>
#include<cassert>
#include<pthread.h>
#include<unistd.h>
#include"lock.hpp"
using std::queue;
using std::cout;
using std::endl;
#define THREADNUM 5
template<class T>
class threadPool{
public:
static threadPool<T>* getInstance()线程回调函数
{
static Mutex mutex;
if(_instance==nullptr)
{
LockGuard lockG(&mutex);
if(_instance==nullptr)
{
_instance=new threadPool<T>();
}
}
return _instance;
}
static void* threadRountine(void* args)// 线程回调函数
{
pthread_detach(pthread_self());
threadPool<T>* tP=static_cast<threadPool<T>*>(args);// 获取this指针
while(true)
{ // 即将通过任务队列给线程分配任务, 即 多线程访问临界资源, 需要上锁
tP->LockQueue();
while(!tP->haveTask())
{
tP->waitForTask(); // 任务队列中没有任务, 就让线程通过条件变量等待
}
// 走到这里 说明条件队列中有任务
// 线程已经可以获取到任务
T task=tP->popTask(); // 获取到任务之后 临界资源的访问就结束了, 可以释放锁了.
// 尽量避免拿着锁 执行任务
tP->unlockQueue();
task.run();// 为任务类提供一个运行的接口, 这样获取到任务之后 直接 task.run();
// 或者 重载operator() 实现仿函数task()执行任务
}
}
void start() // 开启线程池
{
try{
if(_isStart)
throw"Error: thread pool already exists";
}
catch(const char* e)
{
cout<<e<<endl;
return;
}
for(int i=0;i<_threadNum;i++)
{
pthread_t temp;
pthread_creat(&temp,nullptr,threadRountine,this);// 回调函数的参数传入this指针, 用于类访问内成员
}
_isStart=true;
}
void pushTask(const T& in) // 给任务队列添加任务 并分配任务
{
lockQueue();
_taskQueue.push(in);
choiceThreadForHandler(); // 任务队列中已经存在任务, 线程就不用再等待了, 就可以唤醒线程
unlockQueue();
}
int getThreadNum()
{
return _threadNum;
}
~threadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
threadPool(const threadPool<T>&)=delete;
threadPool<T>& operator=(const threadPool<T>&)=delete;
private:
threadPool(size_t threadNum=THREADNUM)
:_threadNum(threadNum)
,_isStart(false){
assert(_threadNum>0);
pthread_mutex_init(&_mutex,nullptr);
pthread_mutex_init(&_cond,nullptr);
}
void lockQueue()// 线程调度 即为从任务队列中给各线程分配任务
// 所以 任务队列是临界资源需要上锁
{
pthread_mutex_unlock(&_mutex);
}
void unlockQueue()
{
pthread_mutex_unlock(&_mutex);
}
bool haveTask()
{
return !_taskQueue.empty();
}
void waitForTask()
{
pthread_cond_wait(&_cond,&_mutex);
}
T popTask()
{
T task=_taskQueue.front();
_taskQueue.pop();
return task;
}
// 唤醒在条件变量前等待的线程
// 由于唤醒之后就是线程调度的过程
// 所以函数名 是线程调度相关
void choiceThreadForHandler()
{
pthread_cond_signal(&_cond);
}
private:
size_t _threadNum;
bool _isStart;
queue<T> _taskQueue;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
static threadPool<T>* _instance;
};
//tcpServer.cc
#include"tcp.hpp"
#include"threadPool.hpp"
#include"taskId.hpp"
using std::string;
using std::cerr;
using std::endl;
class tcpServer
{
public:
tcpServer(uint16_t port,const string& ip="")
:_port(port)
,_ip(ip)
,_LiSockFd(-1){}
void Init()
{
_LiSockFd=socket(AF_INET,SOCK_STREAM,0);
if(_LiSockFd<0)
{
logMessage(FATAL,"socket() failed::%s : %d",strerror(errno),_LiSockFd);
exit(SOCKET_ERR);
}
logMessage(DEBUG,"socket() success::%d",_LiSockFd);
struct sockaddr_in local;
memset(&local,0,sizeof(local));
local.sin_family=AF_INET;
local.sin_port=htons(_port);
_ip.empty()?(local.sin_addr.s_addr=htonl(INADDR_ANY)):(inet_aton(_ip.c_str(),&local.sin_addr));
if(bind(_LiSockFd,(const struct sockaddr*)&local,sizeof(local))==-1)
{
logMessage(FATAL,"socket() failed::%s : %d",strerror(errno),_LiSockFd);
exit(BIND_ERR);
}
logMessage(DEBUG,"bind() success::%d",_LiSockFd);
if(listen(_LiSockFd,5)==-1)
{
logMessage(FATAL,"listen() failed::%s : %d",strerror(errno),_LiSockFd);
exit(LISTEN_ERR);
}
logMessage(DEBUG,"listen() success::%d",_LiSockFd);
}
void to_upper(int sock,const string& clientIP,const uint16_t &clientPort)
{
assert(sock>0);
assert(!clientIP.empty());
char inbuffer[BUFFER_SIZE];
while(true)
{
ssize_t s=read(sock,inbuffer,sizeof(inbuffer)-1);
if(s>0)
{
inbuffer[s]='\0';
if(strcasecmp(inbuffer,"quit")==0)
{
logMessage(DEBUG,"Client requests to quit: [%s: %d]",clientIP.c_str(),clientPort);
break;
}
logMessage(DEBUG,"to_upper before:[%s:%d]>> %s",clientIP.c_str(),clientPort,inbuffer);
for(int i=0;i<s;i++)
{
if(isalpha(inbuffer[i])&&islower(inbuffer[i]))
inbuffer[i]=toupper(inbuffer[i]);
}
logMessage(DEBUG,"to_upper after:[%s:%d]>>%s",clientIP.c_str(),clientPort,inbuffer);
write(sock,inbuffer,strlen(inbuffer));
}
else if(s==0)
{
logMessage(DEBUG,"Client has quited:[%s:%d]",clientIP.c_str(),clientPort);
break;
}
else{
logMessage(DEBUG,"Client [%s:%d] read::%s",clientIP.c_str(),clientPort,strerror(errno));
break;
}
}
close(sock);
logMessage(DEBUG,"Service close %d sockFd",sock);
}
void start()
{
_tp->start();
logMessage(DEBUG,"threadPool start,thread num:%d",_tp->getThreadNum());
while(true)
{
struct sockaddr_in peer;
socklen_t peerlen=sizeof(peer);
int serviceSockFd=accept(_LiSockFd,(struct sockaddr*)&peer,&peerlen);
if(serviceSockFd==-1)
{
logMessage(WARINING,"accept() failed::%s : %d",strerror(errno),_LiSockFd);
continue;
}
string peerIp=inet_ntoa(peer.sin_addr);
uint16_t peerPort=ntohs(peer.sin_port);
logMessage(DEBUG,"accept() success:: [%s: %d] | %d",peerIp.c_str(),peerPort,serviceSockFd);
//创建任务并向线程池中添加任务
Task t(serviceSockFd,peerIp,peerPort,bind(&tcpServer::to_upper,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));
_tp->pushTask(t);
}
}
private:
uint16_t _port;
string _ip;
int _LiSockFd;
threadPool<Task>* _tp;
};
void Usage(string proc)
{
cerr<<"Usage::\n\t"<<proc<<"port ip"<<endl;
cerr<<"example::\n\t"<<proc<<"8080 127.0.0.1"<<endl;
}
int main(int argc,char* argv[])
{
if(argc!=3&&argc!=2)
{
Usage(argv[0]);
exit(USE_ERR);
}
uint16_t port=atoi(argv[1]);
string ip;
if(argc==3)
{
ip=argv[2];
}
tcpServer Usvr(port,ip);
Usvr.Init();
Usvr.start();
return 0;
}