Spark ShuffleMapStage source code
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.scheduler
import scala.collection.mutable.HashSet
import org.apache.spark.{MapOutputTrackerMaster, ShuffleDependency}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.CallSite
/**
 * ShuffleMapStages are intermediate stages in the execution DAG that produce data for a shuffle.
 * They occur right before each shuffle operation, and might contain multiple pipelined operations
 * before that (e.g. map and filter). When executed, they save map output files that can later be
 * fetched by reduce tasks. The `shuffleDep` field describes the shuffle each stage is part of,
 * and variables like `outputLocs` and `numAvailableOutputs` track how many map outputs are ready.
 *
 * ShuffleMapStages can also be submitted independently as jobs with DAGScheduler.submitMapStage.
 * For such stages, the ActiveJobs that submitted them are tracked in `mapStageJobs`. Note that
 * there can be multiple ActiveJobs trying to compute the same shuffle map stage.
 */
private[spark] class ShuffleMapStage(
    id: Int,
    rdd: RDD[_],
    numTasks: Int,
    parents: List[Stage],
    firstJobId: Int,
    callSite: CallSite,
    val shuffleDep: ShuffleDependency[_, _, _],
    mapOutputTrackerMaster: MapOutputTrackerMaster,
    resourceProfileId: Int)
  extends Stage(id, rdd, numTasks, parents, firstJobId, callSite, resourceProfileId) {

  private[this] var _mapStageJobs: List[ActiveJob] = Nil

  /**
   * Partitions that either haven't yet been computed, or that were computed on an executor
   * that has since been lost, so should be re-computed. This variable is used by the
   * DAGScheduler to determine when a stage has completed. Task successes in both the active
   * attempt for the stage or in earlier attempts for this stage can cause partition ids to get
   * removed from pendingPartitions. As a result, this variable may be inconsistent with the pending
   * tasks in the TaskSetManager for the active attempt for the stage (the partitions stored here
   * will always be a subset of the partitions that the TaskSetManager thinks are pending).
   */
  val pendingPartitions = new HashSet[Int]

  override def toString: String = "ShuffleMapStage " + id

  /**
   * Returns the list of active jobs,
   * i.e. map-stage jobs that were submitted to execute this stage independently (if any).
   */
  def mapStageJobs: Seq[ActiveJob] = _mapStageJobs

  /** Adds the job to the active job list. */
  def addActiveJob(job: ActiveJob): Unit = {
    _mapStageJobs = job :: _mapStageJobs
  }

  /** Removes the job from the active job list. */
  def removeActiveJob(job: ActiveJob): Unit = {
    _mapStageJobs = _mapStageJobs.filter(_ != job)
  }

  /**
   * Number of partitions that have shuffle outputs.
   * When this reaches [[numPartitions]], this map stage is ready.
   */
  def numAvailableOutputs: Int = mapOutputTrackerMaster.getNumAvailableOutputs(shuffleDep.shuffleId)

  /**
   * Returns true if the map stage is ready, i.e. all partitions have shuffle outputs.
   */
  def isAvailable: Boolean = numAvailableOutputs == numPartitions

  /** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */
  override def findMissingPartitions(): Seq[Int] = {
    mapOutputTrackerMaster
      .findMissingPartitions(shuffleDep.shuffleId)
      .getOrElse(0 until numPartitions)
  }
}
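
The availability bookkeeping in the listing above (`numAvailableOutputs`, `isAvailable`, `findMissingPartitions`) can be tried out in isolation with a small toy model. The sketch below is only an illustration under stated assumptions: `ToyOutputTracker` and `ToyMapStage` are hypothetical names invented here, and the tracker is a simplified stand-in rather than Spark's `MapOutputTrackerMaster`. It only records which partitions of a shuffle currently have output.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the tracker; this is NOT Spark's MapOutputTrackerMaster.
// It only records which map partitions of each shuffle currently have output.
final class ToyOutputTracker {
  private val available = mutable.Map.empty[Int, mutable.Set[Int]]
  private val partitionCounts = mutable.Map.empty[Int, Int]

  def registerShuffle(shuffleId: Int, numPartitions: Int): Unit = {
    partitionCounts(shuffleId) = numPartitions
    available(shuffleId) = mutable.Set.empty[Int]
  }

  // Marks one map partition of a registered shuffle as having output.
  def registerMapOutput(shuffleId: Int, partition: Int): Unit = {
    available(shuffleId).add(partition)
  }

  // Models losing the executor that held this partition's output.
  def unregisterMapOutput(shuffleId: Int, partition: Int): Unit = {
    available(shuffleId).remove(partition)
  }

  def getNumAvailableOutputs(shuffleId: Int): Int =
    available.get(shuffleId).map(_.size).getOrElse(0)

  // Returns None when the shuffle was never registered (an assumption of this toy model).
  def findMissingPartitions(shuffleId: Int): Option[Seq[Int]] =
    for {
      done <- available.get(shuffleId)
      n <- partitionCounts.get(shuffleId)
    } yield (0 until n).filterNot(done.contains)
}

// Toy counterpart of the stage's three availability methods.
final class ToyMapStage(shuffleId: Int, val numPartitions: Int, tracker: ToyOutputTracker) {
  def numAvailableOutputs: Int = tracker.getNumAvailableOutputs(shuffleId)

  def isAvailable: Boolean = numAvailableOutputs == numPartitions

  // Falls back to "every partition is missing" when the tracker knows nothing.
  def findMissingPartitions(): Seq[Int] =
    tracker.findMissingPartitions(shuffleId).getOrElse(0 until numPartitions)
}
```

In this model, registering outputs for partitions 0 and 2 of a three-partition shuffle leaves partition 1 as the only missing one, and `isAvailable` becomes true only once partition 1 is registered as well. Unregistering an output afterwards makes the stage unavailable again, which corresponds to the "computed on an executor that has since been lost" case described in the comments.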