spark ShuffleManager source code
File path: /core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.shuffle
import org.apache.spark.{ShuffleDependency, TaskContext}
/**
* Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on the driver
* and on each executor, based on the spark.shuffle.manager setting. The driver registers shuffles
* with it, and executors (or tasks running locally in the driver) can ask to read and write data.
*
* NOTE:
* 1. This will be instantiated by SparkEnv so its constructor can take a SparkConf and
* boolean isDriver as parameters.
* 2. This contains a method ShuffleBlockResolver which interacts with External Shuffle Service
* when it is enabled. Need to pay attention to that, if implementing a custom ShuffleManager, to
* make sure the custom ShuffleManager could co-exist with External Shuffle Service.
*/
private[spark] trait ShuffleManager {
/**
* Register a shuffle with the manager and obtain a handle for it to pass to tasks.
*/
def registerShuffle[K, V, C](
shuffleId: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle
/** Get a writer for a given partition. Called on executors by map tasks. */
def getWriter[K, V](
handle: ShuffleHandle,
mapId: Long,
context: TaskContext,
metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V]
/**
* Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to
* read from all map outputs of the shuffle.
*
* Called on executors by reduce tasks.
*/
final def getReader[K, C](
handle: ShuffleHandle,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
getReader(handle, 0, Int.MaxValue, startPartition, endPartition, context, metrics)
}
/**
* Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to
* read from a range of map outputs(startMapIndex to endMapIndex-1, inclusive).
* If endMapIndex=Int.MaxValue, the actual endMapIndex will be changed to the length of total map
* outputs of the shuffle in `getMapSizesByExecutorId`.
*
* Called on executors by reduce tasks.
*/
def getReader[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]
/**
* Remove a shuffle's metadata from the ShuffleManager.
* @return true if the metadata removed successfully, otherwise false.
*/
def unregisterShuffle(shuffleId: Int): Boolean
/**
* Return a resolver capable of retrieving shuffle block data based on block coordinates.
*/
def shuffleBlockResolver: ShuffleBlockResolver
/** Shut down this ShuffleManager. */
def stop(): Unit
}
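
The scaladoc above says that SparkEnv instantiates the class named by the spark.shuffle.manager setting, and that the constructor can take a SparkConf and a boolean isDriver. As a rough illustration of the shape a custom implementation has to take, here is a minimal, hypothetical skeleton (the class name MyShuffleManager and every method body are placeholders, not part of Spark; the package is chosen only so the private[spark] trait is accessible):

package org.apache.spark.shuffle  // placed under org.apache.spark so the private[spark] trait is visible

import org.apache.spark.{ShuffleDependency, SparkConf, TaskContext}

// Hypothetical skeleton of a custom ShuffleManager. SparkEnv creates it reflectively,
// passing the SparkConf and the isDriver flag (see NOTE 1 in the scaladoc above).
class MyShuffleManager(conf: SparkConf, isDriver: Boolean) extends ShuffleManager {

  override def registerShuffle[K, V, C](
      shuffleId: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle =
    throw new UnsupportedOperationException("sketch only: return a handle describing this shuffle")

  override def getWriter[K, V](
      handle: ShuffleHandle,
      mapId: Long,
      context: TaskContext,
      metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] =
    throw new UnsupportedOperationException("sketch only: return a writer for one map task")

  // Only the seven-argument getReader is overridden; the two-argument overload is final
  // in the trait and delegates here with the full map-output range.
  override def getReader[K, C](
      handle: ShuffleHandle,
      startMapIndex: Int,
      endMapIndex: Int,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext,
      metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] =
    throw new UnsupportedOperationException("sketch only: return a reader over the requested range")

  override def unregisterShuffle(shuffleId: Int): Boolean = true  // nothing to clean up in this sketch

  override def shuffleBlockResolver: ShuffleBlockResolver =
    throw new UnsupportedOperationException("sketch only: expose shuffle block data by coordinates")

  override def stop(): Unit = ()  // release any resources held by the manager
}

Such a class would then be selected by setting spark.shuffle.manager to its fully qualified name, e.g. spark.conf value "org.apache.spark.shuffle.MyShuffleManager"; Spark treats any value other than the built-in short names as a class name to instantiate.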