17.TCP编程
一.makefile的实现
# Build both the TCP server and the TCP client with a plain `make`.
.PHONY:all
all:server_tcp client_tcp

# $@ is the target name, $^ is the full prerequisite list.
server_tcp:TcpServer.cc
	g++ -o $@ $^ -std=c++17
client_tcp:TcpClient.cc
	g++ -o $@ $^ -std=c++17

# Remove the generated binaries.
.PHONY:clean
clean:
	rm -f server_tcp client_tcp
二.TcpServer的搭建 和 单进程版通信
1.框架搭建
#pragma once

#include <iostream>
#include <sys/types.h>
#include <sys/socket.h>

/// Skeleton of the TCP server. The member functions are intentionally empty
/// here; the following sections of these notes fill them in step by step.
class TcpServer
{
public:
    TcpServer() {}

    /// Prepares the server socket (implemented in the "InitServer" section).
    void InitServer() {}

    /// Runs the server loop (implemented in the "Start" section).
    void Start() {}

    ~TcpServer() {}

private:
    int _sockfd = -1; // socket descriptor; -1 means "not created yet"
};
2.InitServer()的实现




void InitServer(){//1.创建TCP套接字_sockfd = ::socket(AF_INET,SOCK_STREAM,0);if(_sockfd < 0){LOG(LogLevel::FATAL) << "socket error";Die(SOCKET_ERR);}LOG(LogLevel::INFO) << "socket create success, sockfd is : " << _sockfd;}


void InitServer(){//1.创建TCP套接字_sockfd = ::socket(AF_INET,SOCK_STREAM,0);if(_sockfd < 0){LOG(LogLevel::FATAL) << "socket error";Die(SOCKET_ERR);}LOG(LogLevel::INFO) << "socket create success, sockfd is : " << _sockfd;struct sockaddr_in local;memset(&local,0,sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(gport);local.sin_addr.s_addr = INADDR_ANY;//2.bindint n = ::bind(_sockfd,CONV(&local),sizeof(local));if(n < 0){LOG(LogLevel::FATAL) << "bind error";Die(BIND_ERR);}}
目前我们的TCP代码和UDP代码,只有创建sockfd时,是不一样的(SOCK_STREAM和SOCK_DGRAM)



void InitServer(){//1.创建TCP套接字_sockfd = ::socket(AF_INET,SOCK_STREAM,0);if(_sockfd < 0){LOG(LogLevel::FATAL) << "socket error";Die(SOCKET_ERR);}LOG(LogLevel::INFO) << "socket create success, sockfd is : " << _sockfd;struct sockaddr_in local;memset(&local,0,sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(gport);local.sin_addr.s_addr = INADDR_ANY;//2.bindint n = ::bind(_sockfd,CONV(&local),sizeof(local));if(n < 0){LOG(LogLevel::FATAL) << "bind error";Die(BIND_ERR);}LOG(LogLevel::INFO) << "bind success...";//3.cs,tcp是面向链接的,就要求tcp随时随地等待被链接//tcp 需要将我们的套接字设置为监听状态n = ::listen(_sockfd,BACKLOG);if(n < 0){LOG(LogLevel::FATAL) << "listen error";Die(LISTEN_ERR);}LOG(LogLevel::INFO) << "listen success...";}
3.Start()的实现

accept 的后两个参数中,addr 是输出型参数;addrlen 是输入输出型参数:调用前必须初始化为 addr 缓冲区的大小,返回时被改写为对端地址的实际长度

如果没有人进行连接,就进行阻塞

void Start(){_isrunning = true;while(_isrunning){//不能直接读数据//1.获取新连接struct sockaddr_in peer;socklen_t peerlen;int sockfd = ::accept(_listensockfd,CONV(&peer),&peerlen);if(sockfd < 0){LOG(LogLevel::WARNING) << "accept error: " << strerror(errno);continue;}//2.获取链接成功LOG(LogLevel::INFO) << "accept success,sockfd is : " << sockfd;}}

处理链接实现:
TCP是面向字节流的,所以我们可以直接使用read来读取,write来写(和文件一样)
// Echo service for one connection: every chunk read from sockfd is logged and
// written back prefixed with "server echo# ".
// NOTE: this first version never leaves the loop; a read() result <= 0 is
// simply ignored. Step 5 of these notes deals with the client exiting.
void HandlerRequest(int sockfd)
{
    char inbuffer[4096];
    for (;;)
    {
        // Keep one byte free for the terminating '\0'.
        const ssize_t nread = read(sockfd, inbuffer, sizeof(inbuffer) - 1);
        if (nread <= 0)
        {
            continue;
        }

        inbuffer[nread] = 0;
        LOG(LogLevel::INFO) << inbuffer;

        std::string reply("server echo# ");
        reply.append(inbuffer);
        ::write(sockfd, reply.c_str(), reply.size());
    }
}
我们链接成功之后,直接调用HandlerRequest()进行读取数据

我们目前写的是一个单进程的程序,所以只能由一个人来发消息
4.客户端代码编写


#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>// ./clinet server_ip server_port
// TCP echo client.
// Usage: ./client server_ip server_port
// Exit codes: 1 bad arguments, 2 socket() failed, 3 connect() failed, 0 otherwise.
int main(int argc, char* argv[])
{
    if (argc != 3)
    {
        std::cout << "Usage:./client server_ip server_port" << std::endl;
        return 1;
    }
    std::string server_ip = argv[1];

    // std::stoi throws on non-numeric or out-of-range input; report that as a
    // usage error instead of terminating with an uncaught exception.
    int server_port = -1;
    try
    {
        server_port = std::stoi(argv[2]);
    }
    catch (...)
    {
        server_port = -1;
    }
    if (server_port <= 0 || server_port > 65535)
    {
        std::cout << "Usage:./client server_ip server_port" << std::endl;
        return 1;
    }

    int sockfd = ::socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0)
    {
        std::cout << "create socket error" << std::endl;
        return 2;
    }

    // The client does not bind explicitly: the OS assigns a local port when
    // connect() is called. TCP is connection-oriented, so connect first.
    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(static_cast<uint16_t>(server_port));
    // inet_pton reports malformed addresses, unlike inet_addr whose error
    // value is indistinguishable from 255.255.255.255.
    if (::inet_pton(AF_INET, server_ip.c_str(), &server_addr.sin_addr) != 1)
    {
        std::cout << "invalid server ip: " << server_ip << std::endl;
        ::close(sockfd);
        return 1;
    }

    int n = ::connect(sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr));
    if (n < 0)
    {
        std::cout << "connect error" << std::endl;
        ::close(sockfd);
        return 3;
    }

    // Echo client: send one line, print the server's reply.
    std::string message;
    char inbuffer[1024];
    while (true)
    {
        std::cout << "input message: ";
        if (!std::getline(std::cin, message))
        {
            break; // EOF on stdin: leave quietly
        }
        if (message.empty())
        {
            continue; // nothing to send; a 0-byte write is not an error
        }

        ssize_t written = ::write(sockfd, message.c_str(), message.size());
        if (written <= 0)
        {
            std::cout << "write error" << std::endl;
            break;
        }

        // Leave room for the terminator so inbuffer[m] stays in bounds.
        ssize_t m = ::read(sockfd, inbuffer, sizeof(inbuffer) - 1);
        if (m <= 0)
        {
            break; // server closed the connection or read failed
        }
        inbuffer[m] = 0;
        std::cout << inbuffer << std::endl;
    }

    ::close(sockfd);
    return 0;
}



5.解决client退出,server还在一直等待输入的问题
// Echo service for one connection: every chunk read from sockfd is logged and
// written back prefixed with "server echo# ". The loop ends when the client
// closes the connection or read() fails.
// NOTE: sockfd is not closed in this version; step 6 of these notes adds that.
void HandlerRequest(int sockfd)
{
    char inbuffer[4096];
    while (true)
    {
        // Keep one byte free for the terminating '\0'.
        ssize_t n = read(sockfd, inbuffer, sizeof(inbuffer) - 1);
        if (n > 0)
        {
            inbuffer[n] = 0;
            LOG(LogLevel::INFO) << inbuffer;

            std::string echo_str = "server echo# ";
            echo_str += inbuffer;
            ::write(sockfd, echo_str.c_str(), echo_str.size());
        }
        else if (n == 0)
        {
            // read() returning 0 means the client has closed the connection.
            LOG(LogLevel::INFO) << "client exit: " << sockfd;
            break;
        }
        else
        {
            // read failed
            break;
        }
    }
}



6.处理文件描述符一直增加问题
![]()
// Echo service for one connection: every chunk read from sockfd is logged and
// written back prefixed with "server echo# ". When the client closes the
// connection or read() fails, the loop ends and sockfd is closed so that
// descriptors do not accumulate.
void HandlerRequest(int sockfd)
{
    char inbuffer[4096];
    while (true)
    {
        // Keep one byte free for the terminating '\0'.
        ssize_t n = read(sockfd, inbuffer, sizeof(inbuffer) - 1);
        if (n > 0)
        {
            inbuffer[n] = 0;
            LOG(LogLevel::INFO) << inbuffer;

            std::string echo_str = "server echo# ";
            echo_str += inbuffer;
            ::write(sockfd, echo_str.c_str(), echo_str.size());
        }
        else if (n == 0)
        {
            // read() returning 0 means the client has closed the connection.
            LOG(LogLevel::INFO) << "client exit: " << sockfd;
            break;
        }
        else
        {
            // read failed
            break;
        }
    }
    ::close(sockfd);
}


7.获取客户端信息
这里我们将上次写的InetAddr类进行导入
"InetAddr.hpp"#pragma once#include <iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "Common.hpp"class InetAddr
{
private:void PortNetToHost(){_port = ::ntohs(_net_addr.sin_port);}void IpNetToHost(){char ipbuffer[64];const char* ip = ::inet_ntop(AF_INET,&_net_addr.sin_addr,ipbuffer,sizeof(ipbuffer));_ip = ipbuffer;(void)ip;}
public:InetAddr(){}InetAddr(const struct sockaddr_in& addr):_net_addr(addr){PortNetToHost();IpNetToHost();}InetAddr(uint16_t port):_port(port){_net_addr.sin_family = AF_INET;_net_addr.sin_port = htons(_port);_net_addr.sin_addr.s_addr = INADDR_ANY;}struct sockaddr* NetAddr(){return CONV(&_net_addr);}socklen_t NetAddrLen(){return sizeof(_net_addr);}std::string Ip(){return _ip;}uint16_t Port(){return _port;}std::string Addr(){return Ip() + ":" + std::to_string(Port());}bool operator==(const InetAddr& addr){return _ip == addr._ip && _port == addr._port;}~InetAddr(){}
private:struct sockaddr_in _net_addr;std::string _ip;uint16_t _port;
};


三.多进程之间通信

#pragma once

#include <iostream>
#include <cstring>
#include <string>
#include <cerrno>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <sys/wait.h>
#include <signal.h>

#include "Log.hpp"
#include "Common.hpp"
#include "InetAddr.hpp"

static const uint16_t gport = 8080; // default listening port
#define BACKLOG 8                   // backlog argument passed to listen()
using namespace LogModule;

/// TCP echo server, multi-process version: each accepted connection is served
/// by a separate process so several clients can be handled at once.
class TcpServer
{
public:
    TcpServer(int port = gport) : _listensockfd(-1), _port(port), _isrunning(false)
    {
    }

    /// Creates the listening socket, binds it to _port on all local
    /// interfaces and puts it into the listening state. Failures are reported
    /// through Die() (declared elsewhere in the project).
    void InitServer()
    {
        // 1. Create the TCP socket.
        _listensockfd = ::socket(AF_INET, SOCK_STREAM, 0);
        if (_listensockfd < 0)
        {
            LOG(LogLevel::FATAL) << "socket error";
            Die(SOCKET_ERR);
        }
        LOG(LogLevel::INFO) << "socket create success, sockfd is : " << _listensockfd;

        struct sockaddr_in local;
        memset(&local, 0, sizeof(local));
        local.sin_family = AF_INET;
        local.sin_port = htons(_port); // honour the constructor argument, not only the default
        local.sin_addr.s_addr = INADDR_ANY;

        // 2. bind
        int n = ::bind(_listensockfd, CONV(&local), sizeof(local));
        if (n < 0)
        {
            LOG(LogLevel::FATAL) << "bind error";
            Die(BIND_ERR);
        }
        LOG(LogLevel::INFO) << "bind success...";

        // 3. TCP is connection-oriented, so the server must be ready to be
        //    connected to at any time: put the socket into the listening state.
        n = ::listen(_listensockfd, BACKLOG);
        if (n < 0)
        {
            LOG(LogLevel::FATAL) << "listen error";
            Die(LISTEN_ERR);
        }
        LOG(LogLevel::INFO) << "listen success...";

        // Alternative way to deal with exiting children: ignore SIGCHLD so
        // the OS reclaims them automatically and no wait is needed.
        // ::signal(SIGCHLD,SIG_IGN);
    }

    /// Echo service for one connection; closes sockfd when the client leaves
    /// or read() fails.
    void HandlerRequest(int sockfd)
    {
        char inbuffer[4096];
        while (true)
        {
            // Keep one byte free for the terminating '\0'.
            ssize_t n = read(sockfd, inbuffer, sizeof(inbuffer) - 1);
            if (n > 0)
            {
                inbuffer[n] = 0;
                LOG(LogLevel::INFO) << inbuffer;

                std::string echo_str = "server echo# ";
                echo_str += inbuffer;
                ::write(sockfd, echo_str.c_str(), echo_str.size());
            }
            else if (n == 0)
            {
                // read() returning 0 means the client has closed the connection.
                LOG(LogLevel::INFO) << "client exit: " << sockfd;
                break;
            }
            else
            {
                // read failed
                break;
            }
        }
        ::close(sockfd);
    }

    /// Accept loop. For every connection the server forks twice: the child
    /// exits immediately and the grandchild serves the client, so the
    /// waitpid() below returns at once and the accept loop is never blocked.
    void Start()
    {
        _isrunning = true;
        while (_isrunning)
        {
            // The listening socket cannot be read from directly.
            // 1. Obtain a new connection.
            struct sockaddr_in peer;
            socklen_t peerlen = sizeof(peer);
            LOG(LogLevel::DEBUG) << "accept ing...";
            int sockfd = ::accept(_listensockfd, CONV(&peer), &peerlen);
            if (sockfd < 0)
            {
                LOG(LogLevel::WARNING) << "accept error: " << strerror(errno);
                continue;
            }

            // 2. Connection established.
            LOG(LogLevel::INFO) << "accept success,sockfd is : " << sockfd;
            InetAddr addr(peer);
            LOG(LogLevel::INFO) << "client info: " << addr.Addr();

            // version - 1: one process per connection
            pid_t id = fork();
            if (id < 0)
            {
                // Could not create a worker: drop this client, keep serving.
                LOG(LogLevel::WARNING) << "fork error: " << strerror(errno);
                ::close(sockfd);
                continue;
            }
            if (id == 0)
            {
                // Child: it inherits both descriptors. Only sockfd is needed
                // here; closing the listening socket in the child does not
                // affect the parent.
                ::close(_listensockfd);
                if (fork() > 0)
                {
                    // The child exits right away; the orphaned grandchild is
                    // re-parented to the system's init process, which reaps it.
                    exit(0);
                }
                // Grandchild (or the child itself if the second fork failed).
                HandlerRequest(sockfd);
                exit(0);
            }

            // Parent: the connection now belongs to the worker process.
            ::close(sockfd);
            int rid = ::waitpid(id, nullptr, 0);
            if (rid < 0)
            {
                LOG(LogLevel::WARNING) << "waitpid error";
            }
        }
    }

    /// Requests the accept loop to end; takes effect once the current
    /// accept() call returns.
    void Stop()
    {
        _isrunning = false;
    }

    ~TcpServer() {}

private:
    int _listensockfd; // listening socket; -1 until InitServer() succeeds
    uint16_t _port;    // port to listen on (host byte order)
    bool _isrunning;
};




但是我们来一个用户,创建一个进程,但是进程创建的成本太高了,我们可不可以替换成多线程呢?
四.多线程之间通信
我们如果创建多线程进行通信的话,主线程不需要(也不能)关闭新连接的文件描述符,因为所有线程共享同一张文件描述符表
#pragma once

#include <iostream>
#include <cstring>
#include <string>
#include <cerrno>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <sys/wait.h>
#include <signal.h>
#include <pthread.h>

#include "Log.hpp"
#include "Common.hpp"
#include "InetAddr.hpp"

static const uint16_t gport = 8080; // default listening port
#define BACKLOG 8                   // backlog argument passed to listen()
using namespace LogModule;

/// TCP echo server, multi-thread version: each accepted connection is served
/// by its own detached thread.
class TcpServer
{
    // Arguments handed to a worker thread. Allocated by Start(), released by
    // ThreadEntry().
    struct ThreadData
    {
        int sockfd;
        TcpServer* self;
    };

public:
    TcpServer(int port = gport) : _listensockfd(-1), _port(port), _isrunning(false)
    {
    }

    /// Creates the listening socket, binds it to _port on all local
    /// interfaces and puts it into the listening state. Failures are reported
    /// through Die() (declared elsewhere in the project).
    void InitServer()
    {
        // 1. Create the TCP socket.
        _listensockfd = ::socket(AF_INET, SOCK_STREAM, 0);
        if (_listensockfd < 0)
        {
            LOG(LogLevel::FATAL) << "socket error";
            Die(SOCKET_ERR);
        }
        LOG(LogLevel::INFO) << "socket create success, sockfd is : " << _listensockfd;

        struct sockaddr_in local;
        memset(&local, 0, sizeof(local));
        local.sin_family = AF_INET;
        local.sin_port = htons(_port); // honour the constructor argument, not only the default
        local.sin_addr.s_addr = INADDR_ANY;

        // 2. bind
        int n = ::bind(_listensockfd, CONV(&local), sizeof(local));
        if (n < 0)
        {
            LOG(LogLevel::FATAL) << "bind error";
            Die(BIND_ERR);
        }
        LOG(LogLevel::INFO) << "bind success...";

        // 3. TCP is connection-oriented, so the server must be ready to be
        //    connected to at any time: put the socket into the listening state.
        n = ::listen(_listensockfd, BACKLOG);
        if (n < 0)
        {
            LOG(LogLevel::FATAL) << "listen error";
            Die(LISTEN_ERR);
        }
        LOG(LogLevel::INFO) << "listen success...";

        // Alternative way to deal with exiting children: ignore SIGCHLD so
        // the OS reclaims them automatically and no wait is needed.
        // ::signal(SIGCHLD,SIG_IGN);
    }

    /// Echo service for one connection; closes sockfd when the client leaves
    /// or read() fails.
    void HandlerRequest(int sockfd)
    {
        char inbuffer[4096];
        while (true)
        {
            // Keep one byte free for the terminating '\0'.
            ssize_t n = read(sockfd, inbuffer, sizeof(inbuffer) - 1);
            if (n > 0)
            {
                inbuffer[n] = 0;
                LOG(LogLevel::INFO) << inbuffer;

                std::string echo_str = "server echo# ";
                echo_str += inbuffer;
                ::write(sockfd, echo_str.c_str(), echo_str.size());
            }
            else if (n == 0)
            {
                // read() returning 0 means the client has closed the connection.
                LOG(LogLevel::INFO) << "client exit: " << sockfd;
                break;
            }
            else
            {
                // read failed
                break;
            }
        }
        ::close(sockfd);
    }

    /// Worker thread entry point. Detaches itself so nobody has to join it,
    /// takes ownership of the ThreadData and serves the connection.
    static void* ThreadEntry(void* args)
    {
        pthread_detach(pthread_self());
        ThreadData* data = static_cast<ThreadData*>(args);
        TcpServer* self = data->self;
        int sockfd = data->sockfd;
        delete data; // the heap block was only a carrier for the two values
        self->HandlerRequest(sockfd);
        return nullptr;
    }

    /// Accept loop: every new connection is handed to a freshly created thread.
    void Start()
    {
        _isrunning = true;
        while (_isrunning)
        {
            // The listening socket cannot be read from directly.
            // 1. Obtain a new connection.
            struct sockaddr_in peer;
            socklen_t peerlen = sizeof(peer);
            LOG(LogLevel::DEBUG) << "accept ing...";
            int sockfd = ::accept(_listensockfd, CONV(&peer), &peerlen);
            if (sockfd < 0)
            {
                LOG(LogLevel::WARNING) << "accept error: " << strerror(errno);
                continue;
            }

            // 2. Connection established.
            LOG(LogLevel::INFO) << "accept success,sockfd is : " << sockfd;
            InetAddr addr(peer);
            LOG(LogLevel::INFO) << "client info: " << addr.Addr();

            // version - 2: one thread per connection
            pthread_t tid;
            ThreadData* data = new ThreadData;
            data->sockfd = sockfd;
            data->self = this;
            int rc = pthread_create(&tid, nullptr, ThreadEntry, data);
            if (rc != 0)
            {
                // No thread took ownership: release both resources here.
                LOG(LogLevel::WARNING) << "pthread_create error: " << strerror(rc);
                delete data;
                ::close(sockfd);
                continue;
            }
            // The main thread and the worker share ONE file descriptor table,
            // so the main thread must NOT close sockfd here; the worker
            // closes it when it is done.
        }
    }

    /// Requests the accept loop to end; takes effect once the current
    /// accept() call returns.
    void Stop()
    {
        _isrunning = false;
    }

    ~TcpServer() {}

private:
    int _listensockfd; // listening socket; -1 until InitServer() succeeds
    uint16_t _port;    // port to listen on (host byte order)
    bool _isrunning;
};




五.线程池版本
我们将原来封装的线程池CP过来
// "Mutex.hpp"
#pragma once
#include <iostream>
#include <pthread.h>

namespace LockModule
{
    /// Thin wrapper around pthread_mutex_t: initialised in the constructor,
    /// destroyed in the destructor. Not copyable.
    class Mutex
    {
    public:
        Mutex(const Mutex&) = delete;
        const Mutex& operator=(const Mutex&) = delete;

        Mutex()
        {
            int n = ::pthread_mutex_init(&_lock, nullptr);
            (void)n;
        }

        void Lock()
        {
            int n = ::pthread_mutex_lock(&_lock);
            (void)n;
        }

        void Unlock()
        {
            int n = ::pthread_mutex_unlock(&_lock);
            (void)n;
        }

        /// Raw handle, for APIs that need the pthread mutex itself
        /// (e.g. pthread_cond_wait).
        pthread_mutex_t* LockPtr()
        {
            return &_lock;
        }

        ~Mutex()
        {
            int n = ::pthread_mutex_destroy(&_lock);
            (void)n;
        }

    private:
        pthread_mutex_t _lock;
    };

    /// Scope guard: locks the mutex on construction and unlocks it on
    /// destruction. Copying is disabled because a copy would unlock twice.
    class LockGuard
    {
    public:
        LockGuard(Mutex& mtx) : _mtx(mtx)
        {
            _mtx.Lock();
        }

        LockGuard(const LockGuard&) = delete;
        LockGuard& operator=(const LockGuard&) = delete;

        ~LockGuard()
        {
            _mtx.Unlock();
        }

    private:
        Mutex& _mtx;
    };
};
"Cond.hpp"#pragma once#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"namespace CondModule
{using namespace LockModule;class Cond{public:Cond(){int n = ::pthread_cond_init(&_cond,nullptr);(void)n;}void Wait(Mutex& mutex)//让线程曾经的锁释放曾经的锁{int n = ::pthread_cond_wait(&_cond,mutex.LockPtr());(void)n;}void Notify(){int n = ::pthread_cond_signal(&_cond);(void)n;}void NotifyAll(){int n = ::pthread_cond_broadcast(&_cond);(void)n;}~Cond(){int n = ::pthread_cond_destroy(&_cond);(void)n;}private:pthread_cond_t _cond;};
}
// "Thread.hpp"
#ifndef _THREAD_HPP__
#define _THREAD_HPP__

#include <iostream>
#include <string>
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>
#include <functional>

namespace ThreadModule
{
    /// Work executed by a thread; it receives the thread's name.
    using func_t = std::function<void(std::string name)>;

    // Counter used to build the "Thread-N" names. It is not synchronised, so
    // Thread objects are expected to be constructed from a single thread.
    static int number = 1;

    enum class TSTATUS
    {
        NEW,
        RUN,
        STOP
    };

    /// Small object wrapper around a pthread.
    class Thread
    {
    private:
        // Must be static: pthread_create needs a plain void*(*)(void*), while
        // a non-static member function carries a hidden `this` parameter.
        static void* Routine(void* args)
        {
            Thread* t = static_cast<Thread*>(args);
            t->_func(t->Name());
            return nullptr;
        }

        void EnableDetach()
        {
            _joinable = false;
        }

    public:
        Thread(func_t func)
            : _tid(), _joinable(true), _func(func), _status(TSTATUS::NEW)
        {
            _name = "Thread-" + std::to_string(number++);
            _pid = getpid();
        }

        /// Creates the underlying pthread. Returns false if the thread is
        /// already running or pthread_create fails.
        bool Start()
        {
            if (_status != TSTATUS::RUN)
            {
                int n = pthread_create(&_tid, nullptr, Routine, this);
                if (n != 0)
                {
                    return false;
                }
                _status = TSTATUS::RUN;
                if (!_joinable)
                {
                    // Detach() was requested before the thread existed.
                    pthread_detach(_tid);
                }
                return true;
            }
            return false;
        }

        /// Cancels a running thread. Returns false if it is not running or
        /// pthread_cancel fails.
        bool Stop()
        {
            if (_status == TSTATUS::RUN)
            {
                int n = pthread_cancel(_tid);
                if (n != 0)
                {
                    return false;
                }
                _status = TSTATUS::STOP;
                return true;
            }
            return false;
        }

        /// Waits for the thread to finish. Returns false if the thread is
        /// detached, was never started, or pthread_join fails.
        bool Join()
        {
            // A thread that was never started has no valid _tid to join.
            if (_joinable && _status != TSTATUS::NEW)
            {
                int n = pthread_join(_tid, nullptr);
                if (n != 0)
                {
                    return false;
                }
                _status = TSTATUS::STOP;
                return true;
            }
            return false;
        }

        /// Marks the thread as detached; if it has not been started yet the
        /// detach is applied by Start().
        void Detach()
        {
            EnableDetach();
            if (_status != TSTATUS::NEW)
            {
                pthread_detach(_tid);
            }
        }

        bool IsJoinable()
        {
            return _joinable;
        }

        std::string Name()
        {
            return _name;
        }

        ~Thread() {}

    private:
        std::string _name;
        pthread_t _tid;
        pid_t _pid;
        bool _joinable; // true by default, i.e. not detached
        func_t _func;
        TSTATUS _status;
    };
};

#endif
// "ThreadPool.hpp"
#pragma once

#include <iostream>
#include <string>
#include <queue>
#include <vector>
#include <memory>
#include "Log.hpp"
#include "Mutex.hpp"
#include "Cond.hpp"
#include "Thread.hpp"

namespace ThreadPoolModule
{
    using namespace LogModule;
    using namespace ThreadModule;
    using namespace LockModule;
    using namespace CondModule;

    using thread_t = std::shared_ptr<Thread>;

    // Endless demo task. `inline` because this header may be included from
    // more than one translation unit.
    inline void DefaultTest()
    {
        while (true)
        {
            LOG(LogLevel::DEBUG) << "我是一个测试方法";
            sleep(1);
        }
    }

    const static int defaultnum = 5; // default number of worker threads

    /// Fixed-size thread pool, used as a singleton via getInstance().
    /// T is the task type: it must be default-constructible, copy-assignable
    /// and callable as t().
    template<class T>
    class ThreadPool
    {
    private:
        bool IsEmpty()
        {
            return _taskq.empty();
        }

        // Worker loop run by every pool thread. `name` is the thread name
        // supplied by Thread when it invokes the bound callback.
        void HandlerTask(std::string name)
        {
            LOG(LogLevel::INFO) << "线程" << name << ", 进入HandlerTask的逻辑";
            while (true)
            {
                // 1. Fetch a task while holding the lock.
                T t;
                {
                    LockGuard lockguard(_lock);
                    while (IsEmpty() && _isrunning)
                    {
                        _wait_num++;
                        _cond.Wait(_lock);
                        _wait_num--;
                    }
                    // Queue drained and the pool has been stopped: leave.
                    if (IsEmpty() && !_isrunning)
                    {
                        break;
                    }
                    t = _taskq.front();
                    _taskq.pop();
                }
                // 2. Run the task outside the lock. Convention: every task
                //    type must provide operator().
                t();
            }
        }

        ThreadPool(const ThreadPool<T>&) = delete;
        ThreadPool<T>& operator=(const ThreadPool<T>&) = delete;

        // Private: instances are created only through getInstance().
        ThreadPool(int num = defaultnum) : _num(num), _wait_num(0), _isrunning(false)
        {
            for (int i = 0; i < _num; i++)
            {
                _threads.push_back(std::make_shared<Thread>(
                    std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1)));
                LOG(LogLevel::DEBUG) << "构建线程" << _threads[i]->Name() << "对象 ... 成功";
            }
        }

    public:
        /// Returns the single pool instance, creating and starting it on the
        /// first call. The lock is taken on every call so the pointer is
        /// never read unsynchronised, and the function returns on every path.
        static ThreadPool<T>* getInstance()
        {
            LockGuard lockguard(mutex);
            if (instance == nullptr)
            {
                LOG(LogLevel::INFO) << "单例首次被执行,需要加载对象...";
                instance = new ThreadPool<T>();
                instance->Start();
            }
            return instance;
        }

        /// Queues a copy of `in`. The task is silently dropped when the pool
        /// is not running.
        void Equeue(const T& in)
        {
            LockGuard lockguard(_lock);
            if (!_isrunning)
            {
                return;
            }
            _taskq.push(in);
            if (_wait_num > 0)
            {
                _cond.Notify();
            }
        }

        /// Starts all worker threads; does nothing if already running.
        void Start()
        {
            if (_isrunning)
            {
                return;
            }
            _isrunning = true;
            for (auto& thread_ptr : _threads)
            {
                thread_ptr->Start();
                LOG(LogLevel::DEBUG) << "启动线程" << thread_ptr->Name() << " ... 成功";
            }
        }

        /// Joins every worker thread.
        void Wait()
        {
            for (auto& thread_ptr : _threads)
            {
                thread_ptr->Join();
                LOG(LogLevel::DEBUG) << "停止线程" << thread_ptr->Name() << " ... 成功";
            }
        }

        /// Stops accepting tasks and wakes the sleeping workers; tasks that
        /// are already queued are still processed before the workers exit.
        void Stop()
        {
            LockGuard lockguard(_lock);
            if (_isrunning)
            {
                _isrunning = false; // from now on Equeue() rejects new tasks
                if (_wait_num > 0)
                {
                    _cond.NotifyAll();
                }
            }
        }

        ~ThreadPool() {}

    private:
        int _num;                       // number of worker threads
        int _wait_num;                  // workers currently blocked in _cond.Wait
        std::queue<T> _taskq;           // pending tasks, guarded by _lock
        std::vector<thread_t> _threads;
        Mutex _lock;
        Cond _cond;
        bool _isrunning;

        static ThreadPool<T>* instance;
        static Mutex mutex; // protects only the singleton pointer
    };

    template<class T>
    ThreadPool<T>* ThreadPool<T>::instance = nullptr;

    template<class T>
    Mutex ThreadPool<T>::mutex;
}

对于我们的线程池,比较适合处理短任务,或者是用户量较少的情况(因为线程池中线程的个数是固定的)

六.读取和发送修改


void HandlerRequest(int sockfd){char inbuffer[4096];//长任务while(true){// ssize_t n = read(sockfd,inbuffer,sizeof(inbuffer)-1);ssize_t n = recv(sockfd,inbuffer,sizeof(inbuffer)-1,0);if(n > 0){inbuffer[n] = 0;LOG(LogLevel::INFO) << inbuffer;std::string echo_str = "server echo# ";echo_str += inbuffer;// ::write(sockfd,echo_str.c_str(),echo_str.size());::send(sockfd,echo_str.c_str(),echo_str.size(),0);}else if(n == 0){// read 如果读取返回值是0,标识client退出LOG(LogLevel::INFO) << "client exit: " << sockfd;break;}else{//读取失败了break;}}::close(sockfd);}
这里我们将read和write分别改成对应的recv和send
