hadoop CosNFileSystem 源码

  • 2022-10-20
  • 浏览 (462)

haddop CosNFileSystem 代码

文件路径:/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/CosNFileSystem.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs.cosn;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.Progressable;

/**
 * The core CosN Filesystem implementation.
 */
@InterfaceAudience.Private
@InterfaceStability.Stable
public class CosNFileSystem extends FileSystem {
  static final Logger LOG = LoggerFactory.getLogger(CosNFileSystem.class);

  public static final String SCHEME = "cosn";
  public static final String PATH_DELIMITER = Path.SEPARATOR;

  private URI uri;
  private String bucket;
  private NativeFileSystemStore store;
  private Path workingDir;
  private String owner = "Unknown";
  private String group = "Unknown";

  private ExecutorService boundedIOThreadPool;
  private ExecutorService boundedCopyThreadPool;

  public CosNFileSystem() {
  }

  public CosNFileSystem(NativeFileSystemStore store) {
    this.store = store;
  }

  /**
   * Return the protocol scheme for the FileSystem.
   *
   * @return <code>cosn</code>
   */
  @Override
  public String getScheme() {
    return CosNFileSystem.SCHEME;
  }

  @Override
  public void initialize(URI name, Configuration conf) throws IOException {
    super.initialize(name, conf);
    this.bucket = name.getHost();
    if (this.store == null) {
      this.store = createDefaultStore(conf);
    }
    this.store.initialize(name, conf);
    setConf(conf);
    this.uri = URI.create(name.getScheme() + "://" + name.getAuthority());
    this.workingDir = new Path("/user",
        System.getProperty("user.name")).makeQualified(
        this.uri,
        this.getWorkingDirectory());
    this.owner = getOwnerId();
    this.group = getGroupId();
    LOG.debug("owner:" + owner + ", group:" + group);

    BufferPool.getInstance().initialize(this.getConf());

    // initialize the thread pool
    int uploadThreadPoolSize = this.getConf().getInt(
        CosNConfigKeys.UPLOAD_THREAD_POOL_SIZE_KEY,
        CosNConfigKeys.DEFAULT_UPLOAD_THREAD_POOL_SIZE
    );
    int readAheadPoolSize = this.getConf().getInt(
        CosNConfigKeys.READ_AHEAD_QUEUE_SIZE,
        CosNConfigKeys.DEFAULT_READ_AHEAD_QUEUE_SIZE
    );
    int ioThreadPoolSize = uploadThreadPoolSize + readAheadPoolSize / 3;
    long threadKeepAlive = this.getConf().getLong(
        CosNConfigKeys.THREAD_KEEP_ALIVE_TIME_KEY,
        CosNConfigKeys.DEFAULT_THREAD_KEEP_ALIVE_TIME
    );
    this.boundedIOThreadPool = BlockingThreadPoolExecutorService.newInstance(
        ioThreadPoolSize / 2, ioThreadPoolSize,
        threadKeepAlive, TimeUnit.SECONDS,
        "cos-transfer-thread-pool");
    int copyThreadPoolSize = this.getConf().getInt(
        CosNConfigKeys.COPY_THREAD_POOL_SIZE_KEY,
        CosNConfigKeys.DEFAULT_COPY_THREAD_POOL_SIZE
    );
    this.boundedCopyThreadPool = BlockingThreadPoolExecutorService.newInstance(
        CosNConfigKeys.DEFAULT_COPY_THREAD_POOL_SIZE, copyThreadPoolSize,
        60L, TimeUnit.SECONDS,
        "cos-copy-thread-pool");
  }

  private static NativeFileSystemStore createDefaultStore(Configuration conf) {
    NativeFileSystemStore store = new CosNativeFileSystemStore();
    RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
        conf.getInt(CosNConfigKeys.COSN_MAX_RETRIES_KEY,
            CosNConfigKeys.DEFAULT_MAX_RETRIES),
        conf.getLong(CosNConfigKeys.COSN_RETRY_INTERVAL_KEY,
            CosNConfigKeys.DEFAULT_RETRY_INTERVAL),
        TimeUnit.SECONDS);
    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
        new HashMap<>();

