spark TaskEndReason 源码

  • 2022-10-20
  • 浏览 (319)

spark TaskEndReason 代码

文件路径:/core/src/main/scala/org/apache/spark/TaskEndReason.scala

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import java.io.{ObjectInputStream, ObjectOutputStream}

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.{AccumulatorV2, Utils}

// ==============================================================================================
// NOTE: new task end reasons MUST be accompanied with serialization logic in util.JsonProtocol!
// ==============================================================================================

/**
 * :: DeveloperApi ::
 * Various possible reasons why a task ended. The low-level TaskScheduler is supposed to retry
 * tasks several times for "ephemeral" failures, and only report back failures that require some
 * old stages to be resubmitted, such as shuffle map fetch failures.
 */
@DeveloperApi
sealed trait TaskEndReason

/**
 * :: DeveloperApi ::
 * Task succeeded.
 */
@DeveloperApi
case object Success extends TaskEndReason

/**
 * :: DeveloperApi ::
 * Various possible reasons why a task failed.
 */
@DeveloperApi
sealed trait TaskFailedReason extends TaskEndReason {
  /** Error message displayed in the web UI. */
  def toErrorString: String

  /**
   * Whether this task failure should be counted towards the maximum number of times the task is
   * allowed to fail before the stage is aborted.  Set to false in cases where the task's failure
   * was unrelated to the task; for example, if the task failed because the executor it was running
   * on was killed.
   */
  def countTowardsTaskFailures: Boolean = true
}

/**
 * :: DeveloperApi ::
 * A `org.apache.spark.scheduler.ShuffleMapTask` that completed successfully earlier, but we
 * lost the executor before the stage completed. This means Spark needs to reschedule the task
 * to be re-executed on a different executor.
 */
@DeveloperApi
case object Resubmitted extends TaskFailedReason {
  override def toErrorString: String = "Resubmitted (resubmitted due to lost executor)"
}

/**
 * :: DeveloperApi ::
 * Task failed to fetch shuffle data from a remote node. Probably means we have lost the remote
 * executors the task is trying to fetch from, and thus need to rerun the previous stage.
 */
@DeveloperApi
case class FetchFailed(
    bmAddress: BlockManagerId,  // Note that bmAddress can be null
    shuffleId: Int,
    mapId: Long,
    mapIndex: Int,
    reduceId: Int,
    message: String)
  extends TaskFailedReason {
  override def toErrorString: String = {
    val bmAddressString = if (bmAddress == null) "null" else bmAddress.toString
    val mapIndexString = if (mapIndex == Int.MinValue) "Unknown" else mapIndex.toString
    s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapIndex=$mapIndexString, " +
      s"mapId=$mapId, reduceId=$reduceId, message=\n$message\n)"
  }

  /**
   * Fetch failures lead to a different failure handling path: (1) we don't abort the stage after
   * 4 task failures, instead we immediately go back to the stage which generated the map output,
   * and regenerate the missing data. (2) we don't count fetch failures from executors excluded
   * due to too many task failures, since presumably its not the fault of the executor where
   * the task ran, but the executor which stored the data. This is especially important because
   * we might rack up a bunch of fetch-failures in rapid succession, on all nodes of the cluster,
   * due to one bad node.
   */
  override def countTowardsTaskFailures: Boolean = false
}

/**
 * :: DeveloperApi ::
 * Task failed due to a runtime exception. This is the most common failure case and also captures
 * user program exceptions.
 *
 * `stackTrace` contains the stack trace of the exception itself. It still exists for backward
 * compatibility. It's better to use `this(e: Throwable, metrics: Option[TaskMetrics])` to
 * create `ExceptionFailure` as it will handle the backward compatibility properly.
 *
 * `fullStackTrace` is a better representation of the stack trace because it contains the whole
 * stack trace including the exception and its causes
 *
 * `exception` is the actual exception that caused the task to fail. It may be `None` in
 * the case that the exception is not in fact serializable. If a task fails more than
 * once (due to retries), `exception` is that one that caused the last failure.
 */
