// Spark HadoopRDD source code
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.rdd
import java.io.{FileNotFoundException, IOException}
import java.text.SimpleDateFormat
import java.util.{Date, Locale}

import scala.collection.immutable.Map
import scala.reflect.ClassTag

import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.compress.CompressionCodecFactory
import org.apache.hadoop.mapred._
import org.apache.hadoop.mapred.lib.CombineFileSplit
import org.apache.hadoop.mapreduce.TaskType
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.util.ReflectionUtils

import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
import org.apache.spark.scheduler.{HDFSCacheTaskLocation, HostTaskLocation}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{NextIterator, SerializableConfiguration, ShutdownHookManager, Utils}
/**
 * A Spark split class that wraps around a Hadoop InputSplit.
 *
 * @param rddId id of the RDD this partition belongs to; it only feeds `hashCode`
 * @param index index of this partition
 * @param s the Hadoop input split wrapped by this partition
 */
private[spark] class HadoopPartition(rddId: Int, override val index: Int, s: InputSplit)
  extends Partition {

  // The Hadoop split, held inside a SerializableWritable wrapper.
  val inputSplit = new SerializableWritable[InputSplit](s)

  override def hashCode(): Int = 31 * (31 + rddId) + index

  // Delegates to the equality inherited from Partition.
  override def equals(other: Any): Boolean = super.equals(other)

  /**
   * Get any environment variables that should be added to the users environment when running pipes
   * @return a Map with the environment variables and corresponding values, it could be empty
   */
  def getPipeEnvVars(): Map[String, String] = inputSplit.value match {
    case is: FileSplit =>
      // map_input_file is deprecated in favor of mapreduce_map_input_file but set both
      // since it's not removed yet
      val inputFile = is.getPath().toString()
      Map("map_input_file" -> inputFile, "mapreduce_map_input_file" -> inputFile)
    case _ =>
      // Splits that are not file based contribute no variables.
      Map.empty
  }
}
/**
 * :: DeveloperApi ::
 * An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
 * sources in HBase, or S3), using the older MapReduce API (`org.apache.hadoop.mapred`).
 *
 * @param sc The SparkContext to associate the RDD with.
 * @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
 *   variable references an instance of JobConf, then that JobConf will be used for the Hadoop job.
 *   Otherwise, a new JobConf will be created on each executor using the enclosed Configuration.
 * @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD
 *   creates.
 * @param inputFormatClass Storage format of the data to be read.
 * @param keyClass Class of the key associated with the inputFormatClass.
 * @param valueClass Class of the value associated with the inputFormatClass.
 * @param minPartitions Minimum number of HadoopRDD partitions (Hadoop Splits) to generate.
 *
 * @note Instantiating this class directly is not recommended, please use
 * `org.apache.spark.SparkContext.hadoopRDD()`
 */
