// Spark SparkSubmit source code
//
//   • 2022-10-20
// Spark SparkSubmit code


package org.apache.spark.deploy

import java.io.File
import java.lang.reflect.{InvocationTargetException, UndeclaredThrowableException}
import java.net.{URI, URL}
import java.security.PrivilegedExceptionAction
import java.text.ParseException
import java.util.{ServiceLoader, UUID}
import java.util.jar.JarInputStream

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.{Properties, Try}

import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.ivy.Ivy
import org.apache.ivy.core.LogOptions
import org.apache.ivy.core.module.descriptor._
import org.apache.ivy.core.module.id.{ArtifactId, ModuleId, ModuleRevisionId}
import org.apache.ivy.core.resolve.ResolveOptions
import org.apache.ivy.core.retrieve.RetrieveOptions
import org.apache.ivy.core.settings.IvySettings
import org.apache.ivy.plugins.matcher.GlobPatternMatcher
import org.apache.ivy.plugins.repository.file.FileRepository
import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBiblioResolver}

import org.apache.spark._
import org.apache.spark.api.r.RUtils
import org.apache.spark.deploy.rest._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.UI._
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util._

/**
 * Whether to submit, kill, or request the status of an application.
 * The latter two operations are currently supported only for standalone and Mesos cluster modes.
 */
private[deploy] object SparkSubmitAction extends Enumeration {
  type SparkSubmitAction = Value
  // One value per action that SparkSubmit.doSubmit dispatches on.
  val SUBMIT, KILL, REQUEST_STATUS, PRINT_VERSION = Value
}

/**
 * Main gateway of launching a Spark application.
 *
 * This program handles setting up the classpath with relevant Spark dependencies and provides
 * a layer over the different cluster managers and deploy modes that Spark supports.
 */
