
[Flink] A Tool Person's Guide to Flink: Leader Election

将咖啡转化为程序的工具人 (the tool person who turns coffee into programs), 2021-09-08

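Every tool person is a gear. Some are big, some are small, and none can be spared.

In a healthy environment, the gears lean on each other, share the load, and back each other up, keeping the team turning and moving steadily forward.

In an out-of-control environment, nobody oils the gears. Under constant squeezing, the small gears wear out one after another, until only the big gear is left, struggling on and slowly rusting. When that big gear becomes a single point of failure and breaks, the whole team collapses like an avalanche.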

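Flink is likewise a cluster of many cooperating components. Several of the important ones, such as JobMaster, ResourceManager, and Dispatcher, are protected by high availability. For ZooKeeper-based election and watching, Flink mainly relies on CuratorFramework.

I. Taking JobMaster as an example, let's see how Flink implements this: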

JobMasterServiceLeadershipRunner.class

JobMasterServiceLeadershipRunner implements two interfaces, LeaderContender and JobManagerRunner.

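The LeaderContender interface is used in leader election and represents a participant competing for leadership. Its implementations are JobMasterServiceLeadershipRunner, ResourceManager, DefaultDispatcherRunner, and WebMonitorEndpoint. The interface has two important methods:

      1.  grantLeadership, the callback invoked when the contender wins the election

      2.  revokeLeadership, the callback invoked when the contender goes from leader to non-leader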
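To make the two callbacks concrete, here is a minimal sketch of a contender. The names `ContenderSketch`, `Contender`, and `RecordingContender` are hypothetical stand-ins written for illustration, not Flink's real classes; the sketch only assumes that a grant carries a session ID and that a revoke clears it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Hypothetical, simplified stand-ins; these are not Flink's real classes. */
class ContenderSketch {

    /** Mirrors the two callbacks described above. */
    interface Contender {
        void grantLeadership(UUID leaderSessionId);

        void revokeLeadership();
    }

    /** A contender that only records what happens to it. */
    static class RecordingContender implements Contender {
        final List<String> events = new ArrayList<>();
        private UUID currentSession; // null while this contender is not the leader

        @Override
        public void grantLeadership(UUID leaderSessionId) {
            currentSession = leaderSessionId;
            events.add("granted");
        }

        @Override
        public void revokeLeadership() {
            currentSession = null;
            events.add("revoked");
        }

        boolean isLeader() {
            return currentSession != null;
        }
    }

    public static void main(String[] args) {
        RecordingContender contender = new RecordingContender();
        contender.grantLeadership(UUID.randomUUID());
        System.out.println("leader after grant: " + contender.isLeader());
        contender.revokeLeadership();
        System.out.println("events: " + contender.events);
    }
}
```

A real contender would start or stop its actual work in these callbacks instead of recording strings.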

The JobManagerRunner interface, in turn, covers services tied to the job's lifecycle, such as starting a JobMaster. On start, the runner passes itself to leaderElectionService as a contender:

    @Override
    public void start() throws Exception {
        LOG.debug("Start leadership runner for job {}.", getJobID());
        leaderElectionService.start(this);
    }

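The implementation of leaderElectionService is DefaultLeaderElectionService. It is the leader election service implementation, and it too implements two interfaces.

LeaderElectionService provides the calls for taking part in the election and for querying its outcome.

LeaderElectionEventHandler is responsible for reacting to leadership changes.

When the election starts (DefaultLeaderElectionService::start), a leaderElectionDriver is created through the election driver factory. With this driver factory, Flink decouples the details of the ZooKeeper-based CuratorFramework from Flink itself (this seems to be a change made in recent versions; the version read here is 1.13).

The service also passes itself into the leaderElectionDriver as the LeaderElectionEventHandler: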



    @Override
    public final void start(LeaderContender contender) throws Exception {
        checkNotNull(contender, "Contender must not be null.");
        Preconditions.checkState(leaderContender == null, "Contender was already set.");

        synchronized (lock) {
            leaderContender = contender;
            leaderElectionDriver =
                    leaderElectionDriverFactory.createLeaderElectionDriver(
                            this,
                            new LeaderElectionFatalErrorHandler(),
                            leaderContender.getDescription());
            LOG.info("Starting DefaultLeaderElectionService with {}.", leaderElectionDriver);

            running = true;
        }
    }

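ZooKeeperLeaderElectionDriver is a LeaderLatchListener: isLeader is called when the election is won, and notLeader is called when the leader becomes a non-leader. It is also a NodeCacheListener: it uses a NodeCache to watch the leader node, and nodeChanged is called whenever the watched node changes.

In its constructor, ZooKeeperLeaderElectionDriver starts Curator's LeaderLatch and NodeCache.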

    public ZooKeeperLeaderElectionDriver(
            CuratorFramework client,
            String latchPath,
            String leaderPath,
            LeaderElectionEventHandler leaderElectionEventHandler,
            FatalErrorHandler fatalErrorHandler,
            String leaderContenderDescription)
            throws Exception {
        this.client = checkNotNull(client);
        this.leaderPath = checkNotNull(leaderPath);
        this.leaderElectionEventHandler = checkNotNull(leaderElectionEventHandler);
        this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
        this.leaderContenderDescription = checkNotNull(leaderContenderDescription);

        leaderLatch = new LeaderLatch(client, checkNotNull(latchPath));
        cache = new NodeCache(client, leaderPath);

        client.getUnhandledErrorListenable().addListener(this);

        running = true;

        leaderLatch.addListener(this);
        leaderLatch.start();

        cache.getListenable().addListener(this);
        cache.start();

        client.getConnectionStateListenable().addListener(listener);
    }

After the election is won, the call propagates back layer by layer, first to DefaultLeaderElectionService:

    @Override
    public void isLeader() {
        leaderElectionEventHandler.onGrantLeadership();
    }

DefaultLeaderElectionService reacts to the win by calling back leaderContender.grantLeadership:

    @Override
    @GuardedBy("lock")
    public void onGrantLeadership() {
        synchronized (lock) {
            if (running) {
                issuedLeaderSessionID = UUID.randomUUID();
                clearConfirmedLeaderInformation();

                if (LOG.isDebugEnabled()) {
                    LOG.debug(
                            "Grant leadership to contender {} with session ID {}.",
                            leaderContender.getDescription(),
                            issuedLeaderSessionID);
                }

                leaderContender.grantLeadership(issuedLeaderSessionID);
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(
                            "Ignoring the grant leadership notification since the {} has "
                                    + "already been closed.",
                            leaderElectionDriver);
                }
            }
        }
    }

JobMasterServiceLeadershipRunner reacts to the win by starting the JobMaster.

    @Override
    public void grantLeadership(UUID leaderSessionID) {
        runIfStateRunning(
                () -> startJobMasterServiceProcessAsync(leaderSessionID),
                "starting a new JobMasterServiceProcess");
    }
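The whole chain (driver, then election service, then contender) can be sketched without ZooKeeper. Everything below is a simplified assumption for illustration: `ElectionChainSketch`, `InMemoryDriver`, `simulateElectionWon`, and the other names are hypothetical and are not Flink or Curator APIs. The point is only to show how a driver factory hides the election backend, and how a grant that arrives after the service has stopped is ignored.

```java
import java.util.UUID;

/** Hypothetical, simplified stand-ins for the driver -> service -> contender chain. */
class ElectionChainSketch {

    interface Contender {
        void grantLeadership(UUID sessionId);
    }

    /** What the service exposes to a driver. */
    interface EventHandler {
        void onGrantLeadership();
    }

    /** Hides the election backend (ZooKeeper in the article) from the service. */
    interface Driver {
        void close();
    }

    interface DriverFactory {
        Driver create(EventHandler handler);
    }

    /** In-memory backend for this sketch only; the election result is fired by hand. */
    static class InMemoryDriver implements Driver {
        private final EventHandler handler;

        InMemoryDriver(EventHandler handler) {
            this.handler = handler;
        }

        void simulateElectionWon() {
            handler.onGrantLeadership();
        }

        @Override
        public void close() {}
    }

    static class ElectionService implements EventHandler {
        private final Object lock = new Object();
        private final DriverFactory factory;
        private Contender contender;
        private Driver driver;
        private boolean running;

        ElectionService(DriverFactory factory) {
            this.factory = factory;
        }

        void start(Contender newContender) {
            synchronized (lock) {
                if (contender != null) {
                    throw new IllegalStateException("Contender was already set.");
                }
                contender = newContender;
                driver = factory.create(this); // the service passes itself as the handler
                running = true;
            }
        }

        void stop() {
            synchronized (lock) {
                running = false;
                if (driver != null) {
                    driver.close();
                }
            }
        }

        @Override
        public void onGrantLeadership() {
            synchronized (lock) {
                if (!running) {
                    return; // late notification after stop(): ignore it
                }
                // A fresh session ID is issued for every grant.
                contender.grantLeadership(UUID.randomUUID());
            }
        }
    }

    public static void main(String[] args) {
        InMemoryDriver[] created = new InMemoryDriver[1];
        ElectionService service =
                new ElectionService(h -> created[0] = new InMemoryDriver(h));
        service.start(id -> System.out.println("granted leadership, session " + id));
        created[0].simulateElectionWon();
        service.stop();
    }
}
```

Swapping the backend then only means supplying a different `DriverFactory`; the service and the contender stay unchanged.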

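A brief aside: CuratorFramework offers two election recipes. The first is LeaderSelector: once an instance acquires leadership, it gives leadership up as soon as the listener's takeLeadership(CuratorFramework client) method returns. The second is LeaderLatch, the one Flink uses: the node that acquires leadership keeps it until close is called (or until its ZooKeeper connection is lost), which suits how Flink operates.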
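The difference in leadership lifetime can be illustrated with a toy in-memory model. This is not Curator code: a `Semaphore` with one permit stands in for the ZooKeeper-backed election, and `ElectionStyleSketch`, `runSelectorStyle`, and `LatchStyle` are hypothetical names. Real Curator recipes also queue and wait for their turn instead of failing fast, which this sketch leaves out.

```java
import java.util.concurrent.Semaphore;

/** Toy in-memory model; the Semaphore stands in for the ZooKeeper-backed election. */
class ElectionStyleSketch {

    /** Selector style: leadership lasts only while the callback is running. */
    static boolean runSelectorStyle(Semaphore leaderSlot, Runnable takeLeadership) {
        if (!leaderSlot.tryAcquire()) {
            return false; // someone else is leader right now
        }
        try {
            takeLeadership.run();
        } finally {
            leaderSlot.release(); // returning from the callback gives leadership up
        }
        return true;
    }

    /** Latch style: leadership is kept until close() is called. */
    static final class LatchStyle implements AutoCloseable {
        private final Semaphore leaderSlot;
        private boolean leader;

        LatchStyle(Semaphore leaderSlot) {
            this.leaderSlot = leaderSlot;
        }

        boolean tryBecomeLeader() {
            if (!leader) {
                leader = leaderSlot.tryAcquire();
            }
            return leader;
        }

        boolean hasLeadership() {
            return leader;
        }

        @Override
        public void close() {
            if (leader) {
                leader = false;
                leaderSlot.release();
            }
        }
    }

    public static void main(String[] args) {
        Semaphore slot = new Semaphore(1);
        LatchStyle first = new LatchStyle(slot);
        LatchStyle second = new LatchStyle(slot);
        System.out.println("first leads: " + first.tryBecomeLeader());
        System.out.println("second leads: " + second.tryBecomeLeader());
        first.close(); // only now can another participant take over
        System.out.println("second leads after close: " + second.tryBecomeLeader());
        second.close();
    }
}
```

A long-lived role such as a JobMaster maps naturally onto the latch style, since the leader is expected to stay leader for as long as it is alive.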


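II. A TaskExecutor also needs to watch ZooKeeper for changes so that it can detect a JobMaster failover.

TaskExecutor holds a JobLeaderService in its jobLeaderService field. jobLeaderService keeps track of every JobMaster leader and listens for changes to it; the listening is done by ZooKeeperLeaderRetrievalService. When a slot is requested, TaskExecutor's requestSlot method is called, and that method calls jobLeaderService.addJob, which starts the leader retrieval service.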

              JobLeaderService.class

    public void addJob(final JobID jobId, final String defaultTargetAddress) throws Exception {
        Preconditions.checkState(JobLeaderService.State.STARTED == state, "The service is currently not running.");
        LOG.info("Add job {} for job leader monitoring.", jobId);
        final LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(
                jobId,
                defaultTargetAddress);

        JobLeaderService.JobManagerLeaderListener jobManagerLeaderListener = new JobManagerLeaderListener(jobId);

        final Tuple2<LeaderRetrievalService, JobManagerLeaderListener> oldEntry = jobLeaderServices.put(jobId, Tuple2.of(leaderRetrievalService, jobManagerLeaderListener));

        if (oldEntry != null) {
            oldEntry.f0.stop();
            oldEntry.f1.stop();
        }

        leaderRetrievalService.start(jobManagerLeaderListener);
    }

ZooKeeperLeaderRetrievalService likewise implements Curator's NodeCacheListener and registers itself as a listener on start:

    @Override
    public void start(LeaderRetrievalListener listener) throws Exception {
        Preconditions.checkNotNull(listener, "Listener must not be null.");
        Preconditions.checkState(leaderListener == null, "ZooKeeperLeaderRetrievalService can " +
                "only be started once.");

        LOG.info("Starting ZooKeeperLeaderRetrievalService {}.", retrievalPath);

        synchronized (lock) {
            leaderListener = listener;

            client.getUnhandledErrorListenable().addListener(this);
            cache.getListenable().addListener(this);
            cache.start();

            client.getConnectionStateListenable().addListener(connectionStateListener);

            running = true;
        }
    }

When the JobMaster leader node changes, nodeChanged is called back:



    @Override
    public void nodeChanged() throws Exception {
        synchronized (lock) {
            if (running) {
                try {
                    LOG.debug("Leader node has changed.");
                    .............
                    if (!(Objects.equals(leaderAddress, lastLeaderAddress) &&
                            Objects.equals(leaderSessionID, lastLeaderSessionID))) {
                        lastLeaderAddress = leaderAddress;
                        lastLeaderSessionID = leaderSessionID;
                        // Core logic: notify the listener only when the leader actually changed
                        leaderListener.notifyLeaderAddress(leaderAddress, leaderSessionID);
                    }
                } catch (Exception e) {
                    leaderListener.handleError(new Exception("Could not handle node changed event.", e));
                    throw e;
                }
            } else {
                LOG.debug("Ignoring node change notification since the service has already been stopped.");
            }
        }
    }

This in turn runs JobLeaderService.JobManagerLeaderListener.notifyLeaderAddress, whose main job is to establish an rpcConnection to the new JobMaster.
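The change-detection step in nodeChanged is worth isolating: the listener is notified only when the address or the session ID differs from the last one seen. The sketch below assumes the address and session ID have already been read from the node and are passed in as arguments; `LeaderChangeSketch`, `Retriever`, and `Listener` are hypothetical names, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;

/** Hypothetical, simplified model of the "notify only on real change" check. */
class LeaderChangeSketch {

    interface Listener {
        void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);
    }

    static class Retriever {
        private final Listener listener;
        private String lastLeaderAddress;
        private UUID lastLeaderSessionId;

        Retriever(Listener listener) {
            this.listener = Objects.requireNonNull(listener);
        }

        /** Called on every node event; both values may be null when no leader is known. */
        synchronized void nodeChanged(String leaderAddress, UUID leaderSessionId) {
            boolean unchanged =
                    Objects.equals(leaderAddress, lastLeaderAddress)
                            && Objects.equals(leaderSessionId, lastLeaderSessionId);
            if (unchanged) {
                return; // duplicate event, nothing to tell the listener
            }
            lastLeaderAddress = leaderAddress;
            lastLeaderSessionId = leaderSessionId;
            listener.notifyLeaderAddress(leaderAddress, leaderSessionId);
        }
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        Retriever retriever = new Retriever((address, session) -> seen.add(address));
        UUID session = UUID.randomUUID();
        retriever.nodeChanged("host-a:6123", session);
        retriever.nodeChanged("host-a:6123", session); // duplicate, ignored
        retriever.nodeChanged("host-b:6123", UUID.randomUUID());
        System.out.println("notifications: " + seen);
    }
}
```

Comparing the session ID as well as the address matters: a leader that is re-elected on the same address gets a new session ID, and the listener still needs to reconnect under the new ID.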


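That's all for today. Finally, I hope no tool person follows the example of Huang Gai in the card game 三国杀 (Sanguosha): for the sake of a lord who won't hand over a Peach, chasing an ever-elusive Zhuge Crossbow, flogging himself, playing the self-injury ruse to the death, and bleeding out.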


Reposted from 将咖啡转化为程序的工具人.
