Spark SparkSubmit Source Code
File path: /core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
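SparkSubmit is the class behind the bin/spark-submit script: it parses the submit arguments, prepares the child classpath, Spark configuration, and main class for the chosen cluster manager and deploy mode, and then runs the application. As a quick orientation before the full listing, here is a minimal sketch of driving the same code path in-process. It is illustrative only: InProcessSparkSubmit is private[spark], so such code would have to sit under the org.apache.spark package (for example in a test harness), and the object name, application class, and jar path below are hypothetical.

package org.apache.spark.deploy

// Minimal sketch (hypothetical names): submit an application in-process with a local master.
object SubmitSketch {
  def main(args: Array[String]): Unit = {
    InProcessSparkSubmit.main(Array(
      "--master", "local[*]",            // resolves to the LOCAL cluster manager, client deploy mode
      "--class", "com.example.MyApp",    // hypothetical user main class
      "--conf", "spark.app.name=my-app", // extra properties end up in args.sparkProperties
      "/tmp/my-app.jar",                 // hypothetical primary resource (a user jar)
      "input", "output"                  // application arguments, passed through as childArgs
    ))
  }
}

The full source of the file follows.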
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy
import java.io._
import java.lang.reflect.{InvocationTargetException, UndeclaredThrowableException}
import java.net.{URI, URL}
import java.security.PrivilegedExceptionAction
import java.text.ParseException
import java.util.{ServiceLoader, UUID}
import java.util.jar.JarInputStream
import javax.ws.rs.core.UriBuilder
import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.{Properties, Try}
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.ivy.Ivy
import org.apache.ivy.core.LogOptions
import org.apache.ivy.core.module.descriptor._
import org.apache.ivy.core.module.id.{ArtifactId, ModuleId, ModuleRevisionId}
import org.apache.ivy.core.report.ResolveReport
import org.apache.ivy.core.resolve.ResolveOptions
import org.apache.ivy.core.retrieve.RetrieveOptions
import org.apache.ivy.core.settings.IvySettings
import org.apache.ivy.plugins.matcher.GlobPatternMatcher
import org.apache.ivy.plugins.repository.file.FileRepository
import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBiblioResolver}
import org.apache.spark._
import org.apache.spark.api.r.RUtils
import org.apache.spark.deploy.rest._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.UI._
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util._
/**
* Whether to submit, kill, or request the status of an application.
* The latter two operations are currently supported only for standalone and Mesos cluster modes.
*/
private[deploy] object SparkSubmitAction extends Enumeration {
type SparkSubmitAction = Value
val SUBMIT, KILL, REQUEST_STATUS, PRINT_VERSION = Value
}
/**
* Main gateway of launching a Spark application.
*
* This program handles setting up the classpath with relevant Spark dependencies and provides
* a layer over the different cluster managers and deploy modes that Spark supports.
*/
private[spark] class SparkSubmit extends Logging {
import DependencyUtils._
import SparkSubmit._
def doSubmit(args: Array[String]): Unit = {
// Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
// be reset before the application starts.
val uninitLog = initializeLogIfNecessary(true, silent = true)
val appArgs = parseArguments(args)
if (appArgs.verbose) {
logInfo(appArgs.toString)
}
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
case SparkSubmitAction.PRINT_VERSION => printVersion()
}
}
protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
new SparkSubmitArguments(args)
}
/**
* Kill an existing submission.
*/
private def kill(args: SparkSubmitArguments): Unit = {
if (RestSubmissionClient.supportsRestClient(args.master)) {
new RestSubmissionClient(args.master)
.killSubmission(args.submissionToKill)
} else {
val sparkConf = args.toSparkConf()
sparkConf.set("spark.master", args.master)
SparkSubmitUtils
.getSubmitOperations(args.master)
.kill(args.submissionToKill, sparkConf)
}
}
/**
* Request the status of an existing submission.
*/
private def requestStatus(args: SparkSubmitArguments): Unit = {
if (RestSubmissionClient.supportsRestClient(args.master)) {
new RestSubmissionClient(args.master)
.requestSubmissionStatus(args.submissionToRequestStatusFor)
} else {
val sparkConf = args.toSparkConf()
sparkConf.set("spark.master", args.master)
SparkSubmitUtils
.getSubmitOperations(args.master)
.printSubmissionStatus(args.submissionToRequestStatusFor, sparkConf)
}
}
/** Print version information to the log. */
private def printVersion(): Unit = {
logInfo("""Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version %s
/_/
""".format(SPARK_VERSION))
logInfo("Using Scala %s, %s, %s".format(
Properties.versionString, Properties.javaVmName, Properties.javaVersion))
logInfo(s"Branch $SPARK_BRANCH")
logInfo(s"Compiled by user $SPARK_BUILD_USER on $SPARK_BUILD_DATE")
logInfo(s"Revision $SPARK_REVISION")
logInfo(s"Url $SPARK_REPO_URL")
logInfo("Type --help for more information.")
}
/**
* Submit the application using the provided parameters, ensuring to first wrap
* in a doAs when --proxy-user is specified.
*/
@tailrec
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
def doRunMain(): Unit = {
if (args.proxyUser != null) {
val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
UserGroupInformation.getCurrentUser())
try {
proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
runMain(args, uninitLog)
}
})
} catch {
case e: Exception =>
// Hadoop's AuthorizationException suppresses the exception's stack trace, which
// makes the message printed to the output by the JVM not very helpful. Instead,
// detect exceptions with empty stack traces here, and treat them differently.
if (e.getStackTrace().length == 0) {
error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
} else {
throw e
}
}
} else {
runMain(args, uninitLog)
}
}
// In standalone cluster mode, there are two submission gateways:
// (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
// (2) The new REST-based gateway introduced in Spark 1.3
// The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
// to use the legacy gateway if the master endpoint turns out to be not a REST server.
if (args.isStandaloneCluster && args.useRest) {
try {
logInfo("Running Spark using the REST application submission protocol.")
doRunMain()
} catch {
// Fail over to use the legacy submission gateway
case e: SubmitRestConnectionException =>
logWarning(s"Master endpoint ${args.master} was not a REST server. " +
"Falling back to legacy submission gateway instead.")
args.useRest = false
submit(args, false)
}
// In all other modes, just run the main class as prepared
} else {
doRunMain()
}
}
/**
* Prepare the environment for submitting an application.
*
* @param args the parsed SparkSubmitArguments used for environment preparation.
* @param conf the Hadoop Configuration, this argument will only be set in unit test.
* @return a 4-tuple:
* (1) the arguments for the child process,
* (2) a list of classpath entries for the child,
* (3) a map of system properties, and
* (4) the main class for the child
*
* Exposed for testing.
*/
private[deploy] def prepareSubmitEnvironment(
args: SparkSubmitArguments,
conf: Option[HadoopConfiguration] = None)
: (Seq[String], Seq[String], SparkConf, String) = {
// Return values
val childArgs = new ArrayBuffer[String]()
val childClasspath = new ArrayBuffer[String]()
val sparkConf = args.toSparkConf()
var childMainClass = ""
// Set the cluster manager
val clusterManager: Int = args.master match {
case "yarn" => YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("k8s") => KUBERNETES
case m if m.startsWith("local") => LOCAL
case _ =>
error("Master must either be yarn or start with spark, mesos, k8s, or local")
-1
}
// Set the deploy mode; default is client mode
val deployMode: Int = args.deployMode match {
case "client" | null => CLIENT
case "cluster" => CLUSTER
case _ =>
error("Deploy mode must be either client or cluster")
-1
}
if (clusterManager == YARN) {
// Make sure YARN is included in our build if we're trying to use it
if (!Utils.classIsLoadable(YARN_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
error(
"Could not load YARN classes. " +
"This copy of Spark may not have been compiled with YARN support.")
}
}
if (clusterManager == KUBERNETES) {
args.master = Utils.checkAndGetK8sMasterUrl(args.master)
// Make sure KUBERNETES is included in our build if we're trying to use it
if (!Utils.classIsLoadable(KUBERNETES_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
error(
"Could not load KUBERNETES classes. " +
"This copy of Spark may not have been compiled with KUBERNETES support.")
}
}
// Fail fast, the following modes are not supported or applicable
(clusterManager, deployMode) match {
case (STANDALONE, CLUSTER) if args.isPython =>
error("Cluster deploy mode is currently not supported for python " +
"applications on standalone clusters.")
case (STANDALONE, CLUSTER) if args.isR =>
error("Cluster deploy mode is currently not supported for R " +
"applications on standalone clusters.")
case (LOCAL, CLUSTER) =>
error("Cluster deploy mode is not compatible with master \"local\"")
case (_, CLUSTER) if isShell(args.primaryResource) =>
error("Cluster deploy mode is not applicable to Spark shells.")
case (_, CLUSTER) if isSqlShell(args.mainClass) =>
error("Cluster deploy mode is not applicable to Spark SQL shell.")
case (_, CLUSTER) if isThriftServer(args.mainClass) =>
error("Cluster deploy mode is not applicable to Spark Thrift server.")
case _ =>
}
// Update args.deployMode if it is null. It will be passed down as a Spark property later.
(args.deployMode, deployMode) match {
case (null, CLIENT) => args.deployMode = "client"
case (null, CLUSTER) => args.deployMode = "cluster"
case _ =>
}
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER
val isKubernetesClient = clusterManager == KUBERNETES && deployMode == CLIENT
val isKubernetesClusterModeDriver = isKubernetesClient &&
sparkConf.getBoolean("spark.kubernetes.submitInDriver", false)
if (!isMesosCluster && !isStandAloneCluster) {
// Resolve maven dependencies if there are any, add the resolved jars to the classpath, and add
// them to py-files too for packages that include Python code
val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
packagesTransitive = true, args.packagesExclusions, args.packages,
args.repositories, args.ivyRepoPath, args.ivySettingsPath)
if (resolvedMavenCoordinates.nonEmpty) {
// In K8s client mode, when in the driver, add resolved jars early as we might need
// them at the submit time for artifact downloading.
// For example we might use the dependencies for downloading
// files from a Hadoop Compatible fs e.g. S3. In this case the user might pass:
// --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.6
if (isKubernetesClusterModeDriver) {
val loader = getSubmitClassLoader(sparkConf)
for (jar <- resolvedMavenCoordinates) {
addJarToClasspath(jar, loader)
}
} else if (isKubernetesCluster) {
// We need this in K8s cluster mode so that we can upload local deps
// via the k8s application, like in cluster mode driver
childClasspath ++= resolvedMavenCoordinates
} else {
args.jars = mergeFileLists(args.jars, mergeFileLists(resolvedMavenCoordinates: _*))
if (args.isPython || isInternal(args.primaryResource)) {
args.pyFiles = mergeFileLists(args.pyFiles,
mergeFileLists(resolvedMavenCoordinates: _*))
}
}
}
// install any R packages that may have been passed through --jars or --packages.
// Spark Packages may contain R source code inside the jar.
if (args.isR && !StringUtils.isBlank(args.jars)) {
RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
}
}
// update spark config from args
args.toSparkConf(Option(sparkConf))
val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf))
val targetDir = Utils.createTempDir()
// Kerberos is not supported in standalone mode, and keytab support is not yet available
// in Mesos cluster mode.
if (clusterManager != STANDALONE
&& !isMesosCluster
&& args.principal != null
&& args.keytab != null) {
// If client mode, make sure the keytab is just a local path.
if (deployMode == CLIENT && Utils.isLocalUri(args.keytab)) {
args.keytab = new URI(args.keytab).getPath()
}
if (!Utils.isLocalUri(args.keytab)) {
require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
}
}
// Resolve glob path for different resources.
args.jars = Option(args.jars).map(resolveGlobPaths(_, hadoopConf)).orNull
args.files = Option(args.files).map(resolveGlobPaths(_, hadoopConf)).orNull
args.pyFiles = Option(args.pyFiles).map(resolveGlobPaths(_, hadoopConf)).orNull
args.archives = Option(args.archives).map(resolveGlobPaths(_, hadoopConf)).orNull
// In client mode, download remote files.
var localPrimaryResource: String = null
var localJars: String = null
var localPyFiles: String = null
if (deployMode == CLIENT) {
localPrimaryResource = Option(args.primaryResource).map {
downloadFile(_, targetDir, sparkConf, hadoopConf)
}.orNull
localJars = Option(args.jars).map {
downloadFileList(_, targetDir, sparkConf, hadoopConf)
}.orNull
localPyFiles = Option(args.pyFiles).map {
downloadFileList(_, targetDir, sparkConf, hadoopConf)
}.orNull
if (isKubernetesClusterModeDriver) {
// Replace with the downloaded local jar path to avoid propagating hadoop compatible uris.
// Executors will get the jars from the Spark file server.
// Explicitly download the related files here
args.jars = localJars
val filesLocalFiles = Option(args.files).map {
downloadFileList(_, targetDir, sparkConf, hadoopConf)
}.orNull
val archiveLocalFiles = Option(args.archives).map { uris =>
val resolvedUris = Utils.stringToSeq(uris).map(Utils.resolveURI)
val localArchives = downloadFileList(
resolvedUris.map(
UriBuilder.fromUri(_).fragment(null).build().toString).mkString(","),
targetDir, sparkConf, hadoopConf)
// SPARK-33748: this mimics the behaviour of Yarn cluster mode. If the driver is running
// in cluster mode, the archives should be available in the driver's current working
// directory too.
Utils.stringToSeq(localArchives).map(Utils.resolveURI).zip(resolvedUris).map {
case (localArchive, resolvedUri) =>
val source = new File(localArchive.getPath)
val dest = new File(
".",
if (resolvedUri.getFragment != null) resolvedUri.getFragment else source.getName)
logInfo(
s"Unpacking an archive $resolvedUri " +
s"from ${source.getAbsolutePath} to ${dest.getAbsolutePath}")
Utils.deleteRecursively(dest)
Utils.unpack(source, dest)
// Keep the URIs of local files with the given fragments.
UriBuilder.fromUri(
localArchive).fragment(resolvedUri.getFragment).build().toString
}.mkString(",")
}.orNull
args.files = filesLocalFiles
args.archives = archiveLocalFiles
args.pyFiles = localPyFiles
}
}
// When running in YARN, for some remote resources with scheme:
// 1. Hadoop FileSystem doesn't support them.
// 2. We explicitly bypass Hadoop FileSystem with "spark.yarn.dist.forceDownloadSchemes".
// We will download them to local disk prior to adding them to YARN's distributed cache.
// For YARN client mode, since we have already downloaded them with the code above, we only
// need to figure out the local path and replace the remote one.
if (clusterManager == YARN) {
val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)
def shouldDownload(scheme: String): Boolean = {
forceDownloadSchemes.contains("*") || forceDownloadSchemes.contains(scheme) ||
Try { FileSystem.getFileSystemClass(scheme, hadoopConf) }.isFailure
}
def downloadResource(resource: String): String = {
val uri = Utils.resolveURI(resource)
uri.getScheme match {
case "local" | "file" => resource
case e if shouldDownload(e) =>
val file = new File(targetDir, new Path(uri).getName)
if (file.exists()) {
file.toURI.toString
} else {
downloadFile(resource, targetDir, sparkConf, hadoopConf)
}
case _ => uri.toString
}
}
args.primaryResource = Option(args.primaryResource).map { downloadResource }.orNull
args.files = Option(args.files).map { files =>
Utils.stringToSeq(files).map(downloadResource).mkString(",")
}.orNull
args.pyFiles = Option(args.pyFiles).map { pyFiles =>
Utils.stringToSeq(pyFiles).map(downloadResource).mkString(",")
}.orNull
args.jars = Option(args.jars).map { jars =>
Utils.stringToSeq(jars).map(downloadResource).mkString(",")
}.orNull
args.archives = Option(args.archives).map { archives =>
Utils.stringToSeq(archives).map(downloadResource).mkString(",")
}.orNull
}
// At this point, we have attempted to download all remote resources.
// Now we try to resolve the main class if our primary resource is a JAR.
if (args.mainClass == null && !args.isPython && !args.isR) {
try {
val uri = new URI(
Option(localPrimaryResource).getOrElse(args.primaryResource)
)
val fs = FileSystem.get(uri, hadoopConf)
Utils.tryWithResource(new JarInputStream(fs.open(new Path(uri)))) { jar =>
args.mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")
}
} catch {
case e: Throwable =>
error(
s"Failed to get main class in JAR with error '${e.getMessage}'. " +
" Please specify one with --class."
)
}
if (args.mainClass == null) {
// If we still can't figure out the main class at this point, blow up.
error("No main class set in JAR; please specify one with --class.")
}
}
// If we're running a python app, set the main class to our specific python runner
if (args.isPython && deployMode == CLIENT) {
if (args.primaryResource == PYSPARK_SHELL) {
args.mainClass = "org.apache.spark.api.python.PythonGatewayServer"
} else {
// If a python file is provided, add it to the child arguments and list of files to deploy.
// Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
args.mainClass = "org.apache.spark.deploy.PythonRunner"
args.childArgs = ArrayBuffer(localPrimaryResource, localPyFiles) ++ args.childArgs
}
}
// Non-PySpark applications may still need Python dependencies.
if (deployMode == CLIENT && clusterManager != YARN) {
// The YARN backend handles python files differently, so don't merge the lists.
args.files = mergeFileLists(args.files, args.pyFiles)
}
if (localPyFiles != null) {
sparkConf.set(SUBMIT_PYTHON_FILES, localPyFiles.split(",").toSeq)
}
// In YARN mode for an R app, add the SparkR package archive and the R package
// archive containing all of the built R libraries to archives so that they can
// be distributed with the job
if (args.isR && clusterManager == YARN) {
val sparkRPackagePath = RUtils.localSparkRPackagePath
if (sparkRPackagePath.isEmpty) {
error("SPARK_HOME does not exist for R application in YARN mode.")
}
val sparkRPackageFile = new File(sparkRPackagePath.get, SPARKR_PACKAGE_ARCHIVE)
if (!sparkRPackageFile.exists()) {
error(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")
}
val sparkRPackageURI = Utils.resolveURI(sparkRPackageFile.getAbsolutePath).toString
// Distribute the SparkR package.
// Assigns a symbolic link name "sparkr" to the shipped package.
args.archives = mergeFileLists(args.archives, sparkRPackageURI + "#sparkr")
// Distribute the R package archive containing all the built R packages.
if (!RUtils.rPackages.isEmpty) {
val rPackageFile =
RPackageUtils.zipRLibraries(new File(RUtils.rPackages.get), R_PACKAGE_ARCHIVE)
if (!rPackageFile.exists()) {
error("Failed to zip all the built R packages.")
}
val rPackageURI = Utils.resolveURI(rPackageFile.getAbsolutePath).toString
// Assigns a symbolic link name "rpkg" to the shipped package.
args.archives = mergeFileLists(args.archives, rPackageURI + "#rpkg")
}
}
// TODO: Support distributing R packages with standalone cluster
if (args.isR && clusterManager == STANDALONE && !RUtils.rPackages.isEmpty) {
error("Distributing R packages with standalone cluster is not supported.")
}
// TODO: Support distributing R packages with mesos cluster
if (args.isR && clusterManager == MESOS && !RUtils.rPackages.isEmpty) {
error("Distributing R packages with mesos cluster is not supported.")
}
// If we're running an R app, set the main class to our specific R runner
if (args.isR && deployMode == CLIENT) {
if (args.primaryResource == SPARKR_SHELL) {
args.mainClass = "org.apache.spark.api.r.RBackend"
} else {
// If an R file is provided, add it to the child arguments and list of files to deploy.
// Usage: RRunner <main R file> [app arguments]
args.mainClass = "org.apache.spark.deploy.RRunner"
args.childArgs = ArrayBuffer(localPrimaryResource) ++ args.childArgs
args.files = mergeFileLists(args.files, args.primaryResource)
}
}
if (isYarnCluster && args.isR) {
// In yarn-cluster mode for an R app, add primary resource to files
// that can be distributed with the job
args.files = mergeFileLists(args.files, args.primaryResource)
}
// Special flag to avoid deprecation warnings at the client
sys.props("SPARK_SUBMIT") = "true"
// A list of rules to map each argument to system properties or command-line options in
// each deploy mode; we iterate through these below
val options = List[OptionAssigner](
// All cluster managers
OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.master"),
OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
confKey = SUBMIT_DEPLOY_MODE.key),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.app.name"),
OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT,
confKey = JAR_IVY_REPO_PATH.key),
OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
confKey = DRIVER_MEMORY.key),
OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
confKey = DRIVER_CLASS_PATH.key),
OptionAssigner(args.driverExtraJavaOptions, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
confKey = DRIVER_JAVA_OPTIONS.key),
OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
confKey = DRIVER_LIBRARY_PATH.key),
OptionAssigner(args.principal, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
confKey = PRINCIPAL.key),
OptionAssigner(args.keytab, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
confKey = KEYTAB.key),
OptionAssigner(args.pyFiles, ALL_CLUSTER_MGRS, CLUSTER, confKey = SUBMIT_PYTHON_FILES.key),
// Propagate attributes for dependency resolution at the driver side
OptionAssigner(args.packages, STANDALONE | MESOS | KUBERNETES,
CLUSTER, confKey = JAR_PACKAGES.key),
OptionAssigner(args.repositories, STANDALONE | MESOS | KUBERNETES,
CLUSTER, confKey = JAR_REPOSITORIES.key),
OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS | KUBERNETES,
CLUSTER, confKey = JAR_IVY_REPO_PATH.key),
OptionAssigner(args.packagesExclusions, STANDALONE | MESOS | KUBERNETES,
CLUSTER, confKey = JAR_PACKAGES_EXCLUSIONS.key),
// Yarn only
OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.queue"),
OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.pyFiles",
mergeFn = Some(mergeFileLists(_, _))),
OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.jars",
mergeFn = Some(mergeFileLists(_, _))),
OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.files",
mergeFn = Some(mergeFileLists(_, _))),
OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.archives",
mergeFn = Some(mergeFileLists(_, _))),
// Other options
OptionAssigner(args.numExecutors, YARN | KUBERNETES, ALL_DEPLOY_MODES,
confKey = EXECUTOR_INSTANCES.key),
OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES,
confKey = EXECUTOR_CORES.key),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN | KUBERNETES, ALL_DEPLOY_MODES,
confKey = EXECUTOR_MEMORY.key),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
confKey = CORES_MAX.key),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
confKey = FILES.key),
OptionAssigner(args.archives, LOCAL | STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
confKey = ARCHIVES.key),
OptionAssigner(args.jars, LOCAL, CLIENT, confKey = JARS.key),
OptionAssigner(args.jars, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
confKey = JARS.key),
OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
confKey = DRIVER_MEMORY.key),
OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
confKey = DRIVER_CORES.key),
OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
confKey = DRIVER_SUPERVISE.key),
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, confKey = JAR_IVY_REPO_PATH.key),
// An internal option used only for spark-shell to add user jars to the repl's classloader.
// Previously it used "spark.jars" or "spark.yarn.dist.jars", which may now point to remote
// jars, so a new option is added to specify only local jars for spark-shell internally.
OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.repl.local.jars")
)
// In client mode, launch the application main class directly
// In addition, add the main application jar and any added jars (if any) to the classpath
if (deployMode == CLIENT) {
childMainClass = args.mainClass
if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
childClasspath += localPrimaryResource
}
if (localJars != null) { childClasspath ++= localJars.split(",") }
}
// Add the main application jar and any added jars to classpath in case YARN client
// requires these jars.
// This assumes both primaryResource and user jars are local jars, or already downloaded
// to local by configuring "spark.yarn.dist.forceDownloadSchemes", otherwise it will not be
// added to the classpath of YARN client.
if (isYarnCluster) {
if (isUserJar(args.primaryResource)) {
childClasspath += args.primaryResource
}
if (args.jars != null) { childClasspath ++= args.jars.split(",") }
}
if (deployMode == CLIENT) {
if (args.childArgs != null) { childArgs ++= args.childArgs }
}
// Map all arguments to command-line options or system properties for our chosen mode
for (opt <- options) {
if (opt.value != null &&
(deployMode & opt.deployMode) != 0 &&
(clusterManager & opt.clusterManager) != 0) {
if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) }
if (opt.confKey != null) {
if (opt.mergeFn.isDefined && sparkConf.contains(opt.confKey)) {
sparkConf.set(opt.confKey, opt.mergeFn.get.apply(sparkConf.get(opt.confKey), opt.value))
} else {
sparkConf.set(opt.confKey, opt.value)
}
}
}
}
// In case of shells, spark.ui.showConsoleProgress can be true by default or by user.
if (isShell(args.primaryResource) && !sparkConf.contains(UI_SHOW_CONSOLE_PROGRESS)) {
sparkConf.set(UI_SHOW_CONSOLE_PROGRESS, true)
}
// Add the application jar automatically so the user doesn't have to call sc.addJar
// For YARN cluster mode, the jar is already distributed on each node as "app.jar"
// For python and R files, the primary resource is already distributed as a regular file
if (!isYarnCluster && !args.isPython && !args.isR) {
var jars = sparkConf.get(JARS)
if (isUserJar(args.primaryResource)) {
jars = jars ++ Seq(args.primaryResource)
}
sparkConf.set(JARS, jars)
}
// In standalone cluster mode, use the REST client to submit the application (Spark 1.3+).
// All Spark parameters are expected to be passed to the client through system properties.
if (args.isStandaloneCluster) {
if (args.useRest) {
childMainClass = REST_CLUSTER_SUBMIT_CLASS
childArgs += (args.primaryResource, args.mainClass)
} else {
// In legacy standalone cluster mode, use Client as a wrapper around the user class
childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS
if (args.supervise) { childArgs += "--supervise" }
Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
childArgs += "launch"
childArgs += (args.master, args.primaryResource, args.mainClass)
}
if (args.childArgs != null) {
childArgs ++= args.childArgs
}
}
// Let YARN know it's a pyspark app, so it distributes needed libraries.
if (clusterManager == YARN) {
if (args.isPython) {
sparkConf.set("spark.yarn.isPython", "true")
}
}
if ((clusterManager == MESOS || clusterManager == KUBERNETES)
&& UserGroupInformation.isSecurityEnabled) {
setRMPrincipal(sparkConf)
}
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
childMainClass = YARN_CLUSTER_SUBMIT_CLASS
if (args.isPython) {
childArgs += ("--primary-py-file", args.primaryResource)
childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
} else if (args.isR) {
val mainFile = new Path(args.primaryResource).getName
childArgs += ("--primary-r-file", mainFile)
childArgs += ("--class", "org.apache.spark.deploy.RRunner")
} else {
if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
childArgs += ("--jar", args.primaryResource)
}
childArgs += ("--class", args.mainClass)
}
if (args.childArgs != null) {
args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
}
}
if (isMesosCluster) {
assert(args.useRest, "Mesos cluster mode is only supported through the REST submission API")
childMainClass = REST_CLUSTER_SUBMIT_CLASS
if (args.isPython) {
// Second argument is main class
childArgs += (args.primaryResource, "")
if (args.pyFiles != null) {
sparkConf.set(SUBMIT_PYTHON_FILES, args.pyFiles.split(",").toSeq)
}
} else if (args.isR) {
// Second argument is main class
childArgs += (args.primaryResource, "")
} else {
childArgs += (args.primaryResource, args.mainClass)
}
if (args.childArgs != null) {
childArgs ++= args.childArgs
}
}
if (isKubernetesCluster) {
childMainClass = KUBERNETES_CLUSTER_SUBMIT_CLASS
if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
if (args.isPython) {
childArgs ++= Array("--primary-py-file", args.primaryResource)
childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner")
} else if (args.isR) {
childArgs ++= Array("--primary-r-file", args.primaryResource)
childArgs ++= Array("--main-class", "org.apache.spark.deploy.RRunner")
}
else {
childArgs ++= Array("--primary-java-resource", args.primaryResource)
childArgs ++= Array("--main-class", args.mainClass)
}
} else {
childArgs ++= Array("--main-class", args.mainClass)
}
if (args.childArgs != null) {
args.childArgs.foreach { arg =>
childArgs += ("--arg", arg)
}
}
// Pass the proxyUser to the k8s app so it is possible to add it to the driver args
if (args.proxyUser != null) {
childArgs += ("--proxy-user", args.proxyUser)
}
}
// Load any properties specified through --conf and the default properties file
for ((k, v) <- args.sparkProperties) {
sparkConf.setIfMissing(k, v)
}
// Ignore invalid spark.driver.host in cluster modes.
if (deployMode == CLUSTER) {
sparkConf.remove(DRIVER_HOST_ADDRESS)
}
// Resolve paths in certain spark properties
val pathConfigs = Seq(
JARS.key,
FILES.key,
ARCHIVES.key,
"spark.yarn.dist.files",
"spark.yarn.dist.archives",
"spark.yarn.dist.jars")
pathConfigs.foreach { config =>
// Replace old URIs with resolved URIs, if they exist
sparkConf.getOption(config).foreach { oldValue =>
sparkConf.set(config, Utils.resolveURIs(oldValue))
}
}
// Resolve and format python file paths properly before adding them to the PYTHONPATH.
// The resolving part is redundant in the case of --py-files, but necessary if the user
// explicitly sets `spark.submit.pyFiles` in his/her default properties file.
val pyFiles = sparkConf.get(SUBMIT_PYTHON_FILES)
val resolvedPyFiles = Utils.resolveURIs(pyFiles.mkString(","))
val formattedPyFiles = if (deployMode != CLUSTER) {
PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
} else {
// Skip formatting the python path in YARN and Mesos cluster modes; these two modes support
// remote python files and can distribute and add python files locally.
resolvedPyFiles
}
sparkConf.set(SUBMIT_PYTHON_FILES, formattedPyFiles.split(",").toSeq)
if (args.verbose && isSqlShell(childMainClass)) {
childArgs ++= Seq("--verbose")
}
sparkConf.set("spark.app.submitTime", System.currentTimeMillis().toString)
(childArgs.toSeq, childClasspath.toSeq, sparkConf, childMainClass)
}
// [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches delegation tokens with
// renewer set to the YARN ResourceManager. Since YARN isn't configured in Mesos or Kubernetes
// mode, we must trick it into thinking we're YARN.
private def setRMPrincipal(sparkConf: SparkConf): Unit = {
val shortUserName = UserGroupInformation.getCurrentUser.getShortUserName
val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}"
logInfo(s"Setting ${key} to ${shortUserName}")
sparkConf.set(key, shortUserName)
}
private def getSubmitClassLoader(sparkConf: SparkConf): MutableURLClassLoader = {
val loader =
if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
new ChildFirstURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
} else {
new MutableURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
}
Thread.currentThread.setContextClassLoader(loader)
loader
}
/**
* Run the main method of the child class using the submit arguments.
*
* This runs in two steps. First, we prepare the launch environment by setting up
* the appropriate classpath, system properties, and application arguments for
* running the child main class based on the cluster manager and the deploy mode.
* Second, we use this launch environment to invoke the main method of the child
* main class.
*
* Note that this main class will not be the one provided by the user if we're
* running cluster deploy mode or python applications.
*/
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
// Let the main class re-initialize the logging system once it starts.
if (uninitLog) {
Logging.uninitialize()
}
if (args.verbose) {
logInfo(s"Main class:\n$childMainClass")
logInfo(s"Arguments:\n${childArgs.mkString("\n")}")
// sysProps may contain sensitive information, so redact before printing
logInfo(s"Spark config:\n${Utils.redact(sparkConf.getAll.toMap).sorted.mkString("\n")}")
logInfo(s"Classpath elements:\n${childClasspath.mkString("\n")}")
logInfo("\n")
}
val loader = getSubmitClassLoader(sparkConf)
for (jar <- childClasspath) {
addJarToClasspath(jar, loader)
}
var mainClass: Class[_] = null
try {
mainClass = Utils.classForName(childMainClass)
} catch {
case e: ClassNotFoundException =>
logError(s"Failed to load class $childMainClass.")
if (childMainClass.contains("thriftserver")) {
logInfo(s"Failed to load main class $childMainClass.")
logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
}
throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
case e: NoClassDefFoundError =>
logError(s"Failed to load $childMainClass: ${e.getMessage()}")
if (e.getMessage.contains("org/apache/hadoop/hive")) {
logInfo(s"Failed to load hive class.")
logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
}
throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
}
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
} else {
new JavaMainApplication(mainClass)
}
@tailrec
def findCause(t: Throwable): Throwable = t match {
case e: UndeclaredThrowableException =>
if (e.getCause() != null) findCause(e.getCause()) else e
case e: InvocationTargetException =>
if (e.getCause() != null) findCause(e.getCause()) else e
case e: Throwable =>
e
}
try {
app.start(childArgs.toArray, sparkConf)
} catch {
case t: Throwable =>
throw findCause(t)
} finally {
if (args.master.startsWith("k8s") && !isShell(args.primaryResource) &&
!isSqlShell(args.mainClass) && !isThriftServer(args.mainClass)) {
try {
SparkContext.getActive.foreach(_.stop())
} catch {
case e: Throwable => logError(s"Failed to close SparkContext: $e")
}
}
}
}
/** Throw a SparkException with the given error message. */
private def error(msg: String): Unit = throw new SparkException(msg)
}
/**
* This entry point is used by the launcher library to start in-process Spark applications.
*/
private[spark] object InProcessSparkSubmit {
def main(args: Array[String]): Unit = {
val submit = new SparkSubmit()
submit.doSubmit(args)
}
}
object SparkSubmit extends CommandLineUtils with Logging {
// Cluster managers
private val YARN = 1
private val STANDALONE = 2
private val MESOS = 4
private val LOCAL = 8
private val KUBERNETES = 16
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | KUBERNETES
// Deploy modes
private val CLIENT = 1
private val CLUSTER = 2
private val ALL_DEPLOY_MODES = CLIENT | CLUSTER
// Special primary resource names that represent shells rather than application jars.
private val SPARK_SHELL = "spark-shell"
private val PYSPARK_SHELL = "pyspark-shell"
private val SPARKR_SHELL = "sparkr-shell"
private val SPARKR_PACKAGE_ARCHIVE = "sparkr.zip"
private val R_PACKAGE_ARCHIVE = "rpkg.zip"
private val CLASS_NOT_FOUND_EXIT_STATUS = 101
// Following constants are visible for testing.
private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
"org.apache.spark.deploy.yarn.YarnClusterApplication"
private[deploy] val REST_CLUSTER_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName()
private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()
private[deploy] val KUBERNETES_CLUSTER_SUBMIT_CLASS =
"org.apache.spark.deploy.k8s.submit.KubernetesClientApplication"
override def main(args: Array[String]): Unit = {
Option(System.getenv("SPARK_PREFER_IPV6"))
.foreach(System.setProperty("java.net.preferIPv6Addresses", _))
val submit = new SparkSubmit() {
self =>
override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
new SparkSubmitArguments(args) {
override protected def logInfo(msg: => String): Unit = self.logInfo(msg)
override protected def logWarning(msg: => String): Unit = self.logWarning(msg)
override protected def logError(msg: => String): Unit = self.logError(msg)
}
}
override protected def logInfo(msg: => String): Unit = printMessage(msg)
override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")
override protected def logError(msg: => String): Unit = printMessage(s"Error: $msg")
override def doSubmit(args: Array[String]): Unit = {
try {
super.doSubmit(args)
} catch {
case e: SparkUserAppException =>
exitFn(e.exitCode)
}
}
}
submit.doSubmit(args)
}
/**
* Return whether the given primary resource represents a user jar.
*/
private[deploy] def isUserJar(res: String): Boolean = {
!isShell(res) && !isPython(res) && !isInternal(res) && !isR(res)
}
/**
* Return whether the given primary resource represents a shell.
*/
private[deploy] def isShell(res: String): Boolean = {
(res == SPARK_SHELL || res == PYSPARK_SHELL || res == SPARKR_SHELL)
}
/**
* Return whether the given main class represents a sql shell.
*/
private[deploy] def isSqlShell(mainClass: String): Boolean = {
mainClass == "org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
}
/**
* Return whether the given main class represents a thrift server.
*/
private def isThriftServer(mainClass: String): Boolean = {
mainClass == "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2"
}
/**
* Return whether the given primary resource requires running python.
*/
private[deploy] def isPython(res: String): Boolean = {
res != null && res.endsWith(".py") || res == PYSPARK_SHELL
}
/**
* Return whether the given primary resource requires running R.
*/
private[deploy] def isR(res: String): Boolean = {
res != null && (res.endsWith(".R") || res.endsWith(".r")) || res == SPARKR_SHELL
}
private[deploy] def isInternal(res: String): Boolean = {
res == SparkLauncher.NO_RESOURCE
}
}
/** Provides utility functions to be used inside SparkSubmit. */
private[spark] object SparkSubmitUtils extends Logging {
// Exposed for testing
var printStream = SparkSubmit.printStream
// Exposed for testing.
// These components are used to make the default exclusion rules for Spark dependencies.
// We need to specify each component explicitly, otherwise we miss
// spark-streaming utility components. Underscore is there to differentiate between
// spark-streaming_2.1x and spark-streaming-kafka-0-10-assembly_2.1x
val IVY_DEFAULT_EXCLUDES = Seq("catalyst_", "core_", "graphx_", "kvstore_", "launcher_", "mllib_",
"mllib-local_", "network-common_", "network-shuffle_", "repl_", "sketch_", "sql_", "streaming_",
"tags_", "unsafe_")
/**
* Represents a Maven Coordinate
* @param groupId the groupId of the coordinate
* @param artifactId the artifactId of the coordinate
* @param version the version of the coordinate
*/
private[deploy] case class MavenCoordinate(groupId: String, artifactId: String, version: String) {
override def toString: String = s"$groupId:$artifactId:$version"
}
/**
* Extracts maven coordinates from a comma-delimited string. Coordinates should be provided
* in the format `groupId:artifactId:version` or `groupId/artifactId:version`.
* @param coordinates Comma-delimited string of maven coordinates
* @return Sequence of Maven coordinates
*/
def extractMavenCoordinates(coordinates: String): Seq[MavenCoordinate] = {
coordinates.split(",").map { p =>
val splits = p.replace("/", ":").split(":")
require(splits.length == 3, s"Provided Maven Coordinates must be in the form " +
s"'groupId:artifactId:version'. The coordinate provided is: $p")
require(splits(0) != null && splits(0).trim.nonEmpty, s"The groupId cannot be null or " +
s"be whitespace. The groupId provided is: ${splits(0)}")
require(splits(1) != null && splits(1).trim.nonEmpty, s"The artifactId cannot be null or " +
s"be whitespace. The artifactId provided is: ${splits(1)}")
require(splits(2) != null && splits(2).trim.nonEmpty, s"The version cannot be null or " +
s"be whitespace. The version provided is: ${splits(2)}")
new MavenCoordinate(splits(0), splits(1), splits(2))
}
}
/** Path of the local Maven cache. */
private[spark] def m2Path: File = {
if (Utils.isTesting) {
// test builds delete the maven cache, and this can cause flakiness
new File("dummy", ".m2" + File.separator + "repository")
} else {
new File(System.getProperty("user.home"), ".m2" + File.separator + "repository")
}
}
/**
* Builds the default chain of repository resolvers (local Maven cache, local Ivy, Maven Central, spark-packages)
* @param defaultIvyUserDir The default user path for Ivy
* @return A ChainResolver used by Ivy to search for and resolve dependencies.
*/
def createRepoResolvers(defaultIvyUserDir: File): ChainResolver = {
// We need a chain resolver if we want to check multiple repositories
val cr = new ChainResolver
cr.setName("spark-list")
val localM2 = new IBiblioResolver
localM2.setM2compatible(true)
localM2.setRoot(m2Path.toURI.toString)
localM2.setUsepoms(true)
localM2.setName("local-m2-cache")
cr.add(localM2)
val localIvy = new FileSystemResolver
val localIvyRoot = new File(defaultIvyUserDir, "local")
localIvy.setLocal(true)
localIvy.setRepository(new FileRepository(localIvyRoot))
val ivyPattern = Seq(localIvyRoot.getAbsolutePath, "[organisation]", "[module]", "[revision]",
"ivys", "ivy.xml").mkString(File.separator)
localIvy.addIvyPattern(ivyPattern)
val artifactPattern = Seq(localIvyRoot.getAbsolutePath, "[organisation]", "[module]",
"[revision]", "[type]s", "[artifact](-[classifier]).[ext]").mkString(File.separator)
localIvy.addArtifactPattern(artifactPattern)
localIvy.setName("local-ivy-cache")
cr.add(localIvy)
// the biblio resolver resolves POM declared dependencies
val br: IBiblioResolver = new IBiblioResolver
br.setM2compatible(true)
br.setUsepoms(true)
val defaultInternalRepo : Option[String] = sys.env.get("DEFAULT_ARTIFACT_REPOSITORY")
br.setRoot(defaultInternalRepo.getOrElse("https://repo1.maven.org/maven2/"))
br.setName("central")
cr.add(br)
val sp: IBiblioResolver = new IBiblioResolver
sp.setM2compatible(true)
sp.setUsepoms(true)
sp.setRoot(sys.env.getOrElse(
"DEFAULT_ARTIFACT_REPOSITORY", "https://repos.spark-packages.org/"))
sp.setName("spark-packages")
cr.add(sp)
cr
}
/**
* Output a list of paths for the downloaded jars to be added to the classpath
* (will append to jars in SparkSubmit).
* @param artifacts Sequence of dependencies that were resolved and retrieved
* @param cacheDirectory Directory where jars are cached
* @return List of paths for the dependencies
*/
def resolveDependencyPaths(
artifacts: Array[AnyRef],
cacheDirectory: File): Seq[String] = {
artifacts.map(_.asInstanceOf[Artifact]).filter { artifactInfo =>
if (artifactInfo.getExt == "jar") {
true
} else {
logInfo(s"Skipping non-jar dependency ${artifactInfo.getId}")
false
}
}.map { artifactInfo =>
val artifact = artifactInfo.getModuleRevisionId
val extraAttrs = artifactInfo.getExtraAttributes
val classifier = if (extraAttrs.containsKey("classifier")) {
"-" + extraAttrs.get("classifier")
} else {
""
}
cacheDirectory.getAbsolutePath + File.separator +
s"${artifact.getOrganisation}_${artifact.getName}-${artifact.getRevision}$classifier.jar"
}
}
/** Adds the given maven coordinates to Ivy's module descriptor. */
def addDependenciesToIvy(
md: DefaultModuleDescriptor,
artifacts: Seq[MavenCoordinate],
ivyConfName: String): Unit = {
artifacts.foreach { mvn =>
val ri = ModuleRevisionId.newInstance(mvn.groupId, mvn.artifactId, mvn.version)
val dd = new DefaultDependencyDescriptor(ri, false, false)
dd.addDependencyConfiguration(ivyConfName, ivyConfName + "(runtime)")
// scalastyle:off println
printStream.println(s"${dd.getDependencyId} added as a dependency")
// scalastyle:on println
md.addDependency(dd)
}
}
/** Add exclusion rules for dependencies already included in the spark-assembly */
def addExclusionRules(
ivySettings: IvySettings,
ivyConfName: String,
md: DefaultModuleDescriptor): Unit = {
// Add scala exclusion rule
md.addExcludeRule(createExclusion("*:scala-library:*", ivySettings, ivyConfName))
IVY_DEFAULT_EXCLUDES.foreach { comp =>
md.addExcludeRule(createExclusion(s"org.apache.spark:spark-$comp*:*", ivySettings,
ivyConfName))
}
}
/**
* Build Ivy Settings using options with default resolvers
* @param remoteRepos Comma-delimited string of remote repositories other than maven central
* @param ivyPath The path to the local ivy repository
* @return An IvySettings object
*/
def buildIvySettings(remoteRepos: Option[String], ivyPath: Option[String]): IvySettings = {
val ivySettings: IvySettings = new IvySettings
processIvyPathArg(ivySettings, ivyPath)
// create a pattern matcher
ivySettings.addMatcher(new GlobPatternMatcher)
// create the dependency resolvers
val repoResolver = createRepoResolvers(ivySettings.getDefaultIvyUserDir)
ivySettings.addResolver(repoResolver)
ivySettings.setDefaultResolver(repoResolver.getName)
processRemoteRepoArg(ivySettings, remoteRepos)
// (since 2.5) Setting the property ivy.maven.lookup.sources to false
// disables the lookup of the sources artifact.
// And setting the property ivy.maven.lookup.javadoc to false
// disables the lookup of the javadoc artifact.
ivySettings.setVariable("ivy.maven.lookup.sources", "false")
ivySettings.setVariable("ivy.maven.lookup.javadoc", "false")
ivySettings
}
/**
* Load Ivy settings from a given filename, using supplied resolvers
* @param settingsFile Path to Ivy settings file
* @param remoteRepos Comma-delimited string of remote repositories other than maven central
* @param ivyPath The path to the local ivy repository
* @return An IvySettings object
*/
def loadIvySettings(
settingsFile: String,
remoteRepos: Option[String],
ivyPath: Option[String]): IvySettings = {
val uri = new URI(settingsFile)
val file = Option(uri.getScheme).getOrElse("file") match {
case "file" => new File(uri.getPath)
case scheme => throw new IllegalArgumentException(s"Scheme $scheme not supported in " +
JAR_IVY_SETTING_PATH.key)
}
require(file.exists(), s"Ivy settings file $file does not exist")
require(file.isFile(), s"Ivy settings file $file is not a normal file")
val ivySettings: IvySettings = new IvySettings
try {
ivySettings.load(file)
} catch {
case e @ (_: IOException | _: ParseException) =>
throw new SparkException(s"Failed when loading Ivy settings from $settingsFile", e)
}
processIvyPathArg(ivySettings, ivyPath)
processRemoteRepoArg(ivySettings, remoteRepos)
ivySettings
}
/* Set ivy settings for location of cache, if option is supplied */
private def processIvyPathArg(ivySettings: IvySettings, ivyPath: Option[String]): Unit = {
ivyPath.filterNot(_.trim.isEmpty).foreach { alternateIvyDir =>
ivySettings.setDefaultIvyUserDir(new File(alternateIvyDir))
ivySettings.setDefaultCache(new File(alternateIvyDir, "cache"))
}
}
/* Add any optional additional remote repositories */
private def processRemoteRepoArg(ivySettings: IvySettings, remoteRepos: Option[String]): Unit = {
remoteRepos.filterNot(_.trim.isEmpty).map(_.split(",")).foreach { repositoryList =>
val cr = new ChainResolver
cr.setName("user-list")
// add current default resolver, if any
Option(ivySettings.getDefaultResolver).foreach(cr.add)
// add additional repositories, last resolution in chain takes precedence
repositoryList.zipWithIndex.foreach { case (repo, i) =>
val brr: IBiblioResolver = new IBiblioResolver
brr.setM2compatible(true)
brr.setUsepoms(true)
brr.setRoot(repo)
brr.setName(s"repo-${i + 1}")
cr.add(brr)
// scalastyle:off println
printStream.println(s"$repo added as a remote repository with the name: ${brr.getName}")
// scalastyle:on println
}
ivySettings.addResolver(cr)
ivySettings.setDefaultResolver(cr.getName)
}
}
/** A nice function to use in tests as well. Values are dummy strings. */
def getModuleDescriptor: DefaultModuleDescriptor = DefaultModuleDescriptor.newDefaultInstance(
// Include UUID in module name, so multiple clients resolving maven coordinate at the same time
// do not modify the same resolution file concurrently.
ModuleRevisionId.newInstance("org.apache.spark",
s"spark-submit-parent-${UUID.randomUUID.toString}",
"1.0"))
/**
* Clear ivy resolution from current launch. The resolution file is usually at
* ~/.ivy2/org.apache.spark-spark-submit-parent-$UUID-default.xml,
* ~/.ivy2/resolved-org.apache.spark-spark-submit-parent-$UUID-1.0.xml, and
* ~/.ivy2/resolved-org.apache.spark-spark-submit-parent-$UUID-1.0.properties.
* Since each launch will have its own resolution files created, delete them after
* each resolution to prevent accumulation of these files in the ivy cache dir.
*/
private def clearIvyResolutionFiles(
mdId: ModuleRevisionId,
ivySettings: IvySettings,
ivyConfName: String): Unit = {
val currentResolutionFiles = Seq(
s"${mdId.getOrganisation}-${mdId.getName}-$ivyConfName.xml",
s"resolved-${mdId.getOrganisation}-${mdId.getName}-${mdId.getRevision}.xml",
s"resolved-${mdId.getOrganisation}-${mdId.getName}-${mdId.getRevision}.properties"
)
currentResolutionFiles.foreach { filename =>
new File(ivySettings.getDefaultCache, filename).delete()
}
}
/**
* Resolves any dependencies that were supplied through maven coordinates
* @param coordinates Comma-delimited string of maven coordinates
* @param ivySettings An IvySettings containing resolvers to use
* @param transitive Whether resolving transitive dependencies, default is true
* @param exclusions Exclusions to apply when resolving transitive dependencies
* @return Seq of path to the jars of the given maven artifacts including their
* transitive dependencies
*/
def resolveMavenCoordinates(
coordinates: String,
ivySettings: IvySettings,
transitive: Boolean,
exclusions: Seq[String] = Nil,
isTest: Boolean = false): Seq[String] = {
if (coordinates == null || coordinates.trim.isEmpty) {
Nil
} else {
val sysOut = System.out
// Default configuration name for ivy
val ivyConfName = "default"
// A Module descriptor must be specified. Entries are dummy strings
val md = getModuleDescriptor
md.setDefaultConf(ivyConfName)
try {
// To prevent ivy from logging to system out
System.setOut(printStream)
val artifacts = extractMavenCoordinates(coordinates)
// Directories for caching downloads through ivy and storing the jars when maven coordinates
// are supplied to spark-submit
val packagesDirectory: File = new File(ivySettings.getDefaultIvyUserDir, "jars")
// scalastyle:off println
printStream.println(
s"Ivy Default Cache set to: ${ivySettings.getDefaultCache.getAbsolutePath}")
printStream.println(s"The jars for the packages stored in: $packagesDirectory")
// scalastyle:on println
val ivy = Ivy.newInstance(ivySettings)
// Set resolve options to download transitive dependencies as well
val resolveOptions = new ResolveOptions
resolveOptions.setTransitive(transitive)
val retrieveOptions = new RetrieveOptions
// Turn downloading and logging off for testing
if (isTest) {
resolveOptions.setDownload(false)
resolveOptions.setLog(LogOptions.LOG_QUIET)
retrieveOptions.setLog(LogOptions.LOG_QUIET)
} else {
resolveOptions.setDownload(true)
}
// Add exclusion rules for Spark and Scala Library
addExclusionRules(ivySettings, ivyConfName, md)
// add all supplied maven artifacts as dependencies
addDependenciesToIvy(md, artifacts, ivyConfName)
exclusions.foreach { e =>
md.addExcludeRule(createExclusion(e + ":*", ivySettings, ivyConfName))
}
// resolve dependencies
val rr: ResolveReport = ivy.resolve(md, resolveOptions)
if (rr.hasError) {
throw new RuntimeException(rr.getAllProblemMessages.toString)
}
// retrieve all resolved dependencies
retrieveOptions.setDestArtifactPattern(packagesDirectory.getAbsolutePath + File.separator +
"[organization]_[artifact]-[revision](-[classifier]).[ext]")
ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId,
retrieveOptions.setConfs(Array(ivyConfName)))
resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
} finally {
System.setOut(sysOut)
clearIvyResolutionFiles(md.getModuleRevisionId, ivySettings, ivyConfName)
}
}
}
private[deploy] def createExclusion(
coords: String,
ivySettings: IvySettings,
ivyConfName: String): ExcludeRule = {
val c = extractMavenCoordinates(coords)(0)
val id = new ArtifactId(new ModuleId(c.groupId, c.artifactId), "*", "*", "*")
val rule = new DefaultExcludeRule(id, ivySettings.getMatcher("glob"), null)
rule.addConfiguration(ivyConfName)
rule
}
def parseSparkConfProperty(pair: String): (String, String) = {
pair.split("=", 2).toSeq match {
case Seq(k, v) => (k, v)
case _ => throw new SparkException(s"Spark config without '=': $pair")
}
}
private[deploy] def getSubmitOperations(master: String): SparkSubmitOperation = {
val loader = Utils.getContextOrSparkClassLoader
val serviceLoaders =
ServiceLoader.load(classOf[SparkSubmitOperation], loader)
.asScala
.filter(_.supports(master))
serviceLoaders.size match {
case x if x > 1 =>
throw new SparkException(s"Multiple($x) external SparkSubmitOperations " +
s"clients registered for master url ${master}.")
case 1 => serviceLoaders.headOption.get
case _ =>
throw new IllegalArgumentException(s"No external SparkSubmitOperations " +
s"clients found for master url: '$master'")
}
}
}
/**
* Provides an indirection layer for passing arguments as system properties or flags to
* the user's driver program or to downstream launcher tools.
*/
private case class OptionAssigner(
value: String,
clusterManager: Int,
deployMode: Int,
clOption: String = null,
confKey: String = null,
mergeFn: Option[(String, String) => String] = None)
private[spark] trait SparkSubmitOperation {
def kill(submissionId: String, conf: SparkConf): Unit
def printSubmissionStatus(submissionId: String, conf: SparkConf): Unit
def supports(master: String): Boolean
}
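For master URLs that the REST client cannot handle, the kill and requestStatus paths above go through SparkSubmitUtils.getSubmitOperations, which locates a SparkSubmitOperation implementation via Java's ServiceLoader. The sketch below shows, under assumed names, what such a plug-in could look like; the class name, its placement, and the "mycluster://" scheme are hypothetical. Because the trait is private[spark], an implementation has to live under the org.apache.spark package (as the built-in ones do), and its fully qualified class name must be listed in a META-INF/services/org.apache.spark.deploy.SparkSubmitOperation resource so that ServiceLoader can discover it.

package org.apache.spark.deploy

import org.apache.spark.SparkConf

// Hypothetical SparkSubmitOperation plug-in handling a custom "mycluster://" master URL.
// getSubmitOperations picks it up through ServiceLoader when supports(master) returns true.
class MyClusterSubmitOperation extends SparkSubmitOperation {

  override def supports(master: String): Boolean =
    master.startsWith("mycluster://")

  override def kill(submissionId: String, conf: SparkConf): Unit = {
    // A real implementation would call the cluster manager's API; this sketch only logs the intent.
    println(s"Killing submission $submissionId on master ${conf.get("spark.master")}")
  }

  override def printSubmissionStatus(submissionId: String, conf: SparkConf): Unit = {
    println(s"Status requested for submission $submissionId on master ${conf.get("spark.master")}")
  }
}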