private[spark] class SparkSubmit extends Logging {

  import DependencyUtils._
  import SparkSubmit._

  /**
   * Parse the command-line arguments and dispatch to the handler for the requested action.
   *
   * @param args the raw command-line arguments
   */
  def doSubmit(args: Array[String]): Unit = {
    // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
    // be reset before the application starts.
    val uninitLog = initializeLogIfNecessary(true, silent = true)

    val appArgs = parseArguments(args)
    if (appArgs.verbose) {
      // Echo the parsed arguments when verbose output was requested.
      logInfo(appArgs.toString)
    }
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
      case SparkSubmitAction.PRINT_VERSION => printVersion()
    }
  }

  /**
   * Build the parsed argument holder from the raw command line.
   * Protected so that subclasses can substitute their own argument parser.
   *
   * @param args the raw command-line arguments
   * @return a new SparkSubmitArguments built from `args`
   */
  protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
    new SparkSubmitArguments(args)
  }

  /**
   * Kill an existing submission.
   */
  private def kill(args: SparkSubmitArguments): Unit = {
    if (RestSubmissionClient.supportsRestClient(args.master)) {
      // The master speaks the REST protocol: ask it to kill the submission directly.
      new RestSubmissionClient(args.master)
        .killSubmission(args.submissionToKill)
    } else {
      // Otherwise delegate to the submit operation registered for this master URL.
      val sparkConf = args.toSparkConf()
      sparkConf.set("spark.master", args.master)
      SparkSubmitUtils
        .getSubmitOperations(args.master)
        .kill(args.submissionToKill, sparkConf)
    }
  }

  /**
   * Request the status of an existing submission.
   */
  private def requestStatus(args: SparkSubmitArguments): Unit = {
    if (RestSubmissionClient.supportsRestClient(args.master)) {
      // The master speaks the REST protocol: query the submission status directly.
      new RestSubmissionClient(args.master)
        .requestSubmissionStatus(args.submissionToRequestStatusFor)
    } else {
      // Otherwise delegate to the submit operation registered for this master URL.
      val sparkConf = args.toSparkConf()
      sparkConf.set("spark.master", args.master)
      SparkSubmitUtils
        .getSubmitOperations(args.master)
        .printSubmissionStatus(args.submissionToRequestStatusFor, sparkConf)
    }
  }

  /** Print version information to the log. */
  private def printVersion(): Unit = {
    // The banner is a raw multi-line literal; its single %s placeholder takes the Spark version.
    logInfo("""Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version %s
      /_/
                        """.format(SPARK_VERSION))
    logInfo("Using Scala %s, %s, %s".format(
      Properties.versionString, Properties.javaVmName, Properties.javaVersion))
    logInfo(s"Branch $SPARK_BRANCH")
    logInfo(s"Compiled by user $SPARK_BUILD_USER on $SPARK_BUILD_DATE")
    logInfo(s"Revision $SPARK_REVISION")
    logInfo(s"Url $SPARK_REPO_URL")
    logInfo("Type --help for more information.")
  }

  /**
   * Submit the application using the provided parameters, ensuring to first wrap
   * in a doAs when --proxy-user is specified.
   */
  private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {

    // Runs the application's main class, impersonating the proxy user when one was given.
    def doRunMain(): Unit = {
      if (args.proxyUser != null) {
        val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
          UserGroupInformation.getCurrentUser())
        try {
          proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
            override def run(): Unit = {
              runMain(args, uninitLog)
            }
          })
        } catch {
          case e: Exception =>
            // Hadoop's AuthorizationException suppresses the exception's stack trace, which
            // makes the message printed to the output by the JVM not very helpful. Instead,
            // detect exceptions with empty stack traces here, and treat them differently.
            if (e.getStackTrace().length == 0) {
              error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
            } else {
              throw e
            }
        }
      } else {
        runMain(args, uninitLog)
      }
    }

    // In standalone cluster mode, there are two submission gateways:
    //   (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
    //   (2) The new REST-based gateway introduced in Spark 1.3
    // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
    // to use the legacy gateway if the master endpoint turns out to be not a REST server.
    if (args.isStandaloneCluster && args.useRest) {
      try {
        logInfo("Running Spark using the REST application submission protocol.")
        doRunMain()
      } catch {
        // Fail over to use the legacy submission gateway
        case e: SubmitRestConnectionException =>
          logWarning(s"Master endpoint ${args.master} was not a REST server. " +
            "Falling back to legacy submission gateway instead.")
          args.useRest = false
          submit(args, false)
      }
    // In all other modes, just run the main class as prepared
    } else {
      doRunMain()
    }
  }

  /**
   * Prepare the environment for submitting an application.
   *
   * @param args the parsed SparkSubmitArguments used for environment preparation.
   * @param conf the Hadoop Configuration, this argument will only be set in unit test.
   * @return a 4-tuple:
   *        (1) the arguments for the child process,
   *        (2) a list of classpath entries for the child,
   *        (3) the SparkConf for the child (returned as a SparkConf, not a plain map), and
   *        (4) the main class for the child
   *
   * Exposed for testing.
   */
  private[deploy] def prepareSubmitEnvironment(
      args: SparkSubmitArguments,
      conf: Option[HadoopConfiguration] = None)
      : (Seq[String], Seq[String], SparkConf, String) = {
    // Builds the four values returned at the end of this method: the child argument list,
    // the child classpath entries, the SparkConf, and the child main class name.
    // NOTE(review): as written here, many closing braces and several expression fragments
    // appear to be missing from this method; each visibly incomplete spot is flagged below.
    // Confirm against the upstream file before relying on this listing.

    // Return values
    val childArgs = new ArrayBuffer[String]()
    val childClasspath = new ArrayBuffer[String]()
    val sparkConf = args.toSparkConf()
    var childMainClass = ""

    // Set the cluster manager
    val clusterManager: Int = args.master match {
      case "yarn" => YARN
      case m if m.startsWith("spark") => STANDALONE
      case m if m.startsWith("mesos") => MESOS
      case m if m.startsWith("k8s") => KUBERNETES
      case m if m.startsWith("local") => LOCAL
      case _ =>
        error("Master must either be yarn or start with spark, mesos, k8s, or local")

    // Set the deploy mode; default is client mode
    val deployMode: Int = args.deployMode match {
      case "client" | null => CLIENT
      case "cluster" => CLUSTER
      case _ =>
        error("Deploy mode must be either client or cluster")

    if (clusterManager == YARN) {
      // Make sure YARN is included in our build if we're trying to use it
      if (!Utils.classIsLoadable(YARN_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
          // NOTE(review): this message is not passed to any call as written; presumably an
          // error(...) wrapper was lost — confirm.
          "Could not load YARN classes. " +
          "This copy of Spark may not have been compiled with YARN support.")

    if (clusterManager == KUBERNETES) {
      args.master = Utils.checkAndGetK8sMasterUrl(args.master)
      // Make sure KUBERNETES is included in our build if we're trying to use it
      if (!Utils.classIsLoadable(KUBERNETES_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
          // NOTE(review): same as the YARN check above — the enclosing call is missing.
          "Could not load KUBERNETES classes. " +
            "This copy of Spark may not have been compiled with KUBERNETES support.")

    // Fail fast, the following modes are not supported or applicable
    (clusterManager, deployMode) match {
      case (STANDALONE, CLUSTER) if args.isPython =>
        error("Cluster deploy mode is currently not supported for python " +
          "applications on standalone clusters.")
      case (STANDALONE, CLUSTER) if args.isR =>
        error("Cluster deploy mode is currently not supported for R " +
          "applications on standalone clusters.")
      case (LOCAL, CLUSTER) =>
        error("Cluster deploy mode is not compatible with master \"local\"")
      case (_, CLUSTER) if isShell(args.primaryResource) =>
        error("Cluster deploy mode is not applicable to Spark shells.")
      case (_, CLUSTER) if isSqlShell(args.mainClass) =>
        error("Cluster deploy mode is not applicable to Spark SQL shell.")
      case (_, CLUSTER) if isThriftServer(args.mainClass) =>
        error("Cluster deploy mode is not applicable to Spark Thrift server.")
      case _ =>

    // Update args.deployMode if it is null. It will be passed down as a Spark property later.
    (args.deployMode, deployMode) match {
      case (null, CLIENT) => args.deployMode = "client"
      case (null, CLUSTER) => args.deployMode = "cluster"
      case _ =>
    // Convenience flags for the (cluster manager, deploy mode) combinations tested below.
    val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
    val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
    val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
    val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER
    val isKubernetesClient = clusterManager == KUBERNETES && deployMode == CLIENT
    val isKubernetesClusterModeDriver = isKubernetesClient &&
      sparkConf.getBoolean("spark.kubernetes.submitInDriver", false)

    if (!isMesosCluster && !isStandAloneCluster) {
      // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
      // too for packages that include Python code
      val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
        packagesTransitive = true, args.packagesExclusions, args.packages,
        args.repositories, args.ivyRepoPath, args.ivySettingsPath)

      if (resolvedMavenCoordinates.nonEmpty) {
        // In K8s client mode, when in the driver, add resolved jars early as we might need
        // them at the submit time for artifact downloading.
        // For example we might use the dependencies for downloading
        // files from a Hadoop Compatible fs e.g. S3. In this case the user might pass:
        // --packages com.amazonaws:aws-java-sdk:1.7.4:org.apache.hadoop:hadoop-aws:2.7.6
        if (isKubernetesClusterModeDriver) {
          val loader = getSubmitClassLoader(sparkConf)
          for (jar <- resolvedMavenCoordinates) {
            addJarToClasspath(jar, loader)
        } else if (isKubernetesCluster) {
          // We need this in K8s cluster mode so that we can upload local deps
          // via the k8s application, like in cluster mode driver
          childClasspath ++= resolvedMavenCoordinates
        } else {
          args.jars = mergeFileLists(args.jars, mergeFileLists(resolvedMavenCoordinates: _*))
          if (args.isPython || isInternal(args.primaryResource)) {
            args.pyFiles = mergeFileLists(args.pyFiles,
              mergeFileLists(resolvedMavenCoordinates: _*))

      // install any R packages that may have been passed through --jars or --packages.
      // Spark Packages may contain R source code inside the jar.
      if (args.isR && !StringUtils.isBlank(args.jars)) {
        RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)

    // update spark config from args
    // NOTE(review): no statement that updates sparkConf from args is visible under this
    // comment; the next line only derives the Hadoop configuration — confirm.
    val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf))
    val targetDir = Utils.createTempDir()

    // Kerberos is not supported in standalone mode, and keytab support is not yet available
    // in Mesos cluster mode.
    if (clusterManager != STANDALONE
        && !isMesosCluster
        && args.principal != null
        && args.keytab != null) {
      // If client mode, make sure the keytab is just a local path.
      if (deployMode == CLIENT && Utils.isLocalUri(args.keytab)) {
        args.keytab = new URI(args.keytab).getPath()

      if (!Utils.isLocalUri(args.keytab)) {
        require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
        UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)

    // Resolve glob path for different resources.
    args.jars = Option(args.jars).map(resolveGlobPaths(_, hadoopConf)).orNull
    args.files = Option(args.files).map(resolveGlobPaths(_, hadoopConf)).orNull
    args.pyFiles = Option(args.pyFiles).map(resolveGlobPaths(_, hadoopConf)).orNull
    args.archives = Option(args.archives).map(resolveGlobPaths(_, hadoopConf)).orNull

    // In client mode, download remote files.
    var localPrimaryResource: String = null
    var localJars: String = null
    var localPyFiles: String = null
    if (deployMode == CLIENT) {
      // NOTE(review): the three map blocks below are assigned to String vars but are never
      // closed or unwrapped as written (no closing brace / orNull visible) — confirm.
      localPrimaryResource = Option(args.primaryResource).map {
        downloadFile(_, targetDir, sparkConf, hadoopConf)
      localJars = Option(args.jars).map {
        downloadFileList(_, targetDir, sparkConf, hadoopConf)
      localPyFiles = Option(args.pyFiles).map {
        downloadFileList(_, targetDir, sparkConf, hadoopConf)

      if (isKubernetesClusterModeDriver) {
        // Replace with the downloaded local jar path to avoid propagating hadoop compatible uris.
        // Executors will get the jars from the Spark file server.
        // Explicitly download the related files here
        args.jars = localJars
        val filesLocalFiles = Option(args.files).map {
          downloadFileList(_, targetDir, sparkConf, hadoopConf)
        val archiveLocalFiles = Option(args.archives).map { uris =>
          val resolvedUris = Utils.stringToSeq(uris).map(Utils.resolveURI)
          // NOTE(review): downloadFileList receives three arguments here but four at the
          // other call sites in this method; the file-list argument appears to be missing.
          val localArchives = downloadFileList(
            targetDir, sparkConf, hadoopConf)

          // SPARK-33748: this mimics the behaviour of Yarn cluster mode. If the driver is running
          // in cluster mode, the archives should be available in the driver's current working
          // directory too.
          Utils.stringToSeq(localArchives).map(Utils.resolveURI).zip(resolvedUris).map {
            case (localArchive, resolvedUri) =>
              val source = new File(localArchive.getPath)
              // The destination name is the URI fragment when present, else the source file name.
              val dest = new File(
                if (resolvedUri.getFragment != null) resolvedUri.getFragment else source.getName)
                // NOTE(review): this message is not passed to any call as written; presumably
                // a logging call was lost — confirm.
                s"Unpacking an archive $resolvedUri " +
                  s"from ${source.getAbsolutePath} to ${dest.getAbsolutePath}")
              Utils.unpack(source, dest)

              // Keep the URIs of local files with the given fragments.
              // NOTE(review): no expression follows the comment above, so the value produced
              // by this case arm is not visible here — confirm.
        args.files = filesLocalFiles
        args.archives = archiveLocalFiles
        args.pyFiles = localPyFiles

    // When running in YARN, for some remote resources with scheme:
    //   1. Hadoop FileSystem doesn't support them.
    //   2. We explicitly bypass Hadoop FileSystem with "spark.yarn.dist.forceDownloadSchemes".
    // We will download them to local disk prior to add to YARN's distributed cache.
    // For yarn client mode, since we already download them with above code, so we only need to
    // figure out the local path and replace the remote one.
    if (clusterManager == YARN) {
      val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)

      // True when the scheme is force-listed (or "*" is listed), or when no Hadoop FileSystem
      // class can be looked up for it.
      def shouldDownload(scheme: String): Boolean = {
        forceDownloadSchemes.contains("*") || forceDownloadSchemes.contains(scheme) ||
          Try { FileSystem.getFileSystemClass(scheme, hadoopConf) }.isFailure

      // Maps one resource to the string to use for it: unchanged for local/file schemes,
      // a download into targetDir when shouldDownload says so, otherwise the resolved URI.
      def downloadResource(resource: String): String = {
        val uri = Utils.resolveURI(resource)
        uri.getScheme match {
          case "local" | "file" => resource
          case e if shouldDownload(e) =>
            val file = new File(targetDir, new Path(uri).getName)
            // NOTE(review): the branch for an already-existing file is empty as written, so
            // it yields no String — confirm what should be returned here.
            if (file.exists()) {
            } else {
              downloadFile(resource, targetDir, sparkConf, hadoopConf)
          case _ => uri.toString

      args.primaryResource = Option(args.primaryResource).map { downloadResource }.orNull
      // NOTE(review): the four lambdas below are opened but have no body as written — confirm.
      args.files = Option(args.files).map { files =>
      args.pyFiles = Option(args.pyFiles).map { pyFiles =>
      args.jars = Option(args.jars).map { jars =>
      args.archives = Option(args.archives).map { archives =>

    // At this point, we have attempted to download all remote resources.
    // Now we try to resolve the main class if our primary resource is a JAR.
    if (args.mainClass == null && !args.isPython && !args.isR) {
      try {
        // NOTE(review): the URI constructor has no argument and the JarInputStream expression
        // below has unbalanced parentheses as written — confirm.
        val uri = new URI(
        val fs = FileSystem.get(uri, hadoopConf)

        Utils.tryWithResource(new JarInputStream( Path(uri)))) { jar =>
          args.mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")
      } catch {
        case e: Throwable =>
            // NOTE(review): this message is not passed to any call as written — confirm.
            s"Failed to get main class in JAR with error '${e.getMessage}'. " +
            " Please specify one with --class."

      if (args.mainClass == null) {
        // If we still can't figure out the main class at this point, blow up.
        error("No main class set in JAR; please specify one with --class.")

    // If we're running a python app, set the main class to our specific python runner
    if (args.isPython && deployMode == CLIENT) {
      if (args.primaryResource == PYSPARK_SHELL) {
        args.mainClass = "org.apache.spark.api.python.PythonGatewayServer"
      } else {
        // If a python file is provided, add it to the child arguments and list of files to deploy.
        // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
        args.mainClass = "org.apache.spark.deploy.PythonRunner"
        args.childArgs = ArrayBuffer(localPrimaryResource, localPyFiles) ++ args.childArgs

    // Non-PySpark applications can need Python dependencies.
    if (deployMode == CLIENT && clusterManager != YARN) {
      // The YARN backend handles python files differently, so don't merge the lists.
      args.files = mergeFileLists(args.files, args.pyFiles)

    if (localPyFiles != null) {
      sparkConf.set(SUBMIT_PYTHON_FILES, localPyFiles.split(",").toSeq)

    // In YARN mode for an R app, add the SparkR package archive and the R package
    // archive containing all of the built R libraries to archives so that they can
    // be distributed with the job
    if (args.isR && clusterManager == YARN) {
      val sparkRPackagePath = RUtils.localSparkRPackagePath
      if (sparkRPackagePath.isEmpty) {
        error("SPARK_HOME does not exist for R application in YARN mode.")
      val sparkRPackageFile = new File(sparkRPackagePath.get, SPARKR_PACKAGE_ARCHIVE)
      if (!sparkRPackageFile.exists()) {
        error(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")
      val sparkRPackageURI = Utils.resolveURI(sparkRPackageFile.getAbsolutePath).toString

      // Distribute the SparkR package.
      // Assigns a symbol link name "sparkr" to the shipped package.
      args.archives = mergeFileLists(args.archives, sparkRPackageURI + "#sparkr")

      // Distribute the R package archive containing all the built R packages.
      if (!RUtils.rPackages.isEmpty) {
        val rPackageFile =
          RPackageUtils.zipRLibraries(new File(RUtils.rPackages.get), R_PACKAGE_ARCHIVE)
        if (!rPackageFile.exists()) {
          error("Failed to zip all the built R packages.")

        val rPackageURI = Utils.resolveURI(rPackageFile.getAbsolutePath).toString
        // Assigns a symbol link name "rpkg" to the shipped package.
        args.archives = mergeFileLists(args.archives, rPackageURI + "#rpkg")

    // TODO: Support distributing R packages with standalone cluster
    if (args.isR && clusterManager == STANDALONE && !RUtils.rPackages.isEmpty) {
      error("Distributing R packages with standalone cluster is not supported.")

    // TODO: Support distributing R packages with mesos cluster
    if (args.isR && clusterManager == MESOS && !RUtils.rPackages.isEmpty) {
      error("Distributing R packages with mesos cluster is not supported.")

    // If we're running an R app, set the main class to our specific R runner
    if (args.isR && deployMode == CLIENT) {
      if (args.primaryResource == SPARKR_SHELL) {
        args.mainClass = "org.apache.spark.api.r.RBackend"
      } else {
        // If an R file is provided, add it to the child arguments and list of files to deploy.
        // Usage: RRunner <main R file> [app arguments]
        args.mainClass = "org.apache.spark.deploy.RRunner"
        args.childArgs = ArrayBuffer(localPrimaryResource) ++ args.childArgs
        args.files = mergeFileLists(args.files, args.primaryResource)

    if (isYarnCluster && args.isR) {
      // In yarn-cluster mode for an R app, add primary resource to files
      // that can be distributed with the job
      args.files = mergeFileLists(args.files, args.primaryResource)

    // Special flag to avoid deprecation warnings at the client
    sys.props("SPARK_SUBMIT") = "true"

    // A list of rules to map each argument to system properties or command-line options in
    // each deploy mode; we iterate through these below
    val options = List[OptionAssigner](

      // All cluster managers
      OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.master"),
      OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
        confKey = SUBMIT_DEPLOY_MODE.key),
      // NOTE(review): the value argument and the confKey are both empty on the next line,
      // which is not valid as written — confirm the intended argument and key.
      OptionAssigner(, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = ""),
      OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT,
        confKey = JAR_IVY_REPO_PATH.key),
      OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
        confKey = DRIVER_MEMORY.key),
      OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
        confKey = DRIVER_CLASS_PATH.key),
      OptionAssigner(args.driverExtraJavaOptions, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
        confKey = DRIVER_JAVA_OPTIONS.key),
      OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
        confKey = DRIVER_LIBRARY_PATH.key),
      OptionAssigner(args.principal, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
        confKey = PRINCIPAL.key),
      OptionAssigner(args.keytab, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
        confKey = KEYTAB.key),
      OptionAssigner(args.pyFiles, ALL_CLUSTER_MGRS, CLUSTER, confKey = SUBMIT_PYTHON_FILES.key),

      // Propagate attributes for dependency resolution at the driver side
      OptionAssigner(args.packages, STANDALONE | MESOS | KUBERNETES,
        CLUSTER, confKey = JAR_PACKAGES.key),
      OptionAssigner(args.repositories, STANDALONE | MESOS | KUBERNETES,
        CLUSTER, confKey = JAR_REPOSITORIES.key),
      OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS | KUBERNETES,
        CLUSTER, confKey = JAR_IVY_REPO_PATH.key),
      // NOTE(review): the entry below stops after the cluster-manager mask; its deploy mode
      // and confKey are not visible — confirm.
      OptionAssigner(args.packagesExclusions, STANDALONE | MESOS | KUBERNETES,

      // Yarn only
      OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.queue"),
      OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.pyFiles",
        mergeFn = Some(mergeFileLists(_, _))),
      OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.jars",
        mergeFn = Some(mergeFileLists(_, _))),
      OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.files",
        mergeFn = Some(mergeFileLists(_, _))),
      OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.archives",
        mergeFn = Some(mergeFileLists(_, _))),

      // Other options
      OptionAssigner(args.numExecutors, YARN | KUBERNETES, ALL_DEPLOY_MODES,
        confKey = EXECUTOR_INSTANCES.key),
      OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES,
        confKey = EXECUTOR_CORES.key),
      OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN | KUBERNETES, ALL_DEPLOY_MODES,
        confKey = EXECUTOR_MEMORY.key),
      OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
        confKey = CORES_MAX.key),
        // NOTE(review): the next line is an orphaned confKey with no OptionAssigner call
        // opening it; the start of the FILES entry appears to be missing — confirm.
        confKey = FILES.key),
      OptionAssigner(args.archives, LOCAL | STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
        confKey = ARCHIVES.key),
      OptionAssigner(args.jars, LOCAL, CLIENT, confKey = JARS.key),
      OptionAssigner(args.jars, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
        confKey = JARS.key),
      OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
        confKey = DRIVER_MEMORY.key),
      OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
        confKey = DRIVER_CORES.key),
      OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
        confKey = DRIVER_SUPERVISE.key),
      OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, confKey = JAR_IVY_REPO_PATH.key),

      // An internal option used only for spark-shell to add user jars to repl's classloader,
      // previously it uses "spark.jars" or "spark.yarn.dist.jars" which now may be pointed to
      // remote jars, so adding a new option to only specify local jars for spark-shell internally.
      OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.repl.local.jars")

    // In client mode, launch the application main class directly
    // In addition, add the main application jar and any added jars (if any) to the classpath
    if (deployMode == CLIENT) {
      childMainClass = args.mainClass
      if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
        childClasspath += localPrimaryResource
      if (localJars != null) { childClasspath ++= localJars.split(",") }
    // Add the main application jar and any added jars to classpath in case YARN client
    // requires these jars.
    // This assumes both primaryResource and user jars are local jars, or already downloaded
    // to local by configuring "spark.yarn.dist.forceDownloadSchemes", otherwise it will not be
    // added to the classpath of YARN client.
    if (isYarnCluster) {
      if (isUserJar(args.primaryResource)) {
        childClasspath += args.primaryResource
      if (args.jars != null) { childClasspath ++= args.jars.split(",") }

    if (deployMode == CLIENT) {
      if (args.childArgs != null) { childArgs ++= args.childArgs }

    // Map all arguments to command-line options or system properties for our chosen mode
    for (opt <- options) {
      // deployMode and clusterManager are compared as bit masks against each rule's masks.
      if (opt.value != null &&
          (deployMode & opt.deployMode) != 0 &&
          (clusterManager & opt.clusterManager) != 0) {
        if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) }
        if (opt.confKey != null) {
          // When a merge function is defined and the key is already set, combine the existing
          // value with the new one instead of overwriting it.
          if (opt.mergeFn.isDefined && sparkConf.contains(opt.confKey)) {
            sparkConf.set(opt.confKey, opt.mergeFn.get.apply(sparkConf.get(opt.confKey), opt.value))
          } else {
            sparkConf.set(opt.confKey, opt.value)

    // In case of shells, spark.ui.showConsoleProgress can be true by default or by user.
    if (isShell(args.primaryResource) && !sparkConf.contains(UI_SHOW_CONSOLE_PROGRESS)) {
      sparkConf.set(UI_SHOW_CONSOLE_PROGRESS, true)

    // Add the application jar automatically so the user doesn't have to call sc.addJar
    // For YARN cluster mode, the jar is already distributed on each node as "app.jar"
    // For python and R files, the primary resource is already distributed as a regular file
    if (!isYarnCluster && !args.isPython && !args.isR) {
      var jars = sparkConf.get(JARS)
      if (isUserJar(args.primaryResource)) {
        jars = jars ++ Seq(args.primaryResource)
      sparkConf.set(JARS, jars)

    // In standalone cluster mode, use the REST client to submit the application (Spark 1.3+).
    // All Spark parameters are expected to be passed to the client through system properties.
    if (args.isStandaloneCluster) {
      if (args.useRest) {
        childMainClass = REST_CLUSTER_SUBMIT_CLASS
        childArgs += (args.primaryResource, args.mainClass)
      } else {
        // In legacy standalone cluster mode, use Client as a wrapper around the user class
        // NOTE(review): no assignment of childMainClass is visible in this legacy branch —
        // confirm.
        if (args.supervise) { childArgs += "--supervise" }
        Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
        Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
        childArgs += "launch"
        childArgs += (args.master, args.primaryResource, args.mainClass)
      if (args.childArgs != null) {
        childArgs ++= args.childArgs

    // Let YARN know it's a pyspark app, so it distributes needed libraries.
    if (clusterManager == YARN) {
      if (args.isPython) {
        sparkConf.set("spark.yarn.isPython", "true")

    // NOTE(review): the body of the following condition is empty as written — confirm what
    // should run for Mesos/Kubernetes when Hadoop security is enabled.
    if ((clusterManager == MESOS || clusterManager == KUBERNETES)
       && UserGroupInformation.isSecurityEnabled) {

    // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
    if (isYarnCluster) {
      childMainClass = YARN_CLUSTER_SUBMIT_CLASS
      if (args.isPython) {
        childArgs += ("--primary-py-file", args.primaryResource)
        childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
      } else if (args.isR) {
        val mainFile = new Path(args.primaryResource).getName
        childArgs += ("--primary-r-file", mainFile)
        childArgs += ("--class", "org.apache.spark.deploy.RRunner")
      } else {
        if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
          childArgs += ("--jar", args.primaryResource)
        childArgs += ("--class", args.mainClass)
      if (args.childArgs != null) {
        args.childArgs.foreach { arg => childArgs += ("--arg", arg) }

    if (isMesosCluster) {
      assert(args.useRest, "Mesos cluster mode is only supported through the REST submission API")
      childMainClass = REST_CLUSTER_SUBMIT_CLASS
      if (args.isPython) {
        // Second argument is main class
        childArgs += (args.primaryResource, "")
        if (args.pyFiles != null) {
          sparkConf.set(SUBMIT_PYTHON_FILES, args.pyFiles.split(",").toSeq)
      } else if (args.isR) {
        // Second argument is main class
        childArgs += (args.primaryResource, "")
      } else {
        childArgs += (args.primaryResource, args.mainClass)
      if (args.childArgs != null) {
        childArgs ++= args.childArgs

    if (isKubernetesCluster) {
      // NOTE(review): no assignment of childMainClass is visible in this branch — confirm.
      if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
        if (args.isPython) {
          childArgs ++= Array("--primary-py-file", args.primaryResource)
          childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner")
        } else if (args.isR) {
          childArgs ++= Array("--primary-r-file", args.primaryResource)
          childArgs ++= Array("--main-class", "org.apache.spark.deploy.RRunner")
        else {
          childArgs ++= Array("--primary-java-resource", args.primaryResource)
          childArgs ++= Array("--main-class", args.mainClass)
      } else {
        childArgs ++= Array("--main-class", args.mainClass)
      if (args.childArgs != null) {
        args.childArgs.foreach { arg =>
          childArgs += ("--arg", arg)
      // Pass the proxyUser to the k8s app so it is possible to add it to the driver args
      if (args.proxyUser != null) {
        childArgs += ("--proxy-user", args.proxyUser)

    // Load any properties specified through --conf and the default properties file
    for ((k, v) <- args.sparkProperties) {
      sparkConf.setIfMissing(k, v)

    // Ignore invalid in cluster modes.
    // NOTE(review): the comment above looks truncated and the condition below has an empty
    // body as written — confirm which setting is meant to be dropped.
    if (deployMode == CLUSTER) {

    // Resolve paths in certain spark properties
    // NOTE(review): the Seq below has no elements and is never closed as written — confirm
    // the list of path-valued configuration keys.
    val pathConfigs = Seq(
    pathConfigs.foreach { config =>
      // Replace old URIs with resolved URIs, if they exist
      sparkConf.getOption(config).foreach { oldValue =>
        sparkConf.set(config, Utils.resolveURIs(oldValue))

    // Resolve and format python file paths properly before adding them to the PYTHONPATH.
    // The resolving part is redundant in the case of --py-files, but necessary if the user
    // explicitly sets `spark.submit.pyFiles` in his/her default properties file.
    val pyFiles = sparkConf.get(SUBMIT_PYTHON_FILES)
    val resolvedPyFiles = Utils.resolveURIs(pyFiles.mkString(","))
    // NOTE(review): neither branch of this if/else contains an expression as written, so the
    // value of formattedPyFiles is not visible here — confirm.
    val formattedPyFiles = if (deployMode != CLUSTER) {
    } else {
      // Ignoring formatting python path in yarn and mesos cluster mode, these two modes
      // support dealing with remote python files, they could distribute and add python files
      // locally.
    sparkConf.set(SUBMIT_PYTHON_FILES, formattedPyFiles.split(",").toSeq)

    if (args.verbose && isSqlShell(childMainClass)) {
      childArgs ++= Seq("--verbose")

    // NOTE(review): the configuration key is an empty string as written; the value is the
    // current wall-clock time in milliseconds — confirm the intended key.
    sparkConf.set("", System.currentTimeMillis().toString)

    (childArgs.toSeq, childClasspath.toSeq, sparkConf, childMainClass)

  // [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches delegation tokens with
  // renewer set to the YARN ResourceManager.  Since YARN isn't configured in Mesos or Kubernetes
  // mode, we must trick it into thinking we're YARN.
  //
  // Concretely: the current user's short name is stored in the given conf under the
  // `spark.hadoop.`-prefixed YARN ResourceManager principal key, and the assignment is logged.
  private def setRMPrincipal(sparkConf: SparkConf): Unit = {
    val shortUserName = UserGroupInformation.getCurrentUser.getShortUserName
    val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}"
    logInfo(s"Setting ${key} to ${shortUserName}")
    sparkConf.set(key, shortUserName)
  }

  // Picks the class loader implementation for the submitted application: a
  // ChildFirstURLClassLoader when DRIVER_USER_CLASS_PATH_FIRST is set in the conf, otherwise a
  // plain MutableURLClassLoader. Both are created with an empty URL array.
  // NOTE(review): both constructor calls are cut off after their first argument and nothing is
  // returned in this copy of the file -- restore the missing lines from upstream before use.
  private def getSubmitClassLoader(sparkConf: SparkConf): MutableURLClassLoader = {
    val loader =
      if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
        new ChildFirstURLClassLoader(new Array[URL](0),
      } else {
        new MutableURLClassLoader(new Array[URL](0),

  /**
   * Run the main method of the child class using the submit arguments.
   *
   * This runs in two steps. First, we prepare the launch environment by setting up
   * the appropriate classpath, system properties, and application arguments for
   * running the child main class based on the cluster manager and the deploy mode.
   * Second, we use this launch environment to invoke the main method of the child
   * main class.
   *
   * Note that this main class will not be the one provided by the user if we're
   * running cluster deploy mode or python applications.
   */
  private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
    // Step 1: derive the child arguments, classpath, configuration and main class name.
    val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
    // Let the main class re-initialize the logging system once it starts.
    // NOTE(review): the body of this `if` is missing in this copy of the file.
    if (uninitLog) {

    // In verbose mode, log the resolved main class, the (redacted) configuration and the classpath.
    if (args.verbose) {
      logInfo(s"Main class:\n$childMainClass")
      // sysProps may contain sensitive information, so redact before printing
      logInfo(s"Spark config:\n${Utils.redact(sparkConf.getAll.toMap).sorted.mkString("\n")}")
      logInfo(s"Classpath elements:\n${childClasspath.mkString("\n")}")
    // Register every resolved classpath entry with the submit class loader.
    val loader = getSubmitClassLoader(sparkConf)
    for (jar <- childClasspath) {
      addJarToClasspath(jar, loader)

    var mainClass: Class[_] = null

    // Load the child main class. Both failure modes are logged and rethrown as a
    // SparkUserAppException carrying CLASS_NOT_FOUND_EXIT_STATUS; a hint about the Hive build
    // profiles is logged when the class name or the error message points at Hive/thriftserver.
    try {
      mainClass = Utils.classForName(childMainClass)
    } catch {
      case e: ClassNotFoundException =>
        logError(s"Failed to load class $childMainClass.")
        if (childMainClass.contains("thriftserver")) {
          logInfo(s"Failed to load main class $childMainClass.")
          logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
        throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
      case e: NoClassDefFoundError =>
        logError(s"Failed to load $childMainClass: ${e.getMessage()}")
        if (e.getMessage.contains("org/apache/hadoop/hive")) {
          logInfo(s"Failed to load hive class.")
          logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
        throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)

    // A main class that is itself a SparkApplication takes the first branch; any other class is
    // wrapped in a JavaMainApplication.
    // NOTE(review): the first branch is empty in this copy -- presumably it instantiates
    // `mainClass`; confirm against upstream.
    val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    } else {
      new JavaMainApplication(mainClass)

    // Recursively strips UndeclaredThrowableException / InvocationTargetException wrappers that
    // have a non-null cause, so the underlying failure is what gets rethrown below.
    // NOTE(review): the result of the final `case e: Throwable` is missing in this copy.
    def findCause(t: Throwable): Throwable = t match {
      case e: UndeclaredThrowableException =>
        if (e.getCause() != null) findCause(e.getCause()) else e
      case e: InvocationTargetException =>
        if (e.getCause() != null) findCause(e.getCause()) else e
      case e: Throwable =>

    // Step 2: start the application; any failure is rethrown as its unwrapped cause.
    try {
      app.start(childArgs.toArray, sparkConf)
    } catch {
      case t: Throwable =>
        throw findCause(t)
    } finally {
      // Cleanup applies only to "k8s" masters, and only when the application is neither a shell,
      // the SQL shell, nor the Thrift server.
      // NOTE(review): the `try` body is missing in this copy; the error message below suggests
      // it closes the SparkContext -- confirm against upstream.
      if (args.master.startsWith("k8s") && !isShell(args.primaryResource) &&
          !isSqlShell(args.mainClass) && !isThriftServer(args.mainClass)) {
        try {
        } catch {
          case e: Throwable => logError(s"Failed to close SparkContext: $e")

  /** Raise a SparkException that carries the supplied error message. */
  private def error(msg: String): Unit = {
    throw new SparkException(msg)
  }


 * This entry point is used by the launcher library to start in-process Spark applications.
private[spark] object InProcessSparkSubmit {

  def main(args: Array[String]): Unit = {
    val submit = new SparkSubmit()


object SparkSubmit extends CommandLineUtils with Logging {

  // Cluster managers
  // Each value is a distinct power of two.
  // NOTE(review): presumably so that several managers can be combined into one Int mask -- confirm
  // against the code that consumes them.
  private val YARN = 1
  private val STANDALONE = 2
  private val MESOS = 4
  private val LOCAL = 8
  private val KUBERNETES = 16

  // Deploy modes
  private val CLIENT = 1
  private val CLUSTER = 2

  // Special primary resource names that represent shells rather than application jars.
  private val SPARK_SHELL = "spark-shell"
  private val PYSPARK_SHELL = "pyspark-shell"
  private val SPARKR_SHELL = "sparkr-shell"
  // NOTE(review): both archive names are empty strings in this copy of the file; the literals
  // look like they were lost -- confirm the intended values against upstream.
  private val SPARKR_PACKAGE_ARCHIVE = ""
  private val R_PACKAGE_ARCHIVE = ""

  // Exit status carried by the SparkUserAppException thrown when the child main class cannot be
  // loaded.
  private val CLASS_NOT_FOUND_EXIT_STATUS = 101

  // Following constants are visible for testing.
  // NOTE(review): the right-hand side of YARN_CLUSTER_SUBMIT_CLASS is missing in this copy.
  private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
  private[deploy] val REST_CLUSTER_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName()
  private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()

  // Command-line entry point. Builds an anonymous SparkSubmit whose logInfo / logWarning /
  // logError go through printMessage (the latter two prefixed with "Warning: " and "Error: "),
  // and whose argument parser forwards its own logging to that same instance via `self`.
  // NOTE(review): several lines are missing in this copy (the expression feeding `.foreach`, the
  // property name passed to System.setProperty, the body of `doSubmit` and its exception handler,
  // and the final invocation of `submit`) -- restore them from upstream before relying on this.
  override def main(args: Array[String]): Unit = {
      .foreach(System.setProperty("", _))
    val submit = new SparkSubmit() {
      // Alias for the enclosing anonymous SparkSubmit, used by the nested argument parser below.
      self =>

      // The parser's log calls are redirected to this instance so all output uses printMessage.
      override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
        new SparkSubmitArguments(args) {
          override protected def logInfo(msg: => String): Unit = self.logInfo(msg)

          override protected def logWarning(msg: => String): Unit = self.logWarning(msg)

          override protected def logError(msg: => String): Unit = self.logError(msg)

      override protected def logInfo(msg: => String): Unit = printMessage(msg)

      override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")

      override protected def logError(msg: => String): Unit = printMessage(s"Error: $msg")

      // Wraps submission so that a SparkUserAppException can be handled specially.
      // NOTE(review): both the `try` body and the handler body are empty in this copy.
      override def doSubmit(args: Array[String]): Unit = {
        try {
        } catch {
          case e: SparkUserAppException =>



   * Return whether the given primary resource represents a user jar.
  private[deploy] def isUserJar(res: String): Boolean = {
    !isShell(res) && !isPython(res) && !isInternal(res) && !isR(res)

   * Return whether the given primary resource represents a shell.
  private[deploy] def isShell(res: String): Boolean = {
    (res == SPARK_SHELL || res == PYSPARK_SHELL || res == SPARKR_SHELL)

   * Return whether the given main class represents a sql shell.
  private[deploy] def isSqlShell(mainClass: String): Boolean = {
    mainClass == "org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"

   * Return whether the given main class represents a thrift server.
  private def isThriftServer(mainClass: String): Boolean = {
    mainClass == "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2"

   * Return whether the given primary resource requires running python.
  private[deploy] def isPython(res: String): Boolean = {
    res != null && res.endsWith(".py") || res == PYSPARK_SHELL

   * Return whether the given primary resource requires running R.
  private[deploy] def isR(res: String): Boolean = {
    res != null && (res.endsWith(".R") || res.endsWith(".r")) || res == SPARKR_SHELL

  /**
   * Return whether the given primary resource is the launcher's "no resource" marker
   * (`SparkLauncher.NO_RESOURCE`).
   */
  private[deploy] def isInternal(res: String): Boolean = {
    res == SparkLauncher.NO_RESOURCE
  }


/** Provides utility functions to be used inside SparkSubmit. */
private[spark] object SparkSubmitUtils extends Logging {

  // Exposed for testing
  // Stream that the Ivy helpers in this object print their progress messages to; it starts out
  // as SparkSubmit's stream and is a `var` so that it can be replaced.
  var printStream = SparkSubmit.printStream

  // Exposed for testing.
  // These components are used to make the default exclusion rules for Spark dependencies.
  // We need to specify each component explicitly, otherwise we miss
  // spark-streaming utility components. Underscore is there to differentiate between
  // spark-streaming_2.1x and spark-streaming-kafka-0-10-assembly_2.1x
  // Each entry is expanded by addExclusionRules into an `org.apache.spark:spark-<entry>*:*` rule.
  val IVY_DEFAULT_EXCLUDES = Seq("catalyst_", "core_", "graphx_", "kvstore_", "launcher_", "mllib_",
    "mllib-local_", "network-common_", "network-shuffle_", "repl_", "sketch_", "sql_", "streaming_",
    "tags_", "unsafe_")

   * Represents a Maven Coordinate
   * @param groupId the groupId of the coordinate
   * @param artifactId the artifactId of the coordinate
   * @param version the version of the coordinate
  private[deploy] case class MavenCoordinate(groupId: String, artifactId: String, version: String) {
    override def toString: String = s"$groupId:$artifactId:$version"

   * Extracts maven coordinates from a comma-delimited string. Coordinates should be provided
   * in the format `groupId:artifactId:version` or `groupId/artifactId:version`.
   * @param coordinates Comma-delimited string of maven coordinates
   * @return Sequence of Maven coordinates
  def extractMavenCoordinates(coordinates: String): Seq[MavenCoordinate] = {
    coordinates.split(",").map { p =>
      val splits = p.replace("/", ":").split(":")
      require(splits.length == 3, s"Provided Maven Coordinates must be in the form " +
        s"'groupId:artifactId:version'. The coordinate provided is: $p")
      require(splits(0) != null && splits(0).trim.nonEmpty, s"The groupId cannot be null or " +
        s"be whitespace. The groupId provided is: ${splits(0)}")
      require(splits(1) != null && splits(1).trim.nonEmpty, s"The artifactId cannot be null or " +
        s"be whitespace. The artifactId provided is: ${splits(1)}")
      require(splits(2) != null && splits(2).trim.nonEmpty, s"The version cannot be null or " +
        s"be whitespace. The version provided is: ${splits(2)}")
      new MavenCoordinate(splits(0), splits(1), splits(2))

  /**
   * Path of the local Maven cache.
   *
   * Resolves to `<user.home>/.m2/repository`, except while testing, where a `dummy` parent
   * directory is used instead.
   */
  private[spark] def m2Path: File = {
    // Both branches share the same sub-path below their parent directory.
    val repositorySubPath = ".m2" + File.separator + "repository"
    if (Utils.isTesting) {
      // test builds delete the maven cache, and this can cause flakiness
      new File("dummy", repositorySubPath)
    } else {
      new File(System.getProperty("user.home"), repositorySubPath)
    }
  }

  /**
   * Creates the chain of repository resolvers that Ivy uses to look up dependencies.
   * @param defaultIvyUserDir The default user path for Ivy
   * @return A ChainResolver used by Ivy to search for and resolve dependencies.
   */
  // NOTE(review): this copy of the method is missing most of its configuration calls (resolver
  // names, roots, patterns being applied, resolvers being added to the chain) and its return
  // value; only the object construction is visible. Restore the missing lines from upstream.
  def createRepoResolvers(defaultIvyUserDir: File): ChainResolver = {
    // We need a chain resolver if we want to check multiple repositories
    val cr = new ChainResolver

    // Resolver intended for the local Maven repository (see `m2Path`); unconfigured in this copy.
    val localM2 = new IBiblioResolver

    // File-system resolver rooted at `<defaultIvyUserDir>/local`.
    val localIvy = new FileSystemResolver
    val localIvyRoot = new File(defaultIvyUserDir, "local")
    localIvy.setRepository(new FileRepository(localIvyRoot))
    // Layout templates below the local Ivy root: one for ivy.xml descriptors, one for artifacts.
    val ivyPattern = Seq(localIvyRoot.getAbsolutePath, "[organisation]", "[module]", "[revision]",
      "ivys", "ivy.xml").mkString(File.separator)
    val artifactPattern = Seq(localIvyRoot.getAbsolutePath, "[organisation]", "[module]",
      "[revision]", "[type]s", "[artifact](-[classifier]).[ext]").mkString(File.separator)

    // the biblio resolver resolves POM declared dependencies
    val br: IBiblioResolver = new IBiblioResolver
    // Optional repository override read from the DEFAULT_ARTIFACT_REPOSITORY environment variable.
    val defaultInternalRepo : Option[String] = sys.env.get("DEFAULT_ARTIFACT_REPOSITORY")

    // NOTE(review): purpose of `sp` cannot be determined from the visible lines.
    val sp: IBiblioResolver = new IBiblioResolver

  /**
   * Output a list of paths for the downloaded jars to be added to the classpath
   * (will append to jars in SparkSubmit).
   * @param artifacts Sequence of dependencies that were resolved and retrieved
   * @param cacheDirectory Directory where jars are cached
   * @return List of paths for the dependencies
   */
  // NOTE(review): this copy is damaged -- the line that opens the body is garbled, both branches
  // of the jar filter have lost their result, the non-classifier branch is empty and the end of
  // the path expression is cut off. Restore from upstream before relying on the notes below.
  def resolveDependencyPaths(
      artifacts: Array[AnyRef],
      cacheDirectory: File): Seq[String] = {[Artifact]).filter { artifactInfo =>
      // Artifacts whose extension is not "jar" are reported through logInfo.
      if (artifactInfo.getExt == "jar") {
      } else {
        logInfo(s"Skipping non-jar dependency ${artifactInfo.getId}")
    }.map { artifactInfo =>
      val artifact = artifactInfo.getModuleRevisionId
      val extraAttrs = artifactInfo.getExtraAttributes
      // A "classifier" extra attribute, when present, contributes a "-<classifier>" suffix.
      val classifier = if (extraAttrs.containsKey("classifier")) {
        "-" + extraAttrs.get("classifier")
      } else {
      // The resulting path starts at the absolute cache directory.
      cacheDirectory.getAbsolutePath + File.separator +

  /**
   * Adds the given maven coordinates to Ivy's module descriptor.
   *
   * For every coordinate a dependency descriptor is built, mapped from `ivyConfName` to
   * `ivyConfName(runtime)`, and announced on `printStream`.
   *
   * NOTE(review): in this copy of the file the descriptor `dd` is never handed to `md`; the
   * line that registers it appears to be missing -- confirm against upstream.
   *
   * @param md the module descriptor that should receive the dependencies
   * @param artifacts the coordinates to add
   * @param ivyConfName the Ivy configuration name used for the dependency mapping
   */
  def addDependenciesToIvy(
      md: DefaultModuleDescriptor,
      artifacts: Seq[MavenCoordinate],
      ivyConfName: String): Unit = {
    artifacts.foreach { mvn =>
      val ri = ModuleRevisionId.newInstance(mvn.groupId, mvn.artifactId, mvn.version)
      val dd = new DefaultDependencyDescriptor(ri, false, false)
      dd.addDependencyConfiguration(ivyConfName, ivyConfName + "(runtime)")
      // scalastyle:off println
      printStream.println(s"${dd.getDependencyId} added as a dependency")
      // scalastyle:on println

  /**
   * Add exclusion rules for dependencies already included in the spark-assembly.
   *
   * One rule excludes `scala-library` from any group; one further rule per entry of
   * `IVY_DEFAULT_EXCLUDES` excludes the matching `org.apache.spark:spark-<entry>*` modules.
   *
   * @param ivySettings settings passed through to `createExclusion`
   * @param ivyConfName Ivy configuration name passed through to `createExclusion`
   * @param md the module descriptor that receives the exclude rules
   */
  def addExclusionRules(
      ivySettings: IvySettings,
      ivyConfName: String,
      md: DefaultModuleDescriptor): Unit = {
    // Add scala exclusion rule
    md.addExcludeRule(createExclusion("*:scala-library:*", ivySettings, ivyConfName))

    // NOTE(review): the call below is cut off after `ivySettings,` in this copy of the file.
    IVY_DEFAULT_EXCLUDES.foreach { comp =>
      md.addExcludeRule(createExclusion(s"org.apache.spark:spark-$comp*:*", ivySettings,

  /**
   * Build Ivy Settings using options with default resolvers
   * @param remoteRepos Comma-delimited string of remote repositories other than maven central
   * @param ivyPath The path to the local ivy repository
   * @return An IvySettings object
   */
  // NOTE(review): in this copy `repoResolver` is created but never registered with the settings,
  // and the method does not return `ivySettings`; those lines appear to be missing.
  def buildIvySettings(remoteRepos: Option[String], ivyPath: Option[String]): IvySettings = {
    val ivySettings: IvySettings = new IvySettings
    // Apply the alternate Ivy directory first, so getDefaultIvyUserDir below reflects it.
    processIvyPathArg(ivySettings, ivyPath)

    // create a pattern matcher
    ivySettings.addMatcher(new GlobPatternMatcher)
    // create the dependency resolvers
    val repoResolver = createRepoResolvers(ivySettings.getDefaultIvyUserDir)
    processRemoteRepoArg(ivySettings, remoteRepos)
    // (since 2.5) Setting the property ivy.maven.lookup.sources to false
    // disables the lookup of the sources artifact.
    // And setting the property ivy.maven.lookup.javadoc to false
    // disables the lookup of the javadoc artifact.
    ivySettings.setVariable("ivy.maven.lookup.sources", "false")
    ivySettings.setVariable("ivy.maven.lookup.javadoc", "false")

  /**
   * Load Ivy settings from a given filename, using supplied resolvers
   * @param settingsFile Path to Ivy settings file
   * @param remoteRepos Comma-delimited string of remote repositories other than maven central
   * @param ivyPath The path to the local ivy repository
   * @return An IvySettings object
   */
  // NOTE(review): this copy is missing the tail of the unsupported-scheme message, the body of
  // the `try` (the actual loading of the settings file) and the returned value.
  def loadIvySettings(
      settingsFile: String,
      remoteRepos: Option[String],
      ivyPath: Option[String]): IvySettings = {
    val uri = new URI(settingsFile)
    // A path without a scheme is treated as a local file; any other scheme is rejected.
    val file = Option(uri.getScheme).getOrElse("file") match {
      case "file" => new File(uri.getPath)
      case scheme => throw new IllegalArgumentException(s"Scheme $scheme not supported in " +
    // Fail fast (IllegalArgumentException via `require`) when the file is absent or not regular.
    require(file.exists(), s"Ivy settings file $file does not exist")
    require(file.isFile(), s"Ivy settings file $file is not a normal file")
    val ivySettings: IvySettings = new IvySettings
    // I/O and parse failures are wrapped in a SparkException that names the settings file.
    try {
    } catch {
      case e @ (_: IOException | _: ParseException) =>
        throw new SparkException(s"Failed when loading Ivy settings from $settingsFile", e)
    // Apply the same Ivy-path and remote-repository handling as buildIvySettings.
    processIvyPathArg(ivySettings, ivyPath)
    processRemoteRepoArg(ivySettings, remoteRepos)

  /**
   * Set ivy settings for location of cache, if option is supplied.
   *
   * A missing or blank `ivyPath` leaves the settings untouched. Otherwise the path becomes the
   * default Ivy user directory and its `cache` sub-directory becomes the default cache.
   *
   * @param ivySettings the settings object to update in place
   * @param ivyPath optional alternate Ivy directory
   */
  private def processIvyPathArg(ivySettings: IvySettings, ivyPath: Option[String]): Unit = {
    ivyPath.filterNot(_.trim.isEmpty).foreach { alternateIvyDir =>
      ivySettings.setDefaultIvyUserDir(new File(alternateIvyDir))
      ivySettings.setDefaultCache(new File(alternateIvyDir, "cache"))
    }
  }

  /*
   * Add any optional additional remote repositories.
   *
   * `remoteRepos` is a comma-delimited list; a missing or blank value is ignored. Each repository
   * gets its own IBiblioResolver named `repo-<n>` (1-based) and is announced on `printStream`.
   *
   * NOTE(review): in this copy the lines that name the chain resolver, add the current default
   * resolver, configure each repository's root, add it to the chain and install the chain into
   * `ivySettings` are missing -- restore them from upstream.
   */
  private def processRemoteRepoArg(ivySettings: IvySettings, remoteRepos: Option[String]): Unit = {
    remoteRepos.filterNot(_.trim.isEmpty).map(_.split(",")).foreach { repositoryList =>
      val cr = new ChainResolver

      // add current default resolver, if any

      // add additional repositories, last resolution in chain takes precedence
      repositoryList.zipWithIndex.foreach { case (repo, i) =>
        val brr: IBiblioResolver = new IBiblioResolver
        brr.setName(s"repo-${i + 1}")
        // scalastyle:off println
        printStream.println(s"$repo added as a remote repository with the name: ${brr.getName}")
        // scalastyle:on println


  /**
   * Creates a fresh default module descriptor. A nice function to use in tests as well.
   * Values are dummy strings.
   *
   * NOTE(review): the argument to `newDefaultInstance` is missing in this copy of the file.
   */
  def getModuleDescriptor: DefaultModuleDescriptor = DefaultModuleDescriptor.newDefaultInstance(
    // Include UUID in module name, so multiple clients resolving maven coordinate at the same time
    // do not modify the same resolution file concurrently.

  /**
   * Clear ivy resolution from current launch. The resolution file is usually at
   * ~/.ivy2/org.apache.spark-spark-submit-parent-$UUID-default.xml,
   * ~/.ivy2/resolved-org.apache.spark-spark-submit-parent-$UUID-1.0.xml, and
   * ~/.ivy2/resolved-org.apache.spark-spark-submit-parent-$UUID-1.0.properties.
   * Since each launch will have its own resolution files created, delete them after
   * each resolution to prevent accumulation of these files in the ivy cache dir.
   */
  // Deletes each listed resolution file from the default Ivy cache directory. The result of
  // `delete()` is ignored, so a file that cannot be removed is silently left behind.
  // NOTE(review): the file names inside `Seq(` are missing in this copy, so `mdId` and
  // `ivyConfName` appear unused here -- restore the list from upstream.
  private def clearIvyResolutionFiles(
      mdId: ModuleRevisionId,
      ivySettings: IvySettings,
      ivyConfName: String): Unit = {
    val currentResolutionFiles = Seq(
    currentResolutionFiles.foreach { filename =>
      new File(ivySettings.getDefaultCache, filename).delete()

  /**
   * Resolves any dependencies that were supplied through maven coordinates
   * @param coordinates Comma-delimited string of maven coordinates
   * @param ivySettings An IvySettings containing resolvers to use
   * @param transitive Whether to resolve transitive dependencies
   * @param exclusions Exclusions to apply when resolving transitive dependencies
   * @param isTest Whether this is a test run; defaults to false
   * @return Seq of path to the jars of the given maven artifacts including their
   *         transitive dependencies
   */
  // NOTE(review): this copy is missing several lines: the result for blank coordinates, the
  // System.out redirection and its restoration, the first cache message's println call, both
  // branches of the `isTest` switch, the tail of the retrieve pattern and the retrieve call
  // itself. `transitive` and `sysOut` are consequently unused here. Restore from upstream.
  def resolveMavenCoordinates(
      coordinates: String,
      ivySettings: IvySettings,
      transitive: Boolean,
      exclusions: Seq[String] = Nil,
      isTest: Boolean = false): Seq[String] = {
    // Null or blank coordinates take the (here empty) first branch; no resolution is attempted.
    if (coordinates == null || coordinates.trim.isEmpty) {
    } else {
      // Remember the original standard output stream.
      val sysOut = System.out
      // Default configuration name for ivy
      val ivyConfName = "default"

      // A Module descriptor must be specified. Entries are dummy strings
      val md = getModuleDescriptor

      try {
        // To prevent ivy from logging to system out
        val artifacts = extractMavenCoordinates(coordinates)
        // Directories for caching downloads through ivy and storing the jars when maven coordinates
        // are supplied to spark-submit
        val packagesDirectory: File = new File(ivySettings.getDefaultIvyUserDir, "jars")
        // scalastyle:off println
          s"Ivy Default Cache set to: ${ivySettings.getDefaultCache.getAbsolutePath}")
        printStream.println(s"The jars for the packages stored in: $packagesDirectory")
        // scalastyle:on println

        val ivy = Ivy.newInstance(ivySettings)
        // Set resolve options to download transitive dependencies as well
        val resolveOptions = new ResolveOptions
        val retrieveOptions = new RetrieveOptions
        // Turn downloading and logging off for testing
        if (isTest) {
        } else {

        // Add exclusion rules for Spark and Scala Library
        addExclusionRules(ivySettings, ivyConfName, md)
        // add all supplied maven artifacts as dependencies
        addDependenciesToIvy(md, artifacts, ivyConfName)
        // Caller-supplied exclusions get a wildcard version appended before becoming rules.
        exclusions.foreach { e =>
          md.addExcludeRule(createExclusion(e + ":*", ivySettings, ivyConfName))
        // resolve dependencies
        val rr: ResolveReport = ivy.resolve(md, resolveOptions)
        // Any resolution problem aborts with a RuntimeException listing all problem messages.
        if (rr.hasError) {
          throw new RuntimeException(rr.getAllProblemMessages.toString)
        // retrieve all resolved dependencies
        retrieveOptions.setDestArtifactPattern(packagesDirectory.getAbsolutePath + File.separator +
        resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
      } finally {
        // Always remove this launch's resolution files, whether or not resolution succeeded.
        clearIvyResolutionFiles(md.getModuleRevisionId, ivySettings, ivyConfName)

  // Builds an Ivy exclude rule from a coordinate string: the first coordinate parsed by
  // extractMavenCoordinates supplies the group and artifact for the ModuleId, the three remaining
  // ArtifactId arguments are the "*" wildcard, and matching uses the settings' "glob" matcher.
  // NOTE(review): in this copy `ivyConfName` is unused and `rule` is never returned; the trailing
  // lines appear to be missing -- confirm against upstream.
  private[deploy] def createExclusion(
      coords: String,
      ivySettings: IvySettings,
      ivyConfName: String): ExcludeRule = {
    val c = extractMavenCoordinates(coords)(0)
    val id = new ArtifactId(new ModuleId(c.groupId, c.artifactId), "*", "*", "*")
    val rule = new DefaultExcludeRule(id, ivySettings.getMatcher("glob"), null)

  /**
   * Splits a `key=value` Spark configuration entry at the first `=`.
   *
   * Only the first `=` separates key from value, so the value may itself contain `=`.
   *
   * @param pair the raw `key=value` text
   * @return the key and the value as a tuple
   * @throws SparkException if `pair` contains no `=`
   */
  def parseSparkConfProperty(pair: String): (String, String) = {
    // A limit of 2 yields at most [key, rest]; a single element means no '=' was present.
    pair.split("=", 2) match {
      case Array(k, v) => (k, v)
      case _ => throw new SparkException(s"Spark config without '=': $pair")
    }
  }

  /**
   * Looks up the single externally registered [[SparkSubmitOperation]] that supports the given
   * master URL.
   *
   * Implementations are discovered through `java.util.ServiceLoader` on the context (or Spark)
   * class loader and filtered with `supports(master)`.
   *
   * @param master the master URL the operation must support
   * @return the one matching operation
   * @throws SparkException if more than one registered operation supports `master`
   * @throws IllegalArgumentException if no registered operation supports `master`
   */
  private[deploy] def getSubmitOperations(master: String): SparkSubmitOperation = {
    val loader = Utils.getContextOrSparkClassLoader
    // ServiceLoader is a lazy Java Iterable: convert it, keep only providers for this master and
    // materialise once so that providers are not re-instantiated on every traversal.
    val candidates =
      ServiceLoader.load(classOf[SparkSubmitOperation], loader)
        .asScala
        .filter(_.supports(master))
        .toList

    candidates match {
      case single :: Nil => single
      case Nil =>
        throw new IllegalArgumentException(s"No external SparkSubmitOperations " +
          s"clients found for master url: '$master'")
      case many =>
        val x = many.size
        throw new SparkException(s"Multiple($x) external SparkSubmitOperations " +
          s"clients registered for master url ${master}.")
    }
  }

/**
 * Provides an indirection layer for passing arguments as system properties or flags to
 * the user's driver program or to downstream launcher tools.
 */
// One argument-routing entry. Only `value`, `clusterManager` and `deployMode` are mandatory.
// NOTE(review): how each field is interpreted is decided by code outside this chunk; the
// per-field notes below are assumptions drawn from names and types -- confirm against the caller.
private case class OptionAssigner(
    // The argument's value.
    value: String,
    // Presumably one of (or a combination of) SparkSubmit's cluster-manager constants.
    clusterManager: Int,
    // Presumably one of (or a combination of) SparkSubmit's deploy-mode constants.
    deployMode: Int,
    // Presumably the command-line flag to emit; defaults to null (no flag).
    clOption: String = null,
    // Presumably the configuration key to set; defaults to null (no key).
    confKey: String = null,
    // Optional function combining two String values into one; defaults to None.
    mergeFn: Option[(String, String) => String] = None)

/**
 * Operations on an existing submission that an external implementation can provide for the
 * master URLs it supports.
 */
private[spark] trait SparkSubmitOperation {

  /**
   * Kill the submission with the given id.
   *
   * @param submissionId identifier of the submission
   * @param conf the Spark configuration to use
   */
  def kill(submissionId: String, conf: SparkConf): Unit

  /**
   * Print the status of the submission with the given id.
   *
   * @param submissionId identifier of the submission
   * @param conf the Spark configuration to use
   */
  def printSubmissionStatus(submissionId: String, conf: SparkConf): Unit

  /**
   * Whether this implementation handles the given master URL.
   *
   * @param master the master URL to check
   */
  def supports(master: String): Boolean

}


