How PostgreSQL Handles Connection Requests
In later PostgreSQL versions (16 and 17), the postmaster's wait for new client connections and for child-process exits was switched from the select() function to epoll (through the WaitEventSet API, which uses epoll on Linux); the reasons for the change are not covered here.
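As background, here is a minimal, self-contained sketch of an epoll-based accept loop on Linux. It is not PostgreSQL code (the port number and array sizes are arbitrary), but it shows the shape of what the WaitEventSet machinery does under the hood: register the descriptors once, then wait repeatedly.

/* Minimal epoll accept-loop sketch (Linux only; error handling omitted). */
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>

int
main(void)
{
    /* illustrative listener: a TCP socket bound to an arbitrary port */
    int lfd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in addr = {0};

    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(5433);
    bind(lfd, (struct sockaddr *) &addr, sizeof(addr));
    listen(lfd, 64);

    /* one epoll set, registered once -- unlike select(), whose fd_set
     * must be rebuilt before every call and is capped at FD_SETSIZE */
    int epfd = epoll_create1(0);
    struct epoll_event ev = {.events = EPOLLIN, .data.fd = lfd};

    epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev);

    for (;;)
    {
        struct epoll_event events[8];
        int nevents = epoll_wait(epfd, events, 8, -1);

        for (int i = 0; i < nevents; i++)
        {
            if (events[i].data.fd == lfd)
            {
                int cfd = accept(lfd, NULL, NULL);

                /* a real server would fork a child or hand the socket off */
                close(cfd);
            }
        }
    }
}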
The details are as follows:
PG14 still used select():
static int
ServerLoop(void)
{
    fd_set      readmask;
    int         nSockets;
    time_t      last_lockfile_recheck_time,
                last_touch_time;

    last_lockfile_recheck_time = last_touch_time = time(NULL);

    nSockets = initMasks(&readmask);

    for (;;)
    {
        fd_set      rmask;
        int         selres;
        time_t      now;

        /*
         * Wait for a connection request to arrive.
         *
         * We block all signals except while sleeping. That makes it safe for
         * signal handlers, which again block all signals while executing, to
         * do nontrivial work.
         *
         * If we are in PM_WAIT_DEAD_END state, then we don't want to accept
         * any new connections, so we don't call select(), and just sleep.
         */
        memcpy((char *) &rmask, (char *) &readmask, sizeof(fd_set));

        if (pmState == PM_WAIT_DEAD_END)
        {
            PG_SETMASK(&UnBlockSig);

            pg_usleep(100000L); /* 100 msec seems reasonable */
            selres = 0;

            PG_SETMASK(&BlockSig);
        }
        else
        {
            /* must set timeout each time; some OSes change it! */
            struct timeval timeout;

            /* Needs to run with blocked signals! */
            DetermineSleepTime(&timeout);

            PG_SETMASK(&UnBlockSig);

            /* watch the IPv4, IPv6 and Unix-domain listen sockets */
            selres = select(nSockets, &rmask, NULL, NULL, &timeout);

            PG_SETMASK(&BlockSig);
        }

        /* Now check the select() result */
        if (selres < 0)
        {
            if (errno != EINTR && errno != EWOULDBLOCK)
            {
                ereport(LOG,
                        (errcode_for_socket_access(),
                         errmsg("select() failed in postmaster: %m")));
                return STATUS_ERROR;
            }
        }

        /*
         * New connection pending on any of our sockets? If so, fork a child
         * process to deal with it.
         */
        if (selres > 0)
        {
            int         i;

            for (i = 0; i < MAXLISTEN; i++)
            {
                if (ListenSocket[i] == PGINVALID_SOCKET)
                    break;
                if (FD_ISSET(ListenSocket[i], &rmask))
                {
                    Port       *port;

                    /* event on a listen socket: set up the new connection */
                    port = ConnCreate(ListenSocket[i]);
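Two details in the loop above are easy to miss: select() overwrites the passed fd_set with the ready descriptors, and on some OSes (Linux included) it also modifies the timeout, which is why the code re-copies readmask and recomputes the timeout on every iteration. A reduced sketch of that pattern (wait_for_readable is a hypothetical helper, not PostgreSQL code):

/* Rebuild-before-every-select sketch: select() mutates both the fd_set
 * and (on Linux) the timeout, so both must be reset each time. */
#include <sys/select.h>
#include <string.h>

static fd_set readmask;         /* master copy, built once at startup */
static int    nsockets;         /* highest watched fd, plus one */

static int
wait_for_readable(void)
{
    fd_set          rmask;
    struct timeval  timeout;

    memcpy(&rmask, &readmask, sizeof(fd_set));  /* fresh working copy */
    timeout.tv_sec = 60;                        /* fresh timeout */
    timeout.tv_usec = 0;

    return select(nsockets, &rmask, NULL, NULL, &timeout);
}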
PG17 uses the WaitEventSet API instead (epoll on Linux):
static int
ServerLoop(void)
{
    time_t      last_lockfile_recheck_time,
                last_touch_time;
    WaitEvent   events[MAXLISTEN];
    int         nevents;

    /* register every wait object (latch + listen sockets) with the epoll set */
    ConfigurePostmasterWaitSet(true);
    last_lockfile_recheck_time = last_touch_time = time(NULL);

    for (;;)
    {
        time_t      now;

        nevents = WaitEventSetWait(pm_wait_set,
                                   DetermineSleepTime(),
                                   events,
                                   lengthof(events),
                                   0 /* postmaster posts no wait_events */ );

        /*
         * Latch set by signal handler, or new connection pending on any of
         * our sockets? If the latter, fork a child process to deal with it.
         */
        for (int i = 0; i < nevents; i++)
        {
            if (events[i].events & WL_LATCH_SET)
                ResetLatch(MyLatch);

            /*
             * The following requests are handled unconditionally, even if we
             * didn't see WL_LATCH_SET. This gives high priority to shutdown
             * and reload requests where the latch happens to appear later in
             * events[] or will be reported by a later call to
             * WaitEventSetWait().
             */
            if (pending_pm_shutdown_request)
                process_pm_shutdown_request();
            if (pending_pm_reload_request)
                process_pm_reload_request();
            if (pending_pm_child_exit)
                process_pm_child_exit();
            if (pending_pm_pmsignal)
                process_pm_pmsignal();

            if (events[i].events & WL_SOCKET_ACCEPT)
            {
                ClientSocket s;

                /* accept the connection and start a backend for it */
                if (AcceptConnection(events[i].fd, &s) == STATUS_OK)
                    BackendStartup(&s);

                /* We no longer need the open socket in this process */
                if (s.sock != PGINVALID_SOCKET)
                {
                    if (closesocket(s.sock) != 0)
                        elog(LOG, "could not close client socket: %m");
                }
            }
        }
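The pending_pm_* flags checked above are set by the postmaster's signal handlers, which do nothing more than set a flag and set the latch; the loop then does the real work at a safe point. A minimal sketch of that handler-sets-flag pattern (pending_reload and sighup_handler are illustrative names, and plain signal() stands in for PostgreSQL's pqsignal()):

/* Signal-flag sketch: the handler only sets a flag (async-signal-safe);
 * the main loop does the real work once it is back in control. */
#include <signal.h>

static volatile sig_atomic_t pending_reload = 0;

static void
sighup_handler(int signo)
{
    pending_reload = 1;         /* nothing heavier is safe in a handler */
    /* PostgreSQL additionally calls SetLatch(MyLatch) here so that a
     * blocked WaitEventSetWait() wakes up promptly. */
}

static void
server_loop(void)
{
    signal(SIGHUP, sighup_handler);
    for (;;)
    {
        /* ... wait for events (epoll_wait / WaitEventSetWait) ... */
        if (pending_reload)
        {
            pending_reload = 0;
            /* reread configuration files here */
        }
        /* ... handle socket events ... */
    }
}

ConfigurePostmasterWaitSet(), which ServerLoop calls once before entering its loop, is where the wait set is assembled: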
static void
ConfigurePostmasterWaitSet(bool accept_connections)
{
    if (pm_wait_set)
        FreeWaitEventSet(pm_wait_set);
    pm_wait_set = NULL;

    /* create the wait-event set (the epoll set on Linux) */
    pm_wait_set = CreateWaitEventSet(NULL,
                                     accept_connections ? (1 + NumListenSockets) : 1);

    /*
     * Add the latch to the set. It is used mainly to react to child-process
     * exits, configuration-file reloads, and the like. Early versions
     * implemented the latch with a self-pipe; later versions use signalfd --
     * see InitializeLatchSupport() in latch.c.
     */
    AddWaitEventToSet(pm_wait_set, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch,
                      NULL);

    /*
     * Add the IPv4, IPv6 and Unix-domain listen sockets, so the default
     * wait set contains four objects (the latch plus three sockets).
     */
    if (accept_connections)
    {
        for (int i = 0; i < NumListenSockets; i++)
            AddWaitEventToSet(pm_wait_set, WL_SOCKET_ACCEPT, ListenSockets[i],
                              NULL, NULL);
    }
}
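The comment in ConfigurePostmasterWaitSet() notes that the latch is nowadays backed by signalfd on Linux rather than a self-pipe. A minimal, Linux-only sketch of the signalfd idea (an illustration of the technique, not the actual code in latch.c; make_latch_fd is a made-up name):

/* Linux-only sketch: route SIGURG into a descriptor with signalfd(),
 * so a signal can be multiplexed by the same epoll set as the sockets. */
#include <sys/signalfd.h>
#include <signal.h>

static int
make_latch_fd(void)
{
    sigset_t mask;

    sigemptyset(&mask);
    sigaddset(&mask, SIGURG);                 /* PostgreSQL wakes latches with SIGURG */
    sigprocmask(SIG_BLOCK, &mask, NULL);      /* must block before signalfd() */

    return signalfd(-1, &mask, SFD_NONBLOCK); /* becomes readable on SIGURG */
}

Once such a descriptor is added to the epoll set with EPOLLIN, a latch wakeup delivered as a signal surfaces as an ordinary readable event alongside the listen sockets, which is what lets a single WaitEventSetWait() call cover both.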
The connection process:
if (AcceptConnection(events[i].fd, &s) == STATUS_OK)
    BackendStartup(&s);

static int
BackendStartup(ClientSocket *client_sock)
{
    Backend    *bn;             /* for backend cleanup */
    pid_t       pid;
    BackendStartupData startup_data;

    /*
     * Create backend data structure. Better before the fork() so we can
     * handle failure cleanly.
     */
    bn = (Backend *) palloc_extended(sizeof(Backend), MCXT_ALLOC_NO_OOM);
    if (!bn)
    {
        ereport(LOG,
                (errcode(ERRCODE_OUT_OF_MEMORY),
                 errmsg("out of memory")));
        return STATUS_ERROR;
    }

    /*
     * Compute the cancel key that will be assigned to this backend. The
     * backend will have its own copy in the forked-off process' value of
     * MyCancelKey, so that it can transmit the key to the frontend.
     */
    if (!RandomCancelKey(&MyCancelKey))
    {
        pfree(bn);
        ereport(LOG,
                (errcode(ERRCODE_INTERNAL_ERROR),
                 errmsg("could not generate random cancel key")));
        return STATUS_ERROR;
    }

    /* Pass down canAcceptConnections state */
    startup_data.canAcceptConnections = canAcceptConnections(BACKEND_TYPE_NORMAL);
    bn->dead_end = (startup_data.canAcceptConnections != CAC_OK);
    bn->cancel_key = MyCancelKey;

    /*
     * Unless it's a dead_end child, assign it a child slot number
     */
    if (!bn->dead_end)
        bn->child_slot = MyPMChildSlot = AssignPostmasterChildSlot();
    else
        bn->child_slot = 0;

    /* Hasn't asked to be notified about any bgworkers yet */
    bn->bgworker_notify = false;

    /* fork the new user backend process */
    pid = postmaster_child_launch(B_BACKEND,
                                  (char *) &startup_data, sizeof(startup_data),
                                  client_sock);
    if (pid < 0)
    {
        /* in parent, fork failed */
        int         save_errno = errno;

        if (!bn->dead_end)
            (void) ReleasePostmasterChildSlot(bn->child_slot);
        pfree(bn);
        errno = save_errno;
        ereport(LOG,
                (errmsg("could not fork new process for connection: %m")));
        report_fork_failure_to_client(client_sock, save_errno);
        return STATUS_ERROR;
    }

    /* in parent, successful fork */
    ereport(DEBUG2,
            (errmsg_internal("forked new backend, pid=%d socket=%d",
                             (int) pid, (int) client_sock->sock)));

    /*
     * Everything's been successful, it's safe to add this backend to our
     * list of backends.
     */
    bn->pid = pid;
    bn->bkend_type = BACKEND_TYPE_NORMAL;   /* Can change later to WALSND */
    dlist_push_head(&BackendList, &bn->elem);

#ifdef EXEC_BACKEND
    if (!bn->dead_end)
        ShmemBackendArrayAdd(bn);
#endif

    return STATUS_OK;
}
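Stripped of its bookkeeping, BackendStartup() is the classic fork-per-connection pattern: fork a child to own the client socket, and have the parent record the pid and close its copy of the socket (in PG17 that close happens back in ServerLoop, shown earlier). A minimal sketch, with handle_client as a hypothetical stand-in for the backend main loop:

/* Fork-per-connection sketch of what ServerLoop + BackendStartup do
 * together; handle_client() is an assumed helper, not PostgreSQL code. */
#include <sys/types.h>
#include <unistd.h>

extern void handle_client(int sock);    /* assumed: serves one session */

static int
start_backend(int client_sock)
{
    pid_t pid = fork();

    if (pid < 0)
        return -1;              /* fork failed: caller reports it to the client */

    if (pid == 0)
    {
        /* child: owns the socket for the rest of the session */
        handle_client(client_sock);
        _exit(0);
    }

    /* parent: record pid for later waitpid()/cleanup, then close its
     * copy of the socket -- the child's copy keeps the connection open */
    close(client_sock);
    return 0;
}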