StoneDB开源地址
https://github.com/stoneatom/stonedb
作者:罗中天
浙江大学-软件工程-在读硕士、StoneDB 内核研发实习生
2023 年 StoneDB 开源之夏项目中选学生
redo log 类型
innodb 的 redo log 是带有逻辑意义的物理日志:物理指的是 redo log 是针对某一个页来说的,每条 redo log 都会有 Type、Space ID、Page Number 等信息,如下图所示;逻辑指的是一条 redo log 中可能描述的不是在页面上的某个偏移量的位置上写入若干个字节的数据,而是描述在页面上插入或者删除一条什么样的记录。
redo log 的通用结构为
Type (1) + Space ID (4) + Page Number (4) + Body
复制
Type 的最高位是一个 Single Record Flag 标志位,如果为 1,表示该 redo log 单独构成一个 mtr。
redo log 根据作用的对象,又可以分为作用于 Page 的 redo log,作用于 space 的 redo log 和提供额外信息的 redo log。
作用于 page 的 redo log
大多数的 redo log 属于这一类别,常见的有 MLOG_1BYTE、MLOG_2BYTES、MLOG_4BYTES、MLOG_8BYTES、MLOG_REC_INSERT、MLOG_REC_CLUST_DELETE_MARK、MLOG_REC_UPDATE_IN_PLACE 等。其中 MLOG_1BYTE、MLOG_2BYTES、MLOG_4BYTES、MLOG_8BYTES 描述了在页面的某个偏移量处写入若干个字节的数据;MLOG_REC_INSERT 描述了在页面上插入一条记录;MLOG_REC_CLUST_DELETE_MARK 描述了在聚簇索引的页面上删除一条记录(用户线程删除的操作只会打 delete 标记,物理删除的操作由 purge 线程来做);MLOG_REC_UPDATE_IN_PLACE 描述了在聚簇索引的页面上原地更新一条记录(即修改的是非索引列的字段,二级索引上的更新不会产生该条日志,因为二级索引上的记录没有版本链,所以更新操作产生的 redo log 为 MLOG_REC_CLUST_DELETE_MARK + MLOG_REC_INSERT)。
MLOG_REC_INSERT
MLOG_REC_INSERT 类型的 redo log body 部分的格式为
version (1): 版本信息
flag (1)
n (2): 字段的数量
inst_cols (2)
n_uniq (2): 主键的数量
n个字段的长度 (n * 2)
offset (2): 前一条记录在页面中的偏移量
end_seg_len (compressed): 从mismatch_index开始的记录长度,最低位是标志位
info_and_status_bits (1)
origin_offset (compressed): record header的长度
mismatch_index (compressed): 和前一个记录相比第一个不一样的位置
data (end_seg_len >> 1): 该redo log对应的记录从mismatch_index开始的数据复制
可见,MLOG_REC_INSERT 类型的 redo log 进行了前缀压缩
MLOG_REC_CLUST_DELETE_MARK
version (1): 版本信息
flag (1)
n (2): 字段的数量
inst_cols (2)
n_uniq (2): 主键的数量
n个字段的长度 (n * 2)
flags (1)
val (1): 设置还是取消delete flag
pos (compressed): trx_id在记录中的偏移量
roll_ptr (7)
trx_id (compressed)
offset (2): 记录origin offset的位置在页面中的偏移量复制
MLOG_REC_UPDATE_IN_PLACE
version (1): 版本信息
flag (1)
n (2): 字段的数量
inst_cols (2)
n_uniq (2): 主键的数量
n个字段的长度 (n * 2)
flags (1)
pos (compressed): trx_id在记录中的偏移量
roll_ptr (7)
trx_id (compressed)
rec_offset (2): 记录在页面中的偏移量
info_bits (1)
n_fields (compressed): 修改的字段的数量
对n_fields个修改字段的描述
field_no (compressed): 字段的编号
len (compressed): 字段的长度
data (len): 数据复制
作用于 space 的 redo log
这类 redo log 描述的是针对一个 space 文件的修改,由于这类文件不是 write ahead 的,而是在文件操作后才记录的,所以在恢复的过程中只会对于文件的状态做一些检查。这类 rede log 不是本文的重点,在后续不再赘述。
提供额外信息的 redo log
这一类的 redo log 主要指的是 MLOG_MULTI_REC_END,只由一个字节的 Type 构成,用于标识一个 mini transaction(简称 mtr)的结尾。
recovery 原理
innodb 的 recovery 从 innodb 启动的时候开始执行,大概流程如下:
1、从 ib_logfile 文件的 header 中找到 checkpoint lsn,作为 recovery 的起点
2、每次从 ib_logfile 文件中读取 64KB 的 redo log 到内存中
3、将每个 log block 的 header 和 trail 去掉后,拼出一份连续的日志
4、以 mtr 为单位进行解析
4.1、判断 MLOG_SINGLE_REC_FLAG 标志位,如果一个 mtr 只由单条日志构成,直接解析后放入哈希表;
4.2、如果一个 mtr 由多条日志构成,需要先找到 MLOG_MULTI_REC_END 类型的日志,确定 mtr 的终点,并加入缓存中,然后将缓存中所有的日志都放入哈希表中
5、将哈希表中的 redo log 进行重放
note:这里不直接在解析的时候回放,而是插入哈希表中回放的好处是:可能会有很多 redo log 作用在同一个 page 上,将这些 redo log 使用一次 IO 进行重放,可以加快重放的速度。该哈希表包括两层,第一层以 space_id 为 key,第二层以 page_no 为 key。
调用栈如下所示(下面的源码基于 MySQL8.0.30 版本)
// storage/innobase/srv/srv0start.cc
srv_start
// 从系统表的第一个页中获取flushed_lsn
// 如果是正常shutdown的话,会做一次同步的全量checkpoint,会在系统表的第一个页中写入checkpoint的lsn
srv_sys_space.open_or_create(false, create_new_db, &sum_of_new_sizes, &flushed_lsn);
read_lsn_and_check_flags(flush_lsn);
it->validate_first_page(it->m_space_id, flushed_lsn, false);
*flush_lsn = mach_read_from_8(m_first_page + FIL_PAGE_FILE_FLUSH_LSN);
recv_recovery_from_checkpoint_start(*log_sys, flushed_lsn);
// 每个ib_logfile文件有2KB的header,在header的第2个log block和第4个log block中的8字节偏移量处分别存有checkpoint1和checkpoint2
// 当checkpoint_no为偶数时,写入checkpoint1,为奇数时,写入checkpoint2
// 遍历所有的ib_logfile文件,分别从其header中取出两个checkpoint lsn,取最大值返回
// note: 其实在第一个ib_logfile中寻找checkpoint lsn即可,因为做checkpoint的时候只会往第一个ib_logfile中写入
Log_checkpoint_location checkpoint;
recv_find_max_checkpoint(log, checkpoint)
// 从checkpoint lsn开始解析redo log并且apply
recv_recovery_begin
recv_read_log_seg
recv_scan_log_recs
recv_parse_log_recs
recv_single_rec
recv_parse_log_rec
mlog_parse_initial_log_record
recv_parse_or_apply_log_rec_body
recv_multi_rec
recv_parse_log_rec
mlog_parse_initial_log_record
recv_parse_or_apply_log_rec_body
// 将哈希表中的redo log进行重放
recv_apply_hashed_log_recs复制
下面对从 recv_recovery_begin 开始的流程进行详细阐述,在解析 redo log 的时候以解析 MLOG_REC_INSERT 类型的 redo log 为例进行分析。为了突出主干,对代码做了简化。
innodb 将解析和重放的逻辑是写在一起的,当传入的 block 为空时,只解析不重放,当传入的 block 非空时,解析并且重放。
recv_recovery_begin
该函数负责循环从 ib_logfile 文件中读取 64KB 的 redo log 到内存中进行解析,并放入哈希表中
// storage/innobase/log/log0recv.cc
static dberr_t recv_recovery_begin(log_t &log, const lsn_t checkpoint_lsn) {
// 初始化recv_sys
recv_sys->len = 0;
...
// checkpoint_lsn向下向512KB对齐
lsn_t start_lsn = ut_uint64_align_down(checkpoint_lsn, OS_FILE_LOG_BLOCK_SIZE);
bool finished = false;
// 循环读取ib_logfile中的内容到
while (!finished) {
// 读取从start_lsn开始的64KB的数据到log.buf中
const lsn_t end_lsn =
recv_read_log_seg(log, log.buf, start_lsn, start_lsn + RECV_SCAN_SIZE);
if (end_lsn == start_lsn) {
/* This could happen if we crashed just after completing file,
and before next file has been successfully created. */
break;
}
dberr_t err;
finished = recv_scan_log_recs(log, max_mem, log.buf, end_lsn - start_lsn,
start_lsn, &log.m_scanned_lsn, err);
if (err != DB_SUCCESS) {
return err;
}
start_lsn = end_lsn;
}
return DB_SUCCESS;
}复制
recv_read_log_seg
该函数负责从 ib_logfile 文件中读取 64KB 的 redo log 到内存中。
// storage/innobase/log/log0recv.cc
static lsn_t recv_read_log_seg(log_t &log, byte *buf, lsn_t start_lsn,
const lsn_t end_lsn) {
// 找到start_lsn所在的ib_logfile文件
auto file = log.m_files.find(start_lsn);
if (file == log.m_files.end()) {
/* Missing valid file ! */
return start_lsn;
}
do {
os_offset_t source_offset;
// 计算start_lsn在ib_logfile文件中的偏移量
// LOG_FILE_HDR_SIZE + (lsn - file_start_lsn);
source_offset = file->offset(start_lsn);
os_offset_t len = end_lsn - start_lsn;
bool switch_to_next_file = false;
if (source_offset + len > file->m_size_in_bytes) {
len = file->m_size_in_bytes - source_offset;
switch_to_next_file = true;
}
// 读取文件
const dberr_t err =
log_data_blocks_read(file_handle, source_offset, len, buf);
start_lsn += len;
buf += len;
if (switch_to_next_file) {
// 切换到下一个文件
...
}
} while (start_lsn != end_lsn);
return end_lsn;
}
// 每个ib_logfile文件的header中记录有该文件起始的file_start_lsn
os_offset_t offset(lsn_t lsn)
os_offset_t offset(lsn_t lsn, lsn_t file_start_lsn)
return LOG_FILE_HDR_SIZE + (lsn - file_start_lsn);复制
recv_scan_log_recs
该函数先将每个 log block 的 header 和 trail 去掉后,拼出一份连续的日志,然后以 mtr 为单位进行解析
struct Log_data_block_header {
...
/** Offset up to which this block has data inside, computed from the
beginning of the block. */
// 该log block中前m_data_len个字节是有内容的
uint16_t m_data_len;
/** Offset to the first mtr starting in this block, or 0 if there is no
mtr starting in this block. */
// 该log block中第一个从该block中开始的mtr的起始位置
uint16_t m_first_rec_group;
};
static bool recv_scan_log_recs(log_t &log,
size_t max_memory, const byte *buf, size_t len,
lsn_t start_lsn, lsn_t *read_upto_lsn,
dberr_t &err) {
const byte *log_block = buf;
lsn_t scanned_lsn = start_lsn;
bool finished = false;
bool more_data = false;
// 每个log block有header和trail,导致跨block的日志是不连续的,不能直接解析
// 所以需要先将每个block的header和trail去掉,将所有block的主体内容拼起来
do {
// 解析log block header
Log_data_block_header block_header;
log_data_block_header_deserialize(log_block, block_header);
...
const auto data_len = block_header.m_data_len;
...
// 如果解析redo log的起点位置还没确定并且存在mtr从该block中开始,就确定解析的起点
if (!recv_sys->parse_start_lsn && block_header.m_first_rec_group > 0) {
recv_sys->parse_start_lsn = scanned_lsn + block_header.m_first_rec_group;
if (recv_sys->parse_start_lsn < recv_sys->checkpoint_lsn) {
recv_sys->bytes_to_ignore_before_checkpoint =
recv_sys->checkpoint_lsn - recv_sys->parse_start_lsn;
}
recv_sys->scanned_lsn = recv_sys->parse_start_lsn;
recv_sys->recovered_lsn = recv_sys->parse_start_lsn;
}
scanned_lsn += data_len;
if (scanned_lsn > recv_sys->scanned_lsn) {
// buf空间不够用,扩容
if (recv_sys->len + 4 * OS_FILE_LOG_BLOCK_SIZE >= recv_sys->buf_len) {
recv_sys_resize_buf();
}
if (!recv_sys->found_corrupt_log) {
// 将该log block去掉header和trail后接到recv_sys->buf的尾部
more_data = recv_sys_add_to_parsing_buf(log_block, scanned_lsn);
}
recv_sys->scanned_lsn = scanned_lsn;
}
// 该log block没有满,那么解析redo log的终点就是这个block
if (data_len < OS_FILE_LOG_BLOCK_SIZE) {
/* Log data for this group ends here */
finished = true;
break;
} else {
log_block += OS_FILE_LOG_BLOCK_SIZE;
}
} while (log_block < buf + len);
if (more_data && !recv_sys->found_corrupt_log) {
// 解析redo log
recv_parse_log_recs();
if (recv_sys->recovered_offset > recv_sys->buf_len / 4) {
/* Move parsing buffer data to the buffer start */
recv_reset_buffer();
}
}
return finished;
}复制
recv_parse_log_recs
该函数判断 MLOG_SINGLE_REC_FLAG 标志位,根据一个 mtr 是由一条日志组成还是多条日志组成,分开处理。
static void recv_parse_log_recs() {
ut_ad(recv_sys->parse_start_lsn != 0);
// 解析redo log以mtr为基本单位
for (;;) {
byte *ptr = recv_sys->buf + recv_sys->recovered_offset;
byte *end_ptr = recv_sys->buf + recv_sys->len;
if (ptr == end_ptr) {
return;
}
bool single_rec;
switch (*ptr) {
case MLOG_DUMMY_RECORD:
single_rec = true;
break;
default:
// 解析Type最高位的标志位,看该mtr是由单条redo log构成还是多条redo log构成
single_rec = !!(*ptr & MLOG_SINGLE_REC_FLAG);
}
if (single_rec) {
if (recv_single_rec(ptr, end_ptr)) { // 单条redo log构成的mtr的解析入口
return;
}
} else if (recv_multi_rec(ptr, end_ptr)) { // 多条redo log构成的mtr的解析入口
return;
}
}
}复制
recv_single_rec
单条 redo log 构成的 mtr 的解析,将单条 redo log 解析后插入到哈希表中。
static bool recv_single_rec(byte *ptr, byte *end_ptr) {
lsn_t old_lsn = recv_sys->recovered_lsn;
byte *body;
mlog_id_t type;
page_no_t page_no;
space_id_t space_id;
// 解析单条redo log
ulint len =
recv_parse_log_rec(&type, ptr, end_ptr, &space_id, &page_no, &body);
lsn_t new_recovered_lsn;
new_recovered_lsn = recv_calc_lsn_on_data_add(old_lsn, len);
if (new_recovered_lsn > recv_sys->scanned_lsn) {
/* The log record filled a log block, and we
require that also the next log block should
have been scanned in */
return true;
}
...
recv_sys->recovered_offset += len;
recv_sys->recovered_lsn = new_recovered_lsn;
if (recv_recovery_on) {
// 将redo log加入到哈希表中
// 不直接重放的原因是可能会有很多redo log作用在同一个page上,将这些redo log使用一次IO进行重放,可以加快重放的速度
// 哈希表包括两层,第一层以space_id为key,第二层以page_no为key
recv_add_to_hash_table(type, space_id, page_no, body, ptr + len,
old_lsn, recv_sys->recovered_lsn);
}
return false;
}复制
recv_multi_rec
多条 redo log 构成的 mtr 的解析。
先确定 mtr 的重点,并将解析好的 redo log 加入缓存中,遍历该 mtr 中所有的 redo log,从缓存中取出后插入到哈希表中。
static bool recv_multi_rec(byte *ptr, byte *end_ptr) {
ulint n_recs = 0;
ulint total_len = 0;
// 先找到mtr的终点,即MLOG_MULTI_REC_END类型的记录
for (;;) {
mlog_id_t type = MLOG_BIGGEST_TYPE;
byte *body;
page_no_t page_no = 0;
space_id_t space_id = 0;
ulint len =
recv_parse_log_rec(&type, ptr, end_ptr, &space_id, &page_no, &body);
// 将部分解析的redo log缓存起来
recv_sys->save_rec(n_recs, space_id, page_no, type, body, len);
total_len += len;
++n_recs;
ptr += len;
if (type == MLOG_MULTI_REC_END) {
break;
}
}
lsn_t new_recovered_lsn =
recv_calc_lsn_on_data_add(recv_sys->recovered_lsn, total_len);
// 重置ptr的位置,开始扫第二遍
ptr = recv_sys->buf + recv_sys->recovered_offset;
for (ulint i = 0; i < n_recs; i++) {
lsn_t old_lsn = recv_sys->recovered_lsn;
space_id_t space_id = 0;
page_no_t page_no = 0;
mlog_id_t type = MLOG_BIGGEST_TYPE;
byte *body = nullptr;
size_t len = 0;
// 从第一遍扫的缓存中取出一条redo log
recv_sys->get_saved_rec(i, space_id, page_no, type, body, len);
recv_sys->recovered_offset += len;
recv_sys->recovered_lsn = recv_calc_lsn_on_data_add(old_lsn, len);
if (recv_recovery_on) {
// 将redo log加入到哈希表中
recv_add_to_hash_table(type, space_id, page_no, body, ptr + len,
old_lsn, new_recovered_lsn);
}
ptr += len;
}
return false;
}复制
recv_parse_log_rec
该函数负责对单条 redo log 日志进行解析,先解析 Type、Space ID、Page Number,再解析 body
static ulint recv_parse_log_rec(mlog_id_t *type, byte *ptr, byte *end_ptr,
space_id_t *space_id, page_no_t *page_no,
byte **body) {
byte *new_ptr;
*body = nullptr;
switch (*ptr) {
case MLOG_MULTI_REC_END:
case MLOG_DUMMY_RECORD:
*page_no = FIL_NULL;
*space_id = SPACE_UNKNOWN;
*type = static_cast<mlog_id_t>(*ptr);
return 1;
...
}
// 解析Type、Space ID、Page Number
new_ptr =
mlog_parse_initial_log_record(ptr, end_ptr, type, space_id, page_no);
*body = new_ptr;
if (new_ptr == nullptr) {
return 0;
}
// 解析body部分
new_ptr = recv_parse_or_apply_log_rec_body(
*type, new_ptr, end_ptr, *space_id, *page_no, nullptr, nullptr,
new_ptr - ptr, recv_sys->recovered_lsn);
if (new_ptr == nullptr) {
return 0;
}
return new_ptr - ptr;
}复制
mlog_parse_initial_log_record
该函数负责解析 Type、Space ID、Page Number
// storage/innobase/mtr/mtr0log.cc
byte *mlog_parse_initial_log_record(
const byte *ptr, /*!< in: buffer */
const byte *end_ptr, /*!< in: buffer end */
mlog_id_t *type, /*!< out: log record type: MLOG_1BYTE, ... */
space_id_t *space, /*!< out: space id */
page_no_t *page_no) /*!< out: page number */
{
if (end_ptr < ptr + 1) {
return (nullptr);
}
// 解析Type
*type = (mlog_id_t)((ulint)*ptr & ~MLOG_SINGLE_REC_FLAG);
ut_ad(*type <= MLOG_BIGGEST_TYPE);
ptr++;
if (end_ptr < ptr + 2) {
return (nullptr);
}
// 解析Space ID
*space = mach_parse_compressed(&ptr, end_ptr);
if (ptr != nullptr) {
// 解析Page Number
*page_no = mach_parse_compressed(&ptr, end_ptr);
}
return (const_cast<byte *>(ptr));
}复制
recv_parse_or_apply_log_rec_body
该函数负责解析 body,枚举所有的 type 类型,分别进行处理。
这里以 MLOG_REC_INSERT 的日志为例,会先解析字段数量、主键数量、字段长度等信息,构建出索引字典,然后解析剩余的部分,构建出完整的记录,最后插入对应的页中。
// storage/innobase/log/log0recv.cc
static byte *recv_parse_or_apply_log_rec_body(
mlog_id_t type, byte *ptr, byte *end_ptr, space_id_t space_id,
page_no_t page_no, buf_block_t *block, mtr_t *mtr, ulint parsed_bytes,
lsn_t start_lsn) {
...
dict_index_t *index = nullptr;
...
// 这里枚举了所有的redo log类型
switch (type) {
...
case MLOG_REC_INSERT:
// 解析字段数量、主键数量、字段长度等信息,构建出索引字典
if (nullptr != (ptr = mlog_parse_index(ptr, end_ptr, &index))) {
// 解析剩余的部分,构建出完整的记录,插入到对应的页中
ptr = page_cur_parse_insert_rec(false, ptr, end_ptr, block, index, mtr);
}
break;
...
}
if (index != nullptr) {
dict_table_t *table = index->table;
dict_mem_index_free(index);
dict_mem_table_free(table);
}
return ptr;
}复制
mlog_parse_index
该函数负责解析字段的数量,主键的数量和每个字段的长度,构建索引字典
byte *mlog_parse_index(byte *ptr, const byte *end_ptr, dict_index_t **index) {
/* Read the 1 byte for index log version */
uint8_t index_log_version = 0;
ptr = parse_index_log_version(ptr, end_ptr, index_log_version);
/* Read the 1 byte flag */
uint8_t flag = 0;
ptr = parse_index_flag(ptr, end_ptr, flag);
/* Read n and n_uniq */
// 解析字段的数量n和主键的数量n_uniq
uint16_t n = 0;
uint16_t n_uniq = 0;
uint16_t inst_cols = 0;
ptr = parse_index_column_counts(ptr, end_ptr, is_comp, is_versioned,
is_instant, n, n_uniq, inst_cols);
/* Create a dummy dict_table_t */
dict_table_t *table =
dict_mem_table_create(RECOVERY_INDEX_TABLE_NAME, DICT_HDR_SPACE, n, 0, 0,
is_comp ? DICT_TF_COMPACT : 0, 0);
/* Create a dummy dict_index_t */
dict_index_t *ind =
dict_mem_index_create(RECOVERY_INDEX_TABLE_NAME,
RECOVERY_INDEX_TABLE_NAME, DICT_HDR_SPACE, 0, n);
ind->table = table;
ind->n_uniq = (unsigned int)n_uniq;
if (n_uniq != n) {
ind->type = DICT_CLUSTERED;
}
/* Read each index field info */
// 解析每个字段的长度,填充index的feild信息
ptr = parse_index_fields(ptr, end_ptr, n, n_uniq, is_versioned, ind, table);
if (ptr == nullptr) {
*index = ind;
return ptr;
}
...
*index = ind;
return (ptr);
}复制
parse_index_fields
该函数负责解析每个字段的长度,填充索引的 field 列表
static byte *parse_index_fields(byte *ptr, const byte *end_ptr, uint16_t n,
uint16_t n_uniq, bool is_versioned,
dict_index_t *&ind, dict_table_t *&table) {
for (size_t i = 0; i < n; i++) {
uint16_t len = 0;
// 读取字段的长度信息
ptr = read_2_bytes(ptr, end_ptr, len);
// 这里构建出来的field字段的类型并不是准确的,只能区分出是变长还是定长,因为redo log中只有字段长度相关的信息,并没有类型相关的信息
dict_mem_table_add_col(
table, nullptr, nullptr,
((len + 1) & 0x7fff) <= 1 ? DATA_BINARY : DATA_FIXBINARY,
len & 0x8000 ? DATA_NOT_NULL : 0, len & 0x7fff, true, phy_pos, v_added,
v_dropped);
dict_index_add_col(ind, table, table->get_col(i), 0, true);
}
// 加上trx_id和roll_ptr的列
dict_table_add_system_columns(table, table->heap);
/* Identify DB_TRX_ID and DB_ROLL_PTR in the index. */
// index中字段的顺序和物理记录保持一致
// 如果是聚簇索引,trx_id和roll_ptr放在主键的后面
if (is_versioned || (n_uniq != n)) {
size_t i = 0;
i = DATA_TRX_ID - 1 + n_uniq;
ind->fields[i].col = &table->cols[n + DATA_TRX_ID];
ind->fields[i].col->set_phy_pos(table->cols[i].get_phy_pos());
i = DATA_ROLL_PTR - 1 + n_uniq;
ind->fields[i].col = &table->cols[n + DATA_ROLL_PTR];
ind->fields[i].col->set_phy_pos(table->cols[i].get_phy_pos());
}
return ptr;
}复制
page_cur_parse_insert_rec
由于 MLOG_REC_INSERT 类型的 redo log 里做了压缩,只记录了和上一条记录不一样的部分,所以需要先解析出上一条记录在页面中的偏移量、待插入记录和上一条记录第一个不相同的字节 mismatch_index 和待插入记录从 mismatch_index 开始的记录长度 eng_seg_len,然后定位到上一条记录,取出前 mismatch_index 个字节,并从 redo log 中解析出待插入记录从 mismatch_index 开始的部分,那么待插入记录就是两部分拼起来,最后插入到 B+树中。
// storage/innobase/page/page0cur.cc
byte *page_cur_parse_insert_rec(
bool is_short, /*!< in: true if short inserts */
const byte *ptr, /*!< in: buffer */
const byte *end_ptr, /*!< in: buffer end */
buf_block_t *block, /*!< in: page or NULL */
dict_index_t *index, /*!< in: record descriptor */
mtr_t *mtr) /*!< in: mtr or NULL */
{
ulint origin_offset = 0; /* remove warning */
ulint end_seg_len;
ulint mismatch_index = 0; /* remove warning */
page_t *page;
rec_t *cursor_rec{nullptr};
byte buf1[1024];
// buf描述待插入记录
byte *buf;
const byte *ptr2 = ptr;
ulint info_and_status_bits = 0; /* remove warning */
page_cur_t cursor;
mem_heap_t *heap = nullptr;
ulint offsets_[REC_OFFS_NORMAL_SIZE];
// offsets描述每个字段在物理记录中的偏移量
ulint *offsets = offsets_;
// offsets[0]存offsets数组占用的内存大小
rec_offs_init(offsets_);
page = block ? buf_block_get_frame(block) : nullptr;
ulint offset;
// 前一条记录在页面中的偏移量
offset = mach_read_from_2(ptr);
ptr += 2;
if (page != nullptr) cursor_rec = page + offset;
// 该redo log对应的记录和前一条记录不一样的部分的长度,最低位是一个标志位
end_seg_len = mach_parse_compressed(&ptr, end_ptr);
info_and_status_bits = mach_read_from_1(ptr);
ptr++;
// 该redo log对应的record header的长度
origin_offset = mach_parse_compressed(&ptr, end_ptr);
// 和前一个记录相比第一个不一样的位置
mismatch_index = mach_parse_compressed(&ptr, end_ptr);
if (!block) {
return (const_cast<byte *>(ptr + (end_seg_len >> 1)));
}
...
// end_seg_len的最低位是一个标志位,所以真实的大小还需要除以2
end_seg_len >>= 1;
// 如果buf在栈上分配的内存不够,就从堆上分配进行扩容
if (mismatch_index + end_seg_len < sizeof buf1) {
buf = buf1;
} else {
buf = static_cast<byte *>(ut::malloc_withkey(UT_NEW_THIS_FILE_PSI_KEY,
mismatch_index + end_seg_len));
}
// 待插入记录 = 前一条记录的前mismatch_index个字节 + 从ptr开始的eng_seg_len个字节
if (mismatch_index) {
ut_memcpy(buf, rec_get_start(cursor_rec, offsets), mismatch_index);
}
ut_memcpy(buf + mismatch_index, ptr, end_seg_len);
...
// 将cursor定位到前一条记录的位置
page_cur_position(cursor_rec, block, &cursor);
// 构建offsets数组,用于描述每个字段在记录中的偏移量
offsets = rec_get_offsets(buf + origin_offset, index, offsets,
ULINT_UNDEFINED, UT_LOCATION_HERE, &heap);
// 插入到B+树中
page_cur_rec_insert(&cursor, buf + origin_offset, index,
offsets, mtr);
if (buf != buf1) {
ut::free(buf);
}
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
return (const_cast<byte *>(ptr + end_seg_len));
}复制
总结
这篇文章我们介绍了 redo log 的分类,不同种类的 redo log 的结构,并且分析了 redo log 在恢复时的流程相关的源码,欢迎大家关注StoneDB的开源代码。
加入微信群:添加社区助理-小石侠;加入钉钉群:扫描下方钉钉群二维码。