hadoop CosNativeFileSystemStore 源码

  • 2022-10-20
  • 浏览 (444)

haddop CosNativeFileSystemStore 代码

文件路径:/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/CosNativeFileSystemStore.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs.cosn;

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import com.qcloud.cos.COSClient;
import com.qcloud.cos.ClientConfig;
import com.qcloud.cos.auth.BasicCOSCredentials;
import com.qcloud.cos.auth.COSCredentials;
import com.qcloud.cos.endpoint.SuffixEndpointBuilder;
import com.qcloud.cos.exception.CosClientException;
import com.qcloud.cos.exception.CosServiceException;
import com.qcloud.cos.http.HttpProtocol;
import com.qcloud.cos.model.AbortMultipartUploadRequest;
import com.qcloud.cos.model.COSObject;
import com.qcloud.cos.model.COSObjectSummary;
import com.qcloud.cos.model.CompleteMultipartUploadRequest;
import com.qcloud.cos.model.CompleteMultipartUploadResult;
import com.qcloud.cos.model.CopyObjectRequest;
import com.qcloud.cos.model.DeleteObjectRequest;
import com.qcloud.cos.model.GetObjectMetadataRequest;
import com.qcloud.cos.model.GetObjectRequest;
import com.qcloud.cos.model.InitiateMultipartUploadRequest;
import com.qcloud.cos.model.InitiateMultipartUploadResult;
import com.qcloud.cos.model.ListObjectsRequest;
import com.qcloud.cos.model.ObjectListing;
import com.qcloud.cos.model.ObjectMetadata;
import com.qcloud.cos.model.PartETag;
import com.qcloud.cos.model.PutObjectRequest;
import com.qcloud.cos.model.PutObjectResult;
import com.qcloud.cos.model.UploadPartRequest;
import com.qcloud.cos.model.UploadPartResult;
import com.qcloud.cos.region.Region;
import com.qcloud.cos.utils.Base64;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.cosn.auth.COSCredentialsProviderList;
import org.apache.hadoop.util.VersionInfo;
import org.apache.http.HttpStatus;

