统一事件源
- 信号是一种异步事件:信号处理函数和程序的主循环是两条不同的执行路线。
- 信号处理函数需要尽可能快地执行完毕,以确保该信号不被屏蔽(为了避免一些竞态条件,信号在处理期间,系统不会再次触发它)太久。
- 一种典型的解决方案是:把信号的主要处理逻辑放到程序的主循环中,当信号处理函数被触发时,它只是简单地通知主循环程序接收到信号,并把信号值传递给主循环,主循环再根据接收到的信号值执行目标信号对应的逻辑代码。
- 信号处理函数通常使用管道来将信号“传递”给主循环:信号处理函数往管道的写端写入信号值,主循环则从管道的读端读出该信号值。那么主循环怎么知道管道上何时有数据可读呢?这很简单,我们只需要使用I/O复用系统调用来监听管道的读端文件描述符上的可读事件。如此一来,信号事件就能和其他I/O事件一样被处理,即统一事件源。
项目需要用到的几个函数:
- sigaction信号处理函数(不清楚的可以看进程信号之sigaction系统调用-CSDN博客)
- socketpair创建两个相互通信的套接字(不清楚的可以看管道与进程间通信-CSDN博客)
- fcntl这里用于将文件描述符设置为非阻塞
还有一些注意事项,我会在代码中说明。
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>#define USER_LIMIT 5
#define BUFFER_SIZE 1024
#define FD_LIMIT 65535
#define MAX_EVENT_NUMBER 1024
#define PROCESS_LIMIT 65536struct client_data
{sockaddr_in address;int connfd;pid_t pid;//处理这个连接的子进程的pidint pipefd[2];//父子通信的socketpair全双工套接字管道
};static const char* shm_name = "/my_shm";
int sig_pipefd[2];
int epollfd;
int listenfd;
int shmfd;
char* share_mem = 0;
client_data* users = 0;
int* sub_process = 0;
int user_count = 0;
bool stop_child = false;int setnonblocking( int fd )
{int old_option = fcntl( fd, F_GETFL );int new_option = old_option | O_NONBLOCK;fcntl( fd, F_SETFL, new_option );return old_option;
}void addfd( int epollfd, int fd )
{epoll_event event;event.data.fd = fd;event.events = EPOLLIN | EPOLLET;epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );setnonblocking( fd );
}void sig_handler( int sig )
{int save_errno = errno;int msg = sig;send( sig_pipefd[1], ( char* )&msg, 1, 0 );errno = save_errno;
}void addsig( int sig, void(*handler)(int), bool restart = true )
{struct sigaction sa;memset( &sa, '\0', sizeof( sa ) );sa.sa_handler = handler;if( restart ){sa.sa_flags |= SA_RESTART;}sigfillset( &sa.sa_mask );assert( sigaction( sig, &sa, NULL ) != -1 );
}void del_resource()
{close( sig_pipefd[0] );close( sig_pipefd[1] );close( listenfd );close( epollfd );shm_unlink( shm_name );delete [] users;delete [] sub_process;
}void child_term_handler( int sig )
{stop_child = true;
}// 子进程逻辑
// idx--->客户连接对应的客户在users中的下标
// users--->客户信息数组
// share_mem--->共享内存的位置
int run_child( int idx, client_data* users, char* share_mem )
{epoll_event events[ MAX_EVENT_NUMBER ];//子进程的关心事件捕捞结构体数组int child_epollfd = epoll_create( 5 );assert( child_epollfd != -1 );int connfd = users[idx].connfd;// 子进程将连接的读端加入自己的关心事件;主进程将连接交给子进程后用于不在关心这个连接addfd( child_epollfd, connfd );int pipefd = users[idx].pipefd[1];// 子进程将用于与主进程交流的users[idx].pipefd[1]套接字读事件加入关心事件addfd( child_epollfd, pipefd );int ret;// 子进程将SIGTERM信号捕捉,自定义处理方式为结束主进程主循环。addsig( SIGTERM, child_term_handler, false );while( !stop_child ){int number = epoll_wait( child_epollfd, events, MAX_EVENT_NUMBER, -1 );if ( ( number < 0 ) && ( errno != EINTR ) ){printf( "epoll failure\n" );break;}for ( int i = 0; i < number; i++ ){int sockfd = events[i].data.fd;// 子进程处理来自客户连接的数据// 将客户数据读取到共享内存的对应位置// 这个位置是share_mem + idx*BUFFER_SIZEif( ( sockfd == connfd ) && ( events[i].events & EPOLLIN ) ){memset( share_mem + idx*BUFFER_SIZE, '\0', BUFFER_SIZE );ret = recv( connfd, share_mem + idx*BUFFER_SIZE, BUFFER_SIZE-1, 0 );//这里默认一次就可以读取完整if( ret < 0 ){if( errno != EAGAIN ){stop_child = true;}}else if( ret == 0 )//连接关闭也会触发EPOLLIN事件{stop_child = true;}//正确接收后,将客户下标发给主进程else{send( pipefd, ( char* )&idx, sizeof( idx ), 0 );}}// 子进程收到主进程的群发消息else if( ( sockfd == pipefd ) && ( events[i].events & EPOLLIN ) ){int client = 0;//users下标ret = recv( sockfd, ( char* )&client, sizeof( client ), 0 );if( ret < 0 ){if( errno != EAGAIN ){stop_child = true;}}else if( ret == 0 ){stop_child = true;}else{send( connfd, share_mem + client * BUFFER_SIZE, BUFFER_SIZE, 0 );}}else{continue;}}}close( connfd );//连接结束时关闭连接套接字close( pipefd );//连接结束时关闭父子信道close( child_epollfd );//关闭子进程的内核事件表child_epollfdreturn 0;
}int main( int argc, char* argv[] )
{if( argc <= 2 ){printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );return 1;}const char* ip = argv[1];int port = atoi( argv[2] );int ret = 0;struct sockaddr_in address;bzero( &address, sizeof( address ) );address.sin_family = AF_INET;inet_pton( AF_INET, ip, &address.sin_addr );address.sin_port = htons( port );listenfd = socket( PF_INET, SOCK_STREAM, 0 );assert( listenfd >= 0 );ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );assert( ret != -1 );ret = listen( listenfd, 5 );assert( ret != -1 );user_count = 0;users = new client_data [ USER_LIMIT+1 ];//创建一大块空间,用于存储后续到来的用户数据sub_process = new int [ PROCESS_LIMIT ];//创建一大块空间,用建立子进程pid与客户连接之间的映射;通过子进程pid索引到客户连接for( int i = 0; i < PROCESS_LIMIT; ++i ){sub_process[i] = -1;}epoll_event events[ MAX_EVENT_NUMBER ];//创建用于主线程捕捞事件的epoll_event结构体数组epollfd = epoll_create( 5 );assert( epollfd != -1 );addfd( epollfd, listenfd );//主线程将listenfd上的读事件加入关心事件// 创建全双工管道的信号通知管道,主线程收到特定信号时,通过该管道通知主循环,交给主循环去处理ret = socketpair( PF_UNIX, SOCK_STREAM, 0, sig_pipefd );assert( ret != -1 );setnonblocking( sig_pipefd[1] );//将信号通知管道写端设置为非阻塞addfd( epollfd, sig_pipefd[0] );//主线程将通知管道读端加入关心事件// 注意此时socketpair是全双工管道,但是我们却是把其当做半双工的匿名管道来看待// 之所以不直接创建半双工的匿名管道,是因为把套接字加入epollfd的关心事件,更加合适// SIGCHLD 信号是当一个子进程终止、停止或恢复执行时,内核会向其父进程发送的信号。addsig( SIGCHLD, sig_handler );// 主线程将SIGCHLD信号,通过sig_pipefd管道统一事件源addsig( SIGTERM, sig_handler );// 主线程将SIGTERM信号,通过sig_pipefd管道统一事件源addsig( SIGINT, sig_handler );// 主线程将SIGINT信号,通过sig_pipefd管道统一事件源addsig( SIGPIPE, SIG_IGN );// 主线程忽略SIGPIPE信号bool stop_server = false;bool terminate = false;// 共享内存// 大小为 USER_LIMIT * BUFFER_SIZE// 每一个客户连接的数据,按照客户所属在users下标,放在共享内存指定位置shmfd = shm_open( shm_name, O_CREAT | O_RDWR, 0666 );assert( shmfd != -1 );ret = ftruncate( shmfd, USER_LIMIT * BUFFER_SIZE ); assert( ret != -1 );share_mem = (char*)mmap( NULL, USER_LIMIT * BUFFER_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmfd, 0 );assert( share_mem != MAP_FAILED );close( shmfd );while( !stop_server ){// 主线程只关心listenfd套接字和信号通知管道读端上的读事件以及父子信道上来来自于任意子进程上的读事件int number = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );if ( ( number < 0 ) && ( errno != EINTR ) ){printf( "epoll failure\n" );break;}for ( int i = 0; i < number; i++ ){int sockfd = events[i].data.fd;// 处理新连接if( sockfd == listenfd ){struct sockaddr_in client_address;socklen_t client_addrlength = sizeof( client_address );int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );if ( connfd < 0 ){printf( "errno is: %d\n", errno );continue;}// 如果连接超限,直接关闭这个连接;客户端从而三次握手,关闭连接if( user_count >= USER_LIMIT ){const char* info = "too many users\n";printf( "%s", info );send( connfd, info, strlen( info ), 0 );close( connfd );continue;}// 存储这个用户信息至users数组相关位置users[user_count].address = client_address;users[user_count].connfd = connfd;// 每个子进程都创建一个用于同父进程通信的管道ret = socketpair( PF_UNIX, SOCK_STREAM, 0, users[user_count].pipefd );assert( ret != -1 );pid_t pid = fork();if( pid < 0 ){close( connfd );continue;}else if( pid == 0 ){close( epollfd );// 子进程关闭父进程的epollfd,不关心父进程的所已经关心的所有事件close( listenfd );// 子进程关闭listenfd套接字close( users[user_count].pipefd[0] );//子进程用users[user_count].pipefd[1]作为与主进程交流的通道close( sig_pipefd[0] );//子进程会收到之前的那些信号,这些信号会通过信号传递管道发给子进程,但是发现管道已经关闭,忽略该信号close( sig_pipefd[1] );//父进程会收到这些信号,然后对信号做出决策,这是非常合理的run_child( user_count, users, share_mem );//子进程关联这个连接munmap( (void*)share_mem, USER_LIMIT * BUFFER_SIZE );exit( 0 );}else{close( connfd );//主进程关闭连接套接字,这意味着这一连接完全交给子进程去处理close( users[user_count].pipefd[1] );//主进程用users[user_count].pipefd[0]作为与子进程交流的通道 addfd( epollfd, users[user_count].pipefd[0] );//将父子通信管道读端加入主线程关心事件users[user_count].pid = pid;//将子进程pid信息补全sub_process[pid] = user_count;//建立子进程pid与客户在users下标之间的映射user_count++;}}//主进程接收到信号else if( ( sockfd == sig_pipefd[0] ) && ( events[i].events & EPOLLIN ) ){int sig;char signals[1024];ret = recv( sig_pipefd[0], signals, sizeof( signals ), 0 );if( ret == -1 ){continue;}else if( ret == 0 ){continue;}else{for( int i = 0; i < ret; ++i ){switch( signals[i] ){case SIGCHLD://收到子进程退出信号,waitpid子进程,处理users用户信息数组,处理pid与下标映射数组sub_process{pid_t pid;int stat;while ( ( pid = waitpid( -1, &stat, WNOHANG ) ) > 0 ){int del_user = sub_process[pid];//获得要删除的users下标sub_process[pid] = -1;//解除该子进程pid与该用户users中的下标的映射if( ( del_user < 0 ) || ( del_user > USER_LIMIT ) ){printf( "the deleted user was not change\n" );continue;}// 子进程退出后主进程的epollfd解除主进程用于同该子进程通信的users[del_user].pipefd[0]的关心epoll_ctl( epollfd, EPOLL_CTL_DEL, users[del_user].pipefd[0], 0 );close( users[del_user].pipefd[0] );//关闭该对父子通信套接字users[del_user] = users[--user_count];//该操作真巧,删除该用户sub_process[users[del_user].pid] = del_user;//重建映射printf( "child %d exit, now we have %d users\n", del_user, user_count ); }if( terminate && user_count == 0 )//子进程全部退出{stop_server = true;}break;}case SIGTERM://主进程收到退出信号case SIGINT:{printf( "kill all the clild now\n" );//addsig( SIGTERM, SIG_IGN );//addsig( SIGINT, SIG_IGN );// 如果此时没有子进程,则直接退出即可if( user_count == 0 ){stop_server = true;break;}//向所有子进程发送终止信号for( int i = 0; i < user_count; ++i ){int pid = users[i].pid;kill( pid, SIGTERM );//不与子进程通信,直接通过系统调用杀死子进程}terminate = true;//此处是用于waitpid子进程,表示主进程正在进行退出环节,一旦子进程全部回收,则主进程结束主循环break;}default:{break;}}}}}// 处理来自子进程上的信息else if( events[i].events & EPOLLIN ){int child = 0;//存储来自子进程发来的客户下标ret = recv( sockfd, ( char* )&child, sizeof( child ), 0 );printf( "read data from child accross pipe\n" );if( ret == -1 ){continue;}else if( ret == 0 ){continue;}else{for( int j = 0; j < user_count; ++j ){// socket是父子信道users[user_count].pipefd[0],也就是说子进程给sockfd发了数据// 具体这个sockfd,是众多子进程中刚给主进程发送数据的那个子进程对应的主进程父子信道users[user_count].pipefd[0]if( users[j].pipefd[0] != sockfd )//只要不是刚接收的数据的users[user_count].pipefd[0],就发送要发送数据的位置{printf( "send data to child accross pipe\n" );// 本意是将要发送的数据在共享内存中的位置发送给一个个除了最近给主进程发送数据的一个个子进程// 转化为发送这个用户的下标即可// 那么要发送的数据在共享内存中的位置是:share_mem + child*BUFFER_SIZEsend( users[j].pipefd[0], ( char* )&child, sizeof( child ), 0 );}}}}}}del_resource();return 0;
}
上述代码足以展示统一事件源的基本逻辑。