// Apache Spark — BlockManagerMaster source.
// File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.storage
import scala.collection.generic.CanBuildFrom
import scala.collection.immutable.Iterable
import scala.concurrent.Future
import org.apache.spark.SparkConf
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.storage.BlockManagerMessages._
import org.apache.spark.util.{RpcUtils, ThreadUtils}
private[spark]
class BlockManagerMaster(
    var driverEndpoint: RpcEndpointRef,
    var driverHeartbeatEndPoint: RpcEndpointRef,
    conf: SparkConf,
    isDriver: Boolean)
  extends Logging {

  /** Timeout applied to synchronous (askSync) RPCs sent to the driver endpoint. */
  val timeout = RpcUtils.askRpcTimeout(conf)

  /** Remove a dead executor from the driver endpoint. This is only called on the driver side. */
  def removeExecutor(execId: String): Unit = {
    tell(RemoveExecutor(execId))
    logInfo(s"Removed $execId successfully in removeExecutor")
  }

  /**
   * Decommission block managers corresponding to the given set of executors.
   * Non-blocking: the result of the ask is intentionally not awaited.
   */
  def decommissionBlockManagers(executorIds: Seq[String]): Unit = {
    driverEndpoint.ask[Boolean](DecommissionBlockManagers(executorIds))
  }

  /** Get replication info for all the RDD blocks stored in the given block manager. */
  def getReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId): Seq[ReplicateBlock] = {
    driverEndpoint.askSync[Seq[ReplicateBlock]](GetReplicateInfoForRDDBlocks(blockManagerId))
  }

  /**
   * Request removal of a dead executor from the driver endpoint.
   * This is only called on the driver side. Non-blocking.
   */
  def removeExecutorAsync(execId: String): Unit = {
    driverEndpoint.ask[Boolean](RemoveExecutor(execId))
    logInfo(s"Removal of executor $execId requested")
  }

  /**
   * Register the BlockManager's id with the driver. The input BlockManagerId does not contain
   * topology information. This information is obtained from the master and we respond with an
   * updated BlockManagerId fleshed out with this information.
   *
   * @return the BlockManagerId as completed by the master (with topology info attached).
   */
  def registerBlockManager(
      id: BlockManagerId,
      localDirs: Array[String],
      maxOnHeapMemSize: Long,
      maxOffHeapMemSize: Long,
      storageEndpoint: RpcEndpointRef): BlockManagerId = {
    logInfo(s"Registering BlockManager $id")
    val updatedId = driverEndpoint.askSync[BlockManagerId](
      RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, storageEndpoint))
    logInfo(s"Registered BlockManager $updatedId")
    updatedId
  }

  /**
   * Report the updated storage status of a single block to the master.
   *
   * @return true if the master acknowledged the update.
   */
  def updateBlockInfo(
      blockManagerId: BlockManagerId,
      blockId: BlockId,
      storageLevel: StorageLevel,
      memSize: Long,
      diskSize: Long): Boolean = {
    val res = driverEndpoint.askSync[Boolean](
      UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))
    logDebug(s"Updated info of block $blockId")
    res
  }

  /** Get locations of the blockId from the driver. */
  def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
    driverEndpoint.askSync[Seq[BlockManagerId]](GetLocations(blockId))
  }

  /** Get locations as well as status of the blockId from the driver. */
  def getLocationsAndStatus(
      blockId: BlockId,
      requesterHost: String): Option[BlockLocationsAndStatus] = {
    driverEndpoint.askSync[Option[BlockLocationsAndStatus]](
      GetLocationsAndStatus(blockId, requesterHost))
  }

  /** Get locations of multiple blockIds from the driver. */
  def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
    driverEndpoint.askSync[IndexedSeq[Seq[BlockManagerId]]](
      GetLocationsMultipleBlockIds(blockIds))
  }

  /**
   * Check if block manager master has a block. Note that this can be used to check for only
   * those blocks that are reported to block manager master.
   */
  def contains(blockId: BlockId): Boolean = {
    getLocations(blockId).nonEmpty
  }

  /** Get ids of other nodes in the cluster from the driver. */
  def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
    driverEndpoint.askSync[Seq[BlockManagerId]](GetPeers(blockManagerId))
  }

  /**
   * Get a list of unique shuffle service locations where an executor is successfully
   * registered in the past for block push/merge with push based shuffle.
   */
  def getShufflePushMergerLocations(
      numMergersNeeded: Int,
      hostsToFilter: Set[String]): Seq[BlockManagerId] = {
    driverEndpoint.askSync[Seq[BlockManagerId]](
      GetShufflePushMergerLocations(numMergersNeeded, hostsToFilter))
  }

  /**
   * Remove the host from the candidate list of shuffle push mergers. This can be
   * triggered if there is a FetchFailedException on the host.
   *
   * @param host the host to remove from the candidate list
   */
  def removeShufflePushMergerLocation(host: String): Unit = {
    driverEndpoint.askSync[Unit](RemoveShufflePushMergerLocation(host))
  }

  /** Look up the RPC endpoint registered for the given executor, if any. */
  def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
    driverEndpoint.askSync[Option[RpcEndpointRef]](GetExecutorEndpointRef(executorId))
  }

  /**
   * Remove a block from the storage endpoints that have it. This can only be used to remove
   * blocks that the driver knows about.
   */
  def removeBlock(blockId: BlockId): Unit = {
    driverEndpoint.askSync[Boolean](RemoveBlock(blockId))
  }

  /** Remove all blocks belonging to the given RDD. Blocks only when `blocking` is true. */
  def removeRdd(rddId: Int, blocking: Boolean): Unit = {
    val future = driverEndpoint.askSync[Future[Seq[Int]]](RemoveRdd(rddId))
    // Log asynchronous failures instead of letting them be silently dropped; fire-and-forget
    // callers (blocking = false) are not affected.
    future.failed.foreach(e =>
      logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}", e)
    )(ThreadUtils.sameThread)
    if (blocking) {
      // the underlying Futures will timeout anyway, so it's safe to use infinite timeout here
      RpcUtils.INFINITE_TIMEOUT.awaitResult(future)
    }
  }

  /** Remove all blocks belonging to the given shuffle. Blocks only when `blocking` is true. */
  def removeShuffle(shuffleId: Int, blocking: Boolean): Unit = {
    val future = driverEndpoint.askSync[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
    // Log asynchronous failures instead of letting them be silently dropped.
    future.failed.foreach(e =>
      logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}", e)
    )(ThreadUtils.sameThread)
    if (blocking) {
      // the underlying Futures will timeout anyway, so it's safe to use infinite timeout here
      RpcUtils.INFINITE_TIMEOUT.awaitResult(future)
    }
  }

  /** Remove all blocks belonging to the given broadcast. Blocks only when `blocking` is true. */
  def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean): Unit = {
    val future = driverEndpoint.askSync[Future[Seq[Int]]](
      RemoveBroadcast(broadcastId, removeFromMaster))
    // Log asynchronous failures instead of letting them be silently dropped.
    future.failed.foreach(e =>
      logWarning(s"Failed to remove broadcast $broadcastId" +
        s" with removeFromMaster = $removeFromMaster - ${e.getMessage}", e)
    )(ThreadUtils.sameThread)
    if (blocking) {
      // the underlying Futures will timeout anyway, so it's safe to use infinite timeout here
      RpcUtils.INFINITE_TIMEOUT.awaitResult(future)
    }
  }

  /**
   * Return the memory status for each block manager, in the form of a map from
   * the block manager's id to two long values. The first value is the maximum
   * amount of memory allocated for the block manager, while the second is the
   * amount of remaining memory.
   *
   * Returns an empty map after the master has been stopped (driverEndpoint is null).
   */
  def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
    if (driverEndpoint == null) {
      Map.empty
    } else {
      driverEndpoint.askSync[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
    }
  }

  /**
   * Return the storage status of every registered block manager.
   * Returns an empty array after the master has been stopped (driverEndpoint is null).
   */
  def getStorageStatus: Array[StorageStatus] = {
    if (driverEndpoint == null) {
      Array.empty
    } else {
      driverEndpoint.askSync[Array[StorageStatus]](GetStorageStatus)
    }
  }

  /**
   * Return the block's status on all block managers, if any. NOTE: This is a
   * potentially expensive operation and should only be used for testing.
   *
   * If askStorageEndpoints is true, this invokes the master to query each block manager for the
   * most updated block statuses. This is useful when the master is not informed of the given block
   * by all block managers.
   */
  def getBlockStatus(
      blockId: BlockId,
      askStorageEndpoints: Boolean = true): Map[BlockManagerId, BlockStatus] = {
    val msg = GetBlockStatus(blockId, askStorageEndpoints)
    /*
     * To avoid potential deadlocks, the use of Futures is necessary, because the master endpoint
     * should not block on waiting for a block manager, which can in turn be waiting for the
     * master endpoint for a response to a prior message.
     */
    val response = driverEndpoint.
      askSync[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
    val (blockManagerIds, futures) = response.unzip
    implicit val sameThread = ThreadUtils.sameThread
    // Explicit CanBuildFrom so Future.sequence resolves under Scala 2.12's collection library.
    val cbf =
      implicitly[
        CanBuildFrom[Iterable[Future[Option[BlockStatus]]],
          Option[BlockStatus],
          Iterable[Option[BlockStatus]]]]
    val blockStatus = timeout.awaitResult(
      Future.sequence(futures)(cbf, ThreadUtils.sameThread))
    if (blockStatus == null) {
      throw SparkCoreErrors.blockStatusQueryReturnedNullError(blockId)
    }
    // Keep only the managers that actually reported a status for this block.
    blockManagerIds.zip(blockStatus).flatMap { case (blockManagerId, status) =>
      status.map { s => (blockManagerId, s) }
    }.toMap
  }

  /**
   * Return a list of ids of existing blocks such that the ids match the given filter. NOTE: This
   * is a potentially expensive operation and should only be used for testing.
   *
   * If askStorageEndpoints is true, this invokes the master to query each block manager for the
   * most updated block statuses. This is useful when the master is not informed of the given block
   * by all block managers.
   */
  def getMatchingBlockIds(
      filter: BlockId => Boolean,
      askStorageEndpoints: Boolean): Seq[BlockId] = {
    val msg = GetMatchingBlockIds(filter, askStorageEndpoints)
    val future = driverEndpoint.askSync[Future[Seq[BlockId]]](msg)
    timeout.awaitResult(future)
  }

  /** Stop the driver endpoint, called only on the Spark driver node. */
  def stop(): Unit = {
    if (driverEndpoint != null && isDriver) {
      tell(StopBlockManagerMaster)
      driverEndpoint = null
      if (driverHeartbeatEndPoint.askSync[Boolean](StopBlockManagerMaster)) {
        driverHeartbeatEndPoint = null
      } else {
        logWarning("Failed to stop BlockManagerMasterHeartbeatEndpoint")
      }
      logInfo("BlockManagerMaster stopped")
    }
  }

  /** Send a one-way message to the master endpoint, to which we expect it to reply with true. */
  private def tell(message: Any): Unit = {
    if (!driverEndpoint.askSync[Boolean](message)) {
      throw SparkCoreErrors.unexpectedBlockManagerMasterEndpointResultError()
    }
  }
}
/** Constants naming the RPC endpoints that the BlockManagerMaster registers on the driver. */
private[spark] object BlockManagerMaster {
  // Name under which the BlockManagerMasterEndpoint is registered with the driver's RpcEnv.
  val DRIVER_ENDPOINT_NAME: String = "BlockManagerMaster"
  // Name under which the heartbeat endpoint is registered with the driver's RpcEnv.
  val DRIVER_HEARTBEAT_ENDPOINT_NAME: String = "BlockManagerMasterHeartbeat"
}
// NOTE(review): the text that followed here was web-scraper residue from the hosting page
// (related-article links, tags, and site navigation) and is not part of the source file.