hadoop CommitOperations 源码

  • 2022-10-20
  • 浏览 (354)

hadoop CommitOperations 代码

文件路径:/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a.commit.impl;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import javax.annotation.Nullable;

import com.amazonaws.services.s3.model.MultipartUpload;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.WriteOperations;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.PathCommitException;
import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
import org.apache.hadoop.fs.s3a.impl.AbstractStoreOperation;
import org.apache.hadoop.fs.s3a.impl.HeaderProcessing;
import org.apache.hadoop.fs.s3a.impl.InternalConstants;
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.functional.TaskPool;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.s3a.S3AUtils.listAndFilter;
import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_COMMIT_JOB;
import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MATERIALIZE_FILE;
import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_LOAD_SINGLE_PENDING_FILE;
import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_STAGE_FILE_UPLOAD;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.XA_MAGIC_MARKER;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants._SUCCESS;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
import static org.apache.hadoop.util.functional.RemoteIterators.cleanupRemoteIterator;

/**
 * The implementation of the various actions a committer needs.
 * This doesn't implement the protocol/binding to a specific execution engine,
 * just the operations needed to build one.
 *
 * When invoking FS operations, it assumes that the underlying FS is
 * handling retries and exception translation: it does not attempt to
 * duplicate that work.
 *
 */
