hadoop OBSCommonUtils 源码

  • 2022-10-20
hadoop OBSCommonUtils 代码


/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.obs;

import org.apache.hadoop.util.Preconditions;
import com.obs.services.ObsClient;
import com.obs.services.exception.ObsException;
import com.obs.services.model.AbortMultipartUploadRequest;
import com.obs.services.model.DeleteObjectsRequest;
import com.obs.services.model.DeleteObjectsResult;
import com.obs.services.model.KeyAndVersion;
import com.obs.services.model.ListMultipartUploadsRequest;
import com.obs.services.model.ListObjectsRequest;
import com.obs.services.model.MultipartUpload;
import com.obs.services.model.MultipartUploadListing;
import com.obs.services.model.ObjectListing;
import com.obs.services.model.ObjectMetadata;
import com.obs.services.model.ObsObject;
import com.obs.services.model.PutObjectRequest;
import com.obs.services.model.PutObjectResult;
import com.obs.services.model.UploadPartRequest;
import com.obs.services.model.UploadPartResult;
import com.obs.services.model.fs.FSStatusEnum;
import com.obs.services.model.fs.GetAttributeRequest;
import com.obs.services.model.fs.GetBucketFSStatusRequest;
import com.obs.services.model.fs.GetBucketFSStatusResult;
import com.obs.services.model.fs.ObsFSAttribute;
import com.obs.services.model.fs.WriteFileRequest;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.InvalidRequestException;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.util.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.AccessDeniedException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;

/**
 * Common utils for {@link OBSFileSystem}.
 */
