spark ExternalBlockHandler 源码

  • 2022-10-20
  • 浏览 (311)

spark ExternalBlockHandler 代码

文件路径:/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.network.shuffle;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricSet;
import com.codahale.metrics.RatioGauge;
import com.codahale.metrics.Timer;
import com.codahale.metrics.Counter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.MergedBlockMetaResponseCallback;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.StreamCallbackWithID;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.protocol.MergedBlockMetaRequest;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.shuffle.checksum.Cause;
import org.apache.spark.network.shuffle.protocol.*;
import org.apache.spark.network.util.TimerWithCustomTimeUnit;
import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
import org.apache.spark.network.util.TransportConf;

/**
 * RPC Handler for a server which can serve both RDD blocks and shuffle blocks from outside
 * of an Executor process.
 *
 * Handles registering executors and opening shuffle or disk persisted RDD blocks from them.
 * Blocks are registered with the "one-for-one" strategy, meaning each Transport-layer Chunk
 * is equivalent to one block.
 */
public class ExternalBlockHandler extends RpcHandler
    implements RpcHandler.MergedBlockMetaReqHandler {
  private static final Logger logger = LoggerFactory.getLogger(ExternalBlockHandler.class);
  private static final String SHUFFLE_MERGER_IDENTIFIER = "shuffle-push-merger";
  private static final String SHUFFLE_BLOCK_ID = "shuffle";
  private static final String SHUFFLE_CHUNK_ID = "shuffleChunk";

  @VisibleForTesting
  final ExternalShuffleBlockResolver blockManager;
  private final OneForOneStreamManager streamManager;
  private final ShuffleMetrics metrics;
  private final MergedShuffleFileManager mergeManager;

  public ExternalBlockHandler(TransportConf conf, File registeredExecutorFile)
    throws IOException {
    this(new OneForOneStreamManager(),
      new ExternalShuffleBlockResolver(conf, registeredExecutorFile),
      new NoOpMergedShuffleFileManager(conf, null));
  }

  public ExternalBlockHandler(
      TransportConf conf,
      File registeredExecutorFile,
      MergedShuffleFileManager mergeManager) throws IOException {
    this(new OneForOneStreamManager(),
      new ExternalShuffleBlockResolver(conf, registeredExecutorFile), mergeManager);
  }

  @VisibleForTesting
  public ExternalShuffleBlockResolver getBlockResolver() {
    return blockManager;
  }

  /** Enables mocking out the StreamManager and BlockManager. */
  @VisibleForTesting
  public ExternalBlockHandler(
      OneForOneStreamManager streamManager,
      ExternalShuffleBlockResolver blockManager) {
    this(streamManager, blockManager, new NoOpMergedShuffleFileManager(null, null));
  }

  /** Enables mocking out the StreamManager, BlockManager, and MergeManager. */
  @VisibleForTesting
  public ExternalBlockHandler(
      OneForOneStreamManager streamManager,
      ExternalShuffleBlockResolver blockManager,
      MergedShuffleFileManager mergeManager) {
    this.metrics = new ShuffleMetrics();
    this.streamManager = streamManager;
    this.blockManager = blockManager;
    this.mergeManager = mergeManager;
  }

  @Override
  public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
    BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message);
    handleMessage(msgObj, client, callback);
  }

  @Override
  public StreamCallbackWithID receiveStream(
      TransportClient client,
      ByteBuffer messageHeader,
      RpcResponseCallback callback) {
    BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteBuffer(messageHeader);
    if (msgObj instanceof PushBlockStream) {
      PushBlockStream message = (PushBlockStream) msgObj;
      checkAuth(client, message.appId);
      return mergeManager.receiveBlockDataAsStream(message);
    } else {
      throw new UnsupportedOperationException("Unexpected message with #receiveStream: " + msgObj);
    }
  }

  protected void handleMessage(
      BlockTransferMessage msgObj,
      TransportClient client,
      RpcResponseCallback callback) {
    if (msgObj instanceof AbstractFetchShuffleBlocks || msgObj instanceof OpenBlocks) {
      final Timer.Context responseDelayContext = metrics.openBlockRequestLatencyMillis.time();
      try {
        int numBlockIds;
        long streamId;
        if (msgObj instanceof AbstractFetchShuffleBlocks) {
          AbstractFetchShuffleBlocks msg = (AbstractFetchShuffleBlocks) msgObj;
          checkAuth(client, msg.appId);
          numBlockIds = ((AbstractFetchShuffleBlocks) msgObj).getNumBlocks();
          Iterator<ManagedBuffer> iterator;
          if (msgObj instanceof  FetchShuffleBlocks) {
            iterator = new ShuffleManagedBufferIterator((FetchShuffleBlocks)msgObj);
          } else {
            iterator = new ShuffleChunkManagedBufferIterator((FetchShuffleBlockChunks) msgObj);
          }
          streamId = streamManager.registerStream(client.getClientId(), iterator,
            client.getChannel(), true);
        } else {
          // For the compatibility with the old version, still keep the support for OpenBlocks.
          OpenBlocks msg = (OpenBlocks) msgObj;
          numBlockIds = msg.blockIds.length;
          checkAuth(client, msg.appId);
          streamId = streamManager.registerStream(client.getClientId(),
            new ManagedBufferIterator(msg), client.getChannel(), true);
        }
        if (logger.isTraceEnabled()) {
          logger.trace(
            "Registered streamId {} with {} buffers for client {} from host {}",
            streamId,
            numBlockIds,
            client.getClientId(),
            getRemoteAddress(client.getChannel()));
        }
        callback.onSuccess(new StreamHandle(streamId, numBlockIds).toByteBuffer());
      } finally {
        responseDelayContext.stop();
      }

    } else if (msgObj instanceof RegisterExecutor) {
      final Timer.Context responseDelayContext =
        metrics.registerExecutorRequestLatencyMillis.time();
      try {
        RegisterExecutor msg = (RegisterExecutor) msgObj;
        checkAuth(client, msg.appId);
        blockManager.registerExecutor(msg.appId, msg.execId, msg.executorInfo);
        mergeManager.registerExecutor(msg.appId, msg.executorInfo);
        callback.onSuccess(ByteBuffer.wrap(new byte[0]));
      } finally {
        responseDelayContext.stop();
      }

    } else if (msgObj instanceof RemoveBlocks) {
      RemoveBlocks msg = (RemoveBlocks) msgObj;
      checkAuth(client, msg.appId);
      int numRemovedBlocks = blockManager.removeBlocks(msg.appId, msg.execId, msg.blockIds);
      callback.onSuccess(new BlocksRemoved(numRemovedBlocks).toByteBuffer());

    } else if (msgObj instanceof GetLocalDirsForExecutors) {
      GetLocalDirsForExecutors msg = (GetLocalDirsForExecutors) msgObj;
      checkAuth(client, msg.appId);
      Set<String> execIdsForBlockResolver = Sets.newHashSet(msg.execIds);
      boolean fetchMergedBlockDirs = execIdsForBlockResolver.remove(SHUFFLE_MERGER_IDENTIFIER);
      Map<String, String[]> localDirs = blockManager.getLocalDirs(msg.appId,
        execIdsForBlockResolver);
      if (fetchMergedBlockDirs) {
        localDirs.put(SHUFFLE_MERGER_IDENTIFIER, mergeManager.getMergedBlockDirs(msg.appId));
      }
      callback.onSuccess(new LocalDirsForExecutors(localDirs).toByteBuffer());
    } else if (msgObj instanceof FinalizeShuffleMerge) {
      final Timer.Context responseDelayContext =
          metrics.finalizeShuffleMergeLatencyMillis.time();
      FinalizeShuffleMerge msg = (FinalizeShuffleMerge) msgObj;
      try {
        checkAuth(client, msg.appId);
        MergeStatuses statuses = mergeManager.finalizeShuffleMerge(msg);
        callback.onSuccess(statuses.toByteBuffer());
      } catch(IOException e) {
        throw new RuntimeException(String.format("Error while finalizing shuffle merge "
          + "for application %s shuffle %d with shuffleMergeId %d", msg.appId, msg.shuffleId,
            msg.shuffleMergeId), e);
      } finally {
        responseDelayContext.stop();
      }
    } else if (msgObj instanceof DiagnoseCorruption) {
      DiagnoseCorruption msg = (DiagnoseCorruption) msgObj;
      checkAuth(client, msg.appId);
      Cause cause = blockManager.diagnoseShuffleBlockCorruption(
        msg.appId, msg.execId, msg.shuffleId, msg.mapId, msg.reduceId, msg.checksum, msg.algorithm);
      // In any cases of the error, diagnoseShuffleBlockCorruption should return UNKNOWN_ISSUE,
      // so it should always reply as success.
      callback.onSuccess(new CorruptionCause(cause).toByteBuffer());
    } else {
      throw new UnsupportedOperationException("Unexpected message: " + msgObj);
    }
  }

  @Override
  public void receiveMergeBlockMetaReq(
      TransportClient client,
      MergedBlockMetaRequest metaRequest,
      MergedBlockMetaResponseCallback callback) {
    final Timer.Context responseDelayContext = metrics.fetchMergedBlocksMetaLatencyMillis.time();
    try {
      checkAuth(client, metaRequest.appId);
      MergedBlockMeta mergedMeta =
        mergeManager.getMergedBlockMeta(metaRequest.appId, metaRequest.shuffleId,
          metaRequest.shuffleMergeId, metaRequest.reduceId);
      logger.debug(
        "Merged block chunks appId {} shuffleId {} reduceId {} num-chunks : {} ",
          metaRequest.appId, metaRequest.shuffleId, metaRequest.reduceId,
          mergedMeta.getNumChunks());
      callback.onSuccess(mergedMeta.getNumChunks(), mergedMeta.getChunksBitmapBuffer());
    } finally {
      responseDelayContext.stop();
    }
  }

  @Override
  public MergedBlockMetaReqHandler getMergedBlockMetaReqHandler() {
    return this;
  }

  @Override
  public void exceptionCaught(Throwable cause, TransportClient client) {
    metrics.caughtExceptions.inc();
  }

  public MetricSet getAllMetrics() {
    return metrics;
  }

  @Override
  public StreamManager getStreamManager() {
    return streamManager;
  }

  /**
   * Removes an application (once it has been terminated), and optionally will clean up any
   * local directories associated with the executors of that application in a separate thread.
   */
  public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
    blockManager.applicationRemoved(appId, cleanupLocalDirs);
    mergeManager.applicationRemoved(appId, cleanupLocalDirs);
  }

  /**
   * Clean up any non-shuffle files in any local directories associated with an finished executor.
   */
  public void executorRemoved(String executorId, String appId) {
    blockManager.executorRemoved(executorId, appId);
  }

  public void close() {
    blockManager.close();
    mergeManager.close();
  }

  private void checkAuth(TransportClient client, String appId) {
    if (client.getClientId() != null && !client.getClientId().equals(appId)) {
      throw new SecurityException(String.format(
        "Client for %s not authorized for application %s.", client.getClientId(), appId));
    }
  }

  /**
   * A simple class to wrap all shuffle service wrapper metrics
   */
  @VisibleForTesting
  public class ShuffleMetrics implements MetricSet {
    private final Map<String, Metric> allMetrics;
    // Time latency for open block request in ms
    private final Timer openBlockRequestLatencyMillis =
        new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS);
    // Time latency for executor registration latency in ms
    private final Timer registerExecutorRequestLatencyMillis =
        new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS);
    // Time latency for processing fetch merged blocks meta request latency in ms
    private final Timer fetchMergedBlocksMetaLatencyMillis =
        new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS);
    // Time latency for processing finalize shuffle merge request latency in ms
    private final Timer finalizeShuffleMergeLatencyMillis =
        new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS);
    // Block transfer rate in blocks per second
    private final Meter blockTransferRate = new Meter();
    // Block fetch message rate per second. When using non-batch fetches
    // (`OpenBlocks` or `FetchShuffleBlocks` with `batchFetchEnabled` as false), this will be the
    // same as the `blockTransferRate`. When batch fetches are enabled, this will represent the
    // number of batch fetches, and `blockTransferRate` will represent the number of blocks
    // returned by the fetches.
    private final Meter blockTransferMessageRate = new Meter();
    // Block transfer rate in byte per second
    private final Meter blockTransferRateBytes = new Meter();
    // Number of active connections to the shuffle service
    private Counter activeConnections = new Counter();
    // Number of exceptions caught in connections to the shuffle service
    private Counter caughtExceptions = new Counter();

    public ShuffleMetrics() {
      allMetrics = new HashMap<>();
      allMetrics.put("openBlockRequestLatencyMillis", openBlockRequestLatencyMillis);
      allMetrics.put("registerExecutorRequestLatencyMillis", registerExecutorRequestLatencyMillis);
      allMetrics.put("fetchMergedBlocksMetaLatencyMillis", fetchMergedBlocksMetaLatencyMillis);
      allMetrics.put("finalizeShuffleMergeLatencyMillis", finalizeShuffleMergeLatencyMillis);
      allMetrics.put("blockTransferRate", blockTransferRate);
      allMetrics.put("blockTransferMessageRate", blockTransferMessageRate);
      allMetrics.put("blockTransferRateBytes", blockTransferRateBytes);
      allMetrics.put("blockTransferAvgSize_1min", new RatioGauge() {
        @Override
        protected Ratio getRatio() {
          return Ratio.of(
              blockTransferRateBytes.getOneMinuteRate(),
              // use blockTransferMessageRate here instead of blockTransferRate to represent the
              // average size of the disk read / network message which has more operational impact
              // than the actual size of the block
              blockTransferMessageRate.getOneMinuteRate());
        }
      });
      allMetrics.put("registeredExecutorsSize",
                     (Gauge<Integer>) () -> blockManager.getRegisteredExecutorsSize());
      allMetrics.put("numActiveConnections", activeConnections);
      allMetrics.put("numCaughtExceptions", caughtExceptions);
    }

    @Override
    public Map<String, Metric> getMetrics() {
      return allMetrics;
    }
  }

  private class ManagedBufferIterator implements Iterator<ManagedBuffer> {

    private int index = 0;
    private final Function<Integer, ManagedBuffer> blockDataForIndexFn;
    private final int size;

    ManagedBufferIterator(OpenBlocks msg) {
      String appId = msg.appId;
      String execId = msg.execId;
      String[] blockIds = msg.blockIds;
      String[] blockId0Parts = blockIds[0].split("_");
      if (blockId0Parts.length == 4 && blockId0Parts[0].equals(SHUFFLE_BLOCK_ID)) {
        final int shuffleId = Integer.parseInt(blockId0Parts[1]);
        final int[] mapIdAndReduceIds = shuffleMapIdAndReduceIds(blockIds, shuffleId);
        size = mapIdAndReduceIds.length;
        blockDataForIndexFn = index -> blockManager.getBlockData(appId, execId, shuffleId,
          mapIdAndReduceIds[index], mapIdAndReduceIds[index + 1]);
      } else if (blockId0Parts.length == 5 && blockId0Parts[0].equals(SHUFFLE_CHUNK_ID)) {
        final int shuffleId = Integer.parseInt(blockId0Parts[1]);
        final int shuffleMergeId = Integer.parseInt(blockId0Parts[2]);
        final int[] reduceIdAndChunkIds = shuffleReduceIdAndChunkIds(blockIds, shuffleId,
          shuffleMergeId);
        size = reduceIdAndChunkIds.length;
        blockDataForIndexFn = index -> mergeManager.getMergedBlockData(msg.appId, shuffleId,
          shuffleMergeId, reduceIdAndChunkIds[index], reduceIdAndChunkIds[index + 1]);
      } else if (blockId0Parts.length == 3 && blockId0Parts[0].equals("rdd")) {
        final int[] rddAndSplitIds = rddAndSplitIds(blockIds);
        size = rddAndSplitIds.length;
        blockDataForIndexFn = index -> blockManager.getRddBlockData(appId, execId,
          rddAndSplitIds[index], rddAndSplitIds[index + 1]);
      } else {
        throw new IllegalArgumentException("Unexpected block id format: " + blockIds[0]);
      }
    }

    private int[] rddAndSplitIds(String[] blockIds) {
      final int[] rddAndSplitIds = new int[2 * blockIds.length];
      for (int i = 0; i < blockIds.length; i++) {
        String[] blockIdParts = blockIds[i].split("_");
        if (blockIdParts.length != 3 || !blockIdParts[0].equals("rdd")) {
          throw new IllegalArgumentException("Unexpected RDD block id format: " + blockIds[i]);
        }
        rddAndSplitIds[2 * i] = Integer.parseInt(blockIdParts[1]);
        rddAndSplitIds[2 * i + 1] = Integer.parseInt(blockIdParts[2]);
      }
      return rddAndSplitIds;
    }

    /**
     * @param blockIds Regular shuffle blockIds starts with SHUFFLE_BLOCK_ID to be parsed
     * @param shuffleId shuffle blocks shuffleId
     * @return mapId and reduceIds of the shuffle blocks in the same order as that of the blockIds
     *
     * Regular shuffle blocks format should be shuffle_$shuffleId_$mapId_$reduceId
     */
    private int[] shuffleMapIdAndReduceIds(String[] blockIds, int shuffleId) {
      final int[] mapIdAndReduceIds = new int[2 * blockIds.length];
      for (int i = 0; i < blockIds.length; i++) {
        String[] blockIdParts = blockIds[i].split("_");
        if (blockIdParts.length != 4 || !blockIdParts[0].equals(SHUFFLE_BLOCK_ID)) {
          throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[i]);
        }
        if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
          throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
            ", got:" + blockIds[i]);
        }
        // mapId
        mapIdAndReduceIds[2 * i] = Integer.parseInt(blockIdParts[2]);
        // reduceId
        mapIdAndReduceIds[2 * i + 1] = Integer.parseInt(blockIdParts[3]);
      }
      return mapIdAndReduceIds;
    }

    /**
     * @param blockIds Shuffle merged chunks starts with SHUFFLE_CHUNK_ID to be parsed
     * @param shuffleId shuffle blocks shuffleId
     * @param shuffleMergeId shuffleMergeId is used to uniquely identify merging process
     *                       of shuffle by an indeterminate stage attempt.
     * @return reduceId and chunkIds of the shuffle chunks in the same order as that of the
     *         blockIds
     *
     * Shuffle merged chunks format should be
     * shuffleChunk_$shuffleId_$shuffleMergeId_$reduceId_$chunkId
     */
    private int[] shuffleReduceIdAndChunkIds(
        String[] blockIds,
        int shuffleId,
        int shuffleMergeId) {
      final int[] reduceIdAndChunkIds = new int[2 * blockIds.length];
      for(int i = 0; i < blockIds.length; i++) {
        String[] blockIdParts = blockIds[i].split("_");
        if (blockIdParts.length != 5 || !blockIdParts[0].equals(SHUFFLE_CHUNK_ID)) {
          throw new IllegalArgumentException("Unexpected shuffle chunk id format: " + blockIds[i]);
        }
        if (Integer.parseInt(blockIdParts[1]) != shuffleId ||
            Integer.parseInt(blockIdParts[2]) != shuffleMergeId) {
          throw new IllegalArgumentException(String.format("Expected shuffleId = %s"
            + " and shuffleMergeId = %s but got %s", shuffleId, shuffleMergeId, blockIds[i]));
        }
        // reduceId
        reduceIdAndChunkIds[2 * i] = Integer.parseInt(blockIdParts[3]);
        // chunkId
        reduceIdAndChunkIds[2 * i + 1] = Integer.parseInt(blockIdParts[4]);
      }
      return reduceIdAndChunkIds;
    }

    @Override
    public boolean hasNext() {
      return index < size;
    }

    @Override
    public ManagedBuffer next() {
      final ManagedBuffer block = blockDataForIndexFn.apply(index);
      index += 2;
      metrics.blockTransferRate.mark();
      metrics.blockTransferMessageRate.mark();
      metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0);
      return block;
    }
  }

  private class ShuffleManagedBufferIterator implements Iterator<ManagedBuffer> {

    private int mapIdx = 0;
    private int reduceIdx = 0;

    private final String appId;
    private final String execId;
    private final int shuffleId;
    private final long[] mapIds;
    private final int[][] reduceIds;
    private final boolean batchFetchEnabled;

    ShuffleManagedBufferIterator(FetchShuffleBlocks msg) {
      appId = msg.appId;
      execId = msg.execId;
      shuffleId = msg.shuffleId;
      mapIds = msg.mapIds;
      reduceIds = msg.reduceIds;
      batchFetchEnabled = msg.batchFetchEnabled;
      // mapIds.length must equal to reduceIds.length, and the passed in FetchShuffleBlocks
      // must have non-empty mapIds and reduceIds, see the checking logic in
      // OneForOneBlockFetcher.
      assert(mapIds.length != 0 && mapIds.length == reduceIds.length);
    }

    @Override
    public boolean hasNext() {
      return mapIdx < mapIds.length && reduceIdx < reduceIds[mapIdx].length;
    }

    @Override
    public ManagedBuffer next() {
      ManagedBuffer block;
      if (!batchFetchEnabled) {
        block = blockManager.getBlockData(
          appId, execId, shuffleId, mapIds[mapIdx], reduceIds[mapIdx][reduceIdx]);
        if (reduceIdx < reduceIds[mapIdx].length - 1) {
          reduceIdx += 1;
        } else {
          reduceIdx = 0;
          mapIdx += 1;
        }
        metrics.blockTransferRate.mark();
      } else {
        assert(reduceIds[mapIdx].length == 2);
        int startReduceId = reduceIds[mapIdx][0];
        int endReduceId = reduceIds[mapIdx][1];
        block = blockManager.getContinuousBlocksData(appId, execId, shuffleId, mapIds[mapIdx],
          startReduceId, endReduceId);
        mapIdx += 1;
        metrics.blockTransferRate.mark(endReduceId - startReduceId);
      }
      metrics.blockTransferMessageRate.mark();
      metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0);
      return block;
    }
  }

  private class ShuffleChunkManagedBufferIterator implements Iterator<ManagedBuffer> {

    private int reduceIdx = 0;
    private int chunkIdx = 0;

    private final String appId;
    private final int shuffleId;
    private final int shuffleMergeId;
    private final int[] reduceIds;
    private final int[][] chunkIds;

    ShuffleChunkManagedBufferIterator(FetchShuffleBlockChunks msg) {
      appId = msg.appId;
      shuffleId = msg.shuffleId;
      shuffleMergeId = msg.shuffleMergeId;
      reduceIds = msg.reduceIds;
      chunkIds = msg.chunkIds;
      // reduceIds.length must equal to chunkIds.length, and the passed in FetchShuffleBlockChunks
      // must have non-empty reduceIds and chunkIds, see the checking logic in
      // OneForOneBlockFetcher.
      assert(reduceIds.length != 0 && reduceIds.length == chunkIds.length);
    }

    @Override
    public boolean hasNext() {
      return reduceIdx < reduceIds.length && chunkIdx < chunkIds[reduceIdx].length;
    }

    @Override
    public ManagedBuffer next() {
      ManagedBuffer block = Preconditions.checkNotNull(mergeManager.getMergedBlockData(
        appId, shuffleId, shuffleMergeId, reduceIds[reduceIdx], chunkIds[reduceIdx][chunkIdx]));
      if (chunkIdx < chunkIds[reduceIdx].length - 1) {
        chunkIdx += 1;
      } else {
        chunkIdx = 0;
        reduceIdx += 1;
      }
      metrics.blockTransferRateBytes.mark(block.size());
      return block;
    }
  }

  @Override
  public void channelActive(TransportClient client) {
    metrics.activeConnections.inc();
    super.channelActive(client);
  }

  @Override
  public void channelInactive(TransportClient client) {
    metrics.activeConnections.dec();
    super.channelInactive(client);
  }

}

相关信息

spark 源码目录

相关文章

spark BlockFetchingListener 源码

spark BlockPushingListener 源码

spark BlockStoreClient 源码

spark BlockTransferListener 源码

spark Constants 源码

spark DownloadFile 源码

spark DownloadFileManager 源码

spark DownloadFileWritableChannel 源码

spark ErrorHandler 源码

spark ExecutorDiskUtils 源码

0  赞