hadoop OBSFileSystem 源码
hadoop OBSFileSystem 代码
文件路径:/hadoop-cloud-storage-project/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/OBSFileSystem.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.obs;
import org.apache.hadoop.classification.VisibleForTesting;
import com.obs.services.ObsClient;
import com.obs.services.exception.ObsException;
import com.obs.services.model.AccessControlList;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.EnumSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* The core OBS Filesystem implementation.
*
* <p>This subclass is marked as private as code should not be creating it
* directly; use {@link FileSystem#get(Configuration)} and variants to create
* one.
*
* <p>If cast to {@code OBSFileSystem}, extra methods and features may be
* accessed. Consider those private and unstable.
*
* <p>Because it prints some of the state of the instrumentation, the output of
* {@link #toString()} must also be considered unstable.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class OBSFileSystem extends FileSystem {
/**
* Class logger.
*/
public static final Logger LOG = LoggerFactory.getLogger(
OBSFileSystem.class);
/**
* Flag indicating if the filesystem instance is closed. Starts out
* {@code false}.
*/
private final AtomicBoolean closed = new AtomicBoolean(false);
/**
* URI of the filesystem. Rebuilt in {@code initialize} from the scheme and
* authority of the URI passed in.
*/
private URI uri;
/**
* Current working directory of the filesystem. {@code initialize} sets it to
* {@code /user/<username>}; {@code setWorkingDirectory} replaces it.
*/
private Path workingDir;
/**
* Short name of the user who instantiated the filesystem, taken from
* {@link UserGroupInformation#getCurrentUser()} in {@code initialize}.
*/
private String username;
/**
* OBS client instance, created in {@code initialize} through the configured
* {@code OBSClientFactory}.
*/
private ObsClient obs;
/**
* Flag indicating if posix bucket is used. Assigned in {@code initialize}
* from {@code OBSCommonUtils.getBucketFsStatus}; selects between the
* {@code OBSPosixBucketUtils} and {@code OBSObjectBucketUtils} code paths.
*/
private boolean enablePosix = false;
/**
* Flag indicating if multi-object delete recursion is enabled. Read from
* {@code OBSConstants.MULTI_DELETE_RECURSION}, defaulting to {@code true}.
*/
private boolean enableMultiObjectDeleteRecursion = true;
/**
* Flag indicating if OBS specific content summary is enabled. When
* {@code false}, {@code getContentSummary} delegates to the superclass.
*/
private boolean obsContentSummaryEnable = true;
/**
* Flag indicating if OBS client specific depth first search (DFS) list is
* enabled. Only re-read from the configuration when the bucket is posix;
* otherwise it keeps its initial value.
*/
private boolean obsClientDFSListEnable = true;
/**
* Bucket name, i.e. the authority of the URI passed to {@code initialize}.
*/
private String bucket;
/**
* Max number of keys to get while paging through a directory listing. Read
* from {@code OBSConstants.MAX_PAGING_KEYS}.
*/
private int maxKeys;
/**
* OBSListing instance.
*/
private OBSListing obsListing;
/**
* Helper for an ongoing write operation, exposed through
* {@code getWriteHelper}.
*/
private OBSWriteOperationHelper writeHelper;
/**
* Part size for multipart upload. Read from
* {@code OBSConstants.MULTIPART_SIZE} and then passed through
* {@code OBSCommonUtils.ensureOutputParameterInRange}.
*/
private long partSize;
/**
* Flag indicating if multi-object delete is enabled. Read from
* {@code OBSConstants.ENABLE_MULTI_DELETE}, defaulting to {@code true}.
*/
private boolean enableMultiObjectDelete;
/**
* Minimum number of objects in one multi-object delete call. Read from
* {@code OBSConstants.MULTI_DELETE_THRESHOLD}.
*/
private int multiDeleteThreshold;
/**
* Maximum number of entries in one multi-object delete call. Read from
* {@code OBSConstants.MULTI_DELETE_MAX_NUMBER}.
*/
private int maxEntriesToDelete;
/**
* Bounded thread pool for multipart upload. Every output stream created by
* this filesystem wraps it in a {@link SemaphoredDelegatingExecutor}.
*/
private ExecutorService boundedMultipartUploadThreadPool;
/**
* Bounded thread pool for copy. Only created when the bucket is not posix.
*/
private ThreadPoolExecutor boundedCopyThreadPool;
/**
* Bounded thread pool for delete.
*/
private ThreadPoolExecutor boundedDeleteThreadPool;
/**
* Bounded thread pool for copy part. Only created when the bucket is not
* posix.
*/
private ThreadPoolExecutor boundedCopyPartThreadPool;
/**
* Bounded thread pool for list. Only created when the bucket is posix and
* {@link #obsClientDFSListEnable} is set.
*/
private ThreadPoolExecutor boundedListThreadPool;
/**
* List parallel factor. Forced to at least 1 when it is read; only read for
* a posix bucket with {@link #obsClientDFSListEnable} set.
*/
private int listParallelFactor;
/**
* Read ahead range, handed to every {@link OBSInputStream} opened by this
* filesystem.
*/
private long readAheadRange;
/**
* Flag indicating if {@link OBSInputStream#read(long, byte[], int, int)} will
* be transformed into {@link org.apache.hadoop.fs.FSInputStream#read(long,
* byte[], int, int)}.
*/
private boolean readTransformEnable = true;
/**
* Factory for creating blocks, selected by the
* {@code OBSConstants.FAST_UPLOAD_BUFFER} option.
*/
private OBSDataBlocks.BlockFactory blockFactory;
/**
* Maximum Number of active blocks a single output stream can submit to {@link
* #boundedMultipartUploadThreadPool}.
*/
private int blockOutputActiveBlocks;
/**
* Copy part size. Capped at {@code OBSConstants.MAX_COPY_PART_SIZE}; only
* read when the bucket is not posix.
*/
private long copyPartSize;
/**
* Flag indicating if fast delete (trash) is enabled. Forced back to
* {@code false} during {@code initialize} when the bucket is not posix.
*/
private boolean enableTrash = false;
/**
* Trash directory for fast delete. {@code null} unless trash is enabled on a
* posix bucket.
*/
private String trashDir;
/**
* Access control list returned by {@code getCannedACL}: an empty
* {@link AccessControlList} when one of the recognised canned ACL names is
* configured, otherwise {@code null}.
*/
private AccessControlList cannedACL;
/**
* Server-side encryption wrapper, built from the configuration.
*/
private SseWrapper sse;
/**
* Block size for {@link FileSystem#getDefaultBlockSize()}. Read from
* {@code OBSConstants.FS_OBS_BLOCK_SIZE}.
*/
private long blockSize;
/**
* Initialize this filesystem for the bucket named by the URI authority.
* Called after a new FileSystem instance is constructed.
*
* <p>Builds the filesystem URI, creates the OBS client through the configured
* {@code OBSClientFactory}, calls {@code OBSCommonUtils.verifyBucketExists},
* reads the tuning options into the instance fields, creates the thread
* pools and finally validates the trash settings.
*
* @param name a URI whose authority section names the bucket for this
* FileSystem
* @param originalConf the configuration to use for the FS. The
* bucket-specific options are patched over the base ones
* before any use is made of the config.
* @throws IOException if the superclass initialization fails, or as the
* translation of an {@link ObsException} raised during setup
*/
@Override
public void initialize(final URI name, final Configuration originalConf)
throws IOException {
// keep only scheme and authority; any path in the supplied URI is dropped
uri = URI.create(name.getScheme() + "://" + name.getAuthority());
bucket = name.getAuthority();
// clone the configuration into one with propagated bucket options
Configuration conf = OBSCommonUtils.propagateBucketOptions(originalConf,
bucket);
OBSCommonUtils.patchSecurityCredentialProviders(conf);
super.initialize(name, conf);
setConf(conf);
try {
// Username is the current user at the time the FS was instantiated.
username = UserGroupInformation.getCurrentUser().getShortUserName();
// NOTE(review): getWorkingDirectory() returns the workingDir field, which
// has not been assigned yet at this point - confirm makeQualified does not
// need it for the absolute path used here.
workingDir = new Path("/user", username).makeQualified(this.uri,
this.getWorkingDirectory());
// the client factory implementation is pluggable through configuration
Class<? extends OBSClientFactory> obsClientFactoryClass =
conf.getClass(
OBSConstants.OBS_CLIENT_FACTORY_IMPL,
OBSConstants.DEFAULT_OBS_CLIENT_FACTORY_IMPL,
OBSClientFactory.class);
obs = ReflectionUtils.newInstance(obsClientFactoryClass, conf)
.createObsClient(name);
sse = new SseWrapper(conf);
OBSCommonUtils.verifyBucketExists(this);
// decides between the posix-bucket and object-bucket code paths
enablePosix = OBSCommonUtils.getBucketFsStatus(obs, bucket);
maxKeys = OBSCommonUtils.intOption(conf,
OBSConstants.MAX_PAGING_KEYS,
OBSConstants.DEFAULT_MAX_PAGING_KEYS, 1);
obsListing = new OBSListing(this);
partSize = OBSCommonUtils.getMultipartSizeProperty(conf,
OBSConstants.MULTIPART_SIZE,
OBSConstants.DEFAULT_MULTIPART_SIZE);
// read the configured block size and keep it in blockSize
blockSize = OBSCommonUtils.longBytesOption(conf,
OBSConstants.FS_OBS_BLOCK_SIZE,
OBSConstants.DEFAULT_FS_OBS_BLOCK_SIZE, 1);
enableMultiObjectDelete = conf.getBoolean(
OBSConstants.ENABLE_MULTI_DELETE, true);
maxEntriesToDelete = conf.getInt(
OBSConstants.MULTI_DELETE_MAX_NUMBER,
OBSConstants.DEFAULT_MULTI_DELETE_MAX_NUMBER);
enableMultiObjectDeleteRecursion = conf.getBoolean(
OBSConstants.MULTI_DELETE_RECURSION, true);
obsContentSummaryEnable = conf.getBoolean(
OBSConstants.OBS_CONTENT_SUMMARY_ENABLE, true);
readAheadRange = OBSCommonUtils.longBytesOption(conf,
OBSConstants.READAHEAD_RANGE,
OBSConstants.DEFAULT_READAHEAD_RANGE, 0);
readTransformEnable = conf.getBoolean(
OBSConstants.READ_TRANSFORM_ENABLE, true);
multiDeleteThreshold = conf.getInt(
OBSConstants.MULTI_DELETE_THRESHOLD,
OBSConstants.MULTI_DELETE_DEFAULT_THRESHOLD);
// the pools created here depend on enablePosix, which was set above
initThreadPools(conf);
writeHelper = new OBSWriteOperationHelper(this);
initCannedAcls(conf);
OBSCommonUtils.initMultipartUploads(this, conf);
String blockOutputBuffer = conf.getTrimmed(
OBSConstants.FAST_UPLOAD_BUFFER,
OBSConstants.FAST_UPLOAD_BUFFER_DISK);
// partSize is re-assigned from ensureOutputParameterInRange before the
// block factory is created
partSize = OBSCommonUtils.ensureOutputParameterInRange(
OBSConstants.MULTIPART_SIZE, partSize);
blockFactory = OBSDataBlocks.createFactory(this, blockOutputBuffer);
blockOutputActiveBlocks =
OBSCommonUtils.intOption(conf,
OBSConstants.FAST_UPLOAD_ACTIVE_BLOCKS,
OBSConstants.DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS, 1);
LOG.debug(
"Using OBSBlockOutputStream with buffer = {}; block={};"
+ " queue limit={}",
blockOutputBuffer,
partSize,
blockOutputActiveBlocks);
enableTrash = conf.getBoolean(OBSConstants.TRASH_ENABLE,
OBSConstants.DEFAULT_TRASH);
if (enableTrash) {
if (!isFsBucket()) {
// trash is only kept for posix buckets: warn and switch it off
String errorMsg = String.format(
"The bucket [%s] is not posix. not supported for "
+ "trash.", bucket);
LOG.warn(errorMsg);
enableTrash = false;
trashDir = null;
} else {
trashDir = conf.get(OBSConstants.TRASH_DIR);
if (StringUtils.isEmpty(trashDir)) {
// enabled trash without a directory is a configuration error
String errorMsg =
String.format(
"The trash feature(fs.obs.trash.enable) is "
+ "enabled, but the "
+ "configuration(fs.obs.trash.dir [%s]) "
+ "is empty.",
trashDir);
LOG.error(errorMsg);
// caught by the catch clause below and translated
throw new ObsException(errorMsg);
}
trashDir = OBSCommonUtils.maybeAddBeginningSlash(trashDir);
trashDir = OBSCommonUtils.maybeAddTrailingSlash(trashDir);
}
}
} catch (ObsException e) {
// hand OBS SDK failures to the caller in translated form
throw OBSCommonUtils.translateException("initializing ",
new Path(name), e);
}
}
/**
 * Create the executors used by this filesystem for uploads, deletes,
 * listings and copies, sized from the given configuration.
 *
 * <p>The upload and delete pools are always created. The list pool is only
 * created for a posix bucket with client-side DFS listing enabled; the copy
 * and copy-part pools (and {@code copyPartSize}) are only set up for a
 * non-posix bucket. The method reads {@code enablePosix}, so that flag must
 * already have been assigned when this is called.
 *
 * @param conf the configuration to read the pool settings from
 */
private void initThreadPools(final Configuration conf) {
    long keepAliveTime = OBSCommonUtils.longOption(conf,
        OBSConstants.KEEPALIVE_TIME,
        OBSConstants.DEFAULT_KEEPALIVE_TIME, 0);

    int maxThreads = threadLimitAtLeastTwo(conf,
        OBSConstants.MAX_THREADS,
        OBSConstants.DEFAULT_MAX_THREADS);
    int totalTasks = OBSCommonUtils.intOption(conf,
        OBSConstants.MAX_TOTAL_TASKS,
        OBSConstants.DEFAULT_MAX_TOTAL_TASKS, 1);
    boundedMultipartUploadThreadPool =
        BlockingThreadPoolExecutorService.newInstance(
            maxThreads,
            maxThreads + totalTasks,
            keepAliveTime,
            TimeUnit.SECONDS,
            "obs-transfer-shared");

    int maxDeleteThreads = threadLimitAtLeastTwo(conf,
        OBSConstants.MAX_DELETE_THREADS,
        OBSConstants.DEFAULT_MAX_DELETE_THREADS);
    boundedDeleteThreadPool = newHalfCoreDaemonPool(maxDeleteThreads,
        keepAliveTime, "obs-delete-transfer-shared");

    if (enablePosix) {
        obsClientDFSListEnable = conf.getBoolean(
            OBSConstants.OBS_CLIENT_DFS_LIST_ENABLE, true);
        if (obsClientDFSListEnable) {
            int coreListThreads = conf.getInt(
                OBSConstants.CORE_LIST_THREADS,
                OBSConstants.DEFAULT_CORE_LIST_THREADS);
            int maxListThreads = conf.getInt(OBSConstants.MAX_LIST_THREADS,
                OBSConstants.DEFAULT_MAX_LIST_THREADS);
            int listWorkQueueCapacity = conf.getInt(
                OBSConstants.LIST_WORK_QUEUE_CAPACITY,
                OBSConstants.DEFAULT_LIST_WORK_QUEUE_CAPACITY);
            listParallelFactor = conf.getInt(
                OBSConstants.LIST_PARALLEL_FACTOR,
                OBSConstants.DEFAULT_LIST_PARALLEL_FACTOR);
            if (listParallelFactor < 1) {
                LOG.warn(OBSConstants.LIST_PARALLEL_FACTOR
                    + " must be at least 1: forcing to 1.");
                listParallelFactor = 1;
            }
            // Unlike the other pools, the list pool has a bounded work queue.
            boundedListThreadPool =
                new ThreadPoolExecutor(
                    coreListThreads,
                    maxListThreads,
                    keepAliveTime,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(listWorkQueueCapacity),
                    BlockingThreadPoolExecutorService
                        .newDaemonThreadFactory(
                            "obs-list-transfer-shared"));
            allowCoreTimeoutIfPossible(boundedListThreadPool, keepAliveTime);
        }
    } else {
        int maxCopyThreads = threadLimitAtLeastTwo(conf,
            OBSConstants.MAX_COPY_THREADS,
            OBSConstants.DEFAULT_MAX_COPY_THREADS);
        boundedCopyThreadPool = newHalfCoreDaemonPool(maxCopyThreads,
            keepAliveTime, "obs-copy-transfer-shared");

        copyPartSize = OBSCommonUtils.longOption(conf,
            OBSConstants.COPY_PART_SIZE,
            OBSConstants.DEFAULT_COPY_PART_SIZE, 0);
        if (copyPartSize > OBSConstants.MAX_COPY_PART_SIZE) {
            LOG.warn(
                "obs: {} capped to ~5GB (maximum allowed part size with "
                    + "current output mechanism)",
                OBSConstants.COPY_PART_SIZE);
            copyPartSize = OBSConstants.MAX_COPY_PART_SIZE;
        }

        int maxCopyPartThreads = threadLimitAtLeastTwo(conf,
            OBSConstants.MAX_COPY_PART_THREADS,
            OBSConstants.DEFAULT_MAX_COPY_PART_THREADS);
        boundedCopyPartThreadPool = newHalfCoreDaemonPool(
            maxCopyPartThreads, keepAliveTime,
            "obs-copy-part-transfer-shared");
    }
}

/**
 * Read a thread-count option and raise it to 2, with a warning, when the
 * configured value is smaller.
 *
 * @param conf the configuration to read from
 * @param key the option name, also used in the warning text
 * @param defaultValue the value used when the option is not set
 * @return the configured value, or 2 if that value is below 2
 */
private static int threadLimitAtLeastTwo(final Configuration conf,
    final String key, final int defaultValue) {
    int limit = conf.getInt(key, defaultValue);
    if (limit < 2) {
        LOG.warn(key + " must be at least 2: forcing to 2.");
        limit = 2;
    }
    return limit;
}

/**
 * Build a daemon-thread pool whose core size is half of its maximum size
 * (rounded up) and whose work queue is unbounded.
 *
 * <p>NOTE(review): per the {@link ThreadPoolExecutor} documentation, a pool
 * with an unbounded queue never starts more than its core number of
 * threads, so {@code maxPoolSize} is not actually reached - confirm this is
 * intended.
 *
 * @param maxPoolSize the maximum pool size
 * @param keepAliveTime idle time in seconds before a thread may terminate
 * @param threadNamePrefix prefix handed to the daemon thread factory
 * @return the new pool
 */
private static ThreadPoolExecutor newHalfCoreDaemonPool(
    final int maxPoolSize, final long keepAliveTime,
    final String threadNamePrefix) {
    int corePoolSize = (int) Math.ceil(maxPoolSize / 2.0);
    ThreadPoolExecutor pool =
        new ThreadPoolExecutor(
            corePoolSize,
            maxPoolSize,
            keepAliveTime,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(),
            BlockingThreadPoolExecutorService.newDaemonThreadFactory(
                threadNamePrefix));
    allowCoreTimeoutIfPossible(pool, keepAliveTime);
    return pool;
}

/**
 * Let idle core threads time out, unless the keep-alive time is zero.
 *
 * <p>{@link ThreadPoolExecutor#allowCoreThreadTimeOut(boolean)} throws
 * {@link IllegalArgumentException} when asked to enable the time-out on a
 * pool whose keep-alive time is not greater than zero. The keep-alive
 * option is read with a trailing argument of 0, which appears to accept
 * zero (TODO confirm), so that case is skipped here instead of failing
 * filesystem initialization.
 *
 * @param pool the pool to configure
 * @param keepAliveTime the keep-alive time the pool was created with
 */
private static void allowCoreTimeoutIfPossible(
    final ThreadPoolExecutor pool, final long keepAliveTime) {
    if (keepAliveTime > 0) {
        pool.allowCoreThreadTimeOut(true);
    }
}
/**
 * Report whether the bucket behind this filesystem is a posix bucket.
 *
 * @return the current value of the posix flag
 */
boolean isFsBucket() {
    return this.enablePosix;
}
/**
 * Report the state of the read transform switch.
 *
 * @return {@code true} if read transform is enabled
 */
boolean isReadTransformEnabled() {
    return this.readTransformEnable;
}
/**
 * Set up the access control list used for upload and write operations.
 *
 * <p>When the configured canned ACL name is one of the recognised names, an
 * empty {@link AccessControlList} is stored; for an empty or unrecognised
 * name the stored value is {@code null}.
 *
 * @param conf the configuration to use for the FS.
 */
private void initCannedAcls(final Configuration conf) {
    // No canned acl in obs
    final String aclName = conf.get(OBSConstants.CANNED_ACL,
        OBSConstants.DEFAULT_CANNED_ACL);
    AccessControlList acl = null;
    if (!aclName.isEmpty()) {
        switch (aclName) {
        case "Private":
        case "PublicRead":
        case "PublicReadWrite":
        case "AuthenticatedRead":
        case "LogDeliveryWrite":
        case "BucketOwnerRead":
        case "BucketOwnerFullControl":
            acl = new AccessControlList();
            break;
        default:
            // unrecognised name: leave the ACL unset
            break;
        }
    }
    cannedACL = acl;
}
/**
 * Return the access control list derived from the user's configuration.
 *
 * @return the {@link AccessControlList}, or {@code null} if none was set up
 */
AccessControlList getCannedACL() {
    return this.cannedACL;
}
/**
 * Return the protocol scheme of this FileSystem.
 *
 * @return the constant string "obs"
 */
@Override
public String getScheme() {
    final String scheme = "obs";
    return scheme;
}
/**
 * Return the URI (scheme and authority) that identifies this FileSystem.
 *
 * @return the URI of this filesystem
 */
@Override
public URI getUri() {
    return this.uri;
}
/**
 * Return the default port for this FileSystem.
 *
 * @return the value of {@code OBSConstants.OBS_DEFAULT_PORT}; presumably -1
 *         to signal an undefined port in line with the contract of
 *         {@link URI#getPort()} (the constant's value is not visible here)
 */
@Override
public int getDefaultPort() {
    final int port = OBSConstants.OBS_DEFAULT_PORT;
    return port;
}
/**
 * Expose the OBS client this filesystem talks through.
 *
 * @return the OBS client
 */
@VisibleForTesting
ObsClient getObsClient() {
    return this.obs;
}
/**
 * Expose the read ahead range this filesystem hands to its input streams.
 *
 * @return the read ahead range
 */
@VisibleForTesting
long getReadAheadRange() {
    return this.readAheadRange;
}
/**
 * Return the name of the bucket backing this filesystem.
 *
 * @return the bucket name
 */
String getBucket() {
    return this.bucket;
}
/**
 * Check that a Path belongs to this FileSystem by delegating to
 * {@code OBSLoginHelper.checkPath}. Unlike the superclass, this version does
 * not look at authority, but only hostname.
 *
 * @param path the path to check
 * @throws IllegalArgumentException if there is an FS mismatch
 */
@Override
public void checkPath(final Path path) {
    final Configuration conf = getConf();
    final URI fsUri = getUri();
    OBSLoginHelper.checkPath(conf, fsUri, path, getDefaultPort());
}
/**
 * Canonicalize the given URI by delegating to
 * {@code OBSLoginHelper.canonicalizeUri} with this filesystem's default
 * port.
 *
 * @param rawUri the URI to be canonicalized
 * @return the canonicalized URI
 */
@Override
protected URI canonicalizeUri(final URI rawUri) {
    final int defaultPort = getDefaultPort();
    return OBSLoginHelper.canonicalizeUri(rawUri, defaultPort);
}
/**
 * Open the file at the given path for reading.
 *
 * @param f the file path to open
 * @param bufferSize the size of the buffer to be used; not used by this
 *                   implementation
 * @return the FSDataInputStream for the file
 * @throws FileNotFoundException if the path refers to a directory
 * @throws IOException on any failure to open the file
 */
@Override
public FSDataInputStream open(final Path f, final int bufferSize)
    throws IOException {
    LOG.debug("Opening '{}' for reading.", f);
    final FileStatus status = getFileStatus(f);
    if (!status.isDirectory()) {
        final String objectKey = OBSCommonUtils.pathToKey(this, f);
        final OBSInputStream in = new OBSInputStream(bucket, objectKey,
            status.getLen(), obs, statistics, readAheadRange, this);
        return new FSDataInputStream(in);
    }
    throw new FileNotFoundException(
        "Can't open " + f + " because it is a directory");
}
/**
 * Create an output stream for the given path, optionally replacing an
 * existing file.
 *
 * <p>Only {@code f} and {@code overwrite} influence the result; the other
 * arguments are accepted for interface compatibility and are not used here.
 *
 * @param f the file path to create
 * @param permission the permission to set
 * @param overwrite if a file with this name already exists, then if true,
 *                  the file will be overwritten, and if false an error will
 *                  be thrown
 * @param bufferSize the size of the buffer to be used
 * @param replication required block replication for the file
 * @param blkSize the requested block size
 * @param progress the progress reporter
 * @return an output stream writing to the object key derived from {@code f}
 * @throws FileAlreadyExistsException if the path is a directory, or is a
 *                                    file and {@code overwrite} is false
 * @throws IOException on any failure to create the file
 * @see #setPermission(Path, FsPermission)
 */
@Override
public FSDataOutputStream create(
    final Path f,
    final FsPermission permission,
    final boolean overwrite,
    final int bufferSize,
    final short replication,
    final long blkSize,
    final Progressable progress)
    throws IOException {
    final String objectKey = OBSCommonUtils.pathToKey(this, f);
    long existingLen = 0;
    try {
        // a missing path surfaces as FileNotFoundException, handled below
        final FileStatus current = getFileStatus(f);
        existingLen = current.getLen();
        // from here on something exists at the path
        if (current.isDirectory()) {
            // a directory can never be replaced by a file
            throw new FileAlreadyExistsException(f + " is a directory");
        }
        if (!overwrite) {
            // an existing file may only be replaced when asked to
            throw new FileAlreadyExistsException(f + " already exists");
        }
        LOG.debug("create: Overwriting file {}", f);
    } catch (FileNotFoundException e) {
        // nothing at the path yet: a new file will be created
        LOG.debug("create: Creating new file {}", f);
    }

    final SemaphoredDelegatingExecutor uploadExecutor =
        new SemaphoredDelegatingExecutor(
            boundedMultipartUploadThreadPool,
            blockOutputActiveBlocks, true);
    final OBSBlockOutputStream blockStream = new OBSBlockOutputStream(
        this, objectKey, existingLen, uploadExecutor, false);
    return new FSDataOutputStream(blockStream, null);
}
/**
 * Return the multipart upload part size that {@link OBSBlockOutputStream}
 * works with.
 *
 * @return the part size
 */
long getPartSize() {
    return this.partSize;
}
/**
 * Return the block factory that {@link OBSBlockOutputStream} works with.
 *
 * @return the block factory
 */
OBSDataBlocks.BlockFactory getBlockFactory() {
    return this.blockFactory;
}
/**
 * Return the write helper that {@link OBSBlockOutputStream} works with.
 *
 * @return the write helper
 */
OBSWriteOperationHelper getWriteHelper() {
    return this.writeHelper;
}
/**
 * Create an output stream for the given path, honouring the supplied
 * {@link CreateFlag}s.
 *
 * <p>With {@link CreateFlag#APPEND} the stream appends to the path (posix
 * buckets only) and a missing file is tolerated. Without it the call is
 * forwarded to the overwrite-flag variant of {@code create}, where a
 * {@code null} flag set counts as overwrite.
 *
 * @param f the file name to create
 * @param permission the permission to set
 * @param flags {@link CreateFlag}s to use for this stream
 * @param bufferSize the size of the buffer to be used
 * @param replication required block replication for the file
 * @param blkSize block size
 * @param progress the progress reporter
 * @param checksumOpt check sum option; not used by this implementation
 * @return the output stream for the path
 * @throws UnsupportedOperationException if append is requested on a
 *                                       non-posix bucket
 * @throws FileAlreadyExistsException if append is requested on a directory
 * @throws IOException on any other failure
 */
@Override
@SuppressWarnings("checkstyle:parameternumber")
public FSDataOutputStream create(
    final Path f,
    final FsPermission permission,
    final EnumSet<CreateFlag> flags,
    final int bufferSize,
    final short replication,
    final long blkSize,
    final Progressable progress,
    final ChecksumOpt checksumOpt)
    throws IOException {
    LOG.debug("create: Creating new file {}, flags:{}, isFsBucket:{}", f,
        flags, isFsBucket());

    final boolean appendRequested =
        flags != null && flags.contains(CreateFlag.APPEND);
    if (!appendRequested) {
        final boolean overwrite =
            flags == null || flags.contains(CreateFlag.OVERWRITE);
        return create(f, permission, overwrite, bufferSize, replication,
            blkSize, progress);
    }

    if (!isFsBucket()) {
        throw new UnsupportedOperationException(
            "non-posix bucket. Append is not supported by "
                + "OBSFileSystem");
    }

    final String objectKey = OBSCommonUtils.pathToKey(this, f);
    long existingLen = 0;
    try {
        // a missing path surfaces as FileNotFoundException, handled below
        final FileStatus current = getFileStatus(f);
        existingLen = current.getLen();
        // from here on something exists at the path
        if (current.isDirectory()) {
            // appending to a directory is always an error
            throw new FileAlreadyExistsException(f + " is a directory");
        }
    } catch (FileNotFoundException e) {
        LOG.debug("FileNotFoundException, create: Creating new file {}",
            f);
    }

    final SemaphoredDelegatingExecutor uploadExecutor =
        new SemaphoredDelegatingExecutor(
            boundedMultipartUploadThreadPool,
            blockOutputActiveBlocks, true);
    final OBSBlockOutputStream blockStream = new OBSBlockOutputStream(
        this, objectKey, existingLen, uploadExecutor, true);
    return new FSDataOutputStream(blockStream, null);
}
/**
 * Create an output stream for the given path, requiring the parent
 * directory to exist already. Apart from the parent check this behaves like
 * the overwrite-flag variant of {@code create}.
 *
 * @param path the file path to create
 * @param permission file permission
 * @param flags {@link CreateFlag}s to use for this stream; only
 *              {@link CreateFlag#OVERWRITE} is consulted
 * @param bufferSize the size of the buffer to be used
 * @param replication required block replication for the file
 * @param blkSize block size
 * @param progress the progress reporter
 * @return the output stream for the path
 * @throws FileAlreadyExistsException if the parent exists but is not a
 *                                    directory
 * @throws IOException IO failure
 */
@Override
public FSDataOutputStream createNonRecursive(
    final Path path,
    final FsPermission permission,
    final EnumSet<CreateFlag> flags,
    final int bufferSize,
    final short replication,
    final long blkSize,
    final Progressable progress)
    throws IOException {
    final Path parent = path.getParent();
    if (parent != null) {
        // the status lookup is expected to raise an exception if there is
        // no parent
        final FileStatus parentStatus = getFileStatus(parent);
        if (!parentStatus.isDirectory()) {
            throw new FileAlreadyExistsException(
                "Not a directory: " + parent);
        }
    }
    final boolean overwrite = flags.contains(CreateFlag.OVERWRITE);
    return create(path, permission, overwrite, bufferSize, replication,
        blkSize, progress);
}
/**
 * Append to an existing file. Only available on posix buckets.
 *
 * @param f the existing file to be appended
 * @param bufferSize the size of the buffer to be used; not used by this
 *                   implementation
 * @param progress for reporting progress; not used by this implementation
 * @return an output stream positioned after the current length of the file
 * @throws UnsupportedOperationException if the bucket is not posix
 * @throws FileAlreadyExistsException if the path is a directory
 * @throws IOException if the file status cannot be obtained, including when
 *                     the file does not exist
 */
@Override
public FSDataOutputStream append(final Path f, final int bufferSize,
    final Progressable progress)
    throws IOException {
    if (!isFsBucket()) {
        throw new UnsupportedOperationException(
            "non-posix bucket. Append is not supported "
                + "by OBSFileSystem");
    }
    LOG.debug("append: Append file {}.", f);

    final String objectKey = OBSCommonUtils.pathToKey(this, f);
    // the lookup fails with FileNotFoundException for a missing file
    final FileStatus current = getFileStatus(f);
    final long existingLen = current.getLen();
    // from here on something exists at the path
    if (current.isDirectory()) {
        // appending to a directory is always an error
        throw new FileAlreadyExistsException(f + " is a directory");
    }

    final SemaphoredDelegatingExecutor uploadExecutor =
        new SemaphoredDelegatingExecutor(
            boundedMultipartUploadThreadPool,
            blockOutputActiveBlocks, true);
    final OBSBlockOutputStream blockStream = new OBSBlockOutputStream(
        this, objectKey, existingLen, uploadExecutor, true);
    return new FSDataOutputStream(blockStream, null);
}
/**
 * Check if a path exists. A path whose status lookup fails with
 * {@link FileNotFoundException} or {@code FileConflictException} is
 * reported as not existing.
 *
 * @param f source path
 * @return true if the path exists
 * @throws IOException IO failure
 */
@Override
public boolean exists(final Path f) throws IOException {
    boolean found;
    try {
        found = getFileStatus(f) != null;
    } catch (FileNotFoundException | FileConflictException e) {
        found = false;
    }
    return found;
}
/**
 * Rename Path src to Path dst, using the posix or the object based
 * implementation depending on the bucket type.
 *
 * <p>A {@code RenameFailedException} is logged and turned into its exit
 * code; a {@link FileNotFoundException} is logged and reported as
 * {@code false}. The elapsed time is logged at debug level in every case.
 *
 * @param src path to be renamed
 * @param dst new path after rename
 * @return true if rename is successful
 * @throws IOException on IO failure
 */
@Override
public boolean rename(final Path src, final Path dst) throws IOException {
    final long begin = System.currentTimeMillis();
    final long callerThread = Thread.currentThread().getId();
    LOG.debug("Rename path {} to {} start", src, dst);
    try {
        return enablePosix
            ? OBSPosixBucketUtils.renameBasedOnPosix(this, src, dst)
            : OBSObjectBucketUtils.renameBasedOnObject(this, src, dst);
    } catch (ObsException e) {
        throw OBSCommonUtils.translateException(
            "rename(" + src + ", " + dst + ")", src, e);
    } catch (RenameFailedException e) {
        LOG.error(e.getMessage());
        return e.getExitCode();
    } catch (FileNotFoundException e) {
        LOG.error(e.toString());
        return false;
    } finally {
        LOG.debug(
            "Rename path {} to {} finished, thread:{}, "
                + "timeUsedInMilliSec:{}.", src, dst, callerThread,
            System.currentTimeMillis() - begin);
    }
}
/**
 * Return the upper bound on entries sent in one multi-object delete call.
 *
 * @return the maximum number of entries in one multi-object delete call
 */
int getMaxEntriesToDelete() {
    return this.maxEntriesToDelete;
}
/**
 * Return the parallel factor used for listing.
 *
 * @return the list parallel factor
 */
int getListParallelFactor() {
    return this.listParallelFactor;
}
/**
 * Return the thread pool used for listing.
 *
 * @return bounded thread pool for list; {@code null} if it was not created
 */
ThreadPoolExecutor getBoundedListThreadPool() {
    return this.boundedListThreadPool;
}
/**
 * Report whether the OBS client specific depth first search (DFS) list is
 * enabled.
 *
 * @return the flag
 */
boolean isObsClientDFSListEnable() {
    return this.obsClientDFSListEnable;
}
/**
 * Expose the {@link Statistics} instance this filesystem records into.
 *
 * @return the used {@link Statistics} instance
 */
Statistics getSchemeStatistics() {
    return this.statistics;
}
/**
 * Return the lower bound on objects for one multi-object delete call.
 *
 * @return the minimum number of objects in one multi-object delete call
 */
int getMultiDeleteThreshold() {
    return this.multiDeleteThreshold;
}
/**
 * Report whether multi-object delete is enabled.
 *
 * @return the flag
 */
boolean isEnableMultiObjectDelete() {
    return this.enableMultiObjectDelete;
}
/**
 * Delete a Path, using the posix or the object based implementation
 * depending on the bucket type. This operation is at least
 * {@code O(files)}, with added overheads to enumerate the path. It is also
 * not atomic.
 *
 * @param f the path to delete
 * @param recursive if path is a directory and set to true, the directory is
 *                  deleted else throws an exception. In case of a file the
 *                  recursive can be set to either true or false
 * @return true if delete is successful else false; false is also returned
 *         when the path does not exist
 * @throws IOException due to inability to delete a directory or file
 */
@Override
public boolean delete(final Path f, final boolean recursive)
    throws IOException {
    try {
        final FileStatus target = getFileStatus(f);
        LOG.debug("delete: path {} - recursive {}", target.getPath(),
            recursive);
        return enablePosix
            ? OBSPosixBucketUtils.fsDelete(this, target, recursive)
            : OBSObjectBucketUtils.objectDelete(this, target, recursive);
    } catch (FileNotFoundException e) {
        // deleting something that is not there is reported, not raised
        LOG.warn("Couldn't delete {} - does not exist", f);
        return false;
    } catch (ObsException e) {
        throw OBSCommonUtils.translateException("delete", f, e);
    }
}
/**
 * Report whether fast delete (trash) is enabled.
 *
 * @return the flag
 */
boolean isEnableTrash() {
    return this.enableTrash;
}
/**
 * Return the directory used as trash by fast delete.
 *
 * @return the trash directory
 */
String getTrashDir() {
    return this.trashDir;
}
/**
 * Report whether multi-object delete recursion is enabled.
 *
 * @return the flag
 */
boolean isEnableMultiObjectDeleteRecursion() {
    return this.enableMultiObjectDeleteRecursion;
}
/**
 * List the statuses of the files/directories in the given path if the path
 * is a directory. The listing is not recursive; the time taken is logged at
 * debug level on success.
 *
 * @param f given path
 * @return the statuses of the files/directories in the given path
 * @throws FileNotFoundException when the path does not exist
 * @throws IOException see specific implementation
 */
@Override
public FileStatus[] listStatus(final Path f)
    throws FileNotFoundException, IOException {
    final long begin = System.currentTimeMillis();
    final long callerThread = Thread.currentThread().getId();
    final FileStatus[] listing;
    try {
        listing = OBSCommonUtils.innerListStatus(this, f, false);
    } catch (ObsException e) {
        throw OBSCommonUtils.translateException("listStatus", f, e);
    }
    LOG.debug(
        "List status for path:{}, thread:{}, timeUsedInMilliSec:{}", f,
        callerThread, System.currentTimeMillis() - begin);
    return listing;
}
/**
 * This public interface is provided specially for Huawei MRS. List the
 * statuses of the files/directories in the given path if the path is a
 * directory. When recursive is true, iterate over all objects in the given
 * path and its sub directories. The time taken is logged at debug level on
 * success.
 *
 * @param f given path
 * @param recursive whether to iterate over objects in sub directories
 * @return the statuses of the files/directories in the given path
 * @throws FileNotFoundException when the path does not exist
 * @throws IOException see specific implementation
 */
public FileStatus[] listStatus(final Path f, final boolean recursive)
    throws FileNotFoundException, IOException {
    final long begin = System.currentTimeMillis();
    final long callerThread = Thread.currentThread().getId();
    final FileStatus[] listing;
    try {
        listing = OBSCommonUtils.innerListStatus(this, f, recursive);
    } catch (ObsException e) {
        final String flagText = recursive ? "true] " : "false] ";
        throw OBSCommonUtils.translateException(
            "listStatus with recursive flag[" + flagText, f, e);
    }
    LOG.debug(
        "List status for path:{}, thread:{}, timeUsedInMilliSec:{}", f,
        callerThread, System.currentTimeMillis() - begin);
    return listing;
}
/**
 * Expose the OBSListing instance belonging to this filesystem.
 *
 * @return the OBSListing instance
 */
OBSListing getObsListing() {
    return this.obsListing;
}
/**
 * Return the current working directory of this file system.
 *
 * @return the directory pathname
 */
@Override
public Path getWorkingDirectory() {
    return this.workingDir;
}
/**
 * Set the current working directory for the file system. All relative paths
 * will be resolved relative to it.
 *
 * <p>NOTE(review): the path is stored exactly as given; a relative
 * {@code newDir} is not resolved against the previous working directory -
 * confirm callers always pass a qualified path.
 *
 * @param newDir the new working directory
 */
@Override
public void setWorkingDirectory(final Path newDir) {
    this.workingDir = newDir;
}
/**
 * Return the user name recorded by this filesystem.
 *
 * @return the short name of the user who instantiated the filesystem
 */
String getUsername() {
    return this.username;
}
/**
 * Make the given path and all non-existent parents into directories. Has
 * the semantics of Unix {@code 'mkdir -p'}. Existence of the directory
 * hierarchy is not an error.
 *
 * @param path path to create
 * @param permission permission to apply; not used by this implementation
 * @return true if a directory was created
 * @throws FileAlreadyExistsException there is a file at the path specified
 * @throws IOException other IO problems
 */
@Override
public boolean mkdirs(final Path path, final FsPermission permission)
    throws IOException, FileAlreadyExistsException {
    final boolean created;
    try {
        created = OBSCommonUtils.innerMkdirs(this, path);
    } catch (ObsException e) {
        throw OBSCommonUtils.translateException("mkdirs", path, e);
    }
    return created;
}
/**
 * Return a file status object that represents the path.
 *
 * <p>The lookup is retried after a pause of
 * {@code OBSCommonUtils.DELAY_TIME} when it fails with an
 * {@link IOException} other than {@link FileNotFoundException} or
 * {@code FileConflictException}; those two are rethrown immediately. After
 * the retry loop one final attempt is made whose failure is propagated
 * as-is.
 *
 * <p>If the thread is interrupted while waiting between attempts, its
 * interrupt status is restored and the I/O failure that triggered the wait
 * is thrown.
 *
 * @param f the path we want information from
 * @return a FileStatus object
 * @throws FileNotFoundException when the path does not exist
 * @throws IOException on other problems
 */
@Override
public FileStatus getFileStatus(final Path f)
    throws FileNotFoundException, IOException {
    for (int retryTime = 1;
        retryTime < OBSCommonUtils.MAX_RETRY_TIME; retryTime++) {
        try {
            return innerGetFileStatus(f);
        } catch (FileNotFoundException | FileConflictException e) {
            // definitive answers: retrying would not change them
            throw e;
        } catch (IOException e) {
            LOG.warn("Failed to get file status for [{}], retry time [{}], "
                + "exception [{}]", f, retryTime, e);
            try {
                Thread.sleep(OBSCommonUtils.DELAY_TIME);
            } catch (InterruptedException ie) {
                // Catching InterruptedException clears the interrupt flag;
                // set it again so callers can still observe the interrupt.
                Thread.currentThread().interrupt();
                throw e;
            }
        }
    }
    // last attempt: any failure now goes straight to the caller
    return innerGetFileStatus(f);
}
/**
 * Single-shot status lookup backing {@link #getFileStatus(Path)}; performs
 * no retries of its own.
 * <p>
 * The lookup is routed by the {@code enablePosix} flag: when it is set the
 * request goes to {@link OBSPosixBucketUtils}, otherwise to
 * {@link OBSObjectBucketUtils}.
 *
 * @param f the path we want information from
 * @return the status produced by the selected bucket utility
 * @throws IOException on IO failure
 */
@VisibleForTesting
OBSFileStatus innerGetFileStatus(final Path f) throws IOException {
  if (!enablePosix) {
    return OBSObjectBucketUtils.innerGetObjectStatus(this, f);
  }
  return OBSPosixBucketUtils.innerFsGetObjectStatus(this, f);
}
/**
 * Compute the {@link ContentSummary} of a given {@link Path}.
 * <p>
 * When {@code obsContentSummaryEnable} is off the generic superclass
 * implementation is used. Otherwise a file is summarised directly from its
 * status (one file, zero directories, length and space consumed both equal
 * to the file length), and a directory is summarised by the POSIX or object
 * bucket utility depending on {@code enablePosix}.
 *
 * @param f path to use
 * @return the {@link ContentSummary}
 * @throws FileNotFoundException if the path does not resolve
 * @throws IOException           IO failure
 */
@Override
public ContentSummary getContentSummary(final Path f)
    throws FileNotFoundException, IOException {
  if (!obsContentSummaryEnable) {
    return super.getContentSummary(f);
  }

  final FileStatus target = getFileStatus(f);
  if (!target.isFile()) {
    // Directory: hand the object key to the bucket-specific summariser.
    if (enablePosix) {
      return OBSPosixBucketUtils.fsGetDirectoryContentSummary(this,
          OBSCommonUtils.pathToKey(this, f));
    }
    return OBSObjectBucketUtils.getDirectoryContentSummary(this,
        OBSCommonUtils.pathToKey(this, f));
  }

  // Plain file: the summary is fully determined by its length.
  final long size = target.getLen();
  return new ContentSummary.Builder()
      .length(size)
      .fileCount(1)
      .directoryCount(0)
      .spaceConsumed(size)
      .build();
}
/**
 * Upload the local {@code src} file to {@code dst} on this filesystem.
 * <p>
 * The copy itself is performed by the superclass; this override only
 * converts an {@link ObsException} escaping from it into the exception
 * produced by {@link OBSCommonUtils#translateException}.
 *
 * @param delSrc    whether to delete the src
 * @param overwrite whether to overwrite an existing file
 * @param src       path on the local disk
 * @param dst       destination path
 * @throws FileAlreadyExistsException if the destination file exists and
 *                                    overwrite == false
 * @throws IOException                IO problem
 */
@Override
public void copyFromLocalFile(final boolean delSrc, final boolean overwrite,
    final Path src, final Path dst) throws FileAlreadyExistsException,
    IOException {
  try {
    super.copyFromLocalFile(delSrc, overwrite, src, dst);
  } catch (ObsException obsFailure) {
    // Describe the operation with both endpoints for the translated error.
    final String operation = "copyFromLocalFile(" + src + ", " + dst + ")";
    throw OBSCommonUtils.translateException(operation, src, obsFailure);
  }
}
/**
 * Close the filesystem and shut down its worker thread pools.
 * <p>
 * Only the first call does any work; later calls return right after the
 * debug message is logged. The thread pools are shut down in a
 * {@code finally} clause, so they are released even when the superclass
 * {@code close()} throws.
 *
 * @throws IOException IO problem raised by the superclass close
 */
@Override
public void close() throws IOException {
  LOG.debug("This Filesystem closed by user, clear resource.");
  final boolean alreadyClosed = closed.getAndSet(true);
  if (alreadyClosed) {
    return;
  }

  try {
    super.close();
  } finally {
    OBSCommonUtils.shutdownAll(
        boundedMultipartUploadThreadPool,
        boundedCopyThreadPool,
        boundedDeleteThreadPool,
        boundedCopyPartThreadPool,
        boundedListThreadPool);
  }
}
/**
 * Report the canonical service name of this filesystem.
 * <p>
 * This implementation always answers {@code null} because delegation
 * tokens are not supported.
 *
 * @return always {@code null}
 */
@Override
public String getCanonicalServiceName() {
  // No token support, hence no service name to advertise.
  return null;
}
/**
 * Report the part size configured for copy operations.
 *
 * @return the value of the {@code copyPartSize} field
 */
long getCopyPartSize() {
  return this.copyPartSize;
}
/**
 * Expose the executor held in {@code boundedCopyPartThreadPool}.
 *
 * @return the bounded thread pool for copy part
 */
ThreadPoolExecutor getBoundedCopyPartThreadPool() {
  return this.boundedCopyPartThreadPool;
}
/**
 * Expose the executor held in {@code boundedCopyThreadPool}.
 *
 * @return the bounded thread pool for copy
 */
ThreadPoolExecutor getBoundedCopyThreadPool() {
  return this.boundedCopyThreadPool;
}
/**
 * Imitate HDFS to return the number of bytes that large input files should be
 * optimally split into to minimize I/O time for compatibility.
 *
 * @return the value of the {@code blockSize} field
 * @deprecated use {@link #getDefaultBlockSize(Path)} instead
 */
@Deprecated
@Override
public long getDefaultBlockSize() {
  return blockSize;
}
/**
 * Imitate HDFS by reporting the size, in bytes, into which large input
 * files should ideally be split to minimize I/O time.
 * <p>
 * The path argument is not consulted: the same {@code blockSize} value is
 * returned for every path, whether or not it exists.
 *
 * @param f path of file (ignored)
 * @return the value of the {@code blockSize} field
 */
@Override
public long getDefaultBlockSize(final Path f) {
  return this.blockSize;
}
/**
 * Build a human-readable description of this filesystem instance.
 * <p>
 * The text lists the URI, working directory, part size, multi-object delete
 * flag, max keys, read-ahead range, block size, multipart upload thread pool
 * and statistics. The canned ACL and block factory are included only when
 * they are non-null. The trailing {@code metrics} section is always empty.
 *
 * @return the description string
 */
@Override
public String toString() {
  final StringBuilder text = new StringBuilder("OBSFileSystem{")
      .append("uri=").append(uri)
      .append(", workingDir=").append(workingDir)
      .append(", partSize=").append(partSize)
      .append(", enableMultiObjectsDelete=").append(enableMultiObjectDelete)
      .append(", maxKeys=").append(maxKeys);

  if (cannedACL != null) {
    text.append(", cannedACL=").append(cannedACL.toString());
  }

  text.append(", readAheadRange=").append(readAheadRange)
      .append(", blockSize=").append(getDefaultBlockSize());

  if (blockFactory != null) {
    text.append(", blockFactory=").append(blockFactory);
  }

  text.append(", boundedMultipartUploadThreadPool=")
      .append(boundedMultipartUploadThreadPool)
      .append(", statistics {").append(statistics).append("}")
      .append(", metrics {").append("}")
      .append('}');
  return text.toString();
}
/**
 * Report the page size used when paging through a directory listing.
 *
 * @return the value of the {@code maxKeys} field
 */
int getMaxKeys() {
  return this.maxKeys;
}
/**
 * List the statuses and block locations of the files under the given path.
 * No ordering of the returned entries is guaranteed.
 *
 * <pre>
 * If the path is a file, the iterator yields that single file.
 * If the path is a directory,
 *   with recursive == false, the listing uses "/" as delimiter;
 *   with recursive == true, the listing uses no delimiter, i.e. it covers
 *   the subtree rooted at the path.
 * </pre>
 *
 * The path is qualified first and its status is fetched up front, which
 * doubles as the existence check.
 *
 * @param f         a path
 * @param recursive if the subdirectories need to be traversed recursively
 * @return an iterator that traverses statuses of the files in the given
 *         path
 * @throws FileNotFoundException if {@code path} does not exist
 * @throws IOException           if any I/O error occurred
 */
@Override
public RemoteIterator<LocatedFileStatus> listFiles(final Path f,
    final boolean recursive)
    throws FileNotFoundException, IOException {
  final Path qualified = OBSCommonUtils.qualify(this, f);
  LOG.debug("listFiles({}, {})", qualified, recursive);
  try {
    // Fetching the status also verifies that the path exists.
    final FileStatus status = getFileStatus(qualified);

    if (status.isFile()) {
      // A file is answered with a one-element iterator.
      LOG.debug("Path is a file");
      return new OBSListing.SingleStatusRemoteIterator(
          OBSCommonUtils.toLocatedFileStatus(this, status));
    }

    LOG.debug(
        "listFiles: doing listFiles of directory {} - recursive {}",
        qualified, recursive);

    // Directory: issue one bulk listing below the directory's key prefix.
    final String prefix = OBSCommonUtils.maybeAddTrailingSlash(
        OBSCommonUtils.pathToKey(this, qualified));
    final String separator;
    if (recursive) {
      separator = null;
    } else {
      separator = "/";
    }
    LOG.debug("Requesting all entries under {} with delimiter '{}'",
        prefix, separator);

    return obsListing.createLocatedFileStatusIterator(
        obsListing.createFileStatusListingIterator(
            qualified,
            OBSCommonUtils.createListObjectsRequest(this, prefix,
                separator),
            OBSListing.ACCEPT_ALL,
            new OBSListing.AcceptFilesOnly(qualified)));
  } catch (ObsException obsFailure) {
    throw OBSCommonUtils.translateException("listFiles", qualified,
        obsFailure);
  }
}
/**
 * List the entries of the given path together with their block locations,
 * without filtering.
 * <p>
 * This is a convenience overload that delegates to
 * {@link #listLocatedStatus(Path, PathFilter)} with
 * {@code OBSListing.ACCEPT_ALL} as the filter.
 *
 * @param f is the path
 * @return an iterator that traverses statuses of the files/directories in
 *         the given path
 * @throws FileNotFoundException If <code>f</code> does not exist
 * @throws IOException           If an I/O error occurred
 */
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
    throws FileNotFoundException, IOException {
  return this.listLocatedStatus(f, OBSListing.ACCEPT_ALL);
}
/**
 * List a path, returning located statuses filtered by the given path
 * filter.
 * <p>
 * The path is qualified and its status fetched first, which also serves as
 * the existence check. If the path is a file, the iterator wraps that
 * file's located status when the filter accepts the path, and wraps
 * {@code null} otherwise. If the path is a directory, a non-recursive
 * listing (delimiter {@code "/"}) is issued under the directory's key.
 *
 * @param f      a path
 * @param filter a path filter
 * @return an iterator that traverses statuses of the files/directories in
 *         the given path
 * @throws FileNotFoundException if <code>f</code> does not exist
 * @throws IOException           if any I/O error occurred
 */
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
    final PathFilter filter)
    throws FileNotFoundException, IOException {
  Path path = OBSCommonUtils.qualify(this, f);
  // Message now closes its parenthesis, matching the listFiles log format.
  LOG.debug("listLocatedStatus({}, {})", path, filter);
  try {
    // lookup dir triggers existence check
    final FileStatus fileStatus = getFileStatus(path);
    if (fileStatus.isFile()) {
      // simple case: File
      LOG.debug("Path is a file");
      return new OBSListing.SingleStatusRemoteIterator(
          filter.accept(path) ? OBSCommonUtils.toLocatedFileStatus(
              this, fileStatus) : null);
    } else {
      // directory: trigger a lookup
      String key = OBSCommonUtils.maybeAddTrailingSlash(
          OBSCommonUtils.pathToKey(this, path));
      return obsListing.createLocatedFileStatusIterator(
          obsListing.createFileStatusListingIterator(
              path,
              OBSCommonUtils.createListObjectsRequest(this, key, "/"),
              filter,
              new OBSListing.AcceptAllButSelfAndS3nDirs(path)));
    }
  } catch (ObsException e) {
    throw OBSCommonUtils.translateException("listLocatedStatus", path,
        e);
  }
}
/**
 * Expose the server-side encryption wrapper held by this filesystem
 * instance.
 *
 * @return the value of the {@code sse} field
 */
SseWrapper getSse() {
  return this.sse;
}
}
相关信息
相关文章
hadoop BasicSessionCredential 源码
hadoop DefaultOBSClientFactory 源码
hadoop FileConflictException 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