嵌入式第四十天(TCP并发服务端(IO多路复用))
一.IO多路复用:
1.select
2.poll
3.epoll
二.epoll
1.
epoll 会维护一个红黑树结构,专门用于存储用户注册的、需要监控其 IO 事件的文件描述符。红黑树的特性(有序、查找/插入/删除效率高,时间复杂度为 O(log n))保证了 epoll 能高效管理大量的文件描述符,即使监控的 FD 数量庞大,相关操作也能保持较好的性能。
此外,epoll 还会维护一个就绪链表,当监控的 FD 有 IO 事件触发时(如可读、可写),对应的 FD 会被放入该链表中。用户调用 epoll_wait 时,只需直接从就绪链表中获取就绪的 FD 即可,无需遍历所有注册的 FD,这也是 epoll 相比 select/poll 高效的关键原因之一。
总结来说:epoll 通过红黑树存储所有注册的待监控 FD,通过就绪链表存储已触发 IO 事件的 FD,两者配合实现了高效的 IO 多路复用管理。
2.
3.
#ifndef _HEAD_H
#define _HEAD_H#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/ip.h> /* superset of previous */
#include <arpa/inet.h>
#include <pthread.h>
#include <sys/wait.h>
#include <signal.h>
#include <sys/time.h>
#include <sys/select.h>
#include <sys/epoll.h>#endif
#include "head.h"#define MAX_FD_CNT 100int epoll_fd_add(int epfds, int fd, uint32_t events)
{struct epoll_event ev;ev.events = events;ev.data.fd = fd;int ret = epoll_ctl(epfds, EPOLL_CTL_ADD, fd, &ev);if (ret < 0){perror("epoll_ctl error");return -1;}return 0;
}int epoll_del_fd(int epfds, int fd)
{int ret = epoll_ctl(epfds, EPOLL_CTL_DEL, fd, NULL);if (ret < 0){perror("epoll_ctl del error");return -1;}return 0;
}int main(int argc,const char *argv[])
{int sockfd = socket(AF_INET,SOCK_STREAM,0);int optval = 1;setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));if(sockfd < 0){perror("socket error");return -1;}struct sockaddr_in cliaddr;socklen_t clilen = sizeof(cliaddr);struct sockaddr_in seraddr;seraddr.sin_family = AF_INET;seraddr.sin_port = htons(50000);seraddr.sin_addr.s_addr = inet_addr("192.168.19.129");int ret = bind(sockfd,(struct sockaddr*)&seraddr,sizeof(seraddr));if(ret < 0){perror("bind error");return -1;}ret = listen(sockfd,100);if(ret < 0){perror("listen error");return -1;}char buff[1024] = {0};int epfds = epoll_create(MAX_FD_CNT);if(epfds < 0){perror("epoll_create error");return -1;}epoll_fd_add(epfds,sockfd,EPOLLIN);struct epoll_event evs[MAX_FD_CNT];while(1){int cnt = epoll_wait(epfds,evs,MAX_FD_CNT,-1);if(cnt < 0){perror("epoll_wait error");return -1;}for(int i = 0;i < cnt;i++){if(sockfd == evs[i].data.fd){int connfd = accept(sockfd,(struct sockaddr *)&cliaddr,&clilen);if(connfd < 0){perror("accept error");return -1;}epoll_fd_add(epfds,connfd,EPOLLIN);}else{memset(buff,0,sizeof(buff));ssize_t cnt = recv(evs[i].data.fd,buff,sizeof(buff),0);if(cnt < 0){perror("recv error");epoll_del_fd(epfds,evs[i].data.fd);close(evs[i].data.fd);continue;}else if(0 == cnt){epoll_del_fd(epfds,evs[i].data.fd);close(evs[i].data.fd);continue;}printf("%s\n",buff);strcat(buff,"---->ok");cnt = send(evs[i].data.fd,buff,strlen(buff),0);if(cnt < 0){perror("send error");epoll_del_fd(epfds,evs[i].data.fd);close(evs[i].data.fd);continue;}}}}close(sockfd);return 0;
}
4.
#include"head.h"#define SER_PORT 50000
#define SER_IP "192.168.19.129"#define MAX_FD_CNT 256int connfds_g[MAX_FD_CNT] = {0};
int total_fd_g = 0;int save_connfd(int *connfds_g, int fd)
{if (total_fd_g >= MAX_FD_CNT || total_fd_g < 0){return -1;}connfds_g[total_fd_g] = fd;total_fd_g++;return 0;
}int del_connfd(int *connfds_g, int fd)
{int i;for (i = 0; i < total_fd_g; ++i){if (connfds_g[i] == fd){break;}}if (i >= total_fd_g){printf("connfds_g Not found %d\n", fd);return -1;}for ( ;i < total_fd_g-1; ++i){connfds_g[i] = connfds_g[i+1];}total_fd_g--;if (total_fd_g < 0){return -1;}return 0;
}int init_tcp_ser()
{int sockfd = socket(AF_INET, SOCK_STREAM, 0);if (sockfd < 0){perror("socket error");return -1;}struct sockaddr_in seraddr;seraddr.sin_family = AF_INET;seraddr.sin_port = htons(SER_PORT);seraddr.sin_addr.s_addr = inet_addr(SER_IP);int ret = bind(sockfd, (struct sockaddr *)&seraddr, sizeof(seraddr));if (ret < 0){perror("bind error");return -1;}ret = listen(sockfd, 100);if (ret < 0){perror("listen error");return -1;}return sockfd;
}int epoll_add_fd(int epfds, int fd, uint32_t events)
{struct epoll_event ev;ev.events = events;ev.data.fd = fd;int ret = epoll_ctl(epfds, EPOLL_CTL_ADD, fd, &ev);if (ret < 0){perror("epoll_ctl add error");return -1;}return 0;
}int epoll_del_fd(int epfds, int fd)
{int ret = epoll_ctl(epfds, EPOLL_CTL_DEL, fd, NULL);if (ret < 0){perror("epoll_ctl del error");return -1;}return 0;
}int main(int argc, const char *argv[])
{Msg_t mymsg;struct sockaddr_in cliaddr;socklen_t clilen = sizeof(cliaddr);int sockfd = init_tcp_ser();if (sockfd < 0){return -1;}int epfds = epoll_create(MAX_FD_CNT);if (epfds < 0){perror("epoll_create error");return -1;}epoll_add_fd(epfds, sockfd, EPOLLIN);struct epoll_event evs[MAX_FD_CNT];while (1){int cnt = epoll_wait(epfds, evs, MAX_FD_CNT, -1);if (cnt < 0){perror("epoll_wait error");return -1;}for (int i = 0; i < cnt; ++i){if (sockfd == evs[i].data.fd){int connfd = accept(sockfd, (struct sockaddr *)&cliaddr, &clilen);if (connfd < 0){perror("accept error");return -1;}epoll_add_fd(epfds, connfd, EPOLLIN);save_connfd(connfds_g, connfd);}else{memset(&mymsg, 0, sizeof(mymsg));ssize_t size = recv(evs[i].data.fd, &mymsg, sizeof(Msg_t), 0);if (size < 0){perror("recv error");epoll_del_fd(epfds, evs[i].data.fd);close(evs[i].data.fd);continue;}else if (0 == size){epoll_del_fd(epfds, evs[i].data.fd);close(evs[i].data.fd);continue;}if (mymsg.type == MSG_JOIN){printf("[%s] 加入群聊!\n", mymsg.name);}else if (mymsg.type == MSG_QUIT){printf("[%s] 退出群聊!\n", mymsg.name);}else if(mymsg.type == MSG_CHAT){printf("[%s]:%s\n",mymsg.name,mymsg.buff);}for (int j = 0; j < total_fd_g; ++j){if (evs[i].data.fd != connfds_g[j]){size = send(connfds_g[j], &mymsg, sizeof(Msg_t), 0);if (size < 0){perror("send error");close(connfds_g[j]);del_connfd(connfds_g, connfds_g[j]);continue;}}}}}}close(sockfd);return 0;
}
#include "head.h"#define SER_PORT 50000
#define SER_IP "192.168.19.129"int sockfd;
char username[64] = {0};void *recv_thread(void *arg)
{Msg_t mymsg;while(1){memset(&mymsg, 0, sizeof(mymsg));ssize_t size = recv(sockfd, &mymsg, sizeof(Msg_t), 0);if(size <= 0){perror("recv error or connection closed");break;}if(mymsg.type == MSG_JOIN){printf("[服务通知] %s 加入了群聊\n", mymsg.name);}else if(mymsg.type == MSG_QUIT){printf("[服务通知] %s 退出了群聊\n", mymsg.name);}else if(mymsg.type == MSG_CHAT){printf("[%s] %s\n", mymsg.name, mymsg.buff);}}return NULL;
}int main(int argc, char *argv[])
{if(argc < 2){printf("Usage: %s <username>\n", argv[0]);return -1;}strncpy(username, argv[1], sizeof(username)-1);sockfd = socket(AF_INET, SOCK_STREAM, 0);if(sockfd < 0){perror("socket error");return -1;}struct sockaddr_in seraddr;seraddr.sin_family = AF_INET;seraddr.sin_port = htons(SER_PORT);seraddr.sin_addr.s_addr = inet_addr(SER_IP);int connfd = connect(sockfd, (struct sockaddr*)&seraddr, sizeof(seraddr));if(connfd < 0){perror("connect error");close(sockfd);return -1;}Msg_t join_msg;join_msg.type = MSG_JOIN;strncpy(join_msg.name, username, sizeof(join_msg.name)-1); strncpy(join_msg.buff, "Hello I'm zhangsan", sizeof(join_msg.buff)-1);int cnt = send(sockfd, &join_msg, sizeof(Msg_t), 0);if(cnt < 0){perror("send join msg error");close(sockfd);return -1;}pthread_t tid;pthread_create(&tid, NULL, recv_thread, NULL);Msg_t send_msg;send_msg.type = MSG_CHAT;strncpy(send_msg.name, username, sizeof(send_msg.name)-1);printf("进入群聊!输入'quiet'退出\n");while(1){fgets(send_msg.buff, sizeof(send_msg.buff), stdin);send_msg.buff[strlen(send_msg.buff) - 1] = '\0';if(strcmp(send_msg.buff, "quiet") == 0){printf("退出聊天\n");break;}int cnt = send(sockfd, &send_msg, sizeof(Msg_t), 0);if(cnt < 0){perror("send msg error");break;}}Msg_t quit_msg;quit_msg.type = MSG_QUIT;strncpy(quit_msg.name, username, sizeof(quit_msg.name)-1);strncpy(quit_msg.buff, "zhangsan go go go!", sizeof(quit_msg.buff)-1);send(sockfd, &quit_msg, sizeof(Msg_t), 0);close(sockfd);return 0;
}