linux网络编程-----TCP服务端并发模型(epoll)
1.select、poll与epoll的特点
(1).select特点
1.使用位图实现对文件描述符集合的保存,最多允许同时监听1024个文件描述符;
2.需要应用和内核的反复数据(文件描述符集合表)拷贝;
3.返回的集合表需要遍历寻找到达事件;
4.只能工作在水平触发模式(低俗模式),不能工作在边沿触发模式(高速模式);
(2)poll特点
1.使用链表实现对文件描述符集合的保存,没有监听文件描述符的上限限制;
2.需要应用和内核的反复数据(文件描述符集合表)拷贝;
3.返回的集合表需要遍历寻找到达事件;
4.只能工作在水平触发模式(低俗模式),不能工作在边沿触发模式(高速模式);
(3)epoll特点
1.使用红黑树(二叉树)实现文件描述符集合的存储,没有文件描述符上限限制,提高查找效率
2.将文件描述符集合创建在内核层,避免了应用层和内核层的反复数据拷贝
3.返回到达事件,不需要遍历,只需要处理事件即可
4.可工作在水平触发模式(低速模式),也可工作在边沿触发模式(高速模式)
2.epoll
(1)步骤
①创建文件描述符结合
②添加关注的文件描述符
③epoll通知内核开始进行事件监测
④epoll返回时,获取到达事件的结果
⑤根据到达事件做任务处理
(2)函数接口
int epoll_create(int size);
功能:通知内核创建文件描述符集合
参数:
size:监测的文件描述符个数
返回值:
成功:文件描述符(代表内核创建的集合)
失败:-1
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
功能:对epoll的文件描述符集合进行操作
参数:
epfd:创建的epoll集合
op:对文件描述符集合进行的操作
EPOLL_CTL_ADD : 添加文件描述符到集合
EPOLL_CTL_MOD : 修改集合中的文件描述符
EPOLL_CTL_DEL :删除集合中的文件描述符
fd:要操作的文件描述符
event:文件描述符对应的事件
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
events:文件描述符的事件:
EPOLLIN: 读事件
EOPLLOUT:写事件
data.fd : 关注的文件描述符返回值:
成功:0
失败:-1
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
功能:通知内核开始监测文件描述符的事件
参数:
epfd:监测的文件描述符集合
events:保存返回的到达事件的结果(数组)eg:struct epoll_event evs[MAX_FD_CNT];
maxevents:最大的事件个数
timeout:监测的超时时间
-1 :不设置超时(一直阻塞)返回值:
成功:到达的事件的个数
失败:-1
#include "head.h"#define MAX_SIZE 1024
#define MAX_FD_CNT 2int epoll_fd_add(int epfds, int fd, uint32_t events)
{struct epoll_event ev;ev.data.fd = fd;ev.events = events;int ret = epoll_ctl(epfds, EPOLL_CTL_ADD, fd, &ev);if(ret < 0){perror("epoll ctl error");return -1;}return 0;
}int main(int argc, char const *argv[])
{char buff[MAX_SIZE] = {0};mkfifo("./myfifo", 0664);int fifofd = open("./myfifo", O_RDONLY);if(fifofd < 0){perror("open error");return -1;}int epfds = epoll_create(MAX_FD_CNT);if(epfds < 0){perror("epoll create error");return -1;}epoll_fd_add(epfds, 0, EPOLLIN);epoll_fd_add(epfds, fifofd, EPOLLIN);struct epoll_event evs[MAX_FD_CNT];while (1){int cnt = epoll_wait(epfds, evs, MAX_FD_CNT, -1);if(cnt < 0){perror("epoll wait error");return -1;}for (int i = 0; i < cnt; i++){if(0 == evs[i].data.fd){fgets(buff, sizeof(buff), stdin);printf("stdin = %s\n", buff);}else if(fifofd == evs[i].data.fd){memset(buff, 0, sizeof(buff));ssize_t n = read(evs[i].data.fd, buff, sizeof(buff));if (0 == n) {epoll_ctl(epfds, EPOLL_CTL_DEL, fifofd, NULL);close(fifofd);printf("fifo写端已关闭,停止监听\n");} else if (n > 0) {printf("fifo = %s\n", buff);}}}}close(fifofd);return 0;
}
(3)实现TCP的并发
实现群聊功能
服务器
#include "head.h"#define SER_PORT 50001
#define SER_IP "192.168.0.106"#define MAX_FD_CNT 256int connfds_g[MAX_FD_CNT] = {0};
int total_fd_g = 0;int save_connfd(int *connfds_g, int fd)
{if (total_fd_g >= MAX_FD_CNT || total_fd_g < 0){return -1;}connfds_g[total_fd_g] = fd;total_fd_g++;return 0;
}int del_connfd(int *connfds_g, int fd)
{int i;for (i = 0; i < total_fd_g; ++i){if (connfds_g[i] == fd){break;}}if (i >= total_fd_g){printf("connfds_g Not found %d\n", fd);return -1;}for ( ;i < total_fd_g-1; ++i){connfds_g[i] = connfds_g[i+1];}total_fd_g--;if (total_fd_g < 0){return -1;}return 0;
}int init_tcp_ser()
{int sockfd = socket(AF_INET, SOCK_STREAM, 0);if (sockfd < 0){perror("socket error");return -1;}struct sockaddr_in seraddr;seraddr.sin_family = AF_INET;seraddr.sin_port = htons(SER_PORT);seraddr.sin_addr.s_addr = inet_addr(SER_IP);int ret = bind(sockfd, (struct sockaddr *)&seraddr, sizeof(seraddr));if (ret < 0){perror("bind error");return -1;}ret = listen(sockfd, 100);if (ret < 0){perror("listen error");return -1;}return sockfd;
}int epoll_add_fd(int epfds, int fd, uint32_t events)
{struct epoll_event ev;ev.events = events;ev.data.fd = fd;int ret = epoll_ctl(epfds, EPOLL_CTL_ADD, fd, &ev);if (ret < 0){perror("epoll_ctl add error");return -1;}return 0;
}int epoll_del_fd(int epfds, int fd)
{int ret = epoll_ctl(epfds, EPOLL_CTL_DEL, fd, NULL);if (ret < 0){perror("epoll_ctl del error");return -1;}return 0;
}int main(int argc, const char *argv[])
{Msg_t mymsg;struct sockaddr_in cliaddr;socklen_t clilen = sizeof(cliaddr);int sockfd = init_tcp_ser();if (sockfd < 0){return -1;}int epfds = epoll_create(MAX_FD_CNT);if (epfds < 0){perror("epoll_create error");return -1;}epoll_add_fd(epfds, sockfd, EPOLLIN);struct epoll_event evs[MAX_FD_CNT];while (1){int cnt = epoll_wait(epfds, evs, MAX_FD_CNT, -1);if (cnt < 0){perror("epoll_wait error");return -1;}for (int i = 0; i < cnt; ++i){if (sockfd == evs[i].data.fd){int connfd = accept(sockfd, (struct sockaddr *)&cliaddr, &clilen);if (connfd < 0){perror("accept error");return -1;}epoll_add_fd(epfds, connfd, EPOLLIN);save_connfd(connfds_g, connfd);}else{memset(&mymsg, 0, sizeof(mymsg));ssize_t size = recv(evs[i].data.fd, &mymsg, sizeof(Msg_t), 0);if (size < 0){perror("recv error");epoll_del_fd(epfds, evs[i].data.fd);close(evs[i].data.fd);continue;}else if (0 == size){epoll_del_fd(epfds, evs[i].data.fd);close(evs[i].data.fd);continue; }if (mymsg.type == MSG_JOIN){printf("[%s] 加入群聊!\n", mymsg.name);}else if (mymsg.type == MSG_QUIT){printf("[%s] 退出群聊!\n", mymsg.name);}for (int j = 0; j < total_fd_g; ++j){if (evs[i].data.fd != connfds_g[j]){size = send(connfds_g[j], &mymsg, sizeof(Msg_t), 0);if (size < 0){perror("send error");close(connfds_g[j]);del_connfd(connfds_g, connfds_g[j]);continue;}}} }}}close(sockfd);return 0;
}
客户端
#include "head.h"#define SER_PORT 50001
#define SER_IP "192.168.0.106" Msg_type_t mymsg_type = MSG_JOIN;
Msg_t mymsg;int create_sockfd()
{int sockfd = socket(AF_INET, SOCK_STREAM, 0);if(sockfd < 0){perror("socket error");return -1;}struct sockaddr_in sockaddr;sockaddr.sin_family = AF_INET;sockaddr.sin_port = htons(SER_PORT);sockaddr.sin_addr.s_addr = inet_addr(SER_IP);int connfd = connect(sockfd, (struct sockaddr *)&sockaddr, sizeof(sockaddr));if(connfd < 0){perror("connect error");return -1;}return sockfd;
}int send_mymsg(int sockfd)
{mymsg.type = mymsg_type;int ret = send(sockfd, &mymsg, sizeof(mymsg), 0);if(ret < 0){perror("send error");return -1;}return 0;
}int main(int argc, char const *argv[])
{ if(argc != 2){printf("eg: ./a.out <username>\n");return -1;}strcpy(mymsg.name, argv[1]);strcpy(mymsg.buff, "已加入群聊");int sockfd = create_sockfd();if (sockfd < 0) { return -1;}send_mymsg(sockfd);pid_t pid = fork();if(pid > 0){ while(1){ fgets(mymsg.buff, sizeof(mymsg.buff), stdin);mymsg.buff[strlen(mymsg.buff) - 1] = 0;if(0 == strcmp(mymsg.buff, ".quit")){ mymsg_type = MSG_QUIT;strcpy(mymsg.buff, "已退出群聊");}else if(mymsg.buff){mymsg_type = MSG_CHAT;}switch (mymsg_type){case MSG_QUIT:send_mymsg(sockfd);break;case MSG_CHAT:send_mymsg(sockfd);break;default:break;}if(mymsg_type == MSG_QUIT){break;}}}else if(pid == 0){while(1){ memset(mymsg.buff, 0, sizeof(mymsg.buff));int ret = recv(sockfd, &mymsg, sizeof(mymsg), 0);if(ret < 0){perror("recv error");exit(1);}if(ret == 0){exit(1);}mymsg_type = mymsg.type;if(mymsg_type == MSG_CHAT){printf("[%s]:%s\n", mymsg.name, mymsg.buff);}else{printf("[%s]%s\n", mymsg.name, mymsg.buff);}} }return 0;
}