Tcp套接字编程
Tcp Socket
Tcp和Udp在代码中的区别,就是多了一个连接的步骤。
不同的接口介绍
//服务端不同
#include <sys/socket.h>
int listen(int sockfd, int backlog); //我们需要对创建的sockfd进行监听
//进行监听,这里的 sockfd就是 socket的返回值
//backlog 控制了等待处理连接请求的队列长度,合理设置有助于平衡服务器负载和资源使用。(AI生成)
//backlog后面会讲解
RETURN VALUE (返回值):
On success, zero is returned. On error, -1 is returned, and errno
is set to indicate the error.
#include <sys/socket.h>
//这个返回值才是我们进行IO操作的文件描述符
int accept(int sockfd, struct sockaddr *client_addr,
socklen_t *client_addrlen);
// client_addr: 连接的客户端信息
//client_addrlen: 上面的信息结构体的大小
当我们accept之后使用返回值进行IO操作。
//客户端不同
//客户端不需要显示绑定, 但是由于tcp是面向字节流的,所以需要connect
int connect(int sockfd, const struct sockaddr *addr,ocklen_t addrlen);
//示例代码
sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = ::htons(port); //传入的端口
server_addr.sin_addr.s_addr = ::inet_addr(ip.c_str());//传入的ip
socklen_t len = sizeof(server_addr);
int n = connect(sockfd,CONV(&server_addr),len);
if (n < 0)
{
LOG(LogLevel::DEBUG) << "connect error" << strerror(errno);
}
实现代码
服务器代码 TcpServer.hpp
#pragma once
#include <string.h>
#include <iostream>
#include <memory>
#include <cstdlib>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include "Log.hpp"
#include "Commn.hpp"
#define BACKLOG 8
const uint16_t gport = 8888;
using namespace LogModule;
sockaddr_in peer;
class TcpServer
{
public:
TcpServer(uint16_t port = gport)
: _port(port),
_isrunning(false)
{
}
void InitServer()
{
_listensockfd = ::socket(AF_INET, SOCK_STREAM, 0); //进行监听
if (_listensockfd < 0)
{
LOG(LogLevel::FATAL) << "create listensockfd error";
Die(SOCKET_ERROR);
}
LOG(LogLevel::INFO) << "create listensockfd success, listensockfd is: " << _listensockfd;
sockaddr_in local;
local.sin_addr.s_addr = INADDR_ANY;
local.sin_family = AF_INET;
local.sin_port = ::htons(_port);
socklen_t len = sizeof(local);
int n = ::bind(_listensockfd, CONV(&local), len);
if (n < 0)
{
LOG(LogLevel::FATAL) << "bind error";
}
LOG(LogLevel::INFO) << "bind success";
// 连接
int n1 = ::listen(_listensockfd, BACKLOG); //招呼客人进店
if (n1 < 0)
{
LOG(LogLevel::FATAL) << "listen listensocketfd :" << _listensockfd << " error";
Die(LISTEN_ERROR);
}
LOG(LogLevel::INFO) << "listen listensocketfd :" << _listensockfd << " success";
}
void HandlerRequest(int socketfd)
{
LOG(LogLevel::INFO) << "HandlerRequest socketfs is: " << socketfd;
char inbuffer[4096];
while (true)
{
int n = ::read(socketfd, inbuffer, sizeof(inbuffer)-1);
if (n > 0)
{
inbuffer[n] = 0;
LOG(LogLevel::INFO) << inbuffer;
std::string clie = "server_echo # ";
clie += inbuffer;
int n1 = ::write(socketfd, clie.c_str(), clie.size());
}
}
}
void Start()
{
_isrunning = true;
while (_isrunning)
{
memset(&peer, 0, sizeof(peer));
socklen_t len = sizeof(peer);
int socketfd = accept(_listensockfd, CONV(&peer), &len); //叫一个服务员来招待客人,然后我继续去外面拉客人。
//peer 初始化之后用来接收连接成功的客户端信IP和端口信息
if (socketfd < 0)
{
LOG(LogLevel::WARNING) << "accept fd error, repeat";
continue;
}
// 连接成功
LOG(LogLevel::INFO) << "accept socketfd success socketfd is: " << socketfd;
HandlerRequest(socketfd);
}
}
void Stop()
{
_isrunning = false;
}
~TcpServer()
{
}
private:
int _listensockfd;
uint16_t _port;
bool _isrunning;
};
理解Tcp
如上图,对于服务端而言我们需要监听我们创建的sockfd
,而真正起作用的是accept
之后返回的文件描述符。
_listensockfd = ::socket(AF_INET, SOCK_STREAM, 0); //进行监听
if (_listensockfd < 0)
{
LOG(LogLevel::FATAL) << "create listensockfd error";
Die(SOCKET_ERROR);
}
LOG(LogLevel::INFO) << "create listensockfd success, listensockfd is: " << _listensockfd;
//对应代码
int socketfd = accept(_listensockfd, CONV(&peer), &len); //叫一个服务员来招待客人,然后我继续去外面拉客人。
if (socketfd < 0)
{
LOG(LogLevel::WARNING) << "accept fd error, repeat";
continue;
}
// 连接成功
LOG(LogLevel::INFO) << "accept socketfd success socketfd is: " << socketfd;
HandlerRequest(socketfd); //此时我们拿着文件描述符,继续进行IO操作
TcpClient.cc
#include "Commn.hpp"
#include <sys/socket.h>
#include <arpa/inet.h>
#include "Log.hpp"
#include <cstring>
#include <errno.h>
using namespace LogModule;
int main(int argc,char* argv[])
{
if (argc != 3)
{
std::cout << "Usage Fail: use ./client_tcp server_ip server_port" << std::endl;
return 1;
}
std::string ip = argv[1];
uint16_t port = std::stoi(argv[2]);
int sockfd = socket(AF_INET,SOCK_STREAM,0);
if (sockfd < 0)
{
LOG(LogLevel::DEBUG) << "socket create error";
Die(SOCKET_ERROR);
}
//客户端不需要显示绑定, 但是由于tcp是面向字节流的,所以需要connect
// int connect(int sockfd, const struct sockaddr *addr,
// socklen_t addrlen);
sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = ::htons(port);
server_addr.sin_addr.s_addr = ::inet_addr(ip.c_str());
socklen_t len = sizeof(server_addr);
int n = connect(sockfd,CONV(&server_addr),len);
if (n < 0)
{
LOG(LogLevel::DEBUG) << "connect error" << strerror(errno);
}
//此时就可以接受数据了
std::string message;
while(true)
{
char inbuffer[1024];
std::cout << "input message: ";
std::getline(std::cin,message);
int n = ::write(sockfd,message.c_str(),message.size());
if (n > 0)
{
int ret = ::read(sockfd,inbuffer,sizeof(inbuffer)-1);
inbuffer[ret] = 0;
std::cout << inbuffer << std::endl;
}
else if (n == 0)
{
break;
}
else
return 1;
}
return 0;
}
⭐问题一: 当我们客户端ctrl+c之后再连接,输入没有结果返回了?
这里我们需要观察我们的服务端代码:
void HandlerRequest(int socketfd)
{
LOG(LogLevel::INFO) << "HandlerRequest socketfs is: " << socketfd;
char inbuffer[4096];
while (true)
{
int n = ::read(socketfd, inbuffer, sizeof(inbuffer)-1);
if (n > 0)
{
inbuffer[n] = 0;
LOG(LogLevel::INFO) << inbuffer;
std::string clie = "server_echo # ";
clie += inbuffer;
int n1 = ::write(socketfd, clie.c_str(), clie.size());
}
}
}
可以看到,我们并没有处理写端关闭的情况,写端关闭,n此时为0
修改代码:
void HandlerRequest(int socketfd)
{
LOG(LogLevel::INFO) << "HandlerRequest socketfs is: " << socketfd;
char inbuffer[4096];
while (true)
{
int n = ::read(socketfd, inbuffer, sizeof(inbuffer)-1);
if (n > 0)
{
inbuffer[n] = 0;
LOG(LogLevel::INFO) << inbuffer;
std::string clie = "server_echo # ";
clie += inbuffer;
int n1 = ::write(socketfd, clie.c_str(), clie.size());
}
else if(n == 0)
{
std::cout << "client quit: " << socketfd << std::endl;
break;
}
else return;
}
::close(socketfd); //关闭文件描述符
}
但是注意对于一个进程而言,我们创建一个文件描述符本质就是一个数组小标(fd_array
),一般linux机器都会限制文件描述符的个数,而我的机器是云服务器,工程师进行了处理,我们使用ulimit -a
可以查看自己可以打开的文件个数。
所以文件描述符是一个有限资源,那么当我们不用的时候(写端关闭),那么我们就需要关闭不用的文件描述符,不然就是文件描述符泄露。
⭐问题二: 无法处理多个用户
如上图,虽然我们客户端退出重连可以得到结果,但是当我们新来一个用户的时候也没有结果回显,也就是说我们这个服务器现在只能接受一个人的请求! 这肯定不行,所以我们需要改为多进程或者多线程的方式来处理多用户。
多进程
//version-1 多进程
pid_t id = fork();
if (id == 0) //子进程
{
HandlerRequest(socketfd);
}
//pid_t waitpid(pid_t pid, int *_Nullable wstatus, int options);
int wid = waitpid(id,nullptr,0);
if (wid < 0)
{
std::cout << "wait error" << std::endl;
}
如上端代码,这样就行了吗? 我们先来明确一个概念:此时父子进程共有两个文件描述符表,子进程会浅拷贝我们父进程的文件描述符表,所以我们父子进程需要关闭不需要的文件描述符。
- 对于子进程,我只需要
socketfd
,所以我需要把listenfd
关闭掉 - 对于父进程,我只需要
listenfd
,所以我需要把socketfd
关闭掉
但是此时我们的父进程还需要等待子进程,这并没有解决问题,其中一种解决方法就是让父进程的等待方式变为非阻塞,同时使用signal()
忽略子进程给父进程发送的SIGCHILD
信号。但是此时我们不用这种方法,用如下方法:
void Start()
{
_isrunning = true;
while (_isrunning)
{
memset(&peer, 0, sizeof(peer));
socklen_t len = sizeof(peer);
int socketfd = accept(_listensockfd, CONV(&peer), &len); //叫一个服务员来招待客人,然后我继续去外面拉客人。
if (socketfd < 0)
{
LOG(LogLevel::WARNING) << "accept fd error, repeat";
continue;
}
//version-0
// 连接成功
LOG(LogLevel::INFO) << "accept socketfd success socketfd is: " << socketfd;
// HandlerRequest(socketfd);
//version-1 多进程
pid_t id = fork();
if (id == 0) //子进程
{
::close(_listensockfd); //关闭没有使用的文件描述符
if (fork() > 0) exit(0);
//此时执行当前代码的是孙子进程
HandlerRequest(socketfd);
exit(0);
}
//pid_t waitpid(pid_t pid, int *_Nullable wstatus, int options);
::close(socketfd);
int wid = waitpid(id,nullptr,0);
if (wid < 0)
{
std::cout << "wait error" << std::endl;
}
}
我们在子进程执行过程中又创建一个孙子进程,同时子进程直接退出,此时父进程直接回收子进程,由于每一个父进程都只管自己的子进程,所以此时的孙子进程就变为了孤儿进程被操作系统领养,那么资源的释放由操作系统自动释放,我们就不需要管了。
看上面的效果图我们发现每个用户的文件描述符都是4? 这是很显然的因为每一次我们子进程创建父进程后直接就退出了,而父进程又关闭了 socketfd,所以父进程每一次accept的文件描述符都是4。
多线程
这里我们先使用原生线程库来实现:
static void* PthreadEnter(void*args)
{
}
void Start()
{
_isrunning = true;
while (_isrunning)
{
memset(&peer, 0, sizeof(peer));
socklen_t len = sizeof(peer);
int socketfd = accept(_listensockfd, CONV(&peer), &len); //叫一个服务员来招待客人,然后我继续去外面拉客人。
if (socketfd < 0)
{
LOG(LogLevel::WARNING) << "accept fd error, repeat";
continue;
}
//version-0
// 连接成功
LOG(LogLevel::INFO) << "accept socketfd success socketfd is: " << socketfd;
pthread_t pid;
pthread_create(&pid,nullptr,PthreadEnter,&socketfd);
// int pthread_create(pthread_t *restrict thread,
// const pthread_attr_t *restrict attr,
// void *(*start_routine)(void *),
// void *restrict arg);
注意:我们pthread_create要求的函数类型为 void*(void*)
,所以我们要自己定义一个新的函数PthreadEnter(),但是在类内中定义会自带一个this指针,所以我们需要变为static静态类成员函数。(这里还有bug)
我们先说我们可以直接传入 &socketfd吗?
这里我们是线程,所以我们使用的是同一份资源,所以当我们子线程去执行IO操作的时候,主线程继续执行,那么当我们主线程执行到 accept的时候,socketfd会重新获取,可能原来传入子线程的就变为野指针了?所以我们不可以直接传入 &socketfd
。
再来说static静态类成员函数,他就跟这个类没啥关系了,他只是定义在这个类中,访问不了类中的成员函数 (this指针),所以我们可以创建一个结构体,把this指针传进去。
同时,对于线程而言,主线程也要等待子线程,所以我们需要让子线程与主线程分离。
最终代码如下:
struct PthreadData
{
TcpServer* self;
int fd;
};
static void* PthreadEnter(void*args)
{
pthread_detach(pthread_self()); //把自己与主线程分离,主线程不需要管我了。
PthreadData* data = (PthreadData*)args;
data->self->HandlerRequest(data->fd);
return nullptr;
}
void Start()
{
_isrunning = true;
while (_isrunning)
{
memset(&peer, 0, sizeof(peer));
socklen_t len = sizeof(peer);
int socketfd = accept(_listensockfd, CONV(&peer), &len); //叫一个服务员来招待客人,然后我继续去外面拉客人。
if (socketfd < 0)
{
LOG(LogLevel::WARNING) << "accept fd error, repeat";
continue;
}
LOG(LogLevel::INFO) << "accept socketfd success socketfd is: " << socketfd;
PthreadData* data = new PthreadData;
int* sockfdp = new int;
*sockfdp = socketfd;
data->fd = *sockfdp;
data->self = this;
pthread_t pid;
pthread_create(&pid,nullptr,PthreadEnter,(void*)data);
}
使用 ps -aL
查看线程个数
如上图,同时我们也可以看到,我们的文件描述符是不一样的了。
线程池
线程池只需要构建一个任务,然后传入线程池即可。
void Start()
{
_isrunning = true;
while (_isrunning)
{
memset(&peer, 0, sizeof(peer));
socklen_t len = sizeof(peer);
int socketfd = accept(_listensockfd, CONV(&peer), &len); //叫一个服务员来招待客人,然后我继续去外面拉客人。
if (socketfd < 0)
{
LOG(LogLevel::WARNING) << "accept fd error, repeat";
continue;
}
LOG(LogLevel::INFO) << "accept socketfd success socketfd is: " << socketfd;
task_t t = [this,socketfd](){
this->HandlerRequest(socketfd);
};
threadpool<task_t>::Getinstance()->Equeue(t);
}
}
多线程远程命令执行
上面所说的都是我发一个内容给服务器,服务器给我返回我发的什么,但是我访问服务器肯定是需要服务器处理我发送的数据然后把结果返回给我啊,所以我们来实现一个小业务:远程命令执行 。我们给服务器发送 ls,pwd等等命令,服务器给我返回在服务器运行的结果。
代码实现
目的: 处理我发送的请求然后把结果返回给我,这里的请求就是ls,pwd等等命令。
思路:
- 这里我们肯定是需要在当前进程执行另一个进程(子进程),就跟我们之前的手写shell一样,只不过我们的手写shell是直接输出到显示器上,而这一次我们需要把结果得到然后返回给父进程。
那么我们处理的步骤如下:
- 创建管道
- 创建子进程,子进程执行命令(execv)并把结果返回给我们
那么如何把结果返回给父进程呢? dup2(fd[1],1)
重定向管道的写端为标准输出。这样父进程通过管道的读端就可以得到结果。我们这里需要那么麻烦吗?C语言为我们封装了一个函数,他自动帮我们完成这些任务:
参数解析:
Command:
代表你要执行的命令字符串Type:
代表对管道是度还是写,传入 “r” 就是读,"w"写。Return val:
他这里是以一个文件的方式返回,也就是说我们读写的话,可以使用文件的读写操作来完成。如果操作失败则是返回空。
那么这里我们是要得到返回的结果,就是使用"r"读,
代码实现:
- 这里我们限制一下我们接受的命令,我们创建一个白名单,表示我们可以解析的命令名单,如果不在这个名单则直接返回错误。
//Command.hpp
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
#include <cstdio>
#include <set>
class CommandClass
{
public:
CommandClass()
{
_witelist.insert("ls");
_witelist.insert("ls -al");
_witelist.insert("ls -al --color");
_witelist.insert("pwd");
_witelist.insert("ps");
}
//ls -a -b -c -d -e
//正常解法:
/*
1/ 创建管道 (pipe) + 创建子进程 fork()
2. 子进程执行命令 execv(替换)
3. 把结果输出到屏幕当中 => 返回给父进程 dup2(fd,1)
*/
std::string CommandStr(std::string& cmdstr)
{
if (_witelist.find(cmdstr) == _witelist.end()) return "操作不合法";
FILE* ret = popen(cmdstr.c_str(),"r");//如果传入的是错误的命令,并不会返回nullptr,而是直接打印一条错误信息?
if (!ret) return "不可以创建子进程或者系统资源不足\n";
std::string result;
char buffer[1024];
//opoen返回的是文件描述符,我们直接以文件的读方式拿到结果
while(fgets(buffer,sizeof(buffer),ret))
{
result += buffer;
}
//把最后的\n去掉
if (!result.empty() && result.back() == '\n') result.pop_back();
else if(result.empty()) return "command not found\n";
return result;
}
private:
std::set<std::string> _witelist;
};
这是我们处理任务的函数,在服务端中我们需要把任务注册进来,客户给我们发送指令字符串我们需要把结果返回给用户。
服务端代码:
//server.cc
#include "TcpServer.hpp"
#include "Command.hpp"
int main(int argc,char* argv[])
{
CommandClass cmd;
if (argc != 2)
{
std::cout << "Usage: user ./server_tcp port" << std::endl;
return 1;
}
uint16_t port = std::stoi(argv[1]);
std::shared_ptr<TcpServer> server = std::make_shared<TcpServer>(port);
server->InitServer();
server->RegisterHander([&cmd](std::string&s){ return cmd.CommandStr(s);});
server->Start();
return 0;
}
//server.hpp
#pragma once
#include <string.h>
#include <iostream>
#include <memory>
#include <cstdlib>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include "Log.hpp"
#include "Commn.hpp"
#include <sys/wait.h>
#include <pthread.h>
#include "ThreadPool.hpp"
#include <functional>
#define BACKLOG 8
const uint16_t gport = 8888;
using namespace LogModule;
using namespace MyThreadPool;
using task_t = std::function<void(void)>;
using hander_t = std::function<std::string(std::string&)>;
sockaddr_in peer;
class TcpServer
{
struct PthreadData
{
TcpServer* self;
int fd;
};
public:
TcpServer(uint16_t port = gport)
: _port(port),
_isrunning(false)
{
}
void InitServer()
{
_listensockfd = ::socket(AF_INET, SOCK_STREAM, 0); //进行监听
if (_listensockfd < 0)
{
LOG(LogLevel::FATAL) << "create listensockfd error";
Die(SOCKET_ERROR);
}
LOG(LogLevel::INFO) << "create listensockfd success, listensockfd is: " << _listensockfd;
sockaddr_in local;
local.sin_addr.s_addr = INADDR_ANY;
local.sin_family = AF_INET;
local.sin_port = ::htons(_port);
socklen_t len = sizeof(local);
int n = ::bind(_listensockfd, CONV(&local), len);
if (n < 0)
{
LOG(LogLevel::FATAL) << "bind error";
}
LOG(LogLevel::INFO) << "bind success";
// 连接
int n1 = ::listen(_listensockfd, BACKLOG); //招呼客人进店
if (n1 < 0)
{
LOG(LogLevel::FATAL) << "listen listensocketfd :" << _listensockfd << " error";
Die(LISTEN_ERROR);
}
LOG(LogLevel::INFO) << "listen listensocketfd :" << _listensockfd << " success";
}
void RegisterHander(hander_t hander)
{
_hander = hander;
}
void HandlerRequest(int socketfd)
{
LOG(LogLevel::INFO) << "HandlerRequest socketfs is: " << socketfd;
char inbuffer[4096];
while (true)
{
int n = ::recv(socketfd, inbuffer, sizeof(inbuffer)-1,0);
if (n > 0)
{
inbuffer[n] = 0;
LOG(LogLevel::INFO) << inbuffer;
std::string cmdstr = inbuffer;
std::string returnstr = _hander(cmdstr); //把结果返回
int n1 = ::send(socketfd, returnstr.c_str(), returnstr.size(),0);
}
else if(n == 0)
{
std::cout << "client quit: " << socketfd << std::endl;
break;
}
else return;
}
::close(socketfd); //关闭文件描述符
}
static void* PthreadEnter(void*args)
{
pthread_detach(pthread_self()); //把自己与主线程分离,主线程不需要管我了。
PthreadData* data = (PthreadData*)args;
std::cout << " socketfd: " << data->fd << std::endl;
data->self->HandlerRequest(data->fd);
return nullptr;
}
void Start()
{
_isrunning = true;
while (_isrunning)
{
memset(&peer, 0, sizeof(peer));
socklen_t len = sizeof(peer);
int socketfd = accept(_listensockfd, CONV(&peer), &len); //叫一个服务员来招待客人,然后我继续去外面拉客人。
if (socketfd < 0)
{
LOG(LogLevel::WARNING) << "accept fd error, repeat";
continue;
}
//version-0
// 连接成功
LOG(LogLevel::INFO) << "accept socketfd success socketfd is: " << socketfd;
// HandlerRequest(socketfd);
//version-1 多进程
// pid_t id = fork();
// if (id == 0) //子进程
// {
// ::close(_listensockfd); //关闭没有使用的文件描述符
// if (fork() > 0) exit(0);
// //此时执行当前代码的是孙子进程
// HandlerRequest(socketfd);
// exit(0);
// }
// //pid_t waitpid(pid_t pid, int *_Nullable wstatus, int options);
// ::close(socketfd);
// int wid = waitpid(id,nullptr,0);
// if (wid < 0)
// {
// std::cout << "wait error" << std::endl;
// }
//version-2 多线程 (原生线程库)
// int pthread_create(pthread_t *restrict thread,
// const pthread_attr_t *restrict attr,
// void *(*start_routine)(void *),
// void *restrict arg);
// PthreadData* data = new PthreadData;
// int* sockfdp = new int;
// std::cout << " socketfd: " << socketfd << std::endl;
// *sockfdp = socketfd;
// std::cout << " sockfdp: " << *sockfdp << std::endl;
// data->fd = *sockfdp;
// data->self = this;
// pthread_t pid;
// pthread_create(&pid,nullptr,PthreadEnter,(void*)data);
//version-3 线程池
task_t t = [this,socketfd](){
this->HandlerRequest(socketfd);
};
threadpool<task_t>::Getinstance()->Equeue(t);
}
}
void Stop()
{
_isrunning = false;
}
~TcpServer()
{
}
private:
int _listensockfd;
uint16_t _port;
bool _isrunning;
hander_t _hander;
};