命令同步
在前面介绍数据同步这一部分时可能会有一个疑问,如果在执行数据同步的时候Master又接收到了新的查询命令,导致数据发生了变化,而真正生成同步数据的又是在子进程之中进行的,这样一来要如何将新的数据的变化同步给Slave节点呢?这一部分将会就这个问题进行解释。
命令的转发
Master实例向Slave实例转发命令的过程,与向AOF文件之中追加命令的逻辑类似在执行客户端查询命令的call
接口之中通过调用propagate
函数来实现的:
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags){if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)feedAppendOnlyFile(cmd, dbid, argv, argc);if (flags & PROPAGATE_PREL)replicationFeedSlaves(server.slaves, dbid, argv, argc);}
通过上面的代码,可以看出Master实例向Slave实例转发命令的逻辑,与追加AOF文件是先后进行的。
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
这个函数会将命令数据转发给slaves
列表之中对应的所有的Slave实例,并将命令数据写入到服务器的积压缓冲区之中,这个函数会执行如下的逻辑:
校验服务器的状态,检查是否可以执行命令转发的逻辑。这里需要注意的是,虽然主从复制支持层次话的结构,但是中间层的节点无法通过
replicationFeedSlaves
函数将命令转发给自己的Sub-Slave子节点,而是通过另外一个函数replicationFeedSlavesFromMasterStream
函数来实现命令的转发的。如果执行命令针对的
dictid
和服务器上缓存的数据库IDredisServer.slaveseldb
不同,那么会追加一条SELECT命令。将命令数据通过
feedReplicationBacklog
以及feedReplicationBacklogWithObject
接口追加到积压缓冲区之中。最后遍历存储Slave实例的双端链表
slaves
,将命令转发给所有没处于SLAVE_STATE_WAIT_BGSAVE_STATR
状态的Slave。对于处于SLAVE_STATE_ONLINE
状态的Slave,命令数据直接会被发送到对端的Slave实例上;对于那些处于SLAVE_STATE_WAIT_BGSAVE_END
或者SLAVE_STATE_SEND_BULK
状态的实例,命令数据将会被写入客户端输出缓冲区里,等待全量数据同步结束后,通过前面所讲的重新注册可写事件处理函数sendReplyToClient
,将缓冲区内的命令数据输出给Slave实例。这也就解释本节开头的那个疑问,同步途中的数据变化,要如何通知给Slave实例。
对于Slave实例在收到Master实例发送来的命令数据后也是会运行正常的命令执行逻辑,但是对于多层结构的主从复制来说,Slave实例将数据转发给Sub-Slave实例的逻辑并不是在replicationFeedSlaves
接口之中实现的,而是在命令执行结束时,通过调用另外一个replicationFeedSlavesFromMasterStream
接口实现的:
void processInputBufferAndReplicate(client *c){if (!(c->flags & CLIENT_MASTER)){processInputBuffer(c);}else{size_t prev_offset == c->reploff;//在processInputBuffer之中会调用call函数接口执行查询命令processInputBuffer(c);size_t applied = c->reploff - prev_offset;if (applied){replicationFeedSlavesFromMasterStream(server.slaves, c->pending_querybuf, applied);sdsrange(c->pending_querybuf,applied,-1);}}}
replicationFeedSlavesFromMasterStream
接口的作用与replicationFeedSlaves
类似,也会将命令数据写入积压缓冲区并转发给下一级的Sub-Slave实例。之所以使用的这种方式,按照Redis给出的解释是,这么做是方便传递“相同”数据流,并允许Slave实例共享Master实例的复制历史记录,相同的积压缓冲区以及偏移量。
接收命令转发
void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen);
对于Slave一端来说,Master实例实际上是一个具有特殊标记的客户端对象。因此,Master在数据同步之后转发来的命令,在Slave一次可以按照正常处理客户端命令的逻辑来处理,这里对于处理细节不在详细介绍,可以回看前面关于Redis客户端以及Redis命令的相关内容。不过这里还有一点特殊的情况,就是对于层次结构的主从复制结构,Slave对象在接收到Master转发来的命令之后,还需要将其发送给自己的Slave,也就是Sub-Slave,而这个工作则是通过上面replicationFeedSlavesFromMasterStream
这个接口完成的。
同步复制
确认机制
void replicationSendAck(void);
在Slave一端会以1秒1次的频率调用replicationSendAck
函数来向Master通报它当前已经处理的命令偏移量。这个函数会在内部构建一个如下格式的REPLCONF的命令:
REPLCONF ACK <offset>
而在Master端,则会触发REPLCONF命令的处理函数replconfCommand
,该函数解析出offset
数据,将其更新到该Slave实例对应的client.repl_ack_off
字段上;同时并更新时间戳数据client.repl_ack_time
信息,这个字段将用作衡量健康的Slave的标准。
而Master强制要求Slave通报确认信息则是通过向Slave发送下面格式的REPLCONF命令来实现的:
REPLCONF GETACK *
当Slave实例收到这个命令之后,会强制调用replicationSendAck
向Master发送确认信息。
最后Master一侧也会定期调用refreshGoodSlavesCount
来刷新健康的Slave的数量:
void refreshGoodSlavesCount(void);
在Redis服务器的全局变量之中redisServer.repl_min_slaves_max_lag
字段记录确认消息的间隔阈值,refreshGoodSlavesCount
会遍历所有处于SLAVE_STATE_ONLINE
状态的Slave,如果其上次确认消息的时间client.repl_ack_time
和当前时间的差值在redisServer.repl_min_slaves_max_lag
阈值之内,则刷新健康的Slave的数量,最终这个数量会被记录到redisServer.repl_good_slaves_count
字段之上。
WAIT命令
这里需要注意的是WAIT命令只能在Master上来执行。前面在我们介绍客户端对象的阻塞操作时,介绍了blockingState
结构体,用于记录当前client
对象的阻塞操作状态;而WAIT命令本质上也是客户端对象上的阻塞操作,blockingState
也同样包含了这部分的信息:
struct blockingState {mstime_t timeout;...int numreplicas;long long reploffset;...};
这里:
blockingState.numreplicas
,这字段表示执行WAIT命令的客户端等待的Slave的数量。blockingState.reploffset
,这个字段则是表示该客户端等待的Slave实例至少要确认reploffset
这么多偏移量的命令数据。
在前面介绍阻塞操作时,我们讲解过,redisDb.blocking_keys
用于记录阻塞在给定Key上的客户端对象client
的列表。再主从复制之中,Redis在服务器全局变量之中维护了一个双端链表来记录当前通过WAIT命令进入阻塞状态的客户端对象client
:
struct redisServer{...list *clients_waiting_acks;int get_ack_from_slaves;...};
当Master实例收到用户的WAIT命令时会调用waitCommand
函数来处理这个命令:
void waitCommand(client *c);
该函数会执行如下的处理逻辑:
解析命令参数,包括等待时间、等待Slave实例的个数,同时选择该用户对应客户端对象
client
的client.woff
作为其等待的复制偏移量的值。如果该客户端关注的复制偏移量已经被满足条件的Slave所接收,那么该函数将不会阻塞客户端,而是直接返回。
使用上面的数据初始化该客户端对象的
blockingState
信息,将该客户端加入服务器的等待链表redisServer.clients_waiting_acks
之中。调用
blockClient
接口将该客户端对象设置为阻塞状态。调用
replicationRequestAckFromSlaves
,将redisServer.get_ack_from_slaves
字段设置为1,这样在beforeSleep
调用之中,Redis会检查该字段,并向所有的Slave发送GETACK命令,要求它们强制通告自己已经处理的复制偏移量。
当有足够对的Slave实例向Master通报了自己的复制偏移量时,在beforeSleep
接口之中会调用processClientsWaitingReplicas
函数,将符合解锁条件的客户端对象从WAIT命令的阻塞状态之中解除。
复制心跳机制
最后我们在来看一下主从复制之中的心跳机制:
void replicationCron(void);
replicationCron
这个函数接口会在Redis的事件循环心跳函数serverCron
之中以1秒1次的频率被调用执行。
Slave实例通过这个函数接口可以完成:
在REPLICAOF命令执行结束之后,在下一次心跳之中通过
syncWithMaster
接口,发起与Master建立主从复制连接的过程。通过
replicationSendAck
定期通报自己已经处理的复制偏移量的值。
Master实例则可以通过这个函数接口完成:
在长时间没有Slave连接的情况下,通过调用
freeReplicationBacklog
释放积压缓冲区,清理内存。在心跳之中,为Slave数据同步开启BGSAVE过程。
调用
refreshGoodSlavesCount
来刷新健康的Slave实例的个数。
以上便是对于Redis主从复制机制的一个简要的介绍。