public class CommitOperations extends AbstractStoreOperation
    implements IOStatisticsSource {
  /** Logger for this class. */
  private static final Logger LOG = LoggerFactory.getLogger(
      CommitOperations.class);

  /**
   * Destination filesystem.
   */
  private final S3AFileSystem fs;

  /** Committer statistics updated by the operations in this class. */
  private final CommitterStatistics statistics;

  /**
   * Write operations for the destination fs.
   */
  private final WriteOperations writeOperations;

  /**
   * Filter to find all {@code .pendingset} files: accepts any path whose
   * string form ends with {@link CommitConstants#PENDINGSET_SUFFIX}.
   */
  public static final PathFilter PENDINGSET_FILTER =
      path -> path.toString().endsWith(CommitConstants.PENDINGSET_SUFFIX);

  /**
   * Filter to find all {@code .pending} files: accepts any path whose
   * string form ends with {@link CommitConstants#PENDING_SUFFIX}.
   */
  public static final PathFilter PENDING_FILTER =
      path -> path.toString().endsWith(CommitConstants.PENDING_SUFFIX);

  /**
   * Create an instance bound to a filesystem, using committer statistics
   * newly created by that filesystem and {@code "/"} as the output path.
   * @param fs FS to bind to; must not be null.
   * @throws IOException failure to bind.
   */
  public CommitOperations(S3AFileSystem fs) throws IOException {
    this(
        requireNonNull(fs),
        fs.newCommitterStatistics(),
        "/");
  }

  /**
   * Create an instance bound to a filesystem and a set of statistics.
   * A new audit span, named after the commit-job statistic and
   * referencing the output path, is created and used for the
   * write operations of this instance.
   * @param fs FS to bind to; must not be null.
   * @param committerStatistics committer statistics; must not be null.
   * @param outputPath destination of work.
   * @throws IOException failure to bind.
   */
  public CommitOperations(S3AFileSystem fs,
      CommitterStatistics committerStatistics,
      String outputPath) throws IOException {
    super(requireNonNull(fs).createStoreContext());
    this.fs = fs;
    this.statistics = requireNonNull(committerStatistics);
    // the write operation helper is bound to a freshly created span
    this.writeOperations = fs.createWriteOperationHelper(
        fs.getAuditSpanSource()
            .createSpan(COMMITTER_COMMIT_JOB.getSymbol(), outputPath, null));
  }

  /**
   * Number an ordered list of etag strings, producing the part etag
   * entries for them. Part numbers start at 1 and follow list order.
   * @param tagIds list of tags
   * @return a new list with one numbered entry per tag, in the same order
   */
  public static List<PartETag> toPartEtags(List<String> tagIds) {
    final int count = tagIds.size();
    final List<PartETag> numbered = new ArrayList<>();
    for (int index = 0; index < count; index++) {
      // part numbers are 1-based, list indices are 0-based
      numbered.add(new PartETag(index + 1, tagIds.get(index)));
    }
    return numbered;
  }

  /**
   * String form: the class name wrapping the URI of the bound filesystem.
   * @return a string for logging.
   */
  @Override
  public String toString() {
    final StringBuilder text = new StringBuilder("CommitOperations{");
    text.append(fs.getUri()).append('}');
    return text.toString();
  }

  /**
   * Get the committer statistics this instance was constructed with.
   * @return statistics.
   */
  protected CommitterStatistics getStatistics() {
    return this.statistics;
  }

  /**
   * Get the IOStatistics of the committer statistics.
   * @return the IOStatistics as returned by the committer statistics.
   */
  @Override
  public IOStatistics getIOStatistics() {
    return getStatistics().getIOStatistics();
  }

  /**
   * Commit a single pending commit and rethrow any failure which
   * the commit outcome holds. The filename of the commit is used as the
   * origin in the outcome.
   * @param commit commit to execute
   * @throws IOException on a failure
   */
  public void commitOrFail(
      final SinglePendingCommit commit) throws IOException {
    final MaybeIOE outcome = commit(commit, commit.getFilename());
    outcome.maybeRethrow();
  }

  /**
   * Commit a single pending commit without raising exceptions:
   * every failure is logged, counted in the statistics and returned
   * as the outcome.
   * IOExceptions are returned as is; any other exception is wrapped
   * in a {@link PathCommitException} against the origin.
   * @param commit entry to commit
   * @param origin origin path/string for outcome text
   * @return the outcome; {@link MaybeIOE#NONE} on success.
   */
  public MaybeIOE commit(
      final SinglePendingCommit commit,
      final String origin) {
    LOG.debug("Committing single commit {}", commit);
    // replaced by the real key once the commit has been validated
    String destKey = "unknown destination";
    try (DurationInfo d = new DurationInfo(LOG,
        "Committing file %s size %s",
        commit.getDestinationKey(),
        commit.getLength())) {

      commit.validate();
      destKey = commit.getDestinationKey();
      final long committedBytes = trackDuration(statistics,
          COMMITTER_MATERIALIZE_FILE.getSymbol(),
          () -> innerCommit(commit));
      LOG.debug("Successful commit of file length {}", committedBytes);
      statistics.commitCompleted(commit.getLength());
      return MaybeIOE.NONE;
    } catch (IOException e) {
      final String text = String.format(
          "Failed to commit upload against %s: %s",
          destKey, e);
      LOG.warn(text, e);
      final MaybeIOE failure = new MaybeIOE(e);
      statistics.commitFailed();
      return failure;
    } catch (Exception e) {
      final String text = String.format("Failed to commit upload against %s," +
          " described in %s: %s", destKey, origin, e);
      LOG.warn(text, e);
      final MaybeIOE failure =
          new MaybeIOE(new PathCommitException(origin, text, e));
      statistics.commitFailed();
      return failure;
    }
  }

  /**
   * Complete the multipart upload described by a pending commit.
   * @param commit entry to commit
   * @return the length declared in the commit.
   * @throws IOException failure
   */
  private long innerCommit(
      final SinglePendingCommit commit) throws IOException {
    final String destKey = commit.getDestinationKey();
    final String uploadId = commit.getUploadId();
    final List<PartETag> parts = toPartEtags(commit.getEtags());
    // finalize the commit
    writeOperations.commitUpload(destKey, uploadId, parts,
        commit.getLength());
    return commit.getLength();
  }

  /**
   * List a directory and retain only those entries which are
   * accepted by {@link #PENDING_FILTER}.
   * @param pendingDir directory
   * @param recursive recursive listing?
   * @return iterator of all located entries
   * @throws IOException if there is a problem listing the path.
   */
  public RemoteIterator<LocatedFileStatus> locateAllSinglePendingCommits(
      Path pendingDir,
      boolean recursive) throws IOException {
    final RemoteIterator<LocatedFileStatus> pending =
        listAndFilter(fs, pendingDir, recursive, PENDING_FILTER);
    return pending;
  }

  /**
   * Load every single pending commit file found under a directory,
   * running the loads through the outer submitter of the commit context.
   * A file which fails to load with an IOException is logged and
   * reported in the list of failures rather than failing the operation.
   * The IOStatistics of each loaded commit are aggregated into those of
   * the returned pending set, then cleared from the commit itself.
   *
   * @param pendingDir directory containing commits
   * @param recursive do a recursive scan?
   * @param commitContext commit context
   *
   * @return tuple of the pending set of loaded commits and the
   * (file status, exception) pairs of those files which would not load.
   *
   * @throws IOException on a failure to list the files.
   */
  public Pair<PendingSet,
      List<Pair<LocatedFileStatus, IOException>>>
      loadSinglePendingCommits(Path pendingDir,
      boolean recursive,
      CommitContext commitContext)
      throws IOException {

    final PendingSet pendingSet = new PendingSet();
    // synchronized lists: entries are added from within the submitted tasks
    final List<SinglePendingCommit> loaded = Collections.synchronizedList(
        new ArrayList<>(1));
    final List<Pair<LocatedFileStatus, IOException>> unloadable =
        Collections.synchronizedList(new ArrayList<>(1));

    final RemoteIterator<LocatedFileStatus> candidates =
        locateAllSinglePendingCommits(pendingDir, recursive);
    // note: stopOnFailure() is not set on the task pool builder
    TaskPool.foreach(candidates)
        .suppressExceptions(false)
        .executeWith(commitContext.getOuterSubmitter())
        .run(status -> {
          final Path commitFile = status.getPath();
          try {
            // load the file, measuring how long it takes
            final SinglePendingCommit entry = trackDuration(statistics,
                COMMITTER_LOAD_SINGLE_PENDING_FILE.getSymbol(), () ->
                    SinglePendingCommit.load(fs,
                        commitFile,
                        status,
                        commitContext.getSinglePendingFileSerializer()));
            // merge the statistics of the entry into the set...
            pendingSet.getIOStatistics()
                .aggregate(entry.getIOStatistics());
            // ...and clear them so they aren't marshalled again.
            entry.getIOStatistics().clear();
            loaded.add(entry);
          } catch (IOException e) {
            LOG.warn("Failed to load commit file {}", commitFile, e);
            unloadable.add(Pair.of(status, e));
          }
        });
    pendingSet.setCommits(loaded);
    return Pair.of(pendingSet, unloadable);
  }

  /**
   * Ensure an exception is an IOException.
   * An IOException is returned unchanged; anything else is wrapped in a
   * {@link PathCommitException} whose text is the string value of the
   * exception and whose cause is the exception itself.
   * @param key key to use in a path exception
   * @param ex exception
   * @return an IOE, either the passed in value or a new one wrapping the other
   * exception.
   */
  public IOException makeIOE(String key, Exception ex) {
    if (ex instanceof IOException) {
      return (IOException) ex;
    }
    return new PathCommitException(key, ex.toString(), ex);
  }

  /**
   * Abort the multipart upload of a pending commit.
   * This is the lower level operation: failures are raised as exceptions
   * rather than being converted to an outcome.
   * @param commit pending commit to abort
   * @throws FileNotFoundException if the abort ID is unknown
   * @throws IOException on any failure
   */
  public void abortSingleCommit(SinglePendingCommit commit)
      throws IOException {
    final String destKey = commit.getDestinationKey();
    final String filename = commit.getFilename();
    // only mention the source file in the log when it is known
    final String origin;
    if (filename == null) {
      origin = "";
    } else {
      origin = " defined in " + filename;
    }
    final String uploadId = commit.getUploadId();
    LOG.info("Aborting commit ID {} to object {}{}", uploadId, destKey, origin);
    abortMultipartCommit(destKey, uploadId);
  }

  /**
   * Abort a multipart upload through the write operations.
   * The abort is recorded in the statistics whether or not the
   * call succeeded.
   * @param destKey destination key
   * @param uploadId upload to cancel
   * @throws FileNotFoundException if the abort ID is unknown
   * @throws IOException on any failure
   */
  public void abortMultipartCommit(String destKey, String uploadId)
      throws IOException {
    try {
      try (DurationInfo ignored = new DurationInfo(LOG,
          "Aborting commit ID %s to path %s", uploadId, destKey)) {
        writeOperations.abortMultipartCommit(destKey, uploadId);
      }
    } finally {
      // always count the abort, even a failed one
      statistics.commitAborted();
    }
  }

  /**
   * Enumerate all pending files in a dir/tree and abort the upload
   * each one describes.
   * Only files whose name ends with {@link CommitConstants#PENDING_SUFFIX}
   * are processed. After each abort attempt, successful or not, a quiet
   * attempt is made to delete the pending file.
   * A pending file which has been deleted since it was listed is skipped.
   * Any other failure to load or abort is logged; the first such failure
   * becomes the outcome of the operation and the remaining files are
   * still processed.
   * The listing iterator is always cleaned up, including when the
   * iteration itself fails.
   * @param pendingDir directory of pending operations
   * @param commitContext commit context
   * @param recursive recurse?
   * @return the outcome of all the abort operations:
   * {@link MaybeIOE#NONE} if there was no directory or nothing failed.
   * @throws IOException if there is a problem listing the path.
   */
  public MaybeIOE abortAllSinglePendingCommits(Path pendingDir,
      CommitContext commitContext,
      boolean recursive)
      throws IOException {
    Preconditions.checkArgument(pendingDir != null, "null pendingDir");
    LOG.debug("Aborting all pending commit files under {}"
            + " (recursive={})", pendingDir, recursive);
    RemoteIterator<LocatedFileStatus> pendingFiles;
    try {
      pendingFiles = ls(pendingDir, recursive);
    } catch (FileNotFoundException fnfe) {
      LOG.info("No directory to abort {}", pendingDir);
      return MaybeIOE.NONE;
    }
    MaybeIOE outcome = MaybeIOE.NONE;
    try {
      if (!pendingFiles.hasNext()) {
        LOG.debug("No files to abort under {}", pendingDir);
      }
      while (pendingFiles.hasNext()) {
        final LocatedFileStatus status = pendingFiles.next();
        Path pendingFile = status.getPath();
        if (pendingFile.getName().endsWith(CommitConstants.PENDING_SUFFIX)) {
          try {
            abortSingleCommit(SinglePendingCommit.load(fs,
                pendingFile,
                status,
                commitContext.getSinglePendingFileSerializer()));
          } catch (FileNotFoundException e) {
            LOG.debug("listed file already deleted: {}", pendingFile);
          } catch (IOException | IllegalArgumentException e) {
            // log every failure: only the first one is returned to the caller
            LOG.warn("Failed to abort pending commit {}", pendingFile, e);
            if (!outcome.hasException()) {
              outcome = new MaybeIOE(makeIOE(pendingFile.toString(), e));
            }
          } finally {
            // quietly try to delete the pending file
            S3AUtils.deleteQuietly(fs, pendingFile, false);
          }
        }
      }
    } finally {
      // clean up the iterator even if hasNext()/next() raised an exception
      cleanupRemoteIterator(pendingFiles);
    }
    return outcome;
  }

  /**
   * List the files under a path through the destination filesystem.
   * @param path path
   * @param recursive recursive listing?
   * @return iterator over the listing
   * @throws IOException failure
   */
  protected RemoteIterator<LocatedFileStatus> ls(Path path, boolean recursive)
      throws IOException {
    final RemoteIterator<LocatedFileStatus> listing =
        fs.listFiles(path, recursive);
    return listing;
  }

  /**
   * List the pending multipart uploads of the destination FS under a path.
   * The path is converted to an object key, which is then passed to the
   * write operations.
   * @param dest destination path
   * @return A list of the pending uploads to any directory under that path.
   * @throws IOException IO failure
   */
  public List<MultipartUpload> listPendingUploadsUnderPath(Path dest)
      throws IOException {
    final String prefix = fs.pathToKey(dest);
    return writeOperations.listMultipartUploads(prefix);
  }

  /**
   * Abort the pending multipart uploads of the destination FS under a path.
   * The path is converted to an object key, which is then passed to the
   * write operations.
   * @param dest destination path
   * @return a count of the number of uploads aborted.
   * @throws IOException IO failure
   */
  public int abortPendingUploadsUnderPath(Path dest) throws IOException {
    final String prefix = fs.pathToKey(dest);
    return writeOperations.abortMultipartUploadsUnderPath(prefix);
  }

  /**
   * Issue a non-recursive delete of the {@code _SUCCESS} file
   * in the output directory.
   * @param outputPath output directory
   * @throws IOException IO problem
   */
  public void deleteSuccessMarker(Path outputPath) throws IOException {
    final Path marker = new Path(outputPath, _SUCCESS);
    fs.delete(marker, false);
  }

  /**
   * Write the success data to the {@code _SUCCESS} file of the
   * output directory, optionally adding the filesystem statistics
   * to its metrics first.
   * @param outputPath output directory; must not be null.
   * @param successData success data to save.
   * @param addMetrics should the FS metrics be added?
   * @throws IOException IO problem
   */
  public void createSuccessMarker(Path outputPath,
      SuccessData successData,
      boolean addMetrics)
      throws IOException {
    Preconditions.checkArgument(outputPath != null, "null outputPath");

    // the metrics are updated before the data is serialized
    if (addMetrics) {
      addFileSystemStatistics(successData.getMetrics());
    }

    final Path marker = new Path(outputPath, _SUCCESS);
    LOG.debug("Touching success marker for job {}: {}", marker,
        successData);
    try (DurationInfo ignored = new DurationInfo(LOG,
        "Writing success file %s", marker)) {
      successData.save(fs, marker, SuccessData.serializer());
    }
  }

  /**
   * Revert a commit through the write operations, passing in the
   * destination key of the commit.
   * The revert is recorded in the statistics whether or not it succeeded.
   * @param commit pending commit
   * @throws IOException failure
   */
  public void revertCommit(SinglePendingCommit commit) throws IOException {
    LOG.info("Revert {}", commit);
    try {
      final String destKey = commit.getDestinationKey();
      writeOperations.revertCommit(destKey);
    } finally {
      // always count the revert, even a failed one
      statistics.commitReverted();
    }
  }

  /**
   * Upload all the data in the local file, returning the information
   * needed to commit the work.
   * A multipart upload is initiated and the file uploaded in parts of
   * {@code uploadPartSize} bytes; the upload is not completed here.
   * At least one part is always uploaded, even for an empty file.
   * If the operation fails after the upload was initiated, an attempt is
   * made to abort the upload; a failure of that abort is logged and not
   * rethrown.
   * @param localFile local file (must be a file)
   * @param destPath destination path; must not be null.
   * @param partition partition/subdir. Only used in the text of the commit.
   * @param uploadPartSize size of each uploaded part; must be positive.
   * @param progress progress callback, invoked before each part upload.
   * @return a pending upload entry
   * @throws FileNotFoundException if the local file is not a file.
   * @throws PathIOException if the file would need more parts than the
   * part count limit permits.
   * @throws IllegalArgumentException if the destination path is null or the
   * part size is not positive.
   * @throws IOException failure
   */
  public SinglePendingCommit uploadFileToPendingCommit(File localFile,
      Path destPath,
      String partition,
      long uploadPartSize,
      Progressable progress)
      throws IOException {

    LOG.debug("Initiating multipart upload from {} to {}",
        localFile, destPath);
    Preconditions.checkArgument(destPath != null);
    // reject a part size which would cause a division by zero or
    // negative part lengths, before any upload is initiated.
    Preconditions.checkArgument(uploadPartSize > 0,
        "Invalid upload part size: " + uploadPartSize);
    if (!localFile.isFile()) {
      throw new FileNotFoundException("Not a file: " + localFile);
    }
    String destURI = destPath.toUri().toString();
    String destKey = fs.pathToKey(destPath);
    String uploadId = null;

    // flag to indicate to the finally clause that the operation
    // failed. it is cleared as the last action in the try block.
    boolean threw = true;
    final DurationTracker tracker = statistics.trackDuration(
        COMMITTER_STAGE_FILE_UPLOAD.getSymbol());
    try (DurationInfo d = new DurationInfo(LOG,
        "Upload staged file from %s to %s",
        localFile.getAbsolutePath(),
        destPath)) {

      statistics.commitCreated();
      uploadId = writeOperations.initiateMultiPartUpload(destKey,
          PutObjectOptions.keepingDirs());
      long length = localFile.length();

      SinglePendingCommit commitData = new SinglePendingCommit();
      commitData.setDestinationKey(destKey);
      commitData.setBucket(fs.getBucket());
      commitData.touch(System.currentTimeMillis());
      commitData.setUploadId(uploadId);
      commitData.setUri(destURI);
      commitData.setText(partition != null ? "partition: " + partition : "");
      commitData.setLength(length);

      long offset = 0;
      // number of parts: the length divided by the part size, rounded up
      long numParts = (length / uploadPartSize +
          ((length % uploadPartSize) > 0 ? 1 : 0));
      // always write one part, even if it is just an empty one
      if (numParts == 0) {
        numParts = 1;
      }
      if (numParts > InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT) {
        // fail if the file is too big.
        // it would be possible to be clever here and recalculate the part size,
        // but this is not currently done.
        throw new PathIOException(destPath.toString(),
            String.format("File to upload (size %d)"
                + " is too big to be uploaded in parts of size %d",
                length, uploadPartSize));
      }

      List<PartETag> parts = new ArrayList<>((int) numParts);

      LOG.debug("File size is {}, number of parts to upload = {}",
          length, numParts);
      for (int partNumber = 1; partNumber <= numParts; partNumber += 1) {
        progress.progress();
        // the final part holds whatever is left of the file
        long size = Math.min(length - offset, uploadPartSize);
        UploadPartRequest part;
        part = writeOperations.newUploadPartRequest(
            destKey,
            uploadId,
            partNumber,
            (int) size,
            null,
            localFile,
            offset);
        part.setLastPart(partNumber == numParts);
        UploadPartResult partResult = writeOperations.uploadPart(part);
        offset += uploadPartSize;
        parts.add(partResult.getPartETag());
      }

      commitData.bindCommitData(parts);
      statistics.commitUploaded(length);
      // clear the threw flag.
      threw = false;
      return commitData;
    } finally {
      if (threw && uploadId != null) {
        // the upload was initiated: try to abort it
        try {
          abortMultipartCommit(destKey, uploadId);
        } catch (IOException e) {
          LOG.error("Failed to abort upload {} to {}", uploadId, destKey, e);
        }
      }
      if (threw) {
        tracker.failed();
      }
      // close tracker and so report statistics of success/failure
      tracker.close();
    }
  }

  /**
   * Copy the filesystem instrumentation, in its map form, into the
   * destination map; entries already there with the same name
   * are overwritten.
   * @param dest destination map
   */
  public void addFileSystemStatistics(Map<String, Long> dest) {
    dest.putAll(
        fs.getInstrumentation()
            .toMap());
  }

  /**
   * Report the completion of a task to the statistics.
   * @param success success flag
   */
  public void taskCompleted(boolean success) {
    getStatistics().taskCompleted(success);
  }

  /**
   * Report the completion of a job to the statistics.
   * @param success success flag
   */
  public void jobCompleted(boolean success) {
    getStatistics().jobCompleted(success);
  }

  /**
   * Create a commit context for a job or task, bound to this instance.
   *
   * @param context job context
   * @param path path for all work; not passed on to the new context
   * by this method.
   * @param committerThreads thread pool size
   * @param ioStatisticsContext IOStatistics context of current thread
   * @return the commit context to pass in.
   * @throws IOException failure.
   */
  public CommitContext createCommitContext(
      JobContext context,
      Path path,
      int committerThreads,
      IOStatisticsContext ioStatisticsContext) throws IOException {
    final CommitContext commitContext = new CommitContext(
        this,
        context,
        committerThreads,
        ioStatisticsContext);
    return commitContext;
  }

  /**
   * Create a stub commit context for tests, where there is no job context.
   * The context is built from the configuration of the store context and
   * the IOStatistics context of the current thread.
   * @param path path for all work; not passed on to the new context
   * by this method.
   * @param jobId job ID; if null a random UUID is generated.
   * @param committerThreads number of committer threads.
   * @return the commit context to pass in.
   * @throws IOException failure.
   */
  public CommitContext createCommitContextForTesting(
      Path path, @Nullable String jobId, int committerThreads) throws IOException {
    final String id;
    if (jobId == null) {
      // no ID supplied: invent one
      id = UUID.randomUUID().toString();
    } else {
      id = jobId;
    }

    return new CommitContext(this,
        getStoreContext().getConfiguration(),
        id,
        committerThreads,
        IOStatisticsContext.getCurrentIOStatisticsContext());
  }

  /**
   * Read the magic marker attribute of a path and extract a long value
   * from it.
   * If the filesystem raises an {@code UnsupportedOperationException}
   * when asked for the attribute, {@code Optional.empty()} is returned;
   * otherwise the result is whatever the header processing extracts
   * from the attribute bytes.
   * Static for some easier testability.
   * @param fs filesystem
   * @param path path
   * @return either a length or None.
   * @throws IOException on error
   */
  public static Optional<Long> extractMagicFileLength(FileSystem fs, Path path)
      throws IOException {
    final byte[] marker;
    try {
      marker = fs.getXAttr(path, XA_MAGIC_MARKER);
    } catch (UnsupportedOperationException unsupported) {
      // FS doesn't support xattr.
      LOG.debug("Filesystem {} doesn't support XAttr API", fs);
      return Optional.empty();
    }
    return HeaderProcessing.extractXAttrLongValue(marker);
  }

  /**
   * Holder of an optional IOException: {@link #maybeRethrow()} throws
   * the exception handed to the constructor, or does nothing if
   * there was none.
   *
   * A Java 8 optional is not used because its main attraction, doing
   * the rethrow in a map(), is not available: Java doesn't allow
   * checked exceptions in a map, so the following code is invalid
   * <pre>
   *   exception.map((e) -&gt; {throw e;}
   * </pre>
   * The code to work with exceptions would therefore be almost as
   * convoluted as the original.
   */
  public static class MaybeIOE {

    /** The shared instance which holds no exception. */
    public static final MaybeIOE NONE = new MaybeIOE(null);

    /** The exception held; null if there is none. */
    private final IOException exception;

    /**
     * Construct with an exception.
     * @param exception exception; may be null.
     */
    public MaybeIOE(IOException exception) {
      this.exception = exception;
    }

    /**
     * Get an instance for an exception: a new holder if it is non-null,
     * otherwise the shared {@link #NONE}.
     * @param ex exception
     * @return an instance.
     */
    public static MaybeIOE of(IOException ex) {
      if (ex == null) {
        return NONE;
      }
      return new MaybeIOE(ex);
    }

    /**
     * Get any exception.
     * @return the exception, or null.
     */
    public IOException getException() {
      return exception;
    }

    /**
     * Is an exception held?
     * @return true if there is an exception
     */
    public boolean hasException() {
      return !(exception == null);
    }

    /**
     * Rethrow any exception.
     * @throws IOException the exception field, if non-null.
     */
    public void maybeRethrow() throws IOException {
      if (exception == null) {
        return;
      }
      throw exception;
    }

    @Override
    public String toString() {
      final String held = hasException() ? String.valueOf(exception) : "";
      return "MaybeIOE{" + held + '}';
    }
  }

}

相关信息

hadoop 源码目录

相关文章

hadoop AuditContextUpdater 源码

hadoop CommitContext 源码

hadoop CommitUtilsWithMR 源码

hadoop package-info 源码

0  赞