/**
 * The class actually performs access operation to the COS blob store.
 * It provides the bridging logic for the Hadoop's abstract filesystem and COS.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
class CosNativeFileSystemStore implements NativeFileSystemStore {
  private COSClient cosClient;
  private String bucketName;
  private int maxRetryTimes;

  public static final Logger LOG =
      LoggerFactory.getLogger(CosNativeFileSystemStore.class);

  /**
   * Initialize the client to access COS blob storage.
   *
   * @param conf Hadoop configuration with COS configuration options.
   * @throws IOException Initialize the COS client failed,
   *                     caused by incorrect options.
   */
  private void initCOSClient(URI uri, Configuration conf) throws IOException {
    COSCredentialsProviderList credentialProviderList =
        CosNUtils.createCosCredentialsProviderSet(uri, conf);
    String region = conf.get(CosNConfigKeys.COSN_REGION_KEY);
    String endpointSuffix = conf.get(
        CosNConfigKeys.COSN_ENDPOINT_SUFFIX_KEY);
    if (null == region && null == endpointSuffix) {
      String exceptionMsg = String.format("config %s and %s at least one",
          CosNConfigKeys.COSN_REGION_KEY,
          CosNConfigKeys.COSN_ENDPOINT_SUFFIX_KEY);
      throw new IOException(exceptionMsg);
    }

    COSCredentials cosCred;
    cosCred = new BasicCOSCredentials(
        credentialProviderList.getCredentials().getCOSAccessKeyId(),
        credentialProviderList.getCredentials().getCOSSecretKey());

    boolean useHttps = conf.getBoolean(CosNConfigKeys.COSN_USE_HTTPS_KEY,
        CosNConfigKeys.DEFAULT_USE_HTTPS);

    ClientConfig config;
    if (null == region) {
      config = new ClientConfig(new Region(""));
      config.setEndpointBuilder(new SuffixEndpointBuilder(endpointSuffix));
    } else {
      config = new ClientConfig(new Region(region));
    }
    if (useHttps) {
      config.setHttpProtocol(HttpProtocol.https);
    }

    config.setUserAgent(conf.get(CosNConfigKeys.USER_AGENT,
        CosNConfigKeys.DEFAULT_USER_AGENT) + " For " + " Hadoop "
        + VersionInfo.getVersion());

    this.maxRetryTimes = conf.getInt(CosNConfigKeys.COSN_MAX_RETRIES_KEY,
        CosNConfigKeys.DEFAULT_MAX_RETRIES);

    config.setMaxConnectionsCount(
        conf.getInt(CosNConfigKeys.MAX_CONNECTION_NUM,
            CosNConfigKeys.DEFAULT_MAX_CONNECTION_NUM));

    this.cosClient = new COSClient(cosCred, config);
  }

  /**
   * Initialize the CosNativeFileSystemStore object, including
   * its COS client and default COS bucket.
   *
   * @param uri  The URI of the COS bucket accessed by default.
   * @param conf Hadoop configuration with COS configuration options.
   * @throws IOException Initialize the COS client failed.
   */
  @Override
  public void initialize(URI uri, Configuration conf) throws IOException {
    try {
      initCOSClient(uri, conf);
      this.bucketName = uri.getHost();
    } catch (Exception e) {
      handleException(e, "");
    }
  }

  /**
   * Store a file into COS from the specified input stream, which would be
   * retried until the success or maximum number.
   *
   * @param key         COS object key.
   * @param inputStream Input stream to be uploaded into COS.
   * @param md5Hash     MD5 value of the content to be uploaded.
   * @param length      Length of uploaded content.
   * @throws IOException Upload the file failed.
   */
  private void storeFileWithRetry(String key, InputStream inputStream,
      byte[] md5Hash, long length) throws IOException {
    try {
      ObjectMetadata objectMetadata = new ObjectMetadata();
      objectMetadata.setContentMD5(Base64.encodeAsString(md5Hash));
      objectMetadata.setContentLength(length);
      PutObjectRequest putObjectRequest =
          new PutObjectRequest(bucketName, key, inputStream, objectMetadata);

      PutObjectResult putObjectResult =
          (PutObjectResult) callCOSClientWithRetry(putObjectRequest);
      LOG.debug("Store file successfully. COS key: [{}], ETag: [{}].",
          key, putObjectResult.getETag());
    } catch (Exception e) {
      String errMsg = String.format("Store file failed. COS key: [%s], "
          + "exception: [%s]", key, e.toString());
      LOG.error(errMsg);
      handleException(new Exception(errMsg), key);
    }
  }

  /**
   * Store a local file into COS.
   *
   * @param key     COS object key.
   * @param file    The local file to be uploaded.
   * @param md5Hash The MD5 value of the file to be uploaded.
   * @throws IOException Upload the file failed.
   */
  @Override
  public void storeFile(String key, File file, byte[] md5Hash)
      throws IOException {
    LOG.info("Store file from local path: [{}]. file length: [{}] COS key: " +
        "[{}]", file.getCanonicalPath(), file.length(), key);
    storeFileWithRetry(key, new BufferedInputStream(new FileInputStream(file)),
        md5Hash, file.length());
  }

  /**
   * Store a file into COS from the specified input stream.
   *
   * @param key           COS object key.
   * @param inputStream   The Input stream to be uploaded.
   * @param md5Hash       The MD5 value of the content to be uploaded.
   * @param contentLength Length of uploaded content.
   * @throws IOException Upload the file failed.
   */
  @Override
  public void storeFile(
      String key,
      InputStream inputStream,
      byte[] md5Hash,
      long contentLength) throws IOException {
    LOG.info("Store file from input stream. COS key: [{}], "
        + "length: [{}].", key, contentLength);
    storeFileWithRetry(key, inputStream, md5Hash, contentLength);
  }

  // For cos, storeEmptyFile means creating a directory
  @Override
  public void storeEmptyFile(String key) throws IOException {
    if (!key.endsWith(CosNFileSystem.PATH_DELIMITER)) {
      key = key + CosNFileSystem.PATH_DELIMITER;
    }

    ObjectMetadata objectMetadata = new ObjectMetadata();
    objectMetadata.setContentLength(0);
    InputStream input = new ByteArrayInputStream(new byte[0]);
    PutObjectRequest putObjectRequest =
        new PutObjectRequest(bucketName, key, input, objectMetadata);
    try {
      PutObjectResult putObjectResult =
          (PutObjectResult) callCOSClientWithRetry(putObjectRequest);
      LOG.debug("Store empty file successfully. COS key: [{}], ETag: [{}].",
          key, putObjectResult.getETag());
    } catch (Exception e) {
      String errMsg = String.format("Store empty file failed. "
          + "COS key: [%s], exception: [%s]", key, e.toString());
      LOG.error(errMsg);
      handleException(new Exception(errMsg), key);
    }
  }

  public PartETag uploadPart(File file, String key, String uploadId,
      int partNum) throws IOException {
    InputStream inputStream = new FileInputStream(file);
    try {
      return uploadPart(inputStream, key, uploadId, partNum, file.length());
    } finally {
      inputStream.close();
    }
  }

  @Override
  public PartETag uploadPart(InputStream inputStream, String key,
      String uploadId, int partNum, long partSize) throws IOException {
    UploadPartRequest uploadPartRequest = new UploadPartRequest();
    uploadPartRequest.setBucketName(this.bucketName);
    uploadPartRequest.setUploadId(uploadId);
    uploadPartRequest.setInputStream(inputStream);
    uploadPartRequest.setPartNumber(partNum);
    uploadPartRequest.setPartSize(partSize);
    uploadPartRequest.setKey(key);

    try {
      UploadPartResult uploadPartResult =
          (UploadPartResult) callCOSClientWithRetry(uploadPartRequest);
      return uploadPartResult.getPartETag();
    } catch (Exception e) {
      String errMsg = String.format("Current thread: [%d], COS key: [%s], "
              + "upload id: [%s], part num: [%d], exception: [%s]",
          Thread.currentThread().getId(), key, uploadId, partNum, e.toString());
      handleException(new Exception(errMsg), key);
    }

    return null;
  }

  public void abortMultipartUpload(String key, String uploadId) {
    LOG.info("Abort the multipart upload. COS key: [{}], upload id: [{}].",
        key, uploadId);
    AbortMultipartUploadRequest abortMultipartUploadRequest =
        new AbortMultipartUploadRequest(bucketName, key, uploadId);
    cosClient.abortMultipartUpload(abortMultipartUploadRequest);
  }

  /**
   * Initialize a multipart upload and return the upload id.
   *
   * @param key The COS object key initialized to multipart upload.
   * @return The multipart upload id.
   */
  public String getUploadId(String key) {
    if (null == key || key.length() == 0) {
      return "";
    }

    LOG.info("Initiate a multipart upload. bucket: [{}], COS key: [{}].",
        bucketName, key);
    InitiateMultipartUploadRequest initiateMultipartUploadRequest =
        new InitiateMultipartUploadRequest(bucketName, key);
    InitiateMultipartUploadResult initiateMultipartUploadResult =
        cosClient.initiateMultipartUpload(initiateMultipartUploadRequest);
    return initiateMultipartUploadResult.getUploadId();
  }

  /**
   * Finish a multipart upload process, which will merge all parts uploaded.
   *
   * @param key          The COS object key to be finished.
   * @param uploadId     The upload id of the multipart upload to be finished.
   * @param partETagList The etag list of the part that has been uploaded.
   * @return The result object of completing the multipart upload process.
   */
  public CompleteMultipartUploadResult completeMultipartUpload(
      String key, String uploadId, List<PartETag> partETagList) {
    Collections.sort(partETagList, new Comparator<PartETag>() {
      @Override
      public int compare(PartETag o1, PartETag o2) {
        return o1.getPartNumber() - o2.getPartNumber();
      }
    });
    LOG.info("Complete the multipart upload. bucket: [{}], COS key: [{}], "
        + "upload id: [{}].", bucketName, key, uploadId);
    CompleteMultipartUploadRequest completeMultipartUploadRequest =
        new CompleteMultipartUploadRequest(
            bucketName, key, uploadId, partETagList);
    return cosClient.completeMultipartUpload(completeMultipartUploadRequest);
  }

  private FileMetadata queryObjectMetadata(String key) throws IOException {
    GetObjectMetadataRequest getObjectMetadataRequest =
        new GetObjectMetadataRequest(bucketName, key);
    try {
      ObjectMetadata objectMetadata =
          (ObjectMetadata) callCOSClientWithRetry(getObjectMetadataRequest);
      long mtime = 0;
      if (objectMetadata.getLastModified() != null) {
        mtime = objectMetadata.getLastModified().getTime();
      }
      long fileSize = objectMetadata.getContentLength();
      FileMetadata fileMetadata = new FileMetadata(key, fileSize, mtime,
          !key.endsWith(CosNFileSystem.PATH_DELIMITER));
      LOG.debug("Retrieve file metadata. COS key: [{}], ETag: [{}], "
              + "length: [{}].", key, objectMetadata.getETag(),
          objectMetadata.getContentLength());
      return fileMetadata;
    } catch (CosServiceException e) {
      if (e.getStatusCode() != HttpStatus.SC_NOT_FOUND) {
        String errorMsg = String.format("Retrieve file metadata file failed. "
            + "COS key: [%s], CosServiceException: [%s].", key, e.toString());
        LOG.error(errorMsg);
        handleException(new Exception(errorMsg), key);
      }
    }
    return null;
  }

  @Override
  public FileMetadata retrieveMetadata(String key) throws IOException {
    if (key.endsWith(CosNFileSystem.PATH_DELIMITER)) {
      key = key.substring(0, key.length() - 1);
    }

    if (!key.isEmpty()) {
      FileMetadata fileMetadata = queryObjectMetadata(key);
      if (fileMetadata != null) {
        return fileMetadata;
      }
    }

    // If the key is a directory.
    key = key + CosNFileSystem.PATH_DELIMITER;
    return queryObjectMetadata(key);
  }

  /**
   * Download a COS object and return the input stream associated with it.
   *
   * @param key The object key that is being retrieved from the COS bucket
   * @return This method returns null if the key is not found
   * @throws IOException if failed to download.
   */
  @Override
  public InputStream retrieve(String key) throws IOException {
    LOG.debug("Retrieve object key: [{}].", key);
    GetObjectRequest getObjectRequest =
        new GetObjectRequest(this.bucketName, key);
    try {
      COSObject cosObject =
          (COSObject) callCOSClientWithRetry(getObjectRequest);
      return cosObject.getObjectContent();
    } catch (Exception e) {
      String errMsg = String.format("Retrieving key: [%s] occurs "
          + "an exception: [%s].", key, e.toString());
      LOG.error("Retrieving COS key: [{}] occurs an exception: [{}].", key, e);
      handleException(new Exception(errMsg), key);
    }
    // never will get here
    return null;
  }

  /**
   * Retrieved a part of a COS object, which is specified the start position.
   *
   * @param key            The object key that is being retrieved from
   *                       the COS bucket.
   * @param byteRangeStart The start position of the part to be retrieved in
   *                       the object.
   * @return The input stream associated with the retrieved object.
   * @throws IOException if failed to retrieve.
   */
  @Override
  public InputStream retrieve(String key, long byteRangeStart)
      throws IOException {
    try {
      LOG.debug("Retrieve COS key:[{}]. range start:[{}].",
          key, byteRangeStart);
      long fileSize = getFileLength(key);
      long byteRangeEnd = fileSize - 1;
      GetObjectRequest getObjectRequest =
          new GetObjectRequest(this.bucketName, key);
      if (byteRangeEnd >= byteRangeStart) {
        getObjectRequest.setRange(byteRangeStart, fileSize - 1);
      }
      COSObject cosObject =
          (COSObject) callCOSClientWithRetry(getObjectRequest);
      return cosObject.getObjectContent();
    } catch (Exception e) {
      String errMsg =
          String.format("Retrieving COS key: [%s] occurs an exception. " +
                  "byte range start: [%s], exception: [%s].",
              key, byteRangeStart, e.toString());
      LOG.error(errMsg);
      handleException(new Exception(errMsg), key);
    }

    // never will get here
    return null;
  }

  /**
   * Download a part of a COS object, which is specified the start and
   * end position.
   *
   * @param key            The object key that is being downloaded
   * @param byteRangeStart The start position of the part to be retrieved in
   *                       the object.
   * @param byteRangeEnd   The end position of the part to be retrieved in
   *                       the object.
   * @return The input stream associated with the retrieved objects.
   * @throws IOException If failed to retrieve.
   */
  @Override
  public InputStream retrieveBlock(String key, long byteRangeStart,
      long byteRangeEnd) throws IOException {
    try {
      GetObjectRequest request = new GetObjectRequest(this.bucketName, key);
      request.setRange(byteRangeStart, byteRangeEnd);
      COSObject cosObject = (COSObject) this.callCOSClientWithRetry(request);
      return cosObject.getObjectContent();
    } catch (CosServiceException e) {
      String errMsg =
          String.format("Retrieving key [%s] with byteRangeStart [%d] occurs " +
                  "an CosServiceException: [%s].",
              key, byteRangeStart, e.toString());
      LOG.error(errMsg);
      handleException(new Exception(errMsg), key);
      return null;
    } catch (CosClientException e) {
      String errMsg =
          String.format("Retrieving key [%s] with byteRangeStart [%d] "
                  + "occurs an exception: [%s].",
              key, byteRangeStart, e.toString());
      LOG.error("Retrieving COS key: [{}] with byteRangeStart: [{}] " +
          "occurs an exception: [{}].", key, byteRangeStart, e);
      handleException(new Exception(errMsg), key);
    }

    return null;
  }

  @Override
  public PartialListing list(String prefix, int maxListingLength)
      throws IOException {
    return list(prefix, maxListingLength, null, false);
  }

  @Override
  public PartialListing list(String prefix, int maxListingLength,
      String priorLastKey, boolean recurse) throws IOException {
    return list(prefix, recurse ? null : CosNFileSystem.PATH_DELIMITER,
        maxListingLength, priorLastKey);
  }

  /**
   * List the metadata for all objects that
   * the object key has the specified prefix.
   *
   * @param prefix           The prefix to be listed.
   * @param delimiter        The delimiter is a sign, the same paths between
   *                         are listed.
   * @param maxListingLength The maximum number of listed entries.
   * @param priorLastKey     The last key in any previous search.
   * @return A metadata list on the match.
   * @throws IOException If list objects failed.
   */
  private PartialListing list(String prefix, String delimiter,
      int maxListingLength, String priorLastKey) throws IOException {
    LOG.debug("List objects. prefix: [{}], delimiter: [{}], " +
            "maxListLength: [{}], priorLastKey: [{}].",
        prefix, delimiter, maxListingLength, priorLastKey);

    if (!prefix.startsWith(CosNFileSystem.PATH_DELIMITER)) {
      prefix += CosNFileSystem.PATH_DELIMITER;
    }
    ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
    listObjectsRequest.setBucketName(bucketName);
    listObjectsRequest.setPrefix(prefix);
    listObjectsRequest.setDelimiter(delimiter);
    listObjectsRequest.setMarker(priorLastKey);
    listObjectsRequest.setMaxKeys(maxListingLength);
    ObjectListing objectListing = null;
    try {
      objectListing =
          (ObjectListing) callCOSClientWithRetry(listObjectsRequest);
    } catch (Exception e) {
      String errMsg = String.format("prefix: [%s], delimiter: [%s], "
              + "maxListingLength: [%d], priorLastKey: [%s]. "
              + "List objects occur an exception: [%s].", prefix,
          (delimiter == null) ? "" : delimiter, maxListingLength, priorLastKey,
          e.toString());
      LOG.error(errMsg);
      handleException(new Exception(errMsg), prefix);
    }
    ArrayList<FileMetadata> fileMetadataArray = new ArrayList<>();
    ArrayList<FileMetadata> commonPrefixArray = new ArrayList<>();

    if (null == objectListing) {
      String errMsg = String.format("List the prefix: [%s] failed. " +
              "delimiter: [%s], max listing length:" +
              " [%s], prior last key: [%s]",
          prefix, delimiter, maxListingLength, priorLastKey);
      handleException(new Exception(errMsg), prefix);
    }

    List<COSObjectSummary> summaries = objectListing.getObjectSummaries();
    for (COSObjectSummary cosObjectSummary : summaries) {
      String filePath = cosObjectSummary.getKey();
      if (!filePath.startsWith(CosNFileSystem.PATH_DELIMITER)) {
        filePath = CosNFileSystem.PATH_DELIMITER + filePath;
      }
      if (filePath.equals(prefix)) {
        continue;
      }
      long mtime = 0;
      if (cosObjectSummary.getLastModified() != null) {
        mtime = cosObjectSummary.getLastModified().getTime();
      }
      long fileLen = cosObjectSummary.getSize();
      fileMetadataArray.add(
          new FileMetadata(filePath, fileLen, mtime, true));
    }
    List<String> commonPrefixes = objectListing.getCommonPrefixes();
    for (String commonPrefix : commonPrefixes) {
      if (!commonPrefix.startsWith(CosNFileSystem.PATH_DELIMITER)) {
        commonPrefix = CosNFileSystem.PATH_DELIMITER + commonPrefix;
      }
      commonPrefixArray.add(
          new FileMetadata(commonPrefix, 0, 0, false));
    }

    FileMetadata[] fileMetadata = new FileMetadata[fileMetadataArray.size()];
    for (int i = 0; i < fileMetadataArray.size(); ++i) {
      fileMetadata[i] = fileMetadataArray.get(i);
    }
    FileMetadata[] commonPrefixMetaData =
        new FileMetadata[commonPrefixArray.size()];
    for (int i = 0; i < commonPrefixArray.size(); ++i) {
      commonPrefixMetaData[i] = commonPrefixArray.get(i);
    }
    // when truncated is false, it means that listing is finished.
    if (!objectListing.isTruncated()) {
      return new PartialListing(
          null, fileMetadata, commonPrefixMetaData);
    } else {
      return new PartialListing(
          objectListing.getNextMarker(), fileMetadata, commonPrefixMetaData);
    }
  }

  @Override
  public void delete(String key) throws IOException {
    LOG.debug("Delete object key: [{}] from bucket: {}.", key, this.bucketName);
    try {
      DeleteObjectRequest deleteObjectRequest =
          new DeleteObjectRequest(bucketName, key);
      callCOSClientWithRetry(deleteObjectRequest);
    } catch (Exception e) {
      String errMsg =
          String.format("Delete key: [%s] occurs an exception: [%s].",
              key, e.toString());
      LOG.error(errMsg);
      handleException(new Exception(errMsg), key);
    }
  }

  public void rename(String srcKey, String dstKey) throws IOException {
    LOG.debug("Rename source key: [{}] to dest key: [{}].", srcKey, dstKey);
    try {
      CopyObjectRequest copyObjectRequest =
          new CopyObjectRequest(bucketName, srcKey, bucketName, dstKey);
      callCOSClientWithRetry(copyObjectRequest);
      DeleteObjectRequest deleteObjectRequest =
          new DeleteObjectRequest(bucketName, srcKey);
      callCOSClientWithRetry(deleteObjectRequest);
    } catch (Exception e) {
      String errMsg = String.format("Rename object unsuccessfully. "
              + "source cos key: [%s], dest COS " +
              "key: [%s], exception: [%s]",
          srcKey,
          dstKey, e.toString());
      LOG.error(errMsg);
      handleException(new Exception(errMsg), srcKey);
    }
  }

  @Override
  public void copy(String srcKey, String dstKey) throws IOException {
    LOG.debug("Copy source key: [{}] to dest key: [{}].", srcKey, dstKey);
    try {
      CopyObjectRequest copyObjectRequest =
          new CopyObjectRequest(bucketName, srcKey, bucketName, dstKey);
      callCOSClientWithRetry(copyObjectRequest);
    } catch (Exception e) {
      String errMsg = String.format("Copy object unsuccessfully. "
              + "source COS key: %s, dest COS key: " +
              "%s, exception: %s",
          srcKey,
          dstKey, e.toString());
      LOG.error(errMsg);
      handleException(new Exception(errMsg), srcKey);
    }
  }

  @Override
  public void purge(String prefix) throws IOException {
    throw new IOException("purge not supported");
  }

  @Override
  public void dump() throws IOException {
    throw new IOException("dump not supported");
  }

  // process Exception and print detail
  private void handleException(Exception e, String key) throws IOException {
    String cosPath = CosNFileSystem.SCHEME + "://" + bucketName + key;
    String exceptInfo = String.format("%s : %s", cosPath, e.toString());
    throw new IOException(exceptInfo);
  }

  @Override
  public long getFileLength(String key) throws IOException {
    LOG.debug("Get file length. COS key: {}", key);
    GetObjectMetadataRequest getObjectMetadataRequest =
        new GetObjectMetadataRequest(bucketName, key);
    try {
      ObjectMetadata objectMetadata =
          (ObjectMetadata) callCOSClientWithRetry(getObjectMetadataRequest);
      return objectMetadata.getContentLength();
    } catch (Exception e) {
      String errMsg = String.format("Getting file length occurs an exception." +
              "COS key: %s, exception: %s", key,
          e.toString());
      LOG.error(errMsg);
      handleException(new Exception(errMsg), key);
      return 0; // never will get here
    }
  }

  private <X> Object callCOSClientWithRetry(X request)
      throws CosServiceException, IOException {
    String sdkMethod = "";
    int retryIndex = 1;
    while (true) {
      try {
        if (request instanceof PutObjectRequest) {
          sdkMethod = "putObject";
          return this.cosClient.putObject((PutObjectRequest) request);
        } else if (request instanceof UploadPartRequest) {
          sdkMethod = "uploadPart";
          if (((UploadPartRequest) request).getInputStream()
              instanceof ByteBufferInputStream) {
            ((UploadPartRequest) request).getInputStream()
                .mark((int) ((UploadPartRequest) request).getPartSize());
          }
          return this.cosClient.uploadPart((UploadPartRequest) request);
        } else if (request instanceof GetObjectMetadataRequest) {
          sdkMethod = "queryObjectMeta";
          return this.cosClient.getObjectMetadata(
              (GetObjectMetadataRequest) request);
        } else if (request instanceof DeleteObjectRequest) {
          sdkMethod = "deleteObject";
          this.cosClient.deleteObject((DeleteObjectRequest) request);
          return new Object();
        } else if (request instanceof CopyObjectRequest) {
          sdkMethod = "copyFile";
          return this.cosClient.copyObject((CopyObjectRequest) request);
        } else if (request instanceof GetObjectRequest) {
          sdkMethod = "getObject";
          return this.cosClient.getObject((GetObjectRequest) request);
        } else if (request instanceof ListObjectsRequest) {
          sdkMethod = "listObjects";
          return this.cosClient.listObjects((ListObjectsRequest) request);
        } else {
          throw new IOException("no such method");
        }
      } catch (CosServiceException cse) {
        String errMsg = String.format("Call cos sdk failed, "
                + "retryIndex: [%d / %d], "
                + "call method: %s, exception: %s",
            retryIndex, this.maxRetryTimes, sdkMethod, cse.toString());
        int statusCode = cse.getStatusCode();
        // Retry all server errors
        if (statusCode / 100 == 5) {
          if (retryIndex <= this.maxRetryTimes) {
            LOG.info(errMsg);
            long sleepLeast = retryIndex * 300L;
            long sleepBound = retryIndex * 500L;
            try {
              if (request instanceof UploadPartRequest) {
                if (((UploadPartRequest) request).getInputStream()
                    instanceof ByteBufferInputStream) {
                  ((UploadPartRequest) request).getInputStream().reset();
                }
              }
              Thread.sleep(
                  ThreadLocalRandom.current().nextLong(sleepLeast, sleepBound));
              ++retryIndex;
            } catch (InterruptedException e) {
              throw new IOException(e.toString());
            }
          } else {
            LOG.error(errMsg);
            throw new IOException(errMsg);
          }
        } else {
          throw cse;
        }
      } catch (Exception e) {
        String errMsg = String.format("Call cos sdk failed, "
            + "call method: %s, exception: %s", sdkMethod, e.toString());
        LOG.error(errMsg);
        throw new IOException(errMsg);
      }
    }
  }

  @Override
  public void close() {
    if (null != this.cosClient) {
      this.cosClient.shutdown();
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop BufferPool 源码

hadoop ByteBufferInputStream 源码

hadoop ByteBufferOutputStream 源码

hadoop ByteBufferWrapper 源码

hadoop Constants 源码

hadoop CosN 源码

hadoop CosNConfigKeys 源码

hadoop CosNCopyFileContext 源码

hadoop CosNCopyFileTask 源码

hadoop CosNFileReadTask 源码

0  赞