spark ShuffleChecksumHelper 源码

  • 2022-10-20
  • 浏览 (400)

spark ShuffleChecksumHelper 代码

文件路径:/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.network.shuffle.checksum;

import java.io.*;
import java.util.concurrent.TimeUnit;
import java.util.zip.Adler32;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import java.util.zip.Checksum;

import com.google.common.io.ByteStreams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.annotation.Private;
import org.apache.spark.network.buffer.ManagedBuffer;

/**
 * A set of utility functions for the shuffle checksum.
 */
@Private
public class ShuffleChecksumHelper {
  private static final Logger logger =
    LoggerFactory.getLogger(ShuffleChecksumHelper.class);

  public static final int CHECKSUM_CALCULATION_BUFFER = 8192;
  public static final Checksum[] EMPTY_CHECKSUM = new Checksum[0];
  public static final long[] EMPTY_CHECKSUM_VALUE = new long[0];

  public static Checksum[] createPartitionChecksums(int numPartitions, String algorithm) {
    return getChecksumsByAlgorithm(numPartitions, algorithm);
  }

  private static Checksum[] getChecksumsByAlgorithm(int num, String algorithm) {
    Checksum[] checksums;
    switch (algorithm) {
      case "ADLER32":
        checksums = new Adler32[num];
        for (int i = 0; i < num; i++) {
          checksums[i] = new Adler32();
        }
        return checksums;

      case "CRC32":
        checksums = new CRC32[num];
        for (int i = 0; i < num; i++) {
          checksums[i] = new CRC32();
        }
        return checksums;

      default:
        throw new UnsupportedOperationException(
          "Unsupported shuffle checksum algorithm: " + algorithm);
    }
  }

  public static Checksum getChecksumByAlgorithm(String algorithm) {
    return getChecksumsByAlgorithm(1, algorithm)[0];
  }

  public static String getChecksumFileName(String blockName, String algorithm) {
    // append the shuffle checksum algorithm as the file extension
    return String.format("%s.%s", blockName, algorithm);
  }

  private static long readChecksumByReduceId(File checksumFile, int reduceId) throws IOException {
    try (DataInputStream in = new DataInputStream(new FileInputStream(checksumFile))) {
      ByteStreams.skipFully(in, reduceId * 8L);
      return in.readLong();
    }
  }

  private static long calculateChecksumForPartition(
      ManagedBuffer partitionData,
      Checksum checksumAlgo) throws IOException {
    InputStream in = partitionData.createInputStream();
    byte[] buffer = new byte[CHECKSUM_CALCULATION_BUFFER];
    try(CheckedInputStream checksumIn = new CheckedInputStream(in, checksumAlgo)) {
      while (checksumIn.read(buffer, 0, CHECKSUM_CALCULATION_BUFFER) != -1) {}
      return checksumAlgo.getValue();
    }
  }

  /**
   * Diagnose the possible cause of the shuffle data corruption by verifying the shuffle checksums.
   *
   * There're 3 different kinds of checksums for the same shuffle partition:
   *   - checksum (c1) that is calculated by the shuffle data reader
   *   - checksum (c2) that is calculated by the shuffle data writer and stored in the checksum file
   *   - checksum (c3) that is recalculated during diagnosis
   *
   * And the diagnosis mechanism works like this:
   * If c2 != c3, we suspect the corruption is caused by the DISK_ISSUE. Otherwise, if c1 != c3,
   * we suspect the corruption is caused by the NETWORK_ISSUE. Otherwise, the cause remains
   * CHECKSUM_VERIFY_PASS. In case of the any other failures, the cause remains UNKNOWN_ISSUE.
   *
   * @param algorithm The checksum algorithm that is used for calculating checksum value
   *                  of partitionData
   * @param checksumFile The checksum file that written by the shuffle writer
   * @param reduceId The reduceId of the shuffle block
   * @param partitionData The partition data of the shuffle block
   * @param checksumByReader The checksum value that calculated by the shuffle data reader
   * @return The cause of data corruption
   */
  public static Cause diagnoseCorruption(
      String algorithm,
      File checksumFile,
      int reduceId,
      ManagedBuffer partitionData,
      long checksumByReader) {
    Cause cause;
    try {
      long diagnoseStartNs = System.nanoTime();
      // Try to get the checksum instance before reading the checksum file so that
      // `UnsupportedOperationException` can be thrown first before `FileNotFoundException`
      // when the checksum algorithm isn't supported.
      Checksum checksumAlgo = getChecksumByAlgorithm(algorithm);
      long checksumByWriter = readChecksumByReduceId(checksumFile, reduceId);
      long checksumByReCalculation = calculateChecksumForPartition(partitionData, checksumAlgo);
      long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - diagnoseStartNs);
      logger.info("Shuffle corruption diagnosis took {} ms, checksum file {}",
        duration, checksumFile.getAbsolutePath());
      if (checksumByWriter != checksumByReCalculation) {
        cause = Cause.DISK_ISSUE;
      } else if (checksumByWriter != checksumByReader) {
        cause = Cause.NETWORK_ISSUE;
      } else {
        cause = Cause.CHECKSUM_VERIFY_PASS;
      }
    } catch (UnsupportedOperationException e) {
      cause = Cause.UNSUPPORTED_CHECKSUM_ALGORITHM;
    } catch (FileNotFoundException e) {
      // Even if checksum is enabled, a checksum file may not exist if error throws during writing.
      logger.warn("Checksum file " + checksumFile.getName() + " doesn't exit");
      cause = Cause.UNKNOWN_ISSUE;
    } catch (Exception e) {
      logger.warn("Unable to diagnose shuffle block corruption", e);
      cause = Cause.UNKNOWN_ISSUE;
    }
    return cause;
  }
}

相关信息

spark 源码目录

相关文章

spark Cause 源码

0  赞