暂无图片
暂无图片
2
暂无图片
暂无图片
暂无图片

PostgreSQL数据库中Latch的实现

380

Latch是PostgreSQL中的一种多进程同步机制,提供休眠唤醒功能,比如,当前进程休眠指定的时间后唤醒,或者当前进程等待指定的信号后唤醒。

Latch的实现依赖底层操作系统,这里仅分析Linux操作系统的实现。

Latch类型

有两种类型的Latch:本地Latch和共享Latch。

  • 其中本地Latch由InitLatch初始化,只能被同一进程设置,可用于等待信号到达(在信号处理程序中调用SetLatch即可唤醒等待)。
  • 共享Latch位于共享内存,由postmaster进程在启动时通过InitSharedLatch初始化,在共享Latch可以被等待之前,必须使用OwnLatch将其与进程关联,只有拥有Latch的进程才能等待它,其可以被多个进程设置。

在使用上,共享Latch是其他进程在等待,如果你想唤醒其他进程,则需要获取其他进程的Latch对象,然后调用SetLatch(其他进程的Latch对象),而本地Latch是自己进程使用SetLatch(本地Latch对象)唤醒自己。

Latch的定义:

typedef struct Latch { sig_atomic_t is_set; // 原子操作,是否被设置 sig_atomic_t maybe_sleeping; // 可能有进程正在休眠 bool is_shared; // 是否是共享的 int owner_pid; // 当前进程pid } Latch;

本地Latch和共享Latch的初始化函数如下:

/*
 * InitLatch
 *      Initialize a process-local latch, owned by the calling process.
 */
void InitLatch(Latch *latch)
{
    latch->is_set = false;
    latch->maybe_sleeping = false;
    latch->owner_pid = MyProcPid;   /* owned by the current process */
    latch->is_shared = false;       /* local latch */
}

/*
 * InitSharedLatch
 *      Initialize a latch that lives in shared memory.  It has no owner
 *      yet; a process must call OwnLatch() before waiting on it.
 */
void InitSharedLatch(Latch *latch)
{
    latch->is_set = false;
    latch->maybe_sleeping = false;
    latch->owner_pid = 0;           /* shared latch starts unowned */
    latch->is_shared = true;        /* shared latch */
}

共享Latch与进程关联,OwnLatch实现如下:

/*
 * OwnLatch
 *      Associate a shared latch with the current process.  Only the owning
 *      process may wait on the latch; errors out if it is already owned.
 */
void OwnLatch(Latch *latch)
{
    Assert(latch->is_shared);           /* only shared latches take an owner */
    if (latch->owner_pid != 0)          /* already associated with a process */
        elog(ERROR, "latch already owned");
    latch->owner_pid = MyProcPid;       /* associate with the current process */
}

Latch操作

有三种基本的Latch操作:

  • SetLatch:设置Latch
  • ResetLatch:重置Latch,使其可再次设置。
  • WaitLatch:等待Latch,直到被设置或者超时。

其中WaitLatch包括了超时机制,并提供了当postmaster进程终止时立即唤醒postmaster子进程的机制。

等待事件的正确模式:

for (;;) { ResetLatch(); if (work to do) // 有工作要处理 Do Stuff(); WaitLatch(); }

另一种方式是:

for (;;) { if (work to do) Do Stuff(); // in particular, exit loop if some condition satisfied WaitLatch(); ResetLatch(); }

SetLatch

设置Latch,即将latch->is_set置为true,唤醒等待该Latch的进程,分两种情况处理,一种是当前进程,唤醒方式可以是发送信号的方式或者通过自管道的方式;另一种是其他进程,可通过发送信号的方式进行唤醒(其他进程在创建之初会注册SIGURG信号处理函数pqsignal(SIGURG, latch_sigurg_handler)latch_sigurg_handler函数通过向自管道写端写入一个字节,触发本地epoll事件)。具体实现上,是通过自管道的形式,通过向自管道写入一个字节,然后WaitLatch通过epoll监听自管道读端,当自管道有数据可读时,触发epoll事件,从而唤醒处在WaitLatch等待状态的进程。其实现如下:

/*
 * SetLatch
 *      Set a latch and, if needed, wake up the process sleeping on it.
 *
 * The memory-barrier / flag-check ordering here pairs with the matching
 * barriers in WaitEventSetWait(): the setter must publish is_set before
 * reading maybe_sleeping, so that either the waiter sees is_set or the
 * setter sees maybe_sleeping (or both) -- never neither.
 */
void SetLatch(Latch *latch)
{
    pg_memory_barrier();        /* ensure all prior memory operations are visible first */
    if (latch->is_set)          /* latch already set: nothing to do */
        return;
    latch->is_set = true;       /* mark the latch as set */
    pg_memory_barrier();        /* make the write visible before checking for sleepers */
    if (!latch->maybe_sleeping) /* nobody is (possibly) sleeping; no wakeup needed */
        return;
    pid_t owner_pid = latch->owner_pid;
    if (owner_pid == 0)         /* unowned shared latch: nobody to wake */
        return;
    else if (owner_pid == MyProcPid)    /* the current process set its own latch */
    {
#if defined(WAIT_USE_SELF_PIPE)
        if (waiting)
            sendSelfPipeByte(); /* write one byte into the self-pipe to wake our own WaitLatch */
#else
        if (waiting)            /* wake ourselves via SIGURG instead */
            kill(MyProcPid, SIGURG);
#endif
    }
    else
        kill(owner_pid, SIGURG); /* wake another process by sending it SIGURG */
}

WaitLatch

WaitLatch作用是阻塞当前进程,直到Latch被设置(is_set = true)或者超时,或者postmaster进程终止。其参数中,latch是要等待的Latch对象,wakeEvents是等待的事件类型掩码,timeout是超时时间,wait_event_info是等待事件信息。具体实现时,就是调用epoll来等待之前注册过的事件,其实现如下:

/*
 * WaitLatch
 *      Block the current process until the latch is set, the timeout
 *      elapses, or (depending on wakeEvents) the postmaster dies.
 *
 * latch: the latch to wait on; wakeEvents: bitmask of WL_* events;
 * timeout: timeout in milliseconds (used only with WL_TIMEOUT);
 * wait_event_info: wait-event identifier reported to the stats system.
 * Returns the bitmask of triggered events, or WL_TIMEOUT on timeout.
 */
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
{
    WaitEvent event;
    if (!(wakeEvents & WL_LATCH_SET))   /* caller did not ask for latch wakeups: don't monitor the latch */
        latch = NULL;
    ModifyWaitEvent(LatchWaitSet, LatchWaitSetLatchPos, WL_LATCH_SET, latch);
    LatchWaitSet->exit_on_postmaster_death = ((wakeEvents & WL_EXIT_ON_PM_DEATH) != 0);
    if (WaitEventSetWait(LatchWaitSet, (wakeEvents & WL_TIMEOUT) ? timeout : -1, &event, 1, wait_event_info) == 0)
        return WL_TIMEOUT;      /* timed out */
    else
        return event.events;    /* some event fired */
}

/*
 * WaitEventSetWait
 *      Wait for events, or until the timeout expires.  At most nevents
 *      triggered events are returned.  timeout == -1 blocks indefinitely;
 *      timeout == 0 polls without blocking; timeout > 0 blocks for that
 *      many milliseconds.
 */
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
{
    int returned_events = 0;
    instr_time start_time;
    instr_time cur_time;
    long cur_timeout = -1;
    if (timeout >= 0)   /* remember the start time so the remaining timeout can be recomputed */
    {
        INSTR_TIME_SET_CURRENT(start_time);
        cur_timeout = timeout;
    }
    pgstat_report_wait_start(wait_event_info);
    waiting = true;     /* mark this process as waiting (read by SetLatch / signal handler) */
    while (returned_events == 0)
    {
        if (set->latch && !set->latch->is_set)
        {
            set->latch->maybe_sleeping = true; /* about to sleep on a latch */
            pg_memory_barrier();
            /* and recheck */
        }
        if (set->latch && set->latch->is_set)   /* latch already set: report WL_LATCH_SET without blocking */
        {
            occurred_events->fd = PGINVALID_SOCKET;
            occurred_events->pos = set->latch_pos;
            occurred_events->user_data = set->events[set->latch_pos].user_data;
            occurred_events->events = WL_LATCH_SET;
            occurred_events++;
            returned_events++;
            set->latch->maybe_sleeping = false; /* could have been set above */
            break;      /* skip blocking and return immediately */
        }
        /* block until some event fires, or the timeout expires */
        int rc = WaitEventSetWaitBlock(set, cur_timeout, occurred_events, nevents);
        if (set->latch)
        {
            Assert(set->latch->maybe_sleeping);
            set->latch->maybe_sleeping = false;
        }
        if (rc == -1)
            break;      /* timeout occurred */
        else
            returned_events = rc;
        if (returned_events == 0 && timeout >= 0)   /* nothing yet: update the remaining timeout */
        {
            INSTR_TIME_SET_CURRENT(cur_time);
            INSTR_TIME_SUBTRACT(cur_time, start_time);
            cur_timeout = timeout - (long) INSTR_TIME_GET_MILLISEC(cur_time);
            if (cur_timeout <= 0)
                break;
        }
    }
    waiting = false;    /* no longer waiting */
    pgstat_report_wait_end();
    return returned_events;     /* number of triggered events */
}

/*
 * WaitEventSetWaitBlock
 *      epoll-based implementation of the actual blocking wait.
 *      Returns the number of events placed in occurred_events, 0 when
 *      interrupted by a signal (EINTR), or -1 on timeout.
 */
static inline int WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, WaitEvent *occurred_events, int nevents)
{
    int returned_events = 0;
    WaitEvent *cur_event;
    struct epoll_event *cur_epoll_event;
    /* wait for events or timeout */
    int rc = epoll_wait(set->epoll_fd, set->epoll_ret_events, Min(nevents, set->nevents_space), cur_timeout);
    if (rc < 0)     /* on error: retry silently for EINTR, otherwise report */
    {
        if (errno != EINTR)
        {
            waiting = false;
            ereport(ERROR, (errcode_for_socket_access(), errmsg("%s() failed: %m","epoll_wait")));
        }
        return 0;
    }
    else if (rc == 0)
        return -1;  /* timeout occurred */
    /* at least one event fired: walk the epoll results and translate them */
    for (cur_epoll_event = set->epoll_ret_events;
         cur_epoll_event < (set->epoll_ret_events + rc) && returned_events < nevents;
         cur_epoll_event++)
    {
        /* epoll's data pointer is set to the associated WaitEvent */
        cur_event = (WaitEvent *) cur_epoll_event->data.ptr;
        occurred_events->pos = cur_event->pos;
        occurred_events->user_data = cur_event->user_data;
        occurred_events->events = 0;
        if (cur_event->events == WL_LATCH_SET && cur_epoll_event->events & (EPOLLIN | EPOLLERR | EPOLLHUP))
        {
            /* latch event: drain the signalfd / self-pipe so the fd doesn't stay readable */
            drain();
            if (set->latch && set->latch->is_set)   /* report WL_LATCH_SET only if the latch really is set */
            {
                occurred_events->fd = PGINVALID_SOCKET;
                occurred_events->events = WL_LATCH_SET;
                occurred_events++;
                returned_events++;
            }
        }
        else if (cur_event->events == WL_POSTMASTER_DEATH && cur_epoll_event->events & (EPOLLIN | EPOLLERR | EPOLLHUP))
        {
            /*
             * Postmaster-death event: double-check whether the postmaster is
             * really gone; if it is alive, keep waiting, otherwise either
             * exit (WL_EXIT_ON_PM_DEATH) or report WL_POSTMASTER_DEATH.
             */
            if (!PostmasterIsAliveInternal())
            {
                if (set->exit_on_postmaster_death)  /* WL_EXIT_ON_PM_DEATH: terminate this process immediately */
                    proc_exit(1);
                occurred_events->fd = PGINVALID_SOCKET;
                occurred_events->events = WL_POSTMASTER_DEATH;
                occurred_events++;
                returned_events++;
            }
        }
        else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
        {
            /* socket event */
            if ((cur_event->events & WL_SOCKET_READABLE) && (cur_epoll_event->events & (EPOLLIN | EPOLLERR | EPOLLHUP)))
            {
                /* data available in socket, or EOF */
                occurred_events->events |= WL_SOCKET_READABLE;
            }
            if ((cur_event->events & WL_SOCKET_WRITEABLE) && (cur_epoll_event->events & (EPOLLOUT | EPOLLERR | EPOLLHUP)))
            {
                /* writable, or EOF */
                occurred_events->events |= WL_SOCKET_WRITEABLE;
            }
            if (occurred_events->events != 0)
            {
                occurred_events->fd = cur_event->fd;
                occurred_events++;
                returned_events++;
            }
        }
    }
    return returned_events;
}

其中需要等待事件的类型掩码如下,需要注意WL_POSTMASTER_DEATH以及WL_EXIT_ON_PM_DEATH的区别

/* Bitmasks for events that may wake-up WaitLatch(), WaitLatchOrSocket(), or WaitEventSetWait(). */ #define WL_LATCH_SET (1 << 0) // latch设置 #define WL_SOCKET_READABLE (1 << 1) // socket读 #define WL_SOCKET_WRITEABLE (1 << 2) // socket写 #define WL_TIMEOUT (1 << 3) // 超时 #define WL_POSTMASTER_DEATH (1 << 4) // postmaster进程终止时唤醒子进程,子进程需要进行退出前操作 #define WL_EXIT_ON_PM_DEATH (1 << 5) // postmaster终止时子进程直接退出 #define WL_SOCKET_CONNECTED WL_SOCKET_WRITEABLE // socket连接成功 #define WL_SOCKET_MASK (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE | WL_SOCKET_CONNECTED)

ResetLatch

Latch中的标识位重置为false

/*
 * ResetLatch
 *      Clear the latch so it can be set (and waited on) again.
 */
void ResetLatch(Latch *latch)
{
    latch->is_set = false;  /* clear the set flag */
    pg_memory_barrier();    /* ensure the write to is_set completes before subsequent reads */
}

WaitEventSet

WaitEventSet是一个事件集合,用于等待多个事件,其定义如下:

struct WaitEventSet { int nevents; /* 注册的事件数量 */ int nevents_space; /* 最多支持的事件数量 */ WaitEvent *events; // 存储等待事件的数组 Latch *latch; // 如果wait event中指定了WL_LATCH_SET,则latch指向该latch,latch_pos是events数组中的偏移量 int latch_pos; bool exit_on_postmaster_death; // 如果postmaster终止则立即退出,WL_EXIT_ON_PM_DEATH会转为为WL_POSTMASTER_DEATH事件,如果该标志位为true,则检测到postmaster终止时立即退出,否则返回 #if defined(WAIT_USE_EPOLL) int epoll_fd; // epoll文件描述符 /* epoll_wait returns events in a user provided arrays, allocate once */ struct epoll_event *epoll_ret_events; };

CreateWaitEventSet

创建一个等待事件集合,其中最重要的是创建epoll,其实现如下

/*
 * CreateWaitEventSet
 *      Allocate a WaitEventSet sized for nevents events in the given memory
 *      context and create the backing epoll instance.
 */
WaitEventSet *CreateWaitEventSet(MemoryContext context, int nevents)
{
    Size sz = 0;
    /*
     * Use MAXALIGN size/alignment to guarantee that later uses of memory are
     * aligned correctly. E.g. epoll_event might need 8 byte alignment on some
     * platforms, but earlier allocations like WaitEventSet and WaitEvent
     * might not be sized to guarantee that when purely using sizeof().
     */
    sz += MAXALIGN(sizeof(WaitEventSet));
    sz += MAXALIGN(sizeof(WaitEvent) * nevents);
    sz += MAXALIGN(sizeof(struct epoll_event) * nevents);
    /* one zeroed allocation carved into the three sub-arrays below */
    char *data = (char *) MemoryContextAllocZero(context, sz);
    WaitEventSet *set = (WaitEventSet *) data;
    data += MAXALIGN(sizeof(WaitEventSet));
    set->events = (WaitEvent *) data;
    data += MAXALIGN(sizeof(WaitEvent) * nevents);
    set->epoll_ret_events = (struct epoll_event *) data;
    data += MAXALIGN(sizeof(struct epoll_event) * nevents);
    set->latch = NULL;
    set->nevents_space = nevents;
    set->exit_on_postmaster_death = false;
    if (!AcquireExternalFD())   /* reserve an FD slot in fd.c's accounting first */
    {
        /* treat this as though epoll_create1 itself returned EMFILE */
        elog(ERROR, "epoll_create1 failed: %m");
    }
    set->epoll_fd = epoll_create1(EPOLL_CLOEXEC);   /* create the epoll instance */
    if (set->epoll_fd < 0)
    {
        ReleaseExternalFD();    /* give back the reserved FD slot on failure */
        elog(ERROR, "epoll_create1 failed: %m");
    }
    return set;
}

AddWaitEventToSet

向等待事件集合中添加一个等待事件,具体实现就是注册epoll事件,其实现如下:

/*
 * AddWaitEventToSet
 *      Register one wait event with the set; concretely, this registers the
 *      appropriate file descriptor with epoll.  Returns the event's position
 *      in the set's events array.
 */
int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, void *user_data)
{
    WaitEvent *event;
    if (events == WL_EXIT_ON_PM_DEATH)  /* translate into WL_POSTMASTER_DEATH plus the exit flag */
    {
        events = WL_POSTMASTER_DEATH;
        set->exit_on_postmaster_death = true;
    }
    if (latch)
    {
        if (latch->owner_pid != MyProcPid)
            elog(ERROR, "cannot wait on a latch owned by another process");
        if (set->latch)
            elog(ERROR, "cannot wait on more than one latch");
        if ((events & WL_LATCH_SET) != WL_LATCH_SET)
            elog(ERROR, "latch events only support being set");
    }
    else
    {
        if (events & WL_LATCH_SET)
            elog(ERROR, "cannot wait on latch without a specified latch");
    }
    /* waiting for socket readiness without a socket indicates a bug */
    if (fd == PGINVALID_SOCKET && (events & WL_SOCKET_MASK))
        elog(ERROR, "cannot wait on socket event without a socket");
    event = &set->events[set->nevents]; /* slot for the new event */
    event->pos = set->nevents++;        /* record its position in the set */
    event->fd = fd;                     /* this fd is what gets registered with epoll */
    event->events = events;
    event->user_data = user_data;
    if (events == WL_LATCH_SET)         /* adding a latch event */
    {
        set->latch = latch;
        set->latch_pos = event->pos;
#if defined(WAIT_USE_SELF_PIPE)
        /*
         * Monitor the self-pipe's read end: when the latch is set, one byte
         * is written to the pipe's write end, which makes epoll report this
         * fd readable and wakes the waiter.
         */
        event->fd = selfpipe_readfd;
#elif defined(WAIT_USE_SIGNALFD)
        event->fd = signal_fd;          /* latch wakeups arrive via the signalfd instead */
#else
        event->fd = PGINVALID_SOCKET;
#ifdef WAIT_USE_EPOLL
        return event->pos;
#endif
#endif
    }
    else if (events == WL_POSTMASTER_DEATH)     /* adding a postmaster-death event */
    {
        /* monitored fd is the pipe that signals postmaster exit */
        event->fd = postmaster_alive_fds[POSTMASTER_FD_WATCH];
    }
    WaitEventAdjustEpoll(set, event, EPOLL_CTL_ADD);    /* register the event with epoll */
    return event->pos;
}

Latch实现

Latch的实现依赖操作系统底层机制,我们只分析Linux操作系统的实现,在Linux操作系统中,其实现方式之一是self-pipe,即通过一个自管道的方式,实现进程之间的通信,实现原理是:每个进程都创建一个管道pipe,这个管道写端用于触发事件,读端通过select/poll/epoll监听。如果需要触发事件,则向管道写端写入一个字节,通过select/poll/epoll监听到该事件,则触发事件。

static int selfpipe_readfd = -1; // 自管道读端 static int selfpipe_writefd = -1; // 自管道写端 /* Process owning the self-pipe --- needed for checking purposes */ static int selfpipe_owner_pid = 0; /* Private function prototypes */ static void latch_sigurg_handler(SIGNAL_ARGS); // SIGURG信号处理函数 static void sendSelfPipeByte(void); // 向自管道写端写入一个字节

初始化Latch支持:创建一个自管道,并将读端与写端都设置为非阻塞,同时为读端和写端设置FD_CLOEXEC标志——该标志的含义是在调用exec执行新程序时自动关闭对应的文件描述符(注意:fork出的子进程仍会继承该描述符,只有在exec时才会被关闭)。

/*
 * InitializeLatchSupport
 *      Per-process latch initialization: create the self-pipe (non-blocking,
 *      close-on-exec on both ends) and install the SIGURG handler that
 *      writes to it.
 */
void InitializeLatchSupport(void)
{
    int pipefd[2];  /* the self-pipe: [0] = read end, [1] = write end */
    if (IsUnderPostmaster)
    {
        /* we inherited postmaster's self-pipe via fork; close it and make our own */
        if (selfpipe_owner_pid != 0)
        {
            /* Release postmaster's pipe FDs; ignore any error */
            (void) close(selfpipe_readfd);
            (void) close(selfpipe_writefd);
            /* Clean up, just for safety's sake; we'll set these below */
            selfpipe_readfd = selfpipe_writefd = -1;
            selfpipe_owner_pid = 0;
            /* Keep fd.c's accounting straight */
            ReleaseExternalFD();
            ReleaseExternalFD();
        }
    }
    if (pipe(pipefd) < 0)   /* create the self-pipe */
        elog(FATAL, "pipe() failed: %m");
    if (fcntl(pipefd[0], F_SETFL, O_NONBLOCK) == -1)    /* non-blocking read end */
        elog(FATAL, "fcntl(F_SETFL) failed on read-end of self-pipe: %m");
    if (fcntl(pipefd[1], F_SETFL, O_NONBLOCK) == -1)    /* non-blocking write end */
        elog(FATAL, "fcntl(F_SETFL) failed on write-end of self-pipe: %m");
    if (fcntl(pipefd[0], F_SETFD, FD_CLOEXEC) == -1)    /* close read end on exec */
        elog(FATAL, "fcntl(F_SETFD) failed on read-end of self-pipe: %m");
    if (fcntl(pipefd[1], F_SETFD, FD_CLOEXEC) == -1)    /* close write end on exec */
        elog(FATAL, "fcntl(F_SETFD) failed on write-end of self-pipe: %m");
    selfpipe_readfd = pipefd[0];    /* remember the read end */
    selfpipe_writefd = pipefd[1];   /* remember the write end */
    selfpipe_owner_pid = MyProcPid; /* this process owns the pipe */
    /* Tell fd.c about these two long-lived FDs */
    ReserveExternalFD();
    ReserveExternalFD();
    /*
     * Install the SIGURG handler: on SIGURG, if this process is currently
     * waiting, a byte is written to the self-pipe to wake WaitLatch().
     */
    pqsignal(SIGURG, latch_sigurg_handler);
}

/*
 * latch_sigurg_handler
 *      SIGURG handler: wake a pending WaitLatch() by writing one byte to
 *      the self-pipe.  Saves/restores errno, as signal handlers must.
 */
static void latch_sigurg_handler(SIGNAL_ARGS)
{
    int save_errno = errno;
    if (waiting)
        sendSelfPipeByte(); /* write one byte to the self-pipe's write end to wake WaitLatch() */
    errno = save_errno;
}

自管道的读端通过poll/epoll监听,下面的代码可以看到自管道的读端被注册到epoll中。

/*
 * Abbreviated excerpt of AddWaitEventToSet showing how the self-pipe's
 * read end is the fd actually registered with epoll for latch events
 * ("// ..." marks elided code from the full function shown earlier).
 */
int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, void *user_data)
{
    WaitEvent *event;
    // ...
    event = &set->events[set->nevents];
    event->pos = set->nevents++;
    event->fd = fd;
    event->events = events;
    event->user_data = user_data;
    if (events == WL_LATCH_SET)
    {
        set->latch = latch;
        set->latch_pos = event->pos;
#if defined(WAIT_USE_SELF_PIPE)
        event->fd = selfpipe_readfd;    /* self-pipe read end is what epoll watches */
#elif defined(WAIT_USE_SIGNALFD)
        event->fd = signal_fd;
#else
        event->fd = PGINVALID_SOCKET;
#ifdef WAIT_USE_EPOLL
        return event->pos;
#endif
#endif
    }
    else if (events == WL_POSTMASTER_DEATH)
    {
#ifndef WIN32
        event->fd = postmaster_alive_fds[POSTMASTER_FD_WATCH];
#endif
    }
}

/*
 * WaitEventAdjustEpoll (excerpt)
 *      Registers/updates/removes the event's fd in the epoll instance.
 */
static void WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
{
    // ...
    rc = epoll_ctl(set->epoll_fd, action, event->fd, &epoll_ev);
    // ...
}

PostgreSQL Latch初始化流程

我们看一下PostgreSQL Latch的初始化流程,主要分析本地Latch以及共享Latch的初始化流程。

main(int argc, char *argv[]) --> MyProcPid = getpid(); // 获取当前进程pid --> PostmasterMain(argc, argv); // postmaster主进程启动 --> reset_shared(); // 设置共享内存和信号量 --> CreateSharedMemoryAndSemaphores(); --> InitProcGlobal(); // Initialize the global process table during postmaster startup --> for (i < MaxBackends + NUM_AUXILIARY_PROCS) InitSharedLatch(&(procs[i].procLatch)); // 对每个后端进程初始化Latch --> ServerLoop(); --> BackendStartup(port); --> InitPostmasterChild(); --> InitializeLatchSupport(); /* Initialize process-local latch support */ --> pipe(pipefd) // 创建一个自管道 --> pqsignal(SIGURG, latch_sigurg_handler); // 注册信号处理函数,通过自管道触发事件,唤醒WaitLatch() --> MyLatch = &LocalLatchData; --> InitLatch(MyLatch); // 初始化当前postmaster子进程的本地Latch --> InitializeLatchWaitSet(); --> LatchWaitSet = CreateWaitEventSet(TopMemoryContext, 2); // 创建一个WaitEventSet,用于等待事件 --> set->epoll_fd = epoll_create1(EPOLL_CLOEXEC); // 创建一个epoll实例 --> AddWaitEventToSet(LatchWaitSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL); // 添加WL_LATCH_SET事件到WaitEventSet中 --> AddWaitEventToSet(LatchWaitSet, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, NULL, NULL); // 如果是postmaster子进程则添加WL_EXIT_ON_PM_DEATH事件到WaitEventSet中 --> WaitEventAdjustEpoll(set, event, EPOLL_CTL_ADD); --> epoll_ctl(set->epoll_fd, action, event->fd, &epoll_ev); --> BackendInitialize(port); --> pq_init(); // 初始化libpq --> FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3); // 创建一个WaitEventSet,用于等待事件 --> AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock, NULL, NULL); // 添加WL_SOCKET_WRITEABLE事件到WaitEventSet中,套接字可写事件 --> AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL); // 添加WL_LATCH_SET事件到WaitEventSet中,用于等待本地Latch --> AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL); // 添加postmaster进程终止事件 --> BackendRun(port); --> PostgresMain(ac, av, port->database_name, port->user_name); --> InitProcess(); --> OwnLatch(&MyProc->procLatch); // Acquire ownership of the PGPROC's latch --> 
SwitchToSharedLatch(); // 从本地Latch切换到共享Latch --> MyLatch = &MyProc->procLatch; --> ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET, MyLatch); --> SetLatch(MyLatch); --> InitPostgres(dbname, InvalidOid, username, InvalidOid, NULL, false);

其中初始化共享Latch流程的关键代码如下:

/* Pointers to shared-memory structures */
PROC_HDR *ProcGlobal = NULL;

/*
 * InitProcGlobal
 *      Initialize the global process table during postmaster startup,
 *      including the shared latch inside each real backend's PGPROC
 *      ("// ..." marks code elided from this excerpt).
 */
void InitProcGlobal(void)
{
    /* Create the ProcGlobal shared structure */
    ProcGlobal = (PROC_HDR *) ShmemInitStruct("Proc Header", sizeof(PROC_HDR), &found);
    PGPROC *procs = (PGPROC *) ShmemAlloc(TotalProcs * sizeof(PGPROC));
    MemSet(procs, 0, TotalProcs * sizeof(PGPROC));
    ProcGlobal->allProcs = procs;
    for (i = 0; i < TotalProcs; i++)
    {
        /* Common initialization for all PGPROCs, regardless of type. */
        /*
         * Set up per-PGPROC semaphore, latch, and fpInfoLock. Prepared xact
         * dummy PGPROCs don't need these though - they're never associated
         * with a real process
         */
        if (i < MaxBackends + NUM_AUXILIARY_PROCS)
        {
            procs[i].sem = PGSemaphoreCreate();
            InitSharedLatch(&(procs[i].procLatch)); /* shared latch, later claimed via OwnLatch() */
            LWLockInitialize(&(procs[i].fpInfoLock), LWTRANCHE_LOCK_FASTPATH);
        }
        // ...
    }
    // ...
}

每个后端进程都有自己的PGPROC结构体,其中包含一个Latch结构。

// Each backend has a PGPROC struct in shared memory struct PGPROC { /* proc->links MUST BE FIRST IN STRUCT (see ProcSleep,ProcWakeup,etc) */ SHM_QUEUE links; /* list link if process is in a list */ PGPROC **procgloballist; /* procglobal list that owns this PGPROC */ PGSemaphore sem; /* ONE semaphore to sleep on */ ProcWaitStatus waitStatus; Latch procLatch; /* generic latch for process */ // ... }

参考文档:
Postgresql中latch实现中self-pipe trick解决什么问题
Postgresql源码(40)Latch的原理分析和应用场景

最后修改时间:2025-04-25 17:14:55
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

文章被以下合辑收录

评论