5.10-套接字通信 - C++
套接字通信
1.1 通信效率问题
-
服务器端
-
单线程 / 单进程
- 无法使用,不支持多客户端
-
多线程 / 多进程
- 写程序优先考虑多线程:
- 什么时候考虑多进程?
- 启动了一个可执行程序 A ,要在 A 中启动一个可执行程序 B
- 支持多客户端连接
-
IO 多路转接
-
单线程 / 进程
-
支持多客户端连接但是效率不是最高的
- 所有的客户端请求都是顺序处理的 -> 排队
-
多线程
int main() {// 1. 监听 fdint lfd = socket(); };// 2. 绑定bind();// 3. 监听listen();// 4. 初始化 epoll()int epfd = epoll_create(x);// 5. epooll 添加检测节点 -> lfdepoll_ctl(epfd, epoll_ctl_add, lfd, &ev);while (1){int num = epoll_wait(epfd, evs, 1024, NULL);for (int i = 0; i < num; i++){if (curfd == lfd){pthread_create(&tid, NULL, acceptConn, &epfd);// accept();}else{...// read();// write();}}} }
-
线程池
-
多个线程的一个集合,可以回收用完的线程
- 线程池的个数取决于业务逻辑
-
密集型业务逻辑:需要大量 cpu 时间进行数据处理
- 线程个数 == 电脑核心数
-
进行 io 操作
- 线程个数 == 两倍 cpu 核心数
-
- 线程池的个数取决于业务逻辑
-
不需要重复频繁地创建销毁线程
-
设计思路
1. 需要两个角色- 管理者 -> 1 个- 工作的线程 -> N 个 2. 管理者- 不工作(不处理业务逻辑,监测工作的线程的状态,管理线程的个数)- 假设工作线程不够用了,动态创建新的线程- 假设工作的线程太多了,销毁一部分工作的线程- 动态监测工作的线程的状态 3. 工作的线程- 处理业务逻辑 4. 需要一个任务队列- 存储任务 -> 唤醒阻塞线程 -> 条件变量- 工作的线程处理任务队列中的任务- 没有任务 -> 阻塞
-
-
-
-
-
-
客户端
// 创建TcpSocket对象 == 一个连接,这个对象就可以和服务器通信了,多个连接需要创建多个这样的对象 class TcpSocket { public:TcpSocket(){m_connfd = socket(af_inet, sock_stream, 0);}TcpSocket(int fd){m_connfd = fd; // 传递进行的fd是可以直接通信的文件描述符,不需要连接操作}~TcpSocket();/* 客户端 连接服务器 */int conectToHost(string ip, unsigned short port, int connecttime){connect(m_connfd, &serverAddress, &len);}/* 客户端 关闭和服务端的连接 */int disConnect();/* 客户端 发送报文 */int sendMsg(string sendMsg, int sendtime = 10000){senf(m_connfd, data, datalen, 0);}/* 客户端 接受报文 */string recvMsg(int timeout){recv(m_connfd, buffer, size, 0);return buffer;}private:int m_connfd; };
-
服务器
// 思想: 服务端不负责通信,只负责监听,如果通信使用客户端类 class TcpServer { public:// 初始化监听的套接字: 创建,绑定,监听TcpServer();~TcpServer(); // 在这里边关闭监听的fdTcpSocket* acceptConn(int timeout = 999999){int fd = accept(m_lfd, &address, &len);// 通信fd -> 类TcpSocket* tcp = new TcpSocket(fd);if (tcp != nullptr){return tcp;}return nullptr;}private:int m_lfd; // 监听的fd };
// 使用 void * callback(void* arg) {TcpSocket * tcp = (TcpSocket *) arg;tcp->sendMsg();tcp->recvMsg();tcp->disConnect();delete tcp; }int main() {TcpServer * server = new Tcpserver;while (1){TcpSocket * tcp = server->acceptConn();// 创建子进程 -> 通信pthread_create(&tid, NULL, callback, arg)}delete server;return 0; }
// 客户端程序 int main() {TcpSocket * tcp = new TcpSocket;tcp->ConnectToHost(ip, port, timeout);tcp->sendMsg();tcp->recvMsg();tcp->disConnect();delete tcp; }
2 套接字超时
套接字通信过程中的默认的阻塞函数
等待并接受客户端连接
通信、接受数据、发送数据、连接服务器时
设置超时处理的原因:不想让进程(线程)一直在对应为止阻塞
超时处理的思路:
- 定时器
sleep(10)
- 以上两种不可用,在指定时间内阻塞函数满足条件直接解除阻塞,以上两种不满足要求
- IO 多路转接函数
- 这些函数最后一个参数是设置阻塞的时长,如果有
fd
发生变化,函数直接返回- 帮助委托内核检测
fd
状态:读写异常
2.1 accept 超时
// 等待并接受客户端连接
// 如果没有客户端连接,一直阻塞
// 检测 accept 函数的 fd 读缓冲区就可以了
int accept(int sockfd, struct sockaddr * addr, socklen_t * addrlen);// 使用select 函数检测状态
int select(int nfds, fd_set * readfds,fd_set * writefds,fd_set * exceptfds,struct timeval * timeout);
// 通信的fd放到 fd_set 中检测
if (ret == 0)
{// 超时
}
else if (ret == 1)
{// 有新连接accept(); // 绝对不阻塞
}
else
{// error, -1
}
读超时、写超时略
2.2 connect 超时
// 连接服务器 -> 处于连接过程中,函数不返回 -> 程序阻塞在这个函数上,可通过返回值判断是不是连接成功了
// 返回值 0,成功,-1,失败
// 该函数默认有一个超时处理:75s 或 175s- 设置 connect 函数操作的文件描述符为非阻塞- 使用 select 检测- 设置 connect 函数操作的文件描述符为阻塞 -> 状态还原
int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);// 与上述同理
2.3 tcp 粘包问题
造成原因:
- 发送的时候,内核进行了优化,数据达到一定量发一次
- 网络环境不好,有延迟
- 接收方频率低,一次性读到了多条客户端发送的数据
解决方案:
- 发送的时候强制缓存区发送数据
- 在发送数据的时候添加包头
- 包头:一块内存,存储了当前消息的属性信息
- 属于谁:char[12]
- 有多大:int
- …
3 进程间通信:共享内存
-
使用流程
1. 向内核申请一块内存 -> 指定大小 2. 如果有两个进程需要进行通信,可以使用共享内存通信,先创建两个进程 3. 进程 A 和进程 B 分别和共享内存进程关联- 拿到共享内存的地址 -> 首地址 4. 两个进程可以公国这个首地址对共享内存进行读(写)操作 5. 如果这个进程不再使用这块共享内存,需要和共享内存断开连接- 进程退出对共享内存没有任何影响 6. 当不再使用共享内存的时候,需要将共享内存销毁
-
共享内存头文件
#include <sys/ipc.h> #include <sys/shm.h>
-
共享内存操作函数
-
创建或打开一块共享内存区
// 创建共享内存 // 共享内存已经存在 // 可以创建多块共享内存 int shmget(key_t key, size_t size, int sh mhflg);key:通过 key 记录共享内存在内核中的位置,为一个大于零的整数,等于 0 不行,随便指定一个就可以size:创建共享内存的时候指定共享内存的大小。如果已经创建,设为 0shmflg:创建共享内存的时候使用,指定打开文件的方式 返回值:成功:创建(打开)成功,得到一个整形数失败:-1// 应用 // 1. 创建共享内存 int shmid = shmget(100, 4096, IPC_CREAT | 0664); int shmid = shmget(200, 4096, IPC_CREAT | 0664); // 2. open share memory int shmid = shmget(100, 0, 0)
-
将当前进程与共享内存关联
void * shmat(int shmid, const void * shmaddr, int shmflg);参数:- shmid:通过这个参数访问共享内存,shmget() 的返回值- shmaddr:指定共享内存在内核中的位置,指定为空,委托内核寻找- shmflg:关联成功后对内存的操作权限- SHM_RDONLY:只读- 0:读写 成功:内存的地址 失败:(void *)-1// 函数调用: shmat(shmid, NULL, 0); // write on shm memcpy(ptr, "xxx", len); printf("%s", (char*)ptr);
-
depart shm
int shmdt(const void * shmaddr);arg: address of shmreturn:suc: 0fai: -1
-
control shm
int shmctl(int shmid, int com, struct shmid_ds * buf);arg:-shimd: reuturn value of shmget- cmd: operation to shm- IPC_STAT: get status of cmd- IPC_SET: set shm- IPC_RMID: mark shm to be destroyed- buf: as a struct can describe the status of shm reutrn value:succ: 0fail: -1// demo: destroy shm shmctl(shmid, IPC_RMID, nullptr);
demo: communication of processes
read:
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <sys/ipc.h> #include <sys/shm.h>int main() {// 1. open shmint shmid = shmget(100, 0, 0);// 2. relate shm with cur processvoid* ptr = shmat(shmid, NULL, 0);// 3. write on shmprintf("content: %s\n", (char *)ptr);printf("press any key to continue ...\n");getchar();// 4. release relevanceshmdt(ptr);// 5. destory shmshmctl(shmid, IPC_RMID, NULL);return 0; }
write:
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <sys/ipc.h> #include <sys/shm.h>int main() {// 1. create shmint shmid = shmget(100, 4096, IPC_CREAT|0664);// 2. relate shm with cur processvoid* ptr = shmat(shmid, NULL, 0);// 3. write on shmconst char * tmp = "this is a damo for shm ...";memcpy(ptr, tmp, strlen(tmp) + 1);printf("press any key to continue ...\n");getchar();// 4. release relevanceshmdt(ptr);// 5. destory shmshmctl(shmid, IPC_RMID, NULL);return 0; }
-
-
question:
- question 1: how dose the operation system know the number of process that relate with a shm?
- shm keep a struct
struct shmid_ds
, that has a membershm_nattch
. shm_nattch
records the number of being related process.
- shm keep a struct
- question 2: whether can call
shmctl
to destroyed a shm more than once?- yes
- because
shmctl
function is just marking a shm to be destroy, not destroying it directly. - when it was destroyed truly?
- when
shm_nattch == 0
- when
the commands to observe the shm:
ipcs -m
-
ftok
function prototypekey_t ftok(const char *pathname, int proj_id);- pathname: the absolute path- proj_id: only use the top 1 byte- value range: 1 - 255key_t t = fotk("/home/", 'a');
-
difference of shm and mmp
. . .
4 the API encapsulation of shm
class BashShm
{
public:BashShm(int key); // open a shm according a given key.BashShm(string path); // **with no size**, according to string path -> int key, open a shm.BashShm(int key, int size); // create a shm by the given key and size.BashShm(string path, int size); // string -> key, according to size.void * mapshm(){m_ptr = shmat(shmid);return shmat();}int unmapshm(){return shmdt(m_ptr);}int delshm(){return shmctl(shmid);}private:int m_shmid; // reuturn value of `shmat()`void * m_ptr
}