hadoop RawLocalFileSystem 源码

  • 2022-10-20
  • 浏览 (439)

haddop RawLocalFileSystem 代码

文件路径:/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


package org.apache.hadoop.fs;

import org.apache.hadoop.classification.VisibleForTesting;

import java.io.BufferedOutputStream;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.FileDescriptor;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.BasicFileAttributeView;
import java.nio.file.attribute.FileTime;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Locale;
import java.util.Optional;
import java.util.StringTokenizer;
import java.util.concurrent.atomic.AtomicLong;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.BufferedIOStatisticsOutputStream;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;

import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.VectoredReadUtils.sortRanges;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_EXCEPTIONS;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SKIP_BYTES;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SKIP_OPERATIONS;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_WRITE_BYTES;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_WRITE_EXCEPTIONS;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;

/****************************************************************
 * Implement the FileSystem API for the raw local filesystem.
 *
 *****************************************************************/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class RawLocalFileSystem extends FileSystem {
  static final URI NAME = URI.create("file:///");
  private Path workingDir;
  private long defaultBlockSize;
  // Temporary workaround for HADOOP-9652.
  private static boolean useDeprecatedFileStatus = true;

  @VisibleForTesting
  public static void useStatIfAvailable() {
    useDeprecatedFileStatus = !Stat.isAvailable();
  }
  
  public RawLocalFileSystem() {
    workingDir = getInitialWorkingDirectory();
  }
  
  private Path makeAbsolute(Path f) {
    if (f.isAbsolute()) {
      return f;
    } else {
      return new Path(workingDir, f);
    }
  }
  
  /**
   * Convert a path to a File.
   *
   * @param path the path.
   * @return file.
   */
  public File pathToFile(Path path) {
    checkPath(path);
    if (!path.isAbsolute()) {
      path = new Path(getWorkingDirectory(), path);
    }
    return new File(path.toUri().getPath());
  }

  @Override
  public URI getUri() { return NAME; }
  
  @Override
  public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    setConf(conf);
    defaultBlockSize = getDefaultBlockSize(new Path(uri));
  }
  
  /*******************************************************
   * For open()'s FSInputStream.
   *******************************************************/
  class LocalFSFileInputStream extends FSInputStream implements
      HasFileDescriptor, IOStatisticsSource, StreamCapabilities {
    private FileInputStream fis;
    private final File name;
    private long position;
    private AsynchronousFileChannel asyncChannel = null;

    /**
     * Minimal set of counters.
     */
    private final IOStatisticsStore ioStatistics = iostatisticsStore()
        .withCounters(
            STREAM_READ_BYTES,
            STREAM_READ_EXCEPTIONS,
            STREAM_READ_SEEK_OPERATIONS,
            STREAM_READ_SKIP_OPERATIONS,
            STREAM_READ_SKIP_BYTES)
        .build();

    /** Reference to the bytes read counter for slightly faster counting. */
    private final AtomicLong bytesRead;

    /**
     * Thread level IOStatistics aggregator to update in close().
     */
    private final IOStatisticsAggregator
        ioStatisticsAggregator;

    public LocalFSFileInputStream(Path f) throws IOException {
      name = pathToFile(f);
      fis = new FileInputStream(name);
      bytesRead = ioStatistics.getCounterReference(
          STREAM_READ_BYTES);
      ioStatisticsAggregator =
          IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator();
    }
    
    @Override
    public void seek(long pos) throws IOException {
      if (pos < 0) {
        throw new EOFException(
          FSExceptionMessages.NEGATIVE_SEEK);
      }
      fis.getChannel().position(pos);
      this.position = pos;
    }
    
    @Override
    public long getPos() throws IOException {
      return this.position;
    }
    
    @Override
    public boolean seekToNewSource(long targetPos) throws IOException {
      return false;
    }
    
    /**
     * Just forward to the fis.
     */
    @Override
    public int available() throws IOException { return fis.available(); }
    @Override
    public boolean markSupported() { return false; }

    @Override
    public void close() throws IOException {
      try {
        fis.close();
        if (asyncChannel != null) {
          asyncChannel.close();
        }
      } finally {
        ioStatisticsAggregator.aggregate(ioStatistics);
      }
    }

    @Override
    public int read() throws IOException {
      try {
        int value = fis.read();
        if (value >= 0) {
          this.position++;
          statistics.incrementBytesRead(1);
          bytesRead.addAndGet(1);
        }
        return value;
      } catch (IOException e) {                 // unexpected exception
        ioStatistics.incrementCounter(STREAM_READ_EXCEPTIONS);
        throw new FSError(e);                   // assume native fs error
      }
    }
    
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
      // parameter check
      validatePositionedReadArgs(position, b, off, len);
      try {
        int value = fis.read(b, off, len);
        if (value > 0) {
          this.position += value;
          statistics.incrementBytesRead(value);
          bytesRead.addAndGet(value);
        }
        return value;
      } catch (IOException e) {                 // unexpected exception
        ioStatistics.incrementCounter(STREAM_READ_EXCEPTIONS);
        throw new FSError(e);                   // assume native fs error
      }
    }
    
    @Override
    public int read(long position, byte[] b, int off, int len)
      throws IOException {
      // parameter check
      validatePositionedReadArgs(position, b, off, len);
      if (len == 0) {
        return 0;
      }

      ByteBuffer bb = ByteBuffer.wrap(b, off, len);
      try {
        int value = fis.getChannel().read(bb, position);
        if (value > 0) {
          statistics.incrementBytesRead(value);
          ioStatistics.incrementCounter(STREAM_READ_BYTES, value);
        }
        return value;
      } catch (IOException e) {
        ioStatistics.incrementCounter(STREAM_READ_EXCEPTIONS);
        throw new FSError(e);
      }
    }
    
    @Override
    public long skip(long n) throws IOException {
      ioStatistics.incrementCounter(STREAM_READ_SKIP_OPERATIONS);
      long value = fis.skip(n);
      if (value > 0) {
        this.position += value;
        ioStatistics.incrementCounter(STREAM_READ_SKIP_BYTES, value);
      }
      return value;
    }

    @Override
    public FileDescriptor getFileDescriptor() throws IOException {
      return fis.getFD();
    }

    @Override
    public boolean hasCapability(String capability) {
      // a bit inefficient, but intended to make it easier to add
      // new capabilities.
      switch (capability.toLowerCase(Locale.ENGLISH)) {
      case StreamCapabilities.IOSTATISTICS:
      case StreamCapabilities.IOSTATISTICS_CONTEXT:
      case StreamCapabilities.VECTOREDIO:
        return true;
      default:
        return false;
      }
    }

    @Override
    public IOStatistics getIOStatistics() {
      return ioStatistics;
    }

    AsynchronousFileChannel getAsyncChannel() throws IOException {
      if (asyncChannel == null) {
        synchronized (this) {
          asyncChannel = AsynchronousFileChannel.open(name.toPath(),
                  StandardOpenOption.READ);
        }
      }
      return asyncChannel;
    }

    @Override
    public void readVectored(List<? extends FileRange> ranges,
                             IntFunction<ByteBuffer> allocate) throws IOException {

      List<? extends FileRange> sortedRanges = Arrays.asList(sortRanges(ranges));
      // Set up all of the futures, so that we can use them if things fail
      for(FileRange range: sortedRanges) {
        VectoredReadUtils.validateRangeRequest(range);
        range.setData(new CompletableFuture<>());
      }
      try {
        AsynchronousFileChannel channel = getAsyncChannel();
        ByteBuffer[] buffers = new ByteBuffer[sortedRanges.size()];
        AsyncHandler asyncHandler = new AsyncHandler(channel, sortedRanges, buffers);
        for(int i = 0; i < sortedRanges.size(); ++i) {
          FileRange range = sortedRanges.get(i);
          buffers[i] = allocate.apply(range.getLength());
          channel.read(buffers[i], range.getOffset(), i, asyncHandler);
        }
      } catch (IOException ioe) {
        LOG.debug("Exception occurred during vectored read ", ioe);
        for(FileRange range: sortedRanges) {
          range.getData().completeExceptionally(ioe);
        }
      }
    }
  }

  /**
   * A CompletionHandler that implements readFully and translates back
   * into the form of CompletionHandler that our users expect.
   */
  static class AsyncHandler implements CompletionHandler<Integer, Integer> {
    private final AsynchronousFileChannel channel;
    private final List<? extends FileRange> ranges;
    private final ByteBuffer[] buffers;

    AsyncHandler(AsynchronousFileChannel channel,
                 List<? extends FileRange> ranges,
                 ByteBuffer[] buffers) {
      this.channel = channel;
      this.ranges = ranges;
      this.buffers = buffers;
    }

    @Override
    public void completed(Integer result, Integer r) {
      FileRange range = ranges.get(r);
      ByteBuffer buffer = buffers[r];
      if (result == -1) {
        failed(new EOFException("Read past End of File"), r);
      } else {
        if (buffer.remaining() > 0) {
          // issue a read for the rest of the buffer
          // QQ: What if this fails? It has the same handler.
          channel.read(buffer, range.getOffset() + buffer.position(), r, this);
        } else {
          // QQ: Why  is this required? I think because we don't want the
          // user to read data beyond limit.
          buffer.flip();
          range.getData().complete(buffer);
        }
      }
    }

    @Override
    public void failed(Throwable exc, Integer r) {
      LOG.debug("Failed while reading range {} ", r, exc);
      ranges.get(r).getData().completeExceptionally(exc);
    }
  }

  @Override
  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    getFileStatus(f);
    return new FSDataInputStream(new BufferedFSInputStream(
        new LocalFSFileInputStream(f), bufferSize));
  }

  @Override
  public FSDataInputStream open(PathHandle fd, int bufferSize)
      throws IOException {
    if (!(fd instanceof LocalFileSystemPathHandle)) {
      fd = new LocalFileSystemPathHandle(fd.bytes());
    }
    LocalFileSystemPathHandle id = (LocalFileSystemPathHandle) fd;
    id.verify(getFileStatus(new Path(id.getPath())));
    return new FSDataInputStream(new BufferedFSInputStream(
        new LocalFSFileInputStream(new Path(id.getPath())), bufferSize));
  }

  /*********************************************************
   * For create()'s FSOutputStream.
   *********************************************************/
  final class LocalFSFileOutputStream extends OutputStream implements
      IOStatisticsSource, StreamCapabilities, Syncable {
    private FileOutputStream fos;

    /**
     * Minimal set of counters.
     */
    private final IOStatisticsStore ioStatistics = iostatisticsStore()
        .withCounters(
            STREAM_WRITE_BYTES,
            STREAM_WRITE_EXCEPTIONS)
        .build();

    /**
     * Thread level IOStatistics aggregator to update in close().
     */
    private final IOStatisticsAggregator
        ioStatisticsAggregator;

    private LocalFSFileOutputStream(Path f, boolean append,
        FsPermission permission) throws IOException {
      File file = pathToFile(f);
      // store the aggregator before attempting any IO.
      ioStatisticsAggregator =
          IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator();

      if (!append && permission == null) {
        permission = FsPermission.getFileDefault();
      }
      if (permission == null) {
        this.fos = new FileOutputStream(file, append);
      } else {
        permission = permission.applyUMask(FsPermission.getUMask(getConf()));
        if (Shell.WINDOWS && NativeIO.isAvailable()) {
          this.fos = NativeIO.Windows.createFileOutputStreamWithMode(file,
              append, permission.toShort());
        } else {
          this.fos = new FileOutputStream(file, append);
          boolean success = false;
          try {
            setPermission(f, permission);
            success = true;
          } finally {
            if (!success) {
              IOUtils.cleanupWithLogger(LOG, this.fos);
            }
          }
        }
      }
    }

    /*
     * Close the fos; update the IOStatisticsContext.
     */
    @Override
    public void close() throws IOException {
      try {
        fos.close();
      } finally {
        ioStatisticsAggregator.aggregate(ioStatistics);
      }
    }

    @Override
    public void flush() throws IOException { fos.flush(); }
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      try {
        fos.write(b, off, len);
        ioStatistics.incrementCounter(STREAM_WRITE_BYTES, len);
      } catch (IOException e) {                // unexpected exception
        ioStatistics.incrementCounter(STREAM_WRITE_EXCEPTIONS);
        throw new FSError(e);                  // assume native fs error
      }
    }
    
    @Override
    public void write(int b) throws IOException {
      try {
        fos.write(b);
        ioStatistics.incrementCounter(STREAM_WRITE_BYTES);
      } catch (IOException e) {              // unexpected exception
        ioStatistics.incrementCounter(STREAM_WRITE_EXCEPTIONS);
        throw new FSError(e);                // assume native fs error
      }
    }

    @Override
    public void hflush() throws IOException {
      flush();
    }

    /**
     * HSync calls sync on fhe file descriptor after a local flush() call.
     * @throws IOException failure
     */
    @Override
    public void hsync() throws IOException {
      flush();
      fos.getFD().sync();
    }

    @Override
    public boolean hasCapability(String capability) {
      // a bit inefficient, but intended to make it easier to add
      // new capabilities.
      switch (capability.toLowerCase(Locale.ENGLISH)) {
      case StreamCapabilities.IOSTATISTICS:
      case StreamCapabilities.IOSTATISTICS_CONTEXT:
        return true;
      default:
        return StoreImplementationUtils.isProbeForSyncable(capability);
      }
    }

    @Override
    public IOStatistics getIOStatistics() {
      return ioStatistics;
    }
  }

  @Override
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    FileStatus status = getFileStatus(f);
    if (status.isDirectory()) {
      throw new IOException("Cannot append to a diretory (=" + f + " )");
    }
    return new FSDataOutputStream(new BufferedOutputStream(
        createOutputStreamWithMode(f, true, null), bufferSize), statistics,
        status.getLen());
  }

  @Override
  public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
    short replication, long blockSize, Progressable progress)
    throws IOException {
    return create(f, overwrite, true, bufferSize, replication, blockSize,
        progress, null);
  }

  private FSDataOutputStream create(Path f, boolean overwrite,
      boolean createParent, int bufferSize, short replication, long blockSize,
      Progressable progress, FsPermission permission) throws IOException {
    if (exists(f) && !overwrite) {
      throw new FileAlreadyExistsException("File already exists: " + f);
    }
    Path parent = f.getParent();
    if (parent != null && !mkdirs(parent)) {
      throw new IOException("Mkdirs failed to create " + parent.toString());
    }
    return new FSDataOutputStream(new BufferedIOStatisticsOutputStream(
        createOutputStreamWithMode(f, false, permission), bufferSize, true),
        statistics);
  }
  
  protected OutputStream createOutputStream(Path f, boolean append) 
      throws IOException {
    return createOutputStreamWithMode(f, append, null);
  }

  protected OutputStream createOutputStreamWithMode(Path f, boolean append,
      FsPermission permission) throws IOException {
    return new LocalFSFileOutputStream(f, append, permission);
  }
  
  @Override
  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
      EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    if (exists(f) && !flags.contains(CreateFlag.OVERWRITE)) {
      throw new FileAlreadyExistsException("File already exists: " + f);
    }
    return new FSDataOutputStream(new BufferedIOStatisticsOutputStream(
        createOutputStreamWithMode(f, false, permission), bufferSize, true),
            statistics);
  }

  @Override
  public FSDataOutputStream create(Path f, FsPermission permission,
    boolean overwrite, int bufferSize, short replication, long blockSize,
    Progressable progress) throws IOException {

    FSDataOutputStream out = create(f, overwrite, true, bufferSize, replication,
        blockSize, progress, permission);
    return out;
  }

  @Override
  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
      boolean overwrite,
      int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    FSDataOutputStream out = create(f, overwrite, false, bufferSize, replication,
        blockSize, progress, permission);
    return out;
  }

  @Override
  public void concat(final Path trg, final Path [] psrcs) throws IOException {
    final int bufferSize = 4096;
    try(FSDataOutputStream out = create(trg)) {
      for (Path src : psrcs) {
        try(FSDataInputStream in = open(src)) {
          IOUtils.copyBytes(in, out, bufferSize, false);
        }
      }
    }
  }

  @Override
  public boolean rename(Path src, Path dst) throws IOException {
    // Attempt rename using Java API.
    File srcFile = pathToFile(src);
    File dstFile = pathToFile(dst);
    if (srcFile.renameTo(dstFile)) {
      return true;
    }

    // Else try POSIX style rename on Windows only
    if (Shell.WINDOWS &&
        handleEmptyDstDirectoryOnWindows(src, srcFile, dst, dstFile)) {
      return true;
    }

    // The fallback behavior accomplishes the rename by a full copy.
    if (LOG.isDebugEnabled()) {
      LOG.debug("Falling through to a copy of " + src + " to " + dst);
    }
    return FileUtil.copy(this, src, this, dst, true, getConf());
  }

  @VisibleForTesting
  public final boolean handleEmptyDstDirectoryOnWindows(Path src, File srcFile,
      Path dst, File dstFile) throws IOException {

    // Enforce POSIX rename behavior that a source directory replaces an
    // existing destination if the destination is an empty directory. On most
    // platforms, this is already handled by the Java API call above. Some
    // platforms (notably Windows) do not provide this behavior, so the Java API
    // call renameTo(dstFile) fails. Delete destination and attempt rename
    // again.
    try {
      FileStatus sdst = this.getFileStatus(dst);
      String[] dstFileList = dstFile.list();
      if (dstFileList != null) {
        if (sdst.isDirectory() && dstFileList.length == 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Deleting empty destination and renaming " + src +
                " to " + dst);
          }
          if (this.delete(dst, false) && srcFile.renameTo(dstFile)) {
            return true;
          }
        }
      }
    } catch (FileNotFoundException ignored) {
    }
    return false;
  }

  @Override
  public boolean truncate(Path f, final long newLength) throws IOException {
    FileStatus status = getFileStatus(f);
    if(status == null) {
      throw new FileNotFoundException("File " + f + " not found");
    }
    if(status.isDirectory()) {
      throw new IOException("Cannot truncate a directory (=" + f + ")");
    }
    long oldLength = status.getLen();
    if(newLength > oldLength) {
      throw new IllegalArgumentException(
          "Cannot truncate to a larger file size. Current size: " + oldLength +
          ", truncate size: " + newLength + ".");
    }
    try (FileOutputStream out = new FileOutputStream(pathToFile(f), true)) {
      try {
        out.getChannel().truncate(newLength);
      } catch(IOException e) {
        throw new FSError(e);
      }
    }
    return true;
  }
  
  /**
   * Delete the given path to a file or directory.
   * @param p the path to delete
   * @param recursive to delete sub-directories
   * @return true if the file or directory and all its contents were deleted
   * @throws IOException if p is non-empty and recursive is false 
   */
  @Override
  public boolean delete(Path p, boolean recursive) throws IOException {
    File f = pathToFile(p);
    if (!f.exists()) {
      //no path, return false "nothing to delete"
      return false;
    }
    if (f.isFile()) {
      return f.delete();
    } else if (!recursive && f.isDirectory() && 
        (FileUtil.listFiles(f).length != 0)) {
      throw new IOException("Directory " + f.toString() + " is not empty");
    }
    return FileUtil.fullyDelete(f);
  }
 
  /**
   * {@inheritDoc}
   *
   * (<b>Note</b>: Returned list is not sorted in any given order,
   * due to reliance on Java's {@link File#list()} API.)
   */
  @Override
  public FileStatus[] listStatus(Path f) throws IOException {
    File localf = pathToFile(f);
    FileStatus[] results;

    if (!localf.exists()) {
      throw new FileNotFoundException("File " + f + " does not exist");
    }

    if (localf.isDirectory()) {
      String[] names = FileUtil.list(localf);
      results = new FileStatus[names.length];
      int j = 0;
      for (int i = 0; i < names.length; i++) {
        try {
          // Assemble the path using the Path 3 arg constructor to make sure
          // paths with colon are properly resolved on Linux
          results[j] = getFileStatus(new Path(f, new Path(null, null,
                                                          names[i])));
          j++;
        } catch (FileNotFoundException e) {
          // ignore the files not found since the dir list may have have
          // changed since the names[] list was generated.
        }
      }
      if (j == names.length) {
        return results;
      }
      return Arrays.copyOf(results, j);
    }

    if (!useDeprecatedFileStatus) {
      return new FileStatus[] { getFileStatus(f) };
    }
    return new FileStatus[] {
        new DeprecatedRawLocalFileStatus(localf,
        defaultBlockSize, this) };
  }

  @Override
  public boolean exists(Path f) throws IOException {
    return pathToFile(f).exists();
  }
  
  protected boolean mkOneDir(File p2f) throws IOException {
    return mkOneDirWithMode(new Path(p2f.getAbsolutePath()), p2f, null);
  }

  protected boolean mkOneDirWithMode(Path p, File p2f, FsPermission permission)
      throws IOException {
    if (permission == null) {
      permission = FsPermission.getDirDefault();
    }
    permission = permission.applyUMask(FsPermission.getUMask(getConf()));
    if (Shell.WINDOWS && NativeIO.isAvailable()) {
      try {
        NativeIO.Windows.createDirectoryWithMode(p2f, permission.toShort());
        return true;
      } catch (IOException e) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(String.format(
              "NativeIO.createDirectoryWithMode error, path = %s, mode = %o",
              p2f, permission.toShort()), e);
        }
        return false;
      }
    } else {
      boolean b = p2f.mkdir();
      if (b) {
        setPermission(p, permission);
      }
      return b;
    }
  }

  /**
   * Creates the specified directory hierarchy. Does not
   * treat existence as an error.
   */
  @Override
  public boolean mkdirs(Path f) throws IOException {
    return mkdirsWithOptionalPermission(f, null);
  }

  @Override
  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
    return mkdirsWithOptionalPermission(f, permission);
  }

  private boolean mkdirsWithOptionalPermission(Path f, FsPermission permission)
      throws IOException {
    if(f == null) {
      throw new IllegalArgumentException("mkdirs path arg is null");
    }
    Path parent = f.getParent();
    File p2f = pathToFile(f);
    File parent2f = null;
    if(parent != null) {
      parent2f = pathToFile(parent);
      if(parent2f != null && parent2f.exists() && !parent2f.isDirectory()) {
        throw new ParentNotDirectoryException("Parent path is not a directory: "
            + parent);
      }
    }
    if (p2f.exists() && !p2f.isDirectory()) {
      throw new FileAlreadyExistsException("Destination exists" +
              " and is not a directory: " + p2f.getCanonicalPath());
    }
    return (parent == null || parent2f.exists() || mkdirs(parent)) &&
      (mkOneDirWithMode(f, p2f, permission) || p2f.isDirectory());
  }
  
  
  @Override
  public Path getHomeDirectory() {
    return this.makeQualified(new Path(System.getProperty("user.home")));
  }

  /**
   * Set the working directory to the given directory.
   */
  @Override
  public void setWorkingDirectory(Path newDir) {
    workingDir = makeAbsolute(newDir);
    checkPath(workingDir);
  }
  
  @Override
  public Path getWorkingDirectory() {
    return workingDir;
  }
  
  @Override
  protected Path getInitialWorkingDirectory() {
    return this.makeQualified(new Path(System.getProperty("user.dir")));
  }

  @Override
  public FsStatus getStatus(Path p) throws IOException {
    File partition = pathToFile(p == null ? new Path("/") : p);
    //File provides getUsableSpace() and getFreeSpace()
    //File provides no API to obtain used space, assume used = total - free
    return new FsStatus(partition.getTotalSpace(), 
      partition.getTotalSpace() - partition.getFreeSpace(),
      partition.getFreeSpace());
  }
  
  // In the case of the local filesystem, we can just rename the file.
  @Override
  public void moveFromLocalFile(Path src, Path dst) throws IOException {
    rename(src, dst);
  }
  
  // We can write output directly to the final location
  @Override
  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
    throws IOException {
    return fsOutputFile;
  }
  
  // It's in the right place - nothing to do.
  @Override
  public void completeLocalOutput(Path fsWorkingFile, Path tmpLocalFile)
    throws IOException {
  }
  
  @Override
  public void close() throws IOException {
    super.close();
  }
  
  @Override
  public String toString() {
    return "LocalFS";
  }
  
  @Override
  public FileStatus getFileStatus(Path f) throws IOException {
    return getFileLinkStatusInternal(f, true);
  }

  @Deprecated
  private FileStatus deprecatedGetFileStatus(Path f) throws IOException {
    File path = pathToFile(f);
    if (path.exists()) {
      return new DeprecatedRawLocalFileStatus(pathToFile(f),
          defaultBlockSize, this);
    } else {
      throw new FileNotFoundException("File " + f + " does not exist");
    }
  }

  @Deprecated
  static class DeprecatedRawLocalFileStatus extends FileStatus {
    /* We can add extra fields here. It breaks at least CopyFiles.FilePair().
     * We recognize if the information is already loaded by check if
     * onwer.equals("").
     */
    private boolean isPermissionLoaded() {
      return !super.getOwner().isEmpty(); 
    }

    private static long getLastAccessTime(File f) throws IOException {
      long accessTime;
      try {
        accessTime = Files.readAttributes(f.toPath(),
            BasicFileAttributes.class).lastAccessTime().toMillis();
      } catch (NoSuchFileException e) {
        throw new FileNotFoundException("File " + f + " does not exist");
      }
      return accessTime;
    }

    DeprecatedRawLocalFileStatus(File f, long defaultBlockSize, FileSystem fs)
      throws IOException {
      super(f.length(), f.isDirectory(), 1, defaultBlockSize,
          f.lastModified(), getLastAccessTime(f),
          null, null, null,
          new Path(f.getPath()).makeQualified(fs.getUri(),
            fs.getWorkingDirectory()));
    }
    
    @Override
    public FsPermission getPermission() {
      if (!isPermissionLoaded()) {
        loadPermissionInfo();
      }
      return super.getPermission();
    }

    @Override
    public String getOwner() {
      if (!isPermissionLoaded()) {
        loadPermissionInfo();
      }
      return super.getOwner();
    }

    @Override
    public String getGroup() {
      if (!isPermissionLoaded()) {
        loadPermissionInfo();
      }
      return super.getGroup();
    }

    /**
     * Load file permission information (UNIX symbol rwxrwxrwx, sticky bit info).
     *
     * To improve peformance, give priority to native stat() call. First try get
     * permission information by using native JNI call then fall back to use non
     * native (ProcessBuilder) call in case native lib is not loaded or native
     * call is not successful
     */
    private synchronized void loadPermissionInfo() {
      if (!isPermissionLoaded() && NativeIO.isAvailable()) {
        try {
          loadPermissionInfoByNativeIO();
        } catch (IOException ex) {
          LOG.debug("Native call failed", ex);
        }
      }

      if (!isPermissionLoaded()) {
        loadPermissionInfoByNonNativeIO();
      }
    }

    /// loads permissions, owner, and group from `ls -ld`
    @VisibleForTesting
    void loadPermissionInfoByNonNativeIO() {
      IOException e = null;
      try {
        String output = FileUtil.execCommand(new File(getPath().toUri()),
            Shell.getGetPermissionCommand());
        StringTokenizer t =
            new StringTokenizer(output, Shell.TOKEN_SEPARATOR_REGEX);
        //expected format
        //-rw-------    1 username groupname ...
        String permission = t.nextToken();
        if (permission.length() > FsPermission.MAX_PERMISSION_LENGTH) {
          //files with ACLs might have a '+'
          permission = permission.substring(0,
            FsPermission.MAX_PERMISSION_LENGTH);
        }
        setPermission(FsPermission.valueOf(permission));
        t.nextToken();

        String owner = t.nextToken();
        String group = t.nextToken();
        // If on windows domain, token format is DOMAIN\\user and we want to
        // extract only the user name
        // same as to the group name
        if (Shell.WINDOWS) {
          owner = removeDomain(owner);
          group = removeDomain(group);
        }
        setOwner(owner);
        setGroup(group);
      } catch (Shell.ExitCodeException ioe) {
        if (ioe.getExitCode() != 1) {
          e = ioe;
        } else {
          setPermission(null);
          setOwner(null);
          setGroup(null);
        }
      } catch (IOException ioe) {
        e = ioe;
      } finally {
        if (e != null) {
          throw new RuntimeException("Error while running command to get " +
                                     "file permissions : " + 
                                     StringUtils.stringifyException(e));
        }
      }
    }

    // In Windows, domain name is added.
    // For example, given machine name (domain name) dname, user name i, then
    // the result for user is dname\\i and for group is dname\\None. So we need
    // remove domain name as follows:
    // DOMAIN\\user => user, DOMAIN\\group => group
    private String removeDomain(String str) {
      int index = str.indexOf("\\");
      if (index != -1) {
        str = str.substring(index + 1);
      }
      return str;
    }

    // loads permissions, owner, and group from `ls -ld`
    // but use JNI to more efficiently get file mode (permission, owner, group)
    // by calling file stat() in *nix or some similar calls in Windows
    @VisibleForTesting
    void loadPermissionInfoByNativeIO() throws IOException {
      Path path = getPath();
      String pathName = path.toUri().getPath();
      // remove leading slash for Windows path
      if (Shell.WINDOWS && pathName.startsWith("/")) {
        pathName = pathName.substring(1);
      }
      try {
        NativeIO.POSIX.Stat stat = NativeIO.POSIX.getStat(pathName);
        String owner = stat.getOwner();
        String group = stat.getGroup();
        int mode = stat.getMode();
        setOwner(owner);
        setGroup(group);
        setPermission(new FsPermission(mode));
      } catch (IOException e) {
        setOwner(null);
        setGroup(null);
        setPermission(null);
        throw e;
      }
    }

    @Override
    public void write(DataOutput out) throws IOException {
      if (!isPermissionLoaded()) {
        loadPermissionInfo();
      }
      super.write(out);
    }
  }

  /**
   * Use the command chown to set owner.
   */
  @Override
  public void setOwner(Path p, String username, String groupname)
    throws IOException {
    FileUtil.setOwner(pathToFile(p), username, groupname);
  }

  /**
   * Use the command chmod to set permission.
   */
  @Override
  public void setPermission(Path p, FsPermission permission)
    throws IOException {
    if (NativeIO.isAvailable()) {
      NativeIO.POSIX.chmod(pathToFile(p).getCanonicalPath(),
                     permission.toShort());
    } else {
      String perm = String.format("%04o", permission.toShort());
      Shell.execCommand(Shell.getSetPermissionCommand(perm, false,
        FileUtil.makeShellPath(pathToFile(p), true)));
    }
  }
 
  /**
   * Sets the {@link Path}'s last modified time and last access time to
   * the given valid times.
   *
   * @param mtime the modification time to set (only if no less than zero).
   * @param atime the access time to set (only if no less than zero).
   * @throws IOException if setting the times fails.
   */
  @Override
  public void setTimes(Path p, long mtime, long atime) throws IOException {
    try {
      BasicFileAttributeView view = Files.getFileAttributeView(
          pathToFile(p).toPath(), BasicFileAttributeView.class);
      FileTime fmtime = (mtime >= 0) ? FileTime.fromMillis(mtime) : null;
      FileTime fatime = (atime >= 0) ? FileTime.fromMillis(atime) : null;
      view.setTimes(fmtime, fatime, null);
    } catch (NoSuchFileException e) {
      throw new FileNotFoundException("File " + p + " does not exist");
    }
  }

  /**
   * Hook to implement support for {@link PathHandle} operations.
   * @param stat Referent in the target FileSystem
   * @param opts Constraints that determine the validity of the
   *            {@link PathHandle} reference.
   */
  protected PathHandle createPathHandle(FileStatus stat,
      Options.HandleOpt... opts) {
    if (stat.isDirectory() || stat.isSymlink()) {
      throw new IllegalArgumentException("PathHandle only available for files");
    }
    String authority = stat.getPath().toUri().getAuthority();
    if (authority != null && !authority.equals("file://")) {
      throw new IllegalArgumentException("Wrong FileSystem: " + stat.getPath());
    }
    Options.HandleOpt.Data data =
        Options.HandleOpt.getOpt(Options.HandleOpt.Data.class, opts)
            .orElse(Options.HandleOpt.changed(false));
    Options.HandleOpt.Location loc =
        Options.HandleOpt.getOpt(Options.HandleOpt.Location.class, opts)
            .orElse(Options.HandleOpt.moved(false));
    if (loc.allowChange()) {
      throw new UnsupportedOperationException("Tracking file movement in " +
          "basic FileSystem is not supported");
    }
    final Path p = stat.getPath();
    final Optional<Long> mtime = !data.allowChange()
        ? Optional.of(stat.getModificationTime())
        : Optional.empty();
    return new LocalFileSystemPathHandle(p.toString(), mtime);
  }

  @Override
  public boolean supportsSymlinks() {
    return true;
  }

  @SuppressWarnings("deprecation")
  @Override
  public void createSymlink(Path target, Path link, boolean createParent)
      throws IOException {
    if (!FileSystem.areSymlinksEnabled()) {
      throw new UnsupportedOperationException("Symlinks not supported");
    }
    final String targetScheme = target.toUri().getScheme();
    if (targetScheme != null && !"file".equals(targetScheme)) {
      throw new IOException("Unable to create symlink to non-local file "+
                            "system: "+target.toString());
    }
    if (createParent) {
      mkdirs(link.getParent());
    }

    // NB: Use createSymbolicLink in java.nio.file.Path once available
    int result = FileUtil.symLink(target.toString(),
        makeAbsolute(link).toString());
    if (result != 0) {
      throw new IOException("Error " + result + " creating symlink " +
          link + " to " + target);
    }
  }

  /**
   * Return a FileStatus representing the given path. If the path refers
   * to a symlink return a FileStatus representing the link rather than
   * the object the link refers to.
   */
  @Override
  public FileStatus getFileLinkStatus(final Path f) throws IOException {
    FileStatus fi = getFileLinkStatusInternal(f, false);
    // getFileLinkStatus is supposed to return a symlink with a
    // qualified path
    if (fi.isSymlink()) {
      Path targetQual = FSLinkResolver.qualifySymlinkTarget(this.getUri(),
          fi.getPath(), fi.getSymlink());
      fi.setSymlink(targetQual);
    }
    return fi;
  }

  /**
   * Public {@link FileStatus} methods delegate to this function, which in turn
   * either call the new {@link Stat} based implementation or the deprecated
   * methods based on platform support.
   * 
   * @param f Path to stat
   * @param dereference whether to dereference the final path component if a
   *          symlink
   * @return FileStatus of f
   * @throws IOException
   */
  private FileStatus getFileLinkStatusInternal(final Path f,
      boolean dereference) throws IOException {
    if (!useDeprecatedFileStatus) {
      return getNativeFileLinkStatus(f, dereference);
    } else if (dereference) {
      return deprecatedGetFileStatus(f);
    } else {
      return deprecatedGetFileLinkStatusInternal(f);
    }
  }

  /**
   * Deprecated. Remains for legacy support. Should be removed when {@link Stat}
   * gains support for Windows and other operating systems.
   */
  @Deprecated
  private FileStatus deprecatedGetFileLinkStatusInternal(final Path f)
      throws IOException {
    String target = FileUtil.readLink(new File(f.toString()));

    try {
      FileStatus fs = getFileStatus(f);
      // If f refers to a regular file or directory
      if (target.isEmpty()) {
        return fs;
      }
      // Otherwise f refers to a symlink
      return new FileStatus(fs.getLen(),
          false,
          fs.getReplication(),
          fs.getBlockSize(),
          fs.getModificationTime(),
          fs.getAccessTime(),
          fs.getPermission(),
          fs.getOwner(),
          fs.getGroup(),
          new Path(target),
          f);
    } catch (FileNotFoundException e) {
      /* The exists method in the File class returns false for dangling
       * links so we can get a FileNotFoundException for links that exist.
       * It's also possible that we raced with a delete of the link. Use
       * the readBasicFileAttributes method in java.nio.file.attributes
       * when available.
       */
      if (!target.isEmpty()) {
        return new FileStatus(0, false, 0, 0, 0, 0, FsPermission.getDefault(),
            "", "", new Path(target), f);
      }
      // f refers to a file or directory that does not exist
      throw e;
    }
  }
  /**
   * Calls out to platform's native stat(1) implementation to get file metadata
   * (permissions, user, group, atime, mtime, etc). This works around the lack
   * of lstat(2) in Java 6.
   * 
   *  Currently, the {@link Stat} class used to do this only supports Linux
   *  and FreeBSD, so the old {@link #deprecatedGetFileLinkStatusInternal(Path)}
   *  implementation (deprecated) remains further OS support is added.
   *
   * @param f File to stat
   * @param dereference whether to dereference symlinks
   * @return FileStatus of f
   * @throws IOException
   */
  private FileStatus getNativeFileLinkStatus(final Path f,
      boolean dereference) throws IOException {
    checkPath(f);
    Stat stat = new Stat(f, defaultBlockSize, dereference, this);
    FileStatus status = stat.getFileStatus();
    return status;
  }

  @Override
  public Path getLinkTarget(Path f) throws IOException {
    FileStatus fi = getFileLinkStatusInternal(f, false);
    // return an unqualified symlink target
    return fi.getSymlink();
  }

  @Override
  public boolean hasPathCapability(final Path path, final String capability)
      throws IOException {
    switch (validatePathCapabilityArgs(makeQualified(path), capability)) {
    case CommonPathCapabilities.FS_APPEND:
    case CommonPathCapabilities.FS_CONCAT:
    case CommonPathCapabilities.FS_PATHHANDLES:
    case CommonPathCapabilities.FS_PERMISSIONS:
    case CommonPathCapabilities.FS_TRUNCATE:
      return true;
    case CommonPathCapabilities.FS_SYMLINKS:
      return FileSystem.areSymlinksEnabled();
    default:
      return super.hasPathCapability(path, capability);
    }
  }

  @VisibleForTesting
  static void setUseDeprecatedFileStatus(boolean useDeprecatedFileStatus) {
    RawLocalFileSystem.useDeprecatedFileStatus = useDeprecatedFileStatus;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop Abortable 源码

hadoop AbstractFileSystem 源码

hadoop AvroFSInput 源码

hadoop BBPartHandle 源码

hadoop BBUploadHandle 源码

hadoop BatchListingOperations 源码

hadoop BatchedRemoteIterator 源码

hadoop BlockLocation 源码

hadoop BlockStoragePolicySpi 源码

hadoop BufferedFSInputStream 源码

0  赞