【Linux跬步积累】—— 网络编程套接字(二)
🌏博客主页:PH_modest的博客主页
🚩当前专栏:Linux跬步积累
💌其他专栏:
🔴 每日一题
🟡 C++跬步积累
🟢 C语言跬步积累
🌈座右铭:广积粮,缓称王!
文章目录
- 一、TCP socket API
- 二、TCP API 使用
- 1、服务端创建套接字
- 2、服务端绑定
- 3、服务端监听
- 4、服务端获取连接
- 5、服务端接收连接测试
- echo server
- 多进程版本
- `TcpServer.hpp`
- `Main.cc`
- `MainClient.cc`
- 运行效果图
- 多线程版本
- `TcpServer.hpp`
- `Main.cc`
- `MainClient.cc`
- 运行效果图
- 线程池版本
- `TcpServer.hpp`
- `Main.cc`
- `MainClient.cc`
- `Thread.hpp`
- `ThreadPool.hpp`
- 运行效果图
一、TCP socket API
下面介绍程序中用到的socket API,这些函数都在sys/socket.h中。
//创建socket文件描述符(TCP/UDP,客户端 + 服务器)
int socket(int domain,int type,int protocol);//绑定端口号(TCP/UDP,服务器)
int bind(int socket,const struct sockaddr *address,socklen_t address_len);//开始监听socket(TCP,服务器)
int listen(int socket,int backlog);//接收请求(TCP,服务器)
int accept(int socket,struct sockaddr* address,socklen_t* address_len);//建立连接(TCP,客户端)
int connect(int sockfd,const struct sockaddr *addr,socklen_t addrlen);
二、TCP API 使用
1、服务端创建套接字
函数原型:
//创建socket文件描述符(TCP/UDP,客户端 + 服务器)
int socket(int domain,int type,int protocol);
使用示例:
//创建文件描述符
_listensock = socket(AF_INET,SOCK_STREAM,0);
if(_listensock < 0)
{std::cerr<<"socket error!"<<std::endl;exit(1);
}
2、服务端绑定
函数原型:
//绑定端口号(TCP/UDP,服务器)
int bind(int socket,const struct sockaddr *address,socklen_t address_len);
使用示例:
//绑定
struct sockaddr_in local;
memset(&local,0,sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(_port);
local.sin_addr.s_addr = INADDR_ANY;if(bind(_listensock,(struct sockaddr*)&local,sizeof(local)) < 0)
{std::cerr<<"bind error!"<<std::endl;exit(2);
}
3、服务端监听
函数原型:
//开始监听socket(TCP,服务器)
int listen(int socket,int backlog);
使用示例:
//监听
if(listen(_listensock,5) < 0)
{std::cerr<<"listen error!"<<std::endl;exit(3);
}
4、服务端获取连接
函数原型:
//接收请求(TCP,服务器)
int accept(int socket,struct sockaddr* address,socklen_t* address_len);
参数说明:
- socket:特定的监听套接字,表示从这个套接字获取连接。
- address:对端网络相关信息,包括协议家族、IP地址、端口号。
- address_len:这是一个输入输出型参数,调用时传入期望读取的长度,返回时表示实际读取的长度。
accept函数返回的套接字是什么?和socket有什么区别?
accept函数获取连接时,是从socket监听套接字当中获取的,如果accept获取连接成功,此时就会返回接收到的套接字对应的文件描述符。
socket监听套接字的作用是用来获取客户端发来的新的连接请求。accept会不断从监听套接字当中获取新连接。
accept返回的套接字是为本次获取到的连接提供服务的。监听套接字是不断获取新的连接,真正为这些连接提供服务的是accept返回的套接字,而不是监听套接字。
监听套接字可以看成饭店门口拉客的员工,当你被她说服进店之后,会有新的服务员单独为你提供服务,而这个新的服务员就是accept返回的套接字。
使用示例:
void Start()
{while(true){struct sockaddr_in peer;memset(&peer,0,sizeof(peer));socklen_t len = sizeof(peer);int sockfd = accept(_listensock,(struct sockaddr*)&peer,&len);if(sockfd < 0){std::cerr<<"accept error!"<<std::endl;continue;//这里不能直接退出,因为还需要获取其他连接}std::string client_ip = inet_ntoa(peer.sin_addr);//将网络序列转换为主机序列,同时将整数ip变为字符串ipint client_port = ntohs(peer.sin_port);//将网络序列转换为主机序列std::cout<<"get a new link-->"<<sockfd<<"["<<client_ip<<"]:"<<client_port<<std::endl;}
}
5、服务端接收连接测试
我们现在做一个简单的测试,测试一下当前服务器能否成功接受请求连接。
void Usage(char *proc)
{std::cout<<"Usage:\n\t"<<proc<<" local_port\n";
}//./tcpserver port
int main(int argc,char *argv[])
{if(argc != 2){Usage(argv[0]);exit(1);}int port = std::stoi(argv[1]);TcpServer* tsvr = new TcpServer(port);tsvr->InitServer();tsvr->Start();return 0;
}
我们编译运行之后,可以通过netstat
命令来显示网络连接、路由表、接口统计等网络相关信息。
echo server
多进程版本
TcpServer.hpp
#pragma once
#include"TcpServer.hpp"
#include<iostream>
#include<string>
#include<strings.h>
#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<sys/wait.h>
#include"InetAddr.hpp"static const int gbacklog = 15;class TcpServer
{
public:TcpServer(uint16_t port):_port(port),_isrunning(false){}void InitServer(){//1. 创建流式套接字_listensock = socket(AF_INET,SOCK_STREAM,0);if(_listensock < 0){std::cerr<<"socket error!\n";exit(1);}std::cout<<"socket success,sockfd is:"<<_listensock<<std::endl;//2. bindstruct sockaddr_in local;bzero(&local,sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(_port);local.sin_addr.s_addr = INADDR_ANY;int n = ::bind(_listensock,(struct sockaddr*)&local,sizeof(local));if(n < 0){std::cerr<<"bind error!\n";exit(2);}std::cout<<"bind success,sockfd is:"<<_listensock<<std::endl;//3. tcp是面向连接的,所以通信之前,必须先建立连接,服务器是被连接的// tcpserver启动,未来首先要一直等待客户的连接到来n = listen(_listensock,gbacklog);if(n < 0){std::cerr<<"listen error!\n";exit(3);}std::cout<<"listen success,sockfd is:"<<_listensock<<std::endl;}void Service(int sockfd,InetAddr client){printf("get a new link,info %s:%d,fd:%d\n",client.IP().c_str(),client.Port(),sockfd);std::string clientaddr = "["+client.IP()+":"+std::to_string(client.Port())+"]# ";while(true){//读取数据char inbuffer[1024];ssize_t n = read(sockfd,inbuffer,sizeof(inbuffer)-1);if(n > 0)//回写{inbuffer[n] = 0;std::cout<<clientaddr<<inbuffer<<std::endl;std::string echo_string = "[server echo]# ";echo_string+=inbuffer;write(sockfd,echo_string.c_str(),echo_string.size());}else if(n == 0)//退出{//client 退出&&关闭链接了std::cout<<clientaddr<<" quit!\n";break;}else//报错{std::cerr<<clientaddr<<"read error!\n";break;}}//一定要关闭,因为文件描述符表是一个数组,数组容量是有限的close(sockfd);//如果不关闭,会导致文件描述符泄漏问题}void Loop(){//4. 不能直接收数据,先获取链接_isrunning = true;while(_isrunning){struct sockaddr_in peer;socklen_t len = sizeof(peer);int sockfd = accept(_listensock,(struct sockaddr*)&peer,&len);if(sockfd < 0){std::cerr<<"accept error!\n";continue;}//version 0 : 一次只能处理一个请求//Service(sockfd,InetAddr(peer));//version 1 :采用多进程pid_t id = fork();if(id == 0){//child 只关心sockfd,不关心listensock::close(_listensock);//建议关闭if(fork() > 0) exit(0);//创建的父进程退出Service(sockfd,InetAddr(peer));//孙子进程,他的父进程提前退出了,没有等待,会变成孤儿进程,然后被系统领养exit(0);}//father 只关心listensock,不关心sockfd::close(sockfd);waitpid(id,nullptr,0);}_isrunning = false;}~TcpServer(){if(_listensock > -1){close(_listensock);}}
private:int _listensock;uint16_t _port;bool _isrunning;
};
Main.cc
#include"TcpServer.hpp"
#include<iostream>
#include<memory>void Usage(char *proc)
{std::cout<<"Usage:\n\t"<<proc<<" local_port\n";
}//./tcpserver port
int main(int argc,char *argv[])
{if(argc != 2){Usage(argv[0]);}uint16_t port = std::stoi(argv[1]);std::unique_ptr<TcpServer> tsvr = std::make_unique<TcpServer>(port);tsvr->InitServer();tsvr->Loop();return 0;
}
MainClient.cc
#include<iostream>
#include<string>
#include<cstdlib>
#include<cstring>
#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>void Usage(std::string proc)
{std::cout<<"Usage:\n\t"<<proc<<" local_ip local_port\n";
}//./udpclient serverip serverport
int main(int argc,char *argv[])
{if(argc != 3){Usage(argv[0]);exit(1);}std::string serverip = argv[1];uint16_t serverport = std::stoi(argv[2]);//1. 创建套接字int sockfd = socket(AF_INET,SOCK_STREAM,0);if(sockfd < 0){std::cerr<<"socket error"<<std::endl;exit(2);}//2. 发起连接struct sockaddr_in server;memset(&server,0,sizeof(server));server.sin_family = AF_INET;server.sin_port = htons(serverport);server.sin_addr.s_addr = inet_addr(serverip.c_str());int n = connect(sockfd,(struct sockaddr*)&server,sizeof(server));if(n < 0){std::cerr<<"connect error!\n";exit(3);}while(true){std::cout<<"Please Enter# ";std::string outstring;getline(std::cin,outstring);ssize_t n = send(sockfd,outstring.c_str(),outstring.size(),0);//writeif(n > 0){char inbuffer[1024];ssize_t m = recv(sockfd,&inbuffer,sizeof(inbuffer)-1,0);if(m > 0){inbuffer[m] = 0;std::cout<<inbuffer<<std::endl;}else{break;}}else{break;}}close(sockfd); return 0;
}
运行效果图
多线程版本
TcpServer.hpp
#pragma once
#include "TcpServer.hpp"
#include <iostream>
#include <string>
#include <strings.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include "InetAddr.hpp"static const int gbacklog = 15;
class TcpServer;class ThreadData
{
public:ThreadData(int fd,InetAddr addr,TcpServer *s):sockfd(fd),clientaddr(addr),self(s){}
public:int sockfd;InetAddr clientaddr;TcpServer *self;
};class TcpServer
{
public:TcpServer(uint16_t port) : _port(port), _isrunning(false){}void InitServer(){// 1. 创建流式套接字_listensock = socket(AF_INET, SOCK_STREAM, 0);if (_listensock < 0){std::cerr << "socket error!\n";exit(1);}std::cout << "socket success,sockfd is:" << _listensock << std::endl;// 2. bindstruct sockaddr_in local;bzero(&local, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(_port);local.sin_addr.s_addr = INADDR_ANY;int n = ::bind(_listensock, (struct sockaddr *)&local, sizeof(local));if (n < 0){std::cerr << "bind error!\n";exit(2);}std::cout << "bind success,sockfd is:" << _listensock << std::endl;// 3. tcp是面向连接的,所以通信之前,必须先建立连接,服务器是被连接的// tcpserver启动,未来首先要一直等待客户的连接到来n = listen(_listensock, gbacklog);if (n < 0){std::cerr << "listen error!\n";exit(3);}std::cout << "listen success,sockfd is:" << _listensock << std::endl;}void Service(int sockfd, InetAddr client){printf("get a new link,info %s:%d,fd:%d\n", client.IP().c_str(), client.Port(), sockfd);std::string clientaddr = "[" + client.IP() + ":" + std::to_string(client.Port()) + "]# ";while (true){// 读取数据char inbuffer[1024];ssize_t n = read(sockfd, inbuffer, sizeof(inbuffer) - 1);if (n > 0) // 回写{inbuffer[n] = 0;std::cout << clientaddr << inbuffer << std::endl;std::string echo_string = "[server echo]# ";echo_string += inbuffer;write(sockfd, echo_string.c_str(), echo_string.size());}else if (n == 0) // 退出{// client 退出&&关闭链接了std::cout << clientaddr << " quit!\n";break;}else // 报错{std::cerr << clientaddr << "read error!\n";break;}}// 一定要关闭,因为文件描述符表是一个数组,数组容量是有限的close(sockfd); // 如果不关闭,会导致文件描述符泄漏问题}static void* HandlerSock(void *args){pthread_detach(pthread_self());ThreadData* data = static_cast<ThreadData*>(args);data->self->Service(data->sockfd,data->clientaddr);delete data;return nullptr;}void Loop(){// 4. 不能直接收数据,先获取链接_isrunning = true;while (_isrunning){struct sockaddr_in peer;socklen_t len = sizeof(peer);int sockfd = accept(_listensock, (struct sockaddr *)&peer, &len);if (sockfd < 0){std::cerr << "accept error!\n";continue;}// version 0 : 一次只能处理一个请求// Service(sockfd,InetAddr(peer));// version 1 :采用多进程// pid_t id = fork();// if(id == 0)// {// //child 只关心sockfd,不关心listensock// ::close(_listensock);//建议关闭// if(fork() > 0) exit(0);//创建的父进程退出// Service(sockfd,InetAddr(peer));//孙子进程,他的父进程提前退出了,没有等待,会变成孤儿进程,然后被系统领养// exit(0);// }// //father 只关心listensock,不关心sockfd// ::close(sockfd);// waitpid(id,nullptr,0);// version 2:采用多线程pthread_t t;//线程之间共享文件描述符表ThreadData *data = new ThreadData(sockfd,InetAddr(peer),this);pthread_create(&t,nullptr,HandlerSock,data);}_isrunning = false;}~TcpServer(){if (_listensock > -1){close(_listensock);}}private:int _listensock;uint16_t _port;bool _isrunning;
};
Main.cc
#include"TcpServer.hpp"
#include<iostream>
#include<memory>void Usage(char *proc)
{std::cout<<"Usage:\n\t"<<proc<<" local_port\n";
}//./tcpserver port
int main(int argc,char *argv[])
{if(argc != 2){Usage(argv[0]);}uint16_t port = std::stoi(argv[1]);std::unique_ptr<TcpServer> tsvr = std::make_unique<TcpServer>(port);tsvr->InitServer();tsvr->Loop();return 0;
}
MainClient.cc
#include<iostream>
#include<string>
#include<cstdlib>
#include<cstring>
#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>void Usage(std::string proc)
{std::cout<<"Usage:\n\t"<<proc<<" local_ip local_port\n";
}//./udpclient serverip serverport
int main(int argc,char *argv[])
{if(argc != 3){Usage(argv[0]);exit(1);}std::string serverip = argv[1];uint16_t serverport = std::stoi(argv[2]);//1. 创建套接字int sockfd = socket(AF_INET,SOCK_STREAM,0);if(sockfd < 0){std::cerr<<"socket error"<<std::endl;exit(2);}//2. 发起连接struct sockaddr_in server;memset(&server,0,sizeof(server));server.sin_family = AF_INET;server.sin_port = htons(serverport);server.sin_addr.s_addr = inet_addr(serverip.c_str());int n = connect(sockfd,(struct sockaddr*)&server,sizeof(server));if(n < 0){std::cerr<<"connect error!\n";exit(3);}while(true){std::cout<<"Please Enter# ";std::string outstring;getline(std::cin,outstring);ssize_t n = send(sockfd,outstring.c_str(),outstring.size(),0);//writeif(n > 0){char inbuffer[1024];ssize_t m = recv(sockfd,&inbuffer,sizeof(inbuffer)-1,0);if(m > 0){inbuffer[m] = 0;std::cout<<inbuffer<<std::endl;}else{break;}}else{break;}}close(sockfd); return 0;
}
运行效果图
线程池版本
TcpServer.hpp
#pragma once
#include "TcpServer.hpp"
#include <iostream>
#include <string>
#include <strings.h>
#include <unistd.h>
#include <pthread.h>
#include <functional>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include "InetAddr.hpp"
#include "ThreadPool.hpp"static const int gbacklog = 15;
class TcpServer;class ThreadData
{
public:ThreadData(int fd,InetAddr addr,TcpServer *s):sockfd(fd),clientaddr(addr),self(s){}
public:int sockfd;InetAddr clientaddr;TcpServer *self;
};using task_t = std::function<void()>;class TcpServer
{
public:TcpServer(uint16_t port) : _port(port), _isrunning(false){}void InitServer(){// 1. 创建流式套接字_listensock = socket(AF_INET, SOCK_STREAM, 0);if (_listensock < 0){std::cerr << "socket error!\n";exit(1);}std::cout << "socket success,sockfd is:" << _listensock << std::endl;// 2. bindstruct sockaddr_in local;bzero(&local, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(_port);local.sin_addr.s_addr = INADDR_ANY;int n = ::bind(_listensock, (struct sockaddr *)&local, sizeof(local));if (n < 0){std::cerr << "bind error!\n";exit(2);}std::cout << "bind success,sockfd is:" << _listensock << std::endl;// 3. tcp是面向连接的,所以通信之前,必须先建立连接,服务器是被连接的// tcpserver启动,未来首先要一直等待客户的连接到来n = listen(_listensock, gbacklog);if (n < 0){std::cerr << "listen error!\n";exit(3);}std::cout << "listen success,sockfd is:" << _listensock << std::endl;}void Service(int sockfd, InetAddr client){printf("get a new link,info %s:%d,fd:%d\n", client.IP().c_str(), client.Port(), sockfd);std::string clientaddr = "[" + client.IP() + ":" + std::to_string(client.Port()) + "]# ";while (true){// 读取数据char inbuffer[1024];ssize_t n = read(sockfd, inbuffer, sizeof(inbuffer) - 1);if (n > 0) // 回写{inbuffer[n] = 0;std::cout << clientaddr << inbuffer << std::endl;std::string echo_string = "[server echo]# ";echo_string += inbuffer;write(sockfd, echo_string.c_str(), echo_string.size());}else if (n == 0) // 退出{// client 退出&&关闭链接了std::cout << clientaddr << " quit!\n";break;}else // 报错{std::cerr << clientaddr << "read error!\n";break;}}// 一定要关闭,因为文件描述符表是一个数组,数组容量是有限的close(sockfd); // 如果不关闭,会导致文件描述符泄漏问题}static void* HandlerSock(void *args){pthread_detach(pthread_self());ThreadData* data = static_cast<ThreadData*>(args);data->self->Service(data->sockfd,data->clientaddr);delete data;return nullptr;}void Loop(){// 4. 不能直接收数据,先获取链接_isrunning = true;while (_isrunning){struct sockaddr_in peer;socklen_t len = sizeof(peer);int sockfd = accept(_listensock, (struct sockaddr *)&peer, &len);if (sockfd < 0){std::cerr << "accept error!\n";continue;}// version 0 : 一次只能处理一个请求// Service(sockfd,InetAddr(peer));// version 1 :采用多进程// pid_t id = fork();// if(id == 0)// {// //child 只关心sockfd,不关心listensock// ::close(_listensock);//建议关闭// if(fork() > 0) exit(0);//创建的父进程退出// Service(sockfd,InetAddr(peer));//孙子进程,他的父进程提前退出了,没有等待,会变成孤儿进程,然后被系统领养// exit(0);// }// //father 只关心listensock,不关心sockfd// ::close(sockfd);// waitpid(id,nullptr,0);// version 2:采用多线程// pthread_t t;//线程之间共享文件描述符表// ThreadData *data = new ThreadData(sockfd,InetAddr(peer),this);// pthread_create(&t,nullptr,HandlerSock,data);// version 3:采用线程池task_t task = std::bind(&TcpServer::Service,this,sockfd,InetAddr(peer));ThreadPool<task_t>::GetInstance()->EnQueue(task);}_isrunning = false;}~TcpServer(){if (_listensock > -1){close(_listensock);}}private:int _listensock;uint16_t _port;bool _isrunning;
};
Main.cc
#include"TcpServer.hpp"
#include<iostream>
#include<memory>void Usage(char *proc)
{std::cout<<"Usage:\n\t"<<proc<<" local_port\n";
}//./tcpserver port
int main(int argc,char *argv[])
{if(argc != 2){Usage(argv[0]);}uint16_t port = std::stoi(argv[1]);std::unique_ptr<TcpServer> tsvr = std::make_unique<TcpServer>(port);tsvr->InitServer();tsvr->Loop();return 0;
}
MainClient.cc
#include<iostream>
#include<string>
#include<cstdlib>
#include<cstring>
#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>void Usage(std::string proc)
{std::cout<<"Usage:\n\t"<<proc<<" local_ip local_port\n";
}//./udpclient serverip serverport
int main(int argc,char *argv[])
{if(argc != 3){Usage(argv[0]);exit(1);}std::string serverip = argv[1];uint16_t serverport = std::stoi(argv[2]);//1. 创建套接字int sockfd = socket(AF_INET,SOCK_STREAM,0);if(sockfd < 0){std::cerr<<"socket error"<<std::endl;exit(2);}//2. 发起连接struct sockaddr_in server;memset(&server,0,sizeof(server));server.sin_family = AF_INET;server.sin_port = htons(serverport);server.sin_addr.s_addr = inet_addr(serverip.c_str());int n = connect(sockfd,(struct sockaddr*)&server,sizeof(server));if(n < 0){std::cerr<<"connect error!\n";exit(3);}while(true){std::cout<<"Please Enter# ";std::string outstring;getline(std::cin,outstring);ssize_t n = send(sockfd,outstring.c_str(),outstring.size(),0);//writeif(n > 0){char inbuffer[1024];ssize_t m = recv(sockfd,&inbuffer,sizeof(inbuffer)-1,0);if(m > 0){inbuffer[m] = 0;std::cout<<inbuffer<<std::endl;}else{break;}}else{break;}}close(sockfd); return 0;
}
Thread.hpp
#pragma once
#include<string>
#include<pthread.h>
#include<unistd.h>
#include<iostream>
#include<functional>namespace MyThread
{template<class T>using func_t = std::function<void(T&)>;//模版方法template<class T>class Thread{public://thread(func,5,"thread-1");Thread(func_t<T> func,const T &data,const std::string &name = "none-name"):_func(func),_data(data),_threadname(name){}//需要设置成static静态成员函数,否则参数会多一个this指针,就不符合pthread_create的要求了static void* ThreadRoutinue(void* args){//将传过来的this指针强转一下,然后就可以访问到_func和_data了Thread<T>* self = static_cast<Thread<T>*>(args);self->_func(self->_data);//这里调用的_func是线程池中的HanderTask方法return nullptr;}bool Start(){//创建线程int ret = pthread_create(&_tid,nullptr,ThreadRoutinue,this);return ret==0;}void Join(){pthread_join(_tid,nullptr);}void Detach(){pthread_detach(_tid);}~Thread(){}private:pthread_t _tid; //线程tidstd::string _threadname; //线程名func_t<T> _func; //线程执行的函数T _data; //需要处理的数据};
}
ThreadPool.hpp
#include"Thread.hpp"
#include<vector>
#include<queue>
#include<string>
#include <unistd.h>
#include <pthread.h>template<class T>
class ThreadPool
{
public:ThreadPool(const int num = 5):_threadNum(num),_waitNum(0),_isRunning(false){pthread_mutex_init(&_mutex,nullptr);pthread_cond_init(&_cond,nullptr);}void HanderTask(std::string name){//子线程需要一直处理,所以这里使用死循环while(true){pthread_mutex_lock(&_mutex);while(_taskQueue.empty()&&_isRunning)//这里是while循环,不是if判断,避免伪唤醒{_waitNum++;pthread_cond_wait(&_cond,&_mutex);_waitNum--;}//线程池终止了,并且任务队列中没有任务 --> 线程退出if(_taskQueue.empty()&&!_isRunning){pthread_mutex_unlock(&_mutex);std::cout<<name<<" quit..."<<std::endl;break;}//走到这里无论线程池是否终止,都一定还有任务要执行,将任务执行完再退出T task = _taskQueue.front();_taskQueue.pop();std::cout<<name<<" get a task..."<<std::endl;pthread_mutex_unlock(&_mutex);task();}}void ThreadInit(){for(int i=0;i<_threadNum;i++){auto func = bind(&ThreadPool::HanderTask,this,std::placeholders::_1);std::string name = "Thread-"+std::to_string(i);//_threads.push_back(HanderTask,name,name);//第一个name是handerTask的参数,第二个name是Thread内部的成员_threads.emplace_back(func,name,name);}_isRunning = true;}void StartAll(){for(auto& thread : _threads){thread.Start();}}void JoinAll(){for(auto& thread : _threads){thread.Join();}}void EnQueue(const T& task){pthread_mutex_lock(&_mutex);if(_isRunning){_taskQueue.push(task);if(_waitNum > 0){pthread_cond_signal(&_cond);}}pthread_mutex_unlock(&_mutex);}void Stop(){pthread_mutex_lock(&_mutex);_isRunning = false;//终止线程池pthread_cond_broadcast(&_cond);//唤醒所有等待的线程pthread_mutex_unlock(&_mutex);}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}
public:static ThreadPool<T> *GetInstance(){if(nullptr == _instance){pthread_mutex_lock(&_lock);if(nullptr == _instance){_instance = new ThreadPool<T>();_instance->ThreadInit();_instance->StartAll();std::cout<<"创建线程池单例!\n";return _instance;}pthread_mutex_unlock(&_lock);}std::cout<<"获取线程池样例!\n";return _instance;}
private:std::vector<MyThread::Thread<std::string>> _threads;//用数组管理多个线程std::queue<T> _taskQueue;//任务队列int _threadNum;//线程数int _waitNum;//等待的线程数bool _isRunning;//线程池是否在运行pthread_mutex_t _mutex;//互斥锁pthread_cond_t _cond;//条件变量//添加单例模式static ThreadPool<T> *_instance;static pthread_mutex_t _lock;
};template<class T>
ThreadPool<T> *ThreadPool<T>::_instance = nullptr;template<class T>
pthread_mutex_t ThreadPool<T>::_lock = PTHREAD_MUTEX_INITIALIZER;