hadoop AbstractS3ACommitter source code

  • 2022-10-20

hadoop AbstractS3ACommitter code

File path: /hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
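
Before the listing, a minimal sketch (not from this file, and assuming the standard committer factory wiring) of how a job is usually pointed at an S3A committer; the "magic" choice below is just an example, with "directory" and "partitioned" being the staging alternatives:

import org.apache.hadoop.conf.Configuration;

// Hedged sketch: route s3a:// output through the S3A committer factory,
// then select which committer implementation to use.
Configuration conf = new Configuration();
conf.set("mapreduce.outputcommitter.factory.scheme.s3a",
    "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory");
conf.set("fs.s3a.committer.name", "magic");  // or "directory"/"partitioned"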

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a.commit;

import javax.annotation.Nullable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.text.DateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;

import com.amazonaws.services.s3.model.MultipartUpload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.audit.AuditConstants;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.fs.store.audit.AuditSpan;
import org.apache.hadoop.fs.store.audit.AuditSpanSource;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
import org.apache.hadoop.fs.s3a.commit.files.PersistentCommitData;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
import org.apache.hadoop.fs.s3a.commit.impl.AuditContextUpdater;
import org.apache.hadoop.fs.s3a.commit.impl.CommitContext;
import org.apache.hadoop.fs.s3a.commit.impl.CommitOperations;
import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperationsThroughFileSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.functional.InvocationRaisingIOE;
import org.apache.hadoop.util.functional.TaskPool;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_MAXIMUM_CONNECTIONS;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_MAX_TOTAL_TASKS;
import static org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS;
import static org.apache.hadoop.fs.s3a.Constants.MAX_TOTAL_TASKS;
import static org.apache.hadoop.fs.s3a.Invoker.ignoreIOExceptions;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_COMMIT_JOB;
import static org.apache.hadoop.fs.audit.CommonAuditContext.currentAuditContext;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.E_NO_SPARK_UUID;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID_SOURCE;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.SPARK_WRITE_UUID;
import static org.apache.hadoop.fs.s3a.commit.impl.CommitUtilsWithMR.*;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DiagnosticKeys.STAGE;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.createJobSummaryFilename;
import static org.apache.hadoop.util.functional.RemoteIterators.toList;

/**
 * Abstract base class for S3A committers; allows for any commonality
 * between different architectures.
 *
 * Although the committer APIs allow for a committer to be created without
 * an output path, this is not supported in this class or its subclasses:
 * a destination must be supplied. It is left to the committer factory
 * to handle the creation of a committer when the destination is unknown.
 *
 * Requiring an output directory simplifies coding and testing.
 *
 * The original implementation loaded all .pendingset files
 * before attempting any commit/abort operations.
 * While straightforward, and guaranteeing that no changes were made to the
 * destination until all files had successfully been loaded, it didn't scale:
 * the list grew until it exceeded heap size.
 *
 * The second iteration builds up an {@link ActiveCommit} class with the
 * list of .pendingset files to load and then commit; that can be done
 * incrementally and in parallel.
 * As a side effect of this change, unless/until changed,
 * the commit/abort/revert of all files uploaded by a single task will be
 * serialized. This may slow down these operations if there are many files
 * created by a few tasks, <i>and</i> the HTTP connection pool in the S3A
 * committer was large enough for all the parallel POST requests.
 */
public abstract class AbstractS3ACommitter extends PathOutputCommitter
    implements IOStatisticsSource {

  private static final Logger LOG =
      LoggerFactory.getLogger(AbstractS3ACommitter.class);

  public static final String THREAD_PREFIX = "s3a-committer-pool-";

  /**
   * Error string when task setup fails.
   */
  @VisibleForTesting
  public static final String E_SELF_GENERATED_JOB_UUID
      = "has a self-generated job UUID";

  /**
   * Unique ID for a Job.
   * In MapReduce Jobs the YARN JobID suffices.
   * On Spark this can only be the YARN JobID if Spark
   * is known to be creating strongly unique IDs
   * (i.e. SPARK-33402 is on the branch).
   */
  private final String uuid;

  /**
   * Source of the {@link #uuid} value.
   */
  private final JobUUIDSource uuidSource;

  /**
   * Has this instance been used for job setup?
   * If so then it is safe for a locally generated
   * UUID to be used for task setup.
   */
  private boolean jobSetup;

  /** Underlying commit operations. */
  private final CommitOperations commitOperations;

  /**
   * Final destination of work.
   */
  private Path outputPath;

  /**
   * Role: used in log/text messages.
   */
  private final String role;

  /**
   * This is the directory for all intermediate work: where the output format
   * will write data.
   * <i>This may not be on the final file system</i>
   */
  private Path workPath;

  /** Configuration of the job. */
  private Configuration conf;

  /** Filesystem of {@link #outputPath}. */
  private FileSystem destFS;

  /** The job context. For a task, this can be cast to a TaskContext. */
  private final JobContext jobContext;

  /** Should a job marker be created? */
  private final boolean createJobMarker;

  private final CommitterStatistics committerStatistics;

  /**
   * Source of Audit spans.
   */
  private final AuditSpanSource auditSpanSource;

  /**
   * Create a committer.
   * This constructor binds the destination directory and configuration, but
   * does not update the work path: that must be calculated by the
   * implementation;
   * it is omitted here to avoid subclass methods being called too early.
   * @param outputPath the job's output path: MUST NOT be null.
   * @param context the task's context
   * @throws IOException on a failure
   */
  protected AbstractS3ACommitter(
      Path outputPath,
      TaskAttemptContext context) throws IOException {
    super(outputPath, context);
    setOutputPath(outputPath);
    this.jobContext = requireNonNull(context, "null job context");
    this.role = "Task committer " + context.getTaskAttemptID();
    setConf(context.getConfiguration());
    Pair<String, JobUUIDSource> id = buildJobUUID(
        conf, context.getJobID());
    this.uuid = id.getLeft();
    this.uuidSource = id.getRight();
    LOG.info("Job UUID {} source {}", getUUID(), getUUIDSource().getText());
    initOutput(outputPath);
    LOG.debug("{} instantiated for job \"{}\" ID {} with destination {}",
        role, jobName(context), jobIdString(context), outputPath);
    S3AFileSystem fs = getDestS3AFS();
    // set this thread's context with the job ID.
    // audit spans created in this thread will pick
    // up this value, including the commit operations instance
    // soon to be created.
    new AuditContextUpdater(jobContext)
        .updateCurrentAuditContext();

    // the filesystem is the span source, always.
    this.auditSpanSource = fs.getAuditSpanSource();
    this.createJobMarker = context.getConfiguration().getBoolean(
        CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
        DEFAULT_CREATE_SUCCESSFUL_JOB_DIR_MARKER);
    // the statistics are shared between this committer and its operations.
    this.committerStatistics = fs.newCommitterStatistics();
    this.commitOperations = new CommitOperations(fs, committerStatistics,
        outputPath.toString());
  }
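
  /*
   * A sketch (not part of this class) of how an instance is normally
   * obtained: the committer factory registered for the "s3a" scheme
   * instantiates a concrete subclass, whose constructor chains to this
   * one. Assuming mapreduce.outputcommitter.factory.scheme.s3a points
   * at the S3ACommitterFactory:
   *
   *   PathOutputCommitter committer = PathOutputCommitterFactory
   *       .createCommitter(outputPath, taskAttemptContext);
   */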

  /**
   * Init the output filesystem and path.
   * TESTING ONLY; allows mock FS to cheat.
   * @param out output path
   * @throws IOException failure to create the FS.
   */
  @VisibleForTesting
  protected void initOutput(Path out) throws IOException {
    FileSystem fs = getDestinationFS(out, getConf());
    setDestFS(fs);
    setOutputPath(fs.makeQualified(out));
  }

  /**
   * Get the job/task context this committer was instantiated with.
   * @return the context.
   */
  public final JobContext getJobContext() {
    return jobContext;
  }

  /**
   * Final path of output, in the destination FS.
   * @return the path
   */
  @Override
  public final Path getOutputPath() {
    return outputPath;
  }

  /**
   * Set the output path.
   * @param outputPath new value
   */
  protected final void setOutputPath(Path outputPath) {
    this.outputPath = requireNonNull(outputPath, "Null output path");
  }

  /**
   * This is the critical method for {@code FileOutputFormat}; it declares
   * the path for work.
   * @return the working path.
   */
  @Override
  public final Path getWorkPath() {
    return workPath;
  }

  /**
   * Set the work path for this committer.
   * @param workPath the work path to use.
   */
  protected final void setWorkPath(Path workPath) {
    LOG.debug("Setting work path to {}", workPath);
    this.workPath = workPath;
  }

  public final Configuration getConf() {
    return conf;
  }

  protected final void setConf(Configuration conf) {
    this.conf = conf;
  }

  /**
   * Get the destination FS, creating it on demand if needed.
   * @return the filesystem; requires the output path to be set up
   * @throws IOException if the FS cannot be instantiated.
   */
  public FileSystem getDestFS() throws IOException {
    if (destFS == null) {
      FileSystem fs = getDestinationFS(outputPath, getConf());
      setDestFS(fs);
    }
    return destFS;
  }

  /**
   * Get the destination as an S3A Filesystem; casting it.
   * @return the dest S3A FS.
   * @throws IOException if the FS cannot be instantiated.
   */
  public S3AFileSystem getDestS3AFS() throws IOException {
    return (S3AFileSystem) getDestFS();
  }

  /**
   * Set the destination FS: the FS of the final output.
   * @param destFS destination FS.
   */
  protected void setDestFS(FileSystem destFS) {
    this.destFS = destFS;
  }

  /**
   * Compute the path where the output of a given job attempt will be placed.
   * @param context the context of the job.  This is used to get the
   * application attempt ID.
   * @return the path to store job attempt data.
   */
  public Path getJobAttemptPath(JobContext context) {
    return getJobAttemptPath(getAppAttemptId(context));
  }

  /**
   * Compute the path under which all job attempts will be placed.
   * @return the path to store job attempt data.
   */
  protected abstract Path getJobPath();

  /**
   * Compute the path where the output of a given job attempt will be placed.
   * @param appAttemptId the ID of the application attempt for this job.
   * @return the path to store job attempt data.
   */
  protected abstract Path getJobAttemptPath(int appAttemptId);

  /**
   * Compute the path where the output of a task attempt is stored until
   * that task is committed. This may be the normal Task attempt path
   * or it may be a subdirectory.
   * The default implementation returns the value of
   * {@link #getBaseTaskAttemptPath(TaskAttemptContext)};
   * subclasses may return different values.
   * @param context the context of the task attempt.
   * @return the path where a task attempt should be stored.
   */
  public Path getTaskAttemptPath(TaskAttemptContext context) {
    return getBaseTaskAttemptPath(context);
  }

  /**
   * Compute the base path where the output of a task attempt is written.
   * This is the path which will be deleted when a task is cleaned up and
   * aborted.
   *
   * @param context the context of the task attempt.
   * @return the path where a task attempt should be stored.
   */
  protected abstract Path getBaseTaskAttemptPath(TaskAttemptContext context);

  /**
   * Get a temporary directory for data. When a task is aborted/cleaned
   * up, the contents of this directory are all deleted.
   * @param context task context
   * @return a path for temporary data.
   */
  public abstract Path getTempTaskAttemptPath(TaskAttemptContext context);

  /**
   * Get the name of this committer.
   * @return the committer name.
   */
  public abstract String getName();

  /**
   * The Job UUID, as passed in or generated.
   * @return the UUID for the job.
   */
  @VisibleForTesting
  public final String getUUID() {
    return uuid;
  }

  /**
   * Source of the UUID.
   * @return how the job UUID was retrieved/generated.
   */
  @VisibleForTesting
  public final JobUUIDSource getUUIDSource() {
    return uuidSource;
  }

  @Override
  public String toString() {
    final StringBuilder sb = new StringBuilder(
        "AbstractS3ACommitter{");
    sb.append("role=").append(role);
    sb.append(", name=").append(getName());
    sb.append(", outputPath=").append(getOutputPath());
    sb.append(", workPath=").append(workPath);
    sb.append(", uuid='").append(getUUID()).append('\'');
    sb.append(", uuid source=").append(getUUIDSource());
    sb.append('}');
    return sb.toString();
  }

  /**
   * Get the destination filesystem from the output path and the configuration.
   * @param out output path
   * @param config job/task config
   * @return the associated FS
   * @throws PathCommitException output path isn't to an S3A FS instance.
   * @throws IOException failure to instantiate the FS.
   */
  protected FileSystem getDestinationFS(Path out, Configuration config)
      throws IOException {
    return getS3AFileSystem(out, config,
        requiresDelayedCommitOutputInFileSystem());
  }

  /**
   * Flag to indicate whether or not the destination filesystem needs
   * to be configured to support magic paths where the output isn't immediately
   * visible. If the committer returns true, then committer setup will
   * fail if the FS doesn't have the capability.
   * Base implementation returns false.
   * @return what the requirements of the committer are of the filesystem.
   */
  protected boolean requiresDelayedCommitOutputInFileSystem() {
    return false;
  }

  /**
   * Task recovery is considered unsupported: warn and fail.
   * @param taskContext Context of the task whose output is being recovered
   * @throws IOException always.
   */
  @Override
  public void recoverTask(TaskAttemptContext taskContext) throws IOException {
    LOG.warn("Cannot recover task {}", taskContext.getTaskAttemptID());
    throw new PathCommitException(outputPath,
        String.format("Unable to recover task %s",
            taskContext.getTaskAttemptID()));
  }

  /**
   * If the job requires a success marker on a successful job,
   * create the file {@link CommitConstants#_SUCCESS}.
   *
   * While the classic committers create a 0-byte file, the S3A committers
   * PUT up the contents of a {@link SuccessData} file.
   * @param commitContext commit context
   * @param pending the pending commits
   *
   * @return the success data, even if the marker wasn't created
   *
   * @throws IOException IO failure
   */
  protected SuccessData maybeCreateSuccessMarkerFromCommits(
      final CommitContext commitContext,
      ActiveCommit pending) throws IOException {
    List<String> filenames = new ArrayList<>(pending.size());
    // The list of committed objects in pending is size limited in
    // ActiveCommit.uploadCommitted.
    filenames.addAll(pending.committedObjects);
    // load in all the pending statistics
    IOStatisticsSnapshot snapshot = new IOStatisticsSnapshot(
        pending.getIOStatistics());
    // and the current statistics
    snapshot.aggregate(getIOStatistics());

    // and include the context statistics if enabled
    if (commitContext.isCollectIOStatistics()) {
      snapshot.aggregate(commitContext.getIOStatisticsContext()
              .getIOStatistics());
    }

    return maybeCreateSuccessMarker(commitContext.getJobContext(), filenames, snapshot);
  }

  /**
   * If the job requires a success marker on a successful job,
   * create the {@code _SUCCESS} file.
   *
   * While the classic committers create a 0-byte file, the S3A committers
   * PUT up the contents of a {@link SuccessData} file.
   * The file is returned, even if no marker is created.
   * This is so it can be saved to a report directory.
   * @param context job context
   * @param filenames list of filenames.
   * @param ioStatistics any IO Statistics to include
   * @throws IOException IO failure
   * @return the success data.
   */
  protected SuccessData maybeCreateSuccessMarker(
      final JobContext context,
      final List<String> filenames,
      final IOStatisticsSnapshot ioStatistics)
      throws IOException {

    SuccessData successData =
        createSuccessData(context, filenames, ioStatistics,
            getDestFS().getConf());
    if (createJobMarker) {
      // save it to the job dest dir
      commitOperations.createSuccessMarker(getOutputPath(), successData, true);
    }
    return successData;
  }

  /**
   * Create the success data structure from a job context.
   * @param context job context.
   * @param filenames short list of filenames; nullable
   * @param ioStatistics IOStatistics snapshot
   * @param destConf config of the dest fs, can be null
   * @return the structure
   *
   */
  private SuccessData createSuccessData(final JobContext context,
      final List<String> filenames,
      final IOStatisticsSnapshot ioStatistics,
      final Configuration destConf) {
    // create a success data structure
    SuccessData successData = new SuccessData();
    successData.setCommitter(getName());
    successData.setJobId(uuid);
    successData.setJobIdSource(uuidSource.getText());
    successData.setDescription(getRole());
    successData.setHostname(NetUtils.getLocalHostname());
    Date now = new Date();
    successData.setTimestamp(now.getTime());
    successData.setDate(now.toString());
    if (filenames != null) {
      successData.setFilenames(filenames);
    }
    successData.getIOStatistics().aggregate(ioStatistics);
    // attach some config options as diagnostics to assist
    // in debugging performance issues.

    // commit thread pool size
    successData.addDiagnostic(FS_S3A_COMMITTER_THREADS,
        Integer.toString(getJobCommitThreadCount(context)));

    // and filesystem http connection and thread pool sizes
    if (destConf != null) {
      successData.addDiagnostic(MAXIMUM_CONNECTIONS,
          destConf.get(MAXIMUM_CONNECTIONS,
              Integer.toString(DEFAULT_MAXIMUM_CONNECTIONS)));
      successData.addDiagnostic(MAX_TOTAL_TASKS,
          destConf.get(MAX_TOTAL_TASKS,
              Integer.toString(DEFAULT_MAX_TOTAL_TASKS)));
    }
    return successData;
  }

  /**
   * Base job setup (optionally) deletes the success marker and
   * always creates the destination directory.
   * When objects are committed, that dest dir marker will inevitably
   * be deleted; creating it now ensures there is something at the
   * destination while the job is in progress, and that if nothing is
   * created, it is still there.
   * <p>
   *   The option {@link InternalCommitterConstants#FS_S3A_COMMITTER_UUID}
   *   is set to the job UUID; if generated locally
   *   {@link InternalCommitterConstants#SPARK_WRITE_UUID} is also patched.
   *   The field {@link #jobSetup} is set to true to note that
   *   this specific committer instance was used to set up a job.
   * </p>
   * @param context context
   * @throws IOException IO failure
   */
  @Override
  public void setupJob(JobContext context) throws IOException {
    try (DurationInfo d = new DurationInfo(LOG,
        "Job %s setting up", getUUID())) {
      // record that the job has been set up
      jobSetup = true;
      // patch job conf with the job UUID.
      Configuration c = context.getConfiguration();
      c.set(FS_S3A_COMMITTER_UUID, getUUID());
      c.set(FS_S3A_COMMITTER_UUID_SOURCE, getUUIDSource().getText());
      Path dest = getOutputPath();
      if (createJobMarker){
        commitOperations.deleteSuccessMarker(dest);
      }
      getDestFS().mkdirs(dest);
      // do a scan for surplus markers
      warnOnActiveUploads(dest);
    }
  }

  /**
   * Task setup. Fails if the UUID was generated locally, and
   * the same committer wasn't used for job setup.
   * {@inheritDoc}
   * @throws PathCommitException if the task UUID options are unsatisfied.
   */
  @Override
  public void setupTask(TaskAttemptContext context) throws IOException {
    TaskAttemptID attemptID = context.getTaskAttemptID();

    // update the context so that task IO in the same thread has
    // the relevant values.
    new AuditContextUpdater(context)
        .updateCurrentAuditContext();

    try (DurationInfo d = new DurationInfo(LOG, "Setup Task %s",
        attemptID)) {
      // reject attempts to set up the task where the output won't be
      // picked up
      if (!jobSetup
          && getUUIDSource() == JobUUIDSource.GeneratedLocally) {
        // on anything other than a test run, the context must not have been
        // generated locally.
        throw new PathCommitException(getOutputPath().toString(),
            "Task attempt " + attemptID
                + " " + E_SELF_GENERATED_JOB_UUID);
      }
      Path taskAttemptPath = getTaskAttemptPath(context);
      FileSystem fs = taskAttemptPath.getFileSystem(getConf());
      // delete that task attempt path if somehow it was there
      fs.delete(taskAttemptPath, true);
      // create an empty directory
      fs.mkdirs(taskAttemptPath);
    }
  }

  /**
   * Get the task attempt path filesystem. This may not be the same as the
   * final destination FS, and so may not be an S3A FS.
   * @param context task attempt
   * @return the filesystem
   * @throws IOException failure to instantiate
   */
  protected FileSystem getTaskAttemptFilesystem(TaskAttemptContext context)
      throws IOException {
    return getTaskAttemptPath(context).getFileSystem(getConf());
  }

  /**
   * Commit all the pending uploads.
   * Each file listed in the ActiveCommit instance is queued for processing
   * in a separate thread; its contents are loaded and then (sequentially)
   * committed.
   * On a failure or abort of a single file's commit, all its uploads are
   * aborted.
   * The revert operation lists the files already committed and deletes them.
   * @param commitContext commit context
   * @param pending  pending uploads
   * @throws IOException on any failure
   */
  protected void commitPendingUploads(
      final CommitContext commitContext,
      final ActiveCommit pending) throws IOException {
    if (pending.isEmpty()) {
      LOG.warn("{}: No pending uploads to commit", getRole());
    }
    try (DurationInfo ignored = new DurationInfo(LOG,
        "committing the output of %s task(s)", pending.size())) {

      TaskPool.foreach(pending.getSourceFiles())
          .stopOnFailure()
          .suppressExceptions(false)
          .executeWith(commitContext.getOuterSubmitter())
          .abortWith(status ->
              loadAndAbort(commitContext, pending, status, true, false))
          .revertWith(status ->
              loadAndRevert(commitContext, pending, status))
          .run(status ->
              loadAndCommit(commitContext, pending, status));
    }
  }

  /**
   * Run a precommit check that all files are loadable.
   * This check avoids the situation where the inability to read
   * a file only surfaces partway through the job commit, and so
   * results in the destination being tainted.
   * @param commitContext commit context
   * @param pending the pending operations
   * @throws IOException any failure
   */
  protected void precommitCheckPendingFiles(
      final CommitContext commitContext,
      final ActiveCommit pending) throws IOException {

    FileSystem sourceFS = pending.getSourceFS();
    try (DurationInfo ignored =
             new DurationInfo(LOG, "Preflight Load of pending files")) {

      TaskPool.foreach(pending.getSourceFiles())
          .stopOnFailure()
          .suppressExceptions(false)
          .executeWith(commitContext.getOuterSubmitter())
          .run(status -> PersistentCommitData.load(sourceFS, status,
              commitContext.getPendingSetSerializer()));
    }
  }

  /**
   * Load a pendingset file and commit all of its contents.
   * Invoked within a parallel run; the commitContext thread
   * pool is already busy/possibly full, so do not
   * execute work through the same submitter.
   * @param commitContext context to commit through
   * @param activeCommit commit state
   * @param status file to load
   * @throws IOException failure
   */
  private void loadAndCommit(
      final CommitContext commitContext,
      final ActiveCommit activeCommit,
      final FileStatus status) throws IOException {

    final Path path = status.getPath();
    commitContext.switchToIOStatisticsContext();
    try (DurationInfo ignored =
             new DurationInfo(LOG,
                 "Loading and committing files in pendingset %s", path)) {
      PendingSet pendingSet = PersistentCommitData.load(
          activeCommit.getSourceFS(),
          status,
          commitContext.getPendingSetSerializer());
      String jobId = pendingSet.getJobId();
      if (!StringUtils.isEmpty(jobId) && !getUUID().equals(jobId)) {
        throw new PathCommitException(path,
            String.format("Mismatch in Job ID (%s) and commit job ID (%s)",
                getUUID(), jobId));
      }
      TaskPool.foreach(pendingSet.getCommits())
          .stopOnFailure()
          .suppressExceptions(false)
          .executeWith(commitContext.getInnerSubmitter())
          .onFailure((commit, exception) ->
              commitContext.abortSingleCommit(commit))
          .abortWith(commitContext::abortSingleCommit)
          .revertWith(commitContext::revertCommit)
          .run(commit -> {
            commitContext.commitOrFail(commit);
            activeCommit.uploadCommitted(
                commit.getDestinationKey(), commit.getLength());
          });
      activeCommit.pendingsetCommitted(pendingSet.getIOStatistics());
    }
  }

  /**
   * Load a pendingset file and revert all of its contents.
   * Invoked within a parallel run; the commitContext thread
   * pool is already busy/possibly full, so do not
   * execute work through the same submitter.
   * @param commitContext context to commit through
   * @param activeCommit commit state
   * @param status status of file to load
   * @throws IOException failure
   */
  private void loadAndRevert(
      final CommitContext commitContext,
      final ActiveCommit activeCommit,
      final FileStatus status) throws IOException {

    final Path path = status.getPath();
    commitContext.switchToIOStatisticsContext();
    try (DurationInfo ignored =
             new DurationInfo(LOG, false, "Committing %s", path)) {
      PendingSet pendingSet = PersistentCommitData.load(
          activeCommit.getSourceFS(),
          status,
          commitContext.getPendingSetSerializer());
      TaskPool.foreach(pendingSet.getCommits())
          .suppressExceptions(true)
          .run(commitContext::revertCommit);
    }
  }

  /**
   * Load a pendingset file and abort all of its contents.
   * Invoked within a parallel run; the commitContext thread
   * pool is already busy/possibly full, so do not
   * execute work through the same submitter.
   * @param commitContext context to commit through
   * @param activeCommit commit state
   * @param status status of file to load
   * @param suppressExceptions should exceptions be suppressed?
   * @param deleteRemoteFiles should remote files be deleted?
   * @throws IOException failure
   */
  private void loadAndAbort(
      final CommitContext commitContext,
      final ActiveCommit activeCommit,
      final FileStatus status,
      final boolean suppressExceptions,
      final boolean deleteRemoteFiles) throws IOException {

    final Path path = status.getPath();
    commitContext.switchToIOStatisticsContext();
    try (DurationInfo ignored =
             new DurationInfo(LOG, false, "Aborting %s", path)) {
      PendingSet pendingSet = PersistentCommitData.load(
          activeCommit.getSourceFS(),
          status,
          commitContext.getPendingSetSerializer());
      FileSystem fs = getDestFS();
      TaskPool.foreach(pendingSet.getCommits())
          .executeWith(commitContext.getInnerSubmitter())
          .suppressExceptions(suppressExceptions)
          .run(commit -> {
            try {
              commitContext.abortSingleCommit(commit);
            } catch (FileNotFoundException e) {
              // Commit ID was not known; file may exist.
              // delete it if instructed to do so.
              if (deleteRemoteFiles) {
                fs.delete(commit.destinationPath(), false);
              }
            }
          });
    }
  }

  /**
   * Start the final job commit/abort operations.
   * If configured to collect statistics,
   * the IOStatisticsContext is reset.
   * @param context job context
   * @return a commit context through which the operations can be invoked.
   * @throws IOException failure.
   */
  protected CommitContext initiateJobOperation(
      final JobContext context)
      throws IOException {

    IOStatisticsContext ioStatisticsContext =
        IOStatisticsContext.getCurrentIOStatisticsContext();
    CommitContext commitContext = getCommitOperations().createCommitContext(
        context,
        getOutputPath(),
        getJobCommitThreadCount(context),
        ioStatisticsContext);
    commitContext.maybeResetIOStatisticsContext();
    return commitContext;
  }

  /**
   * Start a task commit/abort operation.
   * This may have a different thread count.
   * If configured to collect statistics,
   * the IOStatisticsContext is reset.
   * @param context job or task context
   * @return a commit context through which the operations can be invoked.
   * @throws IOException failure.
   */
  protected CommitContext initiateTaskOperation(
      final JobContext context)
      throws IOException {

    CommitContext commitContext = getCommitOperations().createCommitContext(
        context,
        getOutputPath(),
        getTaskCommitThreadCount(context),
        IOStatisticsContext.getCurrentIOStatisticsContext());
    commitContext.maybeResetIOStatisticsContext();
    return commitContext;
  }

  /**
   * Internal Job commit operation: where the S3 requests are made
   * (potentially in parallel).
   * @param commitContext commit context
   * @param pending pending commits
   * @throws IOException any failure
   */
  protected void commitJobInternal(
      final CommitContext commitContext,
      final ActiveCommit pending)
      throws IOException {
    trackDurationOfInvocation(committerStatistics,
        COMMITTER_COMMIT_JOB.getSymbol(),
        () -> commitPendingUploads(commitContext, pending));
  }

  @Override
  public void abortJob(JobContext context, JobStatus.State state)
      throws IOException {
    LOG.info("{}: aborting job {} in state {}",
        getRole(), jobIdString(context), state);
    // final cleanup operations
    try (CommitContext commitContext = initiateJobOperation(context)){
      abortJobInternal(commitContext, false);
    }
  }


  /**
   * The internal job abort operation; can be overridden in tests.
   * This must clean up operations; it is called when a commit fails, as
   * well as in an {@link #abortJob(JobContext, JobStatus.State)} call.
   * The base implementation calls {@link #cleanup(CommitContext, boolean)},
   * so cleans up the filesystems and destroys the thread pool.
   * Subclasses must always invoke this superclass method after their
   * own operations.
   * The commit context is created and closed by the caller.
   *
   * @param commitContext commit context
   * @param suppressExceptions should exceptions be suppressed?
   * @throws IOException any IO problem raised when suppressExceptions is false.
   */
  protected void abortJobInternal(CommitContext commitContext,
      boolean suppressExceptions)
      throws IOException {
    cleanup(commitContext, suppressExceptions);
  }

  /**
   * Abort all pending uploads to the destination directory during
   * job cleanup operations.
   * Note: this instantiates the thread pool if required.
   * @param suppressExceptions should exceptions be suppressed
   * @param commitContext commit context
   * @throws IOException IO problem
   */
  protected void abortPendingUploadsInCleanup(
      boolean suppressExceptions,
      CommitContext commitContext) throws IOException {
    // return early if aborting is disabled.
    if (!shouldAbortUploadsInCleanup()) {
      LOG.debug("Not cleanup up pending uploads to {} as {} is false ",
          getOutputPath(),
          FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS);
      return;
    }
    Path dest = getOutputPath();
    try (DurationInfo ignored =
             new DurationInfo(LOG, "Aborting all pending commits under %s",
                 dest)) {
      CommitOperations ops = getCommitOperations();
      List<MultipartUpload> pending;
      try {
        pending = ops.listPendingUploadsUnderPath(dest);
      } catch (IOException e) {
        // Swallow any errors given this is best effort
        LOG.debug("Failed to list pending uploads under {}", dest, e);
        return;
      }
      if (!pending.isEmpty()) {
        LOG.warn("{} pending uploads were found -aborting", pending.size());
        LOG.warn("If other tasks/jobs are writing to {},"
            + "this action may cause them to fail", dest);
        TaskPool.foreach(pending)
            .executeWith(commitContext.getOuterSubmitter())
            .suppressExceptions(suppressExceptions)
            .run(u -> commitContext.abortMultipartCommit(
                u.getKey(), u.getUploadId()));
      } else {
        LOG.info("No pending uploads were found");
      }
    }
  }

  private boolean shouldAbortUploadsInCleanup() {
    return getConf()
        .getBoolean(FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS,
            DEFAULT_FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS);
  }

  /**
   * Subclass-specific pre-Job-commit actions.
   * The staging committers all load the pending files to verify that
   * they can be loaded.
   * The Magic committer does not, because the overhead of reading files
   * from S3 makes it too expensive.
   * @param commitContext commit context
   * @param pending the pending operations
   * @throws IOException any failure
   */
  @VisibleForTesting
  public void preCommitJob(CommitContext commitContext,
      ActiveCommit pending) throws IOException {
  }

  /**
   * Commit work.
   * This consists of two stages: precommit and commit.
   * <p>
   * Precommit: identify pending uploads, then allow subclasses
   * to validate the state of the destination and the pending uploads.
   * Any failure here triggers an abort of all pending uploads.
   * <p>
   * Commit internal: do the final commit sequence.
   * <p>
   * The final commit action is to build the {@code _SUCCESS} file entry.
   * </p>
   * @param context job context
   * @throws IOException any failure
   */
  @Override
  public void commitJob(JobContext context) throws IOException {
    String id = jobIdString(context);
    // the commit context is created outside a try-with-resources block
    // so it can be used in exception handling.
    CommitContext commitContext = null;

    SuccessData successData = null;
    IOException failure = null;
    String stage = "preparing";
    try (DurationInfo d = new DurationInfo(LOG,
        "%s: commitJob(%s)", getRole(), id)) {
      commitContext = initiateJobOperation(context);
      ActiveCommit pending
          = listPendingUploadsToCommit(commitContext);
      stage = "precommit";
      preCommitJob(commitContext, pending);
      stage = "commit";
      commitJobInternal(commitContext, pending);
      stage = "completed";
      jobCompleted(true);
      stage = "marker";
      successData = maybeCreateSuccessMarkerFromCommits(commitContext, pending);
      stage = "cleanup";
      cleanup(commitContext, false);
    } catch (IOException e) {
      // failure. record it for the summary
      failure = e;
      LOG.warn("Commit failure for job {}", id, e);
      jobCompleted(false);
      abortJobInternal(commitContext, true);
      throw e;
    } finally {
      // save the report summary, even on failure
      if (commitContext != null) {
        if (successData == null) {
          // if the commit did not get as far as creating success data, create one.
          successData = createSuccessData(context, null, null,
              getDestFS().getConf());
        }
        // save quietly, so no exceptions are raised
        maybeSaveSummary(stage,
            commitContext,
            successData,
            failure,
            true,
            true);
        // and close that commit context
        commitContext.close();
      }
    }

  }

  /**
   * Job completion outcome; this may be subclassed in tests.
   * @param success did the job succeed.
   */
  protected void jobCompleted(boolean success) {
    getCommitOperations().jobCompleted(success);
  }

  /**
   * Clean up any staging directories.
   * IOEs must be caught and swallowed.
   */
  public abstract void cleanupStagingDirs();

  /**
   * Get the list of pending uploads for this job attempt.
   * @param commitContext commit context
   * @return a list of pending uploads.
   * @throws IOException Any IO failure
   */
  protected abstract ActiveCommit listPendingUploadsToCommit(
      CommitContext commitContext)
      throws IOException;

  /**
   * Cleanup the job context, including aborting anything pending
   * and destroying the thread pool.
   * @param commitContext commit context
   * @param suppressExceptions should exceptions be suppressed?
   * @throws IOException any failure if exceptions were not suppressed.
   */
  protected void cleanup(CommitContext commitContext,
      boolean suppressExceptions) throws IOException {
    try (DurationInfo d = new DurationInfo(LOG,
        "Cleanup job %s", jobIdString(commitContext.getJobContext()))) {
      abortPendingUploadsInCleanup(suppressExceptions, commitContext);
    } finally {
      cleanupStagingDirs();
    }
  }

  @Override
  @SuppressWarnings("deprecation")
  public void cleanupJob(JobContext context) throws IOException {
    String r = getRole();
    String id = jobIdString(context);
    LOG.warn("{}: using deprecated cleanupJob call for {}", r, id);
    try (DurationInfo d = new DurationInfo(LOG, "%s: cleanup Job %s", r, id);
         CommitContext commitContext = initiateJobOperation(context)) {
      cleanup(commitContext, true);
    }
  }

  /**
   * Execute an operation; maybe suppress any raised IOException.
   * @param suppress should raised IOEs be suppressed?
   * @param action action (for logging when the IOE is suppressed).
   * @param operation operation
   * @throws IOException if operation raised an IOE and suppress == false
   */
  protected void maybeIgnore(
      boolean suppress,
      String action,
      InvocationRaisingIOE operation) throws IOException {
    if (suppress) {
      ignoreIOExceptions(LOG, action, "", operation);
    } else {
      operation.apply();
    }
  }

  /**
   * Log or rethrow a caught IOException.
   * @param suppress should raised IOEs be suppressed?
   * @param action action (for logging when the IOE is suppressed).
   * @param ex  exception
   * @throws IOException if suppress == false
   */
  protected void maybeIgnore(
      boolean suppress,
      String action,
      IOException ex) throws IOException {
    if (suppress) {
      LOG.debug(action, ex);
    } else {
      throw ex;
    }
  }

  /**
   * Get the commit actions instance.
   * Subclasses may provide a mock version of this.
   * @return the commit actions instance to use for operations.
   */
  protected CommitOperations getCommitOperations() {
    return commitOperations;
  }

  /**
   * Used in logging and reporting to help disentangle messages.
   * @return the committer's role.
   */
  protected String getRole() {
    return role;
  }

  /**
   * Get the thread count for this job's commit operations.
   * @param context the JobContext for this commit
   * @return a possibly zero thread count.
   */
  private int getJobCommitThreadCount(final JobContext context) {
    return context.getConfiguration().getInt(
        FS_S3A_COMMITTER_THREADS,
        DEFAULT_COMMITTER_THREADS);
  }

  /**
   * Get the thread count for this task's commit operations.
   * @param context the JobContext for this commit
   * @return a possibly zero thread count.
   */
  private int getTaskCommitThreadCount(final JobContext context) {
    return context.getConfiguration().getInt(
        FS_S3A_COMMITTER_THREADS,
        DEFAULT_COMMITTER_THREADS);
  }

  /**
   * Delete the task attempt path without raising any errors.
   * @param context task context
   */
  protected void deleteTaskAttemptPathQuietly(TaskAttemptContext context) {
    Path attemptPath = getBaseTaskAttemptPath(context);
    ignoreIOExceptions(LOG, "Delete task attempt path", attemptPath.toString(),
        () -> deleteQuietly(
            getTaskAttemptFilesystem(context), attemptPath, true));
  }

  /**
   * Abort all pending uploads in the list.
   * This operation is used by the magic committer as part of its
   * rollback after a failure during task commit.
   * @param commitContext commit context
   * @param pending pending uploads
   * @param suppressExceptions should exceptions be suppressed
   * @throws IOException any exception raised
   */
  protected void abortPendingUploads(
      final CommitContext commitContext,
      final List<SinglePendingCommit> pending,
      final boolean suppressExceptions)
      throws IOException {
    if (pending == null || pending.isEmpty()) {
      LOG.info("{}: no pending commits to abort", getRole());
    } else {
      try (DurationInfo d = new DurationInfo(LOG,
          "Aborting %s uploads", pending.size())) {
        TaskPool.foreach(pending)
            .executeWith(commitContext.getOuterSubmitter())
            .suppressExceptions(suppressExceptions)
            .run(commitContext::abortSingleCommit);
      }
    }
  }

  /**
   * Abort all pending uploads in the list.
   * @param commitContext commit context
   * @param pending pending uploads
   * @param suppressExceptions should exceptions be suppressed?
   * @param deleteRemoteFiles should remote files be deleted?
   * @throws IOException any exception raised
   */
  protected void abortPendingUploads(
      final CommitContext commitContext,
      final ActiveCommit pending,
      final boolean suppressExceptions,
      final boolean deleteRemoteFiles) throws IOException {
    if (pending.isEmpty()) {
      LOG.info("{}: no pending commits to abort", getRole());
    } else {
      try (DurationInfo d = new DurationInfo(LOG,
          "Aborting %s uploads", pending.size())) {
        TaskPool.foreach(pending.getSourceFiles())
            .executeWith(commitContext.getOuterSubmitter())
            .suppressExceptions(suppressExceptions)
            .run(path ->
                loadAndAbort(commitContext,
                    pending,
                    path,
                    suppressExceptions,
                    deleteRemoteFiles));
      }
    }
  }

  @Override
  public IOStatistics getIOStatistics() {
    return committerStatistics.getIOStatistics();
  }

  /**
   * Scan for active uploads and list them along with a warning message.
   * Errors are ignored.
   * @param path output path of job.
   */
  protected void warnOnActiveUploads(final Path path) {
    List<MultipartUpload> pending;
    try {
      pending = getCommitOperations()
          .listPendingUploadsUnderPath(path);
    } catch (IOException e) {
      LOG.debug("Failed to list uploads under {}",
          path, e);
      return;
    }
    if (!pending.isEmpty()) {
      // log a warning
      LOG.warn("{} active upload(s) in progress under {}",
          pending.size(),
          path);
      LOG.warn("Either jobs are running concurrently"
          + " or failed jobs are not being cleaned up");
      // and the paths + timestamps
      DateFormat df = DateFormat.getDateTimeInstance();
      pending.forEach(u ->
          LOG.info("[{}] {}",
              df.format(u.getInitiated()),
              u.getKey()));
      if (shouldAbortUploadsInCleanup()) {
        LOG.warn("This committer will abort these uploads in job cleanup");
      }
    }
  }

  /**
   * Build the job UUID.
   *
   * <p>
   *  In MapReduce jobs, the application ID is issued by YARN, and
   *  unique across all jobs.
   * </p>
   * <p>
   * Spark will use a fake app ID based on the current time.
   * This can lead to collisions on busy clusters unless
   * the specific spark release has SPARK-33402 applied.
   * That fix appends a random long value to the timestamp, so
   * the ID is unique enough that the risk of collision is almost
   * nonexistent.
   * </p>
   * <p>
   *   The order of selection of a uuid is
   * </p>
   * <ol>
   *   <li>Value of
   *   {@link InternalCommitterConstants#FS_S3A_COMMITTER_UUID}.</li>
   *   <li>Value of
   *   {@link InternalCommitterConstants#SPARK_WRITE_UUID}.</li>
   *   <li>If enabled through
   *   {@link CommitConstants#FS_S3A_COMMITTER_GENERATE_UUID}:
   *   Self-generated uuid.</li>
   *   <li>If {@link CommitConstants#FS_S3A_COMMITTER_REQUIRE_UUID}
   *   is not set: Application ID</li>
   * </ol>
   * The UUID bonding takes place during construction;
   * the staging committers use it to set up their wrapped
   * committer to a path in the cluster FS which is unique to the
   * job.
   * In {@link #setupJob(JobContext)} the job context's configuration
   * will be patched with the UUID, so that the UUID is
   * valid in all sequences where the job has been set up with the
   * configuration passed in.
   * <p>
   *   If the option {@link CommitConstants#FS_S3A_COMMITTER_REQUIRE_UUID}
   *   is set, then an external UUID MUST be passed in.
   *   This can be used to verify that the spark engine is reliably setting
   *   unique IDs for staging.
   * </p>
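   * <p>
   *   Example (a sketch with hypothetical values): if the configuration
   *   sets {@code spark.sql.sources.writeJobUUID} (the value of
   *   {@link InternalCommitterConstants#SPARK_WRITE_UUID}) to
   *   {@code job-0011} and leaves {@code fs.s3a.committer.uuid} unset,
   *   the returned pair is {@code (job-0011, SparkWriteUUID)}.
   * </p>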
   * @param conf job/task configuration
   * @param jobId job ID from YARN or spark.
   * @return Job UUID and source of it.
   * @throws PathCommitException no UUID was found and it was required
   */
  public static Pair<String, JobUUIDSource>
      buildJobUUID(Configuration conf, JobID jobId)
      throws PathCommitException {

    String jobUUID = conf.getTrimmed(FS_S3A_COMMITTER_UUID, "");

    if (!jobUUID.isEmpty()) {
      return Pair.of(jobUUID, JobUUIDSource.CommitterUUIDProperty);
    }
    // there is no job UUID.
    // look for one from spark
    jobUUID = conf.getTrimmed(SPARK_WRITE_UUID, "");
    if (!jobUUID.isEmpty()) {
      return Pair.of(jobUUID, JobUUIDSource.SparkWriteUUID);
    }

    // there is no UUID configuration in the job/task config

    // Check the job hasn't declared a requirement for the UUID.
    // This allows for fail-fast validation of Spark behavior.
    if (conf.getBoolean(FS_S3A_COMMITTER_REQUIRE_UUID,
        DEFAULT_S3A_COMMITTER_REQUIRE_UUID)) {
      throw new PathCommitException("", E_NO_SPARK_UUID);
    }

    // see if the job can generate a random UUID
    if (conf.getBoolean(FS_S3A_COMMITTER_GENERATE_UUID,
        DEFAULT_S3A_COMMITTER_GENERATE_UUID)) {
      // generate a random UUID. This is OK for a job, for a task
      // it means that the data may not get picked up.
      String newId = UUID.randomUUID().toString();
      LOG.warn("No job ID in configuration; generating a random ID: {}",
          newId);
      return Pair.of(newId, JobUUIDSource.GeneratedLocally);
    }
    // if no other option was supplied, return the job ID.
    // This is exactly what MR jobs expect, but is not what
    // Spark jobs can do as there is a risk of jobID collision.
    return Pair.of(jobId.toString(), JobUUIDSource.JobID);
  }

  /**
   * Enumeration of Job UUID source.
   */
  public enum JobUUIDSource {
    SparkWriteUUID(SPARK_WRITE_UUID),
    CommitterUUIDProperty(FS_S3A_COMMITTER_UUID),
    JobID("JobID"),
    GeneratedLocally("Generated Locally");

    private final String text;

    JobUUIDSource(final String text) {
      this.text = text;
    }

    /**
     * Source for messages.
     * @return text
     */
    public String getText() {
      return text;
    }

    @Override
    public String toString() {
      final StringBuilder sb = new StringBuilder(
          "JobUUIDSource{");
      sb.append("text='").append(text).append('\'');
      sb.append('}');
      return sb.toString();
    }
  }

  /**
   * Add jobID to current context.
   */
  protected final void updateCommonContext() {
    currentAuditContext().put(AuditConstants.PARAM_JOB_ID, uuid);
  }

  protected AuditSpanSource getAuditSpanSource() {
    return auditSpanSource;
  }

  /**
   * Start an operation; retrieve an audit span.
   *
   * All operation names <i>SHOULD</i> come from
   * {@code StoreStatisticNames} or
   * {@code StreamStatisticNames}.
   * @param name operation name.
   * @param path1 first path of operation
   * @param path2 second path of operation
   * @return a span for the audit
   * @throws IOException failure
   */
  protected AuditSpan startOperation(String name,
      @Nullable String path1,
      @Nullable String path2)
      throws IOException {
    return getAuditSpanSource().createSpan(name, path1, path2);
  }


  /**
   * Save a summary to the report dir if the config option
   * is set.
   * The report will be updated with the current active stage,
   * and if {@code thrown} is non-null, it will be added to the
   * diagnostics (and the job tagged as a failure).
   * Static for testability.
   * @param activeStage active stage
   * @param context commit context.
   * @param report summary file.
   * @param thrown any exception indicating failure.
   * @param quiet should exceptions be swallowed.
   * @param overwrite should the existing file be overwritten
   * @return the path of a file, if successfully saved
   * @throws IOException if a failure occurred and quiet==false
   */
  private static Path maybeSaveSummary(
      String activeStage,
      CommitContext context,
      SuccessData report,
      Throwable thrown,
      boolean quiet,
      boolean overwrite) throws IOException {
    Configuration conf = context.getConf();
    String reportDir = conf.getTrimmed(OPT_SUMMARY_REPORT_DIR, "");
    if (reportDir.isEmpty()) {
      LOG.debug("No summary directory set in " + OPT_SUMMARY_REPORT_DIR);
      return null;
    }
    LOG.debug("Summary directory set to {}", reportDir);

    Path reportDirPath = new Path(reportDir);
    Path path = new Path(reportDirPath,
        createJobSummaryFilename(context.getJobId()));

    if (thrown != null) {
      report.recordJobFailure(thrown);
    }
    report.putDiagnostic(STAGE, activeStage);
    // the store operations here is explicitly created for the FS where
    // the reports go, which may not be the target FS of the job.

    final FileSystem fs = path.getFileSystem(conf);
    try (ManifestStoreOperations operations = new ManifestStoreOperationsThroughFileSystem(
        fs)) {
      if (!overwrite) {
        // check for file existence so there is no need to worry about
        // precisely what exception is raised when overwrite=false and dest file
        // exists
        try {
          FileStatus st = operations.getFileStatus(path);
          // get here and the file exists
          LOG.debug("Report already exists: {}", st);
          return null;
        } catch (FileNotFoundException ignored) {
        }
      }
      report.save(fs, path, SuccessData.serializer());
      LOG.info("Job summary saved to {}", path);
      return path;
    } catch (IOException e) {
      LOG.debug("Failed to save summary to {}", path, e);
      if (quiet) {
        return null;
      } else {
        throw e;
      }
    }
  }

  /**
   * State of the active commit operation.
   *
   * It contains a list of all pendingset files to load as the source
   * of outstanding commits to complete/abort,
   * and tracks the files uploaded.
   *
   * To avoid running out of heap by loading all the source files
   * simultaneously:
   * <ol>
   *   <li>
   *     The list of files to load is passed round but
   *     the contents are only loaded on demand.
   *   </li>
   *   <li>
   *     The number of written files tracked for logging in
   *     the _SUCCESS file is limited to a small amount, enough
   *     for testing only.
   *   </li>
   * </ol>
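   * <p>
   *   A sketch of typical construction (the paths are hypothetical):
   *   list the pendingset files of a job attempt directory and wrap
   *   the iterator, e.g.
   *   {@code ActiveCommit.fromStatusIterator(fs,
   *   fs.listStatusIterator(jobAttemptDir))}.
   * </p>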
   */
  public static final class ActiveCommit {

    private static final AbstractS3ACommitter.ActiveCommit EMPTY
        = new ActiveCommit(null, new ArrayList<>());

    /** All pendingset files to iterate through. */
    private final List<FileStatus> sourceFiles;

    /**
     * Filesystem for the source files.
     */
    private final FileSystem sourceFS;

    /**
     * List of committed objects; only built up until the commit limit is
     * reached.
     */
    private final List<String> committedObjects = new ArrayList<>();

    /**
     * The total number of committed objects.
     */
    private int committedObjectCount;

    /**
     * Total number of bytes committed.
     */
    private long committedBytes;

    /**
     * Aggregate of all statistics supplied by
     * committed uploads.
     */
    private final IOStatisticsSnapshot ioStatistics =
        new IOStatisticsSnapshot();

    /**
     * Construct from a source FS and list of files.
     * @param sourceFS filesystem containing the list of pending files
     * @param sourceFiles .pendingset files to load and commit.
     */
    @SuppressWarnings("unchecked")
    public ActiveCommit(
        final FileSystem sourceFS,
        final List<? extends FileStatus> sourceFiles) {
      this.sourceFiles = (List<FileStatus>) sourceFiles;
      this.sourceFS = sourceFS;
    }

    /**
     * Create an active commit of the given pending files.
     * @param pendingFS source filesystem.
     * @param statuses iterator of file status or subclass to use.
     * @return the commit
     * @throws IOException if the iterator raises one.
     */
    public static ActiveCommit fromStatusIterator(
        final FileSystem pendingFS,
        final RemoteIterator<? extends FileStatus> statuses) throws IOException {
      return new ActiveCommit(pendingFS, toList(statuses));
    }

    /**
     * Get the empty entry.
     * @return an active commit with no pending files.
     */
    public static ActiveCommit empty() {
      return EMPTY;
    }

    public List<FileStatus> getSourceFiles() {
      return sourceFiles;
    }

    public FileSystem getSourceFS() {
      return sourceFS;
    }

    /**
     * Note that a file was committed.
     * Increase the counter of files and total size.
     * If there is room in the committedFiles list, the file
     * will be added to the list and so end up in the _SUCCESS file.
     * @param key key of the committed object.
     * @param size size in bytes.
     */
    public synchronized void uploadCommitted(String key,
        long size) {
      if (committedObjects.size() < SUCCESS_MARKER_FILE_LIMIT) {
        committedObjects.add(
            key.startsWith("/") ? key : ("/" + key));
      }
      committedObjectCount++;
      committedBytes += size;
    }

    /**
     * Callback when a pendingset has been committed,
     * including any source statistics.
     * @param sourceStatistics any source statistics
     */
    public void pendingsetCommitted(final IOStatistics sourceStatistics) {
      ioStatistics.aggregate(sourceStatistics);
    }

    public IOStatisticsSnapshot getIOStatistics() {
      return ioStatistics;
    }

    public synchronized List<String> getCommittedObjects() {
      return committedObjects;
    }

    public synchronized int getCommittedFileCount() {
      return committedObjectCount;
    }

    public synchronized long getCommittedBytes() {
      return committedBytes;
    }

    public int size() {
      return sourceFiles.size();
    }

    public boolean isEmpty() {
      return sourceFiles.isEmpty();
    }

    public void add(FileStatus status) {
      sourceFiles.add(status);
    }
  }
}
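
To make the abstract contract visible, here is a hedged sketch of the minimal subclass surface. SketchCommitter and its path layout are hypothetical; real subclasses such as MagicS3GuardCommitter and the staging committers implement these methods very differently:

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter;
import org.apache.hadoop.fs.s3a.commit.impl.CommitContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/** Hypothetical minimal subclass; for illustration only. */
public class SketchCommitter extends AbstractS3ACommitter {

  public SketchCommitter(Path outputPath, TaskAttemptContext context)
      throws IOException {
    super(outputPath, context);
  }

  @Override
  public String getName() {
    return "sketch";
  }

  @Override
  protected Path getJobPath() {
    // a made-up hidden directory under the destination for all attempts
    return new Path(getOutputPath(), "__sketch_pending");
  }

  @Override
  protected Path getJobAttemptPath(int appAttemptId) {
    return new Path(getJobPath(), String.valueOf(appAttemptId));
  }

  @Override
  protected Path getBaseTaskAttemptPath(TaskAttemptContext context) {
    return new Path(getJobAttemptPath(context),
        context.getTaskAttemptID().toString());
  }

  @Override
  public Path getTempTaskAttemptPath(TaskAttemptContext context) {
    return new Path(getBaseTaskAttemptPath(context), "_temporary");
  }

  @Override
  public void cleanupStagingDirs() {
    // nothing staged outside the task attempt directories in this sketch
  }

  @Override
  protected ActiveCommit listPendingUploadsToCommit(
      CommitContext commitContext) throws IOException {
    // a real committer lists the job attempt dir for .pendingset files
    return ActiveCommit.empty();
  }

  // The OutputCommitter task-side operations must also be implemented.
  @Override
  public boolean needsTaskCommit(TaskAttemptContext context) {
    return true;
  }

  @Override
  public void commitTask(TaskAttemptContext context) throws IOException {
    // a real committer saves the task's pending uploads to a
    // .pendingset file under the job attempt path
  }

  @Override
  public void abortTask(TaskAttemptContext context) throws IOException {
    // a real committer aborts the task's pending multipart uploads
  }
}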

Related information

hadoop source code directory

Related articles

hadoop AbstractS3ACommitterFactory source code

hadoop CommitConstants source code

hadoop CommitUtils source code

hadoop CommitterStatisticNames source code

hadoop InternalCommitterConstants source code

hadoop LocalTempDir source code

hadoop MagicCommitIntegration source code

hadoop MagicCommitPaths source code

hadoop PathCommitException source code

hadoop PutTracker source code
