hadoop CoreFileSystem source code

  • 2022-10-20

hadoop CoreFileSystem code

File path: /hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/CoreFileSystem.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.service.utils;

import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.service.conf.SliderExitCodes;
import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
import org.apache.hadoop.yarn.service.exceptions.BadClusterStateException;
import org.apache.hadoop.yarn.service.exceptions.ErrorStrings;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

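/**
 * Thin wrapper around a Hadoop {@link FileSystem} and its {@link Configuration}
 * that provides path-building, verification, and file/resource submission
 * helpers for the YARN service framework.
 */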
public class CoreFileSystem {
  private static final Logger
    log = LoggerFactory.getLogger(CoreFileSystem.class);

  private static final String UTF_8 = "UTF-8";

  protected final FileSystem fileSystem;
  protected final Configuration configuration;

  public CoreFileSystem(FileSystem fileSystem, Configuration configuration) {
    Preconditions.checkNotNull(fileSystem,
                               "Cannot create a CoreFileSystem with a null FileSystem");
    Preconditions.checkNotNull(configuration,
                               "Cannot create a CoreFileSystem with a null Configuration");
    this.fileSystem = fileSystem;
    this.configuration = configuration;
  }

  public CoreFileSystem(Configuration configuration) throws IOException {
    Preconditions.checkNotNull(configuration,
                               "Cannot create a CoreFileSystem with a null Configuration");
    this.fileSystem = FileSystem.get(configuration);
    this.configuration = configuration;
  }
  
  /**
   * Get the temp path for this cluster
   * @param clustername name of the cluster
   * @return path for temp files (is not purged)
   */
  public Path getTempPathForCluster(String clustername) {
    Path clusterDir = buildClusterDirPath(clustername);
    return new Path(clusterDir, YarnServiceConstants.TMP_DIR_PREFIX);
  }

  /**
   * Returns the underlying FileSystem for this object.
   *
   * @return filesystem
   */
  public FileSystem getFileSystem() {
    return fileSystem;
  }

  @Override
  public String toString() {
    final StringBuilder sb =
      new StringBuilder("CoreFileSystem{");
    sb.append("fileSystem=").append(fileSystem.getUri())
        .append('}');
    return sb.toString();
  }

  /**
   * Build up the path string for a cluster instance - no attempt to
   * create the directory is made
   *
   * @param clustername name of the cluster
   * @return the path for persistent data
   */
  public Path buildClusterDirPath(String clustername) {
    Preconditions.checkNotNull(clustername);
    Path path = getBaseApplicationPath();
    return new Path(path, YarnServiceConstants.SERVICES_DIRECTORY + "/"
        + clustername);
  }

  /**
   * Build up the upgrade path string for a cluster. No attempt to
   * create the directory is made.
   *
   * @param clusterName name of the cluster
   * @param version version of the cluster
   * @return the upgrade path to the cluster
   */
  public Path buildClusterUpgradeDirPath(String clusterName, String version) {
    Preconditions.checkNotNull(clusterName);
    Preconditions.checkNotNull(version);
    return new Path(buildClusterDirPath(clusterName),
        YarnServiceConstants.UPGRADE_DIR + "/" + version);
  }

  /**
   * Delete the upgrade cluster directory.
   * @param clusterName name of the cluster
   * @param version     version of the cluster
   * @throws IOException
   */
  public void deleteClusterUpgradeDir(String clusterName, String version)
      throws IOException {
    Preconditions.checkNotNull(clusterName);
    Preconditions.checkNotNull(version);
    Path upgradeCluster = buildClusterUpgradeDirPath(clusterName, version);
    fileSystem.delete(upgradeCluster, true);
  }
  /**
   * Build up the path string for keytab install location - no attempt to
   * create the directory is made
   *
   * @return the path for keytab
   */
  public Path buildKeytabInstallationDirPath(String keytabFolder) {
    Preconditions.checkNotNull(keytabFolder);
    Path path = getBaseApplicationPath();
    return new Path(path, YarnServiceConstants.KEYTAB_DIR + "/" + keytabFolder);
  }

  /**
   * Build up the path string for keytab install location - no attempt to
   * create the directory is made
   *
   * @return the path for keytab installation location
   */
  public Path buildKeytabPath(String keytabDir, String keytabName, String clusterName) {
    Path homePath = getHomeDirectory();
    Path baseKeytabDir;
    if (keytabDir != null) {
      baseKeytabDir = new Path(homePath, keytabDir);
    } else {
      baseKeytabDir = new Path(buildClusterDirPath(clusterName),
                               YarnServiceConstants.KEYTAB_DIR);
    }
    return keytabName == null ? baseKeytabDir :
        new Path(baseKeytabDir, keytabName);
  }

  /**
   * Build up the path string for resource install location - no attempt to
   * create the directory is made
   *
   * @return the path for resource
   */
  public Path buildResourcePath(String resourceFolder) {
    Preconditions.checkNotNull(resourceFolder);
    Path path = getBaseApplicationPath();
    return new Path(path, YarnServiceConstants.RESOURCE_DIR + "/" + resourceFolder);
  }

  /**
   * Build up the path string for resource install location - no attempt to
   * create the directory is made
   *
   * @return the path for resource
   */
  public Path buildResourcePath(String dirName, String fileName) {
    Preconditions.checkNotNull(dirName);
    Preconditions.checkNotNull(fileName);
    Path path = getBaseApplicationPath();
    return new Path(path, YarnServiceConstants.RESOURCE_DIR + "/" + dirName + "/" + fileName);
  }

  /**
   * Create a directory with the given permissions.
   *
   * @param dir          directory
   * @param clusterPerms cluster permissions
   * @throws IOException  IO problem
   * @throws BadClusterStateException any cluster state problem
   */
  @SuppressWarnings("deprecation")
  public void createWithPermissions(Path dir, FsPermission clusterPerms) throws
          IOException,
          BadClusterStateException {
    if (fileSystem.isFile(dir)) {
      // HADOOP-9361 shows some filesystems don't correctly fail here
      throw new BadClusterStateException(
              "Cannot create a directory over a file %s", dir);
    }
    log.debug("mkdir {} with perms {}", dir, clusterPerms);
    //no mask whatsoever
    fileSystem.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000");
    fileSystem.mkdirs(dir, clusterPerms);
    //and force set it anyway just to make sure
    fileSystem.setPermission(dir, clusterPerms);
  }

  /**
   * Verify that the cluster directory is not present
   *
   * @param clustername      name of the cluster
   * @param clusterDirectory actual directory to look for
   * @throws IOException trouble with FS
   * @throws SliderException If the directory exists
   */
  public void verifyClusterDirectoryNonexistent(String clustername,
                                                Path clusterDirectory)
      throws IOException, SliderException {
    if (fileSystem.exists(clusterDirectory)) {
      throw new SliderException(SliderExitCodes.EXIT_INSTANCE_EXISTS,
              ErrorStrings.PRINTF_E_INSTANCE_ALREADY_EXISTS, clustername,
              clusterDirectory);
    }
  }
  /**
   * Verify that the given directory is not present
   *
   * @param clusterDirectory actual directory to look for
   * @throws IOException    trouble with FS
   * @throws SliderException If the directory exists
   */
  public void verifyDirectoryNonexistent(Path clusterDirectory) throws
          IOException,
      SliderException {
    if (fileSystem.exists(clusterDirectory)) {
      
      log.error("Dir {} exists: {}",
                clusterDirectory,
                listFSDir(clusterDirectory));
      throw new SliderException(SliderExitCodes.EXIT_INSTANCE_EXISTS,
              ErrorStrings.PRINTF_E_INSTANCE_DIR_ALREADY_EXISTS,
              clusterDirectory);
    }
  }

  /**
   * Verify that a user has write access to a directory.
   * It does this by creating then deleting a temp file
   *
   * @param dirPath actual directory to look for
   * @throws FileNotFoundException file not found
   * @throws IOException  trouble with FS
   * @throws BadClusterStateException if the directory is not writeable
   */
  public void verifyDirectoryWriteAccess(Path dirPath) throws IOException,
      SliderException {
    verifyPathExists(dirPath);
    Path tempFile = new Path(dirPath, "tmp-file-for-checks");
    try {
      FSDataOutputStream out = fileSystem.create(tempFile, true);
      IOUtils.closeStream(out);
      fileSystem.delete(tempFile, false);
    } catch (IOException e) {
      log.warn("Failed to create file {}: {}", tempFile, e);
      throw new BadClusterStateException(e,
              "Unable to write to directory %s : %s", dirPath, e.toString());
    }
  }

  /**
   * Verify that a path exists
   * @param path path to check
   * @throws FileNotFoundException file not found
   * @throws IOException  trouble with FS
   */
  public void verifyPathExists(Path path) throws IOException {
    if (!fileSystem.exists(path)) {
      throw new FileNotFoundException(path.toString());
    }
  }

  /**
   * Verify that a path exists
   * @param path path to check
   * @throws FileNotFoundException file not found or is not a file
   * @throws IOException  trouble with FS
   */
  public void verifyFileExists(Path path) throws IOException {
    FileStatus status = fileSystem.getFileStatus(path);

    if (!status.isFile()) {
      throw new FileNotFoundException("Not a file: " + path.toString());
    }
  }

  /**
   * Given a path, check if it exists and is a file
   * 
   * @param path
   *          absolute path to the file to check
   * @return true if and only if path exists and is a file, false for all other
   *          reasons including if file check throws IOException
   */
  public boolean isFile(Path path) {
    if (path == null) {
      return false;
    }
    boolean isFile = false;
    try {
      FileStatus status = fileSystem.getFileStatus(path);
      if (status.isFile()) {
        isFile = true;
      }
    } catch (IOException e) {
      // ignore, isFile is already set to false
    }
    return isFile;
  }

  /**
   * Get the base path
   *
   * @return the base path optionally configured by 
   * {@link YarnServiceConf#YARN_SERVICE_BASE_PATH}
   */
  public Path getBaseApplicationPath() {
    String configuredBasePath = configuration
        .get(YarnServiceConf.YARN_SERVICE_BASE_PATH,
            getHomeDirectory() + "/" + YarnServiceConstants.SERVICE_BASE_DIRECTORY);
    return new Path(configuredBasePath);
  }

  /**
   * Get service dependency absolute filepath in HDFS used for application
   * submission.
   * 
   * @return the absolute path to service dependency tarball in HDFS
   */
  public Path getDependencyTarGzip() {
    Path dependencyLibTarGzip = null;
    String configuredDependencyTarballPath = configuration
        .get(YarnServiceConf.DEPENDENCY_TARBALL_PATH);
    if (configuredDependencyTarballPath != null) {
      dependencyLibTarGzip = new Path(configuredDependencyTarballPath);
    }
    if (dependencyLibTarGzip == null) {
      dependencyLibTarGzip = new Path(String.format(YarnServiceConstants
          .DEPENDENCY_DIR, VersionInfo.getVersion()),
          YarnServiceConstants.DEPENDENCY_TAR_GZ_FILE_NAME
              + YarnServiceConstants.DEPENDENCY_TAR_GZ_FILE_EXT);
    }
    return dependencyLibTarGzip;
  }

  public Path getHomeDirectory() {
    return fileSystem.getHomeDirectory();
  }

  /**
   * Create an AM resource from a path in the destination filesystem.
   *
   * @param destPath     dest path in filesystem
   * @param resourceType resource type
   * @param visibility   visibility of the resource; APPLICATION is used
   *                     when null
   * @return the local resource for AM
   */
  public LocalResource createAmResource(Path destPath,
      LocalResourceType resourceType,
      LocalResourceVisibility visibility) throws IOException {

    FileStatus destStatus = fileSystem.getFileStatus(destPath);
    LocalResource amResource = Records.newRecord(LocalResource.class);
    amResource.setType(resourceType);
    // Set visibility of the resource
    // Setting to most private option
    if (visibility == null) {
      visibility = LocalResourceVisibility.APPLICATION;
    }
    amResource.setVisibility(visibility);
    // Set the resource to be copied over
    amResource.setResource(
        URL.fromPath(fileSystem.resolvePath(destStatus.getPath())));
    // Set timestamp and length of file so that the framework
    // can do basic sanity checks for the local resource
    // after it has been copied over to ensure it is the same
    // resource the client intended to use with the service
    amResource.setTimestamp(destStatus.getModificationTime());
    amResource.setSize(destStatus.getLen());
    return amResource;
  }

  /**
   * Register all files under a fs path as a directory to push out
   *
   * @param srcDir          src dir
   * @param destRelativeDir dest dir (no trailing /)
   * @return the map of entries
   */
  public Map<String, LocalResource> submitDirectory(Path srcDir, String destRelativeDir) throws IOException {
    //now register each of the files in the directory to be
    //copied to the destination
    FileStatus[] fileset = fileSystem.listStatus(srcDir);
    Map<String, LocalResource> localResources =
            new HashMap<String, LocalResource>(fileset.length);
    for (FileStatus entry : fileset) {

      LocalResource resource = createAmResource(entry.getPath(),
              LocalResourceType.FILE, LocalResourceVisibility.APPLICATION);
      String relativePath = destRelativeDir + "/" + entry.getPath().getName();
      localResources.put(relativePath, resource);
    }
    return localResources;
  }

  /**
   * Submit a JAR containing a specific class, returning
   * the resource to be mapped in
   *
   * @param clazz   class to look for
   * @param subdir  subdirectory (expected to end in a "/")
   * @param jarName <i>At the destination</i>
   * @return the local resource ref
   * @throws IOException trouble copying to HDFS
   */
  public LocalResource submitJarWithClass(Class clazz, Path tempPath, String subdir, String jarName)
          throws IOException, SliderException {
    File localFile = ServiceUtils.findContainingJarOrFail(clazz);
    return submitFile(localFile, tempPath, subdir, jarName);
  }

  /**
   * Submit a local file to the filesystem referenced by the instance's cluster
   * filesystem
   *
   * @param localFile    filename
   * @param subdir       subdirectory (expected to end in a "/")
   * @param destFileName destination filename
   * @return the local resource ref
   * @throws IOException trouble copying to HDFS
   */
  public LocalResource submitFile(File localFile, Path tempPath, String subdir, String destFileName)
      throws IOException {
    Path src = new Path(localFile.toString());
    Path subdirPath = new Path(tempPath, subdir);
    fileSystem.mkdirs(subdirPath);
    Path destPath = new Path(subdirPath, destFileName);
    log.debug("Copying {} (size={} bytes) to {}", localFile, localFile.length(), destPath);

    fileSystem.copyFromLocalFile(false, true, src, destPath);

    // Set the type of resource - file or archive
    // archives are untarred at destination
    // we don't need the jar file to be untarred for now
    return createAmResource(destPath, LocalResourceType.FILE,
        LocalResourceVisibility.APPLICATION);
  }

  /**
   * Submit the AM tar.gz resource referenced by the instance's cluster
   * filesystem. Also, update the providerResources object with the new
   * resource.
   * 
   * @param providerResources
   *          the provider resource map to be updated
   * @throws IOException
   *           trouble copying to HDFS
   */
  public void submitTarGzipAndUpdate(
      Map<String, LocalResource> providerResources) throws IOException,
      BadClusterStateException {
    Path dependencyLibTarGzip = getDependencyTarGzip();
    LocalResource lc = createAmResource(dependencyLibTarGzip,
        LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION);
    providerResources.put(YarnServiceConstants.DEPENDENCY_LOCALIZED_DIR_LINK, lc);
  }

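  /**
   * Copy a local file to the filesystem, creating the parent directory with
   * the given permissions and applying the same permissions to the copied
   * file.
   *
   * @param localPath local source file
   * @param destPath  destination path in the filesystem
   * @param fp        permissions for the parent directory and the destination file
   * @throws IOException if either argument is null or the copy fails
   */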
  public void copyLocalFileToHdfs(File localPath,
      Path destPath, FsPermission fp)
      throws IOException {
    if (localPath == null || destPath == null) {
      throw new IOException("Either localPath or destPath is null");
    }
    fileSystem.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,
        "000");
    fileSystem.mkdirs(destPath.getParent(), fp);
    log.info("Copying file {} to {}", localPath.toURI(), destPath);
    
    fileSystem.copyFromLocalFile(false, true, new Path(localPath.getPath()),
        destPath);
    // set file permissions of the destPath
    fileSystem.setPermission(destPath, fp);
  }

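  /**
   * Copy a file from the filesystem to the local disk.
   *
   * @param hdfsPath source path in the filesystem
   * @param destFile local destination file
   * @throws IOException if either argument is null or the copy fails
   */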
  public void copyHdfsFileToLocal(Path hdfsPath, File destFile)
      throws IOException {
    if (hdfsPath == null || destFile == null) {
      throw new IOException("Either hdfsPath or destPath is null");
    }
    log.info("Copying file {} to {}", hdfsPath.toUri(), destFile.toURI());

    Path destPath = new Path(destFile.getPath());
    fileSystem.copyToLocalFile(hdfsPath, destPath);
  }

  /**
   * list entries in a filesystem directory
   *
   * @param path directory
   * @return a listing, one to a line
   * @throws IOException
   */
  public String listFSDir(Path path) throws IOException {
    FileStatus[] stats = fileSystem.listStatus(path);
    StringBuilder builder = new StringBuilder();
    for (FileStatus stat : stats) {
      builder.append(stat.getPath().toString())
              .append("\t")
              .append(stat.getLen())
              .append("\n");
    }
    return builder.toString();
  }

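  /**
   * Read a file into a UTF-8 string; only the bytes returned by a single
   * read call are decoded.
   *
   * @param path file to read
   * @return the decoded contents
   * @throws IOException trouble opening or reading the file
   */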
  public String cat(Path path) throws IOException {
    FileStatus status = fileSystem.getFileStatus(path);
    byte[] b = new byte[(int) status.getLen()];
    FSDataInputStream in = null;
    try {
      in = fileSystem.open(path);
      int count = in.read(b);
      return new String(b, 0, count, UTF_8);
    } finally {
      IOUtils.closeStream(in);
    }
  }
}

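As a rough illustration of how this class is typically driven, the minimal sketch below builds a CoreFileSystem from a Configuration, checks that the service directory does not already exist, and stages a local jar for localization. The service name "my-service", the local jar path, and the example class are made-up values for illustration only, not anything taken from the source above.

import java.io.File;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.service.utils.CoreFileSystem;

public class CoreFileSystemExample {
  public static void main(String[] args) throws Exception {
    // Bind to the default filesystem named in the Hadoop configuration.
    Configuration conf = new Configuration();
    CoreFileSystem coreFs = new CoreFileSystem(conf);

    // Resolve the persistent directory for a (hypothetical) service and
    // fail fast if it already exists.
    Path serviceDir = coreFs.buildClusterDirPath("my-service");
    coreFs.verifyDirectoryNonexistent(serviceDir);

    // Stage a local jar under the service's temp dir; submitFile copies the
    // file into the filesystem and returns a LocalResource describing it.
    Path tempDir = coreFs.getTempPathForCluster("my-service");
    LocalResource jarResource = coreFs.submitFile(
        new File("/tmp/example-app.jar"), tempDir, "lib/", "example-app.jar");

    // Register everything under the staged lib directory for localization.
    Map<String, LocalResource> resources =
        coreFs.submitDirectory(new Path(tempDir, "lib"), "lib");
    System.out.println("Staged resources: " + resources.keySet());
    System.out.println("Jar resource size: " + jarResource.getSize());
  }
}

In a real client, the returned LocalResource map would then be attached to the container launch context before the application is submitted to YARN.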
Related information

hadoop source code directory

Related articles

hadoop ApplicationReportSerDeser source code

hadoop ClientRegistryBinder source code

hadoop Comparators source code

hadoop ConfigHelper source code

hadoop ConfigUtils source code

hadoop Duration source code

hadoop FilterUtils source code

hadoop HttpUtil source code

hadoop JsonSerDeser source code

hadoop PatternValidator source code
