hadoop AliyunOSSFileSystemStore 源码

  • 2022-10-20
  • 浏览 (112)

haddop AliyunOSSFileSystemStore 代码

文件路径:/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.aliyun.oss;

import com.aliyun.oss.ClientConfiguration;
import com.aliyun.oss.ClientException;
import com.aliyun.oss.OSSClient;
import com.aliyun.oss.OSSException;
import com.aliyun.oss.common.auth.CredentialsProvider;
import com.aliyun.oss.common.comm.Protocol;
import com.aliyun.oss.model.AbortMultipartUploadRequest;
import com.aliyun.oss.model.CannedAccessControlList;
import com.aliyun.oss.model.CompleteMultipartUploadRequest;
import com.aliyun.oss.model.CompleteMultipartUploadResult;
import com.aliyun.oss.model.CopyObjectResult;
import com.aliyun.oss.model.DeleteObjectsRequest;
import com.aliyun.oss.model.DeleteObjectsResult;
import com.aliyun.oss.model.GenericRequest;
import com.aliyun.oss.model.GetObjectRequest;
import com.aliyun.oss.model.InitiateMultipartUploadRequest;
import com.aliyun.oss.model.InitiateMultipartUploadResult;
import com.aliyun.oss.model.ListObjectsRequest;
import com.aliyun.oss.model.ListObjectsV2Request;
import com.aliyun.oss.model.ObjectMetadata;
import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.PartETag;
import com.aliyun.oss.model.PutObjectResult;
import com.aliyun.oss.model.UploadPartCopyRequest;
import com.aliyun.oss.model.UploadPartCopyResult;
import com.aliyun.oss.model.UploadPartRequest;
import com.aliyun.oss.model.UploadPartResult;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.util.VersionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.ListIterator;
import java.util.NoSuchElementException;

import static org.apache.hadoop.fs.aliyun.oss.Constants.*;

/**
 * Core implementation of Aliyun OSS Filesystem for Hadoop.
 * Provides the bridging logic between Hadoop's abstract filesystem and
 * Aliyun OSS.
 */
public class AliyunOSSFileSystemStore {
  public static final Logger LOG =
      LoggerFactory.getLogger(AliyunOSSFileSystemStore.class);
  private String username;
  private FileSystem.Statistics statistics;
  private OSSClient ossClient;
  private String bucketName;
  private long uploadPartSize;
  private int maxKeys;
  private String serverSideEncryptionAlgorithm;
  private boolean useListV1;

