hadoop AliyunOSSFileSystem 源码

  • 2022-10-20
haddop AliyunOSSFileSystem 代码


 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *     http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * See the License for the specific language governing permissions and
 * limitations under the License.

package org.apache.hadoop.fs.aliyun.oss;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.Progressable;

import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.ObjectMetadata;

import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils.intOption;
import static org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils.longOption;
import static org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils.objectRepresentsDirectory;
import static org.apache.hadoop.fs.aliyun.oss.Constants.*;

 * Implementation of {@link FileSystem} for <a href="https://oss.aliyun.com">
 * Aliyun OSS</a>, used to access OSS blob system in a filesystem style.
public class AliyunOSSFileSystem extends FileSystem {
  private static final Logger LOG =
  private URI uri;
  private String bucket;
  private String username;
  private Path workingDir;
  private int blockOutputActiveBlocks;
  private AliyunOSSFileSystemStore store;
  private int maxKeys;
  private int maxReadAheadPartNumber;
  private int maxConcurrentCopyTasksPerDir;
  private ExecutorService boundedThreadPool;
  private ExecutorService boundedCopyThreadPool;

  private static final PathFilter DEFAULT_FILTER = new PathFilter() {
    public boolean accept(Path file) {
      return true;

  public FSDataOutputStream append(Path path, int bufferSize,
      Progressable progress) throws IOException {
    throw new IOException("Append is not supported!");

  public void close() throws IOException {
    try {
    } finally {

  public FSDataOutputStream create(Path path, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    String key = pathToKey(path);
    FileStatus status = null;

    try {
      // get the status or throw a FNFE
      status = getFileStatus(path);

      // if the thread reaches here, there is something at the path
      if (status.isDirectory()) {
        // path references a directory
        throw new FileAlreadyExistsException(path + " is a directory");
      if (!overwrite) {
        // path references a file and overwrite is disabled
        throw new FileAlreadyExistsException(path + " already exists");
      LOG.debug("Overwriting file {}", path);
    } catch (FileNotFoundException e) {
      // this means the file is not found

    long uploadPartSize = AliyunOSSUtils.getMultipartSizeProperty(getConf(),
    return new FSDataOutputStream(
        new AliyunOSSBlockOutputStream(getConf(),
            new SemaphoredDelegatingExecutor(boundedThreadPool,
                blockOutputActiveBlocks, true)), statistics);

   * {@inheritDoc}
   * @throws FileNotFoundException if the parent directory is not present -or
   * is not a directory.
  public FSDataOutputStream createNonRecursive(Path path,
      FsPermission permission,
      EnumSet<CreateFlag> flags,
      int bufferSize,
      short replication,
      long blockSize,
      Progressable progress) throws IOException {
    Path parent = path.getParent();
    if (parent != null) {
      // expect this to raise an exception if there is no parent
      if (!getFileStatus(parent).isDirectory()) {
        throw new FileAlreadyExistsException("Not a directory: " + parent);
    return create(path, permission, flags.contains(CreateFlag.OVERWRITE),
        bufferSize, replication, blockSize, progress);

  public boolean delete(Path path, boolean recursive) throws IOException {
    try {
      return innerDelete(getFileStatus(path), recursive);
    } catch (FileNotFoundException e) {
      LOG.debug("Couldn't delete {} - does not exist", path);
      return false;

   * Delete an object. See {@link #delete(Path, boolean)}.
   * @param status fileStatus object
   * @param recursive if path is a directory and set to
   * true, the directory is deleted else throws an exception. In
   * case of a file the recursive can be set to either true or false.
   * @return  true if delete is successful else false.
   * @throws IOException due to inability to delete a directory or file.
  private boolean innerDelete(FileStatus status, boolean recursive)
      throws IOException {
    Path f = status.getPath();
    String p = f.toUri().getPath();
    FileStatus[] statuses;
    // indicating root directory "/".
    if (p.equals("/")) {
      statuses = listStatus(status.getPath());
      boolean isEmptyDir = statuses.length <= 0;
      return rejectRootDirectoryDelete(isEmptyDir, recursive);

    String key = pathToKey(f);
    if (status.isDirectory()) {
      if (!recursive) {
        // Check whether it is an empty directory or not
        statuses = listStatus(status.getPath());
        if (statuses.length > 0) {
          throw new IOException("Cannot remove directory " + f +
              ": It is not empty!");
        } else {
          // Delete empty directory without '-r'
          key = AliyunOSSUtils.maybeAddTrailingSlash(key);
      } else {
    } else {

    return true;

   * Implements the specific logic to reject root directory deletion.
   * The caller must return the result of this call, rather than
   * attempt to continue with the delete operation: deleting root
   * directories is never allowed. This method simply implements
   * the policy of when to return an exit code versus raise an exception.
   * @param isEmptyDir empty directory or not
   * @param recursive recursive flag from command
   * @return a return code for the operation
   * @throws PathIOException if the operation was explicitly rejected.
  private boolean rejectRootDirectoryDelete(boolean isEmptyDir,
      boolean recursive) throws IOException {
    LOG.info("oss delete the {} root directory of {}", bucket, recursive);
    if (isEmptyDir) {
      return true;
    if (recursive) {
      return false;
    } else {
      // reject
      throw new PathIOException(bucket, "Cannot delete root path");

  private void createFakeDirectoryIfNecessary(Path f) throws IOException {
    String key = pathToKey(f);
    if (StringUtils.isNotEmpty(key) && !exists(f)) {
      LOG.debug("Creating new fake directory at {}", f);

  public FileStatus getFileStatus(Path path) throws IOException {
    Path qualifiedPath = path.makeQualified(uri, workingDir);
    String key = pathToKey(qualifiedPath);

    // Root always exists
    if (key.length() == 0) {
      return new OSSFileStatus(0, true, 1, 0, 0, qualifiedPath, username);

    ObjectMetadata meta = store.getObjectMetadata(key);
    // If key not found and key does not end with "/"
    if (meta == null && !key.endsWith("/")) {
      // In case of 'dir + "/"'
      key += "/";
      meta = store.getObjectMetadata(key);
    if (meta == null) {
      OSSListRequest listRequest = store.createListObjectsRequest(key,
          maxKeys, null, null, false);
      OSSListResult listing = store.listObjects(listRequest);
      do {
        if (CollectionUtils.isNotEmpty(listing.getObjectSummaries()) ||
            CollectionUtils.isNotEmpty(listing.getCommonPrefixes())) {
          return new OSSFileStatus(0, true, 1, 0, 0, qualifiedPath, username);
        } else if (listing.isTruncated()) {
          listing = store.continueListObjects(listRequest, listing);
        } else {
          throw new FileNotFoundException(
              path + ": No such file or directory!");
      } while (true);
    } else if (objectRepresentsDirectory(key, meta.getContentLength())) {
      return new OSSFileStatus(0, true, 1, 0, meta.getLastModified().getTime(),
          qualifiedPath, username);
    } else {
      return new OSSFileStatus(meta.getContentLength(), false, 1,
          getDefaultBlockSize(path), meta.getLastModified().getTime(),
          qualifiedPath, username);

  public String getScheme() {
    return "oss";

  public URI getUri() {
    return uri;

  public int getDefaultPort() {
    return Constants.OSS_DEFAULT_PORT;

  public Path getWorkingDirectory() {
    return workingDir;

  public long getDefaultBlockSize() {

  public String getCanonicalServiceName() {
    // Does not support Token
    return null;

   * Initialize new FileSystem.
   * @param name the uri of the file system, including host, port, etc.
   * @param conf configuration of the file system
   * @throws IOException IO problems
  public void initialize(URI name, Configuration conf) throws IOException {
    super.initialize(name, conf);

    bucket = name.getHost();
    uri = java.net.URI.create(name.getScheme() + "://" + name.getAuthority());
    // Username is the current user at the time the FS was instantiated.
    username = UserGroupInformation.getCurrentUser().getShortUserName();
    workingDir = new Path("/user", username).makeQualified(uri, null);
    long keepAliveTime = longOption(conf,
    blockOutputActiveBlocks = intOption(conf,

    store = new AliyunOSSFileSystemStore();
    store.initialize(name, conf, username, statistics);

    int threadNum = AliyunOSSUtils.intPositiveOption(conf,

    int totalTasks = AliyunOSSUtils.intPositiveOption(conf,

    maxReadAheadPartNumber = AliyunOSSUtils.intPositiveOption(conf,

    this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
        threadNum, totalTasks, keepAliveTime, TimeUnit.SECONDS,

    maxConcurrentCopyTasksPerDir = AliyunOSSUtils.intPositiveOption(conf,

    int maxCopyThreads = AliyunOSSUtils.intPositiveOption(conf,

    int maxCopyTasks = AliyunOSSUtils.intPositiveOption(conf,

    this.boundedCopyThreadPool = BlockingThreadPoolExecutorService.newInstance(
        maxCopyThreads, maxCopyTasks, 60L,
        TimeUnit.SECONDS, "oss-copy-unbounded");


   * Turn a path (relative or otherwise) into an OSS key.
   * @param path the path of the file.
   * @return the key of the object that represents the file.
  private String pathToKey(Path path) {
    if (!path.isAbsolute()) {
      path = new Path(workingDir, path);

    return path.toUri().getPath().substring(1);

  private Path keyToPath(String key) {
    return new Path("/" + key);

  public FileStatus[] listStatus(Path path) throws IOException {
    String key = pathToKey(path);
    if (LOG.isDebugEnabled()) {
      LOG.debug("List status for path: " + path);

    final List<FileStatus> result = new ArrayList<FileStatus>();
    final FileStatus fileStatus = getFileStatus(path);

    if (fileStatus.isDirectory()) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("listStatus: doing listObjects for directory " + key);

      OSSListRequest listRequest = store.createListObjectsRequest(key,
          maxKeys, null, null, false);
      OSSListResult objects = store.listObjects(listRequest);
      while (true) {
        for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
          String objKey = objectSummary.getKey();
          if (objKey.equals(key + "/")) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Ignoring: " + objKey);
          } else {
            Path keyPath = keyToPath(objectSummary.getKey())
                .makeQualified(uri, workingDir);
            if (LOG.isDebugEnabled()) {
              LOG.debug("Adding: fi: " + keyPath);
            result.add(new OSSFileStatus(objectSummary.getSize(), false, 1,
                objectSummary.getLastModified().getTime(), keyPath, username));

        for (String prefix : objects.getCommonPrefixes()) {
          if (prefix.equals(key + "/")) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Ignoring: " + prefix);
          } else {
            Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir);
            if (LOG.isDebugEnabled()) {
              LOG.debug("Adding: rd: " + keyPath);

        if (objects.isTruncated()) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("listStatus: list truncated - getting next batch");
          objects = store.continueListObjects(listRequest, objects);
        } else {
    } else {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Adding: rd (not a dir): " + path);

    return result.toArray(new FileStatus[result.size()]);

  public RemoteIterator<LocatedFileStatus> listFiles(
      final Path f, final boolean recursive) throws IOException {
    Path qualifiedPath = f.makeQualified(uri, workingDir);
    final FileStatus status = getFileStatus(qualifiedPath);
    PathFilter filter = new PathFilter() {
      public boolean accept(Path path) {
        return status.isFile() || !path.equals(f);
    FileStatusAcceptor acceptor =
        new FileStatusAcceptor.AcceptFilesOnly(qualifiedPath);
    return innerList(f, status, filter, acceptor, recursive);

  public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
      throws IOException {
    return listLocatedStatus(f, DEFAULT_FILTER);

  public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
      final PathFilter filter) throws IOException {
    Path qualifiedPath = f.makeQualified(uri, workingDir);
    final FileStatus status = getFileStatus(qualifiedPath);
    FileStatusAcceptor acceptor =
        new FileStatusAcceptor.AcceptAllButSelf(qualifiedPath);
    return innerList(f, status, filter, acceptor, false);

  private RemoteIterator<LocatedFileStatus> innerList(final Path f,
      final FileStatus status,
      final PathFilter filter,
      final FileStatusAcceptor acceptor,
      final boolean recursive) throws IOException {
    Path qualifiedPath = f.makeQualified(uri, workingDir);
    String key = pathToKey(qualifiedPath);

    if (status.isFile()) {
      LOG.debug("{} is a File", qualifiedPath);
      final BlockLocation[] locations = getFileBlockLocations(status,
          0, status.getLen());
      return store.singleStatusRemoteIterator(filter.accept(f) ? status : null,
    } else {
      return store.createLocatedFileStatusIterator(key, maxKeys, this, filter,
          acceptor, recursive);

   * Used to create an empty file that represents an empty directory.
   * @param key directory path
   * @return true if directory is successfully created
   * @throws IOException
  private boolean mkdir(final String key) throws IOException {
    String dirName = key;
    if (StringUtils.isNotEmpty(key)) {
      if (!key.endsWith("/")) {
        dirName += "/";
    return true;

  public boolean mkdirs(Path path, FsPermission permission)
      throws IOException {
    try {
      FileStatus fileStatus = getFileStatus(path);

      if (fileStatus.isDirectory()) {
        return true;
      } else {
        throw new FileAlreadyExistsException("Path is a file: " + path);
    } catch (FileNotFoundException e) {
      String key = pathToKey(path);
      return mkdir(key);

   * Check whether the path is a valid path.
   * @param path the path to be checked.
   * @throws IOException
  private void validatePath(Path path) throws IOException {
    Path fPart = path.getParent();
    do {
      try {
        FileStatus fileStatus = getFileStatus(fPart);
        if (fileStatus.isDirectory()) {
          // If path exists and a directory, exit
        } else {
          throw new FileAlreadyExistsException(String.format(
              "Can't make directory for path '%s', it is a file.", fPart));
      } catch (FileNotFoundException fnfe) {
      fPart = fPart.getParent();
    } while (fPart != null);

  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
    final FileStatus fileStatus = getFileStatus(path);
    if (fileStatus.isDirectory()) {
      throw new FileNotFoundException("Can't open " + path +
          " because it is a directory");

    return new FSDataInputStream(new AliyunOSSInputStream(getConf(),
        new SemaphoredDelegatingExecutor(
            boundedThreadPool, maxReadAheadPartNumber, true),
        maxReadAheadPartNumber, store, pathToKey(path), fileStatus.getLen(),

  public boolean rename(Path srcPath, Path dstPath) throws IOException {
    if (srcPath.isRoot()) {
      // Cannot rename root of file system
      if (LOG.isDebugEnabled()) {
        LOG.debug("Cannot rename the root of a filesystem");
      return false;
    Path parent = dstPath.getParent();
    while (parent != null && !srcPath.equals(parent)) {
      parent = parent.getParent();
    if (parent != null) {
      return false;
    FileStatus srcStatus = getFileStatus(srcPath);
    FileStatus dstStatus;
    try {
      dstStatus = getFileStatus(dstPath);
    } catch (FileNotFoundException fnde) {
      dstStatus = null;
    if (dstStatus == null) {
      // If dst doesn't exist, check whether dst dir exists or not
      dstStatus = getFileStatus(dstPath.getParent());
      if (!dstStatus.isDirectory()) {
        throw new IOException(String.format(
            "Failed to rename %s to %s, %s is a file", srcPath, dstPath,
    } else {
      if (srcStatus.getPath().equals(dstStatus.getPath())) {
        return !srcStatus.isDirectory();
      } else if (dstStatus.isDirectory()) {
        // If dst is a directory
        dstPath = new Path(dstPath, srcPath.getName());
        FileStatus[] statuses;
        try {
          statuses = listStatus(dstPath);
        } catch (FileNotFoundException fnde) {
          statuses = null;
        if (statuses != null && statuses.length > 0) {
          // If dst exists and not a directory / not empty
          throw new FileAlreadyExistsException(String.format(
              "Failed to rename %s to %s, file already exists or not empty!",
              srcPath, dstPath));
      } else {
        // If dst is not a directory
        throw new FileAlreadyExistsException(String.format(
            "Failed to rename %s to %s, file already exists!", srcPath,

    boolean succeed;
    if (srcStatus.isDirectory()) {
      succeed = copyDirectory(srcPath, dstPath);
    } else {
      succeed = copyFile(srcPath, srcStatus.getLen(), dstPath);

    return srcPath.equals(dstPath) || (succeed && delete(srcPath, true));

   * Copy file from source path to destination path.
   * (the caller should make sure srcPath is a file and dstPath is valid)
   * @param srcPath source path.
   * @param srcLen source path length if it is a file.
   * @param dstPath destination path.
   * @return true if file is successfully copied.
  private boolean copyFile(Path srcPath, long srcLen, Path dstPath) {
    String srcKey = pathToKey(srcPath);
    String dstKey = pathToKey(dstPath);
    return store.copyFile(srcKey, srcLen, dstKey);

   * Copy a directory from source path to destination path.
   * (the caller should make sure srcPath is a directory, and dstPath is valid)
   * @param srcPath source path.
   * @param dstPath destination path.
   * @return true if directory is successfully copied.
  private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException {
    String srcKey = AliyunOSSUtils
    String dstKey = AliyunOSSUtils

    if (dstKey.startsWith(srcKey)) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Cannot rename a directory to a subdirectory of self");
      return false;

    AliyunOSSCopyFileContext copyFileContext = new AliyunOSSCopyFileContext();
    ExecutorService executorService = MoreExecutors.listeningDecorator(
        new SemaphoredDelegatingExecutor(boundedCopyThreadPool,
            maxConcurrentCopyTasksPerDir, true));
    OSSListRequest listRequest = store.createListObjectsRequest(srcKey,
        maxKeys, null, null, true);
    OSSListResult objects = store.listObjects(listRequest);
    // Copy files from src folder to dst
    int copiesToFinish = 0;
    while (true) {
      for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
        String newKey =

        //copy operation just copies metadata, oss will support shallow copy
        executorService.execute(new AliyunOSSCopyFileTask(
            store, objectSummary.getKey(),
            objectSummary.getSize(), newKey, copyFileContext));
        // No need to call lock() here.
        // It's ok to copy one more file if the rename operation failed
        // Reduce the call of lock() can also improve our performance
        if (copyFileContext.isCopyFailure()) {
          //some error occurs, break
      if (objects.isTruncated()) {
        objects = store.continueListObjects(listRequest, objects);
      } else {
    //wait operations in progress to finish
    try {
    } catch (InterruptedException e) {
      LOG.warn("interrupted when wait copies to finish");
    } finally {
    return !copyFileContext.isCopyFailure();

  public void setWorkingDirectory(Path dir) {
    this.workingDir = dir;

  public AliyunOSSFileSystemStore getStore() {
    return store;


