1. 概述
XLOG
模块的内容,主要分享PostgreSQL数据库在
StartupXLOG()
函数中如何去分配并初始化一个新的
XLogReader
。
XLogReader
的数据类型是
struct XLogReaderState
,该结构体内部的成员主要包含了读取和关闭XLOG文件的回调函数,以及读取到XLOG段文件内容的内存缓冲区。
void StartupXLOG()
{
XLogReaderState *xlogreader;
...... //省略
/* Set up XLOG reader facility */
MemSet(&private, 0, sizeof(XLogPageReadPrivate));
xlogreader =
XLogReaderAllocate(wal_segment_size, NULL,
XL_ROUTINE(.page_read = &XLogPageRead,
.segment_open = NULL,
.segment_close = wal_segment_close),
&private);
if (!xlogreader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory"),
errdetail("Failed while allocating a WAL reading processor.")));
/// 来自于pg_control文件:Database system identifier: 7124521561191465673
xlogreader->system_identifier = ControlFile->system_identifier;
}
//省略复制
2. XLogReader数据类型
XLogReader
数据类型是
struct XLogReaderState
,声明于
xlogreader.h
头文件中,如下:
struct XLogReaderState
{
XLogReaderRoutine routine;
/* ----------------------------------------
* [公共参数]
* ----------------------------------------
*/
uint64 system_identifier;
void *private_data;
XLogRecPtr ReadRecPtr; // 最后一个记录读取的开始
XLogRecPtr EndRecPtr; // 最后一个记录读取的end+1
/* ----------------------------------------
* [当前记录的解码表示]
* [使用XLogRecGet*函数来调查记录; 这些字段不应该被直接访问。]
* ----------------------------------------
*/
XLogRecord *decoded_record; /* [当前解码的记录]*/
char *main_data; /* [记录的主要数据部分]*/
uint32 main_data_len; /* [主数据部分的长度]*/
uint32 main_data_bufsz; /* [缓冲区的分配大小]*/
RepOriginId record_origin;
/*
* [有关记录引用的块的信息]
*/
DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1];
int max_block_id; /* [使用的最大block_id(如果没有则为-1)]*/
/* ----------------------------------------
* [私有/内部状态]
* ----------------------------------------
*/
char *readBuf;
uint32 readLen;
// [当前readBuf中数据的上次读取XLOG位置]
WALSegmentContext segcxt;
WALOpenSegment seg;
uint32 segoff;
// [读取上一页的开头及其TLI。不一定对应readBuf中的内容;用于时间线完整性检查。]
XLogRecPtr latestPagePtr;
TimeLineID latestPageTLI;
// [正在读取的WAL记录的开始。]
XLogRecPtr currRecPtr;
// [从中读取的时间线,如果需要查找,则为0]
TimeLineID currTLI;
/*
* [如果当前TLI是历史的(tliSwitchPoint),则在currTLI中读取安全点;如果在当前时间线上,则在InvalidXLogRecPtr中读取。]
* [实际上设置为包含结束currTLI有效性的时间性转换的段的开始,而不是转换本身的LSN,因为我们不能假设旧的段会存在。]
*/
XLogRecPtr currTLIValidUntil;
// [如果currTLI不是最新的已知时间线,则为到达currTLIValidUntil时读取的下一个时间线。]
TimeLineID nextTLI;
// [当前ReadRecord结果的缓冲区(可扩展),当记录跨越页面边界时使用。]
char *readRecordBuf;
uint32 readRecordBufSize; //allocate_recordbuf-->5MB
// [保存错误消息的缓冲区]
char *errormsg_buf;
};复制
routine
routine
是一个操作回调函数,其数据类型是
XLogReaderRoutine
,它共有三个数据成员,分别是:
page_read
、
segment_open
和
segment_close
,其声明于
xlogreader.h
头文件中,如下所示:
typedef struct XLogReaderRoutine
{
XLogPageReadCB page_read;
WALSegmentOpenCB segment_open;
WALSegmentCloseCB segment_close;
} XLogReaderRoutine;复制
page_read 它是一个数据输入回调。这个回调应该从
targetPagePtr
开始至少读取xlog
页面的reqLen
有效字节,并将它们存储在readBuf
中。回调函数将返回读取到的字节数(不超过XLOG_BLCKSZ
),失败时返回-1
。如果有必要,回调函数将休眠,targetRecPtr
是我们正在读取的WAL
记录的位置。通常它等于targetPagePtr + reqLen
,但有时xlogreader
需要在读取它感兴趣的实际WAL
记录之前读取和验证页面或段头。在这种情况下,targetRecPtr
可用于确定从哪个时间轴(线)读取页面。
回调应设置
->seg.ws_tli
转换为从其中读取页面的文件的TLI
。
page_read
属于一个函数指针数据类型(
XLogPageReadCB
),其原型如下:
typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,
XLogRecPtr targetPagePtr,
int reqLen,
XLogRecPtr targetRecPtr,
char *readBuf);复制
segment_open 回调以打开指定的
WAL
段进行读取。->seg.ws_file
应设置为打开的段的文件描述符。如果失败,回调将引发一个错误,并且不会返回。“nextSegNo
”是要打开的段的编号。"tli_p
"是一个输入/输出参数。WALRead()
使用它来传递应该在其中找到新段的时间线,但是回调可以使用它来返回它实际打开的TLI
。
segment_open
也是一个函数指针类型(
WALSegmentOpenCB
),其声明如下:
typedef void (*WALSegmentOpenCB) (XLogReaderState *xlogreader,
XLogSegNo nextSegNo,
TimeLineID *tli_p);复制
segment_close
segment_close
的所用是用于
WAL
段关闭回调。
->seg.ws_file
应设为负数。它同样是一个函数指针类型(
WALSegmentCloseCB
),其声明如下:
typedef void (*WALSegmentCloseCB) (XLogReaderState *xlogreader);
复制
system_identifier 我们将要读取的 xlog
文件的系统标识符。如果未知或不重要,则设置为零(默认值)。private_data 该成员用于回调的不透明数据。 xloreader
不使用。ReadRecPtr 和 EndRecPtr 上次读取记录的开始点和结束点。 EndRecPtr
也用作下一个读取的位置。调用XLogBeginRead()
会将EndRecPtr
设置为起始位置,而ReadRecPtr
设置为无效。ReadRecPtr
表示最后一个记录读取的开始,EndRecPtr
表示最后读取的记录的结束位置+1
。decoded_record 当前解码的记录。 main_data 记录的主要数据部分。 main_data_len 主数据部分的长度。
main_data_bufsz 缓冲区的分配大小。 blocks 有关记录引用的块信息。 max_block_id 使用的最大 block_id
(如果没有则为-1)。readBuf 当前读页面的缓冲区( XLOG_BLCKSZ
字节,有效长度至少为readLen
字节)。readLen 当前读页面的缓冲区的最少有效长度。
3. 初始化XLogReader过程
XLogReader
的初始化由函数
XLogReaderAllocate()
负责完成,该函数主要用于分配并初始化一个新的
XLogReader
,如果无法分配
xlogreader
,则返回
NULL
。
XLogReaderState *
XLogReaderAllocate(int wal_segment_size, //16MB
const char *waldir, //NULL
XLogReaderRoutine *routine, //{XLogPageRead, NULL, wal_segment_close}
void *private_data) //[IN]
{
XLogReaderState *state;
// 1. 为state指针变量申请内存空间
state = (XLogReaderState *)
palloc_extended(sizeof(XLogReaderState),
MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
// 2. 如果内存申请失败, 则返回
if (!state)
return NULL;
/* 初始化调用方提供的支持函数 */
state->routine = *routine;
state->max_block_id = -1;
/*
3. 永久分配readBuf。我们这样做,而不仅仅是创建一个静态数组,有两个原因:
(1)不需要在后端的大多数实例化中浪费存储空间;
(2)静态char数组不保证具有任何特定的对齐方式,而palloc_extended()将提供最大对齐(MAXALIGN)的存储。
*/
state->readBuf = (char *) palloc_extended(XLOG_BLCKSZ, //XLOG_BLCKSZ大小为8192字节
MCXT_ALLOC_NO_OOM);
if (!state->readBuf)
{
pfree(state);
return NULL;
}
/* 4. 初始化段信息。*/
WALOpenSegmentInit(&state->seg,
&state->segcxt,
wal_segment_size,
waldir);
/* [system_identifier 上面初始化为0] */
state->private_data = private_data;
/* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */
state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1,
MCXT_ALLOC_NO_OOM);
if (!state->errormsg_buf)
{
pfree(state->readBuf);
pfree(state);
return NULL;
}
state->errormsg_buf[0] = '\0';
// 5. 分配一个最小大小的初始readRecordBuf,如果需要,可以在以后扩大。
if (!allocate_recordbuf(state, 0))
{
pfree(state->errormsg_buf);
pfree(state->readBuf);
pfree(state);
return NULL;
}
return state;
}复制
XLogReaderAllocate()
内部间接调用的
WALOpenSegmentInit()
,其作用是初始化传入的段结构。其函数实现如下:
static void
WALOpenSegmentInit(WALOpenSegment *seg,
WALSegmentContext *segcxt,
int segsize, //16MB
const char *waldir) //NULL
{
seg->ws_file = -1;
seg->ws_segno = 0;
seg->ws_tli = 0;
segcxt->ws_segsize = segsize;
if (waldir)
snprintf(segcxt->ws_dir, MAXPGPATH, "%s", waldir);
}复制
5
点的函数
allocate_recordbuf()
,其作用是分配
readRecordBuf
以适合至少给定长度的记录。该函数如果成功则返回
true
,如果内存不足则返回
false
。
readRecordBufSize
设置为新的缓冲区大小。
XLOG_BLCKSZ(8KB)
的倍数,并确保它在开始时至少是
5 * Max(BLCKSZ, XLOG_BLCKSZ)
。(这对于所有“正常”记录来说已经足够了,但是非常大的提交或中止记录可能需要更多的空间。)
static bool
allocate_recordbuf(XLogReaderState *state, uint32 reclength)
{
uint32 newSize = reclength;
newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
newSize = Max(newSize, 5 * Max(BLCKSZ, XLOG_BLCKSZ));
// 通过Max宏计算后得出:newSize: 5MB
#ifndef FRONTEND
/* 请注意,在许多不不幸的情况下,从回收段读取的随机数据可能会导致调用此例程,
* 其大小会导致分配时出现硬故障。对于备用,这将导致实例突然停止,并出现硬故障,
* 从而阻止它从其来源之一重试获取WAL,从而允许它在不手动重新启动的情况下继续重播。
* 如果数据来自过去回收的段并且仍然有效,则分配可能成功,但记录检查将失败,
* 因此这将是短暂的。如果由于内存不足而导致分配失败,那么根据MCXT_ALLOC_NO_OOM提供的保证,
* 这也不是硬失败。
*/
if (!AllocSizeIsValid(newSize))
return false;
#endif
//如果readRecordBuf指针不为空,则表明是一个野指针,先释指向的内存空间
if (state->readRecordBuf)
pfree(state->readRecordBuf);
// 申请newSize(此处为5MB)大小的内存空间
state->readRecordBuf =
(char *) palloc_extended(newSize, MCXT_ALLOC_NO_OOM);
// 如果内存空间申请失败,则返回false
if (state->readRecordBuf == NULL)
{
state->readRecordBufSize = 0;
return false;
}
// 内存空间申请成功, 置readRecordBufSize大小为5MB
state->readRecordBufSize = newSize;
return true;
}复制
XLogReaderState
类型的指针变量,并且该指针变量指向某块内存空间,其中的部分成员已被初始化。接下来我们就可以将XLOG段文件中读取到的数据存储到这个
XLogReaderState
类型的指针变量
XLogReader
中去。

PostgreSQL中文社区欢迎广大技术人员投稿
投稿邮箱:press@postgres.cn
文章转载自PostgreSQL中文社区,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。