hadoop OBSWriteOperationHelper 源码
haddop OBSWriteOperationHelper 代码
文件路径:/hadoop-cloud-storage-project/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/OBSWriteOperationHelper.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.obs;
import org.apache.hadoop.util.Preconditions;
import com.obs.services.ObsClient;
import com.obs.services.exception.ObsException;
import com.obs.services.model.AbortMultipartUploadRequest;
import com.obs.services.model.CompleteMultipartUploadRequest;
import com.obs.services.model.CompleteMultipartUploadResult;
import com.obs.services.model.InitiateMultipartUploadRequest;
import com.obs.services.model.ObjectMetadata;
import com.obs.services.model.PartEtag;
import com.obs.services.model.PutObjectRequest;
import com.obs.services.model.PutObjectResult;
import com.obs.services.model.UploadPartRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
/**
* Helper for an ongoing write operation.
*
* <p>It hides direct access to the OBS API from the output stream, and is a
* location where the object upload process can be evolved/enhanced.
*
* <p>Features
*
* <ul>
* <li>Methods to create and submit requests to OBS, so avoiding all direct
* interaction with the OBS APIs.
* <li>Some extra preflight checks of arguments, so failing fast on errors.
* <li>Callbacks to let the FS know of events in the output stream upload
* process.
* </ul>
* <p>
* Each instance of this state is unique to a single output stream.
*/
class OBSWriteOperationHelper {
/**
* Class logger.
*/
public static final Logger LOG = LoggerFactory.getLogger(
OBSWriteOperationHelper.class);
/**
* Part number of the multipart task.
*/
static final int PART_NUMBER = 10000;
/**
* Owning filesystem.
*/
private final OBSFileSystem owner;
/**
* Bucket of the owner FS.
*/
private final String bucket;
/**
* Define obs client.
*/
private final ObsClient obs;
protected OBSWriteOperationHelper(final OBSFileSystem fs) {
this.owner = fs;
this.bucket = fs.getBucket();
this.obs = fs.getObsClient();
}
/**
* Create a {@link PutObjectRequest} request. If {@code length} is set, the
* metadata is configured with the size of the upload.
*
* @param destKey key of object
* @param inputStream source data
* @param length size, if known. Use -1 for not known
* @return the request
*/
PutObjectRequest newPutRequest(final String destKey,
final InputStream inputStream,
final long length) {
return OBSCommonUtils.newPutObjectRequest(owner, destKey,
newObjectMetadata(length), inputStream);
}
/**
* Create a {@link PutObjectRequest} request to upload a file.
*
* @param destKey object key for request
* @param sourceFile source file
* @return the request
*/
PutObjectRequest newPutRequest(final String destKey,
final File sourceFile) {
int length = (int) sourceFile.length();
return OBSCommonUtils.newPutObjectRequest(owner, destKey,
newObjectMetadata(length), sourceFile);
}
/**
* Callback on a successful write.
*
* @param destKey object key
*/
void writeSuccessful(final String destKey) {
LOG.debug("Finished write to {}", destKey);
}
/**
* Create a new object metadata instance. Any standard metadata headers are
* added here, for example: encryption.
*
* @param length size, if known. Use -1 for not known
* @return a new metadata instance
*/
public ObjectMetadata newObjectMetadata(final long length) {
return OBSObjectBucketUtils.newObjectMetadata(length);
}
/**
* Start the multipart upload process.
*
* @param destKey object key
* @return the upload result containing the ID
* @throws IOException IO problem
*/
String initiateMultiPartUpload(final String destKey) throws IOException {
LOG.debug("Initiating Multipart upload");
final InitiateMultipartUploadRequest initiateMPURequest =
new InitiateMultipartUploadRequest(bucket, destKey);
initiateMPURequest.setAcl(owner.getCannedACL());
initiateMPURequest.setMetadata(newObjectMetadata(-1));
if (owner.getSse().isSseCEnable()) {
initiateMPURequest.setSseCHeader(owner.getSse().getSseCHeader());
} else if (owner.getSse().isSseKmsEnable()) {
initiateMPURequest.setSseKmsHeader(
owner.getSse().getSseKmsHeader());
}
try {
return obs.initiateMultipartUpload(initiateMPURequest)
.getUploadId();
} catch (ObsException ace) {
throw OBSCommonUtils.translateException("Initiate MultiPartUpload",
destKey, ace);
}
}
/**
* Complete a multipart upload operation.
*
* @param destKey Object key
* @param uploadId multipart operation Id
* @param partETags list of partial uploads
* @return the result
* @throws ObsException on problems.
*/
CompleteMultipartUploadResult completeMultipartUpload(
final String destKey, final String uploadId,
final List<PartEtag> partETags)
throws ObsException {
Preconditions.checkNotNull(uploadId);
Preconditions.checkNotNull(partETags);
Preconditions.checkArgument(!partETags.isEmpty(),
"No partitions have been uploaded");
LOG.debug("Completing multipart upload {} with {} parts", uploadId,
partETags.size());
// a copy of the list is required, so that the OBS SDK doesn't
// attempt to sort an unmodifiable list.
return obs.completeMultipartUpload(
new CompleteMultipartUploadRequest(bucket, destKey, uploadId,
new ArrayList<>(partETags)));
}
/**
* Abort a multipart upload operation.
*
* @param destKey object key
* @param uploadId multipart operation Id
* @throws ObsException on problems. Immediately execute
*/
void abortMultipartUpload(final String destKey, final String uploadId)
throws ObsException {
LOG.debug("Aborting multipart upload {}", uploadId);
obs.abortMultipartUpload(
new AbortMultipartUploadRequest(bucket, destKey, uploadId));
}
/**
* Create request for uploading one part of a multipart task.
*
* @param destKey destination object key
* @param uploadId upload id
* @param partNumber part number
* @param size data size
* @param sourceFile source file to be uploaded
* @return part upload request
*/
UploadPartRequest newUploadPartRequest(
final String destKey,
final String uploadId,
final int partNumber,
final int size,
final File sourceFile) {
Preconditions.checkNotNull(uploadId);
Preconditions.checkArgument(sourceFile != null, "Data source");
Preconditions.checkArgument(size > 0, "Invalid partition size %s",
size);
Preconditions.checkArgument(
partNumber > 0 && partNumber <= PART_NUMBER);
LOG.debug("Creating part upload request for {} #{} size {}", uploadId,
partNumber, size);
UploadPartRequest request = new UploadPartRequest();
request.setUploadId(uploadId);
request.setBucketName(bucket);
request.setObjectKey(destKey);
request.setPartSize((long) size);
request.setPartNumber(partNumber);
request.setFile(sourceFile);
if (owner.getSse().isSseCEnable()) {
request.setSseCHeader(owner.getSse().getSseCHeader());
}
return request;
}
/**
* Create request for uploading one part of a multipart task.
*
* @param destKey destination object key
* @param uploadId upload id
* @param partNumber part number
* @param size data size
* @param uploadStream upload stream for the part
* @return part upload request
*/
UploadPartRequest newUploadPartRequest(
final String destKey,
final String uploadId,
final int partNumber,
final int size,
final InputStream uploadStream) {
Preconditions.checkNotNull(uploadId);
Preconditions.checkArgument(uploadStream != null, "Data source");
Preconditions.checkArgument(size > 0, "Invalid partition size %s",
size);
Preconditions.checkArgument(
partNumber > 0 && partNumber <= PART_NUMBER);
LOG.debug("Creating part upload request for {} #{} size {}", uploadId,
partNumber, size);
UploadPartRequest request = new UploadPartRequest();
request.setUploadId(uploadId);
request.setBucketName(bucket);
request.setObjectKey(destKey);
request.setPartSize((long) size);
request.setPartNumber(partNumber);
request.setInput(uploadStream);
if (owner.getSse().isSseCEnable()) {
request.setSseCHeader(owner.getSse().getSseCHeader());
}
return request;
}
public String toString(final String destKey) {
return "{bucket=" + bucket + ", key='" + destKey + '\'' + '}';
}
/**
* PUT an object directly (i.e. not via the transfer manager).
*
* @param putObjectRequest the request
* @return the upload initiated
* @throws IOException on problems
*/
PutObjectResult putObject(final PutObjectRequest putObjectRequest)
throws IOException {
try {
return OBSCommonUtils.putObjectDirect(owner, putObjectRequest);
} catch (ObsException e) {
throw OBSCommonUtils.translateException("put",
putObjectRequest.getObjectKey(), e);
}
}
}
相关信息
相关文章
hadoop BasicSessionCredential 源码
hadoop DefaultOBSClientFactory 源码
hadoop FileConflictException 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