final class OBSCommonUtils {
   * Class logger.
  private static final Logger LOG = LoggerFactory.getLogger(

   * Moved permanently response code.
  static final int MOVED_PERMANENTLY_CODE = 301;

   * Unauthorized response code.
  static final int UNAUTHORIZED_CODE = 401;

   * Forbidden response code.
  static final int FORBIDDEN_CODE = 403;

   * Not found response code.
  static final int NOT_FOUND_CODE = 404;

   * File conflict.
  static final int CONFLICT_CODE = 409;

   * Gone response code.
  static final int GONE_CODE = 410;

   * EOF response code.
  static final int EOF_CODE = 416;

   * Core property for provider path. Duplicated here for consistent code across
   * Hadoop version: {@value}.
  static final String CREDENTIAL_PROVIDER_PATH
      = "hadoop.security.credential.provider.path";

   * Max number of retry times.
  static final int MAX_RETRY_TIME = 3;

   * Delay time between two retries.
  static final int DELAY_TIME = 10;

   * Max number of listing keys for checking folder empty.
  static final int MAX_KEYS_FOR_CHECK_FOLDER_EMPTY = 3;

   * Max number of listing keys for checking folder empty.
  static final int BYTE_TO_INT_MASK = 0xFF;

  /**
   * Private constructor: this class only exposes static members.
   */
  private OBSCommonUtils() {
  }

   * Get the fs status of the bucket.
   * @param obs        OBS client instance
   * @param bucketName bucket name
   * @return boolean value indicating if this bucket is a posix bucket
   * @throws FileNotFoundException the bucket is absent
   * @throws IOException           any other problem talking to OBS
  static boolean getBucketFsStatus(final ObsClient obs,
      final String bucketName)
      throws FileNotFoundException, IOException {
    try {
      GetBucketFSStatusRequest getBucketFsStatusRequest
          = new GetBucketFSStatusRequest();
      GetBucketFSStatusResult getBucketFsStatusResult =
      FSStatusEnum fsStatus = getBucketFsStatusResult.getStatus();
      return fsStatus == FSStatusEnum.ENABLED;
    } catch (ObsException e) {
      throw translateException("getBucketFsStatus", bucketName, e);

   * Turns a path (relative or otherwise) into an OBS key.
   * @param owner the owner OBSFileSystem instance
   * @param path  input path, may be relative to the working dir
   * @return a key excluding the leading "/", or, if it is the root path, ""
  static String pathToKey(final OBSFileSystem owner, final Path path) {
    Path absolutePath = path;
    if (!path.isAbsolute()) {
      absolutePath = new Path(owner.getWorkingDirectory(), path);

    if (absolutePath.toUri().getScheme() != null && absolutePath.toUri()
        .isEmpty()) {
      return "";

    return absolutePath.toUri().getPath().substring(1);

   * Turns a path (relative or otherwise) into an OBS key, adding a trailing "/"
   * if the path is not the root <i>and</i> does not already have a "/" at the
   * end.
   * @param key obs key or ""
   * @return the with a trailing "/", or, if it is the root key, "",
  static String maybeAddTrailingSlash(final String key) {
    if (!StringUtils.isEmpty(key) && !key.endsWith("/")) {
      return key + '/';
    } else {
      return key;

   * Convert a path back to a key.
   * @param key input key
   * @return the path from this key
  static Path keyToPath(final String key) {
    return new Path("/" + key);

   * Convert a key to a fully qualified path.
   * @param owner the owner OBSFileSystem instance
   * @param key   input key
   * @return the fully qualified path including URI scheme and bucket name.
  static Path keyToQualifiedPath(final OBSFileSystem owner,
      final String key) {
    return qualify(owner, keyToPath(key));

   * Qualify a path.
   * @param owner the owner OBSFileSystem instance
   * @param path  path to qualify
   * @return a qualified path.
  static Path qualify(final OBSFileSystem owner, final Path path) {
    return path.makeQualified(owner.getUri(), owner.getWorkingDirectory());

   * Delete obs key started '/'.
   * @param key object key
   * @return new key
  static String maybeDeleteBeginningSlash(final String key) {
    return !StringUtils.isEmpty(key) && key.startsWith("/") ? key.substring(
        1) : key;

   * Add obs key started '/'.
   * @param key object key
   * @return new key
  static String maybeAddBeginningSlash(final String key) {
    return !StringUtils.isEmpty(key) && !key.startsWith("/")
        ? "/" + key
        : key;

   * Translate an exception raised in an operation into an IOException. HTTP
   * error codes are examined and can be used to build a more specific
   * response.
   * @param operation operation
   * @param path      path operated on (may be null)
   * @param exception obs exception raised
   * @return an IOE which wraps the caught exception.
  static IOException translateException(
      final String operation, final String path,
      final ObsException exception) {
    String message = String.format("%s%s: status [%d] - request id [%s] "
            + "- error code [%s] - error message [%s] - trace :%s ",
        operation, path != null ? " on " + path : "",
        exception.getResponseCode(), exception.getErrorRequestId(),
        exception.getErrorMessage(), exception);

    IOException ioe;

    int status = exception.getResponseCode();
    switch (status) {
      message =
          String.format("Received permanent redirect response, "
                  + "status [%d] - request id [%s] - "
                  + "error code [%s] - message [%s]",
              exception.getErrorRequestId(), exception.getErrorCode(),
      ioe = new OBSIOException(message, exception);
    // permissions
      ioe = new AccessDeniedException(path, null, message);

    // the object isn't there
    case NOT_FOUND_CODE:
    case GONE_CODE:
      ioe = new FileNotFoundException(message);

    // out of range. This may happen if an object is overwritten with
    // a shorter one while it is being read.
    case EOF_CODE:
      ioe = new EOFException(message);

      // no specific exit code. Choose an IOE subclass based on the
      // class
      // of the caught exception
      ioe = new OBSIOException(message, exception);
    return ioe;

   * Reject any request to delete an object where the key is root.
   * @param bucket bucket name
   * @param key    key to validate
   * @throws InvalidRequestException if the request was rejected due to a
   *                                 mistaken attempt to delete the root
   *                                 directory.
  static void blockRootDelete(final String bucket, final String key)
      throws InvalidRequestException {
    if (key.isEmpty() || "/".equals(key)) {
      throw new InvalidRequestException(
          "Bucket " + bucket + " cannot be deleted");

   * Delete an object. Increments the {@code OBJECT_DELETE_REQUESTS} and write
   * operation statistics.
   * @param owner the owner OBSFileSystem instance
   * @param key   key to blob to delete.
   * @throws IOException on any failure to delete object
  static void deleteObject(final OBSFileSystem owner, final String key)
      throws IOException {
    blockRootDelete(owner.getBucket(), key);
    ObsException lastException = null;
    for (int retryTime = 1; retryTime <= MAX_RETRY_TIME; retryTime++) {
      try {
        owner.getObsClient().deleteObject(owner.getBucket(), key);
      } catch (ObsException e) {
        lastException = e;
        LOG.warn("Delete path failed with [{}], "
                + "retry time [{}] - request id [{}] - "
                + "error code [{}] - error message [{}]",
            e.getResponseCode(), retryTime, e.getErrorRequestId(),
            e.getErrorCode(), e.getErrorMessage());
        if (retryTime < MAX_RETRY_TIME) {
          try {
          } catch (InterruptedException ie) {
            throw translateException("delete", key, e);
    throw translateException(
        String.format("retry max times [%s] delete failed", MAX_RETRY_TIME),
        key, lastException);

   * Perform a bulk object delete operation. Increments the {@code
   * OBJECT_DELETE_REQUESTS} and write operation statistics.
   * @param owner         the owner OBSFileSystem instance
   * @param deleteRequest keys to delete on the obs-backend
   * @throws IOException on any failure to delete objects
  static void deleteObjects(final OBSFileSystem owner,
      final DeleteObjectsRequest deleteRequest) throws IOException {
    DeleteObjectsResult result;
    try {
      result = owner.getObsClient().deleteObjects(deleteRequest);
    } catch (ObsException e) {
      LOG.warn("delete objects failed, request [{}], request id [{}] - "
              + "error code [{}] - error message [{}]",
          deleteRequest, e.getErrorRequestId(), e.getErrorCode(),
      for (KeyAndVersion keyAndVersion
          : deleteRequest.getKeyAndVersionsList()) {
        deleteObject(owner, keyAndVersion.getKey());

    // delete one by one if there is errors
    if (result != null) {
      List<DeleteObjectsResult.ErrorResult> errorResults
          = result.getErrorResults();
      if (!errorResults.isEmpty()) {
        LOG.warn("bulk delete {} objects, {} failed, begin to delete "
                + "one by one.",
        for (DeleteObjectsResult.ErrorResult errorResult
            : errorResults) {
          deleteObject(owner, errorResult.getObjectKey());

   * Create a putObject request. Adds the ACL and metadata
   * @param owner    the owner OBSFileSystem instance
   * @param key      key of object
   * @param metadata metadata header
   * @param srcfile  source file
   * @return the request
  static PutObjectRequest newPutObjectRequest(final OBSFileSystem owner,
      final String key, final ObjectMetadata metadata, final File srcfile) {
    PutObjectRequest putObjectRequest = new PutObjectRequest(
        owner.getBucket(), key, srcfile);
    if (owner.getSse().isSseCEnable()) {
    } else if (owner.getSse().isSseKmsEnable()) {
    return putObjectRequest;

   * Create a {@link PutObjectRequest} request. The metadata is assumed to have
   * been configured with the size of the operation.
   * @param owner       the owner OBSFileSystem instance
   * @param key         key of object
   * @param metadata    metadata header
   * @param inputStream source data.
   * @return the request
  static PutObjectRequest newPutObjectRequest(final OBSFileSystem owner,
      final String key, final ObjectMetadata metadata,
      final InputStream inputStream) {
    PutObjectRequest putObjectRequest = new PutObjectRequest(
        owner.getBucket(), key, inputStream);
    if (owner.getSse().isSseCEnable()) {
    } else if (owner.getSse().isSseKmsEnable()) {
    return putObjectRequest;

   * PUT an object directly (i.e. not via the transfer manager). Byte length is
   * calculated from the file length, or, if there is no file, from the content
   * length of the header. <i>Important: this call will close any input stream
   * in the request.</i>
   * @param owner            the owner OBSFileSystem instance
   * @param putObjectRequest the request
   * @return the upload initiated
   * @throws ObsException on problems
  static PutObjectResult putObjectDirect(final OBSFileSystem owner,
      final PutObjectRequest putObjectRequest) throws ObsException {
    long len;
    if (putObjectRequest.getFile() != null) {
      len = putObjectRequest.getFile().length();
    } else {
      len = putObjectRequest.getMetadata().getContentLength();

    PutObjectResult result = owner.getObsClient()
    return result;

   * Upload part of a multi-partition file. Increments the write and put
   * counters. <i>Important: this call does not close any input stream in the
   * request.</i>
   * @param owner   the owner OBSFileSystem instance
   * @param request request
   * @return the result of the operation.
   * @throws ObsException on problems
  static UploadPartResult uploadPart(final OBSFileSystem owner,
      final UploadPartRequest request) throws ObsException {
    long len = request.getPartSize();
    UploadPartResult uploadPartResult = owner.getObsClient()
    return uploadPartResult;

  static void removeKeys(final OBSFileSystem owner,
      final List<KeyAndVersion> keysToDelete, final boolean clearKeys,
      final boolean checkRootDelete) throws IOException {
    if (keysToDelete.isEmpty()) {
      // exit fast if there are no keys to delete

    if (checkRootDelete) {
      for (KeyAndVersion keyVersion : keysToDelete) {
        blockRootDelete(owner.getBucket(), keyVersion.getKey());

    if (!owner.isEnableMultiObjectDelete()
        || keysToDelete.size() < owner.getMultiDeleteThreshold()) {
      // delete one by one.
      for (KeyAndVersion keyVersion : keysToDelete) {
        deleteObject(owner, keyVersion.getKey());
    } else if (keysToDelete.size() <= owner.getMaxEntriesToDelete()) {
      // Only one batch.
      DeleteObjectsRequest deleteObjectsRequest
          = new DeleteObjectsRequest(owner.getBucket());
          keysToDelete.toArray(new KeyAndVersion[0]));
      deleteObjects(owner, deleteObjectsRequest);
    } else {
      // Multi batches.
      List<KeyAndVersion> keys = new ArrayList<>(
      for (KeyAndVersion key : keysToDelete) {
        if (keys.size() == owner.getMaxEntriesToDelete()) {
          // Delete one batch.
          removeKeys(owner, keys, true, false);
      // Delete the last batch
      removeKeys(owner, keys, true, false);

    if (clearKeys) {

   * Translate an exception raised in an operation into an IOException. The
   * specific type of IOException depends on the class of {@link ObsException}
   * passed in, and any status codes included in the operation. That is: HTTP
   * error codes are examined and can be used to build a more specific
   * response.
   * @param operation operation
   * @param path      path operated on (must not be null)
   * @param exception obs exception raised
   * @return an IOE which wraps the caught exception.
  static IOException translateException(final String operation,
      final Path path, final ObsException exception) {
    return translateException(operation, path.toString(), exception);

   * List the statuses of the files/directories in the given path if the path is
   * a directory.
   * @param owner     the owner OBSFileSystem instance
   * @param f         given path
   * @param recursive flag indicating if list is recursive
   * @return the statuses of the files/directories in the given patch
   * @throws FileNotFoundException when the path does not exist;
   * @throws IOException           due to an IO problem.
   * @throws ObsException          on failures inside the OBS SDK
  static FileStatus[] innerListStatus(final OBSFileSystem owner, final Path f,
      final boolean recursive)
      throws FileNotFoundException, IOException, ObsException {
    Path path = qualify(owner, f);
    String key = pathToKey(owner, path);

    List<FileStatus> result;
    final FileStatus fileStatus = owner.getFileStatus(path);

    if (fileStatus.isDirectory()) {
      key = maybeAddTrailingSlash(key);
      String delimiter = recursive ? null : "/";
      ListObjectsRequest request = createListObjectsRequest(owner, key,
          "listStatus: doing listObjects for directory {} - recursive {}",
          f, recursive);

      OBSListing.FileStatusListingIterator files = owner.getObsListing()
              path, request, OBSListing.ACCEPT_ALL,
              new OBSListing.AcceptAllButSelfAndS3nDirs(path));
      result = new ArrayList<>(files.getBatchSize());
      while (files.hasNext()) {

      return result.toArray(new FileStatus[0]);
    } else {
      LOG.debug("Adding: rd (not a dir): {}", path);
      FileStatus[] stats = new FileStatus[1];
      stats[0] = fileStatus;
      return stats;

   * Create a {@code ListObjectsRequest} request against this bucket.
   * @param owner     the owner OBSFileSystem instance
   * @param key       key for request
   * @param delimiter any delimiter
   * @return the request
  static ListObjectsRequest createListObjectsRequest(
      final OBSFileSystem owner, final String key, final String delimiter) {
    return createListObjectsRequest(owner, key, delimiter, -1);

  static ListObjectsRequest createListObjectsRequest(
      final OBSFileSystem owner, final String key, final String delimiter,
      final int maxKeyNum) {
    ListObjectsRequest request = new ListObjectsRequest();
    if (maxKeyNum > 0 && maxKeyNum < owner.getMaxKeys()) {
    } else {
    if (delimiter != null) {
    return request;

   * Implements the specific logic to reject root directory deletion. The caller
   * must return the result of this call, rather than attempt to continue with
   * the delete operation: deleting root directories is never allowed. This
   * method simply implements the policy of when to return an exit code versus
   * raise an exception.
   * @param bucket     bucket name
   * @param isEmptyDir flag indicating if the directory is empty
   * @param recursive  recursive flag from command
   * @return a return code for the operation
   * @throws PathIOException if the operation was explicitly rejected.
  static boolean rejectRootDirectoryDelete(final String bucket,
      final boolean isEmptyDir,
      final boolean recursive)
      throws IOException {
    LOG.info("obs delete the {} root directory of {}", bucket, recursive);
    if (isEmptyDir) {
      return true;
    if (recursive) {
      return false;
    } else {
      // reject
      throw new PathIOException(bucket, "Cannot delete root path");

   * Make the given path and all non-existent parents into directories.
   * @param owner the owner OBSFileSystem instance
   * @param path  path to create
   * @return true if a directory was created
   * @throws FileAlreadyExistsException there is a file at the path specified
   * @throws IOException                other IO problems
   * @throws ObsException               on failures inside the OBS SDK
  static boolean innerMkdirs(final OBSFileSystem owner, final Path path)
      throws IOException, FileAlreadyExistsException, ObsException {
    LOG.debug("Making directory: {}", path);
    FileStatus fileStatus;
    try {
      fileStatus = owner.getFileStatus(path);

      if (fileStatus.isDirectory()) {
        return true;
      } else {
        throw new FileAlreadyExistsException("Path is a file: " + path);
    } catch (FileNotFoundException e) {
      Path fPart = path.getParent();
      do {
        try {
          fileStatus = owner.getFileStatus(fPart);
          if (fileStatus.isDirectory()) {
          if (fileStatus.isFile()) {
            throw new FileAlreadyExistsException(
                String.format("Can't make directory for path '%s'"
                    + " since it is a file.", fPart));
        } catch (FileNotFoundException fnfe) {
          LOG.debug("file {} not fount, but ignore.", path);
        fPart = fPart.getParent();
      } while (fPart != null);

      String key = pathToKey(owner, path);
      if (owner.isFsBucket()) {
        OBSPosixBucketUtils.fsCreateFolder(owner, key);
      } else {
        OBSObjectBucketUtils.createFakeDirectory(owner, key);
      return true;

   * Initiate a {@code listObjects} operation, incrementing metrics in the
   * process.
   * @param owner   the owner OBSFileSystem instance
   * @param request request to initiate
   * @return the results
   * @throws IOException on any failure to list objects
  static ObjectListing listObjects(final OBSFileSystem owner,
      final ListObjectsRequest request) throws IOException {
    if (request.getDelimiter() == null && request.getMarker() == null
        && owner.isFsBucket() && owner.isObsClientDFSListEnable()) {
      return OBSFsDFSListing.fsDFSListObjects(owner, request);

    return commonListObjects(owner, request);

  static ObjectListing commonListObjects(final OBSFileSystem owner,
      final ListObjectsRequest request) {
    for (int retryTime = 1; retryTime < MAX_RETRY_TIME; retryTime++) {
      try {
        return owner.getObsClient().listObjects(request);
      } catch (ObsException e) {
        LOG.warn("Failed to commonListObjects for request[{}], retry "
                + "time [{}], due to exception[{}]",
            request, retryTime, e);
        try {
        } catch (InterruptedException ie) {
          LOG.error("Failed to commonListObjects for request[{}], "
                  + "retry time [{}], due to exception[{}]",
              request, retryTime, e);
          throw e;

    return owner.getObsClient().listObjects(request);

   * List the next set of objects.
   * @param owner   the owner OBSFileSystem instance
   * @param objects paged result
   * @return the next result object
   * @throws IOException on any failure to list the next set of objects
  static ObjectListing continueListObjects(final OBSFileSystem owner,
      final ObjectListing objects) throws IOException {
    if (objects.getDelimiter() == null && owner.isFsBucket()
        && owner.isObsClientDFSListEnable()) {
      return OBSFsDFSListing.fsDFSContinueListObjects(owner,
          (OBSFsDFSListing) objects);

    return commonContinueListObjects(owner, objects);

  private static ObjectListing commonContinueListObjects(
      final OBSFileSystem owner, final ObjectListing objects) {
    String delimiter = objects.getDelimiter();
    int maxKeyNum = objects.getMaxKeys();
    // LOG.debug("delimiters: "+objects.getDelimiter());
    ListObjectsRequest request = new ListObjectsRequest();
    if (maxKeyNum > 0 && maxKeyNum < owner.getMaxKeys()) {
    } else {
    if (delimiter != null) {
    return commonContinueListObjects(owner, request);

  static ObjectListing commonContinueListObjects(final OBSFileSystem owner,
      final ListObjectsRequest request) {
    for (int retryTime = 1; retryTime < MAX_RETRY_TIME; retryTime++) {
      try {
        return owner.getObsClient().listObjects(request);
      } catch (ObsException e) {
        LOG.warn("Continue list objects failed for request[{}], retry"
                + " time[{}], due to exception[{}]",
            request, retryTime, e);
        try {
        } catch (InterruptedException ie) {
          LOG.error("Continue list objects failed for request[{}], "
                  + "retry time[{}], due to exception[{}]",
              request, retryTime, e);
          throw e;

    return owner.getObsClient().listObjects(request);

   * Predicate: does the object represent a directory?.
   * @param name object name
   * @param size object size
   * @return true if it meets the criteria for being an object
  public static boolean objectRepresentsDirectory(final String name,
      final long size) {
    return !name.isEmpty() && name.charAt(name.length() - 1) == '/'
        && size == 0L;

   * Date to long conversion. Handles null Dates that can be returned by OBS by
   * returning 0
   * @param date date from OBS query
   * @return timestamp of the object
  public static long dateToLong(final Date date) {
    if (date == null) {
      return 0L;

    return date.getTime() / OBSConstants.SEC2MILLISEC_FACTOR
        * OBSConstants.SEC2MILLISEC_FACTOR;

  // Used to check if a folder is empty or not.
  static boolean isFolderEmpty(final OBSFileSystem owner, final String key)
      throws FileNotFoundException, ObsException {
    for (int retryTime = 1; retryTime < MAX_RETRY_TIME; retryTime++) {
      try {
        return innerIsFolderEmpty(owner, key);
      } catch (ObsException e) {
            "Failed to check empty folder for [{}], retry time [{}], "
                + "exception [{}]", key, retryTime, e);

        try {
        } catch (InterruptedException ie) {
          throw e;

    return innerIsFolderEmpty(owner, key);

  // Used to check if a folder is empty or not by counting the number of
  // sub objects in list.
  private static boolean isFolderEmpty(final String key,
      final ObjectListing objects) {
    int count = objects.getObjects().size();
    if (count >= 2) {
      // There is a sub file at least.
      return false;
    } else if (count == 1 && !objects.getObjects()
        .equals(key)) {
      // There is a sub file at least.
      return false;

    count = objects.getCommonPrefixes().size();
    // There is a sub file at least.
    // There is no sub object.
    if (count >= 2) {
      // There is a sub file at least.
      return false;
    } else {
      return count != 1 || objects.getCommonPrefixes().get(0).equals(key);

  // Used to check if a folder is empty or not.
  static boolean innerIsFolderEmpty(final OBSFileSystem owner,
      final String key)
      throws FileNotFoundException, ObsException {
    String obsKey = maybeAddTrailingSlash(key);
    ListObjectsRequest request = new ListObjectsRequest();
    ObjectListing objects = owner.getObsClient().listObjects(request);

    if (!objects.getCommonPrefixes().isEmpty() || !objects.getObjects()
        .isEmpty()) {
      if (isFolderEmpty(obsKey, objects)) {
        LOG.debug("Found empty directory {}", obsKey);
        return true;
      if (LOG.isDebugEnabled()) {
        LOG.debug("Found path as directory (with /): {}/{}",

        for (ObsObject summary : objects.getObjects()) {
          LOG.debug("Summary: {} {}", summary.getObjectKey(),
        for (String prefix : objects.getCommonPrefixes()) {
          LOG.debug("Prefix: {}", prefix);
      LOG.debug("Found non-empty directory {}", obsKey);
      return false;
    } else if (obsKey.isEmpty()) {
      LOG.debug("Found root directory");
      return true;
    } else if (owner.isFsBucket()) {
      LOG.debug("Found empty directory {}", obsKey);
      return true;

    LOG.debug("Not Found: {}", obsKey);
    throw new FileNotFoundException("No such file or directory: " + obsKey);

   * Build a {@link LocatedFileStatus} from a {@link FileStatus} instance.
   * @param owner  the owner OBSFileSystem instance
   * @param status file status
   * @return a located status with block locations set up from this FS.
   * @throws IOException IO Problems.
  static LocatedFileStatus toLocatedFileStatus(final OBSFileSystem owner,
      final FileStatus status) throws IOException {
    return new LocatedFileStatus(
        status, status.isFile() ? owner.getFileBlockLocations(status, 0,
        status.getLen()) : null);

   * Create a appendFile request. Adds the ACL and metadata
   * @param owner          the owner OBSFileSystem instance
   * @param key            key of object
   * @param tmpFile        temp file or input stream
   * @param recordPosition client record next append position
   * @return the request
   * @throws IOException any problem
  static WriteFileRequest newAppendFileRequest(final OBSFileSystem owner,
      final String key, final long recordPosition, final File tmpFile)
      throws IOException {
    ObsFSAttribute obsFsAttribute;
    try {
      GetAttributeRequest getAttributeReq = new GetAttributeRequest(
          owner.getBucket(), key);
      obsFsAttribute = owner.getObsClient().getAttribute(getAttributeReq);
    } catch (ObsException e) {
      throw translateException("GetAttributeRequest", key, e);

    long appendPosition = Math.max(recordPosition,
    if (recordPosition != obsFsAttribute.getContentLength()) {
      LOG.warn("append url[{}] position[{}], file contentLength[{}] not"
              + " equal to recordPosition[{}].", key, appendPosition,
          obsFsAttribute.getContentLength(), recordPosition);
    WriteFileRequest writeFileReq = new WriteFileRequest(owner.getBucket(),
        key, tmpFile, appendPosition);
    return writeFileReq;

   * Create a appendFile request. Adds the ACL and metadata
   * @param owner          the owner OBSFileSystem instance
   * @param key            key of object
   * @param inputStream    temp file or input stream
   * @param recordPosition client record next append position
   * @return the request
   * @throws IOException any problem
  static WriteFileRequest newAppendFileRequest(final OBSFileSystem owner,
      final String key, final long recordPosition,
      final InputStream inputStream) throws IOException {
    ObsFSAttribute obsFsAttribute;
    try {
      GetAttributeRequest getAttributeReq = new GetAttributeRequest(
          owner.getBucket(), key);
      obsFsAttribute = owner.getObsClient().getAttribute(getAttributeReq);
    } catch (ObsException e) {
      throw translateException("GetAttributeRequest", key, e);

    long appendPosition = Math.max(recordPosition,
    if (recordPosition != obsFsAttribute.getContentLength()) {
      LOG.warn("append url[{}] position[{}], file contentLength[{}] not"
              + " equal to recordPosition[{}].", key, appendPosition,
          obsFsAttribute.getContentLength(), recordPosition);
    WriteFileRequest writeFileReq = new WriteFileRequest(owner.getBucket(),
        key, inputStream, appendPosition);
    return writeFileReq;

   * Append File.
   * @param owner             the owner OBSFileSystem instance
   * @param appendFileRequest append object request
   * @throws IOException on any failure to append file
  static void appendFile(final OBSFileSystem owner,
      final WriteFileRequest appendFileRequest) throws IOException {
    // NOTE(review): this extract has lost its closing braces, the arguments
    // of the LOG.debug call and the rest of the try body; it does not compile
    // as shown.
    // len stays 0 when the request carries no File.
    long len = 0;
    if (appendFileRequest.getFile() != null) {
      len = appendFileRequest.getFile().length();

    try {
      // NOTE(review): the three placeholder arguments (key, position, size)
      // are missing here, as is whatever statement performs the append.
      LOG.debug("Append file, key {} position {} size {}",
    } catch (ObsException e) {
      // OBS failures are rethrown through translateException.
      throw translateException("AppendFile",
          appendFileRequest.getObjectKey(), e);

   * Close the Closeable objects and <b>ignore</b> any Exception or null
   * pointers. (This is the SLF4J equivalent of that in {@code IOUtils}).
   * @param closeables the objects to close
  static void closeAll(final java.io.Closeable... closeables) {
    for (java.io.Closeable c : closeables) {
      if (c != null) {
        try {
          if (LOG != null) {
            LOG.debug("Closing {}", c);
        } catch (Exception e) {
          if (LOG != null && LOG.isDebugEnabled()) {
            LOG.debug("Exception in closing {}", c, e);

   * Extract an exception from a failed future, and convert to an IOE.
   * @param operation operation which failed
   * @param path      path operated on (may be null)
   * @param ee        execution exception
   * @return an IOE which can be thrown
  static IOException extractException(final String operation,
      final String path, final ExecutionException ee) {
    IOException ioe;
    Throwable cause = ee.getCause();
    if (cause instanceof ObsException) {
      ioe = translateException(operation, path, (ObsException) cause);
    } else if (cause instanceof IOException) {
      ioe = (IOException) cause;
    } else {
      ioe = new IOException(operation + " failed: " + cause, cause);
    return ioe;

   * Create a file status instance from a listing.
   * @param keyPath   path to entry
   * @param summary   summary from OBS
   * @param blockSize block size to declare.
   * @param owner     owner of the file
   * @return a status entry
  static OBSFileStatus createFileStatus(
      final Path keyPath, final ObsObject summary, final long blockSize,
      final String owner) {
    // Chooses between a directory status and a file status based on the
    // object's key and content length.
    if (objectRepresentsDirectory(
        summary.getObjectKey(), summary.getMetadata().getContentLength())) {
      // Directory entries are built from just the path and the owner.
      return new OBSFileStatus(keyPath, owner);
    } else {
      // NOTE(review): the constructor arguments for the file case and the
      // closing braces are missing from this extract; it does not compile as
      // shown.
      return new OBSFileStatus(

   * Return the access key and secret for OBS API use. Credentials may exist in
   * configuration, within credential providers or indicated in the UserInfo of
   * the name URI param.
   * @param name the URI for which we need the access keys.
   * @param conf the Configuration object to interrogate for keys.
   * @return OBSAccessKeys
   * @throws IOException problems retrieving passwords from KMS.
  static OBSLoginHelper.Login getOBSAccessKeys(final URI name,
      final Configuration conf)
      throws IOException {
    // Resolves access key, secret key and session token and returns them as
    // a new Login.
    // NOTE(review): the right-hand side of "Configuration c =" and the third
    // argument of each getPassword call are missing from this extract, as is
    // the closing brace; it does not compile as shown.
    OBSLoginHelper.Login login
        = OBSLoginHelper.extractLoginDetailsWithWarnings(name);
    Configuration c =
    // getPassword prefers its third argument when non-empty and otherwise
    // looks the key up in the configuration.
    String accessKey = getPassword(c, OBSConstants.ACCESS_KEY,
    String secretKey = getPassword(c, OBSConstants.SECRET_KEY,
    String sessionToken = getPassword(c, OBSConstants.SESSION_TOKEN,
    return new OBSLoginHelper.Login(accessKey, secretKey, sessionToken);

   * Get a password from a configuration, or, if a value is passed in, pick that
   * up instead.
   * @param conf configuration
   * @param key  key to look up
   * @param val  current value: if non empty this is used instead of querying
   *             the configuration.
   * @return a password or "".
   * @throws IOException on any problem
  private static String getPassword(final Configuration conf,
      final String key, final String val) throws IOException {
    return StringUtils.isEmpty(val) ? lookupPassword(conf, key) : val;

   * Get a password from a configuration/configured credential providers.
   * @param conf configuration
   * @param key  key to look up
   * @return a password, or "" if none is configured for the key
   * @throws IOException on any problem
  private static String lookupPassword(final Configuration conf,
      final String key) throws IOException {
    try {
      final char[] pass = conf.getPassword(key);
      return pass != null ? new String(pass).trim() : "";
    } catch (IOException ioe) {
      throw new IOException("Cannot find password option " + key, ioe);

   * String information about a summary entry for debug messages.
   * @param summary summary object
   * @return string value
  static String stringify(final ObsObject summary) {
    // Builds "<object key> size=<...>" for debug output.
    // NOTE(review): the expression is cut off after summary.getMetadata();
    // the accessor supplying the size and the closing brace are missing from
    // this extract.
    return summary.getObjectKey() + " size=" + summary.getMetadata()

   * Get an integer option not smaller than the minimum allowed value.
   * @param conf   configuration
   * @param key    key to look up
   * @param defVal default value
   * @param min    minimum value
   * @return the value
   * @throws IllegalArgumentException if the value is below the minimum
  static int intOption(final Configuration conf, final String key,
      final int defVal,
      final int min) {
    int v = conf.getInt(key, defVal);
        v >= min,
        String.format("Value of %s: %d is below the minimum value %d", key,
            v, min));
    LOG.debug("Value of {} is {}", key, v);
    return v;

   * Get a long option not smaller than the minimum allowed value.
   * @param conf   configuration
   * @param key    key to look up
   * @param defVal default value
   * @param min    minimum value
   * @return the value
   * @throws IllegalArgumentException if the value is below the minimum
  static long longOption(final Configuration conf, final String key,
      final long defVal,
      final long min) {
    long v = conf.getLong(key, defVal);
        v >= min,
        String.format("Value of %s: %d is below the minimum value %d", key,
            v, min));
    LOG.debug("Value of {} is {}", key, v);
    return v;

   * Get a long option not smaller than the minimum allowed value, supporting
   * memory prefixes K,M,G,T,P.
   * @param conf   configuration
   * @param key    key to look up
   * @param defVal default value
   * @param min    minimum value
   * @return the value
   * @throws IllegalArgumentException if the value is below the minimum
  static long longBytesOption(final Configuration conf, final String key,
      final long defVal,
      final long min) {
    long v = conf.getLongBytes(key, defVal);
        v >= min,
        String.format("Value of %s: %d is below the minimum value %d", key,
            v, min));
    LOG.debug("Value of {} is {}", key, v);
    return v;

   * Get a size property from the configuration: this property must be at least
   * equal to {@link OBSConstants#MULTIPART_MIN_SIZE}. If it is too small, it is
   * rounded up to that minimum, and a warning printed.
   * @param conf     configuration
   * @param property property name
   * @param defVal   default value
   * @return the value, guaranteed to be above the minimum size
  public static long getMultipartSizeProperty(final Configuration conf,
      final String property, final long defVal) {
    long partSize = conf.getLongBytes(property, defVal);
    if (partSize < OBSConstants.MULTIPART_MIN_SIZE) {
      LOG.warn("{} must be at least 5 MB; configured value is {}",
          property, partSize);
      partSize = OBSConstants.MULTIPART_MIN_SIZE;
    return partSize;

   * Ensure that the long value is in the range of an integer.
   * @param name property name for error messages
   * @param size original size
   * @return the size, guaranteed to be less than or equal to the max value of
   * an integer.
  static int ensureOutputParameterInRange(final String name,
      final long size) {
    if (size > Integer.MAX_VALUE) {
          "obs: {} capped to ~2.14GB"
              + " (maximum allowed size with current output mechanism)",
      return Integer.MAX_VALUE;
    } else {
      return (int) size;

   * Propagates bucket-specific settings into generic OBS configuration keys.
   * This is done by propagating the values of the form {@code
   * fs.obs.bucket.${bucket}.key} to {@code fs.obs.key}, for all values of "key"
   * other than a small set of unmodifiable values.
   * <p>The source of the updated property is set to the key name of the
   * bucket property, to aid in diagnostics of where things came from.
   * <p>Returns a new configuration. Why the clone? You can use the same conf
   * for different filesystems, and the original values are not updated.
   * <p>The {@code fs.obs.impl} property cannot be set, nor can any with the
   * prefix {@code fs.obs.bucket}.
   * <p>This method does not propagate security provider path information
   * from the OBS property into the Hadoop common provider: callers must call
   * {@link #patchSecurityCredentialProviders(Configuration)} explicitly.
   * @param source Source Configuration object.
   * @param bucket bucket name. Must not be empty.
   * @return a (potentially) patched clone of the original.
  static Configuration propagateBucketOptions(final Configuration source,
      final String bucket) {

    Preconditions.checkArgument(StringUtils.isNotEmpty(bucket), "bucket");
    final String bucketPrefix = OBSConstants.FS_OBS_BUCKET_PREFIX + bucket
        + '.';
    LOG.debug("Propagating entries under {}", bucketPrefix);
    final Configuration dest = new Configuration(source);
    for (Map.Entry<String, String> entry : source) {
      final String key = entry.getKey();
      // get the (unexpanded) value.
      final String value = entry.getValue();
      if (!key.startsWith(bucketPrefix) || bucketPrefix.equals(key)) {
      // there's a bucket prefix, so strip it
      final String stripped = key.substring(bucketPrefix.length());
      if (stripped.startsWith("bucket.") || "impl".equals(stripped)) {
        // tell user off
        LOG.debug("Ignoring bucket option {}", key);
      } else {
        // propagate the value, building a new origin field.
        // to track overwrites, the generic key is overwritten even if
        // already matches the new one.
        final String generic = OBSConstants.FS_OBS_PREFIX + stripped;
        LOG.debug("Updating {}", generic);
        dest.set(generic, value, key);
    return dest;

   * Patch the security credential provider information in {@link
   * #CREDENTIAL_PROVIDER_PATH} with the providers listed in the OBS-specific
   * credential provider path property, if any are set.
   * <p>This allows different buckets to use different credential files.
   * @param conf configuration to patch
  static void patchSecurityCredentialProviders(final Configuration conf) {
    // Rewrites CREDENTIAL_PROVIDER_PATH in conf when custom credential
    // providers are configured.
    // NOTE(review): the initialisers of customCredentials and
    // hadoopCredentials, the tail of the conf.set call and all closing braces
    // are missing from this extract; it does not compile as shown.
    Collection<String> customCredentials =
    Collection<String> hadoopCredentials = conf.getStringCollection(
    if (!customCredentials.isEmpty()) {
      // NOTE(review): hadoopCredentials is not merged into "all" in the
      // visible lines; a statement doing so may have been lost -- confirm.
      List<String> all = Lists.newArrayList(customCredentials);
      String joined = StringUtils.join(all, ',');
      LOG.debug("Setting {} to {}", CREDENTIAL_PROVIDER_PATH, joined);
      conf.set(CREDENTIAL_PROVIDER_PATH, joined, "patch of "

   * Verify that the bucket exists. This does not check permissions, not even
   * read access.
   * @param owner the owner OBSFileSystem instance
   * @throws FileNotFoundException the bucket is absent
   * @throws IOException           any other problem talking to OBS
  static void verifyBucketExists(final OBSFileSystem owner)
      throws FileNotFoundException, IOException {
    // Probes the bucket with headBucket, retrying on ObsException until
    // MAX_RETRY_TIME is reached.
    // NOTE(review): this extract has lost its closing braces, the success
    // exit from the loop, the retry delay and the retryTime update; it does
    // not compile as shown.
    int retryTime = 1;
    while (true) {
      try {
        // headBucket returning false is reported as a missing bucket.
        if (!owner.getObsClient().headBucket(owner.getBucket())) {
          throw new FileNotFoundException(
              "Bucket " + owner.getBucket() + " does not exist");
      } catch (ObsException e) {
        // NOTE(review): the third translateException argument and the end of
        // the LOG.warn call are missing here.
        LOG.warn("Failed to head bucket for [{}], retry time [{}], "
                + "exception [{}]", owner.getBucket(), retryTime,
            translateException("doesBucketExist", owner.getBucket(),

        // Out of retries: surface the last failure via translateException.
        if (MAX_RETRY_TIME == retryTime) {
          throw translateException("doesBucketExist",
              owner.getBucket(), e);

        // NOTE(review): the body of this try (presumably a sleep between
        // attempts -- confirm) is missing.
        try {
        } catch (InterruptedException ie) {
          // Rethrows the original ObsException, not the InterruptedException;
          // the interrupt status is not restored in the visible lines.
          throw e;

   * Initialize multi-part upload: purge existing uploads older than the value
   * of {@link OBSConstants#PURGE_EXISTING_MULTIPART_AGE}.
   * @param owner the owner OBSFileSystem instance
   * @param conf  the configuration to use for the FS
   * @throws IOException on any failure to initialize multipart upload
  static void initMultipartUploads(final OBSFileSystem owner,
      final Configuration conf)
      throws IOException {
    // Optionally purges multipart uploads in the bucket that were initiated
    // before a configured age threshold.
    // NOTE(review): this extract has lost its closing braces and many
    // statement fragments (flagged below); it does not compile as shown.
    // NOTE(review): the initialiser of purgeExistingMultipart and the
    // remaining longOption arguments are missing.
    boolean purgeExistingMultipart =
    long purgeExistingMultipartAge =
        longOption(conf, OBSConstants.PURGE_EXISTING_MULTIPART_AGE,

    // NOTE(review): the body of this guard (presumably an early return when
    // purging is disabled -- confirm) is missing.
    if (!purgeExistingMultipart) {

    // The age is multiplied by 1000 before being subtracted from the current
    // time in milliseconds, so it is treated as seconds.
    final Date purgeBefore = new Date(
        new Date().getTime() - purgeExistingMultipartAge * 1000);

    try {
      ListMultipartUploadsRequest request
          = new ListMultipartUploadsRequest(owner.getBucket());
      while (true) {
        // List + purge
        // NOTE(review): the listing call on the OBS client is truncated.
        MultipartUploadListing uploadListing = owner.getObsClient()
        for (MultipartUpload upload
            : uploadListing.getMultipartTaskList()) {
          // Only uploads initiated strictly before the cut-off are selected.
          if (upload.getInitiatedDate().compareTo(purgeBefore) < 0) {
            // NOTE(review): the call receiving this abort request and its
            // final argument are missing.
                new AbortMultipartUploadRequest(
                    owner.getBucket(), upload.getObjectKey(),
        // NOTE(review): the loop exit and any paging-marker update for the
        // next listing are missing.
        if (!uploadListing.isTruncated()) {
    } catch (ObsException e) {
      // A FORBIDDEN_CODE response is logged and tolerated; any other OBS
      // failure is rethrown through translateException.
      if (e.getResponseCode() == FORBIDDEN_CODE) {
        LOG.debug("Failed to purging multipart uploads against {},"
                + " FS may be read only", owner.getBucket(),
      } else {
        throw translateException("purging multipart uploads",
            owner.getBucket(), e);

  static void shutdownAll(final ExecutorService... executors) {
    for (ExecutorService exe : executors) {
      if (exe != null) {
        try {
          if (LOG != null) {
            LOG.debug("Shutdown {}", exe);
        } catch (Exception e) {
          if (LOG != null && LOG.isDebugEnabled()) {
            LOG.debug("Exception in shutdown {}", exe, e);


