spark PushBasedFetchHelper 源码
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
import java.util.concurrent.TimeUnit
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success}
import org.roaringbitmap.RoaringBitmap
import org.apache.spark.MapOutputTracker
import org.apache.spark.MapOutputTracker.SHUFFLE_PUSH_MAP_ID
import org.apache.spark.internal.Logging
import{BlockStoreClient, MergedBlockMeta, MergedBlocksMetaListener}
* Helper class for [[ShuffleBlockFetcherIterator]] that encapsulates all the push-based
* functionality to fetch push-merged block meta and shuffle chunks.
* A push-merged block contains multiple shuffle chunks where each shuffle chunk contains multiple
* shuffle blocks that belong to the common reduce partition and were merged by the
* external shuffle service to that chunk.
private class PushBasedFetchHelper(
private val iterator: ShuffleBlockFetcherIterator,
private val shuffleClient: BlockStoreClient,
private val blockManager: BlockManager,
private val mapOutputTracker: MapOutputTracker) extends Logging {
private[this] val startTimeNs = System.nanoTime()
private[storage] val localShuffleMergerBlockMgrId = BlockManagerId(
blockManager.blockManagerId.port, blockManager.blockManagerId.topologyInfo)
* A map for storing shuffle chunk bitmap.
private[this] val chunksMetaMap = new mutable.HashMap[ShuffleBlockChunkId, RoaringBitmap]()
* Returns true if the address is for a push-merged block.
def isPushMergedShuffleBlockAddress(address: BlockManagerId): Boolean = {
* Returns true if the address is of a remote push-merged block. false otherwise.
def isRemotePushMergedBlockAddress(address: BlockManagerId): Boolean = {
isPushMergedShuffleBlockAddress(address) && !=
* Returns true if the address is of a push-merged-local block. false otherwise.
def isLocalPushMergedBlockAddress(address: BlockManagerId): Boolean = {
isPushMergedShuffleBlockAddress(address) && ==
* This is executed by the task thread when the `` is invoked and the iterator
* processes a response of type [[ShuffleBlockFetcherIterator.SuccessFetchResult]].
* @param blockId shuffle chunk id.
def removeChunk(blockId: ShuffleBlockChunkId): Unit = {
* This is executed by the task thread when the `` is invoked and the iterator
* processes a response of type [[ShuffleBlockFetcherIterator.PushMergedLocalMetaFetchResult]].
* @param blockId shuffle chunk id.
def addChunk(blockId: ShuffleBlockChunkId, chunkMeta: RoaringBitmap): Unit = {
chunksMetaMap(blockId) = chunkMeta
* This is executed by the task thread when the `` is invoked and the iterator
* processes a response of type [[ShuffleBlockFetcherIterator.PushMergedRemoteMetaFetchResult]].
* @param shuffleId shuffle id.
* @param reduceId reduce id.
* @param blockSize size of the push-merged block.
* @param bitmaps chunk bitmaps, where each bitmap contains all the mapIds that were merged
* to that chunk.
* @return shuffle chunks to fetch.
def createChunkBlockInfosFromMetaResponse(
shuffleId: Int,
shuffleMergeId: Int,
reduceId: Int,
blockSize: Long,
bitmaps: Array[RoaringBitmap]): ArrayBuffer[(BlockId, Long, Int)] = {
val approxChunkSize = blockSize / bitmaps.length
val blocksToFetch = new ArrayBuffer[(BlockId, Long, Int)]()
for (i <- bitmaps.indices) {
val blockChunkId = ShuffleBlockChunkId(shuffleId, shuffleMergeId, reduceId, i)
chunksMetaMap.put(blockChunkId, bitmaps(i))
logDebug(s"adding block chunk $blockChunkId of size $approxChunkSize")
blocksToFetch += ((blockChunkId, approxChunkSize, SHUFFLE_PUSH_MAP_ID))
* This is executed by the task thread when the iterator is initialized and only if it has
* push-merged blocks for which it needs to fetch the metadata.
* @param req [[ShuffleBlockFetcherIterator.FetchRequest]] that only contains requests to fetch
* metadata of push-merged blocks.
def sendFetchMergedStatusRequest(req: FetchRequest): Unit = {
val sizeMap = {
case FetchBlockInfo(blockId, size, _) =>
val shuffleBlockId = blockId.asInstanceOf[ShuffleMergedBlockId]
((shuffleBlockId.shuffleId, shuffleBlockId.reduceId), size)
val address = req.address
val mergedBlocksMetaListener = new MergedBlocksMetaListener {
override def onSuccess(shuffleId: Int, shuffleMergeId: Int, reduceId: Int,
meta: MergedBlockMeta): Unit = {
logDebug(s"Received the meta of push-merged block for ($shuffleId, $shuffleMergeId," +
s" $reduceId) from ${}:${req.address.port}")
try {
iterator.addToResultsQueue(PushMergedRemoteMetaFetchResult(shuffleId, shuffleMergeId,
reduceId, sizeMap((shuffleId, reduceId)), meta.readChunkBitmaps(), address))
} catch {
case exception: Exception =>
logError(s"Failed to parse the meta of push-merged block for ($shuffleId, " +
s"$shuffleMergeId, $reduceId) from" +
s" ${}:${req.address.port}", exception)
PushMergedRemoteMetaFailedFetchResult(shuffleId, shuffleMergeId, reduceId,
override def onFailure(shuffleId: Int, shuffleMergeId: Int, reduceId: Int,
exception: Throwable): Unit = {
logError(s"Failed to get the meta of push-merged block for ($shuffleId, $reduceId) " +
s"from ${}:${req.address.port}", exception)
PushMergedRemoteMetaFailedFetchResult(shuffleId, shuffleMergeId, reduceId, address))
req.blocks.foreach { block =>
val shuffleBlockId = block.blockId.asInstanceOf[ShuffleMergedBlockId]
shuffleClient.getMergedBlockMeta(, address.port, shuffleBlockId.shuffleId,
shuffleBlockId.shuffleMergeId, shuffleBlockId.reduceId, mergedBlocksMetaListener)
* This is executed by the task thread when the iterator is initialized. It fetches all the
* outstanding push-merged local blocks.
* @param pushMergedLocalBlocks set of identified merged local blocks and their sizes.
def fetchAllPushMergedLocalBlocks(
pushMergedLocalBlocks: mutable.LinkedHashSet[BlockId]): Unit = {
if (pushMergedLocalBlocks.nonEmpty) {
blockManager.hostLocalDirManager.foreach(fetchPushMergedLocalBlocks(_, pushMergedLocalBlocks))
* Fetch the push-merged blocks dirs if they are not in the cache and eventually fetch push-merged
* local blocks.
private def fetchPushMergedLocalBlocks(
hostLocalDirManager: HostLocalDirManager,
pushMergedLocalBlocks: mutable.LinkedHashSet[BlockId]): Unit = {
val cachedPushedMergedDirs = hostLocalDirManager.getCachedHostLocalDirsFor(
if (cachedPushedMergedDirs.isDefined) {
logDebug(s"Fetch the push-merged-local blocks with cached merged dirs: " +
s"${cachedPushedMergedDirs.get.mkString(", ")}")
pushMergedLocalBlocks.foreach { blockId =>
fetchPushMergedLocalBlock(blockId, cachedPushedMergedDirs.get,
} else {
// Push-based shuffle is only enabled when the external shuffle service is enabled. If the
// external shuffle service is not enabled, then there will not be any push-merged blocks
// for the iterator to fetch.
logDebug(s"Asynchronous fetch the push-merged-local blocks without cached merged " +
s"dirs from the external shuffle service")
blockManager.externalShuffleServicePort, Array(SHUFFLE_MERGER_IDENTIFIER)) {
case Success(dirs) =>
logDebug(s"Fetched merged dirs in " +
s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms")
pushMergedLocalBlocks.foreach {
blockId =>
logDebug(s"Successfully fetched local dirs: " +
s"${dirs.get(SHUFFLE_MERGER_IDENTIFIER).mkString(", ")}")
fetchPushMergedLocalBlock(blockId, dirs(SHUFFLE_MERGER_IDENTIFIER),
case Failure(throwable) =>
// If we see an exception with getting the local dirs for push-merged-local blocks,
// we fallback to fetch the original blocks. We do not report block fetch failure.
logWarning(s"Error while fetching the merged dirs for push-merged-local " +
s"blocks: ${pushMergedLocalBlocks.mkString(", ")}. Fetch the original blocks instead",
pushMergedLocalBlocks.foreach {
blockId =>
blockId, localShuffleMergerBlockMgrId, 0, isNetworkReqDone = false))
* Fetch a single push-merged-local block generated. This can also be executed by the task thread
* as well as the netty thread.
* @param blockId ShuffleBlockId to be fetched
* @param localDirs Local directories where the push-merged shuffle files are stored
* @param blockManagerId BlockManagerId
private[this] def fetchPushMergedLocalBlock(
blockId: BlockId,
localDirs: Array[String],
blockManagerId: BlockManagerId): Unit = {
try {
val shuffleBlockId = blockId.asInstanceOf[ShuffleMergedBlockId]
val chunksMeta = blockManager.getLocalMergedBlockMeta(shuffleBlockId, localDirs)
shuffleBlockId.shuffleId, shuffleBlockId.shuffleMergeId,
shuffleBlockId.reduceId, chunksMeta.readChunkBitmaps(), localDirs))
} catch {
case e: Exception =>
// If we see an exception with reading a push-merged-local meta, we fallback to
// fetch the original blocks. We do not report block fetch failure
// and will continue with the remaining local block read.
logWarning(s"Error occurred while fetching push-merged-local meta, " +
s"prepare to fetch the original blocks", e)
FallbackOnPushMergedFailureResult(blockId, blockManagerId, 0, isNetworkReqDone = false))
* This is executed by the task thread when the `` is invoked and the iterator
* processes a response of type:
* 1) [[ShuffleBlockFetcherIterator.SuccessFetchResult]]
* 2) [[ShuffleBlockFetcherIterator.FallbackOnPushMergedFailureResult]]
* 3) [[ShuffleBlockFetcherIterator.PushMergedRemoteMetaFailedFetchResult]]
* This initiates fetching fallback blocks for a push-merged block or a shuffle chunk that
* failed to fetch.
* It makes a call to the map output tracker to get the list of original blocks for the
* given push-merged block/shuffle chunk, split them into remote and local blocks, and process
* them accordingly.
* It also updates the numberOfBlocksToFetch in the iterator as it processes failed response and
* finds more push-merged requests to remote and again updates it with additional requests for
* original blocks.
* The fallback happens when:
* 1. There is an exception while creating shuffle chunks from push-merged-local shuffle block.
* See fetchLocalBlock.
* 2. There is a failure when fetching remote shuffle chunks.
* 3. There is a failure when processing SuccessFetchResult which is for a shuffle chunk
* (local or remote).
def initiateFallbackFetchForPushMergedBlock(
blockId: BlockId,
address: BlockManagerId): Unit = {
assert(blockId.isInstanceOf[ShuffleMergedBlockId] || blockId.isInstanceOf[ShuffleBlockChunkId])
logWarning(s"Falling back to fetch the original blocks for push-merged block $blockId")
// Increase the blocks processed since we will process another block in the next iteration of
// the while loop in
val fallbackBlocksByAddr: Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] =
blockId match {
case shuffleBlockId: ShuffleMergedBlockId =>
shuffleBlockId.shuffleId, shuffleBlockId.reduceId)
case _ =>
val shuffleChunkId = blockId.asInstanceOf[ShuffleBlockChunkId]
val chunkBitmap: RoaringBitmap = chunksMetaMap.remove(shuffleChunkId).get
var blocksProcessed = 1
// When there is a failure to fetch a remote shuffle chunk, then we try to
// fallback not only for that particular remote shuffle chunk but also for all the
// pending chunks that belong to the same host. The reason for doing so is that it
// is very likely that the subsequent requests for shuffle chunks from this host will
// fail as well. Since, push-based shuffle is best effort and we try not to increase the
// delay of the fetches, we immediately fallback for all the pending shuffle chunks in the
// fetchRequests queue.
if (isRemotePushMergedBlockAddress(address)) {
// Fallback for all the pending fetch requests
val pendingShuffleChunks = iterator.removePendingChunks(shuffleChunkId, address)
pendingShuffleChunks.foreach { pendingBlockId =>
logInfo(s"Falling back immediately for shuffle chunk $pendingBlockId")
val bitmapOfPendingChunk: RoaringBitmap = chunksMetaMap.remove(pendingBlockId).get
// These blocks were added to numBlocksToFetch so we increment numBlocksProcessed
blocksProcessed += pendingShuffleChunks.size
shuffleChunkId.shuffleId, shuffleChunkId.reduceId, chunkBitmap)
