Spark DAG之处理SubmittedJob并划分Stage


文章目录

  • 概要
  • 1. Stage介绍
      • 1.1 父类Stage定义
      • 1.2 子类ShuffleMapStage、ResultStage
  • 2. 处理SubmittedJob事件
  • 3. 划分Stage
  • 致谢
  • 附录

概要 介绍Stage的定义,DAGScheduler划分Stage流程。
1. Stage介绍 原理相关的介绍,我们在RDD运行原理一文中有介绍过,这里再啰嗦一下:
一个job通常包含一个或多个stage。各个stage之间按照顺序执行。上面已经说过,一个job会有多个算子操作。这些算子都是将一个父RDD转换成子RDD。这个过程中,会有两种情况:父RDD中的数据是否进入不同的子RDD。如果一个父RDD的数据只进入到一个子RDD,比如map、union等操作,称之为narrow dependency(窄依赖)。否则,就会形成wide dependency( 宽依赖),一般也成为shuffle依赖,比如groupByKey等操作。
job中stage的划分就是根据shuffle依赖进行的。shuffle依赖是两个stage的分界点。shuffle操作一般都是任务中最耗时耗资源的部分。因为数据可能存放在HDFS不同的节点上,下一个stage的执行首先要去拉取上一个stage的数据(shuffle read操作),保存在自己的节点上,就会增加网络通信和IO。Shuffle操作其实是一个比较复杂的过程,这里暂且不表。
1.1 父类Stage定义
查看父类Stage的说明:
Spark DAG之处理SubmittedJob并划分Stage
文章图片

查看类定义:
Spark DAG之处理SubmittedJob并划分Stage
文章图片

Stage中有两个重要属性,rddparents,分别记录的是切分处的RDD父Stage信息,这一点结合我后面的例子更好理解。
Spark DAG之处理SubmittedJob并划分Stage
文章图片

1.2 子类ShuffleMapStage、ResultStage
Stage有两个子类,ShuffleMapStage、ResultStage,两者分别增加了一个重要属性信息,如下:
Stage类别 差异属性 作用
ShuffleMapStage shuffleDep: ShuffleDependency 保存Dependency信息
ResultStage func: (TaskContext, Iterator[_]) => _ 保存action对应的处理函数
2. 处理SubmittedJob事件 上一篇博客Spark DAG之SubmitJob最后讲到调用DAGSchedulerhandleJobSubmitted方法处理JobSubmitted事件,查看该方法 :
Spark DAG之处理SubmittedJob并划分Stage
文章图片

如上图注释处,handleJobSubmitted方法主要职责如下
  1. 调用createResultStage()方法,划分DAG生成Stage。
  2. 创建ActiveJob,并添加到对应集合管理。
  3. 调用submitStage()提交Stage。
