当前位置: 首页 > news >正文

聊天室项目多进程纯C版

项目源代码

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>#define USER_LIMIT 5
#define BUFFER_SIZE 1024
#define FD_LIMIT 65535
#define MAX_EVENT_NUMBER 1024
#define PROCESS_LIMIT 65536struct client_data
{sockaddr_in address;int connfd;pid_t pid;//处理这个连接的子进程的pidint pipefd[2];//父子通信的socketpair全双工套接字管道
};static const char* shm_name = "/my_shm";
int sig_pipefd[2];
int epollfd;
int listenfd;
int shmfd;
char* share_mem = 0;
client_data* users = 0;
int* sub_process = 0;
int user_count = 0;
bool stop_child = false;int setnonblocking( int fd )
{int old_option = fcntl( fd, F_GETFL );int new_option = old_option | O_NONBLOCK;fcntl( fd, F_SETFL, new_option );return old_option;
}void addfd( int epollfd, int fd )
{epoll_event event;event.data.fd = fd;event.events = EPOLLIN | EPOLLET;epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );setnonblocking( fd );
}void sig_handler( int sig )
{int save_errno = errno;int msg = sig;send( sig_pipefd[1], ( char* )&msg, 1, 0 );errno = save_errno;
}void addsig( int sig, void(*handler)(int), bool restart = true )
{struct sigaction sa;memset( &sa, '\0', sizeof( sa ) );sa.sa_handler = handler;if( restart ){sa.sa_flags |= SA_RESTART;}sigfillset( &sa.sa_mask );assert( sigaction( sig, &sa, NULL ) != -1 );
}void del_resource()
{close( sig_pipefd[0] );close( sig_pipefd[1] );close( listenfd );close( epollfd );shm_unlink( shm_name );delete [] users;delete [] sub_process;
}void child_term_handler( int sig )
{stop_child = true;
}// 子进程逻辑
// idx--->客户连接对应的客户在users中的下标
// users--->客户信息数组
// share_mem--->共享内存的位置
int run_child( int idx, client_data* users, char* share_mem )
{epoll_event events[ MAX_EVENT_NUMBER ];//子进程的关心事件捕捞结构体数组int child_epollfd = epoll_create( 5 );assert( child_epollfd != -1 );int connfd = users[idx].connfd;// 子进程将连接的读端加入自己的关心事件;主进程将连接交给子进程后用于不在关心这个连接addfd( child_epollfd, connfd );int pipefd = users[idx].pipefd[1];// 子进程将用于与主进程交流的users[idx].pipefd[1]套接字读事件加入关心事件addfd( child_epollfd, pipefd );int ret;// 子进程将SIGTERM信号捕捉,自定义处理方式为结束子进程主循环。addsig( SIGTERM, child_term_handler, false );while( !stop_child ){int number = epoll_wait( child_epollfd, events, MAX_EVENT_NUMBER, -1 );if ( ( number < 0 ) && ( errno != EINTR ) ){printf( "epoll failure\n" );break;}for ( int i = 0; i < number; i++ ){int sockfd = events[i].data.fd;// 子进程处理来自客户连接的数据// 将客户数据读取到共享内存的对应位置// 这个位置是share_mem + idx*BUFFER_SIZEif( ( sockfd == connfd ) && ( events[i].events & EPOLLIN ) ){memset( share_mem + idx*BUFFER_SIZE, '\0', BUFFER_SIZE );ret = recv( connfd, share_mem + idx*BUFFER_SIZE, BUFFER_SIZE-1, 0 );//这里默认一次就可以读取完整if( ret < 0 ){if( errno != EAGAIN ){stop_child = true;}}else if( ret == 0 )//连接关闭也会触发EPOLLIN事件{stop_child = true;}//正确接收后,将客户下标发给主进程else{send( pipefd, ( char* )&idx, sizeof( idx ), 0 );}}// 子进程收到主进程的群发消息else if( ( sockfd == pipefd ) && ( events[i].events & EPOLLIN ) ){int client = 0;//users下标ret = recv( sockfd, ( char* )&client, sizeof( client ), 0 );if( ret < 0 ){if( errno != EAGAIN ){stop_child = true;}}else if( ret == 0 ){stop_child = true;}else{send( connfd, share_mem + client * BUFFER_SIZE, BUFFER_SIZE, 0 );}}else{continue;}}}close( connfd );//连接结束时关闭连接套接字close( pipefd );//连接结束时关闭父子信道close( child_epollfd );//关闭子进程的内核事件表child_epollfdreturn 0;
}int main( int argc, char* argv[] )
{if( argc <= 1 ){printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );return 1;}// const char* ip = argv[1];int port = atoi( argv[1] );int ret = 0;struct sockaddr_in address;bzero( &address, sizeof( address ) );address.sin_family = AF_INET;// inet_pton( AF_INET, ip, &address.sin_addr );address.sin_addr.s_addr = INADDR_ANY;address.sin_port = htons( port );listenfd = socket( PF_INET, SOCK_STREAM, 0 );assert( listenfd >= 0 );ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );assert( ret != -1 );ret = listen( listenfd, 5 );assert( ret != -1 );user_count = 0;users = new client_data [ USER_LIMIT+1 ];//创建一大块空间,用于存储后续到来的用户数据sub_process = new int [ PROCESS_LIMIT ];//创建一大块空间,用建立子进程pid与客户连接之间的映射;通过子进程pid索引到客户连接for( int i = 0; i < PROCESS_LIMIT; ++i ){sub_process[i] = -1;}epoll_event events[ MAX_EVENT_NUMBER ];//创建用于主线程捕捞事件的epoll_event结构体数组epollfd = epoll_create( 5 );assert( epollfd != -1 );addfd( epollfd, listenfd );//主线程将listenfd上的读事件加入关心事件// 创建全双工管道的信号通知管道,主线程收到特定信号时,通过该管道通知主循环,交给主循环去处理ret = socketpair( PF_UNIX, SOCK_STREAM, 0, sig_pipefd );assert( ret != -1 );setnonblocking( sig_pipefd[1] );//将信号通知管道写端设置为非阻塞addfd( epollfd, sig_pipefd[0] );//主线程将通知管道读端加入关心事件// 注意此时socketpair是全双工管道,但是我们却是把其当做半双工的匿名管道来看待// 之所以不直接创建半双工的匿名管道,是因为把套接字加入epollfd的关心事件,更加合适// SIGCHLD 信号是当一个子进程终止、停止或恢复执行时,内核会向其父进程发送的信号。addsig( SIGCHLD, sig_handler );// 主线程将SIGCHLD信号,通过sig_pipefd管道统一事件源addsig( SIGTERM, sig_handler );// 主线程将SIGTERM信号,通过sig_pipefd管道统一事件源addsig( SIGINT, sig_handler );// 主线程将SIGINT信号,通过sig_pipefd管道统一事件源addsig( SIGPIPE, SIG_IGN );// 主线程忽略SIGPIPE信号bool stop_server = false;bool terminate = false;// 共享内存// 大小为 USER_LIMIT * BUFFER_SIZE// 每一个客户连接的数据,按照客户所属在users下标,放在共享内存指定位置shmfd = shm_open( shm_name, O_CREAT | O_RDWR, 0666 );assert( shmfd != -1 );ret = ftruncate( shmfd, USER_LIMIT * BUFFER_SIZE ); assert( ret != -1 );share_mem = (char*)mmap( NULL, USER_LIMIT * BUFFER_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmfd, 0 );assert( share_mem != MAP_FAILED );close( shmfd );while( !stop_server ){// 主线程只关心listenfd套接字和信号通知管道读端上的读事件以及父子信道上来来自于任意子进程上的读事件int number = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );if ( ( number < 0 ) && ( errno != EINTR ) ){printf( "epoll failure\n" );break;}for ( int i = 0; i < number; i++ ){int sockfd = events[i].data.fd;// 处理新连接,这里假定连接不会密集到来if( sockfd == listenfd ){struct sockaddr_in client_address;socklen_t client_addrlength = sizeof( client_address );int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );if ( connfd < 0 ){printf( "errno is: %d\n", errno );continue;}// 如果连接超限,直接关闭这个连接;客户端从而三次握手,关闭连接if( user_count >= USER_LIMIT ){const char* info = "too many users\n";printf( "%s", info );send( connfd, info, strlen( info ), 0 );close( connfd );continue;}// 存储这个用户信息至users数组相关位置users[user_count].address = client_address;users[user_count].connfd = connfd;// 每个子进程都创建一个用于同父进程通信的管道ret = socketpair( PF_UNIX, SOCK_STREAM, 0, users[user_count].pipefd );assert( ret != -1 );pid_t pid = fork();if( pid < 0 ){close( connfd );continue;}else if( pid == 0 ){close( epollfd );// 子进程关闭父进程的epollfd,不关心父进程的所已经关心的所有事件close( listenfd );// 子进程关闭listenfd套接字close( users[user_count].pipefd[0] );//子进程用users[user_count].pipefd[1]作为与主进程交流的通道close( sig_pipefd[0] );//子进程会收到之前的那些信号,这些信号会通过信号传递管道发给子进程,但是发现管道已经关闭,忽略该信号close( sig_pipefd[1] );//父进程会收到这些信号,然后对信号做出决策,这是非常合理的run_child( user_count, users, share_mem );//子进程关联这个连接munmap( (void*)share_mem,  USER_LIMIT * BUFFER_SIZE );exit( 0 );}else{close( connfd );//主进程关闭连接套接字,这意味着这一连接完全交给子进程去处理close( users[user_count].pipefd[1] );//主进程用users[user_count].pipefd[0]作为与子进程交流的通道 addfd( epollfd, users[user_count].pipefd[0] );//将父子通信管道读端加入主线程关心事件users[user_count].pid = pid;//将子进程pid信息补全sub_process[pid] = user_count;//建立子进程pid与客户在users下标之间的映射user_count++;}}//主进程接收到信号,这里假定信号不会密集到来else if( ( sockfd == sig_pipefd[0] ) && ( events[i].events & EPOLLIN ) ){int sig;char signals[1024];ret = recv( sig_pipefd[0], signals, sizeof( signals ), 0 );if( ret == -1 ){continue;}else if( ret == 0 ){continue;}else{for( int i = 0; i < ret; ++i ){switch( signals[i] ){case SIGCHLD://收到子进程退出信号,waitpid子进程,处理users用户信息数组,处理pid与下标映射数组sub_process{pid_t pid;int stat;while ( ( pid = waitpid( -1, &stat, WNOHANG ) ) > 0 ){int del_user = sub_process[pid];//获得要删除的users下标sub_process[pid] = -1;//解除该子进程pid与该用户users中的下标的映射if( ( del_user < 0 ) || ( del_user > USER_LIMIT ) ){printf( "the deleted user was not change\n" );continue;}// 子进程退出后主进程的epollfd解除主进程用于同该子进程通信的users[del_user].pipefd[0]的关心epoll_ctl( epollfd, EPOLL_CTL_DEL, users[del_user].pipefd[0], 0 );close( users[del_user].pipefd[0] );//关闭该对父子通信套接字users[del_user] = users[--user_count];//该操作真巧,删除该用户sub_process[users[del_user].pid] = del_user;//重建映射printf( "child %d exit, now we have %d users\n", del_user, user_count ); }if( terminate && user_count == 0 )//子进程全部退出{stop_server = true;}break;}case SIGTERM://主进程收到退出信号case SIGINT:{printf( "kill all the clild now\n" );//addsig( SIGTERM, SIG_IGN );//addsig( SIGINT, SIG_IGN );// 如果此时没有子进程,则直接退出即可if( user_count == 0 ){stop_server = true;break;}//向所有子进程发送终止信号for( int i = 0; i < user_count; ++i ){int pid = users[i].pid;kill( pid, SIGTERM );//不与子进程通信,直接通过系统调用杀死子进程}terminate = true;//此处是用于waitpid子进程,表示主进程正在进行退出环节,一旦子进程全部回收,则主进程结束主循环break;}default:{break;}}}}}// 处理来自子进程上的信息else if( events[i].events & EPOLLIN ){int child = 0;//存储来自子进程发来的客户下标ret = recv( sockfd, ( char* )&child, sizeof( child ), 0 );printf( "read data from child accross pipe\n" );if( ret == -1 ){continue;}else if( ret == 0 ){continue;}else{for( int j = 0; j < user_count; ++j ){// socket是父子信道users[user_count].pipefd[0],也就是说子进程给sockfd发了数据// 具体这个sockfd,是众多子进程中刚给主进程发送数据的那个子进程对应的主进程父子信道users[user_count].pipefd[0]if( users[j].pipefd[0] != sockfd )//只要不是刚接收的数据的users[user_count].pipefd[0],就发送要发送数据的位置{printf( "send data to child accross pipe\n" );// 本意是将要发送的数据在共享内存中的位置发送给一个个除了最近给主进程发送数据的一个个子进程// 转化为发送这个用户的下标即可// 那么要发送的数据在共享内存中的位置是:share_mem + child*BUFFER_SIZEsend( users[j].pipefd[0], ( char* )&child, sizeof( child ), 0 );}}}}}}del_resource();return 0;
}

