Spark作业通过资源调度系统获取了计算资源,然后即开始调度计算任务来执行实际的数据处理(比如ETL、机器学习、图计算),本文继续来解析Spark任务调度的相关处理过程和原理。
Action触发任务调度
Spark是惰性计算模式,所有的transformation算子的实际执行都是通过action算子来触发的;action算子是划分job的分界,因此本文的任务调度以job为单位来解析Spark作业任务调度的实现。
RDD的action算子(例如:collect、count、take、foreachPartition等)都会调用SparkContext的runJob方法来提交一个job,以collect算子为例,该算子的实现如下:
def collect(): Array[T] = withScope { val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) Array.concat(results: _*)}
在runJob方法中实际是调用了DAGScheduler的submitJob方法,然后通过eventProcessLoop对象将作业提交的事件放到了事件队列eventQueue中:
eventProcessLoop.post(JobSubmitted( jobId, rdd, func2, partitions.toArray, callSite, waiter, Utils.cloneProperties(properties)))
eventProcessLoop是在DAGScheduler初始化时创建的事件调度对象,该对象启动了一个线程不断从eventQueue中获取事件来处理:
override def run(): Unit = { try { while (!stopped.get) { val event = eventQueue.take() try { onReceive(event) } ...
拿到JobSubmitted的事件后由EventLoop的子类DAGSchedulerEventProcessLoop调用handleJobSubmitted方法来进行处理,该方法的处理逻辑主要分为两步,第一步是划分stage构建DAG有向无环图,第二步则是发起任务调度,本文的主要内容也将围绕这两点来展开:
private[scheduler] def handleJobSubmitted(jobId: Int, finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], callSite: CallSite, listener: JobListener, properties: Properties): Unit = { var finalStage: ResultStage = null try { // New stage creation may throw an exception if, for example, jobs are run on a // HadoopRDD whose underlying HDFS files have been deleted. finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) } ... submitStage(finalStage) }
划分Stage
DAG是一个有向无环图,Spark根据各RDD的数据来源、算子以及分区器等要素构建出不同类型的RDD对象(例如HadoopRDD、ShuffledRDD)、子RDD与父RDD之间的依赖关系集(OneToOneDependency、ShuffleDependency、RangeDependency)以及RDD分片的数据本地性属性等,进而构建出整个作业的DAG。
由于Spark的stage是以shuffle也即宽依赖为边界进行划分的,有了DAG接下来就可以根据RDD之间的依赖类型来划分stage了。
首先从result rdd开始向前回溯不断获取当前rdd的所有宽依赖列表,如果遍历到的依赖是宽依赖则放入parents HashSet中,如果遍历到的是窄依赖则继续遍历该窄依赖的rdd的所有dependency,直到找到下一个宽依赖并放到parents 列表中或者遍历了其所有父rdd无法找父rdd为止 ,得到rdd所有的shuffle依赖:
private[scheduler] def getShuffleDependenciesAndResourceProfiles( rdd: RDD[_]): (HashSet[ShuffleDependency[_, _, _]], HashSet[ResourceProfile]) = { val parents = new HashSet[ShuffleDependency[_, _, _]] val resourceProfiles = new HashSet[ResourceProfile] val visited = new HashSet[RDD[_]] val waitingForVisit = new ListBuffer[RDD[_]] waitingForVisit += rdd while (waitingForVisit.nonEmpty) { val toVisit = waitingForVisit.remove(0) if (!visited(toVisit)) { visited += toVisit Option(toVisit.getResourceProfile).foreach(resourceProfiles += _) toVisit.dependencies.foreach { case shuffleDep: ShuffleDependency[_, _, _] => parents += shuffleDep case dependency => waitingForVisit.prepend(dependency.rdd) } } } (parents, resourceProfiles) }
然后根据该rdd的shuffleDeps列表中的每个shuffle依赖创建出相应的父stage:
private def getOrCreateParentStages(shuffleDeps: HashSet[ShuffleDependency[_, _, _]], firstJobId: Int): List[Stage] = { shuffleDeps.map { shuffleDep => getOrCreateShuffleMapStage(shuffleDep, firstJobId) }.toList}
其中getOrCreateShuffleMapStage方法采用递归的逻辑,如果一个shuffle依赖所属的rdd有未遍历到的shuffle依赖,则创建该依赖所属的stage;否则,继续遍历该rdd的所有shuffle依赖,并创建各个shuffle依赖所对应的父stage:
getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep => if (!shuffleIdToMapStage.contains(dep.shuffleId)) { createShuffleMapStage(dep, firstJobId) }}createShuffleMapStage(shuffleDep, firstJobId)
在createShuffleMapStage方法中可以看到stage的id是一个顺序递增的值id = nextStageId.getAndIncrement(),由于是递归创建stage因此越往前遍历stage的id越小,新创建的stage对象中的属性主要包括该stage的id、最后边的那个rdd及其分区数(决定了map任务数)、该stage所有父stage的集合、shuffle depenency等:
val parents = getOrCreateParentStages(shuffleDeps, jobId)val id = nextStageId.getAndIncrement()val stage = new ShuffleMapStage( id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker, resourceProfile.id)stageIdToStage(id) = stageshuffleIdToMapStage(shuffleDep.shuffleId) = stage
当创建出了所有的ShuffleMapStage之后则会创建ResultStage:
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite, resourceProfile.id)stageIdToStage(id) = stage
到目前为止完成了handleJobSubmitted方法中的finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) 的逻辑,该job的DAG也完成了构建,接下来通过调用submitStage(finalStage)方法开始作业所有stage及其任务的调度过程。
启动stage调度
submitStage方法也是一个递归,由于job中的stage已经划分完成并且从前到后进行了编码,在本方法的逻辑中如果当前stage的父stage列表不为空,则调用其所有父stage的submitStage方法;如果当前的stage的父stage为空则调用submitMissingTasks方法启动该stage中任务调度逻辑:
private def submitStage(stage: Stage): Unit = { ... if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) { val missing = getMissingParentStages(stage).sortBy(_.id) logDebug("missing: " + missing) if (missing.isEmpty) { logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents") submitMissingTasks(stage, jobId.get) } else { for (parent <- missing) { submitStage(parent) } ...
启动了stage调度并没有真正地将任务分发到executor上去运行,接下来还需要创建任务集并根据特定的调度策略制定调度计划之后再执行任务的调度以及分发执行。
Stage之间的调度策略根据用户的配置来确定,包括FIFO和FAIR两种(默认是FIFO),并且在TaskScheduler初始化时就根据作业的配置创建了不同stage调度器和调度队列:
def initialize(backend: SchedulerBackend): Unit = { this.backend = backend schedulableBuilder = { schedulingMode match { case SchedulingMode.FIFO => new FIFOSchedulableBuilder(rootPool) case SchedulingMode.FAIR => new FairSchedulableBuilder(rootPool, conf) case _ => throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " + s"$schedulingMode") } } schedulableBuilder.buildPools()}
创建TaskSets
介绍了Stage的调度策略,接下来我们来看一下任务集创建的流程:
1.获取该stage中各个数据分片的数据本地性,从下面的代码中可见生成的结果集taskIdToLocations是一个key为分片id,value为本地性列表的HashMap:
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try { stage match { case s: ShuffleMapStage => partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap case s: ResultStage => partitionsToCompute.map { id => val p = s.partitions(id) (id, getPreferredLocs(stage.rdd, p)) }.toMap }}
其中分片的本地性的计算方法主要通过getPreferredLocsInternal方法实现,如果RDD已被缓存则返回所有分片被缓存的TaskLocation信息并返回,如果RDD的分片本身具有本地性属性则返回其本地性的TaskLocation信息并返回,如果不满足以上两种情况则不断获取该RDD窄依赖对应的父RDD调用getPreferredLocsInternal方法继续计算,如果最后没有获得任何本地性信息则该RDD不具有本定性:
private def getPreferredLocsInternal( rdd: RDD[_], partition: Int, visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = { ... val cached = getCacheLocs(rdd)(partition) if (cached.nonEmpty) { return cached } val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList if (rddPrefs.nonEmpty) { return rddPrefs.map(TaskLocation(_)) } rdd.dependencies.foreach { case n: NarrowDependency[_] => for (inPart <- n.getParents(partition)) { val locs = getPreferredLocsInternal(n.rdd, inPart, visited) if (locs != Nil) { return locs } } case _ => } Nil }
2.将待处理的task封装为一个TaskSet对象,该对象中携带了stageId、任务的代码闭包(包括rdd、stage的shuffle依赖关系或者action算子信息)、数据分片、分片的数据本地性信息等:
val tasks: Seq[Task[_]] = try { val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array() stage match { case stage: ShuffleMapStage => ... new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber, taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier()) } case stage: ResultStage => ... new ResultTask(stage.id, stage.latestInfo.attemptNumber, taskBinary, part, locs, id, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier()) } }
任务的调度和执行
创建了TaskSet之后,TaskScheduler调用submitTasks方法来提交任务集,并在submitTasks方法中创建了TaskSetManager对象,在TaskSetManager对象的实例化过程中通过调用addPendingTasks方法将任务放到待调度的任务列表中,包括forExecutor、forHost、noPrefs、forRack和all多种列表,并且每个task根据本地性需求可以被放入到多个列表中,分别对应于PROCESS_LOCAL(executor级别), NODE_LOCAL(节点级别), NO_PREF(无本地化需求), RACK_LOCAL(机架级别), ANY(随机)这几个本地化级别。
接下将已构建好的TaskSetManager对象作为一个待调度的stage放到实现了特定调度策略的调度池中等待调度,这其实是任务调度的需求:
override def submitTasks(taskSet: TaskSet): Unit = { val tasks = taskSet.tasks this.synchronized { val manager = createTaskSetManager(taskSet, maxTaskFailures) val stage = taskSet.stageId val stageTaskSets = taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager]) stageTaskSets(taskSet.stageAttemptId) = manager schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) ... backend.reviveOffers()
我们来看看调度系统是如何来落地任务的调度需求的。在submitTasks方法的最后向SchedulerBackEnd发送ReviveOffers消息,SchedulerBackEnd在收到消息之后即调用CoarseGrainedSchedulerBackend的makeOffers方法来获取当前所有executor的资源可用情况,得到该作业包括executorId、host以及可用的core数为主要信息的WorkOffers对象,体现了该作业所有的executor可用于任务调度的资源情况:
private def makeOffers(): Unit = { // Make sure no executor is killed while some task is launching on it val taskDescs = withLock { // Filter out executors under killing val activeExecutors = executorDataMap.filterKeys(isExecutorActive) val workOffers = activeExecutors.map { case (id, executorData) => new WorkerOffer(id, executorData.executorHost, executorData.freeCores, Some(executorData.executorAddress.hostPort), executorData.resourcesInfo.map { case (rName, rInfo) => (rName, rInfo.availableAddrs.toBuffer) }, executorData.resourceProfileId) }.toIndexedSeq scheduler.resourceOffers(workOffers, true) } if (taskDescs.nonEmpty) { launchTasks(taskDescs) }}
值得一提的是,executorDataMap是一个key为executorId,value为ExecutorData的HashMap结构,用于在SchedulerBackEnd中维护当前spark作业的所有executor的通信地址信息以及executor的可用资源等信息。
接下来TaskScheduler调用resourceOffers方法根据一定的任务调度策略创建任务描述对象并触发任务的执行,resourceOffers方法的实现比较复杂,主要逻辑是逐个从待调度的任务集队列中取出一个taskset,然后根据该任务集的本地性级别从高到底调用resourceOfferSingleTaskSet方法:
val sortedTaskSets = rootPool.getSortedTaskSetQueue...for (taskSet <- sortedTaskSets) { ... for (currentMaxLocality <- taskSet.myLocalityLevels) { var launchedTaskAtCurrentMaxLocality = false do { val (noDelayScheduleReject, minLocality) = resourceOfferSingleTaskSet( taskSet, currentMaxLocality, shuffledOffers, availableCpus, availableResources, tasks) launchedTaskAtCurrentMaxLocality = minLocality.isDefined launchedAnyTask |= launchedTaskAtCurrentMaxLocality noDelaySchedulingRejects &= noDelayScheduleReject globalMinLocality = minTaskLocality(globalMinLocality, minLocality) } while (launchedTaskAtCurrentMaxLocality) ...
在resourceOfferSingleTaskSet方法中遍历当前的workoffers列表,对于每一个workoffer对象调用TaskSetManager的resourceOffer方法创建出相应的TaskDescription对象,并调整其可用的资源情况:
for (i <- 0 until shuffledOffers.size) { val execId = shuffledOffers(i).executorId val host = shuffledOffers(i).host ... val (taskDescOption, didReject, index) = taskSet.resourceOffer(execId, host, maxLocality, taskResAssignments) for (task <- taskDescOption) { val (locality, resources) = if (task != null) { tasks(i) += task addRunningTask(task.taskId, execId, taskSet) (taskSet.taskInfos(task.taskId).taskLocality, task.resources) } ... availableCpus(i) -= taskCpus assert(availableCpus(i) >= 0) resources.foreach { case (rName, rInfo) => availableResources(i)(rName).remove(0, rInfo.addresses.size) } ...}
TaskDescription是任务调度的基本单元,具有executorId、host、任务id、本地化级别等信息;spark根据指定的executorId、host和可用的本地化级别从待调度的任务列表中选择一个本地化级别较高的、并且本地化需求和该executor一致的任务用于任务调度,并将该任务封装成TaskDescription对象,从而使得各个任务能够尽量调度到最近的executor上去执行,避免数据的磁盘IO以及网络分发导致的性能问题:
val taskDescription = dequeueTask(execId, host, allowedLocality) .map { case (index, taskLocality, speculative) => dequeuedTaskIndex = Some(index) ... prepareLaunchingTask( execId, host, index, taskLocality, speculative, taskResourceAssignments, curTime) } }
可见,真正的调度策略是TaskScheduler联合TaskSetManager对象实现的,将任务调度到哪个节点上受限于三个方面,第一是stage调度的优先级算法,第二是各个executor上的可用资源情况,第三是task自身的缓存或者本次性需求的情况;TaskScheduler根据这三点将待调度的任务集中的任务调度到最优的executor上去执行。
我们知道在spark on k8s存算分离的场景中,由于executor pod不运行在Hadoop集群的节点中,如果数据未经过broadcast或者cache到executor内存的情况下,任务将被随机分发到任意资源可用的pod上去运行。
创建了所有task的TaskDescription之后,在makeOffers方法的最后调用了launchTasks方法来启动task线程的执行,实际上是将task序列化之后发送LaunchTask消息给executor的RPC对象CoarseGrainedExecutorBackend:
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
CoarseGrainedExecutorBackend收到launchTask消息之后反序列化TaskDescription,并真正将任务调度到executor的线程池中去运行:
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = { val tr = new TaskRunner(context, taskDescription, plugins) runningTasks.put(taskDescription.taskId, tr) threadPool.execute(tr) if (decommissioned) { log.error(s"Launching a task while in decommissioned state.") }}
在任务执行结束之后executor会调用statusUpdate方法,例如:在任务成功的情况下执行execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult),即向SchedulerBackEnd发送StatusUpdate指令,SchedulerBackEnd收到指令则更新任务所在executor上的可用资源(即增加其可用的core数),从而释放出计算资源用于其他任务的调度:
val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf)executorInfo.freeCores += taskCpusresources.foreach { case (k, v) => executorInfo.resourcesInfo.get(k).foreach { r => r.release(v.addresses) }}
总结
综上,Spark作业在DAGScheduler、SchedulerBackend、TaskScheduler和ExecutorBackend等对象的通力合作之下完成了作业的任务调度,首先构建作业DAG并划分Stage,接下来创建各stage的任务集并且结合stage的调度策略、executor的资源可用情况以及stage内部数据分片的本地性制定出具体的任务调度计划,然后再根据调度计划将任务调度到对应的executor线程池上去执行,从而完成整个作业的处理逻辑。
作者简介
焦媛,主要负责民生银行Hadoop大数据平台的生产运维工作,并负责HDFS和Spark相关开源产品的技术支持,以及Spark云原生技术的支持和推广工作。




