hadoop OBSWriteOperationHelper 源码

  • 2022-10-20
  • 浏览 (465)

hadoop OBSWriteOperationHelper 代码

文件路径:/hadoop-cloud-storage-project/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/OBSWriteOperationHelper.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.obs;

import org.apache.hadoop.util.Preconditions;
import com.obs.services.ObsClient;
import com.obs.services.exception.ObsException;
import com.obs.services.model.AbortMultipartUploadRequest;
import com.obs.services.model.CompleteMultipartUploadRequest;
import com.obs.services.model.CompleteMultipartUploadResult;
import com.obs.services.model.InitiateMultipartUploadRequest;
import com.obs.services.model.ObjectMetadata;
import com.obs.services.model.PartEtag;
import com.obs.services.model.PutObjectRequest;
import com.obs.services.model.PutObjectResult;
import com.obs.services.model.UploadPartRequest;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

/**
 * Helper for an ongoing write operation.
 *
 * <p>It hides direct access to the OBS API from the output stream, and is a
 * location where the object upload process can be evolved/enhanced.
 *
 * <p>Features
 *
 * <ul>
 * <li>Methods to create and submit requests to OBS, so avoiding all direct
 * interaction with the OBS APIs.
 * <li>Some extra preflight checks of arguments, so failing fast on errors.
 * <li>Callbacks to let the FS know of events in the output stream upload
 * process.
 * </ul>
 *
 * <p>Each instance is intended to be used by a single output stream. The
 * bucket name and OBS client are captured from the owning filesystem once,
 * at construction time.
 */
class OBSWriteOperationHelper {
  /**
   * Class logger.
   */
  public static final Logger LOG = LoggerFactory.getLogger(
      OBSWriteOperationHelper.class);

  /**
   * Largest part number (inclusive) accepted when building a part upload
   * request; valid part numbers are {@code 1..PART_NUMBER}.
   */
  static final int PART_NUMBER = 10000;

  /**
   * Owning filesystem.
   */
  private final OBSFileSystem owner;

  /**
   * Bucket of the owner FS.
   */
  private final String bucket;

  /**
   * OBS client obtained from the owner FS.
   */
  private final ObsClient obs;

  /**
   * Bind the helper to a filesystem, caching its bucket and OBS client.
   *
   * @param fs owning filesystem; must not be null
   */
  protected OBSWriteOperationHelper(final OBSFileSystem fs) {
    this.owner = fs;
    this.bucket = fs.getBucket();
    this.obs = fs.getObsClient();
  }

  /**
   * Create a {@link PutObjectRequest} request for stream data. The metadata
   * attached to the request is built from {@code length}.
   *
   * @param destKey     key of object
   * @param inputStream source data
   * @param length      size, if known. Use -1 for not known
   * @return the request
   */
  PutObjectRequest newPutRequest(final String destKey,
      final InputStream inputStream,
      final long length) {
    return OBSCommonUtils.newPutObjectRequest(owner, destKey,
        newObjectMetadata(length), inputStream);
  }

  /**
   * Create a {@link PutObjectRequest} request to upload a file. The metadata
   * attached to the request is built from the file's current length.
   *
   * @param destKey    object key for request
   * @param sourceFile source file
   * @return the request
   */
  PutObjectRequest newPutRequest(final String destKey,
      final File sourceFile) {
    // Keep the length as a long: File.length() returns long, and narrowing
    // it to int would wrap (possibly to a negative value) for files of
    // 2 GiB or more.
    final long length = sourceFile.length();
    return OBSCommonUtils.newPutObjectRequest(owner, destKey,
        newObjectMetadata(length), sourceFile);
  }

  /**
   * Callback on a successful write. Currently only logs at debug level.
   *
   * @param destKey object key
   */
  void writeSuccessful(final String destKey) {
    LOG.debug("Finished write to {}", destKey);
  }

  /**
   * Create a new object metadata instance by delegating to
   * {@code OBSObjectBucketUtils.newObjectMetadata(long)}.
   *
   * @param length size, if known. Use -1 for not known
   * @return a new metadata instance
   */
  public ObjectMetadata newObjectMetadata(final long length) {
    return OBSObjectBucketUtils.newObjectMetadata(length);
  }

  /**
   * Start the multipart upload process.
   *
   * <p>The request carries the owner's canned ACL, metadata of unknown
   * length (-1), and either the SSE-C header or, if SSE-C is not enabled,
   * the SSE-KMS header when that is enabled.
   *
   * @param destKey object key
   * @return the upload ID of the newly initiated multipart upload
   * @throws IOException the translated form of any {@link ObsException}
   *                     raised by the client
   */
  String initiateMultiPartUpload(final String destKey) throws IOException {
    LOG.debug("Initiating Multipart upload");
    final InitiateMultipartUploadRequest initiateMPURequest =
        new InitiateMultipartUploadRequest(bucket, destKey);
    initiateMPURequest.setAcl(owner.getCannedACL());
    initiateMPURequest.setMetadata(newObjectMetadata(-1));
    if (owner.getSse().isSseCEnable()) {
      initiateMPURequest.setSseCHeader(owner.getSse().getSseCHeader());
    } else if (owner.getSse().isSseKmsEnable()) {
      initiateMPURequest.setSseKmsHeader(
          owner.getSse().getSseKmsHeader());
    }
    try {
      return obs.initiateMultipartUpload(initiateMPURequest)
          .getUploadId();
    } catch (ObsException ace) {
      throw OBSCommonUtils.translateException("Initiate MultiPartUpload",
          destKey, ace);
    }
  }

