前言
1、节点角色
ZK本身的节点主要分为三类:
Leader:主要是负责数据的写入,如果超过半数同意,那么就广播进行写入;
Follower:主要负责查询请求并将写入请求发送给leader,参与选举和写入投票;
Observe:也是负责查询请求并将写入请求发送给leader,不参加投票,只被动接收结果
2、选举过程
2.1 胜出的条件
获取半数投票以上的节点成为leader节点。
2.2 比较的规则
万事万物都有一个准则,好的比较坏的,坏的比较更坏的,世上本没有痛苦,痛苦都是自己寻找的结果,海燕你可长点心吧,哎呀跑偏了。
(1)任期
(2)事务ID(ZK中的事务ID)
(3)节点编号(集群中每个节点的编号)
就这么简单?是的。规则就是这么简单,但是源码还是有那么一丢丢的绕。
3、代码逻辑综述
源码看着相对比较枯燥,但是作为一个手艺人,怎么能不去了解怎么做的呢,我们先来梳理一下代码的流程,方便更好的看第四部分内容。
节点先投自己一票,并进行广播
收到消息后
如果消息为空,就进行重新发送消息或者建立连接
如果消息不为空,且消息接收者和投票的leader都是合法节点就进行下 边步骤。
如果节点为looking节点
4、源码分析
选主的逻辑是在lookForLeader开始的,像金字塔的第一块砖一样,我们先看ZK选主的第一块砖lookForLeader,第一次看源码的时候一定要把握主线,忽略从线,等主线完全理清楚了之后才去处理从线,要不会陷入迷宫之中。
下边就是主要的投票代码,看里边的注释:
public Vote lookForLeader() throws InterruptedException {
//省略代码
synchronized (this) {
//1、投票轮数加一
logicalclock.incrementAndGet();
//2、更新提议或者叫投票,三个参数:当前节点的id、ZK的最大事务id、任期
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info("New election. My id = {}, proposed zxid=0x{}",self.getId(),Long.toHexString(proposedZxid));
//发送通知
sendNotifications();
//省略后边代码
}
复制
更新投票或者投票的方法为:
synchronized void updateProposal(long leader, long zxid, long epoch) {
proposedLeader = leader;
proposedZxid = zxid;
proposedEpoch = epoch;
}
复制
发送通知的方法为:
private void sendNotifications() {
for (long sid : self.getCurrentAndNextConfigVoters()) {
QuorumVerifier qv = self.getQuorumVerifier();
//初始化待发送通知或者投票的报文、对象
ToSend notmsg = new ToSend(
ToSend.mType.notification,
proposedLeader, //投的leader
proposedZxid, //事务ID
logicalclock.get(), //投票或者选举的轮数
QuorumPeer.ServerState.LOOKING,
sid,
proposedEpoch, //任期
qv.toString().getBytes(UTF_8));
sendqueue.offer(notmsg);
}
}
复制
待到山花烂漫时,她在丛中笑,消息都已经发完了,肯定就到了接收到选票的时候应该怎么操作了,接收选票的代码也是在lookForLeader中:
//在服务没有停止并且当前节点的状态是LOOKING时进行while循环
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
//200毫秒内有数据就立马取出
Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
//当取出的数据为空的时候
if (n == null) {
//判断是否所有的投票都已经发送,如果发送完毕就重新发送
if (manager.haveDelivered()) {
sendNotifications();
//如果没有发送完毕就建立连接
} else {
manager.connectAll();
}
notTimeout = Math.min(notTimeout << 1, maxNotificationInterval);
//self.getQuorumVerifier().revalidateVoteset(voteSet, notTimeout != minNotificationInterval)
//上面一行代码默认返回false,所以下边的if判断永远不可能进入,当没有收到选票时就等待200ms后继续循环,即空转一轮。
if (self.getQuorumVerifier() instanceof QuorumOracleMaj
&& self.getQuorumVerifier().revalidateVoteset(voteSet, notTimeout != minNotificationInterval)) {
setPeerState(proposedLeader, voteSet);
Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
leaveInstance(endVote);
return endVote;
}
LOG.info("Notification time out: {} ms", notTimeout);
//当接收到的消息不为空,并且是接收者和待选leader都是有效节点时进入当前逻辑
} else if (validVoter(n.sid) && validVoter(n.leader)) {
//先查看投票节点的状态
switch (n.state) {
case LOOKING:
case OBSERVING:
case FOLLOWING:
case LEADING:
default:
LOG.warn("Notification state unrecognized: {} (n.state), {}(n.sid)", n.state, n.sid);
break;
}
} else {
if (!validVoter(n.leader)) {
LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
}
if (!validVoter(n.sid)) {
LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
}
}
}
复制
case OBSERVING:
LOG.debug("Notification from observer: {}", n.sid);
break;
复制
case FOLLOWING:
Vote resultFN = receivedFollowingNotification(recvset, outofelection, voteSet, n);
if (resultFN == null) {
break;
} else {
return resultFN;
}
case LEADING:
Vote resultLN = receivedLeadingNotification(recvset, outofelection, voteSet, n);
if (resultLN == null) {
break;
} else {
return resultLN;
}
复制
我们先把totalOrderPredicate方法放前边,这个其实就是选举leader的规则的实现。
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
if (self.getQuorumVerifier().getWeight(newId) == 0) {
return false;
}
return ((newEpoch > curEpoch)
|| ((newEpoch == curEpoch)
&& ((newZxid > curZxid)
|| ((newZxid == curZxid)
&& (newId > curId)))));
}
复制
case LOOKING:
//当前节点事务id为-1或者发送节点的事务id为-1时,忽略通知
if (getInitLastLoggedZxid() == -1) {
LOG.debug("Ignoring notification as our zxid is -1");
break;
}
if (n.zxid == -1) {
LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
break;
}
//如果发送节点的轮数大于当前节点的轮数 ,证明当前投票落后了
if (n.electionEpoch > logicalclock.get()) {
//就把当前节点的投票轮数设置成和发送节点轮数一样
logicalclock.set(n.electionEpoch);
//并且清空收到的选票,落后之后选票就无效了
recvset.clear();
/**
*这里是关键:
*如果发送节点的选举的leader,按照规则比较后胜出
*就把当前节点的选票修改成和发送节点一样的,
*反之则把当前节点的选票保留,然后发送通知
*/
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
sendNotifications();
//如果当前节点的选举轮数比发送的大,证明接收到的选票无效
} else if (n.electionEpoch < logicalclock.get()) {
LOG.debug(
"Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",
Long.toHexString(n.electionEpoch),
Long.toHexString(logicalclock.get()));
break;
//如果选举轮数一样,就比较发送节点的选票和当前节点的选票
//按照规则发送节点的选择的leader胜出,就修改当前节点的选票为
//发送节点一样,然后发送通知
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
//该发的通知发送了,该比较的都比较清楚了,
//但是注意上边的代码,只有在选票不一致的时候才再次
//进行投票,如果一样的话就不再进行投票了。
//下边是把投票信息记录下来,为什么?因为要比较HALF,过半就选出
//leader了,别一直忙活呀
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//该方法的作用是选的leader的一样的节点加在一样,为了比较是否过半
voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));
//该方法就是比较选择同样leader的节点是否过半,如果过半就进入
//进行状态设置
if (voteSet.hasAllQuorums()) {
//这块并没有立马设置,而是又等了一轮(200ms),然后在这个200ms
//内又接收到新的投票,并且新投票的选择的leader节点胜出,那么就
//把这张投票放进队列中,进行下次循环,并跳出当前的计票或者选主流程
while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
recvqueue.put(n);
break;
}
}
//如果200ms后没有收到新的投票,那么就进行节点状态的设置,
//proposedLeader与当前节点id一样,当前节点就是leader,否则依据
//是否为PARTICIPANT来判读为fllow或者observe节点
if (n == null) {
setPeerState(proposedLeader, voteSet);
Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
复制
结语
原创不易,肝了三夜才写完,如果文章对你的有帮助,欢迎点赞评论转发,最后如果你对技术有热爱,你对编程有热情,欢迎加入程序猿大家庭,咱们一起成长,一起进步。
如果你喜欢本文
请长按二维码,关注程序猿每日分享

转发至朋友圈,是对我最大的支持
点个在看
小时候有种感觉叫喜欢
长大后有种支持叫再看
💕💕💕💕💕💕