核心处理逻辑

  • 主进程监听listenfd套接字,每收到一个客户连接就fork一个子进程去处理
  • 单独一个子进程负责一个客户连接
  • 子进程收到客户数据,保存客户数据至共享内存,通知主进程
  • 主进程通知其他进程把数据发送给各自对应的客户

主进程信号处理逻辑

  • 主进程采用统一事件源的方式处理信号
  • 信号产生后,通过全双工套接字管道sig_pipefd[1]发送给主进程读关心的套接字sig_pipefd[0]
  • 主进程会在主循环上处理这些信号

子进程数据处理逻辑

整体数据流动

项目中共享内存的作用

        如果大家不是很了解共享内存,可以去看多进程通信之共享内存-CSDN博客。下列是本项目关于共享内存的流程说明。

  1. 主进程创建共享内存,并关联共享内存,从而获得了一份共享内存映射区
  2. 所有的其它进程都是由主进程fork而来(fork基本介绍-CSDN博客),于是所有的进程都是主进程的子进程,从而子进程都各自继承拥有一份各自的共享内存映射区,但是主进程所创建的物理共享内存只有一份。各个子进程在自己的共享内存映射区修改会被映射到物理共享内存中,从而实现进程间数据交流。
  3. 每个子进程负责一个客户连接,每个客户都有一个在客户组users中的下标,本项目同一时刻最多允许连接数为5,共享内存被分为五份,客户下标对应共享内存中存储该用户数据的位置。
  4. 当客户数据到达被负责该客户的子进程时,子进程把数据放在共享内存指定位置,然后把这个位置通知给主进程。
  5. 主进程收到这个通知,将这个数据的位置发送给其余子进程。
  6. 其余子进程中的每个子进程收到该位置,把该位置的数据发送给自己负责的客户。

