spark IndexShuffleBlockResolver 源码
spark IndexShuffleBlockResolver 代码
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.spark.shuffle
import java.nio.ByteBuffer
import java.nio.channels.Channels
import java.nio.file.Files
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{SparkConf, SparkEnv, SparkException}
import org.apache.spark.internal.{config, Logging}
import{FileSegmentManagedBuffer, ManagedBuffer}
import{ExecutorDiskUtils, MergedBlockMeta}
import org.apache.spark.serializer.SerializerManager
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
import org.apache.spark.util.Utils
* Create and maintain the shuffle blocks' mapping between logical block and physical file location.
* Data of shuffle blocks from the same map task are stored in a single consolidated data file.
* The offsets of the data blocks in the data file are stored in a separate index file.
* We use the name of the shuffle data's shuffleBlockId with reduce ID set to 0 and add ".data"
* as the filename postfix for data file, and ".index" as the filename postfix for index file.
// Note: Changes to the format in this file should be kept in sync with
// Shuffle block resolver that also mixes in Logging and MigratableResolver.
private[spark] class IndexShuffleBlockResolver(
conf: SparkConf,
// var for testing; when left null the block manager is taken from SparkEnv (see blockManager)
var _blockManager: BlockManager = null)
extends ShuffleBlockResolver
with Logging with MigratableResolver {
// Resolved lazily: the injected _blockManager if non-null, otherwise SparkEnv.get.blockManager.
private lazy val blockManager = Option(_blockManager).getOrElse(SparkEnv.get.blockManager)
// Transport configuration built from conf, with "shuffle" passed as the second argument.
private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
// Optional limit compared against getShuffleBytesStored() in putShuffleBlockAsStream.
// NOTE(review): the initializer of this val is not visible in this extract.
private val remoteShuffleMaxDisk: Option[Long] =
/**
 * Get the shuffle data file for the given map output, passing `None` as the directories
 * so that the three-argument overload decides where to look.
 *
 * @param shuffleId the shuffle the map output belongs to
 * @param mapId the map task whose data file is requested
 * @return the data file as resolved by the three-argument overload
 */
def getDataFile(shuffleId: Int, mapId: Long): File = {
  val noExplicitDirs: Option[Array[String]] = None
  getDataFile(shuffleId, mapId, noExplicitDirs)
}
* Get the shuffle files that are stored locally. Used for block migrations.
// Builds the list of locally stored shuffles from the block ids known to the disk block manager.
override def getStoredShuffles(): Seq[ShuffleBlockInfo] = {
val allBlocks = blockManager.diskBlockManager.getAllBlocks()
allBlocks.flatMap {
// Each shuffle index block id yields one ShuffleBlockInfo(shuffleId, mapId).
case ShuffleIndexBlockId(shuffleId, mapId, _) =>
Some(ShuffleBlockInfo(shuffleId, mapId))
// NOTE(review): the result for every other block id is not visible in this extract;
// presumably it contributes nothing to the output - confirm.
case _ =>
// Returns a Long computed from the data files of all locally stored shuffles.
// NOTE(review): the aggregation step is not visible in this extract; presumably it sums the
// file lengths, since the caller compares the result against a byte limit - confirm.
private def getShuffleBytesStored(): Long = {
// One data file per stored (shuffleId, mapId) pair.
val shuffleFiles: Seq[File] = getStoredShuffles().map {
si => getDataFile(si.shuffleId, si.mapId)
/**
 * Create a temporary file that will be renamed to the final resulting file.
 *
 * @param file the final target file; callers in this class pass the index, checksum or
 *             received block file that they later rename the temporary file onto
 * @return the temporary file
 */
def createTempFile(file: File): File = {
* Get the shuffle data file.
* When the dirs parameter is None then use the disk manager's local directories. Otherwise,
* read from the specified directories.
// Resolves the data file of one map output. The file is addressed through a
// ShuffleDataBlockId whose reduce id is the NOOP_REDUCE_ID placeholder.
// NOTE(review): the receiver of the `.map` below is not visible in this extract; presumably it
// is derived from `dirs`, with ExecutorDiskUtils.getFilePath computing the path - confirm.
def getDataFile(shuffleId: Int, mapId: Long, dirs: Option[Array[String]]): File = {
val blockId = ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
.map(d =>
new File(ExecutorDiskUtils.getFilePath(d, blockManager.subDirsPerLocalDir,
* Get the shuffle index file.
* When the dirs parameter is None then use the disk manager's local directories. Otherwise,
* read from the specified directories.
// Resolves the index file of one map output; `dirs` defaults to None. The file is addressed
// through a ShuffleIndexBlockId whose reduce id is the NOOP_REDUCE_ID placeholder.
// NOTE(review): the receiver of the `.map` below is not visible in this extract; presumably it
// is derived from `dirs`, with ExecutorDiskUtils.getFilePath computing the path - confirm.
def getIndexFile(
shuffleId: Int,
mapId: Long,
dirs: Option[Array[String]] = None): File = {
val blockId = ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
.map(d =>
new File(ExecutorDiskUtils.getFilePath(d, blockManager.subDirsPerLocalDir,
// Resolves the data file of a merged shuffle partition. The file is identified by a
// ShuffleMergedDataBlockId built from (appId, shuffleId, shuffleMergeId, reduceId);
// `dirs` defaults to None and is forwarded to the lookup.
// NOTE(review): the call that receives these arguments is not visible in this extract.
private def getMergedBlockDataFile(
appId: String,
shuffleId: Int,
shuffleMergeId: Int,
reduceId: Int,
dirs: Option[Array[String]] = None): File = {
ShuffleMergedDataBlockId(appId, shuffleId, shuffleMergeId, reduceId), dirs)
// Resolves the index file of a merged shuffle partition. The file is identified by a
// ShuffleMergedIndexBlockId built from (appId, shuffleId, shuffleMergeId, reduceId);
// `dirs` defaults to None and is forwarded to the lookup.
// NOTE(review): the call that receives these arguments is not visible in this extract.
private def getMergedBlockIndexFile(
appId: String,
shuffleId: Int,
shuffleMergeId: Int,
reduceId: Int,
dirs: Option[Array[String]] = None): File = {
ShuffleMergedIndexBlockId(appId, shuffleId, shuffleMergeId, reduceId), dirs)
// Resolves the meta file of a merged shuffle partition. The file is identified by a
// ShuffleMergedMetaBlockId built from (appId, shuffleId, shuffleMergeId, reduceId);
// `dirs` defaults to None and is forwarded to the lookup.
// NOTE(review): the call that receives these arguments is not visible in this extract.
private def getMergedBlockMetaFile(
appId: String,
shuffleId: Int,
shuffleMergeId: Int,
reduceId: Int,
dirs: Option[Array[String]] = None): File = {
ShuffleMergedMetaBlockId(appId, shuffleId, shuffleMergeId, reduceId), dirs)
* Remove data file and index file that contain the output data from one map.
// Deletes the data, index and checksum files of one map output, in that order.
// Deletion is best-effort: a file that exists but cannot be deleted only produces a warning.
def removeDataByMap(shuffleId: Int, mapId: Long): Unit = {
var file = getDataFile(shuffleId, mapId)
if (file.exists() && !file.delete()) {
logWarning(s"Error deleting data ${file.getPath()}")
file = getIndexFile(shuffleId, mapId)
if (file.exists() && !file.delete()) {
logWarning(s"Error deleting index ${file.getPath()}")
// The checksum file is looked up with the algorithm configured in SHUFFLE_CHECKSUM_ALGORITHM.
file = getChecksumFile(shuffleId, mapId, conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM))
if (file.exists() && !file.delete()) {
logWarning(s"Error deleting checksum ${file.getPath()}")
* Check whether the given index and data files match each other.
* If so, return the partition lengths in the data file. Otherwise return null.
// Derives the per-block lengths from the index file. The visible early exits return null when
// the index has the wrong size, cannot be opened or read, or does not start with offset 0.
// NOTE(review): the values returned by the final size comparison are not visible in this extract.
private def checkIndexAndDataFile(index: File, data: File, blocks: Int): Array[Long] = {
// the index file should have `blocks + 1` longs as offsets (8 bytes each).
if (index.length() != (blocks + 1) * 8L) {
return null
val lengths = new Array[Long](blocks)
// Read the lengths of blocks
val in = try {
new DataInputStream(new NioBufferedFileInputStream(index))
} catch {
case e: IOException =>
return null
try {
// Convert the offsets into lengths of each block
var offset = in.readLong()
// The first stored offset must be 0; anything else makes the index invalid.
if (offset != 0L) {
return null
var i = 0
while (i < blocks) {
val off = in.readLong()
// Length of block i is the difference between two consecutive offsets.
lengths(i) = off - offset
offset = off
i += 1
} catch {
case e: IOException =>
return null
} finally {
// the size of data file should match with index file
if (data.length() == lengths.sum) {
} else {
* Write a provided shuffle block as a stream. Used for block migrations.
* ShuffleBlockBatchIds must contain the full range represented in the ShuffleIndexBlock.
* Requires the caller to delete any shuffle index blocks where the shuffle block fails to
* put.
// Returns a stream callback that receives the bytes of one shuffle index or data block,
// writes them to a temporary file and renames that file onto the final location on completion.
override def putShuffleBlockAsStream(blockId: BlockId, serializerManager: SerializerManager):
StreamCallbackWithID = {
// Throw an exception if we have exceeded maximum shuffle files stored
remoteShuffleMaxDisk.foreach { maxBytes =>
val bytesUsed = getShuffleBytesStored()
if (maxBytes < bytesUsed) {
throw new SparkException(s"Not storing remote shuffles $bytesUsed exceeds $maxBytes")
// Only shuffle index and shuffle data block ids are accepted; any other id is rejected
// with an IllegalStateException.
val file = blockId match {
case ShuffleIndexBlockId(shuffleId, mapId, _) =>
getIndexFile(shuffleId, mapId)
case ShuffleDataBlockId(shuffleId, mapId, _) =>
getDataFile(shuffleId, mapId)
case _ =>
throw new IllegalStateException(s"Unexpected shuffle block transfer ${blockId} as " +
// Incoming data goes to a temporary file first; it is renamed onto `file` in onComplete.
val fileTmp = createTempFile(file)
val channel = Channels.newChannel(
new FileOutputStream(fileTmp)))
new StreamCallbackWithID {
override def getID: String =
override def onData(streamId: String, buf: ByteBuffer): Unit = {
while (buf.hasRemaining) {
override def onComplete(streamId: String): Unit = {
logTrace(s"Done receiving shuffle block $blockId, now storing on local disk.")
// Size is taken from the temporary file before it is renamed.
val diskSize = fileTmp.length()
// The rename is performed inside a synchronized block.
// NOTE(review): the handling of an already existing target file is not visible in this extract.
this.synchronized {
if (file.exists()) {
if (!fileTmp.renameTo(file)) {
throw new IOException(s"fail to rename file ${fileTmp} to ${file}")
// Report the stored block to the block manager with DISK_ONLY storage and the received size.
blockManager.reportBlockStatus(blockId, BlockStatus(StorageLevel.DISK_ONLY, 0, diskSize))
override def onFailure(streamId: String, cause: Throwable): Unit = {
// the framework handles the connection itself, we just need to do local cleanup
logWarning(s"Error while uploading $blockId", cause)
* Get the index & data block for migration.
// Returns the (block id, buffer) pairs to migrate for one map output.
// Any exception while resolving the files is logged as a warning and yields an empty list.
def getMigrationBlocks(shuffleBlockInfo: ShuffleBlockInfo): List[(BlockId, ManagedBuffer)] = {
try {
val shuffleId = shuffleBlockInfo.shuffleId
val mapId = shuffleBlockInfo.mapId
// Load the index block
val indexFile = getIndexFile(shuffleId, mapId)
val indexBlockId = ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
val indexFileSize = indexFile.length()
// The buffer covers the whole index file: offset 0, length = file length.
val indexBlockData = new FileSegmentManagedBuffer(
transportConf, indexFile, 0, indexFileSize)
// Load the data block
val dataFile = getDataFile(shuffleId, mapId)
val dataBlockId = ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
// Likewise the whole data file: offset 0, length = file length.
val dataBlockData = new FileSegmentManagedBuffer(
transportConf, dataFile, 0, dataFile.length())
// Make sure the index exists.
if (!indexFile.exists()) {
throw new FileNotFoundException("Index file is deleted already.")
// When the data file exists it is listed before the index block; otherwise only the
// index block is returned.
if (dataFile.exists()) {
List((dataBlockId, dataBlockData), (indexBlockId, indexBlockData))
} else {
List((indexBlockId, indexBlockData))
} catch {
case _: Exception => // If we can't load the blocks ignore them.
logWarning(s"Failed to resolve shuffle block ${shuffleBlockInfo}. " +
"This is expected to occur if a block is removed after decommissioning has started.")
List.empty[(BlockId, ManagedBuffer)]
* Commit the data and metadata files as an atomic operation, use the existing ones, or
* replace them with new ones. Note that the metadata parameters (`lengths`, `checksums`)
* will be updated to match the existing ones if the existing ones are used.
* There are two kinds of metadata files:
* - index file
* An index file contains the offsets of each block, plus a final offset at the end
* for the end of the output file. It will be used by [[getBlockData]] to figure out
* where each block begins and ends.
* - checksum file (optional)
* A checksum file contains the checksum of each block. It will be used to diagnose
* the cause when a block is corrupted. Note that empty `checksums` indicate that
* checksum is disabled.
// Commits the index file (and, when `checksums` is non-empty, the checksum file) together with
// the data file for one map output. If a consistent index/data pair already exists, `lengths`
// and `checksums` are overwritten in place with the existing values instead.
def writeMetadataFileAndCommit(
shuffleId: Int,
mapId: Long,
lengths: Array[Long],
checksums: Array[Long],
dataTmp: File): Unit = {
val indexFile = getIndexFile(shuffleId, mapId)
val indexTmp = createTempFile(indexFile)
// A non-empty `checksums` array is what enables the checksum file for this commit.
val checksumEnabled = checksums.nonEmpty
// Both options are defined together: the final checksum file and its temporary counterpart.
val (checksumFileOpt, checksumTmpOpt) = if (checksumEnabled) {
assert(lengths.length == checksums.length,
"The size of partition lengths and checksums should be equal")
val checksumFile =
getChecksumFile(shuffleId, mapId, conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM))
(Some(checksumFile), Some(createTempFile(checksumFile)))
} else {
(None, None)
try {
val dataFile = getDataFile(shuffleId, mapId)
// There is only one IndexShuffleBlockResolver per executor, this synchronization makes sure
// the following check and rename are atomic.
this.synchronized {
// A non-null result means checkIndexAndDataFile accepted the existing index and data files.
val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
if (existingLengths != null) {
// Another attempt for the same task has already written our map outputs successfully,
// so just use the existing partition lengths and delete our temporary map outputs.
System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
if (checksumEnabled) {
val existingChecksums = getChecksums(checksumFileOpt.get, checksums.length)
if (existingChecksums != null) {
System.arraycopy(existingChecksums, 0, checksums, 0, lengths.length)
} else {
// It's possible that the previous task attempt succeeded writing the
// index file and data file but failed to write the checksum file. In
// this case, the current task attempt could write the missing checksum
// file by itself.
writeMetadataFile(checksums, checksumTmpOpt.get, checksumFileOpt.get, false)
if (dataTmp != null && dataTmp.exists()) {
} else {
// This is the first successful attempt in writing the map outputs for this task,
// so override any existing index and data files with the ones we wrote.
// Offsets are the running sums of the lengths starting at 0, i.e. lengths.length + 1 values.
val offsets = lengths.scanLeft(0L)(_ + _)
// Index write errors are propagated (last argument is true).
writeMetadataFile(offsets, indexTmp, indexFile, true)
if (dataFile.exists()) {
// dataTmp may be null or absent; a rename is only attempted when it exists.
if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
// write the checksum file { case (checksumTmp, checksumFile) =>
try {
writeMetadataFile(checksums, checksumTmp, checksumFile, false)
} catch {
case e: Exception =>
// It's not worthwhile to fail here after index file and data file are
// already successfully stored since checksum is only a best-effort for
// the corner error case.
logError("Failed to write checksum file", e)
} finally {
logDebug(s"Shuffle index for mapId $mapId: ${lengths.mkString("[", ",", "]")}")
// Clean up leftover temporary files; failures to delete are logged.
if (indexTmp.exists() && !indexTmp.delete()) {
logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")
checksumTmpOpt.foreach { checksumTmp =>
if (checksumTmp.exists()) {
try {
if (!checksumTmp.delete()) {
logError(s"Failed to delete temporary checksum file " +
s"at ${checksumTmp.getAbsolutePath}")
} catch {
case e: Exception =>
// Unlike index deletion, we won't propagate the error for the checksum file since
// checksum is only a best-effort.
logError(s"Failed to delete temporary checksum file " +
s"at ${checksumTmp.getAbsolutePath}", e)
* Write the metadata file (index or checksum). Metadata values will first be written into
* the tmp file and the tmp file will be renamed to the target file at the end to avoid dirty
* writes.
* @param metaValues The metadata values
* @param tmpFile The temp file
* @param targetFile The target file
* @param propagateError Whether to propagate the error for file operation. Unlike index file,
* checksum is only a best-effort so we won't fail the whole task due to
* the error from checksum.
// Writes `metaValues` to `tmpFile` and then renames `tmpFile` onto `targetFile`.
// NOTE(review): the statements that write the values and close the stream are not visible
// in this extract.
private def writeMetadataFile(
metaValues: Array[Long],
tmpFile: File,
targetFile: File,
propagateError: Boolean): Unit = {
// Output goes through a buffered DataOutputStream on top of tmpFile.
val out = new DataOutputStream(
new BufferedOutputStream(
new FileOutputStream(tmpFile)
Utils.tryWithSafeFinally {
} {
// NOTE(review): the handling of an already existing target file is not visible in this extract.
if (targetFile.exists()) {
if (!tmpFile.renameTo(targetFile)) {
val errorMsg = s"fail to rename file $tmpFile to $targetFile"
// A failed rename becomes an IOException only when the caller asked for propagation.
if (propagateError) {
throw new IOException(errorMsg)
} else {
* This is only used for reading local merged block data. In such cases, all chunks in the
* merged shuffle file need to be identified at once, so the ShuffleBlockFetcherIterator
* knows how to consume local merged shuffle file as multiple chunks.
// Returns one ManagedBuffer per chunk of a locally stored merged shuffle file, using the
// offsets stored in the merged index file to delimit the chunks.
override def getMergedBlockData(
blockId: ShuffleMergedBlockId,
dirs: Option[Array[String]]): Seq[ManagedBuffer] = {
// Both files are looked up under the application id taken from conf.
val indexFile =
getMergedBlockIndexFile(conf.getAppId, blockId.shuffleId, blockId.shuffleMergeId,
blockId.reduceId, dirs)
val dataFile = getMergedBlockDataFile(conf.getAppId, blockId.shuffleId,
blockId.shuffleMergeId, blockId.reduceId, dirs)
// Load all the indexes in order to identify all chunks in the specified merged shuffle file.
// The index file length is narrowed to Int and used as the read buffer size.
val size = indexFile.length.toInt
val offsets = Utils.tryWithResource {
new DataInputStream(Files.newInputStream(indexFile.toPath))
} { dis =>
val buffer = ByteBuffer.allocate(size)
// Number of chunks is number of indexes - 1
// (size / 8 is the number of 8-byte entries in the index file.)
val numChunks = size / 8 - 1
for (index <- 0 until numChunks) yield {
// The chunk length is the difference between two consecutive offsets.
new FileSegmentManagedBuffer(transportConf, dataFile,
offsets.get(index + 1) - offsets.get(index))
* This is only used for reading local merged block meta data.
// Builds the MergedBlockMeta of a locally stored merged shuffle partition: the chunk count
// derived from the index file size plus a buffer over the entire meta file.
override def getMergedBlockMeta(
blockId: ShuffleMergedBlockId,
dirs: Option[Array[String]]): MergedBlockMeta = {
val indexFile =
getMergedBlockIndexFile(conf.getAppId, blockId.shuffleId,
blockId.shuffleMergeId, blockId.reduceId, dirs)
// The index file length is narrowed to Int; size / 8 is the number of 8-byte entries and
// the chunk count is one less than that.
val size = indexFile.length.toInt
val numChunks = (size / 8) - 1
val metaFile = getMergedBlockMetaFile(conf.getAppId, blockId.shuffleId,
blockId.shuffleMergeId, blockId.reduceId, dirs)
// The buffer spans the whole meta file: offset 0, length = file length.
val chunkBitMaps = new FileSegmentManagedBuffer(transportConf, metaFile, 0L, metaFile.length)
new MergedBlockMeta(numChunks, chunkBitMaps)
// Reads up to `blockNum` checksums (one long each) from `checksumFile`.
// Returns null when the file does not exist or when reading fails with an
// IOException / EOFException.
// NOTE(review): the success-path return value and the stream cleanup in `finally` are not
// visible in this extract.
private[shuffle] def getChecksums(checksumFile: File, blockNum: Int): Array[Long] = {
if (!checksumFile.exists()) return null
val checksums = new ArrayBuffer[Long]
// Read the checksums of blocks
var in: DataInputStream = null
try {
in = new DataInputStream(new NioBufferedFileInputStream(checksumFile))
// Stop as soon as blockNum values have been collected.
while (checksums.size < blockNum) {
checksums += in.readLong()
} catch {
case _: IOException | _: EOFException =>
return null
} finally {
* Get the shuffle checksum file.
* When the dirs parameter is None then use the disk manager's local directories. Otherwise,
* read from the specified directories.
// Resolves the checksum file of one map output for the given checksum algorithm; `dirs`
// defaults to None. The file name comes from ShuffleChecksumHelper.getChecksumFileName.
// NOTE(review): the first argument of getChecksumFileName and the receiver of the `.map`
// below are not visible in this extract.
def getChecksumFile(
shuffleId: Int,
mapId: Long,
algorithm: String,
dirs: Option[Array[String]] = None): File = {
val blockId = ShuffleChecksumBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
val fileName = ShuffleChecksumHelper.getChecksumFileName(, algorithm)
.map(d =>
new File(ExecutorDiskUtils.getFilePath(d, blockManager.subDirsPerLocalDir, fileName)))
// Returns a buffer over the part of a map output's data file that belongs to the requested
// reduce id (ShuffleBlockId) or reduce id range (ShuffleBlockBatchId). The byte range is
// found by reading two offsets from the index file.
override def getBlockData(
blockId: BlockId,
dirs: Option[Array[String]]): ManagedBuffer = {
// A single block is treated as the range [reduceId, reduceId + 1); a batch supplies its own
// start and end. Any other block id type is rejected with an IllegalArgumentException.
val (shuffleId, mapId, startReduceId, endReduceId) = blockId match {
case id: ShuffleBlockId =>
(id.shuffleId, id.mapId, id.reduceId, id.reduceId + 1)
case batchId: ShuffleBlockBatchId =>
(batchId.shuffleId, batchId.mapId, batchId.startReduceId, batchId.endReduceId)
case _ =>
throw new IllegalArgumentException("unexpected shuffle block id format: " + blockId)
// The block is actually going to be a range of a single map output file for this map, so
// find out the consolidated file, then the offset within that from our index
val indexFile = getIndexFile(shuffleId, mapId, dirs)
// SPARK-22982: if this FileInputStream's position is seeked forward by another piece of code
// which is incorrectly using our file descriptor then this code will fetch the wrong offsets
// (which may cause a reducer to be sent a different reducer's data). The explicit position
// checks added here were a useful debugging aid during SPARK-22982 and may help prevent this
// class of issue from re-occurring in the future which is why they are left here even though
// SPARK-22982 is fixed.
val channel = Files.newByteChannel(indexFile.toPath)
// Index entries are 8-byte longs, so the entry for reduce id k starts at byte k * 8.
channel.position(startReduceId * 8L)
val in = new DataInputStream(Channels.newInputStream(channel))
try {
val startOffset = in.readLong()
channel.position(endReduceId * 8L)
val endOffset = in.readLong()
val actualPosition = channel.position()
// After reading one long at endReduceId * 8 the channel should be exactly 8 bytes further.
val expectedPosition = endReduceId * 8L + 8
if (actualPosition != expectedPosition) {
throw new Exception(s"SPARK-22982: Incorrect channel position after index file reads: " +
s"expected $expectedPosition but actual position was $actualPosition.")
// The segment length is the difference between the end and start offsets.
new FileSegmentManagedBuffer(
getDataFile(shuffleId, mapId, dirs),
endOffset - startOffset)
} finally {
// Lists the block ids belonging to one map output: its index block followed by its data block,
// both using the NOOP_REDUCE_ID placeholder as reduce id.
override def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = {
ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID),
ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
/** Intentionally a no-op: stopping this resolver performs no work. */
override def stop(): Unit = ()
// Companion object; NOOP_REDUCE_ID is imported from here at the top of the file.
// NOTE(review): the constant definition that the comment below describes is not visible
// in this extract.
private[spark] object IndexShuffleBlockResolver {
// No-op reduce ID used in interactions with disk store.
// The disk store currently expects puts to relate to a (map, reduce) pair, but in the sort
// shuffle outputs for several reduces are glommed into a single file.
- 所属分类: 前端技术
- 本文标签:
2、 - 优质文章
8、 golang
9、 openharmony
10、 Vue中input框自动聚焦