hadoop Paths 源码

  • 2022-10-20
  • 浏览 (204)

haddop Paths 代码

文件路径:/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a.commit.staging;

import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.cache.Cache;
import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.UncheckedExecutionException;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsDirectoryException;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.security.UserGroupInformation;

import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.JAVA_IO_TMPDIR;
import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.*;

/**
 * Path operations for the staging committers.
 */
public final class Paths {

  private Paths() {
  }

  /**
   * Insert the UUID to a path if it is not there already.
   * If there is a trailing "." in the prefix after the last slash, the
   * UUID is inserted before it with a "-" prefix; otherwise appended.
   *
   * Examples:
   * <pre>
   *   /example/part-0000  ==&gt; /example/part-0000-0ab34
   *   /example/part-0001.gz.csv  ==&gt; /example/part-0001-0ab34.gz.csv
   *   /example/part-0002-0abc3.gz.csv  ==&gt; /example/part-0002-0abc3.gz.csv
   *   /example0abc3/part-0002.gz.csv  ==&gt; /example0abc3/part-0002.gz.csv
   * </pre>
   *
   *
   * @param pathStr path as a string; must not have a trailing "/".
   * @param uuid UUID to append; must not be empty
   * @return new path.
   */
  public static String addUUID(String pathStr, String uuid) {
    Preconditions.checkArgument(StringUtils.isNotEmpty(pathStr), "empty path");
    Preconditions.checkArgument(StringUtils.isNotEmpty(uuid), "empty uuid");
    // In some cases, Spark will add the UUID to the filename itself.
    if (pathStr.contains(uuid)) {
      return pathStr;
    }

    int dot; // location of the first '.' in the file name
    int lastSlash = pathStr.lastIndexOf('/');
    if (lastSlash >= 0) {
      Preconditions.checkState(lastSlash + 1 < pathStr.length(),
          "Bad path: " + pathStr);
      dot = pathStr.indexOf('.', lastSlash);
    } else {
      dot = pathStr.indexOf('.');
    }

    if (dot >= 0) {
      return pathStr.substring(0, dot) + "-" + uuid + pathStr.substring(dot);
    } else {
      return pathStr + "-" + uuid;
    }
  }

  /**
   * Get the parent path of a string path: everything up to but excluding
   * the last "/" in the path.
   * @param pathStr path as a string
   * @return the parent or null if there is no parent.
   */
  public static String getParent(String pathStr) {
    int lastSlash = pathStr.lastIndexOf('/');
    if (lastSlash >= 0) {
      return pathStr.substring(0, lastSlash);
    }
    return null;
  }

  /**
   * Using {@code URI#relativize()}, build the relative path from the
   * base path to the full path.
   * If {@code childPath} is not a child of {@code basePath} the outcome
   * os undefined.
   * @param basePath base path
   * @param fullPath full path under the base path.
   * @return the relative path
   */
  public static String getRelativePath(Path basePath,
      Path fullPath) {
    return basePath.toUri().relativize(fullPath.toUri()).getPath();
  }

  /**
   * Varags constructor of paths. Not very efficient.
   * @param parent parent path
   * @param child child entries. "" elements are skipped.
   * @return the full child path.
   */
  public static Path path(Path parent, String... child) {
    Path p = parent;
    for (String c : child) {
      if (!c.isEmpty()) {
        p = new Path(p, c);
      }
    }
    return p;
  }

  /**
   * A cache of temporary folders, using a generated ID which must be unique for
   * each active task attempt.
   */
  private static Cache<String, Path> tempFolders = CacheBuilder
      .newBuilder().build();

  /**
   * Get the task attempt temporary directory in the local filesystem.
   * This must be unique to all tasks on all jobs running on all processes
   * on this host.
   * It's constructed as uuid+task-attempt-ID, relying on UUID to be unique
   * for each job.
   * @param conf configuration
   * @param uuid some UUID, such as a job UUID
   * @param attemptID attempt ID
   * @return a local task attempt directory.
   * @throws IOException IO problem.
   */
  public static Path getLocalTaskAttemptTempDir(final Configuration conf,
      final String uuid,
      final TaskAttemptID attemptID)
      throws IOException {
    try {
      final LocalDirAllocator allocator =
          new LocalDirAllocator(Constants.BUFFER_DIR);
      String name = uuid + "-" + attemptID;
      return tempFolders.get(name,
          () -> {
            return FileSystem.getLocal(conf).makeQualified(
                allocator.getLocalPathForWrite(name, conf));
          });
    } catch (ExecutionException | UncheckedExecutionException e) {
      Throwable cause = e.getCause();
      if (cause instanceof RuntimeException) {
        throw (RuntimeException) cause;
      }
      if (cause instanceof IOException) {
        throw (IOException) cause;
      }
      throw new IOException(e);
    }
  }

