spark RUtils 源码

  • 2022-10-20
  • 浏览 (446)

spark RUtils 代码

文件路径:/core/src/main/scala/org/apache/spark/api/r/RUtils.scala

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.api.r

import java.io.File
import java.util.Arrays

import org.apache.spark.{SparkEnv, SparkException}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.config._

private[spark] object RUtils {
  // Local path where R binary packages built from R source code contained in the spark
  // packages specified with "--packages" or "--jars" command line option reside.
  var rPackages: Option[String] = None

  /**
   * Get the SparkR package path in the local spark distribution.
   *
   * Resolves `<SPARK_HOME>/R/lib` from the `SPARK_HOME` environment variable, falling
   * back to the `spark.test.home` system property (used by tests). Returns `None` when
   * neither is set.
   */
  def localSparkRPackagePath: Option[String] = {
    val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.test.home"))
    sparkHome.map(
      Seq(_, "R", "lib").mkString(File.separator)
    )
  }

  /**
   * Check if SparkR is installed before running tests that use SparkR.
   *
   * True iff the local SparkR package path resolves and contains a "SparkR" entry.
   */
  def isSparkRInstalled: Boolean = {
    localSparkRPackagePath.exists { pkgDir =>
      new File(Seq(pkgDir, "SparkR").mkString(File.separator)).exists
    }
  }

  /**
   * Get the list of paths for R packages in various deployment modes, of which the first
   * path is for the SparkR package itself. The second path is for R packages built as
   * part of Spark Packages, if any exist. Spark Packages can be provided through the
   * "--packages" or "--jars" command line options.
   *
   * This assumes that Spark properties `spark.master` and `spark.submit.deployMode`
   * and environment variable `SPARK_HOME` are set.
   *
   * @param isDriver whether the caller is the driver (true) or an executor (false)
   * @throws SparkException if resolving locally and `SPARK_HOME` is not set
   */
  def sparkRPackagePath(isDriver: Boolean): Seq[String] = {
    // On the driver the config is only reachable via system properties; on executors it
    // comes from the SparkEnv-provided SparkConf.
    val (master, deployMode) =
      if (isDriver) {
        (sys.props("spark.master"), sys.props(SUBMIT_DEPLOY_MODE.key))
      } else {
        val sparkConf = SparkEnv.get.conf
        (sparkConf.get("spark.master"), sparkConf.get(SUBMIT_DEPLOY_MODE))
      }

    val isYarnCluster = master != null && master.contains("yarn") && deployMode == "cluster"
    val isYarnClient = master != null && master.contains("yarn") && deployMode == "client"

    // In YARN mode, the SparkR package is distributed as an archive symbolically
    // linked to the "sparkr" file in the current directory and additional R packages
    // are distributed as an archive symbolically linked to the "rpkg" file in the
    // current directory.
    //
    // Note that this does not apply to the driver in client mode because it is run
    // outside of the cluster.
    if (isYarnCluster || (isYarnClient && !isDriver)) {
      val sparkRPkgPath = new File("sparkr").getAbsolutePath
      val rPkgPath = new File("rpkg")
      if (rPkgPath.exists()) {
        Seq(sparkRPkgPath, rPkgPath.getAbsolutePath)
      } else {
        Seq(sparkRPkgPath)
      }
    } else {
      // Otherwise, assume the package is local
      val sparkRPkgPath = localSparkRPackagePath.getOrElse {
        throw new SparkException("SPARK_HOME not set. Can't locate SparkR package.")
      }
      // Append the Spark Packages R path when one was registered. Appending the Option
      // directly replaces the unsafe `!rPackages.isEmpty` / `rPackages.get` pairing with
      // an equivalent total expression (Option-to-Iterable conversion).
      Seq(sparkRPkgPath) ++ rPackages
    }
  }

  /**
   * Check if R is installed before running tests that use R commands.
   *
   * Spawns `R --version` and treats exit code 0 as "installed"; any exception while
   * launching or waiting on the process (e.g. the binary is missing) yields false.
   */
  def isRInstalled: Boolean = {
    try {
      val builder = new ProcessBuilder(Arrays.asList("R", "--version"))
      builder.start().waitFor() == 0
    } catch {
      // The caught exception was never used, so avoid binding it to a name.
      case _: Exception => false
    }
  }

  /** Whether I/O encryption is enabled in the given context's SparkConf. */
  def isEncryptionEnabled(sc: JavaSparkContext): Boolean = {
    sc.conf.get(org.apache.spark.internal.config.IO_ENCRYPTION_ENABLED)
  }
}

相关信息

spark 源码目录

相关文章

spark BaseRRunner 源码

spark JVMObjectTracker 源码

spark RAuthHelper 源码

spark RBackend 源码

spark RBackendAuthHandler 源码

spark RBackendHandler 源码

spark RRDD 源码

spark RRunner 源码

spark SerDe 源码

0  赞