  /**
   * Complete a multipart upload operation.
   *
   * @param destKey   Object key
   * @param uploadId  multipart operation Id; must not be null
   * @param partETags list of partial uploads; must be non-null and
   *                  non-empty
   * @return the result
   * @throws ObsException on problems; not translated to an IOException here
   */
  CompleteMultipartUploadResult completeMultipartUpload(
      final String destKey, final String uploadId,
      final List<PartEtag> partETags)
      throws ObsException {
    Preconditions.checkNotNull(uploadId);
    Preconditions.checkNotNull(partETags);
    Preconditions.checkArgument(!partETags.isEmpty(),
        "No partitions have been uploaded");
    LOG.debug("Completing multipart upload {} with {} parts", uploadId,
        partETags.size());
    // a copy of the list is required, so that the OBS SDK doesn't
    // attempt to sort an unmodifiable list.
    return obs.completeMultipartUpload(
        new CompleteMultipartUploadRequest(bucket, destKey, uploadId,
            new ArrayList<>(partETags)));
  }

  /**
   * Abort a multipart upload operation. The abort request is issued
   * synchronously through the OBS client.
   *
   * @param destKey  object key
   * @param uploadId multipart operation Id
   * @throws ObsException on problems; not translated to an IOException here
   */
  void abortMultipartUpload(final String destKey, final String uploadId)
      throws ObsException {
    LOG.debug("Aborting multipart upload {}", uploadId);
    obs.abortMultipartUpload(
        new AbortMultipartUploadRequest(bucket, destKey, uploadId));
  }

  /**
   * Create request for uploading one part of a multipart task from a file.
   *
   * @param destKey    destination object key
   * @param uploadId   upload id; must not be null
   * @param partNumber part number, in the range 1..{@link #PART_NUMBER}
   * @param size       data size; must be positive
   * @param sourceFile source file to be uploaded; must not be null
   * @return part upload request
   */
  UploadPartRequest newUploadPartRequest(
      final String destKey,
      final String uploadId,
      final int partNumber,
      final int size,
      final File sourceFile) {
    UploadPartRequest request = newBasePartRequest(destKey, uploadId,
        partNumber, size, sourceFile);
    request.setFile(sourceFile);
    return request;
  }

  /**
   * Create request for uploading one part of a multipart task from a
   * stream.
   *
   * @param destKey      destination object key
   * @param uploadId     upload id; must not be null
   * @param partNumber   part number, in the range 1..{@link #PART_NUMBER}
   * @param size         data size; must be positive
   * @param uploadStream upload stream for the part; must not be null
   * @return part upload request
   */
  UploadPartRequest newUploadPartRequest(
      final String destKey,
      final String uploadId,
      final int partNumber,
      final int size,
      final InputStream uploadStream) {
    UploadPartRequest request = newBasePartRequest(destKey, uploadId,
        partNumber, size, uploadStream);
    request.setInput(uploadStream);
    return request;
  }

  /**
   * Validate the arguments shared by both part-upload variants and build a
   * request with everything set except the data source itself.
   *
   * <p>The checks run in the same order for both variants: upload id, data
   * source, size, part number.
   *
   * @param destKey    destination object key
   * @param uploadId   upload id; must not be null
   * @param partNumber part number, in the range 1..{@link #PART_NUMBER}
   * @param size       data size; must be positive
   * @param dataSource the file or stream to be uploaded; only checked for
   *                   null here, the caller attaches it to the request
   * @return a part upload request without its file/stream attached
   */
  private UploadPartRequest newBasePartRequest(
      final String destKey,
      final String uploadId,
      final int partNumber,
      final int size,
      final Object dataSource) {
    Preconditions.checkNotNull(uploadId);

    Preconditions.checkArgument(dataSource != null, "Data source");
    Preconditions.checkArgument(size > 0, "Invalid partition size %s",
        size);
    Preconditions.checkArgument(
        partNumber > 0 && partNumber <= PART_NUMBER);

    LOG.debug("Creating part upload request for {} #{} size {}", uploadId,
        partNumber, size);
    UploadPartRequest request = new UploadPartRequest();
    request.setUploadId(uploadId);
    request.setBucketName(bucket);
    request.setObjectKey(destKey);
    request.setPartSize((long) size);
    request.setPartNumber(partNumber);
    // Only the SSE-C header is applied to individual part requests.
    if (owner.getSse().isSseCEnable()) {
      request.setSseCHeader(owner.getSse().getSseCHeader());
    }
    return request;
  }

  /**
   * Describe a destination as a bucket/key pair, for use in messages.
   *
   * @param destKey object key
   * @return a string of the form <code>{bucket=B, key='K'}</code>
   */
  public String toString(final String destKey) {
    return "{bucket=" + bucket + ", key='" + destKey + '\'' + '}';
  }

  /**
   * PUT an object directly (i.e. not via the transfer manager).
   *
   * @param putObjectRequest the request
   * @return the result returned by the direct put
   * @throws IOException the translated form of any {@link ObsException}
   *                     raised by the put
   */
  PutObjectResult putObject(final PutObjectRequest putObjectRequest)
      throws IOException {
    try {
      return OBSCommonUtils.putObjectDirect(owner, putObjectRequest);
    } catch (ObsException e) {
      throw OBSCommonUtils.translateException("put",
          putObjectRequest.getObjectKey(), e);
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop BasicSessionCredential 源码

hadoop DefaultOBSClientFactory 源码

hadoop FileConflictException 源码

hadoop OBS 源码

hadoop OBSBlockOutputStream 源码

hadoop OBSClientFactory 源码

hadoop OBSCommonUtils 源码

hadoop OBSConstants 源码

hadoop OBSDataBlocks 源码

hadoop OBSFileStatus 源码

0  赞