class HadoopRDD[K, V](
sc: SparkContext,
broadcastedConf: Broadcast[SerializableConfiguration],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int)
extends RDD[(K, V)](sc, Nil) with Logging {
// Constructor-time guard that runs only when an init closure was supplied.
// NOTE(review): the body and the closing brace of this `if` are missing from this copy of the
// file — restore them from the original before relying on this statement.
if (initLocalJobConfFuncOpt.isDefined) {
/**
 * Auxiliary constructor for callers that already hold a JobConf: the conf is broadcast through
 * `sc` and no JobConf initialization closure is installed.
 *
 * @param sc The SparkContext to associate the RDD with; also used to broadcast `conf`.
 * @param conf The JobConf to broadcast.
 * @param inputFormatClass Storage format of the data to be read.
 * @param keyClass Class of the key associated with the inputFormatClass.
 * @param valueClass Class of the value associated with the inputFormatClass.
 * @param minPartitions Minimum number of HadoopRDD partitions (Hadoop Splits) to generate.
 */
def this(
    sc: SparkContext,
    conf: JobConf,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int) = {
  // An auxiliary constructor must begin by invoking another constructor of the class.
  this(
    sc,
    sc.broadcast(new SerializableConfiguration(conf)),
    initLocalJobConfFuncOpt = None,
    inputFormatClass,
    keyClass,
    valueClass,
    minPartitions)
}
// Key, derived from the RDD id, under which getJobConf() stores the JobConf it creates
// (through HadoopRDD.putCachedMetadata).
protected val jobConfCacheKey: String = "rdd_%d_job_conf".format(id)
// Key derived from the RDD id for the input format; not referenced by the code visible here.
protected val inputFormatCacheKey: String = "rdd_%d_input_format".format(id)
// used to build JobTracker ID
private val createTime = new Date()
// When true, getJobConf() builds a new JobConf from the broadcast Configuration on every call.
// Read from "spark.hadoop.cloneConf"; off unless set.
private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
// Tolerance switches read from the Spark configuration: they guard the IOException,
// FileNotFoundException/InvalidInputException and empty-split handling in getPartitions/compute.
private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
private val ignoreMissingFiles = sparkContext.conf.get(IGNORE_MISSING_FILES)
private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
// Returns a JobConf that will be used on executors to obtain input splits for Hadoop reads.
// Three paths are visible: with `shouldCloneJobConf` a new JobConf is built from the broadcast
// Configuration; otherwise a broadcast value that already is a JobConf is re-used; otherwise a
// JobConf is created, initialized with `initLocalJobConfFuncOpt` and stored under
// `jobConfCacheKey` for later calls.
// NOTE(review): this copy of the method has lost lines — the result expression of every branch,
// the receiver of the `.map { ... }.getOrElse { ... }` chain, the closing braces, and the
// synchronization that the comments below refer to. Restore them from the original file before
// relying on this block.
protected def getJobConf(): JobConf = {
val conf: Configuration = broadcastedConf.value.value
if (shouldCloneJobConf) {
// Hadoop Configuration objects are not thread-safe, which may lead to various problems if
// one job modifies a configuration while another reads it (SPARK-2546). This problem occurs
// somewhat rarely because most jobs treat the configuration as though it's immutable. One
// solution, implemented here, is to clone the Configuration object. Unfortunately, this
// clone can be very expensive. To avoid unexpected performance regressions for workloads and
// Hadoop versions that do not suffer from these thread-safety issues, this cloning is
// disabled by default.
logDebug("Cloning Hadoop Configuration")
val newJobConf = new JobConf(conf)
// The init closure is applied only when the broadcast value was not already a JobConf.
if (!conf.isInstanceOf[JobConf]) {
initLocalJobConfFuncOpt.foreach(f => f(newJobConf))
} else {
conf match {
case jobConf: JobConf =>
logDebug("Re-using user-broadcasted JobConf")
case _ =>
.map { conf =>
logDebug("Re-using cached JobConf")
.getOrElse {
// Create a JobConf that will be cached and used across this RDD's getJobConf()
// calls in the local process. The local cache is accessed through
// HadoopRDD.putCachedMetadata().
// The caching helps minimize GC, since a JobConf can contain ~10KB of temporary
// objects. Synchronize to prevent ConcurrentModificationException (SPARK-1097,
// HADOOP-10456).
logDebug("Creating new JobConf and caching it for later re-use")
val newJobConf = new JobConf(conf)
initLocalJobConfFuncOpt.foreach(f => f(newJobConf))
HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
/**
 * Creates an instance of `inputFormatClass` through Hadoop's `ReflectionUtils` and, when the
 * instance is `Configurable`, passes it the given configuration.
 *
 * @param conf job configuration handed to the reflective instantiation and to `setConf`
 * @return the newly created input format
 */
protected def getInputFormat(conf: JobConf): InputFormat[K, V] = {
  val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
    .asInstanceOf[InputFormat[K, V]]
  newInputFormat match {
    case c: Configurable => c.setConf(conf)
    case _ => // not configurable, nothing to do
  }
  // The match above only has a side effect; the instance itself is the result.
  newInputFormat
}
/**
 * Computes the partitions of this RDD: one [[HadoopPartition]] per input split returned by the
 * input format, optionally dropping zero-length splits (`ignoreEmptySplits`).
 *
 * A warning is logged when the whole input is a single file split larger than the configured
 * threshold. When the input is missing and `ignoreMissingFiles` is set, a warning is logged and
 * an empty array is returned.
 *
 * @return the partitions, indexed in the order of the input splits
 * @throws java.io.IOException rethrown as a "path not supported" error when the underlying
 *                             message starts with "Not a file:"
 */
override def getPartitions: Array[Partition] = {
  val jobConf = getJobConf()
  // add the credentials here as this can be called before SparkContext initialized
  SparkHadoopUtil.get.addCredentials(jobConf)
  try {
    val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
    val inputSplits = if (ignoreEmptySplits) {
      allInputSplits.filter(_.getLength > 0)
    } else {
      allInputSplits
    }
    inputSplits match {
      // Only a lone, oversized file split is worth a warning.
      case Array(fileSplit: FileSplit)
          if fileSplit.getLength > conf.get(IO_WARNING_LARGEFILETHRESHOLD) =>
        val path = fileSplit.getPath
        val codecFactory = new CompressionCodecFactory(jobConf)
        if (Utils.isFileSplittable(path, codecFactory)) {
          logWarning(s"Loading one large file ${path.toString} with only one partition, " +
            s"we can increase partition numbers for improving performance.")
        } else {
          logWarning(s"Loading one large unsplittable file ${path.toString} with only one " +
            s"partition, because the file is compressed by unsplittable compression codec.")
        }
      case _ =>
    }
    Array.tabulate[Partition](inputSplits.length) { i =>
      new HadoopPartition(id, i, inputSplits(i))
    }
  } catch {
    case e: InvalidInputException if ignoreMissingFiles =>
      logWarning(s"${jobConf.get(FileInputFormat.INPUT_DIR)} doesn't exist and no" +
        s" partitions returned from this path.", e)
      Array.empty[Partition]
    // getMessage may be null, so it is wrapped before the prefix test.
    case e: IOException if Option(e.getMessage).exists(_.startsWith("Not a file:")) =>
      val message = e.getMessage
      val parts = message.split(":").map(_.trim)
      // Keep the third ':'-separated field when present; a message without a scheme separator
      // has only two fields, so fall back to everything after the prefix.
      val path = if (parts.length > 2) parts(2) else message.stripPrefix("Not a file:").trim
      throw SparkCoreErrors.pathNotSupportedError(path)
  }
}
// Builds the record iterator for one partition: a NextIterator that reads (key, value) pairs
// from the Hadoop RecordReader of the partition's input split, wrapped in an
// InterruptibleIterator tied to the task context.
// NOTE(review): this copy of the method has lost lines throughout — closing braces, the head of
// the call whose arguments follow `inputFormat`, the expression in `finished = !, value)`, the
// exception type in `case e: =>`, and the bodies of several branches and callbacks. Restore them
// from the original file before relying on this block.
override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
val iter = new NextIterator[(K, V)] {
private val split = theSplit.asInstanceOf[HadoopPartition]
logInfo("Input split: " + split.inputSplit)
private val jobConf = getJobConf()
private val inputMetrics = context.taskMetrics().inputMetrics
// Bytes already recorded on the task's input metrics before this partition is read; added
// back in updateBytesRead().
private val existingBytesRead = inputMetrics.bytesRead
// Sets InputFileBlockHolder for the file block's information
split.inputSplit.value match {
case fs: FileSplit =>
InputFileBlockHolder.set(fs.getPath.toString, fs.getStart, fs.getLength)
case _ =>
// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
private val getBytesReadCallback: Option[() => Long] = split.inputSplit.value match {
case _: FileSplit | _: CombineFileSplit =>
case _ => None
// We get our input bytes from thread-local Hadoop FileSystem statistics.
// If we do a coalesce, however, we are likely to compute multiple partitions in the same
// task and in the same thread, in which case we need to avoid override values written by
// previous partitions (SPARK-13071).
private def updateBytesRead(): Unit = {
getBytesReadCallback.foreach { getBytesRead =>
inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
// May be null: key/value creation and close() below both check for that.
private var reader: RecordReader[K, V] = null
private val inputFormat = getInputFormat(jobConf)
new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(createTime),
context.stageId, theSplit.index, context.attemptNumber, jobConf)
// Open the reader; a missing or corrupt file ends the iterator early when the matching
// ignore flag is set.
reader =
try {
inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
} catch {
case e: FileNotFoundException if ignoreMissingFiles =>
logWarning(s"Skipped missing file: ${split.inputSplit}", e)
finished = true
// Throw FileNotFoundException even if `ignoreCorruptFiles` is true
case e: FileNotFoundException if !ignoreMissingFiles => throw e
case e: IOException if ignoreCorruptFiles =>
logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
finished = true
// Register an on-task-completion callback to close the input stream.
context.addTaskCompletionListener[Unit] { context =>
// Update the bytes read before closing is to make sure lingering bytesRead statistics in
// this thread get correctly added.
// Key and value objects are created once from the reader and returned by every getNext() call.
private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()
override def getNext(): (K, V) = {
try {
finished = !, value)
} catch {
case e: FileNotFoundException if ignoreMissingFiles =>
logWarning(s"Skipped missing file: ${split.inputSplit}", e)
finished = true
// Throw FileNotFoundException even if `ignoreCorruptFiles` is true
case e: FileNotFoundException if !ignoreMissingFiles => throw e
case e: IOException if ignoreCorruptFiles =>
logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
finished = true
if (!finished) {
if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
(key, value)
override def close(): Unit = {
if (reader != null) {
try {
} catch {
case e: Exception =>
// Close failures are only logged, and not at all while the JVM is shutting down.
if (!ShutdownHookManager.inShutdown()) {
logWarning("Exception in RecordReader.close()", e)
} finally {
reader = null
if (getBytesReadCallback.isDefined) {
} else if (split.inputSplit.value.isInstanceOf[FileSplit] ||
split.inputSplit.value.isInstanceOf[CombineFileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
} catch {
case e: =>
logWarning("Unable to get input size to set InputMetrics for task", e)
new InterruptibleIterator[(K, V)](context, iter)
/**
 * Maps over a partition, providing the InputSplit that was used as the base of the partition.
 *
 * @param f function applied to each partition's InputSplit together with the iterator over
 *          that partition's records
 * @param preservesPartitioning forwarded to the resulting RDD, which keeps its parent's
 *                              partitioner only when this is true
 * @return a new RDD holding the results of applying `f` to every partition
 */
def mapPartitionsWithInputSplit[U: ClassTag](
    f: (InputSplit, Iterator[(K, V)]) => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = {
  new HadoopMapPartitionsWithSplitRDD(this, f, preservesPartitioning)
}
/**
 * Returns the preferred locations of a partition.
 *
 * Splits that carry location info are translated by `HadoopRDD.convertSplitLocationInfo`. When
 * that yields nothing, or the split has no location info, the split's plain location list is
 * used with "localhost" entries removed.
 *
 * @param split the partition, which must be a [[HadoopPartition]]
 * @return the location strings for the partition
 */
override def getPreferredLocations(split: Partition): Seq[String] = {
  val hsplit = split.asInstanceOf[HadoopPartition].inputSplit.value
  val locs: Option[Seq[String]] = hsplit match {
    case lsplit: InputSplitWithLocationInfo =>
      HadoopRDD.convertSplitLocationInfo(lsplit.getLocationInfo)
    case _ => None
  }
  locs.getOrElse(hsplit.getLocations.filter(_ != "localhost").toSeq)
}
/** Overridden as a no-op. */
override def checkpoint(): Unit = {
  // Do nothing. Hadoop RDD should not be checkpointed.
}
/**
 * Persists this RDD at the given storage level, logging a warning first when the level stores
 * deserialized objects.
 *
 * @param storageLevel the requested storage level
 * @return this RDD
 */
override def persist(storageLevel: StorageLevel): this.type = {
  if (storageLevel.deserialized) {
    logWarning("Caching HadoopRDDs as deserialized objects usually leads to undesired" +
      " behavior because Hadoop's RecordReader reuses the same Writable object for all records." +
      " Use a map transformation to make copies of the records.")
  }
  // The warning is advisory only; the request is still honoured.
  super.persist(storageLevel)
}
// Exposes the JobConf returned by getJobConf() under the more general Configuration type.
def getConf: Configuration = getJobConf()
private[spark] object HadoopRDD extends Logging {
// Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456).
// Therefore, we synchronize on this lock before calling new JobConf() or new Configuration().
/** Update the input bytes read metric each time this number of records has been read */
/**
 * The methods below are helpers for accessing the local map, a property of the SparkEnv of
 * the local process.
 */
// Looks up `key` in the hadoopJobMetadata map of the current SparkEnv and returns what is
// stored there.
def getCachedMetadata(key: String): AnyRef = SparkEnv.get.hadoopJobMetadata.get(key)
// Stores `value` under `key` in the hadoopJobMetadata map of the current SparkEnv.
private def putCachedMetadata(key: String, value: AnyRef): Unit =
SparkEnv.get.hadoopJobMetadata.put(key, value)
/**
 * Add Hadoop configuration specific to a single partition and attempt.
 *
 * A map-task attempt id is built from the arguments and written to `conf`, together with the
 * job id, the task id, the partition number and the "is map" flag.
 *
 * @param jobTrackerId identifier used to build the Hadoop JobID
 * @param jobId numeric job id used to build the Hadoop JobID
 * @param splitId index of the split; used as the task number and as the task partition
 * @param attemptId attempt number of the task
 * @param conf the JobConf to update in place
 */
def addLocalConfiguration(jobTrackerId: String, jobId: Int, splitId: Int, attemptId: Int,
    conf: JobConf): Unit = {
  val jobID = new JobID(jobTrackerId, jobId)
  val taId = new TaskAttemptID(new TaskID(jobID, TaskType.MAP, splitId), attemptId)

  // Each value needs its own property name; writing them all to one key would leave only the
  // last value in the configuration.
  conf.set("mapreduce.task.id", taId.getTaskID.toString)
  conf.set("mapreduce.task.attempt.id", taId.toString)
  conf.setBoolean("mapreduce.task.ismap", true)
  conf.setInt("mapreduce.task.partition", splitId)
  conf.set("mapreduce.job.id", jobID.toString)
}
/**
 * Analogous to [[org.apache.spark.rdd.MapPartitionsRDD]], but passes in an InputSplit to
 * the given function rather than the index of the partition.
 *
 * @param prev the parent RDD; its partitions must be [[HadoopPartition]]s
 * @param f function applied to each partition's InputSplit and the parent's iterator for it
 * @param preservesPartitioning whether to keep the parent's partitioner
 */
private[spark] class HadoopMapPartitionsWithSplitRDD[U: ClassTag, T: ClassTag](
    prev: RDD[T],
    f: (InputSplit, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {

  override val partitioner: Option[Partitioner] =
    if (preservesPartitioning) firstParent[T].partitioner else None

  // This RDD has exactly the partitions of its parent.
  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] = {
    val partition = split.asInstanceOf[HadoopPartition]
    val inputSplit = partition.inputSplit.value
    f(inputSplit, firstParent[T].iterator(split, context))
  }
}
private[spark] def convertSplitLocationInfo(
infos: Array[SplitLocationInfo]): Option[Seq[String]] = {
Option(infos).map(_.flatMap { loc =>
val locationStr = loc.getLocation
if (locationStr != null && locationStr != "localhost") {
if (loc.isInMemory) {
logDebug(s"Partition $locationStr is cached by Hadoop.")
} else {
} else {