@DeveloperApi
case class ExceptionFailure(
    className: String,
    description: String,
    stackTrace: Array[StackTraceElement],
    fullStackTrace: String,
    private val exceptionWrapper: Option[ThrowableSerializationWrapper],
    accumUpdates: Seq[AccumulableInfo] = Seq.empty,
    private[spark] var accums: Seq[AccumulatorV2[_, _]] = Nil,
    private[spark] var metricPeaks: Seq[Long] = Seq.empty)
  extends TaskFailedReason {

  /**
   * `preserveCause` is used to keep the exception itself so it is available to the
   * driver. This may be set to `false` in the event that the exception is not in fact
   * serializable.
   */
  private[spark] def this(
      e: Throwable,
      accumUpdates: Seq[AccumulableInfo],
      preserveCause: Boolean) = {
    this(e.getClass.getName, e.getMessage, e.getStackTrace, Utils.exceptionString(e),
      if (preserveCause) Some(new ThrowableSerializationWrapper(e)) else None, accumUpdates)
  }

  private[spark] def this(e: Throwable, accumUpdates: Seq[AccumulableInfo]) = {
    this(e, accumUpdates, preserveCause = true)
  }

  private[spark] def withAccums(accums: Seq[AccumulatorV2[_, _]]): ExceptionFailure = {
    this.accums = accums
    this
  }

  private[spark] def withMetricPeaks(metricPeaks: Seq[Long]): ExceptionFailure = {
    this.metricPeaks = metricPeaks
    this
  }

  def exception: Option[Throwable] = exceptionWrapper.flatMap(w => Option(w.exception))

  override def toErrorString: String =
    if (fullStackTrace == null) {
      // fullStackTrace is added in 1.2.0
      // If fullStackTrace is null, use the old error string for backward compatibility
      exceptionString(className, description, stackTrace)
    } else {
      fullStackTrace
    }

  /**
   * Return a nice string representation of the exception, including the stack trace.
   * Note: It does not include the exception's causes, and is only used for backward compatibility.
   */
  private def exceptionString(
      className: String,
      description: String,
      stackTrace: Array[StackTraceElement]): String = {
    val desc = if (description == null) "" else description
    val st = if (stackTrace == null) "" else stackTrace.map("        " + _).mkString("\n")
    s"$className: $desc\n$st"
  }
}

/**
 * A class for recovering from exceptions when deserializing a Throwable that was
 * thrown in user task code. If the Throwable cannot be deserialized it will be null,
 * but the stacktrace and message will be preserved correctly in SparkException.
 */
private[spark] class ThrowableSerializationWrapper(var exception: Throwable) extends
    Serializable with Logging {
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.writeObject(exception)
  }
  private def readObject(in: ObjectInputStream): Unit = {
    try {
      exception = in.readObject().asInstanceOf[Throwable]
    } catch {
      case e : Exception => log.warn("Task exception could not be deserialized", e)
    }
  }
}

/**
 * :: DeveloperApi ::
 * The task finished successfully, but the result was lost from the executor's block manager before
 * it was fetched.
 */
@DeveloperApi
case object TaskResultLost extends TaskFailedReason {
  override def toErrorString: String = "TaskResultLost (result lost from block manager)"
}

/**
 * :: DeveloperApi ::
 * Task was killed intentionally and needs to be rescheduled.
 */
@DeveloperApi
case class TaskKilled(
    reason: String,
    accumUpdates: Seq[AccumulableInfo] = Seq.empty,
    private[spark] val accums: Seq[AccumulatorV2[_, _]] = Nil,
    metricPeaks: Seq[Long] = Seq.empty)
  extends TaskFailedReason {

  override def toErrorString: String = s"TaskKilled ($reason)"
  override def countTowardsTaskFailures: Boolean = false

}

/**
 * :: DeveloperApi ::
 * Task requested the driver to commit, but was denied.
 */
@DeveloperApi
case class TaskCommitDenied(
    jobID: Int,
    partitionID: Int,
    attemptNumber: Int) extends TaskFailedReason {
  override def toErrorString: String = "TaskCommitDenied (Driver denied task commit)" +
    s" for job: $jobID, partition: $partitionID, attemptNumber: $attemptNumber"
  /**
   * If a task failed because its attempt to commit was denied, do not count this failure
   * towards failing the stage. This is intended to prevent spurious stage failures in cases
   * where many speculative tasks are launched and denied to commit.
   */
  override def countTowardsTaskFailures: Boolean = false
}

/**
 * :: DeveloperApi ::
 * The task failed because the executor that it was running on was lost. This may happen because
 * the task crashed the JVM.
 */
@DeveloperApi
case class ExecutorLostFailure(
    execId: String,
    exitCausedByApp: Boolean = true,
    reason: Option[String]) extends TaskFailedReason {
  override def toErrorString: String = {
    val exitBehavior = if (exitCausedByApp) {
      "caused by one of the running tasks"
    } else {
      "unrelated to the running tasks"
    }
    s"ExecutorLostFailure (executor ${execId} exited ${exitBehavior})" +
      reason.map { r => s" Reason: $r" }.getOrElse("")
  }

  override def countTowardsTaskFailures: Boolean = exitCausedByApp
}

/**
 * :: DeveloperApi ::
 * We don't know why the task ended -- for example, because of a ClassNotFound exception when
 * deserializing the task result.
 */
@DeveloperApi
case object UnknownReason extends TaskFailedReason {
  override def toErrorString: String = "UnknownReason"
}

相关信息

spark 源码目录

相关文章

spark Aggregator 源码

spark BarrierCoordinator 源码

spark BarrierTaskContext 源码

spark BarrierTaskInfo 源码

spark ContextAwareIterator 源码

spark ContextCleaner 源码

spark Dependency 源码

spark ErrorClassesJSONReader 源码

spark ExecutorAllocationClient 源码

spark ExecutorAllocationManager 源码

0  赞