    exceptionToPolicyMap.put(IOException.class, basePolicy);
    RetryPolicy methodPolicy = RetryPolicies.retryByException(
        RetryPolicies.TRY_ONCE_THEN_FAIL,
        exceptionToPolicyMap);
    Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<>();
    methodNameToPolicyMap.put("storeFile", methodPolicy);
    methodNameToPolicyMap.put("rename", methodPolicy);

    return (NativeFileSystemStore) RetryProxy.create(
        NativeFileSystemStore.class, store, methodNameToPolicyMap);
  }

  private String getOwnerId() {
    return System.getProperty("user.name");
  }

  private String getGroupId() {
    return System.getProperty("user.name");
  }

  private String getOwnerInfo(boolean getOwnerId) {
    String ownerInfoId = "";
    try {
      String userName = System.getProperty("user.name");
      String command = "id -u " + userName;
      if (!getOwnerId) {
        command = "id -g " + userName;
      }
      Process child = Runtime.getRuntime().exec(command);
      child.waitFor();

      // Get the input stream and read from it
      InputStream in = child.getInputStream();
      StringBuilder strBuffer = new StringBuilder();
      int c;
      while ((c = in.read()) != -1) {
        strBuffer.append((char) c);
      }
      in.close();
      ownerInfoId = strBuffer.toString();
    } catch (IOException | InterruptedException e) {
      LOG.error("Getting owner info occurs a exception", e);
    }
    return ownerInfoId;
  }

  private static String pathToKey(Path path) {
    if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) {
      // allow uris without trailing slash after bucket to refer to root,
      // like cosn://mybucket
      return "";
    }
    if (!path.isAbsolute()) {
      throw new IllegalArgumentException("Path must be absolute: " + path);
    }
    String ret = path.toUri().getPath();
    if (ret.endsWith("/") && (ret.indexOf("/") != ret.length() - 1)) {
      ret = ret.substring(0, ret.length() - 1);
    }
    return ret;
  }

  private static Path keyToPath(String key) {
    if (!key.startsWith(PATH_DELIMITER)) {
      return new Path("/" + key);
    } else {
      return new Path(key);
    }
  }

  private Path makeAbsolute(Path path) {
    if (path.isAbsolute()) {
      return path;
    }
    return new Path(workingDir, path);
  }

  /**
   * This optional operation is not yet supported.
   */
  @Override
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    throw new IOException("Not supported");
  }

  @Override
  public FSDataOutputStream create(Path f, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    FileStatus fileStatus;

    try {
      fileStatus = getFileStatus(f);
      if (fileStatus.isDirectory()) {
        throw new FileAlreadyExistsException(f + " is a directory");
      }
      if (!overwrite) {
        // path references a file and overwrite is disabled
        throw new FileAlreadyExistsException(f + " already exists");
      }

    } catch (FileNotFoundException e) {
      LOG.debug("Creating a new file: [{}] in COS.", f);
    }

    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);
    return new FSDataOutputStream(
        new CosNOutputStream(getConf(), store, key, blockSize,
            this.boundedIOThreadPool), statistics);
  }

  private boolean rejectRootDirectoryDelete(boolean isEmptyDir,
      boolean recursive) throws PathIOException {
    if (isEmptyDir) {
      return true;
    }
    if (recursive) {
      return false;
    } else {
      throw new PathIOException(this.bucket, "Can not delete root path");
    }
  }

  @Override
  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
      EnumSet<CreateFlag> flags, int bufferSize, short replication,
      long blockSize, Progressable progress) throws IOException {
    Path parent = f.getParent();
    if (null != parent) {
      if (!getFileStatus(parent).isDirectory()) {
        throw new FileAlreadyExistsException("Not a directory: " + parent);
      }
    }

    return create(f, permission, flags.contains(CreateFlag.OVERWRITE),
        bufferSize, replication, blockSize, progress);
  }

  @Override
  public boolean delete(Path f, boolean recursive) throws IOException {
    LOG.debug("Ready to delete path: [{}]. recursive: [{}].", f, recursive);
    FileStatus status;
    try {
      status = getFileStatus(f);
    } catch (FileNotFoundException e) {
      LOG.debug("Ready to delete the file: [{}], but it does not exist.", f);
      return false;
    }
    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);
    if (key.compareToIgnoreCase("/") == 0) {
      FileStatus[] fileStatuses = listStatus(f);
      return this.rejectRootDirectoryDelete(
          fileStatuses.length == 0, recursive);
    }

    if (status.isDirectory()) {
      if (!key.endsWith(PATH_DELIMITER)) {
        key += PATH_DELIMITER;
      }
      if (!recursive && listStatus(f).length > 0) {
        String errMsg = String.format("Can not delete the directory: [%s], as"
            + " it is not empty and option recursive is false.", f);
        throw new IOException(errMsg);
      }

      createParent(f);

      String priorLastKey = null;
      do {
        PartialListing listing = store.list(
            key,
            Constants.COS_MAX_LISTING_LENGTH,
            priorLastKey,
            true);
        for (FileMetadata file : listing.getFiles()) {
          store.delete(file.getKey());
        }
        for (FileMetadata commonPrefix : listing.getCommonPrefixes()) {
          store.delete(commonPrefix.getKey());
        }
        priorLastKey = listing.getPriorLastKey();
      } while (priorLastKey != null);
      try {
        store.delete(key);
      } catch (Exception e) {
        LOG.error("Deleting the COS key: [{}] occurs an exception.", key, e);
      }

    } else {
      LOG.debug("Delete the file: {}", f);
      createParent(f);
      store.delete(key);
    }
    return true;
  }

  @Override
  public FileStatus getFileStatus(Path f) throws IOException {
    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);

    if (key.length() == 0) {
      // root always exists
      return newDirectory(absolutePath);
    }

    LOG.debug("Call the getFileStatus to obtain the metadata for "
        + "the file: [{}].", f);

    FileMetadata meta = store.retrieveMetadata(key);
    if (meta != null) {
      if (meta.isFile()) {
        LOG.debug("Path: [{}] is a file. COS key: [{}]", f, key);
        return newFile(meta, absolutePath);
      } else {
        LOG.debug("Path: [{}] is a dir. COS key: [{}]", f, key);
        return newDirectory(meta, absolutePath);
      }
    }

    if (!key.endsWith(PATH_DELIMITER)) {
      key += PATH_DELIMITER;
    }

    // Considering that the object store's directory is a common prefix in
    // the object key, it needs to check the existence of the path by listing
    // the COS key.
    LOG.debug("List COS key: [{}] to check the existence of the path.", key);
    PartialListing listing = store.list(key, 1);
    if (listing.getFiles().length > 0
        || listing.getCommonPrefixes().length > 0) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Path: [{}] is a directory. COS key: [{}]", f, key);
      }
      return newDirectory(absolutePath);
    }

    throw new FileNotFoundException(
        "No such file or directory '" + absolutePath + "'");
  }

  @Override
  public URI getUri() {
    return uri;
  }

  /**
   * <p>
   * If <code>f</code> is a file, this method will make a single call to COS.
   * If <code>f</code> is a directory,
   * this method will make a maximum of ( <i>n</i> / 199) + 2 calls to cos,
   * where <i>n</i> is the total number of files
   * and directories contained directly in <code>f</code>.
   * </p>
   */
  @Override
  public FileStatus[] listStatus(Path f) throws IOException {
    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);

    if (key.length() > 0) {
      FileStatus fileStatus = this.getFileStatus(f);
      if (fileStatus.isFile()) {
        return new FileStatus[]{fileStatus};
      }
    }

    if (!key.endsWith(PATH_DELIMITER)) {
      key += PATH_DELIMITER;
    }

    URI pathUri = absolutePath.toUri();
    Set<FileStatus> status = new TreeSet<>();
    String priorLastKey = null;
    do {
      PartialListing listing = store.list(
          key, Constants.COS_MAX_LISTING_LENGTH, priorLastKey, false);
      for (FileMetadata fileMetadata : listing.getFiles()) {
        Path subPath = keyToPath(fileMetadata.getKey());
        if (fileMetadata.getKey().equals(key)) {
          // this is just the directory we have been asked to list.
          LOG.debug("The file list contains the COS key [{}] to be listed.",
              key);
        } else {
          status.add(newFile(fileMetadata, subPath));
        }
      }

      for (FileMetadata commonPrefix : listing.getCommonPrefixes()) {
        Path subPath = keyToPath(commonPrefix.getKey());
        String relativePath = pathUri.relativize(subPath.toUri()).getPath();
        status.add(
            newDirectory(commonPrefix, new Path(absolutePath, relativePath)));
      }
      priorLastKey = listing.getPriorLastKey();
    } while (priorLastKey != null);

    return status.toArray(new FileStatus[status.size()]);
  }

  private FileStatus newFile(FileMetadata meta, Path path) {
    return new FileStatus(meta.getLength(), false, 1, getDefaultBlockSize(),
        meta.getLastModified(), 0, null, this.owner, this.group,
        path.makeQualified(this.getUri(), this.getWorkingDirectory()));
  }

  private FileStatus newDirectory(Path path) {
    return new FileStatus(0, true, 1, 0, 0, 0, null, this.owner, this.group,
        path.makeQualified(this.getUri(), this.getWorkingDirectory()));
  }

  private FileStatus newDirectory(FileMetadata meta, Path path) {
    if (meta == null) {
      return newDirectory(path);
    }
    return new FileStatus(0, true, 1, 0, meta.getLastModified(),
        0, null, this.owner, this.group,
        path.makeQualified(this.getUri(), this.getWorkingDirectory()));
  }

  /**
   * Validate the path from the bottom up.
   *
   * @param path The path to be validated
   * @throws FileAlreadyExistsException The specified path is an existing file
   * @throws IOException                Getting the file status of the
   *                                    specified path occurs
   *                                    an IOException.
   */
  private void validatePath(Path path) throws IOException {
    Path parent = path.getParent();
    do {
      try {
        FileStatus fileStatus = getFileStatus(parent);
        if (fileStatus.isDirectory()) {
          break;
        } else {
          throw new FileAlreadyExistsException(String.format(
              "Can't make directory for path '%s', it is a file.", parent));
        }
      } catch (FileNotFoundException e) {
        LOG.debug("The Path: [{}] does not exist.", path);
      }
      parent = parent.getParent();
    } while (parent != null);
  }

  @Override
  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
    try {
      FileStatus fileStatus = getFileStatus(f);
      if (fileStatus.isDirectory()) {
        return true;
      } else {
        throw new FileAlreadyExistsException("Path is a file: " + f);
      }
    } catch (FileNotFoundException e) {
      validatePath(f);
    }

    return mkDirRecursively(f, permission);
  }

  /**
   * Recursively create a directory.
   *
   * @param f          Absolute path to the directory.
   * @param permission Directory permissions. Permission does not work for
   *                   the CosN filesystem currently.
   * @return Return true if the creation was successful,  throw a IOException.
   * @throws IOException The specified path already exists or an error
   *                     creating the path.
   */
  public boolean mkDirRecursively(Path f, FsPermission permission)
      throws IOException {
    Path absolutePath = makeAbsolute(f);
    List<Path> paths = new ArrayList<>();
    do {
      paths.add(absolutePath);
      absolutePath = absolutePath.getParent();
    } while (absolutePath != null);

    for (Path path : paths) {
      if (path.equals(new Path(CosNFileSystem.PATH_DELIMITER))) {
        break;
      }
      try {
        FileStatus fileStatus = getFileStatus(path);
        if (fileStatus.isFile()) {
          throw new FileAlreadyExistsException(
              String.format("Can't make directory for path: %s, "
                  + "since it is a file.", f));
        }
        if (fileStatus.isDirectory()) {
          break;
        }
      } catch (FileNotFoundException e) {
        LOG.debug("Making dir: [{}] in COS", f);

        String folderPath = pathToKey(makeAbsolute(f));
        if (!folderPath.endsWith(PATH_DELIMITER)) {
          folderPath += PATH_DELIMITER;
        }
        store.storeEmptyFile(folderPath);
      }
    }
    return true;
  }

  private boolean mkdir(Path f) throws IOException {
    try {
      FileStatus fileStatus = getFileStatus(f);
      if (fileStatus.isFile()) {
        throw new FileAlreadyExistsException(
            String.format(
                "Can't make directory for path '%s' since it is a file.", f));
      }
    } catch (FileNotFoundException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Make directory: [{}] in COS.", f);
      }

      String folderPath = pathToKey(makeAbsolute(f));
      if (!folderPath.endsWith(PATH_DELIMITER)) {
        folderPath += PATH_DELIMITER;
      }
      store.storeEmptyFile(folderPath);
    }
    return true;
  }

  @Override
  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    FileStatus fs = getFileStatus(f); // will throw if the file doesn't
    // exist
    if (fs.isDirectory()) {
      throw new FileNotFoundException("'" + f + "' is a directory");
    }
    LOG.info("Open the file: [{}] for reading.", f);
    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);
    long fileSize = store.getFileLength(key);
    return new FSDataInputStream(new BufferedFSInputStream(
        new CosNInputStream(this.getConf(), store, statistics, key, fileSize,
            this.boundedIOThreadPool), bufferSize));
  }

  @Override
  public boolean rename(Path src, Path dst) throws IOException {
    LOG.debug("Rename source path: [{}] to dest path: [{}].", src, dst);

    // Renaming the root directory is not allowed
    if (src.isRoot()) {
      LOG.debug("Cannot rename the root directory of a filesystem.");
      return false;
    }

    // check the source path whether exists or not
    FileStatus srcFileStatus = this.getFileStatus(src);

    // Source path and destination path are not allowed to be the same
    if (src.equals(dst)) {
      LOG.debug("Source path and dest path refer to "
          + "the same file or directory: [{}].", dst);
      throw new IOException("Source path and dest path refer "
          + "the same file or directory");
    }

    // It is not allowed to rename a parent directory to its subdirectory
    Path dstParentPath;
    for (dstParentPath = dst.getParent();
         null != dstParentPath && !src.equals(dstParentPath);
         dstParentPath = dstParentPath.getParent()) {
      // Recursively find the common parent path of the source and
      // destination paths.
      LOG.debug("Recursively find the common parent directory of the source "
              + "and destination paths. The currently found parent path: {}",
          dstParentPath);
    }

    if (null != dstParentPath) {
      LOG.debug("It is not allowed to rename a parent directory:[{}] "
          + "to its subdirectory:[{}].", src, dst);
      throw new IOException(String.format(
          "It is not allowed to rename a parent directory: %s "
              + "to its subdirectory: %s", src, dst));
    }

    FileStatus dstFileStatus;
    try {
      dstFileStatus = this.getFileStatus(dst);

      // The destination path exists and is a file,
      // and the rename operation is not allowed.
      if (dstFileStatus.isFile()) {
        throw new FileAlreadyExistsException(String.format(
            "File: %s already exists", dstFileStatus.getPath()));
      } else {
        // The destination path is an existing directory,
        // and it is checked whether there is a file or directory
        // with the same name as the source path under the destination path
        dst = new Path(dst, src.getName());
        FileStatus[] statuses;
        try {
          statuses = this.listStatus(dst);
        } catch (FileNotFoundException e) {
          statuses = null;
        }
        if (null != statuses && statuses.length > 0) {
          LOG.debug("Cannot rename source file: [{}] to dest file: [{}], "
              + "because the file already exists.", src, dst);
          throw new FileAlreadyExistsException(
              String.format(
                  "File: %s already exists", dst
              )
          );
        }
      }
    } catch (FileNotFoundException e) {
      // destination path not exists
      Path tempDstParentPath = dst.getParent();
      FileStatus dstParentStatus = this.getFileStatus(tempDstParentPath);
      if (!dstParentStatus.isDirectory()) {
        throw new IOException(String.format(
            "Cannot rename %s to %s, %s is a file", src, dst, dst.getParent()
        ));
      }
      // The default root directory is definitely there.
    }

    boolean result;
    if (srcFileStatus.isDirectory()) {
      result = this.copyDirectory(src, dst);
    } else {
      result = this.copyFile(src, dst);
    }

    if (!result) {
      //Since rename is a non-atomic operation, after copy fails,
      // it is not allowed to delete the data of the original path.
      return false;
    } else {
      return this.delete(src, true);
    }
  }

  private boolean copyFile(Path srcPath, Path dstPath) throws IOException {
    String srcKey = pathToKey(srcPath);
    String dstKey = pathToKey(dstPath);
    this.store.copy(srcKey, dstKey);
    return true;
  }

  private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException {
    String srcKey = pathToKey(srcPath);
    if (!srcKey.endsWith(PATH_DELIMITER)) {
      srcKey += PATH_DELIMITER;
    }
    String dstKey = pathToKey(dstPath);
    if (!dstKey.endsWith(PATH_DELIMITER)) {
      dstKey += PATH_DELIMITER;
    }

    if (dstKey.startsWith(srcKey)) {
      throw new IOException(
          "can not copy a directory to a subdirectory of self");
    }

    this.store.storeEmptyFile(dstKey);
    CosNCopyFileContext copyFileContext = new CosNCopyFileContext();

    int copiesToFinishes = 0;
    String priorLastKey = null;
    do {
      PartialListing objectList = this.store.list(
          srcKey, Constants.COS_MAX_LISTING_LENGTH, priorLastKey, true);
      for (FileMetadata file : objectList.getFiles()) {
        this.boundedCopyThreadPool.execute(new CosNCopyFileTask(
            this.store,
            file.getKey(),
            dstKey.concat(file.getKey().substring(srcKey.length())),
            copyFileContext));
        copiesToFinishes++;
        if (!copyFileContext.isCopySuccess()) {
          break;
        }
      }
      priorLastKey = objectList.getPriorLastKey();
    } while (null != priorLastKey);

    copyFileContext.lock();
    try {
      copyFileContext.awaitAllFinish(copiesToFinishes);
    } catch (InterruptedException e) {
      LOG.warn("interrupted when wait copies to finish");
    } finally {
      copyFileContext.lock();
    }

    return copyFileContext.isCopySuccess();
  }

  private void createParent(Path path) throws IOException {
    Path parent = path.getParent();
    if (parent != null) {
      String parentKey = pathToKey(parent);
      LOG.debug("Create parent key: {}", parentKey);
      if (!parentKey.equals(PATH_DELIMITER)) {
        String key = pathToKey(makeAbsolute(parent));
        if (key.length() > 0) {
          try {
            store.storeEmptyFile(key + PATH_DELIMITER);
          } catch (IOException e) {
            LOG.debug("Store a empty file in COS failed.", e);
            throw e;
          }
        }
      }
    }
  }

  @Override
  @SuppressWarnings("deprecation")
  public long getDefaultBlockSize() {
    return getConf().getLong(
        CosNConfigKeys.COSN_BLOCK_SIZE_KEY,
        CosNConfigKeys.DEFAULT_BLOCK_SIZE);
  }

  /**
   * Set the working directory to the given directory.
   */
  @Override
  public void setWorkingDirectory(Path newDir) {
    workingDir = newDir;
  }

  @Override
  public Path getWorkingDirectory() {
    return workingDir;
  }

  @Override
  public String getCanonicalServiceName() {
    // Does not support Token
    return null;
  }

  @Override
  public void close() throws IOException {
    try {
      this.store.close();
      this.boundedIOThreadPool.shutdown();
      this.boundedCopyThreadPool.shutdown();
    } finally {
      super.close();
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop BufferPool 源码

hadoop ByteBufferInputStream 源码

hadoop ByteBufferOutputStream 源码

hadoop ByteBufferWrapper 源码

hadoop Constants 源码

hadoop CosN 源码

hadoop CosNConfigKeys 源码

hadoop CosNCopyFileContext 源码

hadoop CosNCopyFileTask 源码

hadoop CosNFileReadTask 源码

0  赞