hadoop DeleteOperation 源码
haddop DeleteOperation 代码
文件路径:/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.Retries;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
import org.apache.hadoop.fs.s3a.Tristate;
import org.apache.hadoop.util.DurationInfo;
import static org.apache.hadoop.fs.store.audit.AuditingFunctions.callableWithinAuditSpan;
import static org.apache.hadoop.util.Preconditions.checkArgument;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.maybeAwaitCompletion;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
/**
* Implementation of the delete() operation.
* This issues only one bulk delete at a time,
* intending to update S3Guard after every request succeeded.
* Now that S3Guard has been removed, it
* would be possible to issue multiple delete calls
* in parallel.
* If this is done, then it may be good to experiment with different
* page sizes. The default value is
* {@link InternalConstants#MAX_ENTRIES_TO_DELETE}, the maximum a single
* POST permits.
* <p>
* Smaller pages executed in parallel may have different
* performance characteristics when deleting very large directories.
* Any exploration of options here MUST be done with performance
* measurements taken from test runs in EC2 against local S3 stores,
* so as to ensure network latencies do not skew the results.
*/
public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
private static final Logger LOG = LoggerFactory.getLogger(
DeleteOperation.class);
/**
* Pre-fetched source status.
*/
private final S3AFileStatus status;
/**
* Recursive delete?
*/
private final boolean recursive;
/**
* Callback provider.
*/
private final OperationCallbacks callbacks;
/**
* Number of entries in a page.
*/
private final int pageSize;
/**
* Executor for async operations.
*/
private final ListeningExecutorService executor;
/**
* List of keys built up for the next delete batch.
*/
private List<DeleteEntry> keys;
/**
* The single async delete operation, or null.
*/
private CompletableFuture<Void> deleteFuture;
/**
* Counter of deleted files.
*/
private long filesDeleted;
/**
* Constructor.
* @param context store context
* @param status pre-fetched source status
* @param recursive recursive delete?
* @param callbacks callback provider
* @param pageSize size of delete pages
*/
public DeleteOperation(final StoreContext context,
final S3AFileStatus status,
final boolean recursive,
final OperationCallbacks callbacks,
final int pageSize) {
super(context);
this.status = status;
this.recursive = recursive;
this.callbacks = callbacks;
checkArgument(pageSize > 0
&& pageSize <= InternalConstants.MAX_ENTRIES_TO_DELETE,
"page size out of range: %s", pageSize);
this.pageSize = pageSize;
executor = MoreExecutors.listeningDecorator(
context.createThrottledExecutor(1));
}
public long getFilesDeleted() {
return filesDeleted;
}
/**
* Delete a file or directory tree.
* <p>
* This call does not create any fake parent directory; that is
* left to the caller.
* The actual delete call is done in a separate thread.
* Only one delete at a time is submitted, however, to reduce the
* complexity of recovering from failures.
* <p>
* With S3Guard removed, the problem of updating any
* DynamoDB store has gone away -delete calls could now be issued
* in parallel.
* However, rate limiting may be required to keep write load
* below the throttling point. Every entry in a single
* bulk delete call counts as a single write request -overloading
* an S3 partition with delete calls has been a problem in
* the past.
*
* @return true, except in the corner cases of root directory deletion
* @throws PathIsNotEmptyDirectoryException if the path is a dir and this
* is not a recursive delete.
* @throws IOException list failures or an inability to delete a file.
*/
@Retries.RetryTranslated
public Boolean execute() throws IOException {
executeOnlyOnce();
StoreContext context = getStoreContext();
Path path = status.getPath();
LOG.debug("Delete path {} - recursive {}", path, recursive);
LOG.debug("Type = {}",
status.isFile() ? "File"
: (status.isEmptyDirectory() == Tristate.TRUE
? "Empty Directory"
: "Directory"));
String key = context.pathToKey(path);
if (status.isDirectory()) {
LOG.debug("delete: Path is a directory: {}", path);
checkArgument(
status.isEmptyDirectory() != Tristate.UNKNOWN,
"File status must have directory emptiness computed");
if (!key.endsWith("/")) {
key = key + "/";
}
if ("/".equals(key)) {
LOG.error("S3A: Cannot delete the root directory."
+ " Path: {}. Recursive: {}",
status.getPath(), recursive);
return false;
}
if (!recursive && status.isEmptyDirectory() == Tristate.FALSE) {
throw new PathIsNotEmptyDirectoryException(path.toString());
}
if (status.isEmptyDirectory() == Tristate.TRUE) {
LOG.debug("deleting empty directory {}", path);
deleteObjectAtPath(path, key, false);
} else {
deleteDirectoryTree(path, key);
}
} else {
// simple file.
LOG.debug("deleting simple file {}", path);
deleteObjectAtPath(path, key, true);
}
LOG.debug("Deleted {} objects", filesDeleted);
return true;
}
/**
* Delete a directory tree.
* <p>
* This is done by asking the filesystem for a list of all objects under
* the directory path.
* <p>
* Once the first {@link #pageSize} worth of objects has been listed, a batch
* delete is queued for execution in a separate thread; subsequent batches
* block waiting for the first call to complete or fail before again,
* being deleted in the separate thread.
* <p>
* After all listed objects are queued for deletion,
* @param path directory path
* @param dirKey directory key
* @throws IOException failure
*/
@Retries.RetryTranslated
protected void deleteDirectoryTree(final Path path,
final String dirKey) throws IOException {
try (DurationInfo ignored =
new DurationInfo(LOG, false, "deleting %s", dirKey)) {
// init the lists of keys and paths to delete
resetDeleteList();
deleteFuture = null;
// list files
LOG.debug("Getting objects for directory prefix {} to delete", dirKey);
final RemoteIterator<S3ALocatedFileStatus> locatedFiles =
callbacks.listFilesAndDirectoryMarkers(path, status,
true);
// iterate through and delete. The next() call will block when a new S3
// page is required; this any active delete submitted to the executor
// will run in parallel with this.
while (locatedFiles.hasNext()) {
// get the next entry in the listing.
S3AFileStatus child = locatedFiles.next().toS3AFileStatus();
queueForDeletion(child);
}
LOG.debug("Deleting final batch of listed files");
submitNextBatch();
maybeAwaitCompletion(deleteFuture);
}
LOG.debug("Delete \"{}\" completed; deleted {} objects", path,
filesDeleted);
}
/**
* Build an S3 key for a delete request,
* possibly adding a "/" if it represents directory and it does
* not have a trailing slash already.
* @param stat status to build the key from
* @return a key for a delete request
*/
private String deletionKey(final S3AFileStatus stat) {
return getStoreContext().fullKey(stat);
}
/**
* Queue for deletion.
* @param stat status to queue
* @throws IOException failure of the previous batch of deletions.
*/
private void queueForDeletion(
final S3AFileStatus stat) throws IOException {
queueForDeletion(deletionKey(stat), stat.isDirectory());
}
/**
* Queue keys for deletion.
* Once a page of keys are ready to delete this
* call is submitted to the executor, after waiting for the previous run to
* complete.
*
* @param key key to delete
* @param isDirMarker is the entry a directory?
* @throws IOException failure of the previous batch of deletions.
*/
private void queueForDeletion(final String key,
boolean isDirMarker) throws IOException {
LOG.debug("Adding object to delete: \"{}\"", key);
keys.add(new DeleteEntry(key, isDirMarker));
if (keys.size() == pageSize) {
submitNextBatch();
}
}
/**
* Wait for the previous batch to finish then submit this page.
* The lists of keys and pages are reset here.
*
* @throws IOException failure of the previous batch of deletions.
*/
private void submitNextBatch()
throws IOException {
// delete a single page of keys and the metadata.
// block for any previous batch.
maybeAwaitCompletion(deleteFuture);
// delete the current page of keys and paths
deleteFuture = submitDelete(keys);
// reset the references so a new list can be built up.
resetDeleteList();
}
/**
* Reset the lists of keys and paths so that a new batch of
* entries can built up.
*/
private void resetDeleteList() {
keys = new ArrayList<>(pageSize);
}
/**
* Delete a file or directory marker.
* @param path path
* @param key key
* @param isFile is this a file?
* @throws IOException failure
*/
@Retries.RetryTranslated
private void deleteObjectAtPath(
final Path path,
final String key,
final boolean isFile)
throws IOException {
LOG.debug("delete: {} {}", (isFile ? "file" : "dir marker"), key);
filesDeleted++;
callbacks.deleteObjectAtPath(path, key, isFile);
}
/**
* Delete a single page of keys.
* If the list is empty no work is submitted and null is returned.
*
* @param keyList keys to delete.
* @return the submitted future or null
*/
private CompletableFuture<Void> submitDelete(
final List<DeleteEntry> keyList) {
if (keyList.isEmpty()) {
return null;
}
filesDeleted += keyList.size();
return submit(executor,
callableWithinAuditSpan(
getAuditSpan(), () -> {
asyncDeleteAction(
keyList);
return null;
}));
}
/**
* The action called in the asynchronous thread to delete
* the keys from S3 and paths from S3Guard.
*
* @param keyList keys to delete.
* entries logged?
* @throws IOException failure
*/
@Retries.RetryTranslated
private void asyncDeleteAction(
final List<DeleteEntry> keyList)
throws IOException {
try (DurationInfo ignored =
new DurationInfo(LOG, false,
"Delete page of %d keys", keyList.size())) {
if (!keyList.isEmpty()) {
// first delete the files.
List<DeleteObjectsRequest.KeyVersion> files = keyList.stream()
.filter(e -> !e.isDirMarker)
.map(e -> e.keyVersion)
.collect(Collectors.toList());
LOG.debug("Deleting of {} file objects", files.size());
Invoker.once("Remove S3 Files",
status.getPath().toString(),
() -> callbacks.removeKeys(
files,
false
));
// now the dirs
List<DeleteObjectsRequest.KeyVersion> dirs = keyList.stream()
.filter(e -> e.isDirMarker)
.map(e -> e.keyVersion)
.collect(Collectors.toList());
LOG.debug("Deleting of {} directory markers", dirs.size());
// This is invoked with deleteFakeDir.
Invoker.once("Remove S3 Dir Markers",
status.getPath().toString(),
() -> callbacks.removeKeys(
dirs,
true
));
}
}
}
/**
* Deletion entry; dir marker state is tracked to allow
* delete requests to be split into file
* and marker delete phases.
* Without S3Guard, the split is only used
* to choose which statistics to update.
*/
private static final class DeleteEntry {
private final DeleteObjectsRequest.KeyVersion keyVersion;
private final boolean isDirMarker;
private DeleteEntry(final String key, final boolean isDirMarker) {
this.keyVersion = new DeleteObjectsRequest.KeyVersion(key);
this.isDirMarker = isDirMarker;
}
public String getKey() {
return keyVersion.getKey();
}
@Override
public String toString() {
return "DeleteEntry{" +
"key='" + getKey() + '\'' +
", isDirMarker=" + isDirMarker +
'}';
}
}
}
相关信息
相关文章
hadoop AbstractStoreOperation 源码
hadoop ActiveOperationContext 源码
hadoop BulkDeleteRetryHandler 源码
hadoop ChangeDetectionPolicy 源码
hadoop ConfigureShadedAWSSocketFactory 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