暂无图片
暂无图片
暂无图片
暂无图片
1
暂无图片

MapReduce|MR程序如何提交到Yarn集群

大数据记事本 2021-07-23
1203
一、概述
    当我们编写了一个 MapReduce 应用程序,通过如下命令提交执行时,该任务是如何提交到 Yarn 集群的呢?
    hadoop jar example.jar WordCount /file/input  /file/output
    复制
        这里 example.jar 是提交的 jar 包名称,WordCount 为 MainClass 名称,后面两个参数分别为输入路径和输出路径。
    二、流程分析
        当 hadoop 脚本后面输入 "jar" 参数时,对应执行的类为 
    org.apache.hadoop.util.RunJar:
      elif [ "$COMMAND" = "jar" ] ; then
      CLASS=org.apache.hadoop.util.RunJar
      复制
      在 RunJar 的 main 方法中,构建了一个 RunJar 实例,并调用其 run() 方法
        public static void main(String[] args) throws Throwable {
        //构建一个RunJar对象并执行run方法
        new RunJar().run(args);
        }
        复制
        run() 方法逻辑如下:
        • 获取 jar 文件,即 example.jar
        • 拿到 jar 文件中的 MainClass 名称,即 WordCount.class
        • 解压 jar 文件
        • 通过反射拿到 MainClass 实例对象
        • 拿到 main 方法的实例
        • 通过反射调用 main 方法,也就是我们编写的应用程序的 mian 方法
        • 在应用程序的 main 方法中,通过 Job.waitForCompletion() 等待应用程序执行完毕
          public void run(String[] args) throws Throwable {
          String usage = "RunJar jarFile [mainClass] args...";
          ...
          //获取到Jar文件,即提交的example.jar文件
          JarFile jarFile;
          try {
          jarFile = new JarFile(fileName);
          } catch(IOException io) {
          throw new IOException("Error opening job jar: " + fileName)
          .initCause(io);
          }


          //拿到Jar文件的Main Class,如WordCount.class
          Manifest manifest = jarFile.getManifest();
          if (manifest != null) {
          mainClassName = manifest.getMainAttributes().getValue("Main-Class");
          }
          jarFile.close();
          ...
          mainClassName = mainClassName.replaceAll("/", ".");


          File tmpDir = new File(System.getProperty("java.io.tmpdir"));
          ensureDirectory(tmpDir);
          //声明一个临时文件
          final File workDir;
          try {
          workDir = File.createTempFile("hadoop-unjar", "", tmpDir);
          } catch (IOException ioe) {
          ...
          }
          ...
          //解压Jar文件
          unJar(file, workDir);


          ClassLoader loader = createClassLoader(file, workDir);
          Thread.currentThread().setContextClassLoader(loader);


          //通过反射拿到MainClass实例
          Class<?> mainClass = Class.forName(mainClassName, true, loader);


          //拿到main方法的实例
          Method main = mainClass.getMethod("main", new Class[] {
          Array.newInstance(String.class, 0).getClass()
          });
          String[] newArgs = Arrays.asList(args)
          .subList(firstArg, args.length).toArray(new String[0]);
          try {
          //通过反射调用main方法
          main.invoke(null, new Object[] { newArgs });
          } catch (InvocationTargetException e) {
          throw e.getTargetException();
          }
          /**
          * TODO 到此为止,跳转到Driver的main方法,也就是自己编写的应用程序的main方法
          * 最后调用Job.waitForCompletion等待应用程序执行完毕
          */
          }
          复制
          Job.waitForCompletion() 方法:
              该方法最主要的作用就是通过 submit() 将任务提交到集群并等待完成
            public boolean waitForCompletion(boolean verbose
            ) throws IOException, InterruptedException,
            ClassNotFoundException {
            if (state == JobState.DEFINE) {
            //TODO 提交任务
            submit();
            }
            //打印执行的进度信息
            ...
            return isSuccessful();
            }
            复制
            对于 Job.submit() 方法,主要做了 5 件事:
            • 设置当前Job的状态为DEFINE
            • 设置启用NewAPI,2.x之后属于NewAPI
            • 连接Yarn集群,获取到ResourceManager的代理对象
            • 获取一个作业提交器,将作业提交到 Yarn 集群
            • 提交之后,设置MR的状态为RUNNING
              public void submit() 
              throws IOException, InterruptedException, ClassNotFoundException {
              //1.设置当前Job的状态为DEFINE
              ensureState(JobState.DEFINE);
              //2.设置启用NewAPI,2.x之后属于NewAPI
              setUseNewAPI();
              //3.获取提交客户端,连接Yarn集群,获取到ResourceManager的代理对象
              connect();
              //4.获取一个提交器
              //这里的cluster就是上面初始化的Cluster对象
              //cluster.getFileSystem() ==> HDFS
              //cluster.getClient() ==> YARNRunner
              final JobSubmitter submitter =
              getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
              status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
              public JobStatus run() throws IOException, InterruptedException,
              ClassNotFoundException {
              //调用提交器的submitJobInternal方法执行Job的提交
              return submitter.submitJobInternal(Job.this, cluster);
              }
              });
              //5.提交之后,设置MR的状态为RUNNING
              state = JobState.RUNNING;
              LOG.info("The url to track the job: " + getTrackingURL());
              }
              复制
                  其中,最重要的是第3和第4步,下面进行详细分析:

              第3步:连接Yarn集群,获取到ResourceManager的代理对象

                  ResourceManager 的代理对象为 ApplicationClientProtocol 实例,其和 Job 类的关系如下:
              1. Job 的内部有一个 Cluster 类型的 cluster 成员变量 

              2. Cluster 的内部有一个 YARNRunner 类型的(ClientProtocol实现类) client 的成员变量 

              3. YARNRunner 的内部有一个 ResourceMgrDelegate 类型的 resMgrDelegate 成员变量 

              4. ResourceMgrDelegate 的内部有一个 YarnClientImpl 类型的(YarnClient子类) client 成员变量

              5. YarnClientImpl 的内部有一个 ApplicationClientProtocol 类型的 rmClient 成员变量

                  a .在 connect() 方法中初始化了 Cluster 对象

                private synchronized void connect()
                throws IOException, InterruptedException, ClassNotFoundException {
                if (cluster == null) {
                cluster =
                ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                public Cluster run()
                throws IOException, InterruptedException,
                ClassNotFoundException {
                //初始化了Cluster对象
                return new Cluster(getConfiguration());
                }
                });
                }
                }
                复制
                    b.在 Cluster 初始化的过程中,调用了其 initialize() 方法,内部初始化了 ClientProtocol 实例,ClientProtocol 本身是一个接口,具体的实现类有:
                • 本地模式:LocalJobRunner

                • Yarn模式:YARNRunner

                由于我们这里提交到 Yarn,所以这里的实现类是 YARNRunner
                  private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
                  throws IOException {


                  synchronized (frameworkLoader) {
                  for (ClientProtocolProvider provider : frameworkLoader) {
                  LOG.debug("Trying ClientProtocolProvider : "
                  + provider.getClass().getName());
                  ClientProtocol clientProtocol = null;
                  try {
                  if (jobTrackAddr == null) {
                  //local模式
                  //provider=LocalClientProtocolProvider
                  //clientProtocol=LocalJobRunner
                  clientProtocol = provider.create(conf);
                  } else {
                  //Yarn模式
                  //provider=YarnClientProtocolProvider
                  //clientProtocal=YARNRunner
                  clientProtocol = provider.create(jobTrackAddr, conf);
                  }
                  ...
                  }
                  复制
                      c.在 YARNRunner 的构造方法中,又创建了 ResourceMgrDelegate 实例对象
                    public YARNRunner(Configuration conf) {


                    this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf)));
                    }
                    复制
                        d.同样,在 ResourceMgrDelegate 的构造方法中,创建了 YarnClient 实例,具体的实现类是 YarnClientImpl
                      public ResourceMgrDelegate(YarnConfiguration conf) {
                      super(ResourceMgrDelegate.class.getName());
                      this.conf = conf;
                      //创建一个YarnClient,实现类是YarnClientImpl
                      this.client = YarnClient.createYarnClient();
                      init(conf);
                      start();
                      }
                      复制
                          e.在 YarnClientImpl 的属性中,有一个 ApplicationClientProtocol 类型的变量 rmClient,该对象就是 ResourceManager 的代理对象,通过该代理对象实现对 ResourceManager 的 RPC 调用
                        //ResourceManager的代理对象
                        protected ApplicationClientProtocol rmClient;
                        复制
                            综上,步骤3的作用主要是初始化了和 ResourceManager 相关的一系列实例对象

                        第4步:获取一个作业提交器,将作业提交到 Yarn 集群

                            这里的作业提交器为 JobSubmitter 实例对象,通过调用其 submitJobInternal() 方法进行作业的提交,该方法代码较多,分三部分进行分析:
                        • 生成或验证各种路径,将作业所需的资源进行上传
                        • 对输入数据源进行逻辑切片,并设置Map Task的数量
                        • 进行作业的提交
                        第一部分:生成或验证各种路径,将作业所需的资源进行上传
                          //验证作业的输出路径是否存在,如果存在会报错,
                          //正常情况是我们进行了配置,但是该路径在作业提交时是不存在的
                          checkSpecs(job);


                          //添加应用框架的路径到分布式缓存中
                          //DistributedCache:把应用的一些资源添加到分布式缓存中,那么程序在执行的时候,
                          //无论有多少个节点启动任务,这些节点都会自动把分布式缓存中的各种信息
                          //(小数据文件,配置信息...)同步到节点本地
                          addMRFrameworkToDistributedCache(conf);


                          //通过静态方法getStagingDir()获取作业执行时相关资源的存放路径
                          //默认是/tmp/hadoop-yarn/staging/提交作业用户名{$user}/.staging
                          Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);


                          //TODO 记录作业提交的主机IP,主机名,并设置配置信息
                          if (ip != null) {
                          submitHostAddress = ip.getHostAddress();
                          submitHostName = ip.getHostName();
                          conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
                          conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
                          }


                          //生成JobID
                          JobID jobId = submitClient.getNewJobID();
                          job.setJobID(jobId);


                          //构造提交作业的路径,jobStagingArea 后面拼接 jobID
                          //当提交一个 Job时,YARN的客户端会把该Job的一切要用的资源初始化并且存储在HDFS
                          //的工作目录中,以后哪个节点要执行 Task,就从HDFS 目录中拉取资源文件
                          //主要包括以下三类资源:
                          //1.xxx.jar
                          //2.job.xml
                          //3.shell启动命令
                          Path submitJobDir = new Path(jobStagingArea, jobId.toString());


                          try {
                          //将jar文件和配置文件上传到上面获取的资源的提交目录 submitJobDir
                          copyAndConfigureFiles(job, submitJobDir);


                          //上传完成后获取完整的配置文件所在路径,也就是job.xml的路径
                          Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);


                          ...
                          复制

                          第二部分:对输入数据源进行逻辑切片,并设置Map Task的数量

                                ...
                            //获取逻辑切片的数量
                            int maps = writeSplits(job, submitJobDir);
                            //配置需要启动的 Map Task 的个数
                            conf.setInt(MRJobConfig.NUM_MAPS, maps);
                            ...
                            复制
                            其中,获取切片数量通过 writeSplits() 方法,其内部调用了 writeNewSplits() 方法
                              private <T extends InputSplit>
                              int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
                              InterruptedException, ClassNotFoundException {
                              Configuration conf = job.getConfiguration();
                              //TODO 根据配置的InputFormat类,通过反射创建对应的实例对象
                              // 默认为TextInputFormat,是FileInputFormat的子类
                              InputFormat<?, ?> input =
                              ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
                              //TODO 核心方法,InputSplit就是切片对象,getSplits() 方法返回切片对象的集合
                              // 这里的getSplits()在FileInputFormat中实现,TextInputFormat并未进行重写
                              List<InputSplit> splits = input.getSplits(job);
                              //将集合转为数组
                              T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);


                              //对数组中的元素根据切片大小进行排序
                              Arrays.sort(array, new SplitComparator());
                              //TODO 将逻辑切片信息写入Job提交的HDFS目录中
                              JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
                              jobSubmitDir.getFileSystem(conf), array);
                              //返回切片数量
                              return array.length;
                              }
                              复制
                                  这里切片的逻辑调用的是 InputFormat.getSplits() 方法,InputFormat 是一个抽象类,getSplits() 方法的具体逻辑在抽象子类 FileInputFormat 中实现,虽然这里的 input 变量对应的实例对象是 TextInputFormat 类型,但该类并未重写 getSplits() 方法。当获取到切片集合后,转为数组并按切片大小进行排序,并将切片信息写入 Job 提交的 HDFS 目录中。getSplits() 方法逻辑如下:
                                public List<InputSplit> getSplits(JobContext job) throws IOException {
                                ...
                                //初始化切片对象的集合容器
                                List<InputSplit> splits = new ArrayList<InputSplit>();
                                List<FileStatus> files = listStatus(job);
                                //遍历输入文件
                                for (FileStatus file: files) {
                                Path path = file.getPath();
                                //拿到文件的大小
                                long length = file.getLen();
                                if (length != 0) {
                                BlockLocation[] blkLocations;
                                if (file instanceof LocatedFileStatus) {
                                blkLocations = ((LocatedFileStatus) file).getBlockLocations();
                                } else {
                                FileSystem fs = path.getFileSystem(job.getConfiguration());
                                blkLocations = fs.getFileBlockLocations(file, 0, length);
                                }
                                //TODO 判断文件是否可以进行切分
                                // 如果可以进行切分
                                if (isSplitable(job, path)) {
                                //获取配置的块大小
                                long blockSize = file.getBlockSize();
                                //计算逻辑切片的大小
                                long splitSize = computeSplitSize(blockSize, minSize, maxSize);


                                //bytesRemaining => 文件剩余大小
                                long bytesRemaining = length;
                                //TODO 判断文件的剩余大小是否大于切片大小的 1.1 倍
                                while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
                                int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                                //进行切片
                                splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                                blkLocations[blkIndex].getHosts(),
                                blkLocations[blkIndex].getCachedHosts()));
                                //更新剩余文件大小
                                bytesRemaining -= splitSize;
                                }
                                //TODO 如果剩余大小小于切片大小的 1.1 倍,且剩余大小不为0,那么将剩余的文件作为一个逻辑切片
                                if (bytesRemaining != 0) {
                                int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                                splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                                blkLocations[blkIndex].getHosts(),
                                blkLocations[blkIndex].getCachedHosts()));
                                }
                                //TODO 如果无法切分,将文件整体作为一个逻辑切片
                                } else { // not splitable
                                splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                                blkLocations[0].getCachedHosts()));
                                }
                                //如果文件大小为0,不需要进行切片,添加一个空的逻辑切片
                                } else {
                                //Create empty hosts array for zero length files
                                splits.add(makeSplit(path, 0, length, new String[0]));
                                }
                                }
                                ...
                                }
                                return splits;
                                }
                                复制
                                这里进行逻辑切片的逻辑如下:
                                • 遍历输入的所有文件,获取文件大小
                                • 如果文件大小为 0 ,则不需要进行切片,直接添加一个空的逻辑切片
                                • 如果文件大小不为 0,判断该文件是否可以进行切分,如果不可以切分则将整体文件作为一个逻辑切片
                                • 如果文件可以进行切分,首先计算逻辑切片的大小,然后对文件进行切片,并更新剩余文件大小
                                • 如果剩余文件大小大于逻辑切片大小的 1.1 倍,则一直进行切分,直到该条件不满足位置
                                • 如果剩余文件不满足 1.1 倍逻辑切片大小,且剩余文件大小不为0,则将剩余文件作为一个逻辑切片
                                • 返回逻辑切片的集合,这里的逻辑切片用 InputSplit 对象表示
                                    当逻辑切片完成后,会将解析该 Job 生成的配置文件,写入前面获取的 job.xml 文件中
                                  //将解析该job生成的配置文件,写入到HDFS上
                                  writeConf(conf, submitJobFile);
                                  复制

                                  第三部分:进行作业的提交

                                    //这里的 submitClient 为 YARNRunner 实例
                                    status = submitClient.submitJob(
                                    jobId, submitJobDir.toString(), job.getCredentials());
                                    复制
                                        这里作业提交调用了 YARNRunner.submitJob() 方法,内部依次调用了 :
                                        ResourceMgrDelegate.submitApplication() -> 
                                        YarnClientImpl.submitApplication() -> 
                                        ApplicationClientProtocol.submitApplication() 。
                                      public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException, InterruptedException {
                                      ...
                                      try {
                                      //TODO 提交应用并获取 ApplicationId
                                      ApplicationId applicationId =
                                      resMgrDelegate.submitApplication(appContext);
                                      ...
                                      }
                                      ...
                                      }
                                      复制
                                          上面提到,ApplicationClientProtocol 就是 ResourceManager 的代理对象,调用其 submitApplication() 方法,实际方法的执行是通过 ResourceManager 内部的成员变量 clientRM 执行的,即执行 ClientRMService.submitApplication() 方法。
                                          至此,Job 已经提交到 Yarn 集群,后面任务的调度和执行则由 Yarn 负责,具体流程后续进行分析。
                                      文章转载自大数据记事本,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                      评论

                                      西
                                      西北喜羊羊
                                      暂无图片
                                      2年前
                                      评论
                                      暂无图片 0
                                      大佬你只在这里发布吗?
                                      2年前
                                      暂无图片 点赞
                                      评论