hadoop CopyFromLocalOperation 源码
haddop CopyFromLocalOperation 代码
文件路径:/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CopyFromLocalOperation.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.Stack;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.collections.comparators.ReverseComparator;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathExistsException;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.Retries;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
import static org.apache.hadoop.fs.store.audit.AuditingFunctions.callableWithinAuditSpan;
/**
* Implementation of CopyFromLocalOperation.
* <p>
* This operation copies a file or directory (recursively) from a local
* FS to an object store. Initially, this operation has been developed for
* S3 (s3a) interaction, however, there's minimal work needed for it to
* work with other stores.
* </p>
* <p>How the uploading of files works:</p>
* <ul>
* <li> all source files and directories are scanned through;</li>
* <li> the LARGEST_N_FILES start uploading; </li>
* <li> the remaining files are shuffled and uploaded; </li>
* <li>
* any remaining empty directory is uploaded too to preserve local
* tree structure.
* </li>
* </ul>
*/
public class CopyFromLocalOperation extends ExecutingStoreOperation<Void> {
/**
* Largest N files to be uploaded first.
*/
private static final int LARGEST_N_FILES = 5;
private static final Logger LOG = LoggerFactory.getLogger(
CopyFromLocalOperation.class);
/**
* Callbacks to be used by this operation for external / IO actions.
*/
private final CopyFromLocalOperationCallbacks callbacks;
/**
* Delete source after operation finishes.
*/
private final boolean deleteSource;
/**
* Overwrite destination files / folders.
*/
private final boolean overwrite;
/**
* Source path to file / directory.
*/
private final Path source;
/**
* Async operations executor.
*/
private final ListeningExecutorService executor;
/**
* Destination path.
*/
private Path destination;
/**
* Destination file status.
*/
private FileStatus destStatus;
public CopyFromLocalOperation(
final StoreContext storeContext,
Path source,
Path destination,
boolean deleteSource,
boolean overwrite,
CopyFromLocalOperationCallbacks callbacks) {
super(storeContext);
this.callbacks = callbacks;
this.deleteSource = deleteSource;
this.overwrite = overwrite;
this.source = source;
this.destination = destination;
// Capacity of 1 is a safe default for now since transfer manager can also
// spawn threads when uploading bigger files.
this.executor = MoreExecutors.listeningDecorator(
storeContext.createThrottledExecutor(1)
);
}
/**
* Executes the {@link CopyFromLocalOperation}.
*
* @throws IOException - if there are any failures with upload or deletion
* of files. Check {@link CopyFromLocalOperationCallbacks} for specifics.
* @throws PathExistsException - if the path exists and no overwrite flag
* is set OR if the source is file and destination is a directory
*/
@Override
@Retries.RetryTranslated
public Void execute()
throws IOException, PathExistsException {
LOG.debug("Copying local file from {} to {}", source, destination);
File sourceFile = callbacks.pathToLocalFile(source);
updateDestStatus(destination);
// Handles bar/ -> foo/ => foo/bar and bar/ -> foo/bar/ => foo/bar/bar
if (getDestStatus().isPresent() && getDestStatus().get().isDirectory()
&& sourceFile.isDirectory()) {
destination = new Path(destination, sourceFile.getName());
LOG.debug("Destination updated to: {}", destination);
updateDestStatus(destination);
}
checkSource(sourceFile);
checkDestination(destination, sourceFile, overwrite);
uploadSourceFromFS();
if (deleteSource) {
callbacks.deleteLocal(source, true);
}
return null;
}
/**
* Does a {@link CopyFromLocalOperationCallbacks#getFileStatus(Path)}
* operation on the provided destination and updates the internal status of
* destStatus field.
*
* @param dest - destination Path
* @throws IOException if getFileStatus fails
*/
private void updateDestStatus(Path dest) throws IOException {
try {
destStatus = callbacks.getFileStatus(dest);
} catch (FileNotFoundException e) {
destStatus = null;
}
}
/**
* Starts async upload operations for files. Creating an empty directory
* classifies as a "file upload".
*
* Check {@link CopyFromLocalOperation} for details on the order of
* operations.
*
* @throws IOException - if listing or upload fail
*/
private void uploadSourceFromFS() throws IOException {
RemoteIterator<LocatedFileStatus> localFiles = listFilesAndDirs(source);
List<CompletableFuture<Void>> activeOps = new ArrayList<>();
// After all files are traversed, this set will contain only emptyDirs
Set<Path> emptyDirs = new HashSet<>();
List<UploadEntry> entries = new ArrayList<>();
while (localFiles.hasNext()) {
LocatedFileStatus sourceFile = localFiles.next();
Path sourceFilePath = sourceFile.getPath();
// Directory containing this file / directory isn't empty
emptyDirs.remove(sourceFilePath.getParent());
if (sourceFile.isDirectory()) {
emptyDirs.add(sourceFilePath);
continue;
}
Path destPath = getFinalPath(sourceFilePath);
// UploadEntries: have a destination path, a file size
entries.add(new UploadEntry(
sourceFilePath,
destPath,
sourceFile.getLen()));
}
if (localFiles instanceof Closeable) {
IOUtils.closeStream((Closeable) localFiles);
}
// Sort all upload entries based on size
entries.sort(new ReverseComparator(new UploadEntry.SizeComparator()));
// Take only top most N entries and upload
final int sortedUploadsCount = Math.min(LARGEST_N_FILES, entries.size());
List<UploadEntry> markedForUpload = new ArrayList<>();
for (int uploadNo = 0; uploadNo < sortedUploadsCount; uploadNo++) {
UploadEntry uploadEntry = entries.get(uploadNo);
File file = callbacks.pathToLocalFile(uploadEntry.source);
activeOps.add(submitUpload(file, uploadEntry));
markedForUpload.add(uploadEntry);
}
// No files found, it's empty source directory
if (entries.isEmpty()) {
emptyDirs.add(source);
}
// Shuffle all remaining entries and upload them
entries.removeAll(markedForUpload);
Collections.shuffle(entries);
for (UploadEntry uploadEntry : entries) {
File file = callbacks.pathToLocalFile(uploadEntry.source);
activeOps.add(submitUpload(file, uploadEntry));
}
for (Path emptyDir : emptyDirs) {
Path emptyDirPath = getFinalPath(emptyDir);
activeOps.add(submitCreateEmptyDir(emptyDirPath));
}
waitForCompletion(activeOps);
}
/**
* Async call to create an empty directory.
*
* @param dir directory path
* @return the submitted future
*/
private CompletableFuture<Void> submitCreateEmptyDir(Path dir) {
return submit(executor, callableWithinAuditSpan(
getAuditSpan(), () -> {
callbacks.createEmptyDir(dir, getStoreContext());
return null;
}
));
}
/**
* Async call to upload a file.
*
* @param file - File to be uploaded
* @param uploadEntry - Upload entry holding the source and destination
* @return the submitted future
*/
private CompletableFuture<Void> submitUpload(
File file,
UploadEntry uploadEntry) {
return submit(executor, callableWithinAuditSpan(
getAuditSpan(), () -> {
callbacks.copyLocalFileFromTo(
file,
uploadEntry.source,
uploadEntry.destination);
return null;
}
));
}
/**
* Checks the source before upload starts.
*
* @param src - Source file
* @throws FileNotFoundException - if the file isn't found
*/
private void checkSource(File src)
throws FileNotFoundException {
if (!src.exists()) {
throw new FileNotFoundException("No file: " + src.getPath());
}
}
/**
* Check the destination path and make sure it's compatible with the source,
* i.e. source and destination are both files / directories.
*
* @param dest - Destination path
* @param src - Source file
* @param overwrite - Should source overwrite destination
* @throws PathExistsException - If the destination path exists and no
* overwrite flag is set
* @throws FileAlreadyExistsException - If source is file and destination is path
*/
private void checkDestination(
Path dest,
File src,
boolean overwrite) throws PathExistsException,
FileAlreadyExistsException {
if (!getDestStatus().isPresent()) {
return;
}
if (src.isDirectory() && getDestStatus().get().isFile()) {
throw new FileAlreadyExistsException(
"Source '" + src.getPath() + "' is directory and " +
"destination '" + dest + "' is file");
}
if (!overwrite) {
throw new PathExistsException(dest + " already exists");
}
}
/**
* Get the final path of a source file with regards to its destination.
*
* @param src - source path
* @return - the final path for the source file to be uploaded to
* @throws PathIOException - if a relative path can't be created
*/
private Path getFinalPath(Path src) throws PathIOException {
URI currentSrcUri = src.toUri();
URI relativeSrcUri = source.toUri().relativize(currentSrcUri);
if (relativeSrcUri.equals(currentSrcUri)) {
throw new PathIOException("Cannot get relative path for URI:"
+ relativeSrcUri);
}
Optional<FileStatus> status = getDestStatus();
if (!relativeSrcUri.getPath().isEmpty()) {
return new Path(destination, relativeSrcUri.getPath());
} else if (status.isPresent() && status.get().isDirectory()) {
// file to dir
return new Path(destination, src.getName());
} else {
// file to file
return destination;
}
}
private Optional<FileStatus> getDestStatus() {
return Optional.ofNullable(destStatus);
}
/**
* {@link RemoteIterator} which lists all of the files and directories for
* a given path. It's strikingly similar to
* {@link org.apache.hadoop.fs.LocalFileSystem#listFiles(Path, boolean)}
* however with the small addition that it includes directories.
*
* @param path - Path to list files and directories from
* @return - an iterator
* @throws IOException - if listing of a path file fails
*/
private RemoteIterator<LocatedFileStatus> listFilesAndDirs(Path path)
throws IOException {
return new RemoteIterator<LocatedFileStatus>() {
private final Stack<RemoteIterator<LocatedFileStatus>> iterators =
new Stack<>();
private RemoteIterator<LocatedFileStatus> current =
callbacks.listLocalStatusIterator(path);
private LocatedFileStatus curFile;
@Override
public boolean hasNext() throws IOException {
while (curFile == null) {
if (current.hasNext()) {
handleFileStat(current.next());
} else if (!iterators.empty()) {
current = iterators.pop();
} else {
return false;
}
}
return true;
}
/**
* Process the input stat.
* If it is a file or directory return the file stat.
* If it is a directory, traverse the directory;
* @param stat input status
* @throws IOException if any IO error occurs
*/
private void handleFileStat(LocatedFileStatus stat)
throws IOException {
if (stat.isFile()) { // file
curFile = stat;
} else { // directory
curFile = stat;
iterators.push(current);
current = callbacks.listLocalStatusIterator(stat.getPath());
}
}
@Override
public LocatedFileStatus next() throws IOException {
if (hasNext()) {
LocatedFileStatus result = curFile;
curFile = null;
return result;
}
throw new NoSuchElementException("No more entry in "
+ path);
}
};
}
/**
* <p>Represents an entry for a file to be moved.</p>
* <p>
* Helpful with sorting files by their size and keeping track of path
* information for the upload.
* </p>
*/
private static final class UploadEntry {
private final Path source;
private final Path destination;
private final long size;
private UploadEntry(Path source, Path destination, long size) {
this.source = source;
this.destination = destination;
this.size = size;
}
/**
* Compares {@link UploadEntry} objects and produces DESC ordering.
*/
static class SizeComparator implements Comparator<UploadEntry>,
Serializable {
@Override
public int compare(UploadEntry entry1, UploadEntry entry2) {
return Long.compare(entry1.size, entry2.size);
}
}
}
/**
* Define the contract for {@link CopyFromLocalOperation} to interact
* with any external resources.
*/
public interface CopyFromLocalOperationCallbacks {
/**
* List all entries (files AND directories) for a path.
*
* @param path - path to list
* @return an iterator for all entries
* @throws IOException - for any failure
*/
RemoteIterator<LocatedFileStatus> listLocalStatusIterator(Path path)
throws IOException;
/**
* Get the file status for a path.
*
* @param path - target path
* @return FileStatus
* @throws IOException - for any failure
*/
FileStatus getFileStatus(Path path) throws IOException;
/**
* Get the file from a path.
*
* @param path - target path
* @return file at path
*/
File pathToLocalFile(Path path);
/**
* Delete file / directory at path.
*
* @param path - target path
* @param recursive - recursive deletion
* @return boolean result of operation
* @throws IOException for any failure
*/
boolean deleteLocal(Path path, boolean recursive) throws IOException;
/**
* Copy / Upload a file from a source path to a destination path.
*
* @param file - target file
* @param source - source path
* @param destination - destination path
* @throws IOException for any failure
*/
void copyLocalFileFromTo(
File file,
Path source,
Path destination) throws IOException;
/**
* Create empty directory at path. Most likely an upload operation.
*
* @param path - target path
* @param storeContext - store context
* @return boolean result of operation
* @throws IOException for any failure
*/
boolean createEmptyDir(Path path, StoreContext storeContext)
throws IOException;
}
}
相关信息
相关文章
hadoop AbstractStoreOperation 源码
hadoop ActiveOperationContext 源码
hadoop BulkDeleteRetryHandler 源码
hadoop ChangeDetectionPolicy 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