  /**
   * Remove all information held about task attempts.
   * @param attemptID attempt ID.
   */
  public static void clearTempFolderInfo(final TaskAttemptID attemptID) {
    tempFolders.invalidate(attemptID);
  }

  /**
   * Reset the temp folder cache; useful in tests.
   */
  @VisibleForTesting
  public static void resetTempFolderCache() {
    tempFolders.invalidateAll();
  }

  /**
   * Try to come up with a good temp directory for different filesystems.
   * @param fs filesystem
   * @param conf configuration
   * @return a qualified path under which temporary work can go.
   */
  public static Path tempDirForStaging(FileSystem fs,
      Configuration conf) {
    String fallbackPath = fs.getScheme().equals("file")
        ? System.getProperty(JAVA_IO_TMPDIR)
        : FILESYSTEM_TEMP_PATH;

    return fs.makeQualified(new Path(conf.getTrimmed(
        FS_S3A_COMMITTER_STAGING_TMP_PATH, fallbackPath)));
  }

  /**
   * Get the Application Attempt ID for this job.
   * @param conf the config to look in
   * @return the Application Attempt ID for a given job.
   */
  private static int getAppAttemptId(Configuration conf) {
    return conf.getInt(
        MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
  }

  /**
   * Build a qualified temporary path for the multipart upload commit
   * information in the cluster filesystem.
   * Path is built by
   * {@link #getMultipartUploadCommitsDirectory(FileSystem, Configuration, String)}
   * @param conf configuration defining default FS.
   * @param uuid uuid of job
   * @return a path which can be used for temporary work
   * @throws IOException on an IO failure.
   */
  public static Path getMultipartUploadCommitsDirectory(Configuration conf,
      String uuid) throws IOException {
    return getMultipartUploadCommitsDirectory(FileSystem.get(conf), conf, uuid);
  }

  /**
   * Build a qualified temporary path for the multipart upload commit
   * information in the supplied filesystem
   * (which is expected to be the cluster FS).
   * Currently {code $tempDir/$user/$uuid/staging-uploads} where
   * {@code tempDir} is from
   * {@link #tempDirForStaging(FileSystem, Configuration)}.
   * @param fs target FS
   * @param conf configuration
   * @param uuid uuid of job
   * @return a path which can be used for temporary work
   * @throws IOException on an IO failure.
   */
  @VisibleForTesting
  static Path getMultipartUploadCommitsDirectory(FileSystem fs,
      Configuration conf, String uuid) throws IOException {
    return path(
        tempDirForStaging(fs, conf),
        UserGroupInformation.getCurrentUser().getShortUserName(),
        uuid,
        STAGING_UPLOADS);
  }

  /**
   * Returns the partition of a relative file path, or null if the path is a
   * file name with no relative directory.
   *
   * @param relative a relative file path
   * @return the partition of the relative file path
   */
  protected static String getPartition(String relative) {
    return getParent(relative);
  }

  /**
   * Get the set of partitions from the list of files being staged.
   * This is all immediate parents of those files. If a file is in the root
   * dir, the partition is declared to be
   * {@link StagingCommitterConstants#TABLE_ROOT}.
   * @param attemptPath path for the attempt
   * @param taskOutput list of output files.
   * @return list of partitions.
   * @throws IOException IO failure
   */
  public static Set<String> getPartitions(Path attemptPath,
      List<? extends FileStatus> taskOutput)
      throws IOException {
    // get a list of partition directories
    Set<String> partitions = new LinkedHashSet<>();
    for (FileStatus fileStatus : taskOutput) {
      // sanity check the output paths
      Path outputFile = fileStatus.getPath();
      if (!fileStatus.isFile()) {
        throw new PathIsDirectoryException(outputFile.toString());
      }
      String partition = getPartition(
          getRelativePath(attemptPath, outputFile));
      partitions.add(partition != null ? partition : TABLE_ROOT);
    }

    return partitions;
  }

}

相关信息

hadoop 源码目录

相关文章

hadoop ConflictResolution 源码

hadoop DirectoryStagingCommitter 源码

hadoop DirectoryStagingCommitterFactory 源码

hadoop PartitionedStagingCommitter 源码

hadoop PartitionedStagingCommitterFactory 源码

hadoop StagingCommitter 源码

hadoop StagingCommitterConstants 源码

hadoop StagingCommitterFactory 源码

hadoop package-info 源码

0  赞