  public void initialize(URI uri, Configuration conf, String user,
                         FileSystem.Statistics stat) throws IOException {
    this.username = user;
    statistics = stat;
    ClientConfiguration clientConf = new ClientConfiguration();
    clientConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS_KEY,
        MAXIMUM_CONNECTIONS_DEFAULT));
    boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS_KEY,
        SECURE_CONNECTIONS_DEFAULT);
    clientConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
    clientConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES_KEY,
        MAX_ERROR_RETRIES_DEFAULT));
    clientConf.setConnectionTimeout(conf.getInt(ESTABLISH_TIMEOUT_KEY,
        ESTABLISH_TIMEOUT_DEFAULT));
    clientConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT_KEY,
        SOCKET_TIMEOUT_DEFAULT));
    clientConf.setUserAgent(
        conf.get(USER_AGENT_PREFIX, USER_AGENT_PREFIX_DEFAULT) + ", Hadoop/"
            + VersionInfo.getVersion());

    String proxyHost = conf.getTrimmed(PROXY_HOST_KEY, "");
    int proxyPort = conf.getInt(PROXY_PORT_KEY, -1);
    if (StringUtils.isNotEmpty(proxyHost)) {
      clientConf.setProxyHost(proxyHost);
      if (proxyPort >= 0) {
        clientConf.setProxyPort(proxyPort);
      } else {
        if (secureConnections) {
          LOG.warn("Proxy host set without port. Using HTTPS default 443");
          clientConf.setProxyPort(443);
        } else {
          LOG.warn("Proxy host set without port. Using HTTP default 80");
          clientConf.setProxyPort(80);
        }
      }
      String proxyUsername = conf.getTrimmed(PROXY_USERNAME_KEY);
      String proxyPassword = conf.getTrimmed(PROXY_PASSWORD_KEY);
      if ((proxyUsername == null) != (proxyPassword == null)) {
        String msg = "Proxy error: " + PROXY_USERNAME_KEY + " or " +
            PROXY_PASSWORD_KEY + " set without the other.";
        LOG.error(msg);
        throw new IllegalArgumentException(msg);
      }
      clientConf.setProxyUsername(proxyUsername);
      clientConf.setProxyPassword(proxyPassword);
      clientConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN_KEY));
      clientConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION_KEY));
    } else if (proxyPort >= 0) {
      String msg = "Proxy error: " + PROXY_PORT_KEY + " set without " +
          PROXY_HOST_KEY;
      LOG.error(msg);
      throw new IllegalArgumentException(msg);
    }

    String endPoint = conf.getTrimmed(ENDPOINT_KEY, "");
    if (StringUtils.isEmpty(endPoint)) {
      throw new IllegalArgumentException("Aliyun OSS endpoint should not be " +
          "null or empty. Please set proper endpoint with 'fs.oss.endpoint'.");
    }
    CredentialsProvider provider =
        AliyunOSSUtils.getCredentialsProvider(uri, conf);
    ossClient = new OSSClient(endPoint, provider, clientConf);
    uploadPartSize = AliyunOSSUtils.getMultipartSizeProperty(conf,
        MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_UPLOAD_PART_SIZE_DEFAULT);

    serverSideEncryptionAlgorithm =
        conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY, "");

    bucketName = uri.getHost();

    String cannedACLName = conf.get(CANNED_ACL_KEY, CANNED_ACL_DEFAULT);
    if (StringUtils.isNotEmpty(cannedACLName)) {
      CannedAccessControlList cannedACL =
          CannedAccessControlList.valueOf(cannedACLName);
      ossClient.setBucketAcl(bucketName, cannedACL);
      statistics.incrementWriteOps(1);
    }

    maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
    int listVersion = conf.getInt(LIST_VERSION, DEFAULT_LIST_VERSION);
    if (listVersion < 1 || listVersion > 2) {
      LOG.warn("Configured fs.oss.list.version {} is invalid, forcing " +
          "version 2", listVersion);
    }
    useListV1 = (listVersion == 1);
  }

  /**
   * Delete an object, and update write operation statistics.
   *
   * @param key key to blob to delete.
   */
  public void deleteObject(String key) {
    ossClient.deleteObject(bucketName, key);
    statistics.incrementWriteOps(1);
  }

  /**
   * Delete a list of keys, and update write operation statistics.
   *
   * @param keysToDelete collection of keys to delete.
   * @throws IOException if failed to delete objects.
   */
  public void deleteObjects(List<String> keysToDelete) throws IOException {
    if (CollectionUtils.isEmpty(keysToDelete)) {
      LOG.warn("Keys to delete is empty.");
      return;
    }

    int retry = 10;
    int tries = 0;
    List<String> deleteFailed = keysToDelete;
    while(CollectionUtils.isNotEmpty(deleteFailed)) {
      DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucketName);
      deleteRequest.setKeys(deleteFailed);
      // There are two modes to do batch delete:
      // 1. detail mode: DeleteObjectsResult.getDeletedObjects returns objects
      // which were deleted successfully.
      // 2. simple mode: DeleteObjectsResult.getDeletedObjects returns objects
      // which were deleted unsuccessfully.
      // Here, we choose the simple mode to do batch delete.
      deleteRequest.setQuiet(true);
      DeleteObjectsResult result = ossClient.deleteObjects(deleteRequest);
      statistics.incrementWriteOps(1);
      deleteFailed = result.getDeletedObjects();
      tries++;
      if (tries == retry) {
        break;
      }
    }

    if (tries == retry && CollectionUtils.isNotEmpty(deleteFailed)) {
      // Most of time, it is impossible to try 10 times, expect the
      // Aliyun OSS service problems.
      throw new IOException("Failed to delete Aliyun OSS objects for " +
          tries + " times.");
    }
  }

  /**
   * Delete a directory from Aliyun OSS.
   *
   * @param key directory key to delete.
   * @throws IOException if failed to delete directory.
   */
  public void deleteDirs(String key) throws IOException {
    OSSListRequest listRequest = createListObjectsRequest(key,
        maxKeys, null, null, true);
    while (true) {
      OSSListResult objects = listObjects(listRequest);
      statistics.incrementReadOps(1);
      List<String> keysToDelete = new ArrayList<String>();
      for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
        keysToDelete.add(objectSummary.getKey());
      }
      deleteObjects(keysToDelete);
      if (objects.isTruncated()) {
        if (objects.isV1()) {
          listRequest.getV1().setMarker(objects.getV1().getNextMarker());
        } else {
          listRequest.getV2().setContinuationToken(
              objects.getV2().getNextContinuationToken());
        }
      } else {
        break;
      }
    }
  }

  /**
   * Return metadata of a given object key.
   *
   * @param key object key.
   * @return return null if key does not exist.
   */
  public ObjectMetadata getObjectMetadata(String key) {
    try {
      GenericRequest request = new GenericRequest(bucketName, key);
      request.setLogEnabled(false);
      ObjectMetadata objectMeta = ossClient.getObjectMetadata(request);
      statistics.incrementReadOps(1);
      return objectMeta;
    } catch (OSSException osse) {
      LOG.debug("Exception thrown when get object meta: "
              + key + ", exception: " + osse);
      return null;
    }
  }

  /**
   * Upload an empty file as an OSS object, using single upload.
   *
   * @param key object key.
   * @throws IOException if failed to upload object.
   */
  public void storeEmptyFile(String key) throws IOException {
    ObjectMetadata dirMeta = new ObjectMetadata();
    byte[] buffer = new byte[0];
    ByteArrayInputStream in = new ByteArrayInputStream(buffer);
    dirMeta.setContentLength(0);
    try {
      ossClient.putObject(bucketName, key, in, dirMeta);
      statistics.incrementWriteOps(1);
    } finally {
      in.close();
    }
  }

  /**
   * Copy an object from source key to destination key.
   *
   * @param srcKey source key.
   * @param srcLen source file length.
   * @param dstKey destination key.
   * @return true if file is successfully copied.
   */
  public boolean copyFile(String srcKey, long srcLen, String dstKey) {
    try {
      //1, try single copy first
      return singleCopy(srcKey, dstKey);
    } catch (Exception e) {
      //2, if failed(shallow copy not supported), then multi part copy
      LOG.debug("Exception thrown when copy file: " + srcKey
          + ", exception: " + e + ", use multipartCopy instead");
      return multipartCopy(srcKey, srcLen, dstKey);
    }
  }

  /**
   * Use single copy to copy an OSS object.
   * (The caller should make sure srcPath is a file and dstPath is valid)
   *
   * @param srcKey source key.
   * @param dstKey destination key.
   * @return true if object is successfully copied.
   */
  private boolean singleCopy(String srcKey, String dstKey) {
    CopyObjectResult copyResult =
        ossClient.copyObject(bucketName, srcKey, bucketName, dstKey);
    statistics.incrementWriteOps(1);
    LOG.debug(copyResult.getETag());
    return true;
  }

  /**
   * Use multipart copy to copy an OSS object.
   * (The caller should make sure srcPath is a file and dstPath is valid)
   *
   * @param srcKey source key.
   * @param contentLength data size of the object to copy.
   * @param dstKey destination key.
   * @return true if success, or false if upload is aborted.
   */
  private boolean multipartCopy(String srcKey, long contentLength,
      String dstKey) {
    long realPartSize =
        AliyunOSSUtils.calculatePartSize(contentLength, uploadPartSize);
    int partNum = (int) (contentLength / realPartSize);
    if (contentLength % realPartSize != 0) {
      partNum++;
    }
    InitiateMultipartUploadRequest initiateMultipartUploadRequest =
        new InitiateMultipartUploadRequest(bucketName, dstKey);
    ObjectMetadata meta = new ObjectMetadata();
    if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) {
      meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
    }
    initiateMultipartUploadRequest.setObjectMetadata(meta);
    InitiateMultipartUploadResult initiateMultipartUploadResult =
        ossClient.initiateMultipartUpload(initiateMultipartUploadRequest);
    String uploadId = initiateMultipartUploadResult.getUploadId();
    List<PartETag> partETags = new ArrayList<PartETag>();
    try {
      for (int i = 0; i < partNum; i++) {
        long skipBytes = realPartSize * i;
        long size = (realPartSize < contentLength - skipBytes) ?
            realPartSize : contentLength - skipBytes;
        UploadPartCopyRequest partCopyRequest = new UploadPartCopyRequest();
        partCopyRequest.setSourceBucketName(bucketName);
        partCopyRequest.setSourceKey(srcKey);
        partCopyRequest.setBucketName(bucketName);
        partCopyRequest.setKey(dstKey);
        partCopyRequest.setUploadId(uploadId);
        partCopyRequest.setPartSize(size);
        partCopyRequest.setBeginIndex(skipBytes);
        partCopyRequest.setPartNumber(i + 1);
        UploadPartCopyResult partCopyResult =
            ossClient.uploadPartCopy(partCopyRequest);
        statistics.incrementWriteOps(1);
        statistics.incrementBytesWritten(size);
        partETags.add(partCopyResult.getPartETag());
      }
      CompleteMultipartUploadRequest completeMultipartUploadRequest =
          new CompleteMultipartUploadRequest(bucketName, dstKey,
              uploadId, partETags);
      CompleteMultipartUploadResult completeMultipartUploadResult =
          ossClient.completeMultipartUpload(completeMultipartUploadRequest);
      LOG.debug(completeMultipartUploadResult.getETag());
      return true;
    } catch (OSSException | ClientException e) {
      AbortMultipartUploadRequest abortMultipartUploadRequest =
          new AbortMultipartUploadRequest(bucketName, dstKey, uploadId);
      ossClient.abortMultipartUpload(abortMultipartUploadRequest);
      return false;
    }
  }

  /**
   * Upload a file as an OSS object, using single upload.
   *
   * @param key object key.
   * @param file local file to upload.
   * @throws IOException if failed to upload object.
   */
  public void uploadObject(String key, File file) throws IOException {
    File object = file.getAbsoluteFile();
    FileInputStream fis = new FileInputStream(object);
    ObjectMetadata meta = new ObjectMetadata();
    meta.setContentLength(object.length());
    if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) {
      meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
    }
    try {
      PutObjectResult result = ossClient.putObject(bucketName, key, fis, meta);
      LOG.debug(result.getETag());
      statistics.incrementWriteOps(1);
    } finally {
      fis.close();
    }
  }

  /**
   * list objects.
   *
   * @param listRequest list request.
   * @return a list of matches.
   */
  public OSSListResult listObjects(OSSListRequest listRequest) {
    OSSListResult listResult;
    if (listRequest.isV1()) {
      listResult = OSSListResult.v1(
          ossClient.listObjects(listRequest.getV1()));
    } else {
      listResult = OSSListResult.v2(
          ossClient.listObjectsV2(listRequest.getV2()));
    }
    statistics.incrementReadOps(1);
    return listResult;
  }

  /**
   * continue to list objects depends on previous list result.
   *
   * @param listRequest list request.
   * @param preListResult previous list result.
   * @return a list of matches.
   */
  public OSSListResult continueListObjects(OSSListRequest listRequest,
      OSSListResult preListResult) {
    OSSListResult listResult;
    if (listRequest.isV1()) {
      listRequest.getV1().setMarker(preListResult.getV1().getNextMarker());
      listResult = OSSListResult.v1(
          ossClient.listObjects(listRequest.getV1()));
    } else {
      listRequest.getV2().setContinuationToken(
          preListResult.getV2().getNextContinuationToken());
      listResult = OSSListResult.v2(
          ossClient.listObjectsV2(listRequest.getV2()));
    }
    statistics.incrementReadOps(1);
    return listResult;
  }

  /**
   * create list objects request.
   *
   * @param prefix prefix.
   * @param maxListingLength max no. of entries
   * @param marker last key in any previous search.
   * @param continuationToken list from a specific point.
   * @param recursive whether to list directory recursively.
   * @return a list of matches.
   */
  protected OSSListRequest createListObjectsRequest(String prefix,
      int maxListingLength, String marker,
      String continuationToken, boolean recursive) {
    String delimiter = recursive ? null : "/";
    prefix = AliyunOSSUtils.maybeAddTrailingSlash(prefix);
    if (useListV1) {
      ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
      listRequest.setPrefix(prefix);
      listRequest.setDelimiter(delimiter);
      listRequest.setMaxKeys(maxListingLength);
      listRequest.setMarker(marker);
      return OSSListRequest.v1(listRequest);
    } else {
      ListObjectsV2Request listV2Request = new ListObjectsV2Request(bucketName);
      listV2Request.setPrefix(prefix);
      listV2Request.setDelimiter(delimiter);
      listV2Request.setMaxKeys(maxListingLength);
      listV2Request.setContinuationToken(continuationToken);
      return OSSListRequest.v2(listV2Request);
    }
  }

  /**
   * Retrieve a part of an object.
   *
   * @param key the object name that is being retrieved from the Aliyun OSS.
   * @param byteStart start position.
   * @param byteEnd end position.
   * @return This method returns null if the key is not found.
   */
  public InputStream retrieve(String key, long byteStart, long byteEnd) {
    try {
      GetObjectRequest request = new GetObjectRequest(bucketName, key);
      request.setRange(byteStart, byteEnd);
      InputStream in = ossClient.getObject(request).getObjectContent();
      statistics.incrementReadOps(1);
      return in;
    } catch (OSSException | ClientException e) {
      LOG.error("Exception thrown when store retrieves key: "
              + key + ", exception: " + e);
      return null;
    }
  }

  /**
   * Close OSS client properly.
   */
  public void close() {
    if (ossClient != null) {
      ossClient.shutdown();
      ossClient = null;
    }
  }

  /**
   * Clean up all objects matching the prefix.
   *
   * @param prefix Aliyun OSS object prefix.
   * @throws IOException if failed to clean up objects.
   */
  public void purge(String prefix) throws IOException {
    deleteDirs(prefix);
  }

  public RemoteIterator<LocatedFileStatus> singleStatusRemoteIterator(
      final FileStatus fileStatus, final BlockLocation[] locations) {
    return new RemoteIterator<LocatedFileStatus>() {
      private boolean hasNext = true;
      @Override
      public boolean hasNext() throws IOException {
        return fileStatus != null && hasNext;
      }

      @Override
      public LocatedFileStatus next() throws IOException {
        if (hasNext()) {
          LocatedFileStatus s = new LocatedFileStatus(fileStatus,
              fileStatus.isFile() ? locations : null);
          hasNext = false;
          return s;
        } else {
          throw new NoSuchElementException();
        }
      }
    };
  }

  public RemoteIterator<LocatedFileStatus> createLocatedFileStatusIterator(
      final String prefix, final int maxListingLength, FileSystem fs,
      PathFilter filter, FileStatusAcceptor acceptor, boolean recursive) {
    return new RemoteIterator<LocatedFileStatus>() {
      private boolean firstListing = true;
      private boolean meetEnd = false;
      private ListIterator<FileStatus> batchIterator;
      private OSSListRequest listRequest = null;

      @Override
      public boolean hasNext() throws IOException {
        if (firstListing) {
          requestNextBatch();
          firstListing = false;
        }
        return batchIterator.hasNext() || requestNextBatch();
      }

      @Override
      public LocatedFileStatus next() throws IOException {
        if (hasNext()) {
          FileStatus status = batchIterator.next();
          BlockLocation[] locations = fs.getFileBlockLocations(status,
              0, status.getLen());
          return new LocatedFileStatus(
              status, status.isFile() ? locations : null);
        } else {
          throw new NoSuchElementException();
        }
      }

      private boolean requestNextBatch() {
        while (!meetEnd) {
          if (continueListStatus()) {
            return true;
          }
        }

        return false;
      }

      private boolean continueListStatus() {
        if (meetEnd) {
          return false;
        }
        if (listRequest == null) {
          listRequest = createListObjectsRequest(prefix,
              maxListingLength, null, null, recursive);
        }
        OSSListResult listing = listObjects(listRequest);
        List<FileStatus> stats = new ArrayList<>(
            listing.getObjectSummaries().size() +
            listing.getCommonPrefixes().size());
        for (OSSObjectSummary summary : listing.getObjectSummaries()) {
          String key = summary.getKey();
          Path path = fs.makeQualified(new Path("/" + key));
          if (filter.accept(path) && acceptor.accept(path, summary)) {
            FileStatus status = new OSSFileStatus(summary.getSize(),
                key.endsWith("/"), 1, fs.getDefaultBlockSize(path),
                summary.getLastModified().getTime(), path, username);
            stats.add(status);
          }
        }

        for (String commonPrefix : listing.getCommonPrefixes()) {
          Path path = fs.makeQualified(new Path("/" + commonPrefix));
          if (filter.accept(path) && acceptor.accept(path, commonPrefix)) {
            FileStatus status = new OSSFileStatus(0, true, 1, 0, 0,
                path, username);
            stats.add(status);
          }
        }

        batchIterator = stats.listIterator();
        if (listing.isTruncated()) {
          if (listing.isV1()) {
            listRequest.getV1().setMarker(listing.getV1().getNextMarker());
          } else {
            listRequest.getV2().setContinuationToken(
                listing.getV2().getNextContinuationToken());
          }
        } else {
          meetEnd = true;
        }
        statistics.incrementReadOps(1);
        return batchIterator.hasNext();
      }
    };
  }

  public PartETag uploadPart(File file, String key, String uploadId, int idx)
      throws IOException {
    InputStream instream = null;
    Exception caught = null;
    int tries = 3;
    while (tries > 0) {
      try {
        instream = new FileInputStream(file);
        UploadPartRequest uploadRequest = new UploadPartRequest();
        uploadRequest.setBucketName(bucketName);
        uploadRequest.setKey(key);
        uploadRequest.setUploadId(uploadId);
        uploadRequest.setInputStream(instream);
        uploadRequest.setPartSize(file.length());
        uploadRequest.setPartNumber(idx);
        UploadPartResult uploadResult = ossClient.uploadPart(uploadRequest);
        statistics.incrementWriteOps(1);
        return uploadResult.getPartETag();
      } catch (Exception e) {
        LOG.debug("Failed to upload "+ file.getPath() +", " +
            "try again.", e);
        caught = e;
      } finally {
        if (instream != null) {
          instream.close();
          instream = null;
        }
      }
      tries--;
    }

    assert (caught != null);
    throw new IOException("Failed to upload " + file.getPath() +
        " for 3 times.", caught);
  }

  /**
   * Initiate multipart upload.
   */
  public String getUploadId(String key) {
    InitiateMultipartUploadRequest initiateMultipartUploadRequest =
        new InitiateMultipartUploadRequest(bucketName, key);
    InitiateMultipartUploadResult initiateMultipartUploadResult =
        ossClient.initiateMultipartUpload(initiateMultipartUploadRequest);
    return initiateMultipartUploadResult.getUploadId();
  }

  /**
   * Complete the specific multipart upload.
   */
  public CompleteMultipartUploadResult completeMultipartUpload(String key,
      String uploadId, List<PartETag> partETags) {
    Collections.sort(partETags, new PartNumberAscendComparator());
    CompleteMultipartUploadRequest completeMultipartUploadRequest =
        new CompleteMultipartUploadRequest(bucketName, key, uploadId,
            partETags);
    return ossClient.completeMultipartUpload(completeMultipartUploadRequest);
  }

  /**
   * Abort the specific multipart upload.
   */
  public void abortMultipartUpload(String key, String uploadId) {
    AbortMultipartUploadRequest request = new AbortMultipartUploadRequest(
        bucketName, key, uploadId);
    ossClient.abortMultipartUpload(request);
  }

  private static class PartNumberAscendComparator
      implements Comparator<PartETag>, Serializable {
    @Override
    public int compare(PartETag o1, PartETag o2) {
      if (o1.getPartNumber() > o2.getPartNumber()) {
        return 1;
      } else {
        return -1;
      }
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AliyunCredentialsProvider 源码

hadoop AliyunOSSBlockOutputStream 源码

hadoop AliyunOSSCopyFileContext 源码

hadoop AliyunOSSCopyFileTask 源码

hadoop AliyunOSSFileReaderTask 源码

hadoop AliyunOSSFileSystem 源码

hadoop AliyunOSSInputStream 源码

hadoop AliyunOSSUtils 源码

hadoop Constants 源码

hadoop FileStatusAcceptor 源码

0  赞