3. 划分Stage
首先说明下,在一个Job的DAG中(,例如
Spark DAG之处理SubmittedJob并划分Stage
文章图片

中,存在了多个Stage : 四个ShuffleMapStage、一个ResultStage
在这里:
  • SMS1、SMS3是RS的parent
  • SMS0、SMS2是RS的ancestor
【Spark DAG之处理SubmittedJob并划分Stage】细说下ResultStage的创建流程:
  1. 在createResultStage中要创建并返回ResultStage,因此我们需要先获取ShuffleDependencies,根据这些依赖来获取或者创建ShuffleMapStages-----这些是它的parents。(也即是我们就需要去获取上图中的SMS1、SMS3。)
  2. 而在创建ShuffleMapStages(SMS1、SMS3)之前,会检查,发现它会去使用到ancestor(SMS0、SMS2)的数据,因此首先会通过依赖关系创建SMS0、SMS2(如果不存在),然后才来创建SMS1、SMS3。
    Spark DAG之处理SubmittedJob并划分Stage
    文章图片

    Spark DAG之处理SubmittedJob并划分Stage
    文章图片

    Spark DAG之处理SubmittedJob并划分Stage
    文章图片
到这里,就是创建createShuffleMapStage来创建ShuffleMapStage了
那么再说下ShuffleMapStage创建流程:与ResultStage类似,除了变成了
  1. 在createShuffleMapStage
  2. 在实际创建ShuffleMapStages时,其实与创建ResultStage是很类似的,只不过同样调用的是getOrCreateParentStages()方法,然后后面的流程完全一样。
    < 在任何ShuffleMapStages创建时,它都会被添加到shuffleIdToMapStage这个Map中进行维护,方便后续使用。>
    Spark DAG之处理SubmittedJob并划分Stage
    文章图片

    具体的例子这里不赘述,可以参考文章Spark DAG之划分Stage
致谢 -Spark DAG之划分Stage
  • 理解spark中的job、stage、task
附录
-----------------DAGScheduler.scala handleJobSubmitted()、--------------------------------- private[scheduler] def handleJobSubmitted( jobId: Int, finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], callSite: CallSite, listener: JobListener, properties: Properties) { var finalStage: ResultStage = null try {// 例如,如果在已删除其基础HDFS文件的HadoopRDD上运行作业,则新stage创建可能会引发异常。 // 1. 划分Stage,返回ResultStage,和RDD保存父RDD类似,Stage使用parents属性保存父Stage finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) } catch { case e: BarrierJobSlotsNumberCheckFailed => logWarning(s"The job $jobId requires to run a barrier stage that requires more slots " + "than the total number of slots in the cluster currently.") // If jobId doesn't exist in the map, Scala coverts its value null to 0: Int automatically. val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId, new BiFunction[Int, Int, Int] { override def apply(key: Int, value: Int): Int = value + 1 }) if (numCheckFailures <= maxFailureNumTasksCheck) { messageScheduler.schedule( new Runnable { override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func, partitions, callSite, listener, properties)) }, timeIntervalNumTasksCheck, TimeUnit.SECONDS ) return } else { // Job failed, clear internal data. barrierJobIdToNumTasksCheckFailures.remove(jobId) listener.jobFailed(e) return }case e: Exception => logWarning("Creating new stage failed due to exception - job: " + jobId, e) listener.jobFailed(e) return } // Job submitted, clear internal data. barrierJobIdToNumTasksCheckFailures.remove(jobId)val job = new ActiveJob(jobId, finalStage, callSite, listener, properties) clearCacheLocs() logInfo("Got job %s (%s) with %d output partitions".format( job.jobId, callSite.shortForm, partitions.length)) logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") logInfo("Parents of final stage: " + finalStage.parents) logInfo("Missing parents: " + getMissingParentStages(finalStage))val jobSubmissionTime = clock.getTimeMillis() jobIdToActiveJob(jobId) = job activeJobs += job finalStage.setActiveJob(job) val stageIds = jobIdToStageIds(jobId).toArray val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) listenerBus.post( SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) //提交Stage submitStage(finalStage) }--------DAGScheduler.scala ResultStage生成相关----------------------------------- /** * 创建与提供的jobId关联的ResultStage。 */ private def createResultStage( rdd: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], jobId: Int, callSite: CallSite): ResultStage = { checkBarrierStageWithDynamicAllocation(rdd) checkBarrierStageWithNumSlots(rdd) checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size) // 1. 获取parents,它是ShuffleMapStages集合 val parents = getOrCreateParentStages(rdd, jobId) val id = nextStageId.getAndIncrement() // 2. 根据parents来创建ResultStage val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite) // 3. 将Stage添加到HashMap中:stageIdToStage stageIdToStage(id) = stage // 4. 更新JobId到StageId的映射 updateJobIdStageIdMaps(jobId, stage) // 5. 返回Stage stage } ------------------------ //*获取或创建给定RDD的父级列表。 将使用提供的firstJobId创建新的阶段。 private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = { getShuffleDependencies(rdd).map { // 通过ShuffleDependencies来创建ShuffleMapStage shuffleDep => getOrCreateShuffleMapStage(shuffleDep, firstJobId) }.toList } ----------------------------------- //*如果shuffleIdToMapStage中存在一个shuffle map stage,则获取它。 否则,如果 //shuffle map stage尚不存在,此方法还将创建shuffle map阶段,除非缺少了祖先shuffle map stage。 private def getOrCreateShuffleMapStage( shuffleDep: ShuffleDependency[_, _, _], firstJobId: Int): ShuffleMapStage = { // !!!这里,就会shuffleIdToMapStage这个Map中会去到shuffleDep.shuffleId对应的ShuffleMapStage, // 1. 如果已经存在了,那么返回已存在的那个stage // 2. 如果不存在,那么就为此shuffle来创建一个ShuffleMapStage, //创建过程中就会添加到shuffleIdToMapStage这个Map中,以便后面使用 shuffleIdToMapStage.get(shuffleDep.shuffleId) match { case Some(stage) => stage case None => // A.为所有缺少的祖先shuffleDep项创建Stage。这是容错机制。 getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep => // 尽管getMissingAncestorShuffleDependencies仅返回shuffleIdToMapStage中尚未存在的shuffle依赖项, // 但是当我们到达foreach循环中的特定依赖项时,它可能会通过stage创建过程添加到shuffleIdToMapStage中以用于早期依赖项。 // 有关更多信息,请参见SPARK-13902。 if (!shuffleIdToMapStage.contains(dep.shuffleId)) { createShuffleMapStage(dep, firstJobId) } } // B.为祖先创建Stage之后,为当前给定的shuffleDep创建一个阶段。 createShuffleMapStage(shuffleDep, firstJobId) } }----------------------------------DAGScheduler.scala 创建ShuffleMapStage----------------------------//*创建一个ShuffleMapStage,生成给定的shuffle依赖项的分区。 //如果先前运行的阶段生成相同的shuffle数据,则此函数将复制仍可从先前的shuffle获得的输出位置,以避免不必要地重新生成数据。 def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = { val rdd = shuffleDep.rdd checkBarrierStageWithDynamicAllocation(rdd) checkBarrierStageWithNumSlots(rdd) checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions) val numTasks = rdd.partitions.length // 1. 获取parents,它是ShuffleMapStages集合 val parents = getOrCreateParentStages(rdd, jobId) val id = nextStageId.getAndIncrement() // 2. 根据parents来创建ShuffleMapStage val stage = new ShuffleMapStage( id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker) // 3. 将Stage添加到两个HashMap中:stageIdToStage、shuffleIdToMapStage stageIdToStage(id) = stage shuffleIdToMapStage(shuffleDep.shuffleId) = stage // 4. 更新JobId到StageId的映射 updateJobIdStageIdMaps(jobId, stage) if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) { //有点难看:需要在这里注册RDD与缓存和映射输出跟踪器,因为我们无法在RDD构造函数中执行它,因为分区的#是未知的 logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")") mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length) } // 5. 返回Stage stage } ----------------Stage.scala-------------------- /** * 一个stage是一组并行任务,它们都计算需要作为Spark作业的一部分运行的相同功能,其中所有任务都具有相同的shuffle依赖性。 * 由调度程序运行的每个DAG任务在发生shuffle的边界处被分成多个stage,然后DAGScheduler以拓扑顺序运行这些stage。 * * 每个Stage都可以是一个shuffle map stage,在这种情况下,其任务的结果是为其他stage或结果stage输入的, * 在这种情况下,它的任务直接计算Spark action(例如count(),save( )等,通过在RDD上运行一个函数。 * 对于随机映射stage,我们还跟踪每个输出分区所在的节点。 * * 每个stage还有一个firstJobId,用于识别首次提交stage的作业。使用FIFO调度时,这允许首先计算先前作业的stage,或者在失败时更快地恢复。 * * 最后,由于故障恢复,可以多次尝试重新执行单个stage。在这种情况下,Stage对象将跟踪多个StageInfo对象以传递给侦听器或Web UI。 * 最新的一个将通过latestInfo访问。 * * @param id: 唯一的stageID * @param rdd: 这个stage运行的RDD:对于一个shuffle map stage,它是我们运行map任务的RDD,而对于result stage,它是我们运行一个action的目标RDD * @param numTasks: stage的任务总数; 特别是result stage可能不需要计算所有分区,例如对于first(),lookup()和take()。 * @param parents: 这个stage依赖的stages列表(通过shuffle依赖)。 * @param firstJobId: 这个stage的第一个工作的ID,用于FIFO调度。 * @param callSite: 与此stage关联的用户程序中的位置:创建目标RDD的位置,shuffle map stage或调用result stage的操作的位置。 */ private[scheduler] abstract class Stage( val id: Int, val rdd: RDD[_], val numTasks: Int, val parents: List[Stage], val firstJobId: Int, val callSite: CallSite) extends Logging {val numPartitions = rdd.partitions.length/** Set of jobs that this stage belongs to. */ val jobIds = new HashSet[Int]/** The ID to use for the next new attempt for this stage. */ private var nextAttemptId: Int = 0val name: String = callSite.shortForm val details: String = callSite.longForm/** * Pointer to the [[StageInfo]] object for the most recent attempt. This needs to be initialized * here, before any attempts have actually been created, because the DAGScheduler uses this * StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts * have been created). */ private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId)/** * Set of stage attempt IDs that have failed. We keep track of these failures in order to avoid * endless retries if a stage keeps failing. * We keep track of each attempt ID that has failed to avoid recording duplicate failures if * multiple tasks from the same stage attempt fail (SPARK-5945). */ val failedAttemptIds = new HashSet[Int]private[scheduler] def clearFailures() : Unit = { failedAttemptIds.clear() }/** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */ def makeNewStageAttempt( numPartitionsToCompute: Int, taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = { val metrics = new TaskMetrics metrics.register(rdd.sparkContext) _latestInfo = StageInfo.fromStage( this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences) nextAttemptId += 1 }/** Returns the StageInfo for the most recent attempt for this stage. */ def latestInfo: StageInfo = _latestInfooverride final def hashCode(): Int = idoverride final def equals(other: Any): Boolean = other match { case stage: Stage => stage != null && stage.id == id case _ => false }/** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */ def findMissingPartitions(): Seq[Int] }

    推荐阅读