各自的epollfd

        主进程的epollfd:

  • 关心listenfd监听套接字
  • 关心sig_pipefd[0]上的信号通知
  • 关心users[user_count].pipefd[0]上来自子进程的新到来的客户数据在共享内存中位置的信息

        子进程的child_epollfd:

  • 关心connfd连接套接字
  • 关心users[user_count].pipefd[1]上来自主进程要发送的数据的位置信息

为什么忽略SIGPIPE信号

        在往一个读端关闭的管道或者套接字里写入数据时,会触发这个信号,该信号默认处理动作是终止进程。这里有几个关键点:

  • 主进程通过socketpair()创建里两个相互通信的全双工套接字sig_pipefd[0]和sig_pipefd[1]
  • 子进程从主进程中继承了这两个套接字并关闭了它
  • 可是子进程还继承了相关信号的默认处理方式
  • 一旦收到相关信号被触发,例如ctrl+c发送的SIGINT终止信号,主进程和所有的子进程都会收到这个信号
  • 按照该信号的处理逻辑,会向sig_pipefd[1]套接字写入数据,对于主进程来讲,epoll_wait可以得到这个数据到来的通知;可是对于子进程来讲sig_pipefd[0]和sig_pipefd[1]早已被关闭。
  • 也就是说此时子进程会遇到这样一件事:往一个已经关闭的套接字中写入数据,会触发SIGPIPE信号,忽略即可

        除了上面这个原因,还有一个典型原因:

  • 服务器收到客户数据
  • 开始处理客户数据
  • 客户连接在服务器处理客户数据时退出
  • 服务器将结果发送给对端已经关闭的套接字
  • 忽略该信号后,我们可以通过send或者recv反馈给我们的信息,来判断对方是否已经关闭连接。

