Spark BlockStoreClient Source Code
File path: /common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.network.shuffle;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import com.codahale.metrics.MetricSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.shuffle.checksum.Cause;
import org.apache.spark.network.shuffle.protocol.*;
import org.apache.spark.network.util.TransportConf;
/**
* Provides an interface for reading both shuffle files and RDD blocks, either from an Executor
* or external service.
*/
public abstract class BlockStoreClient implements Closeable {
  protected final Logger logger = LoggerFactory.getLogger(this.getClass());

  protected volatile TransportClientFactory clientFactory;
  protected String appId;
  // Store the application attemptId
  private String appAttemptId;
  protected TransportConf transportConf;
  /**
   * Send the diagnosis request for the corrupted shuffle block to the server.
   *
   * @param host the host of the remote node.
   * @param port the port of the remote node.
   * @param execId the executor id.
   * @param shuffleId the shuffleId of the corrupted shuffle block
   * @param mapId the mapId of the corrupted shuffle block
   * @param reduceId the reduceId of the corrupted shuffle block
   * @param checksum the shuffle checksum calculated at the client side for the corrupted
   *                 shuffle block
   * @param algorithm the checksum algorithm used to calculate the checksum
   * @return The cause of the shuffle block corruption
   */
  public Cause diagnoseCorruption(
      String host,
      int port,
      String execId,
      int shuffleId,
      long mapId,
      int reduceId,
      long checksum,
      String algorithm) {
    try {
      TransportClient client = clientFactory.createClient(host, port);
      ByteBuffer response = client.sendRpcSync(
        new DiagnoseCorruption(appId, execId, shuffleId, mapId, reduceId, checksum, algorithm)
          .toByteBuffer(),
        transportConf.connectionTimeoutMs()
      );
      CorruptionCause cause =
        (CorruptionCause) BlockTransferMessage.Decoder.fromByteBuffer(response);
      return cause.cause;
    } catch (Exception e) {
      // Log the exception itself so the failure stays diagnosable, then fall back.
      logger.warn("Failed to get the corruption cause.", e);
      return Cause.UNKNOWN_ISSUE;
    }
  }
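  // Illustrative usage sketch (not part of the original Spark source; the variable names and
  // the port are hypothetical): a shuffle reader that detects a checksum mismatch can ask the
  // server which side is corrupted.
  //
  //   Cause cause = client.diagnoseCorruption(
  //     "shuffle-host", 7337, "exec-1", shuffleId, mapId, reduceId, clientChecksum, "ADLER32");
  //   if (cause == Cause.CHECKSUM_VERIFY_PASS) {
  //     // The server-side checksum matches, so the corruption likely happened in transit
  //     // or on the client; retrying the fetch may succeed.
  //   }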
  /**
   * Fetch a sequence of blocks from a remote node asynchronously.
   *
   * Note that this API takes a sequence so the implementation can batch requests, and does not
   * return a future so the underlying implementation can invoke onBlockFetchSuccess as soon as
   * the data of a block is fetched, rather than waiting for all blocks to be fetched.
   *
   * @param host the host of the remote node.
   * @param port the port of the remote node.
   * @param execId the executor id.
   * @param blockIds block ids to fetch.
   * @param listener the listener to receive block fetching status.
   * @param downloadFileManager DownloadFileManager to create and clean temp files.
   *                            If it's not <code>null</code>, the remote blocks will be streamed
   *                            into temp shuffle files to reduce the memory usage, otherwise,
   *                            they will be kept in memory.
   */
  public abstract void fetchBlocks(
      String host,
      int port,
      String execId,
      String[] blockIds,
      BlockFetchingListener listener,
      DownloadFileManager downloadFileManager);
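  // Illustrative usage sketch (not part of the original Spark source; host, port and block
  // ids are hypothetical). The listener fires once per block, so early blocks can be consumed
  // while later ones are still in flight:
  //
  //   client.fetchBlocks("shuffle-host", 7337, "exec-1",
  //     new String[] {"shuffle_0_0_0", "shuffle_0_1_0"},
  //     new BlockFetchingListener() {
  //       @Override
  //       public void onBlockFetchSuccess(String blockId, ManagedBuffer data) { /* consume */ }
  //       @Override
  //       public void onBlockFetchFailure(String blockId, Throwable exception) { /* retry */ }
  //     },
  //     null);  // null: keep fetched blocks in memory instead of spilling to temp files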
  /**
   * Get the shuffle MetricSet from the BlockStoreClient; this is used by the MetricsSystem to
   * collect the shuffle-related metrics.
   */
  public MetricSet shuffleMetrics() {
    // Return an empty MetricSet by default.
    return () -> Collections.emptyMap();
  }
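  // Illustrative sketch (not part of the original Spark source): a subclass can surface real
  // metrics by returning a MetricSet over its own counters, e.g.
  //
  //   @Override
  //   public MetricSet shuffleMetrics() {
  //     Map<String, Metric> metrics = new HashMap<>();
  //     metrics.put("blockTransferRate", transferRateMeter);  // hypothetical counter
  //     return () -> metrics;
  //   }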
  protected void checkInit() {
    assert appId != null : "Called before init()";
  }

  // Set the application attemptId
  public void setAppAttemptId(String appAttemptId) {
    this.appAttemptId = appAttemptId;
  }

  // Get the application attemptId
  public String getAppAttemptId() {
    return this.appAttemptId;
  }
  /**
   * Request the local disk directories for executors which are located at the same host as
   * the current BlockStoreClient (it can be ExternalBlockStoreClient or
   * NettyBlockTransferService).
   *
   * @param host the host of BlockManager or ExternalShuffleService. It should be the same host
   *             as the current BlockStoreClient.
   * @param port the port of BlockManager or ExternalShuffleService.
   * @param execIds a collection of executor Ids, which specifies the target executors that we
   *                want to get their local directories. There could be multiple executor Ids if
   *                BlockStoreClient is implemented by ExternalBlockStoreClient since the request
   *                handler, ExternalShuffleService, can serve multiple executors on the same
   *                node. Or, only one executor Id if BlockStoreClient is implemented by
   *                NettyBlockTransferService.
   * @param hostLocalDirsCompletable a CompletableFuture which contains a map from executor Id
   *                                 to its local directories if the request handler replies
   *                                 successfully. Otherwise, it contains a specific error.
   */
  public void getHostLocalDirs(
      String host,
      int port,
      String[] execIds,
      CompletableFuture<Map<String, String[]>> hostLocalDirsCompletable) {
    checkInit();
    GetLocalDirsForExecutors getLocalDirsMessage = new GetLocalDirsForExecutors(appId, execIds);
    try {
      TransportClient client = clientFactory.createClient(host, port);
      client.sendRpc(getLocalDirsMessage.toByteBuffer(), new RpcResponseCallback() {
        @Override
        public void onSuccess(ByteBuffer response) {
          try {
            BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response);
            hostLocalDirsCompletable.complete(
              ((LocalDirsForExecutors) msgObj).getLocalDirsByExec());
          } catch (Throwable t) {
            logger.warn("Error while trying to get the host local dirs for " +
              Arrays.toString(getLocalDirsMessage.execIds), t.getCause());
            hostLocalDirsCompletable.completeExceptionally(t);
          }
        }

        @Override
        public void onFailure(Throwable t) {
          logger.warn("Error while trying to get the host local dirs for " +
            Arrays.toString(getLocalDirsMessage.execIds), t.getCause());
          hostLocalDirsCompletable.completeExceptionally(t);
        }
      });
    } catch (IOException | InterruptedException e) {
      hostLocalDirsCompletable.completeExceptionally(e);
    }
  }
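  // Illustrative usage sketch (not part of the original Spark source; host, port and executor
  // id are hypothetical). Callers chain on the future rather than blocking:
  //
  //   CompletableFuture<Map<String, String[]>> dirsFuture = new CompletableFuture<>();
  //   client.getHostLocalDirs("localhost", 7337, new String[] {"exec-1"}, dirsFuture);
  //   dirsFuture.whenComplete((dirsByExec, error) -> {
  //     if (error == null) {
  //       String[] localDirs = dirsByExec.get("exec-1");
  //       // Read the blocks directly from these directories, skipping the network.
  //     }
  //   });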
  /**
   * Push a sequence of shuffle blocks in a best-effort manner to a remote node asynchronously.
   * These shuffle blocks, along with blocks pushed by other clients, will be merged into
   * per-shuffle partition merged shuffle files on the destination node.
   *
   * @param host the host of the remote node.
   * @param port the port of the remote node.
   * @param blockIds block ids to be pushed
   * @param buffers buffers to be pushed
   * @param listener the listener to receive block push status.
   *
   * @since 3.1.0
   */
  public void pushBlocks(
      String host,
      int port,
      String[] blockIds,
      ManagedBuffer[] buffers,
      BlockPushingListener listener) {
    throw new UnsupportedOperationException();
  }
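  // Note (not part of the original Spark source): push-based shuffle is optional, so the base
  // class rejects pushBlocks by default; ExternalBlockStoreClient overrides it. Each entry in
  // blockIds must line up with the ManagedBuffer at the same index in buffers.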
  /**
   * Invoked by the Spark driver to notify external shuffle services to finalize the shuffle
   * merge for a given shuffle. This allows the driver to start the shuffle reducer stage after
   * properly finishing the shuffle merge process associated with the shuffle mapper stage.
   *
   * @param host host of the shuffle server.
   * @param port port of the shuffle server.
   * @param shuffleId shuffle ID of the shuffle to be finalized
   * @param shuffleMergeId shuffleMergeId is used to uniquely identify the merging process
   *                       of shuffle by an indeterminate stage attempt.
   * @param listener the listener to receive MergeStatuses
   *
   * @since 3.1.0
   */
  public void finalizeShuffleMerge(
      String host,
      int port,
      int shuffleId,
      int shuffleMergeId,
      MergeFinalizerListener listener) {
    throw new UnsupportedOperationException();
  }
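  // Note (not part of the original Spark source): like pushBlocks, this is only meaningful
  // for push-based shuffle and is overridden by ExternalBlockStoreClient. The driver invokes
  // it once per shuffle map stage, and the listener receives the resulting MergeStatuses.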
  /**
   * Get the meta information of a merged block from the remote shuffle service.
   *
   * @param host the host of the remote node.
   * @param port the port of the remote node.
   * @param shuffleId shuffle id.
   * @param shuffleMergeId shuffleMergeId is used to uniquely identify the merging process
   *                       of shuffle by an indeterminate stage attempt.
   * @param reduceId reduce id.
   * @param listener the listener to receive chunk counts.
   *
   * @since 3.2.0
   */
  public void getMergedBlockMeta(
      String host,
      int port,
      int shuffleId,
      int shuffleMergeId,
      int reduceId,
      MergedBlocksMetaListener listener) {
    throw new UnsupportedOperationException();
  }
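  // Illustrative usage sketch (not part of the original Spark source; host, port and ids are
  // hypothetical). A reader fetches the merged-block meta first to learn how many chunks to
  // request:
  //
  //   client.getMergedBlockMeta("merger-host", 7337, shuffleId, shuffleMergeId, reduceId,
  //     new MergedBlocksMetaListener() {
  //       @Override
  //       public void onSuccess(int shuffleId, int shuffleMergeId, int reduceId,
  //           MergedBlockMeta meta) {
  //         // meta.getNumChunks() chunks can now be fetched via fetchBlocks.
  //       }
  //       @Override
  //       public void onFailure(int shuffleId, int shuffleMergeId, int reduceId, Throwable t) {
  //         // Fall back to fetching the original, unmerged blocks.
  //       }
  //     });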
}