// Spark DependencyUtils source code
//
//   • 2022-10-20
// Spark DependencyUtils code


package org.apache.spark.util


import java.io.File
import java.net.URI

import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.SparkSubmitUtils
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._

/**
 * Ivy-related settings grouped into a single immutable value, as returned by
 * `DependencyUtils.getIvyProperties()`.
 *
 * NOTE(review): the parameter descriptions below are inferred from the field names and from
 * the similarly named arguments of `DependencyUtils.resolveMavenDependencies` — confirm.
 *
 * @param packagesExclusions presumably the exclusions applied while resolving packages
 * @param packages presumably the Maven coordinates to resolve
 * @param repositories presumably additional remote repositories to search
 * @param ivyRepoPath presumably the location of the Ivy repository/cache
 * @param ivySettingsPath presumably the path of a custom Ivy settings file
 */
private[spark] case class IvyProperties(
    packagesExclusions: String,
    packages: String,
    repositories: String,
    ivyRepoPath: String,
    ivySettingsPath: String)

private[spark] object DependencyUtils extends Logging {

  /**
   * Collects the Ivy-related settings from the JVM system properties.
   *
   * A property that is not set yields `null` in the corresponding field.
   *
   * @return the five Ivy settings bundled as an [[IvyProperties]].
   */
  def getIvyProperties(): IvyProperties = {
    // NOTE(review): the key list and the lookup were missing from this copy of the file. They
    // are restored on the assumption that these entries exist in
    // `org.apache.spark.internal.config` — confirm the constant names before merging.
    val Seq(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath) = Seq(
      JAR_PACKAGES_EXCLUSIONS.key,
      JAR_PACKAGES.key,
      JAR_REPOSITORIES.key,
      JAR_IVY_REPO_PATH.key,
      JAR_IVY_SETTING_PATH.key
    ).map(sys.props.get(_).orNull)
    IvyProperties(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath)
  }

  /**
   * Tells whether a split token array is NOT a well-formed pair.
   *
   * @param tokens the pieces obtained by splitting one `key=value` or `group:module` item.
   * @return true unless there are exactly two tokens and neither of them is blank.
   */
  private def isInvalidQueryString(tokens: Array[String]): Boolean = {
    tokens.length != 2 || StringUtils.isBlank(tokens(0)) || StringUtils.isBlank(tokens(1))
  }

  /**
   * Parse URI query string's parameter value of `transitive` and `exclude`.
   * Other invalid parameters will be ignored.
   *
   * @param uri Ivy URI that needs to be downloaded.
   * @return Tuple value of parameter `transitive` and `exclude` value.
   *
   *         1. transitive: whether to download dependency jar of Ivy URI, default value is true
   *            and this parameter value is case-insensitive. This mimics Hive's behaviour for
   *            parsing the transitive parameter. Invalid value will be treated as false.
   *            Example: Input:  exclude=org.mortbay.jetty:jetty&transitive=true
   *            Output:  true
   *
   *         2. exclude: comma separated exclusions to apply when resolving transitive
   *            dependencies, consists of `group:module` pairs separated by commas.
   *            Example: Input:  exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
   *            Output:  [org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http]
   */
  private def parseQueryParams(uri: URI): (Boolean, String) = {
    val uriQuery = uri.getQuery
    if (uriQuery == null) {
      // No query part at all: transitive defaults to true and nothing is excluded.
      (true, "")
    } else {
      // Every `&`-separated item must be a non-blank `key=value` pair.
      val mapTokens = uriQuery.split("&").map(_.split("="))
      if (mapTokens.exists(isInvalidQueryString)) {
        throw new IllegalArgumentException(
          s"Invalid query string in Ivy URI ${uri.toString}: $uriQuery")
      }
      // Group by key so that repeated parameters can be detected; the order in which the
      // occurrences of one key appeared is preserved within each group.
      val groupedParams = mapTokens.map(kv => (kv(0), kv(1))).groupBy(_._1)

      // Parse transitive parameters (e.g., transitive=true) in an Ivy URI, default value is true
      val transitiveParams = groupedParams.get("transitive")
      if (transitiveParams.exists(_.length > 1)) {
        logWarning("It's best to specify `transitive` parameter in ivy URI query only once." +
          " If there are multiple `transitive` parameter, we will select the last one")
      }
      // The last occurrence wins; any value other than a case-insensitive "true" means false.
      val transitive =
        transitiveParams
          .flatMap(_.lastOption)
          .map(_._2.equalsIgnoreCase("true"))
          .getOrElse(true)

      // Parse an excluded list (e.g., exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http)
      // in an Ivy URI. When download Ivy URI jar, Spark won't download transitive jar
      // in a excluded list.
      val exclusionList = groupedParams.get("exclude").map { params =>
        params.map(_._2).flatMap { excludeString =>
          val excludes = excludeString.split(",")
          // Each exclusion must be a non-blank `group:module` pair.
          if (excludes.map(_.split(":")).exists(isInvalidQueryString)) {
            throw new IllegalArgumentException(
              s"Invalid exclude string in Ivy URI ${uri.toString}:" +
                " expected 'org:module,org:module,..', found " + excludeString)
          }
          excludes
        }.mkString(",")
      }.getOrElse("")

      // Unknown parameters are reported but otherwise ignored.
      val validParams = Set("transitive", "exclude")
      val invalidParams = groupedParams.keys.filterNot(validParams.contains).toSeq
      if (invalidParams.nonEmpty) {
        logWarning(s"Invalid parameters `${invalidParams.sorted.mkString(",")}` found " +
          s"in Ivy URI query `$uriQuery`.")
      }

      (transitive, exclusionList)
    }
  }

  /**
   * Download Ivy URI's dependency jars.
   *
   * @param uri Ivy URI that needs to be downloaded. The URI format should be:
   *              `ivy://group:module:version[?query]`
   *            Ivy URI query part format should be:
   *              `parameter=value&parameter=value...`
   *            Note that currently the Ivy URI query part supports two parameters:
   *             1. transitive: whether to download dependent jars related to your Ivy URI.
   *                `transitive=false` or `transitive=true`; if not set, the default value is true.
   *             2. exclude: exclusion list when downloading the Ivy URI jar and dependency jars.
   *                The `exclude` parameter content is a ',' separated `group:module` pair string:
   *                `exclude=group:module,group:module...`
   * @return List of jars downloaded.
   */
  def resolveMavenDependencies(uri: URI): Seq[String] = {
    val ivyProperties = DependencyUtils.getIvyProperties()
    // The authority part carries the coordinate and must look like `org:module:version`.
    val authority = uri.getAuthority
    if (authority == null) {
      throw new IllegalArgumentException(
        s"Invalid Ivy URI authority in uri ${uri.toString}:" +
          " Expected 'org:module:version', found null.")
    }
    if (authority.split(":").length != 3) {
      throw new IllegalArgumentException(
        s"Invalid Ivy URI authority in uri ${uri.toString}:" +
          s" Expected 'org:module:version', found $authority.")
    }

    val (transitive, exclusionList) = parseQueryParams(uri)

    // Delegate to the explicit-argument overload: the query string supplies the transitive
    // flag and the exclusions, the system properties supply the repository settings.
    resolveMavenDependencies(
      transitive,
      exclusionList,
      authority,
      ivyProperties.repositories,
      ivyProperties.ivyRepoPath,
      Option(ivyProperties.ivySettingsPath)
    )
  }


  /**
   * Resolves the given Maven coordinates through `SparkSubmitUtils`.
   *
   * @param packagesTransitive forwarded as the `transitive` flag of the resolution.
   * @param packagesExclusions comma separated exclusions; blank or null means no exclusions.
   * @param packages coordinates handed to `SparkSubmitUtils.resolveMavenCoordinates`.
   * @param repositories passed (null-safe) to the Ivy settings construction.
   * @param ivyRepoPath passed (null-safe) to the Ivy settings construction.
   * @param ivySettingsPath when defined, the Ivy settings are loaded from this file; otherwise
   *                        default settings are built.
   * @return whatever `SparkSubmitUtils.resolveMavenCoordinates` returns for these inputs.
   */
  def resolveMavenDependencies(
      packagesTransitive: Boolean,
      packagesExclusions: String,
      packages: String,
      repositories: String,
      ivyRepoPath: String,
      ivySettingsPath: Option[String]): Seq[String] = {
    val exclusions: Seq[String] =
      if (!StringUtils.isBlank(packagesExclusions)) {
        packagesExclusions.split(",").toSeq
      } else {
        Nil
      }
    // Create the IvySettings, either load from file or build defaults
    val ivySettings = ivySettingsPath match {
      case Some(path) =>
        SparkSubmitUtils.loadIvySettings(path, Option(repositories), Option(ivyRepoPath))

      case None =>
        SparkSubmitUtils.buildIvySettings(Option(repositories), Option(ivyRepoPath))
    }

    SparkSubmitUtils.resolveMavenCoordinates(packages, ivySettings,
      transitive = packagesTransitive, exclusions = exclusions)
  }

  /**
   * Expands globs in a comma separated jar list, drops entries that refer to the user jar and
   * downloads what is left into a fresh temporary directory.
   *
   * @param jars comma separated jar list; may be null.
   * @param userJar path of the user jar; entries containing its file name are skipped.
   * @param sparkConf Spark configuration.
   * @param hadoopConf Hadoop configuration.
   * @return the comma separated downloaded list, or null when `jars` is null or nothing remains
   *         after filtering.
   */
  def resolveAndDownloadJars(
      jars: String,
      userJar: String,
      sparkConf: SparkConf,
      hadoopConf: Configuration): String = {
    val targetDir = Utils.createTempDir()
    val userJarName = userJar.split(File.separatorChar).last
    Option(jars)
      .map { jarList =>
        resolveGlobPaths(jarList, hadoopConf)
          .split(",")
          .filterNot(_.contains(userJarName))
          .mkString(",")
      }
      // An empty list after filtering is treated the same as no list at all.
      .filterNot(_ == "")
      .map(downloadFileList(_, targetDir, sparkConf, hadoopConf))
      .orNull
  }

  /**
   * Adds every jar of a comma separated list to the given class loader via
   * `addJarToClasspath`.
   *
   * @param jars comma separated jar list; null is accepted and means nothing to add.
   * @param loader class loader receiving the jars.
   */
  def addJarsToClassPath(jars: String, loader: MutableURLClassLoader): Unit = {
    if (jars != null) {
      for (jar <- jars.split(",")) {
        addJarToClasspath(jar, loader)
      }
    }
  }

  /**
   * Download a list of remote files to temp local files. If the file is local, the original file
   * will be returned.
   *
   * @param fileList A comma separated file list.
   * @param targetDir A temporary directory into which the files are downloaded.
   * @param sparkConf Spark configuration.
   * @param hadoopConf Hadoop configuration.
   * @return A comma separated local files list.
   */
  def downloadFileList(
      fileList: String,
      targetDir: File,
      sparkConf: SparkConf,
      hadoopConf: Configuration): String = {
    require(fileList != null, "fileList cannot be null.")
    // Split the list, fetch each entry individually, then join the results back together.
    Utils.stringToSeq(fileList)
      .map(downloadFile(_, targetDir, sparkConf, hadoopConf))
      .mkString(",")
  }

  /**
   * Download a file from the remote to a local temporary directory. If the input path points to
   * a local path, returns it with no operation.
   *
   * @param path A file path from where the files will be downloaded.
   * @param targetDir A temporary directory into which the file is downloaded.
   * @param sparkConf Spark configuration.
   * @param hadoopConf Hadoop configuration.
   * @return Path to the local file.
   */
  def downloadFile(
      path: String,
      targetDir: File,
      sparkConf: SparkConf,
      hadoopConf: Configuration): String = {
    require(path != null, "path cannot be null.")
    val uri = Utils.resolveURI(path)

    uri.getScheme match {
      // Already local: hand the input back untouched.
      case "file" | "local" => path
      case "http" | "https" | "ftp" if Utils.isTesting =>
        // This is only used for SparkSubmitSuite unit test. Instead of downloading file remotely,
        // return a dummy local path instead.
        val file = new File(uri.getPath)
        new File(targetDir, file.getName).toURI.toString
      case _ =>
        // Anything else is fetched into `targetDir` under its own file name.
        val fname = new Path(uri).getName()
        val localFile = Utils.doFetchFile(uri.toString(), targetDir, fname, sparkConf, hadoopConf)
        localFile.toURI().toString()
    }
  }

  /**
   * Expands every entry of a comma separated path list through `resolveGlobPath`, keeping any
   * `#fragment` suffix attached to the resolved path.
   *
   * @param paths comma separated paths; must not be null.
   * @param hadoopConf Hadoop configuration handed to `resolveGlobPath`.
   * @return the resolved paths joined with commas.
   * @throws SparkException if a path carrying a fragment resolves to more than one file.
   */
  def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
    require(paths != null, "paths cannot be null.")
    Utils.stringToSeq(paths).flatMap { path =>
      val (base, fragment) = splitOnFragment(path)
      (resolveGlobPath(base, hadoopConf), fragment) match {
        // A fragment names a single file, so it cannot be applied to several matches.
        case (resolved, Some(_)) if resolved.length > 1 => throw new SparkException(
            s"${base.toString} resolves ambiguously to multiple files: ${resolved.mkString(",")}")
        case (resolved, Some(namedAs)) => resolved.map(_ + "#" + namedAs)
        case (resolved, _) => resolved
      }
    }.mkString(",")
  }

  /**
   * Adds one jar to the given class loader when it is a local file that exists.
   *
   * Jars with a scheme other than `file` or `local`, and local jars that do not exist, are
   * skipped with a warning.
   *
   * @param localJar path or URI of the jar.
   * @param loader class loader receiving the jar URL.
   */
  def addJarToClasspath(localJar: String, loader: MutableURLClassLoader): Unit = {
    val uri = Utils.resolveURI(localJar)
    uri.getScheme match {
      case "file" | "local" =>
        val file = new File(uri.getPath)
        if (file.exists()) {
          loader.addURL(file.toURI.toURL)
        } else {
          logWarning(s"Local jar $file does not exist, skipping.")
        }
      case _ =>
        logWarning(s"Skip remote jar $uri.")
    }
  }

  /**
   * Merge a sequence of comma-separated file lists, some of which may be null to indicate
   * no files, into a single comma-separated string.
   */
  def mergeFileLists(lists: String*): String = {
    // Blank and null entries contribute nothing to the merged list.
    val merged = lists.filterNot(StringUtils.isBlank)
    // Callers receive null, not an empty string, when no list has any content.
    if (merged.nonEmpty) merged.mkString(",") else null
  }

  /**
   * Separates a path into its URI without the fragment and the optional fragment.
   *
   * @param path path or URI string, resolved through `Utils.resolveURI`.
   * @return the URI rebuilt from scheme and scheme-specific part only, paired with the
   *         fragment (`None` when the URI has no fragment).
   */
  private def splitOnFragment(path: String): (URI, Option[String]) = {
    val uri = Utils.resolveURI(path)
    val withoutFragment = new URI(uri.getScheme, uri.getSchemeSpecificPart, null)
    (withoutFragment, Option(uri.getFragment))
  }

  private def resolveGlobPath(uri: URI, hadoopConf: Configuration): Array[String] = {
    uri.getScheme match {
      case "local" | "http" | "https" | "ftp" => Array(uri.toString)
      case _ =>
        val fs = FileSystem.get(uri, hadoopConf)
        Option(fs.globStatus(new Path(uri))).map { status =>