stop_server和terminate在干什么

        关于stop_server的作用,就是跟常规用于统一事件源的作用是一样的。大家可以看这篇稍短一些的代码感受一下:统一事件源-CSDN博客。

        terminate用于主进程收到终止信号后,当最后一个子进程被回收后,结束主循环:

  • 主进程收到终止信号,此时主进程不能直接退出,它需要杀死并回收每一个子进程
  • 主进程通过kill系统调用向每个子进程发出终止信号,然后设置terminate为true
  • 子进程退出触发SIGCHLD信号,主循环进入该信号的处理逻辑
  • 当子进程全部退出后,if( terminate && user_count == 0 )成立,设置stop_server为true,从此处退出主循环

其他问题大家可以私信我

相关文章:

  • 公司网络变差的解决方法(固定IP地址冲突)
  • 关于界面存在AB测试后UI刷新空白的问题
  • Redis:set类型和zset类型
  • 汽车制造通信革新:网关模块让EtherCAT成功对接CCLINK
  • gitlab相关操作
  • Redis GEO 底层实现(结合源码分析)
  • Redis的主从复制底层实现
  • 【编译工具】(调试)Chrome DevTools + Postman:调试组合如何让我的开发效率提升400%?
  • Guava常用工具类使用教程
  • 《Redis》持久化
  • Oracle线上故障问题解决
  • SpringMVC异步处理Servlet
  • Flask应用中处理异步事件(后台线程+事件循环)的方法
  • 达梦数据库 单机部署dmhs同步复制(DM8—>DM8)
  • 高频面试之6Hive
  • Redis: List类型
  • Redis免费客户端工具推荐
  • Github月度新锐热门工具 - 202506
  • [HarmonyOSNext鸿蒙开发]11.ArkUI框架:Swiper、Grid布局与代码复用实战指南
  • Spring Boot 集成 Redis 实战教程
  • 深圳网站建设公司的英文名是/百度站长工具seo综合查询
  • 腾讯空间个人认证 企业认证 网站认证哪种功能用途最齐全??/浙江seo外包
  • 网络营销网站 功能/网建公司
  • 国家商标网查询入口/湖南企业竞价优化首选
  • 做家政的在哪些网站推广/b2b自动发布信息软件
  • 网站开发 英语词汇/永久开源的免费建站系统