hadoop FileSystem 源码
haddop FileSystem 代码
文件路径:/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import javax.annotation.Nonnull;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.lang.ref.ReferenceQueue;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.Stack;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Options.HandleOpt;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsCreateModes;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.DelegationTokenIssuer;
import org.apache.hadoop.util.ClassUtil;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.tracing.Tracer;
import org.apache.hadoop.tracing.TraceScope;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
import static org.apache.hadoop.util.Preconditions.checkArgument;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
/****************************************************************
* An abstract base class for a fairly generic filesystem. It
* may be implemented as a distributed filesystem, or as a "local"
* one that reflects the locally-connected disk. The local version
* exists for small Hadoop instances and for testing.
*
* <p>
*
* All user code that may potentially use the Hadoop Distributed
* File System should be written to use a FileSystem object or its
* successor, {@link FileContext}.
* </p>
* <p>
* The local implementation is {@link LocalFileSystem} and distributed
* implementation is DistributedFileSystem. There are other implementations
* for object stores and (outside the Apache Hadoop codebase),
* third party filesystems.
* </p>
* Notes
* <ol>
* <li>The behaviour of the filesystem is
* <a href="https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html">
* specified in the Hadoop documentation. </a>
* However, the normative specification of the behavior of this class is
* actually HDFS: if HDFS does not behave the way these Javadocs or
* the specification in the Hadoop documentations define, assume that
* the documentation is incorrect.
* </li>
* <li>The term {@code FileSystem} refers to an instance of this class.</li>
* <li>The acronym "FS" is used as an abbreviation of FileSystem.</li>
* <li>The term {@code filesystem} refers to the distributed/local filesystem
* itself, rather than the class used to interact with it.</li>
* <li>The term "file" refers to a file in the remote filesystem,
* rather than instances of {@code java.io.File}.</li>
* </ol>
*
* This is a carefully evolving class.
* New methods may be marked as Unstable or Evolving for their initial release,
* as a warning that they are new and may change based on the
* experience of use in applications.
* <p>
* <b>Important note for developers</b>
* </p>
* If you are making changes here to the public API or protected methods,
* you must review the following subclasses and make sure that
* they are filtering/passing through new methods as appropriate.
*
* {@link FilterFileSystem}: methods are passed through. If not,
* then {@code TestFilterFileSystem.MustNotImplement} must be
* updated with the unsupported interface.
* Furthermore, if the new API's support is probed for via
* {@link #hasPathCapability(Path, String)} then
* {@link FilterFileSystem#hasPathCapability(Path, String)}
* must return false, always.
* <p>
* {@link ChecksumFileSystem}: checksums are created and
* verified.
* </p>
* {@code TestHarFileSystem} will need its {@code MustNotImplement}
* interface updated.
*
* <p>
* There are some external places your changes will break things.
* Do co-ordinate changes here.
* </p>
*
* HBase: HBoss
* <p>
* Hive: HiveShim23
* </p>
* {@code shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java}
*
*****************************************************************/
@SuppressWarnings("DeprecatedIsStillUsed")
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileSystem extends Configured
implements Closeable, DelegationTokenIssuer, PathCapabilities {
public static final String FS_DEFAULT_NAME_KEY =
CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
public static final String DEFAULT_FS =
CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT;
/**
* This log is widely used in the org.apache.hadoop.fs code and tests,
* so must be considered something to only be changed with care.
*/
@InterfaceAudience.Private
public static final Logger LOG = LoggerFactory.getLogger(FileSystem.class);
/**
* The SLF4J logger to use in logging within the FileSystem class itself.
*/
private static final Logger LOGGER =
LoggerFactory.getLogger(FileSystem.class);
/**
* Priority of the FileSystem shutdown hook: {@value}.
*/
public static final int SHUTDOWN_HOOK_PRIORITY = 10;
/**
* Prefix for trash directory: {@value}.
*/
public static final String TRASH_PREFIX = ".Trash";
public static final String USER_HOME_PREFIX = "/user";
/** FileSystem cache. */
static final Cache CACHE = new Cache(new Configuration());
/** The key this instance is stored under in the cache. */
private Cache.Key key;
/** Recording statistics per a FileSystem class. */
private static final Map<Class<? extends FileSystem>, Statistics>
statisticsTable = new IdentityHashMap<>();
/**
* The statistics for this file system.
*/
protected Statistics statistics;
/**
* A cache of files that should be deleted when the FileSystem is closed
* or the JVM is exited.
*/
private final Set<Path> deleteOnExit = new TreeSet<>();
/**
* Should symbolic links be resolved by {@link FileSystemLinkResolver}.
* Set to the value of
* {@link CommonConfigurationKeysPublic#FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_KEY}
*/
boolean resolveSymlinks;
/**
* This method adds a FileSystem instance to the cache so that it can
* be retrieved later. It is only for testing.
* @param uri the uri to store it under
* @param conf the configuration to store it under
* @param fs the FileSystem to store
* @throws IOException if the current user cannot be determined.
*/
@VisibleForTesting
static void addFileSystemForTesting(URI uri, Configuration conf,
FileSystem fs) throws IOException {
CACHE.map.put(new Cache.Key(uri, conf), fs);
}
@VisibleForTesting
static void removeFileSystemForTesting(URI uri, Configuration conf,
FileSystem fs) throws IOException {
CACHE.map.remove(new Cache.Key(uri, conf), fs);
}
@VisibleForTesting
static int cacheSize() {
return CACHE.map.size();
}
/**
* Get a FileSystem instance based on the uri, the passed in
* configuration and the user.
* @param uri of the filesystem
* @param conf the configuration to use
* @param user to perform the get as
* @return the filesystem instance
* @throws IOException failure to load
* @throws InterruptedException If the {@code UGI.doAs()} call was
* somehow interrupted.
*/
public static FileSystem get(final URI uri, final Configuration conf,
final String user) throws IOException, InterruptedException {
String ticketCachePath =
conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
UserGroupInformation ugi =
UserGroupInformation.getBestUGI(ticketCachePath, user);
return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
@Override
public FileSystem run() throws IOException {
return get(uri, conf);
}
});
}
/**
* Returns the configured FileSystem implementation.
* @param conf the configuration to use
* @return FileSystem.
* @throws IOException If an I/O error occurred.
*/
public static FileSystem get(Configuration conf) throws IOException {
return get(getDefaultUri(conf), conf);
}
/**
* Get the default FileSystem URI from a configuration.
* @param conf the configuration to use
* @return the uri of the default filesystem
*/
public static URI getDefaultUri(Configuration conf) {
URI uri =
URI.create(fixName(conf.getTrimmed(FS_DEFAULT_NAME_KEY, DEFAULT_FS)));
if (uri.getScheme() == null) {
throw new IllegalArgumentException("No scheme in default FS: " + uri);
}
return uri;
}
/**
* Set the default FileSystem URI in a configuration.
* @param conf the configuration to alter
* @param uri the new default filesystem uri
*/
public static void setDefaultUri(Configuration conf, URI uri) {
conf.set(FS_DEFAULT_NAME_KEY, uri.toString());
}
/** Set the default FileSystem URI in a configuration.
* @param conf the configuration to alter
* @param uri the new default filesystem uri
*/
public static void setDefaultUri(Configuration conf, String uri) {
setDefaultUri(conf, URI.create(fixName(uri)));
}
/**
* Initialize a FileSystem.
*
* Called after the new FileSystem instance is constructed, and before it
* is ready for use.
*
* FileSystem implementations overriding this method MUST forward it to
* their superclass, though the order in which it is done, and whether
* to alter the configuration before the invocation are options of the
* subclass.
* @param name a URI whose authority section names the host, port, etc.
* for this FileSystem
* @param conf the configuration
* @throws IOException on any failure to initialize this instance.
* @throws IllegalArgumentException if the URI is considered invalid.
*/
public void initialize(URI name, Configuration conf) throws IOException {
final String scheme;
if (name.getScheme() == null || name.getScheme().isEmpty()) {
scheme = getDefaultUri(conf).getScheme();
} else {
scheme = name.getScheme();
}
statistics = getStatistics(scheme, getClass());
resolveSymlinks = conf.getBoolean(
CommonConfigurationKeysPublic.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_KEY,
CommonConfigurationKeysPublic.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_DEFAULT);
}
/**
* Return the protocol scheme for this FileSystem.
* <p>
* This implementation throws an <code>UnsupportedOperationException</code>.
*
* @return the protocol scheme for this FileSystem.
* @throws UnsupportedOperationException if the operation is unsupported
* (default).
*/
public String getScheme() {
throw new UnsupportedOperationException("Not implemented by the "
+ getClass().getSimpleName() + " FileSystem implementation");
}
/**
* Returns a URI which identifies this FileSystem.
*
* @return the URI of this filesystem.
*/
public abstract URI getUri();
/**
* Return a canonicalized form of this FileSystem's URI.
*
* The default implementation simply calls {@link #canonicalizeUri(URI)}
* on the filesystem's own URI, so subclasses typically only need to
* implement that method.
*
* @see #canonicalizeUri(URI)
* @return the URI of this filesystem.
*/
protected URI getCanonicalUri() {
return canonicalizeUri(getUri());
}
/**
* Canonicalize the given URI.
*
* This is implementation-dependent, and may for example consist of
* canonicalizing the hostname using DNS and adding the default
* port if not specified.
*
* The default implementation simply fills in the default port if
* not specified and if {@link #getDefaultPort()} returns a
* default port.
*
* @param uri url.
* @return URI
* @see NetUtils#getCanonicalUri(URI, int)
*/
protected URI canonicalizeUri(URI uri) {
if (uri.getPort() == -1 && getDefaultPort() > 0) {
// reconstruct the uri with the default port set
try {
uri = new URI(uri.getScheme(), uri.getUserInfo(),
uri.getHost(), getDefaultPort(),
uri.getPath(), uri.getQuery(), uri.getFragment());
} catch (URISyntaxException e) {
// Should never happen!
throw new AssertionError("Valid URI became unparseable: " +
uri);
}
}
return uri;
}
/**
* Get the default port for this FileSystem.
* @return the default port or 0 if there isn't one
*/
protected int getDefaultPort() {
return 0;
}
protected static FileSystem getFSofPath(final Path absOrFqPath,
final Configuration conf)
throws UnsupportedFileSystemException, IOException {
absOrFqPath.checkNotSchemeWithRelative();
absOrFqPath.checkNotRelative();
// Uses the default FileSystem if not fully qualified
return get(absOrFqPath.toUri(), conf);
}
/**
* Get a canonical service name for this FileSystem.
* The token cache is the only user of the canonical service name,
* and uses it to lookup this FileSystem's service tokens.
* If the file system provides a token of its own then it must have a
* canonical name, otherwise the canonical name can be null.
*
* Default implementation: If the FileSystem has child file systems
* (such as an embedded file system) then it is assumed that the FS has no
* tokens of its own and hence returns a null name; otherwise a service
* name is built using Uri and port.
*
* @return a service string that uniquely identifies this file system, null
* if the filesystem does not implement tokens
* @see SecurityUtil#buildDTServiceName(URI, int)
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
@Override
public String getCanonicalServiceName() {
return (getChildFileSystems() == null)
? SecurityUtil.buildDTServiceName(getUri(), getDefaultPort())
: null;
}
/**
* @return uri to string.
* @deprecated call {@link #getUri()} instead.
*/
@Deprecated
public String getName() { return getUri().toString(); }
/**
* @deprecated call {@link #get(URI, Configuration)} instead.
*
* @param name name.
* @param conf configuration.
* @return file system.
* @throws IOException If an I/O error occurred.
*/
@Deprecated
public static FileSystem getNamed(String name, Configuration conf)
throws IOException {
return get(URI.create(fixName(name)), conf);
}
/** Update old-format filesystem names, for back-compatibility. This should
* eventually be replaced with a checkName() method that throws an exception
* for old-format names.
*/
private static String fixName(String name) {
// convert old-format name to new-format name
if (name.equals("local")) { // "local" is now "file:///".
LOGGER.warn("\"local\" is a deprecated filesystem name."
+" Use \"file:///\" instead.");
name = "file:///";
} else if (name.indexOf('/')==-1) { // unqualified is "hdfs://"
LOGGER.warn("\""+name+"\" is a deprecated filesystem name."
+" Use \"hdfs://"+name+"/\" instead.");
name = "hdfs://"+name;
}
return name;
}
/**
* Get the local FileSystem.
* @param conf the configuration to configure the FileSystem with
* if it is newly instantiated.
* @return a LocalFileSystem
* @throws IOException if somehow the local FS cannot be instantiated.
*/
public static LocalFileSystem getLocal(Configuration conf)
throws IOException {
return (LocalFileSystem)get(LocalFileSystem.NAME, conf);
}
/**
* Get a FileSystem for this URI's scheme and authority.
* <ol>
* <li>
* If the configuration has the property
* {@code "fs.$SCHEME.impl.disable.cache"} set to true,
* a new instance will be created, initialized with the supplied URI and
* configuration, then returned without being cached.
* </li>
* <li>
* If the there is a cached FS instance matching the same URI, it will
* be returned.
* </li>
* <li>
* Otherwise: a new FS instance will be created, initialized with the
* configuration and URI, cached and returned to the caller.
* </li>
* </ol>
* @param uri uri of the filesystem.
* @param conf configrution.
* @return filesystem instance.
* @throws IOException if the FileSystem cannot be instantiated.
*/
public static FileSystem get(URI uri, Configuration conf) throws IOException {
String scheme = uri.getScheme();
String authority = uri.getAuthority();
if (scheme == null && authority == null) { // use default FS
return get(conf);
}
if (scheme != null && authority == null) { // no authority
URI defaultUri = getDefaultUri(conf);
if (scheme.equals(defaultUri.getScheme()) // if scheme matches default
&& defaultUri.getAuthority() != null) { // & default has authority
return get(defaultUri, conf); // return default
}
}
String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
if (conf.getBoolean(disableCacheName, false)) {
LOGGER.debug("Bypassing cache to create filesystem {}", uri);
return createFileSystem(uri, conf);
}
return CACHE.get(uri, conf);
}
/**
* Returns the FileSystem for this URI's scheme and authority and the
* given user. Internally invokes {@link #newInstance(URI, Configuration)}
* @param uri uri of the filesystem.
* @param conf the configuration to use
* @param user to perform the get as
* @return filesystem instance
* @throws IOException if the FileSystem cannot be instantiated.
* @throws InterruptedException If the {@code UGI.doAs()} call was
* somehow interrupted.
*/
public static FileSystem newInstance(final URI uri, final Configuration conf,
final String user) throws IOException, InterruptedException {
String ticketCachePath =
conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
UserGroupInformation ugi =
UserGroupInformation.getBestUGI(ticketCachePath, user);
return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
@Override
public FileSystem run() throws IOException {
return newInstance(uri, conf);
}
});
}
/**
* Returns the FileSystem for this URI's scheme and authority.
* The entire URI is passed to the FileSystem instance's initialize method.
* This always returns a new FileSystem object.
* @param uri FS URI
* @param config configuration to use
* @return the new FS instance
* @throws IOException FS creation or initialization failure.
*/
public static FileSystem newInstance(URI uri, Configuration config)
throws IOException {
String scheme = uri.getScheme();
String authority = uri.getAuthority();
if (scheme == null) { // no scheme: use default FS
return newInstance(config);
}
if (authority == null) { // no authority
URI defaultUri = getDefaultUri(config);
if (scheme.equals(defaultUri.getScheme()) // if scheme matches default
&& defaultUri.getAuthority() != null) { // & default has authority
return newInstance(defaultUri, config); // return default
}
}
return CACHE.getUnique(uri, config);
}
/**
* Returns a unique configured FileSystem implementation for the default
* filesystem of the supplied configuration.
* This always returns a new FileSystem object.
* @param conf the configuration to use
* @return the new FS instance
* @throws IOException FS creation or initialization failure.
*/
public static FileSystem newInstance(Configuration conf) throws IOException {
return newInstance(getDefaultUri(conf), conf);
}
/**
* Get a unique local FileSystem object.
* @param conf the configuration to configure the FileSystem with
* @return a new LocalFileSystem object.
* @throws IOException FS creation or initialization failure.
*/
public static LocalFileSystem newInstanceLocal(Configuration conf)
throws IOException {
return (LocalFileSystem)newInstance(LocalFileSystem.NAME, conf);
}
/**
* Close all cached FileSystem instances. After this operation, they
* may not be used in any operations.
*
* @throws IOException a problem arose closing one or more filesystem.
*/
public static void closeAll() throws IOException {
debugLogFileSystemClose("closeAll", "");
CACHE.closeAll();
}
/**
* Close all cached FileSystem instances for a given UGI.
* Be sure those filesystems are not used anymore.
* @param ugi user group info to close
* @throws IOException a problem arose closing one or more filesystem.
*/
public static void closeAllForUGI(UserGroupInformation ugi)
throws IOException {
debugLogFileSystemClose("closeAllForUGI", "UGI: " + ugi);
CACHE.closeAll(ugi);
}
private static void debugLogFileSystemClose(String methodName,
String additionalInfo) {
if (LOGGER.isDebugEnabled()) {
Throwable throwable = new Throwable().fillInStackTrace();
LOGGER.debug("FileSystem.{}() by method: {}); {}", methodName,
throwable.getStackTrace()[2], additionalInfo);
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("FileSystem.{}() full stack trace:", methodName,
throwable);
}
}
}
/**
* Qualify a path to one which uses this FileSystem and, if relative,
* made absolute.
* @param path to qualify.
* @return this path if it contains a scheme and authority and is absolute, or
* a new path that includes a path and authority and is fully qualified
* @see Path#makeQualified(URI, Path)
* @throws IllegalArgumentException if the path has a schema/URI different
* from this FileSystem.
*/
public Path makeQualified(Path path) {
checkPath(path);
return path.makeQualified(this.getUri(), this.getWorkingDirectory());
}
/**
* Get a new delegation token for this FileSystem.
* This is an internal method that should have been declared protected
* but wasn't historically.
* Callers should use {@link #addDelegationTokens(String, Credentials)}
*
* @param renewer the account name that is allowed to renew the token.
* @return a new delegation token or null if the FS does not support tokens.
* @throws IOException on any problem obtaining a token
*/
@InterfaceAudience.Private()
@Override
public Token<?> getDelegationToken(String renewer) throws IOException {
return null;
}
/**
* Get all the immediate child FileSystems embedded in this FileSystem.
* It does not recurse and get grand children. If a FileSystem
* has multiple child FileSystems, then it must return a unique list
* of those FileSystems. Default is to return null to signify no children.
*
* @return FileSystems that are direct children of this FileSystem,
* or null for "no children"
*/
@InterfaceAudience.LimitedPrivate({ "HDFS" })
@VisibleForTesting
public FileSystem[] getChildFileSystems() {
return null;
}
@InterfaceAudience.Private
@Override
public DelegationTokenIssuer[] getAdditionalTokenIssuers()
throws IOException {
return getChildFileSystems();
}
/**
* Create a file with the provided permission.
*
* The permission of the file is set to be the provided permission as in
* setPermission, not permission{@literal &~}umask
*
* The HDFS implementation is implemented using two RPCs.
* It is understood that it is inefficient,
* but the implementation is thread-safe. The other option is to change the
* value of umask in configuration to be 0, but it is not thread-safe.
*
* @param fs FileSystem
* @param file the name of the file to be created
* @param permission the permission of the file
* @return an output stream
* @throws IOException IO failure
*/
public static FSDataOutputStream create(FileSystem fs,
Path file, FsPermission permission) throws IOException {
// create the file with default permission
FSDataOutputStream out = fs.create(file);
// set its permission to the supplied one
fs.setPermission(file, permission);
return out;
}
/**
* Create a directory with the provided permission.
* The permission of the directory is set to be the provided permission as in
* setPermission, not permission{@literal &~}umask
*
* @see #create(FileSystem, Path, FsPermission)
*
* @param fs FileSystem handle
* @param dir the name of the directory to be created
* @param permission the permission of the directory
* @return true if the directory creation succeeds; false otherwise
* @throws IOException A problem creating the directories.
*/
public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
throws IOException {
// create the directory using the default permission
boolean result = fs.mkdirs(dir);
// set its permission to be the supplied one
fs.setPermission(dir, permission);
return result;
}
///////////////////////////////////////////////////////////////
// FileSystem
///////////////////////////////////////////////////////////////
protected FileSystem() {
super(null);
}
/**
* Check that a Path belongs to this FileSystem.
*
* The base implementation performs case insensitive equality checks
* of the URIs' schemes and authorities. Subclasses may implement slightly
* different checks.
* @param path to check
* @throws IllegalArgumentException if the path is not considered to be
* part of this FileSystem.
*
*/
protected void checkPath(Path path) {
Preconditions.checkArgument(path != null, "null path");
URI uri = path.toUri();
String thatScheme = uri.getScheme();
if (thatScheme == null) // fs is relative
return;
URI thisUri = getCanonicalUri();
String thisScheme = thisUri.getScheme();
//authority and scheme are not case sensitive
if (thisScheme.equalsIgnoreCase(thatScheme)) {// schemes match
String thisAuthority = thisUri.getAuthority();
String thatAuthority = uri.getAuthority();
if (thatAuthority == null && // path's authority is null
thisAuthority != null) { // fs has an authority
URI defaultUri = getDefaultUri(getConf());
if (thisScheme.equalsIgnoreCase(defaultUri.getScheme())) {
uri = defaultUri; // schemes match, so use this uri instead
} else {
uri = null; // can't determine auth of the path
}
}
if (uri != null) {
// canonicalize uri before comparing with this fs
uri = canonicalizeUri(uri);
thatAuthority = uri.getAuthority();
if (thisAuthority == thatAuthority || // authorities match
(thisAuthority != null &&
thisAuthority.equalsIgnoreCase(thatAuthority)))
return;
}
}
throw new IllegalArgumentException("Wrong FS: " + path +
", expected: " + this.getUri());
}
/**
* Return an array containing hostnames, offset and size of
* portions of the given file. For nonexistent
* file or regions, {@code null} is returned.
*
* <pre>
* if f == null :
* result = null
* elif f.getLen() {@literal <=} start:
* result = []
* else result = [ locations(FS, b) for b in blocks(FS, p, s, s+l)]
* </pre>
* This call is most helpful with and distributed filesystem
* where the hostnames of machines that contain blocks of the given file
* can be determined.
*
* The default implementation returns an array containing one element:
* <pre>
* BlockLocation( { "localhost:9866" }, { "localhost" }, 0, file.getLen())
* </pre>
*
* In HDFS, if file is three-replicated, the returned array contains
* elements like:
* <pre>
* BlockLocation(offset: 0, length: BLOCK_SIZE,
* hosts: {"host1:9866", "host2:9866, host3:9866"})
* BlockLocation(offset: BLOCK_SIZE, length: BLOCK_SIZE,
* hosts: {"host2:9866", "host3:9866, host4:9866"})
* </pre>
*
* And if a file is erasure-coded, the returned BlockLocation are logical
* block groups.
*
* Suppose we have a RS_3_2 coded file (3 data units and 2 parity units).
* 1. If the file size is less than one stripe size, say 2 * CELL_SIZE, then
* there will be one BlockLocation returned, with 0 offset, actual file size
* and 4 hosts (2 data blocks and 2 parity blocks) hosting the actual blocks.
* 3. If the file size is less than one group size but greater than one
* stripe size, then there will be one BlockLocation returned, with 0 offset,
* actual file size with 5 hosts (3 data blocks and 2 parity blocks) hosting
* the actual blocks.
* 4. If the file size is greater than one group size, 3 * BLOCK_SIZE + 123
* for example, then the result will be like:
* <pre>
* BlockLocation(offset: 0, length: 3 * BLOCK_SIZE, hosts: {"host1:9866",
* "host2:9866","host3:9866","host4:9866","host5:9866"})
* BlockLocation(offset: 3 * BLOCK_SIZE, length: 123, hosts: {"host1:9866",
* "host4:9866", "host5:9866"})
* </pre>
*
* @param file FilesStatus to get data from
* @param start offset into the given file
* @param len length for which to get locations for
* @throws IOException IO failure
* @return block location array.
*/
public BlockLocation[] getFileBlockLocations(FileStatus file,
long start, long len) throws IOException {
if (file == null) {
return null;
}
if (start < 0 || len < 0) {
throw new IllegalArgumentException("Invalid start or len parameter");
}
if (file.getLen() <= start) {
return new BlockLocation[0];
}
String[] name = {"localhost:9866"};
String[] host = {"localhost"};
return new BlockLocation[] {
new BlockLocation(name, host, 0, file.getLen()) };
}
/**
* Return an array containing hostnames, offset and size of
* portions of the given file. For a nonexistent
* file or regions, {@code null} is returned.
*
* This call is most helpful with location-aware distributed
* filesystems, where it returns hostnames of machines that
* contain the given file.
*
* A FileSystem will normally return the equivalent result
* of passing the {@code FileStatus} of the path to
* {@link #getFileBlockLocations(FileStatus, long, long)}
*
* @param p path is used to identify an FS since an FS could have
* another FS that it could be delegating the call to
* @param start offset into the given file
* @param len length for which to get locations for
* @throws FileNotFoundException when the path does not exist
* @throws IOException IO failure
* @return block location array.
*/
public BlockLocation[] getFileBlockLocations(Path p,
long start, long len) throws IOException {
if (p == null) {
throw new NullPointerException();
}
FileStatus file = getFileStatus(p);
return getFileBlockLocations(file, start, len);
}
/**
* Return a set of server default configuration values.
* @return server default configuration values
* @throws IOException IO failure
* @deprecated use {@link #getServerDefaults(Path)} instead
*/
@Deprecated
public FsServerDefaults getServerDefaults() throws IOException {
Configuration config = getConf();
// CRC32 is chosen as default as it is available in all
// releases that support checksum.
// The client trash configuration is ignored.
return new FsServerDefaults(getDefaultBlockSize(),
config.getInt("io.bytes.per.checksum", 512),
64 * 1024,
getDefaultReplication(),
config.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),
false,
FS_TRASH_INTERVAL_DEFAULT,
DataChecksum.Type.CRC32,
"");
}
/**
* Return a set of server default configuration values.
* @param p path is used to identify an FS since an FS could have
* another FS that it could be delegating the call to
* @return server default configuration values
* @throws IOException IO failure
*/
public FsServerDefaults getServerDefaults(Path p) throws IOException {
return getServerDefaults();
}
/**
* Return the fully-qualified path of path, resolving the path
* through any symlinks or mount point.
* @param p path to be resolved
* @return fully qualified path
* @throws FileNotFoundException if the path is not present
* @throws IOException for any other error
*/
public Path resolvePath(final Path p) throws IOException {
checkPath(p);
return getFileStatus(p).getPath();
}
/**
* Opens an FSDataInputStream at the indicated Path.
* @param f the file name to open
* @param bufferSize the size of the buffer to be used.
* @throws IOException IO failure
* @return input stream.
*/
public abstract FSDataInputStream open(Path f, int bufferSize)
throws IOException;
/**
* Opens an FSDataInputStream at the indicated Path.
* @param f the file to open
* @throws IOException IO failure
* @return input stream.
*/
public FSDataInputStream open(Path f) throws IOException {
return open(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
IO_FILE_BUFFER_SIZE_DEFAULT));
}
/**
* Open an FSDataInputStream matching the PathHandle instance. The
* implementation may encode metadata in PathHandle to address the
* resource directly and verify that the resource referenced
* satisfies constraints specified at its construciton.
* @param fd PathHandle object returned by the FS authority.
* @throws InvalidPathHandleException If {@link PathHandle} constraints are
* not satisfied
* @throws IOException IO failure
* @throws UnsupportedOperationException If {@link #open(PathHandle, int)}
* not overridden by subclass
* @return input stream.
*/
public FSDataInputStream open(PathHandle fd) throws IOException {
return open(fd, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
IO_FILE_BUFFER_SIZE_DEFAULT));
}
/**
* Open an FSDataInputStream matching the PathHandle instance. The
* implementation may encode metadata in PathHandle to address the
* resource directly and verify that the resource referenced
* satisfies constraints specified at its construciton.
* @param fd PathHandle object returned by the FS authority.
* @param bufferSize the size of the buffer to use
* @throws InvalidPathHandleException If {@link PathHandle} constraints are
* not satisfied
* @throws IOException IO failure
* @throws UnsupportedOperationException If not overridden by subclass
* @return input stream.
*/
public FSDataInputStream open(PathHandle fd, int bufferSize)
throws IOException {
throw new UnsupportedOperationException();
}
/**
* Create a durable, serializable handle to the referent of the given
* entity.
* @param stat Referent in the target FileSystem
* @param opt If absent, assume {@link HandleOpt#path()}.
* @throws IllegalArgumentException If the FileStatus does not belong to
* this FileSystem
* @throws UnsupportedOperationException If {@link #createPathHandle}
* not overridden by subclass.
* @throws UnsupportedOperationException If this FileSystem cannot enforce
* the specified constraints.
* @return path handle.
*/
public final PathHandle getPathHandle(FileStatus stat, HandleOpt... opt) {
// method is final with a default so clients calling getPathHandle(stat)
// get the same semantics for all FileSystem implementations
if (null == opt || 0 == opt.length) {
return createPathHandle(stat, HandleOpt.path());
}
return createPathHandle(stat, opt);
}
/**
* Hook to implement support for {@link PathHandle} operations.
* @param stat Referent in the target FileSystem
* @param opt Constraints that determine the validity of the
* {@link PathHandle} reference.
* @return path handle.
*/
protected PathHandle createPathHandle(FileStatus stat, HandleOpt... opt) {
throw new UnsupportedOperationException();
}
/**
* Create an FSDataOutputStream at the indicated Path.
* Files are overwritten by default.
* @param f the file to create
* @throws IOException IO failure
* @return output stream.
*/
public FSDataOutputStream create(Path f) throws IOException {
return create(f, true);
}
/**
* Create an FSDataOutputStream at the indicated Path.
* @param f the file to create
* @param overwrite if a file with this name already exists, then if true,
* the file will be overwritten, and if false an exception will be thrown.
* @throws IOException IO failure
* @return output stream.
*/
public FSDataOutputStream create(Path f, boolean overwrite)
throws IOException {
return create(f, overwrite,
getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
IO_FILE_BUFFER_SIZE_DEFAULT),
getDefaultReplication(f),
getDefaultBlockSize(f));
}
/**
* Create an FSDataOutputStream at the indicated Path with write-progress
* reporting.
* Files are overwritten by default.
* @param f the file to create
* @param progress to report progress
* @throws IOException IO failure
* @return output stream.
*/
public FSDataOutputStream create(Path f, Progressable progress)
throws IOException {
return create(f, true,
getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
IO_FILE_BUFFER_SIZE_DEFAULT),
getDefaultReplication(f),
getDefaultBlockSize(f), progress);
}
/**
* Create an FSDataOutputStream at the indicated Path.
* Files are overwritten by default.
* @param f the file to create
* @param replication the replication factor
* @throws IOException IO failure
* @return output stream1
*/
public FSDataOutputStream create(Path f, short replication)
throws IOException {
return create(f, true,
getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
IO_FILE_BUFFER_SIZE_DEFAULT),
replication,
getDefaultBlockSize(f));
}
/**
* Create an FSDataOutputStream at the indicated Path with write-progress
* reporting.
* Files are overwritten by default.
* @param f the file to create
* @param replication the replication factor
* @param progress to report progress
* @throws IOException IO failure
* @return output stream.
*/
public FSDataOutputStream create(Path f, short replication,
Progressable progress) throws IOException {
return create(f, true,
getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
IO_FILE_BUFFER_SIZE_DEFAULT),
replication, getDefaultBlockSize(f), progress);
}
/**
* Create an FSDataOutputStream at the indicated Path.
* @param f the file to create
* @param overwrite if a path with this name already exists, then if true,
* the file will be overwritten, and if false an error will be thrown.
* @param bufferSize the size of the buffer to be used.
* @throws IOException IO failure
* @return output stream.
*/
public FSDataOutputStream create(Path f,
boolean overwrite,
int bufferSize
) throws IOException {
return create(f, overwrite, bufferSize,
getDefaultReplication(f),
getDefaultBlockSize(f));
}
/**
* Create an {@link FSDataOutputStream} at the indicated Path
* with write-progress reporting.
*
* The frequency of callbacks is implementation-specific; it may be "none".
* @param f the path of the file to open
* @param overwrite if a file with this name already exists, then if true,
* the file will be overwritten, and if false an error will be thrown.
* @param bufferSize the size of the buffer to be used.
* @param progress to report progress.
* @throws IOException IO failure
* @return output stream.
*/
public FSDataOutputStream create(Path f,
boolean overwrite,
int bufferSize,
Progressable progress
) throws IOException {
return create(f, overwrite, bufferSize,
getDefaultReplication(f),
getDefaultBlockSize(f), progress);
}
/**
* Create an FSDataOutputStream at the indicated Path.
* @param f the file name to open
* @param overwrite if a file with this name already exists, then if true,
* the file will be overwritten, and if false an error will be thrown.
* @param bufferSize the size of the buffer to be used.
* @param replication required block replication for the file.
* @param blockSize the size of the buffer to be used.
* @throws IOException IO failure
* @return output stream.
*/
public FSDataOutputStream create(Path f,
boolean overwrite,
int bufferSize,
short replication,
long blockSize) throws IOException {
return create(f, overwrite, bufferSize, replication, blockSize, null);
}
/**
* Create an FSDataOutputStream at the indicated Path with write-progress
* reporting.
* @param f the file name to open
* @param overwrite if a file with this name already exists, then if true,
* the file will be overwritten, and if false an error will be thrown.
* @param bufferSize the size of the buffer to be used.
* @param replication required block replication for the file.
* @param blockSize the size of the buffer to be used.
* @param progress to report progress.
* @throws IOException IO failure
* @return output stream.
*/
public FSDataOutputStream create(Path f,
boolean overwrite,
int bufferSize,
short replication,
long blockSize,
Progressable progress
) throws IOException {
return this.create(f, FsCreateModes.applyUMask(
FsPermission.getFileDefault(), FsPermission.getUMask(getConf())),
overwrite, bufferSize, replication, blockSize, progress);
}
/**
* Create an FSDataOutputStream at the indicated Path with write-progress
* reporting.
* @param f the file name to open
* @param permission file permission
* @param overwrite if a file with this name already exists, then if true,
* the file will be overwritten, and if false an error will be thrown.
* @param bufferSize the size of the buffer to be used.
* @param replication required block replication for the file.
* @param blockSize block size
* @param progress the progress reporter
* @throws IOException IO failure
* @see #setPermission(Path, FsPermission)
* @return output stream.
*/
public abstract FSDataOutputStream create(Path f,
FsPermission permission,
boolean overwrite,
int bufferSize,
short replication,
long blockSize,
Progressable progress) throws IOException;
/**
* Create an FSDataOutputStream at the indicated Path with write-progress
* reporting.
* @param f the file name to open
* @param permission file permission
* @param flags {@link CreateFlag}s to use for this stream.
* @param bufferSize the size of the buffer to be used.
* @param replication required block replication for the file.
* @param blockSize block size
* @param progress the progress reporter
* @throws IOException IO failure
* @see #setPermission(Path, FsPermission)
* @return output stream.
*/
public FSDataOutputStream create(Path f,
FsPermission permission,
EnumSet<CreateFlag> flags,
int bufferSize,
short replication,
long blockSize,
Progressable progress) throws IOException {
return create(f, permission, flags, bufferSize, replication,
blockSize, progress, null);
}
/**
* Create an FSDataOutputStream at the indicated Path with a custom
* checksum option.
* @param f the file name to open
* @param permission file permission
* @param flags {@link CreateFlag}s to use for this stream.
* @param bufferSize the size of the buffer to be used.
* @param replication required block replication for the file.
* @param blockSize block size
* @param progress the progress reporter
* @param checksumOpt checksum parameter. If null, the values
* found in conf will be used.
* @throws IOException IO failure
* @see #setPermission(Path, FsPermission)
* @return output stream.
*/
public FSDataOutputStream create(Path f,
FsPermission permission,
EnumSet<CreateFlag> flags,
int bufferSize,
short replication,
long blockSize,
Progressable progress,
ChecksumOpt checksumOpt) throws IOException {
// Checksum options are ignored by default. The file systems that
// implement checksum need to override this method. The full
// support is currently only available in DFS.
return create(f, permission, flags.contains(CreateFlag.OVERWRITE),
bufferSize, replication, blockSize, progress);
}
/**
* This create has been added to support the FileContext that processes
* the permission with umask before calling this method.
* This a temporary method added to support the transition from FileSystem
* to FileContext for user applications.
*
* @param f path.
* @param absolutePermission permission.
* @param flag create flag.
* @param bufferSize buffer size.
* @param replication replication.
* @param blockSize block size.
* @param progress progress.
* @param checksumOpt check sum opt.
* @return output stream.
* @throws IOException IO failure
*/
@Deprecated
protected FSDataOutputStream primitiveCreate(Path f,
FsPermission absolutePermission,
EnumSet<CreateFlag> flag,
int bufferSize,
short replication,
long blockSize,
Progressable progress,
ChecksumOpt checksumOpt) throws IOException {
boolean pathExists = exists(f);
CreateFlag.validate(f, pathExists, flag);
// Default impl assumes that permissions do not matter and
// nor does the bytesPerChecksum hence
// calling the regular create is good enough.
// FSs that implement permissions should override this.
if (pathExists && flag.contains(CreateFlag.APPEND)) {
return append(f, bufferSize, progress);
}
return this.create(f, absolutePermission,
flag.contains(CreateFlag.OVERWRITE), bufferSize, replication,
blockSize, progress);
}
/**
* This version of the mkdirs method assumes that the permission is absolute.
* It has been added to support the FileContext that processes the permission
* with umask before calling this method.
* This a temporary method added to support the transition from FileSystem
* to FileContext for user applications.
* @param f path
* @param absolutePermission permissions
* @return true if the directory was actually created.
* @throws IOException IO failure
* @see #mkdirs(Path, FsPermission)
*/
@Deprecated
protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
throws IOException {
return this.mkdirs(f, absolutePermission);
}
/**
* This version of the mkdirs method assumes that the permission is absolute.
* It has been added to support the FileContext that processes the permission
* with umask before calling this method.
* This a temporary method added to support the transition from FileSystem
* to FileContext for user applications.
*
* @param f the path.
* @param absolutePermission permission.
* @param createParent create parent.
* @throws IOException IO failure.
*/
@Deprecated
protected void primitiveMkdir(Path f, FsPermission absolutePermission,
boolean createParent)
throws IOException {
if (!createParent) { // parent must exist.
// since the this.mkdirs makes parent dirs automatically
// we must throw exception if parent does not exist.
final FileStatus stat = getFileStatus(f.getParent());
if (stat == null) {
throw new FileNotFoundException("Missing parent:" + f);
}
if (!stat.isDirectory()) {
throw new ParentNotDirectoryException("parent is not a dir");
}
// parent does exist - go ahead with mkdir of leaf
}
// Default impl is to assume that permissions do not matter and hence
// calling the regular mkdirs is good enough.
// FSs that implement permissions should override this.
if (!this.mkdirs(f, absolutePermission)) {
throw new IOException("mkdir of "+ f + " failed");
}
}
/**
* Opens an FSDataOutputStream at the indicated Path with write-progress
* reporting. Same as create(), except fails if parent directory doesn't
* already exist.
* @param f the file name to open
* @param overwrite if a file with this name already exists, then if true,
* the file will be overwritten, and if false an error will be thrown.
* @param bufferSize the size of the buffer to be used.
* @param replication required block replication for the file.
* @param blockSize block size
* @param progress the progress reporter
* @throws IOException IO failure
* @see #setPermission(Path, FsPermission)
* @return output stream.
*/
public FSDataOutputStream createNonRecursive(Path f,
boolean overwrite,
int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
return this.createNonRecursive(f, FsPermission.getFileDefault(),
overwrite, bufferSize, replication, blockSize, progress);
}
/**
* Opens an FSDataOutputStream at the indicated Path with write-progress
* reporting. Same as create(), except fails if parent directory doesn't
* already exist.
* @param f the file name to open
* @param permission file permission
* @param overwrite if a file with this name already exists, then if true,
* the file will be overwritten, and if false an error will be thrown.
* @param bufferSize the size of the buffer to be used.
* @param replication required block replication for the file.
* @param blockSize block size
* @param progress the progress reporter
* @throws IOException IO failure
* @see #setPermission(Path, FsPermission)
* @return output stream.
*/
public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
return createNonRecursive(f, permission,
overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
: EnumSet.of(CreateFlag.CREATE), bufferSize,
replication, blockSize, progress);
}
/**
* Opens an FSDataOutputStream at the indicated Path with write-progress
* reporting. Same as create(), except fails if parent directory doesn't
* already exist.
* @param f the file name to open
* @param permission file permission
* @param flags {@link CreateFlag}s to use for this stream.
* @param bufferSize the size of the buffer to be used.
* @param replication required block replication for the file.
* @param blockSize block size
* @param progress the progress reporter
* @throws IOException IO failure
* @see #setPermission(Path, FsPermission)
* @return output stream.
*/
public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
throw new IOException("createNonRecursive unsupported for this filesystem "
+ this.getClass());
}
/**
* Creates the given Path as a brand-new zero-length file. If
* create fails, or if it already existed, return false.
* <i>Important: the default implementation is not atomic</i>
* @param f path to use for create
* @throws IOException IO failure
* @return if create new file success true,not false.
*/
public boolean createNewFile(Path f) throws IOException {
if (exists(f)) {
return false;
} else {
create(f, false, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
IO_FILE_BUFFER_SIZE_DEFAULT)).close();
return true;
}
}
/**
* Append to an existing file (optional operation).
* Same as
* {@code append(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
* IO_FILE_BUFFER_SIZE_DEFAULT), null)}
* @param f the existing file to be appended.
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default).
* @return output stream.
*/
public FSDataOutputStream append(Path f) throws IOException {
return append(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
IO_FILE_BUFFER_SIZE_DEFAULT), null);
}
/**
* Append to an existing file (optional operation).
* Same as append(f, bufferSize, null).
* @param f the existing file to be appended.
* @param bufferSize the size of the buffer to be used.
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default).
* @return output stream.
*/
public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
return append(f, bufferSize, null);
}
/**
* Append to an existing file (optional operation).
* @param f the existing file to be appended.
* @param bufferSize the size of the buffer to be used.
* @param progress for reporting progress if it is not null.
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default).
* @return output stream.
*/
public abstract FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException;
/**
* Concat existing files together.
* @param trg the path to the target destination.
* @param psrcs the paths to the sources to use for the concatenation.
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default).
*/
public void concat(final Path trg, final Path [] psrcs) throws IOException {
throw new UnsupportedOperationException("Not implemented by the " +
getClass().getSimpleName() + " FileSystem implementation");
}
/**
* Get the replication factor.
*
* @deprecated Use {@link #getFileStatus(Path)} instead
* @param src file name
* @return file replication
* @throws FileNotFoundException if the path does not resolve.
* @throws IOException an IO failure
*/
@Deprecated
public short getReplication(Path src) throws IOException {
return getFileStatus(src).getReplication();
}
/**
* Set the replication for an existing file.
* If a filesystem does not support replication, it will always
* return true: the check for a file existing may be bypassed.
* This is the default behavior.
* @param src file name
* @param replication new replication
* @throws IOException an IO failure.
* @return true if successful, or the feature in unsupported;
* false if replication is supported but the file does not exist,
* or is a directory
*/
public boolean setReplication(Path src, short replication)
throws IOException {
return true;
}
/**
* Renames Path src to Path dst.
* @param src path to be renamed
* @param dst new path after rename
* @throws IOException on failure
* @return true if rename is successful
*/
public abstract boolean rename(Path src, Path dst) throws IOException;
/**
* Renames Path src to Path dst
* <ul>
* <li>Fails if src is a file and dst is a directory.</li>
* <li>Fails if src is a directory and dst is a file.</li>
* <li>Fails if the parent of dst does not exist or is a file.</li>
* </ul>
* <p>
* If OVERWRITE option is not passed as an argument, rename fails
* if the dst already exists.
* </p>
* <p>
* If OVERWRITE option is passed as an argument, rename overwrites
* the dst if it is a file or an empty directory. Rename fails if dst is
* a non-empty directory.
* </p>
* Note that atomicity of rename is dependent on the file system
* implementation. Please refer to the file system documentation for
* details. This default implementation is non atomic.
* <p>
* This method is deprecated since it is a temporary method added to
* support the transition from FileSystem to FileContext for user
* applications.
* </p>
*
* @param src path to be renamed
* @param dst new path after rename
* @param options rename options.
* @throws FileNotFoundException src path does not exist, or the parent
* path of dst does not exist.
* @throws FileAlreadyExistsException dest path exists and is a file
* @throws ParentNotDirectoryException if the parent path of dest is not
* a directory
* @throws IOException on failure
*/
@Deprecated
protected void rename(final Path src, final Path dst,
final Rename... options) throws IOException {
// Default implementation
final FileStatus srcStatus = getFileLinkStatus(src);
if (srcStatus == null) {
throw new FileNotFoundException("rename source " + src + " not found.");
}
boolean overwrite = false;
if (null != options) {
for (Rename option : options) {
if (option == Rename.OVERWRITE) {
overwrite = true;
}
}
}
FileStatus dstStatus;
try {
dstStatus = getFileLinkStatus(dst);
} catch (IOException e) {
dstStatus = null;
}
if (dstStatus != null) {
if (srcStatus.isDirectory() != dstStatus.isDirectory()) {
throw new IOException("Source " + src + " Destination " + dst
+ " both should be either file or directory");
}
if (!overwrite) {
throw new FileAlreadyExistsException("rename destination " + dst
+ " already exists.");
}
// Delete the destination that is a file or an empty directory
if (dstStatus.isDirectory()) {
FileStatus[] list = listStatus(dst);
if (list != null && list.length != 0) {
throw new IOException(
"rename cannot overwrite non empty destination directory " + dst);
}
}
delete(dst, false);
} else {
final Path parent = dst.getParent();
final FileStatus parentStatus = getFileStatus(parent);
if (parentStatus == null) {
throw new FileNotFoundException("rename destination parent " + parent
+ " not found.");
}
if (!parentStatus.isDirectory()) {
throw new ParentNotDirectoryException("rename destination parent " + parent
+ " is a file.");
}
}
if (!rename(src, dst)) {
throw new IOException("rename from " + src + " to " + dst + " failed.");
}
}
/**
* Truncate the file in the indicated path to the indicated size.
* <ul>
* <li>Fails if path is a directory.</li>
* <li>Fails if path does not exist.</li>
* <li>Fails if path is not closed.</li>
* <li>Fails if new size is greater than current size.</li>
* </ul>
* @param f The path to the file to be truncated
* @param newLength The size the file is to be truncated to
*
* @return <code>true</code> if the file has been truncated to the desired
* <code>newLength</code> and is immediately available to be reused for
* write operations such as <code>append</code>, or
* <code>false</code> if a background process of adjusting the length of
* the last block has been started, and clients should wait for it to
* complete before proceeding with further file updates.
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default).
*/
public boolean truncate(Path f, long newLength) throws IOException {
throw new UnsupportedOperationException("Not implemented by the " +
getClass().getSimpleName() + " FileSystem implementation");
}
/**
* Delete a file/directory.
* @param f the path.
* @throws IOException IO failure.
* @return if delete success true, not false.
* @deprecated Use {@link #delete(Path, boolean)} instead.
*/
@Deprecated
public boolean delete(Path f) throws IOException {
return delete(f, true);
}
/** Delete a file.
*
* @param f the path to delete.
* @param recursive if path is a directory and set to
* true, the directory is deleted else throws an exception. In
* case of a file the recursive can be set to either true or false.
* @return true if delete is successful else false.
* @throws IOException IO failure
*/
public abstract boolean delete(Path f, boolean recursive) throws IOException;
/**
* Mark a path to be deleted when its FileSystem is closed.
* When the JVM shuts down cleanly, all cached FileSystem objects will be
* closed automatically. These the marked paths will be deleted as a result.
*
* If a FileSystem instance is not cached, i.e. has been created with
* {@link #createFileSystem(URI, Configuration)}, then the paths will
* be deleted in when {@link #close()} is called on that instance.
*
* The path must exist in the filesystem at the time of the method call;
* it does not have to exist at the time of JVM shutdown.
*
* Notes
* <ol>
* <li>Clean shutdown of the JVM cannot be guaranteed.</li>
* <li>The time to shut down a FileSystem will depends on the number of
* files to delete. For filesystems where the cost of checking
* for the existence of a file/directory and the actual delete operation
* (for example: object stores) is high, the time to shutdown the JVM can be
* significantly extended by over-use of this feature.</li>
* <li>Connectivity problems with a remote filesystem may delay shutdown
* further, and may cause the files to not be deleted.</li>
* </ol>
* @param f the path to delete.
* @return true if deleteOnExit is successful, otherwise false.
* @throws IOException IO failure
*/
public boolean deleteOnExit(Path f) throws IOException {
if (!exists(f)) {
return false;
}
synchronized (deleteOnExit) {
deleteOnExit.add(f);
}
return true;
}
/**
* Cancel the scheduled deletion of the path when the FileSystem is closed.
* @param f the path to cancel deletion
* @return true if the path was found in the delete-on-exit list.
*/
public boolean cancelDeleteOnExit(Path f) {
synchronized (deleteOnExit) {
return deleteOnExit.remove(f);
}
}
/**
* Delete all paths that were marked as delete-on-exit. This recursively
* deletes all files and directories in the specified paths.
*
* The time to process this operation is {@code O(paths)}, with the actual
* time dependent on the time for existence and deletion operations to
* complete, successfully or not.
*/
protected void processDeleteOnExit() {
synchronized (deleteOnExit) {
for (Iterator<Path> iter = deleteOnExit.iterator(); iter.hasNext();) {
Path path = iter.next();
try {
if (exists(path)) {
delete(path, true);
}
}
catch (IOException e) {
LOGGER.info("Ignoring failure to deleteOnExit for path {}", path);
}
iter.remove();
}
}
}
/** Check if a path exists.
*
* It is highly discouraged to call this method back to back with other
* {@link #getFileStatus(Path)} calls, as this will involve multiple redundant
* RPC calls in HDFS.
*
* @param f source path
* @return true if the path exists
* @throws IOException IO failure
*/
public boolean exists(Path f) throws IOException {
try {
return getFileStatus(f) != null;
} catch (FileNotFoundException e) {
return false;
}
}
/** True iff the named path is a directory.
* Note: Avoid using this method. Instead reuse the FileStatus
* returned by getFileStatus() or listStatus() methods.
*
* @param f path to check
* @throws IOException IO failure
* @deprecated Use {@link #getFileStatus(Path)} instead
* @return if f is directory true, not false.
*/
@Deprecated
public boolean isDirectory(Path f) throws IOException {
try {
return getFileStatus(f).isDirectory();
} catch (FileNotFoundException e) {
return false; // f does not exist
}
}
/** True iff the named path is a regular file.
* Note: Avoid using this method. Instead reuse the FileStatus
* returned by {@link #getFileStatus(Path)} or listStatus() methods.
*
* @param f path to check
* @throws IOException IO failure
* @deprecated Use {@link #getFileStatus(Path)} instead
* @return if f is file true, not false.
*/
@Deprecated
public boolean isFile(Path f) throws IOException {
try {
return getFileStatus(f).isFile();
} catch (FileNotFoundException e) {
return false; // f does not exist
}
}
/**
* The number of bytes in a file.
* @param f the path.
* @return the number of bytes; 0 for a directory
* @deprecated Use {@link #getFileStatus(Path)} instead.
* @throws FileNotFoundException if the path does not resolve
* @throws IOException IO failure
*/
@Deprecated
public long getLength(Path f) throws IOException {
return getFileStatus(f).getLen();
}
/** Return the {@link ContentSummary} of a given {@link Path}.
* @param f path to use
* @throws FileNotFoundException if the path does not resolve
* @throws IOException IO failure
* @return content summary.
*/
public ContentSummary getContentSummary(Path f) throws IOException {
FileStatus status = getFileStatus(f);
if (status.isFile()) {
// f is a file
long length = status.getLen();
return new ContentSummary.Builder().length(length).
fileCount(1).directoryCount(0).spaceConsumed(length).build();
}
// f is a directory
long[] summary = {0, 0, 1};
for(FileStatus s : listStatus(f)) {
long length = s.getLen();
ContentSummary c = s.isDirectory() ? getContentSummary(s.getPath()) :
new ContentSummary.Builder().length(length).
fileCount(1).directoryCount(0).spaceConsumed(length).build();
summary[0] += c.getLength();
summary[1] += c.getFileCount();
summary[2] += c.getDirectoryCount();
}
return new ContentSummary.Builder().length(summary[0]).
fileCount(summary[1]).directoryCount(summary[2]).
spaceConsumed(summary[0]).build();
}
/** Return the {@link QuotaUsage} of a given {@link Path}.
* @param f path to use
* @return the quota usage
* @throws IOException IO failure
*/
public QuotaUsage getQuotaUsage(Path f) throws IOException {
return getContentSummary(f);
}
/**
* Set quota for the given {@link Path}.
*
* @param src the target path to set quota for
* @param namespaceQuota the namespace quota (i.e., # of files/directories)
* to set
* @param storagespaceQuota the storage space quota to set
* @throws IOException IO failure
*/
public void setQuota(Path src, final long namespaceQuota,
final long storagespaceQuota) throws IOException {
methodNotSupported();
}
/**
* Set per storage type quota for the given {@link Path}.
*
* @param src the target path to set storage type quota for
* @param type the storage type to set
* @param quota the quota to set for the given storage type
* @throws IOException IO failure
*/
public void setQuotaByStorageType(Path src, final StorageType type,
final long quota) throws IOException {
methodNotSupported();
}
/**
* The default filter accepts all paths.
*/
private static final PathFilter DEFAULT_FILTER = new PathFilter() {
@Override
public boolean accept(Path file) {
return true;
}
};
/**
* List the statuses of the files/directories in the given path if the path is
* a directory.
* <p>
* Does not guarantee to return the List of files/directories status in a
* sorted order.
* <p>
* Will not return null. Expect IOException upon access error.
* @param f given path
* @return the statuses of the files/directories in the given patch
* @throws FileNotFoundException when the path does not exist
* @throws IOException see specific implementation
*/
public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException,
IOException;
/**
* Represents a batch of directory entries when iteratively listing a
* directory. This is a private API not meant for use by end users.
* <p>
* For internal use by FileSystem subclasses that override
* {@link FileSystem#listStatusBatch(Path, byte[])} to implement iterative
* listing.
*/
@InterfaceAudience.Private
public static class DirectoryEntries {
private final FileStatus[] entries;
private final byte[] token;
private final boolean hasMore;
public DirectoryEntries(FileStatus[] entries, byte[] token, boolean
hasMore) {
this.entries = entries;
if (token != null) {
this.token = token.clone();
} else {
this.token = null;
}
this.hasMore = hasMore;
}
public FileStatus[] getEntries() {
return entries;
}
public byte[] getToken() {
return token;
}
public boolean hasMore() {
return hasMore;
}
}
/**
* Given an opaque iteration token, return the next batch of entries in a
* directory. This is a private API not meant for use by end users.
* <p>
* This method should be overridden by FileSystem subclasses that want to
* use the generic {@link FileSystem#listStatusIterator(Path)} implementation.
* @param f Path to list
* @param token opaque iteration token returned by previous call, or null
* if this is the first call.
* @return directory entries.
* @throws FileNotFoundException when the path does not exist.
* @throws IOException If an I/O error occurred.
*/
@InterfaceAudience.Private
protected DirectoryEntries listStatusBatch(Path f, byte[] token) throws
FileNotFoundException, IOException {
// The default implementation returns the entire listing as a single batch.
// Thus, there is never a second batch, and no need to respect the passed
// token or set a token in the returned DirectoryEntries.
FileStatus[] listing = listStatus(f);
return new DirectoryEntries(listing, null, false);
}
/**
* Filter files/directories in the given path using the user-supplied path
* filter. Results are added to the given array <code>results</code>.
* @throws FileNotFoundException when the path does not exist
* @throws IOException see specific implementation
*/
private void listStatus(ArrayList<FileStatus> results, Path f,
PathFilter filter) throws FileNotFoundException, IOException {
FileStatus listing[] = listStatus(f);
Preconditions.checkNotNull(listing, "listStatus should not return NULL");
for (int i = 0; i < listing.length; i++) {
if (filter.accept(listing[i].getPath())) {
results.add(listing[i]);
}
}
}
/**
* List corrupted file blocks.
*
* @param path the path.
* @return an iterator over the corrupt files under the given path
* (may contain duplicates if a file has more than one corrupt block)
* @throws UnsupportedOperationException if the operation is unsupported
* (default).
* @throws IOException IO failure
*/
public RemoteIterator<Path> listCorruptFileBlocks(Path path)
throws IOException {
throw new UnsupportedOperationException(getClass().getCanonicalName() +
" does not support listCorruptFileBlocks");
}
/**
* Filter files/directories in the given path using the user-supplied path
* filter.
* <p>
* Does not guarantee to return the List of files/directories status in a
* sorted order.
*
* @param f
* a path name
* @param filter
* the user-supplied path filter
* @return an array of FileStatus objects for the files under the given path
* after applying the filter
* @throws FileNotFoundException when the path does not exist
* @throws IOException see specific implementation
*/
public FileStatus[] listStatus(Path f, PathFilter filter)
throws FileNotFoundException, IOException {
ArrayList<FileStatus> results = new ArrayList<>();
listStatus(results, f, filter);
return results.toArray(new FileStatus[results.size()]);
}
/**
* Filter files/directories in the given list of paths using default
* path filter.
* <p>
* Does not guarantee to return the List of files/directories status in a
* sorted order.
*
* @param files
* a list of paths
* @return a list of statuses for the files under the given paths after
* applying the filter default Path filter
* @throws FileNotFoundException when the path does not exist
* @throws IOException see specific implementation
*/
public FileStatus[] listStatus(Path[] files)
throws FileNotFoundException, IOException {
return listStatus(files, DEFAULT_FILTER);
}
/**
* Filter files/directories in the given list of paths using user-supplied
* path filter.
* <p>
* Does not guarantee to return the List of files/directories status in a
* sorted order.
*
* @param files
* a list of paths
* @param filter
* the user-supplied path filter
* @return a list of statuses for the files under the given paths after
* applying the filter
* @throws FileNotFoundException when the path does not exist
* @throws IOException see specific implementation
*/
public FileStatus[] listStatus(Path[] files, PathFilter filter)
throws FileNotFoundException, IOException {
ArrayList<FileStatus> results = new ArrayList<FileStatus>();
for (int i = 0; i < files.length; i++) {
listStatus(results, files[i], filter);
}
return results.toArray(new FileStatus[results.size()]);
}
/**
* <p>Return all the files that match filePattern and are not checksum
* files. Results are sorted by their names.
*
* <p>
* A filename pattern is composed of <i>regular</i> characters and
* <i>special pattern matching</i> characters, which are:
*
* <dl>
* <dd>
* <dl>
* <dt> <tt> ? </tt>
* <dd> Matches any single character.
*
* <dt> <tt> * </tt>
* <dd> Matches zero or more characters.
*
* <dt> <tt> [<i>abc</i>] </tt>
* <dd> Matches a single character from character set
* <tt>{<i>a,b,c</i>}</tt>.
*
* <dt> <tt> [<i>a</i>-<i>b</i>] </tt>
* <dd> Matches a single character from the character range
* <tt>{<i>a...b</i>}</tt>. Note that character <tt><i>a</i></tt> must be
* lexicographically less than or equal to character <tt><i>b</i></tt>.
*
* <dt> <tt> [^<i>a</i>] </tt>
* <dd> Matches a single character that is not from character set or range
* <tt>{<i>a</i>}</tt>. Note that the <tt>^</tt> character must occur
* immediately to the right of the opening bracket.
*
* <dt> <tt> \<i>c</i> </tt>
* <dd> Removes (escapes) any special meaning of character <i>c</i>.
*
* <dt> <tt> {ab,cd} </tt>
* <dd> Matches a string from the string set <tt>{<i>ab, cd</i>} </tt>
*
* <dt> <tt> {ab,c{de,fh}} </tt>
* <dd> Matches a string from the string set <tt>{<i>ab, cde, cfh</i>}</tt>
*
* </dl>
* </dd>
* </dl>
*
* @param pathPattern a glob specifying a path pattern
* @return an array of paths that match the path pattern
* @throws IOException IO failure
*/
public FileStatus[] globStatus(Path pathPattern) throws IOException {
return Globber.createGlobber(this)
.withPathPattern(pathPattern)
.withPathFiltern(DEFAULT_FILTER)
.withResolveSymlinks(true)
.build()
.glob();
}
/**
* Return an array of {@link FileStatus} objects whose path names match
* {@code pathPattern} and is accepted by the user-supplied path filter.
* Results are sorted by their path names.
*
* @param pathPattern a glob specifying the path pattern
* @param filter a user-supplied path filter
* @return null if {@code pathPattern} has no glob and the path does not exist
* an empty array if {@code pathPattern} has a glob and no path
* matches it else an array of {@link FileStatus} objects matching the
* pattern
* @throws IOException if any I/O error occurs when fetching file status
*/
public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
throws IOException {
return new Globber(this, pathPattern, filter).glob();
}
/**
* List the statuses of the files/directories in the given path if the path is
* a directory.
* Return the file's status and block locations If the path is a file.
*
* If a returned status is a file, it contains the file's block locations.
*
* @param f is the path
*
* @return an iterator that traverses statuses of the files/directories
* in the given path
*
* @throws FileNotFoundException If <code>f</code> does not exist
* @throws IOException If an I/O error occurred
*/
public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
throws FileNotFoundException, IOException {
return listLocatedStatus(f, DEFAULT_FILTER);
}
/**
* List a directory.
* The returned results include its block location if it is a file
* The results are filtered by the given path filter
* @param f a path
* @param filter a path filter
* @return an iterator that traverses statuses of the files/directories
* in the given path
* @throws FileNotFoundException if <code>f</code> does not exist
* @throws IOException if any I/O error occurred
*/
protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
final PathFilter filter)
throws FileNotFoundException, IOException {
return new RemoteIterator<LocatedFileStatus>() {
private final FileStatus[] stats = listStatus(f, filter);
private int i = 0;
@Override
public boolean hasNext() {
return i<stats.length;
}
@Override
public LocatedFileStatus next() throws IOException {
if (!hasNext()) {
throw new NoSuchElementException("No more entries in " + f);
}
FileStatus result = stats[i++];
// for files, use getBlockLocations(FileStatus, int, int) to avoid
// calling getFileStatus(Path) to load the FileStatus again
BlockLocation[] locs = result.isFile() ?
getFileBlockLocations(result, 0, result.getLen()) :
null;
return new LocatedFileStatus(result, locs);
}
};
}
/**
* Generic iterator for implementing {@link #listStatusIterator(Path)}.
*/
protected class DirListingIterator<T extends FileStatus> implements
RemoteIterator<T> {
private final Path path;
private DirectoryEntries entries;
private int i = 0;
DirListingIterator(Path path) throws IOException {
this.path = path;
this.entries = listStatusBatch(path, null);
}
@Override
public boolean hasNext() throws IOException {
return i < entries.getEntries().length ||
entries.hasMore();
}
private void fetchMore() throws IOException {
byte[] token = entries.getToken();
entries = listStatusBatch(path, token);
i = 0;
}
@Override
@SuppressWarnings("unchecked")
public T next() throws IOException {
if (!hasNext()) {
throw new NoSuchElementException("No more items in iterator");
}
if (i == entries.getEntries().length) {
fetchMore();
}
return (T)entries.getEntries()[i++];
}
}
/**
* Returns a remote iterator so that followup calls are made on demand
* while consuming the entries. Each FileSystem implementation should
* override this method and provide a more efficient implementation, if
* possible.
*
* Does not guarantee to return the iterator that traverses statuses
* of the files in a sorted order.
*
* @param p target path
* @return remote iterator
* @throws FileNotFoundException if <code>p</code> does not exist
* @throws IOException if any I/O error occurred
*/
public RemoteIterator<FileStatus> listStatusIterator(final Path p)
throws FileNotFoundException, IOException {
return new DirListingIterator<>(p);
}
/**
* List the statuses and block locations of the files in the given path.
* Does not guarantee to return the iterator that traverses statuses
* of the files in a sorted order.
*
* <pre>
* If the path is a directory,
* if recursive is false, returns files in the directory;
* if recursive is true, return files in the subtree rooted at the path.
* If the path is a file, return the file's status and block locations.
* </pre>
* @param f is the path
* @param recursive if the subdirectories need to be traversed recursively
*
* @return an iterator that traverses statuses of the files
*
* @throws FileNotFoundException when the path does not exist;
* @throws IOException see specific implementation
*/
public RemoteIterator<LocatedFileStatus> listFiles(
final Path f, final boolean recursive)
throws FileNotFoundException, IOException {
return new RemoteIterator<LocatedFileStatus>() {
private Stack<RemoteIterator<LocatedFileStatus>> itors = new Stack<>();
private RemoteIterator<LocatedFileStatus> curItor =
listLocatedStatus(f);
private LocatedFileStatus curFile;
@Override
public boolean hasNext() throws IOException {
while (curFile == null) {
if (curItor.hasNext()) {
handleFileStat(curItor.next());
} else if (!itors.empty()) {
curItor = itors.pop();
} else {
return false;
}
}
return true;
}
/**
* Process the input stat.
* If it is a file, return the file stat.
* If it is a directory, traverse the directory if recursive is true;
* ignore it if recursive is false.
* @param stat input status
* @throws IOException if any IO error occurs
*/
private void handleFileStat(LocatedFileStatus stat) throws IOException {
if (stat.isFile()) { // file
curFile = stat;
} else if (recursive) { // directory
itors.push(curItor);
curItor = listLocatedStatus(stat.getPath());
}
}
@Override
public LocatedFileStatus next() throws IOException {
if (hasNext()) {
LocatedFileStatus result = curFile;
curFile = null;
return result;
}
throw new java.util.NoSuchElementException("No more entry in " + f);
}
};
}
/** Return the current user's home directory in this FileSystem.
* The default implementation returns {@code "/user/$USER/"}.
* @return the path.
*/
public Path getHomeDirectory() {
String username;
try {
username = UserGroupInformation.getCurrentUser().getShortUserName();
} catch(IOException ex) {
LOGGER.warn("Unable to get user name. Fall back to system property " +
"user.name", ex);
username = System.getProperty("user.name");
}
return this.makeQualified(
new Path(USER_HOME_PREFIX + "/" + username));
}
/**
* Set the current working directory for the given FileSystem. All relative
* paths will be resolved relative to it.
*
* @param new_dir Path of new working directory
*/
public abstract void setWorkingDirectory(Path new_dir);
/**
* Get the current working directory for the given FileSystem
* @return the directory pathname
*/
public abstract Path getWorkingDirectory();
/**
* Note: with the new FileContext class, getWorkingDirectory()
* will be removed.
* The working directory is implemented in FileContext.
*
* Some FileSystems like LocalFileSystem have an initial workingDir
* that we use as the starting workingDir. For other file systems
* like HDFS there is no built in notion of an initial workingDir.
*
* @return if there is built in notion of workingDir then it
* is returned; else a null is returned.
*/
protected Path getInitialWorkingDirectory() {
return null;
}
/**
* Call {@link #mkdirs(Path, FsPermission)} with default permission.
* @param f path
* @return true if the directory was created
* @throws IOException IO failure
*/
public boolean mkdirs(Path f) throws IOException {
return mkdirs(f, FsPermission.getDirDefault());
}
/**
* Make the given file and all non-existent parents into
* directories. Has roughly the semantics of Unix @{code mkdir -p}.
* Existence of the directory hierarchy is not an error.
* @param f path to create
* @param permission to apply to f
* @throws IOException IO failure
* @return if mkdir success true, not false.
*/
public abstract boolean mkdirs(Path f, FsPermission permission
) throws IOException;
/**
* The src file is on the local disk. Add it to filesystem at
* the given dst name and the source is kept intact afterwards
* @param src path
* @param dst path
* @throws IOException IO failure
*/
public void copyFromLocalFile(Path src, Path dst)
throws IOException {
copyFromLocalFile(false, src, dst);
}
/**
* The src files is on the local disk. Add it to filesystem at
* the given dst name, removing the source afterwards.
* @param srcs source paths
* @param dst path
* @throws IOException IO failure
*/
public void moveFromLocalFile(Path[] srcs, Path dst)
throws IOException {
copyFromLocalFile(true, true, srcs, dst);
}
/**
* The src file is on the local disk. Add it to the filesystem at
* the given dst name, removing the source afterwards.
* @param src local path
* @param dst path
* @throws IOException IO failure
*/
public void moveFromLocalFile(Path src, Path dst)
throws IOException {
copyFromLocalFile(true, src, dst);
}
/**
* The src file is on the local disk. Add it to the filesystem at
* the given dst name.
* delSrc indicates if the source should be removed
* @param delSrc whether to delete the src
* @param src path
* @param dst path
* @throws IOException IO failure.
*/
public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
throws IOException {
copyFromLocalFile(delSrc, true, src, dst);
}
/**
* The src files are on the local disk. Add it to the filesystem at
* the given dst name.
* delSrc indicates if the source should be removed
* @param delSrc whether to delete the src
* @param overwrite whether to overwrite an existing file
* @param srcs array of paths which are source
* @param dst path
* @throws IOException IO failure
*/
public void copyFromLocalFile(boolean delSrc, boolean overwrite,
Path[] srcs, Path dst)
throws IOException {
Configuration conf = getConf();
FileUtil.copy(getLocal(conf), srcs, this, dst, delSrc, overwrite, conf);
}
/**
* The src file is on the local disk. Add it to the filesystem at
* the given dst name.
* delSrc indicates if the source should be removed
* @param delSrc whether to delete the src
* @param overwrite whether to overwrite an existing file
* @param src path
* @param dst path
* @throws IOException IO failure
*/
public void copyFromLocalFile(boolean delSrc, boolean overwrite,
Path src, Path dst)
throws IOException {
Configuration conf = getConf();
FileUtil.copy(getLocal(conf), src, this, dst, delSrc, overwrite, conf);
}
/**
* Copy it a file from the remote filesystem to the local one.
* @param src path src file in the remote filesystem
* @param dst path local destination
* @throws IOException IO failure
*/
public void copyToLocalFile(Path src, Path dst) throws IOException {
copyToLocalFile(false, src, dst);
}
/**
* Copy a file to the local filesystem, then delete it from the
* remote filesystem (if successfully copied).
* @param src path src file in the remote filesystem
* @param dst path local destination
* @throws IOException IO failure
*/
public void moveToLocalFile(Path src, Path dst) throws IOException {
copyToLocalFile(true, src, dst);
}
/**
* Copy it a file from a remote filesystem to the local one.
* delSrc indicates if the src will be removed or not.
* @param delSrc whether to delete the src
* @param src path src file in the remote filesystem
* @param dst path local destination
* @throws IOException IO failure
*/
public void copyToLocalFile(boolean delSrc, Path src, Path dst)
throws IOException {
copyToLocalFile(delSrc, src, dst, false);
}
/**
* The src file is under this filesystem, and the dst is on the local disk.
* Copy it from the remote filesystem to the local dst name.
* delSrc indicates if the src will be removed
* or not. useRawLocalFileSystem indicates whether to use RawLocalFileSystem
* as the local file system or not. RawLocalFileSystem is non checksumming,
* So, It will not create any crc files at local.
*
* @param delSrc
* whether to delete the src
* @param src
* path
* @param dst
* path
* @param useRawLocalFileSystem
* whether to use RawLocalFileSystem as local file system or not.
*
* @throws IOException for any IO error
*/
public void copyToLocalFile(boolean delSrc, Path src, Path dst,
boolean useRawLocalFileSystem) throws IOException {
Configuration conf = getConf();
FileSystem local = null;
if (useRawLocalFileSystem) {
local = getLocal(conf).getRawFileSystem();
} else {
local = getLocal(conf);
}
FileUtil.copy(this, src, local, dst, delSrc, conf);
}
/**
* Returns a local file that the user can write output to. The caller
* provides both the eventual target name in this FileSystem
* and the local working file path.
* If this FileSystem is local, we write directly into the target. If
* the FileSystem is not local, we write into the tmp local area.
* @param fsOutputFile path of output file
* @param tmpLocalFile path of local tmp file
* @throws IOException IO failure
* @return the path.
*/
public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
throws IOException {
return tmpLocalFile;
}
/**
* Called when we're all done writing to the target.
* A local FS will do nothing, because we've written to exactly the
* right place.
* A remote FS will copy the contents of tmpLocalFile to the correct target at
* fsOutputFile.
* @param fsOutputFile path of output file
* @param tmpLocalFile path to local tmp file
* @throws IOException IO failure
*/
public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
throws IOException {
moveFromLocalFile(tmpLocalFile, fsOutputFile);
}
/**
* Close this FileSystem instance.
* Will release any held locks, delete all files queued for deletion
* through calls to {@link #deleteOnExit(Path)}, and remove this FS instance
* from the cache, if cached.
*
* After this operation, the outcome of any method call on this FileSystem
* instance, or any input/output stream created by it is <i>undefined</i>.
* @throws IOException IO failure
*/
@Override
public void close() throws IOException {
debugLogFileSystemClose("close", "Key: " + key + "; URI: " + getUri()
+ "; Object Identity Hash: "
+ Integer.toHexString(System.identityHashCode(this)));
// delete all files that were marked as delete-on-exit.
try {
processDeleteOnExit();
} finally {
CACHE.remove(this.key, this);
}
}
/**
* Return the total size of all files in the filesystem.
* @throws IOException IO failure
* @return the number of path used.
*/
public long getUsed() throws IOException {
Path path = new Path("/");
return getUsed(path);
}
/**
* Return the total size of all files from a specified path.
* @param path the path.
* @throws IOException IO failure
* @return the number of path content summary.
*/
public long getUsed(Path path) throws IOException {
return getContentSummary(path).getLength();
}
/**
* Get the block size for a particular file.
* @param f the filename
* @return the number of bytes in a block
* @deprecated Use {@link #getFileStatus(Path)} instead
* @throws FileNotFoundException if the path is not present
* @throws IOException IO failure
*/
@Deprecated
public long getBlockSize(Path f) throws IOException {
return getFileStatus(f).getBlockSize();
}
/**
* Return the number of bytes that large input files should be optimally
* be split into to minimize I/O time.
* @deprecated use {@link #getDefaultBlockSize(Path)} instead
* @return default block size.
*/
@Deprecated
public long getDefaultBlockSize() {
// default to 32MB: large enough to minimize the impact of seeks
return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
}
/**
* Return the number of bytes that large input files should be optimally
* be split into to minimize I/O time. The given path will be used to
* locate the actual filesystem. The full path does not have to exist.
* @param f path of file
* @return the default block size for the path's filesystem
*/
public long getDefaultBlockSize(Path f) {
return getDefaultBlockSize();
}
/**
* Get the default replication.
* @return the replication; the default value is "1".
* @deprecated use {@link #getDefaultReplication(Path)} instead
*/
@Deprecated
public short getDefaultReplication() { return 1; }
/**
* Get the default replication for a path.
* The given path will be used to locate the actual FileSystem to query.
* The full path does not have to exist.
* @param path of the file
* @return default replication for the path's filesystem
*/
public short getDefaultReplication(Path path) {
return getDefaultReplication();
}
/**
* Return a file status object that represents the path.
* @param f The path we want information from
* @return a FileStatus object
* @throws FileNotFoundException when the path does not exist
* @throws IOException see specific implementation
*/
public abstract FileStatus getFileStatus(Path f) throws IOException;
/**
* Synchronize client metadata state.
* <p>
* In some FileSystem implementations such as HDFS metadata
* synchronization is essential to guarantee consistency of read requests
* particularly in HA setting.
* @throws IOException If an I/O error occurred.
* @throws UnsupportedOperationException if the operation is unsupported.
*/
public void msync() throws IOException, UnsupportedOperationException {
throw new UnsupportedOperationException(getClass().getCanonicalName() +
" does not support method msync");
}
/**
* Checks if the user can access a path. The mode specifies which access
* checks to perform. If the requested permissions are granted, then the
* method returns normally. If access is denied, then the method throws an
* {@link AccessControlException}.
* <p>
* The default implementation calls {@link #getFileStatus(Path)}
* and checks the returned permissions against the requested permissions.
*
* Note that the {@link #getFileStatus(Path)} call will be subject to
* authorization checks.
* Typically, this requires search (execute) permissions on each directory in
* the path's prefix, but this is implementation-defined. Any file system
* that provides a richer authorization model (such as ACLs) may override the
* default implementation so that it checks against that model instead.
* <p>
* In general, applications should avoid using this method, due to the risk of
* time-of-check/time-of-use race conditions. The permissions on a file may
* change immediately after the access call returns. Most applications should
* prefer running specific file system actions as the desired user represented
* by a {@link UserGroupInformation}.
*
* @param path Path to check
* @param mode type of access to check
* @throws AccessControlException if access is denied
* @throws FileNotFoundException if the path does not exist
* @throws IOException see specific implementation
*/
@InterfaceAudience.LimitedPrivate({"HDFS", "Hive"})
public void access(Path path, FsAction mode) throws AccessControlException,
FileNotFoundException, IOException {
checkAccessPermissions(this.getFileStatus(path), mode);
}
/**
* This method provides the default implementation of
* {@link #access(Path, FsAction)}.
*
* @param stat FileStatus to check
* @param mode type of access to check
* @throws AccessControlException if access is denied
* @throws IOException for any error
*/
@InterfaceAudience.Private
static void checkAccessPermissions(FileStatus stat, FsAction mode)
throws AccessControlException, IOException {
FsPermission perm = stat.getPermission();
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
String user = ugi.getShortUserName();
if (user.equals(stat.getOwner())) {
if (perm.getUserAction().implies(mode)) {
return;
}
} else if (ugi.getGroupsSet().contains(stat.getGroup())) {
if (perm.getGroupAction().implies(mode)) {
return;
}
} else {
if (perm.getOtherAction().implies(mode)) {
return;
}
}
throw new AccessControlException(String.format(
"Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat.getPath(),
stat.getOwner(), stat.getGroup(), stat.isDirectory() ? "d" : "-", perm));
}
/**
* See {@link FileContext#fixRelativePart}.
* @param p the path.
* @return relative part.
*/
protected Path fixRelativePart(Path p) {
if (p.isUriPathAbsolute()) {
return p;
} else {
return new Path(getWorkingDirectory(), p);
}
}
/**
* See {@link FileContext#createSymlink(Path, Path, boolean)}.
*
* @param target target path.
* @param link link.
* @param createParent create parent.
* @throws AccessControlException if access is denied.
* @throws FileAlreadyExistsException when the path does not exist.
* @throws FileNotFoundException when the path does not exist.
* @throws ParentNotDirectoryException if the parent path of dest is not
* a directory.
* @throws UnsupportedFileSystemException if there was no known implementation
* for the scheme.
* @throws IOException raised on errors performing I/O.
*/
public void createSymlink(final Path target, final Path link,
final boolean createParent) throws AccessControlException,
FileAlreadyExistsException, FileNotFoundException,
ParentNotDirectoryException, UnsupportedFileSystemException,
IOException {
// Supporting filesystems should override this method
throw new UnsupportedOperationException(
"Filesystem does not support symlinks!");
}
/**
* See {@link FileContext#getFileLinkStatus(Path)}.
*
* @param f the path.
* @throws AccessControlException if access is denied.
* @throws FileNotFoundException when the path does not exist.
* @throws IOException raised on errors performing I/O.
* @throws UnsupportedFileSystemException if there was no known implementation
* for the scheme.
* @return file status
*/
public FileStatus getFileLinkStatus(final Path f)
throws AccessControlException, FileNotFoundException,
UnsupportedFileSystemException, IOException {
// Supporting filesystems should override this method
return getFileStatus(f);
}
/**
* See {@link AbstractFileSystem#supportsSymlinks()}.
* @return if support symlinkls true, not false.
*/
public boolean supportsSymlinks() {
return false;
}
/**
* See {@link FileContext#getLinkTarget(Path)}.
* @param f the path.
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
* @throws IOException IO failure.
* @return the path.
*/
public Path getLinkTarget(Path f) throws IOException {
// Supporting filesystems should override this method
throw new UnsupportedOperationException(
"Filesystem does not support symlinks!");
}
/**
* See {@link AbstractFileSystem#getLinkTarget(Path)}.
* @param f the path.
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
* @throws IOException IO failure.
* @return the path.
*/
protected Path resolveLink(Path f) throws IOException {
// Supporting filesystems should override this method
throw new UnsupportedOperationException(
"Filesystem does not support symlinks!");
}
/**
* Get the checksum of a file, if the FS supports checksums.
*
* @param f The file path
* @return The file checksum. The default return value is null,
* which indicates that no checksum algorithm is implemented
* in the corresponding FileSystem.
* @throws IOException IO failure
*/
public FileChecksum getFileChecksum(Path f) throws IOException {
return getFileChecksum(f, Long.MAX_VALUE);
}
/**
* Get the checksum of a file, from the beginning of the file till the
* specific length.
* @param f The file path
* @param length The length of the file range for checksum calculation
* @return The file checksum or null if checksums are not supported.
* @throws IOException IO failure
*/
public FileChecksum getFileChecksum(Path f, final long length)
throws IOException {
return null;
}
/**
* Set the verify checksum flag. This is only applicable if the
* corresponding filesystem supports checksums.
* By default doesn't do anything.
* @param verifyChecksum Verify checksum flag
*/
public void setVerifyChecksum(boolean verifyChecksum) {
//doesn't do anything
}
/**
* Set the write checksum flag. This is only applicable if the
* corresponding filesystem supports checksums.
* By default doesn't do anything.
* @param writeChecksum Write checksum flag
*/
public void setWriteChecksum(boolean writeChecksum) {
//doesn't do anything
}
/**
* Returns a status object describing the use and capacity of the
* filesystem. If the filesystem has multiple partitions, the
* use and capacity of the root partition is reflected.
*
* @return a FsStatus object
* @throws IOException
* see specific implementation
*/
public FsStatus getStatus() throws IOException {
return getStatus(null);
}
/**
* Returns a status object describing the use and capacity of the
* filesystem. If the filesystem has multiple partitions, the
* use and capacity of the partition pointed to by the specified
* path is reflected.
* @param p Path for which status should be obtained. null means
* the default partition.
* @return a FsStatus object
* @throws IOException
* see specific implementation
*/
public FsStatus getStatus(Path p) throws IOException {
return new FsStatus(Long.MAX_VALUE, 0, Long.MAX_VALUE);
}
/**
* Set permission of a path.
* @param p The path
* @param permission permission
* @throws IOException IO failure
*/
public void setPermission(Path p, FsPermission permission
) throws IOException {
}
/**
* Set owner of a path (i.e. a file or a directory).
* The parameters username and groupname cannot both be null.
* @param p The path
* @param username If it is null, the original username remains unchanged.
* @param groupname If it is null, the original groupname remains unchanged.
* @throws IOException IO failure
*/
public void setOwner(Path p, String username, String groupname
) throws IOException {
}
/**
* Set access time of a file.
* @param p The path
* @param mtime Set the modification time of this file.
* The number of milliseconds since Jan 1, 1970.
* A value of -1 means that this call should not set modification time.
* @param atime Set the access time of this file.
* The number of milliseconds since Jan 1, 1970.
* A value of -1 means that this call should not set access time.
* @throws IOException IO failure
*/
public void setTimes(Path p, long mtime, long atime
) throws IOException {
}
/**
* Create a snapshot with a default name.
* @param path The directory where snapshots will be taken.
* @return the snapshot path.
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
*/
public final Path createSnapshot(Path path) throws IOException {
return createSnapshot(path, null);
}
/**
* Create a snapshot.
* @param path The directory where snapshots will be taken.
* @param snapshotName The name of the snapshot
* @return the snapshot path.
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
*/
public Path createSnapshot(Path path, String snapshotName)
throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support createSnapshot");
}
/**
* Rename a snapshot.
* @param path The directory path where the snapshot was taken
* @param snapshotOldName Old name of the snapshot
* @param snapshotNewName New name of the snapshot
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
public void renameSnapshot(Path path, String snapshotOldName,
String snapshotNewName) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support renameSnapshot");
}
/**
* Delete a snapshot of a directory.
* @param path The directory that the to-be-deleted snapshot belongs to
* @param snapshotName The name of the snapshot
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
public void deleteSnapshot(Path path, String snapshotName)
throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support deleteSnapshot");
}
/**
* Modifies ACL entries of files and directories. This method can add new ACL
* entries or modify the permissions on existing ACL entries. All existing
* ACL entries that are not specified in this call are retained without
* changes. (Modifications are merged into the current ACL.)
*
* @param path Path to modify
* @param aclSpec List<AclEntry> describing modifications
* @throws IOException if an ACL could not be modified
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support modifyAclEntries");
}
/**
* Removes ACL entries from files and directories. Other ACL entries are
* retained.
*
* @param path Path to modify
* @param aclSpec List describing entries to remove
* @throws IOException if an ACL could not be modified
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
public void removeAclEntries(Path path, List<AclEntry> aclSpec)
throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support removeAclEntries");
}
/**
* Removes all default ACL entries from files and directories.
*
* @param path Path to modify
* @throws IOException if an ACL could not be modified
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
public void removeDefaultAcl(Path path)
throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support removeDefaultAcl");
}
/**
* Removes all but the base ACL entries of files and directories. The entries
* for user, group, and others are retained for compatibility with permission
* bits.
*
* @param path Path to modify
* @throws IOException if an ACL could not be removed
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
public void removeAcl(Path path)
throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support removeAcl");
}
/**
* Fully replaces ACL of files and directories, discarding all existing
* entries.
*
* @param path Path to modify
* @param aclSpec List describing modifications, which must include entries
* for user, group, and others for compatibility with permission bits.
* @throws IOException if an ACL could not be modified
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support setAcl");
}
/**
* Gets the ACL of a file or directory.
*
* @param path Path to get
* @return AclStatus describing the ACL of the file or directory
* @throws IOException if an ACL could not be read
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
public AclStatus getAclStatus(Path path) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support getAclStatus");
}
/**
* Set an xattr of a file or directory.
* The name must be prefixed with the namespace followed by ".". For example,
* "user.attr".
* <p>
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to modify
* @param name xattr name.
* @param value xattr value.
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
public void setXAttr(Path path, String name, byte[] value)
throws IOException {
setXAttr(path, name, value, EnumSet.of(XAttrSetFlag.CREATE,
XAttrSetFlag.REPLACE));
}
/**
* Set an xattr of a file or directory.
* The name must be prefixed with the namespace followed by ".". For example,
* "user.attr".
* <p>
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to modify
* @param name xattr name.
* @param value xattr value.
* @param flag xattr set flag
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
public void setXAttr(Path path, String name, byte[] value,
EnumSet<XAttrSetFlag> flag) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support setXAttr");
}
/**
* Get an xattr name and value for a file or directory.
* The name must be prefixed with the namespace followed by ".". For example,
* "user.attr".
* <p>
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to get extended attribute
* @param name xattr name.
* @return byte[] xattr value.
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
public byte[] getXAttr(Path path, String name) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support getXAttr");
}
/**
* Get all of the xattr name/value pairs for a file or directory.
* Only those xattrs which the logged-in user has permissions to view
* are returned.
* <p>
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to get extended attributes
* @return Map describing the XAttrs of the file or directory
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
public Map<String, byte[]> getXAttrs(Path path) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support getXAttrs");
}
/**
* Get all of the xattrs name/value pairs for a file or directory.
* Only those xattrs which the logged-in user has permissions to view
* are returned.
* <p>
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to get extended attributes
* @param names XAttr names.
* @return Map describing the XAttrs of the file or directory
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
public Map<String, byte[]> getXAttrs(Path path, List<String> names)
throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support getXAttrs");
}
/**
* Get all of the xattr names for a file or directory.
* Only those xattr names which the logged-in user has permissions to view
* are returned.
* <p>
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to get extended attributes
* @return List{@literal <String>} of the XAttr names of the file or directory
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
public List<String> listXAttrs(Path path) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support listXAttrs");
}
/**
* Remove an xattr of a file or directory.
* The name must be prefixed with the namespace followed by ".". For example,
* "user.attr".
* <p>
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to remove extended attribute
* @param name xattr name
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
public void removeXAttr(Path path, String name) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support removeXAttr");
}
/**
* Set the source path to satisfy storage policy.
* @param path The source path referring to either a directory or a file.
* @throws IOException If an I/O error occurred.
*/
public void satisfyStoragePolicy(final Path path) throws IOException {
throw new UnsupportedOperationException(
getClass().getSimpleName() + " doesn't support setStoragePolicy");
}
/**
* Set the storage policy for a given file or directory.
*
* @param src file or directory path.
* @param policyName the name of the target storage policy. The list
* of supported Storage policies can be retrieved
* via {@link #getAllStoragePolicies}.
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
public void setStoragePolicy(final Path src, final String policyName)
throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support setStoragePolicy");
}
/**
* Unset the storage policy set for a given file or directory.
* @param src file or directory path.
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
public void unsetStoragePolicy(final Path src) throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support unsetStoragePolicy");
}
/**
* Query the effective storage policy ID for the given file or directory.
*
* @param src file or directory path.
* @return storage policy for give file.
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
public BlockStoragePolicySpi getStoragePolicy(final Path src)
throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support getStoragePolicy");
}
/**
* Retrieve all the storage policies supported by this file system.
*
* @return all storage policies supported by this filesystem.
* @throws IOException IO failure
* @throws UnsupportedOperationException if the operation is unsupported
* (default outcome).
*/
public Collection<? extends BlockStoragePolicySpi> getAllStoragePolicies()
throws IOException {
throw new UnsupportedOperationException(getClass().getSimpleName()
+ " doesn't support getAllStoragePolicies");
}
/**
* Get the root directory of Trash for current user when the path specified
* is deleted.
*
* @param path the trash root of the path to be determined.
* @return the default implementation returns {@code /user/$USER/.Trash}
*/
public Path getTrashRoot(Path path) {
return this.makeQualified(new Path(getHomeDirectory().toUri().getPath(),
TRASH_PREFIX));
}
/**
* Get all the trash roots for current user or all users.
*
* @param allUsers return trash roots for all users if true.
* @return all the trash root directories.
* Default FileSystem returns .Trash under users' home directories if
* {@code /user/$USER/.Trash} exists.
*/
public Collection<FileStatus> getTrashRoots(boolean allUsers) {
Path userHome = new Path(getHomeDirectory().toUri().getPath());
List<FileStatus> ret = new ArrayList<>();
try {
if (!allUsers) {
Path userTrash = new Path(userHome, TRASH_PREFIX);
if (exists(userTrash)) {
ret.add(getFileStatus(userTrash));
}
} else {
Path homeParent = userHome.getParent();
if (exists(homeParent)) {
FileStatus[] candidates = listStatus(homeParent);
for (FileStatus candidate : candidates) {
Path userTrash = new Path(candidate.getPath(), TRASH_PREFIX);
if (exists(userTrash)) {
candidate.setPath(userTrash);
ret.add(candidate);
}
}
}
}
} catch (IOException e) {
LOGGER.warn("Cannot get all trash roots", e);
}
return ret;
}
/**
* The base FileSystem implementation generally has no knowledge
* of the capabilities of actual implementations.
* Unless it has a way to explicitly determine the capabilities,
* this method returns false.
* {@inheritDoc}
*/
public boolean hasPathCapability(final Path path, final String capability)
throws IOException {
switch (validatePathCapabilityArgs(makeQualified(path), capability)) {
case CommonPathCapabilities.FS_SYMLINKS:
// delegate to the existing supportsSymlinks() call.
return supportsSymlinks() && areSymlinksEnabled();
default:
// the feature is not implemented.
return false;
}
}
// making it volatile to be able to do a double checked locking
private volatile static boolean FILE_SYSTEMS_LOADED = false;
/**
* Filesystems listed as services.
*/
private static final Map<String, Class<? extends FileSystem>>
SERVICE_FILE_SYSTEMS = new HashMap<>();
/**
* Load the filesystem declarations from service resources.
* This is a synchronized operation.
*/
private static void loadFileSystems() {
LOGGER.debug("Loading filesystems");
synchronized (FileSystem.class) {
if (!FILE_SYSTEMS_LOADED) {
ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
Iterator<FileSystem> it = serviceLoader.iterator();
while (it.hasNext()) {
FileSystem fs;
try {
fs = it.next();
try {
SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{}:// = {} from {}",
fs.getScheme(), fs.getClass(),
ClassUtil.findContainingJar(fs.getClass()));
}
} catch (Exception e) {
LOGGER.warn("Cannot load: {} from {}", fs,
ClassUtil.findContainingJar(fs.getClass()));
LOGGER.info("Full exception loading: {}", fs, e);
}
} catch (ServiceConfigurationError ee) {
LOGGER.warn("Cannot load filesystem", ee);
}
}
FILE_SYSTEMS_LOADED = true;
}
}
}
/**
* Get the FileSystem implementation class of a filesystem.
* This triggers a scan and load of all FileSystem implementations listed as
* services and discovered via the {@link ServiceLoader}
* @param scheme URL scheme of FS
* @param conf configuration: can be null, in which case the check for
* a filesystem binding declaration in the configuration is skipped.
* @return the filesystem
* @throws UnsupportedFileSystemException if there was no known implementation
* for the scheme.
* @throws IOException if the filesystem could not be loaded
*/
public static Class<? extends FileSystem> getFileSystemClass(String scheme,
Configuration conf) throws IOException {
if (!FILE_SYSTEMS_LOADED) {
loadFileSystems();
}
LOGGER.debug("Looking for FS supporting {}", scheme);
Class<? extends FileSystem> clazz = null;
if (conf != null) {
String property = "fs." + scheme + ".impl";
LOGGER.debug("looking for configuration option {}", property);
clazz = (Class<? extends FileSystem>) conf.getClass(
property, null);
} else {
LOGGER.debug("No configuration: skipping check for fs.{}.impl", scheme);
}
if (clazz == null) {
LOGGER.debug("Looking in service filesystems for implementation class");
clazz = SERVICE_FILE_SYSTEMS.get(scheme);
} else {
LOGGER.debug("Filesystem {} defined in configuration option", scheme);
}
if (clazz == null) {
throw new UnsupportedFileSystemException("No FileSystem for scheme "
+ "\"" + scheme + "\"");
}
LOGGER.debug("FS for {} is {}", scheme, clazz);
return clazz;
}
/**
* Create and initialize a new instance of a FileSystem.
* @param uri URI containing the FS schema and FS details
* @param conf configuration to use to look for the FS instance declaration
* and to pass to the {@link FileSystem#initialize(URI, Configuration)}.
* @return the initialized filesystem.
* @throws IOException problems loading or initializing the FileSystem
*/
private static FileSystem createFileSystem(URI uri, Configuration conf)
throws IOException {
Tracer tracer = FsTracer.get(conf);
try(TraceScope scope = tracer.newScope("FileSystem#createFileSystem");
DurationInfo ignored =
new DurationInfo(LOGGER, false, "Creating FS %s", uri)) {
scope.addKVAnnotation("scheme", uri.getScheme());
Class<? extends FileSystem> clazz =
getFileSystemClass(uri.getScheme(), conf);
FileSystem fs = ReflectionUtils.newInstance(clazz, conf);
try {
fs.initialize(uri, conf);
} catch (IOException | RuntimeException e) {
// exception raised during initialization.
// log summary at warn and full stack at debug
LOGGER.warn("Failed to initialize fileystem {}: {}",
uri, e.toString());
LOGGER.debug("Failed to initialize fileystem", e);
// then (robustly) close the FS, so as to invoke any
// cleanup code.
IOUtils.cleanupWithLogger(LOGGER, fs);
throw e;
}
return fs;
}
}
/** Caching FileSystem objects. */
static final class Cache {
private final ClientFinalizer clientFinalizer = new ClientFinalizer();
private final Map<Key, FileSystem> map = new HashMap<>();
private final Set<Key> toAutoClose = new HashSet<>();
/** Semaphore used to serialize creation of new FS instances. */
private final Semaphore creatorPermits;
/**
* Counter of the number of discarded filesystem instances
* in this cache. Primarily for testing, but it could possibly
* be made visible as some kind of metric.
*/
private final AtomicLong discardedInstances = new AtomicLong(0);
/** A variable that makes all objects in the cache unique. */
private static AtomicLong unique = new AtomicLong(1);
/**
* Instantiate. The configuration is used to read the
* count of permits issued for concurrent creation
* of filesystem instances.
* @param conf configuration
*/
Cache(final Configuration conf) {
int permits = conf.getInt(FS_CREATION_PARALLEL_COUNT,
FS_CREATION_PARALLEL_COUNT_DEFAULT);
checkArgument(permits > 0, "Invalid value of %s: %s",
FS_CREATION_PARALLEL_COUNT, permits);
creatorPermits = new Semaphore(permits);
}
FileSystem get(URI uri, Configuration conf) throws IOException{
Key key = new Key(uri, conf);
return getInternal(uri, conf, key);
}
/** The objects inserted into the cache using this method are all unique. */
FileSystem getUnique(URI uri, Configuration conf) throws IOException{
Key key = new Key(uri, conf, unique.getAndIncrement());
return getInternal(uri, conf, key);
}
/**
* Get the FS instance if the key maps to an instance, creating and
* initializing the FS if it is not found.
* If this is the first entry in the map and the JVM is not shutting down,
* this registers a shutdown hook to close filesystems, and adds this
* FS to the {@code toAutoClose} set if {@code "fs.automatic.close"}
* is set in the configuration (default: true).
* @param uri filesystem URI
* @param conf configuration
* @param key key to store/retrieve this FileSystem in the cache
* @return a cached or newly instantiated FileSystem.
* @throws IOException If an I/O error occurred.
*/
private FileSystem getInternal(URI uri, Configuration conf, Key key)
throws IOException{
FileSystem fs;
synchronized (this) {
fs = map.get(key);
}
if (fs != null) {
return fs;
}
// fs not yet created, acquire lock
// to construct an instance.
try (DurationInfo d = new DurationInfo(LOGGER, false,
"Acquiring creator semaphore for %s", uri)) {
creatorPermits.acquireUninterruptibly();
}
FileSystem fsToClose = null;
try {
// See if FS was instantiated by another thread while waiting
// for the permit.
synchronized (this) {
fs = map.get(key);
}
if (fs != null) {
LOGGER.debug("Filesystem {} created while awaiting semaphore", uri);
return fs;
}
// create the filesystem
fs = createFileSystem(uri, conf);
final long timeout = conf.getTimeDuration(SERVICE_SHUTDOWN_TIMEOUT,
SERVICE_SHUTDOWN_TIMEOUT_DEFAULT,
ShutdownHookManager.TIME_UNIT_DEFAULT);
// any FS to close outside of the synchronized section
synchronized (this) { // lock on the Cache object
// see if there is now an entry for the FS, which happens
// if another thread's creation overlapped with this one.
FileSystem oldfs = map.get(key);
if (oldfs != null) {
// a file system was created in a separate thread.
// save the FS reference to close outside all locks,
// and switch to returning the oldFS
fsToClose = fs;
fs = oldfs;
} else {
// register the clientFinalizer if needed and shutdown isn't
// already active
if (map.isEmpty()
&& !ShutdownHookManager.get().isShutdownInProgress()) {
ShutdownHookManager.get().addShutdownHook(clientFinalizer,
SHUTDOWN_HOOK_PRIORITY, timeout,
ShutdownHookManager.TIME_UNIT_DEFAULT);
}
// insert the new file system into the map
fs.key = key;
map.put(key, fs);
if (conf.getBoolean(
FS_AUTOMATIC_CLOSE_KEY, FS_AUTOMATIC_CLOSE_DEFAULT)) {
toAutoClose.add(key);
}
}
} // end of synchronized block
} finally {
// release the creator permit.
creatorPermits.release();
}
if (fsToClose != null) {
LOGGER.debug("Duplicate FS created for {}; discarding {}",
uri, fs);
discardedInstances.incrementAndGet();
// close the new file system
// note this will briefly remove and reinstate "fsToClose" from
// the map. It is done in a synchronized block so will not be
// visible to others.
IOUtils.cleanupWithLogger(LOGGER, fsToClose);
}
return fs;
}
/**
* Get the count of discarded instances.
* @return the new instance.
*/
@VisibleForTesting
long getDiscardedInstances() {
return discardedInstances.get();
}
synchronized void remove(Key key, FileSystem fs) {
FileSystem cachedFs = map.remove(key);
if (fs == cachedFs) {
toAutoClose.remove(key);
} else if (cachedFs != null) {
map.put(key, cachedFs);
}
}
/**
* Close all FileSystems in the cache, whether they are marked for
* automatic closing or not.
* @throws IOException a problem arose closing one or more FileSystem.
*/
synchronized void closeAll() throws IOException {
closeAll(false);
}
/**
* Close all FileSystem instances in the Cache.
* @param onlyAutomatic only close those that are marked for automatic closing
* @throws IOException a problem arose closing one or more FileSystem.
*/
synchronized void closeAll(boolean onlyAutomatic) throws IOException {
List<IOException> exceptions = new ArrayList<>();
// Make a copy of the keys in the map since we'll be modifying
// the map while iterating over it, which isn't safe.
List<Key> keys = new ArrayList<>();
keys.addAll(map.keySet());
for (Key key : keys) {
final FileSystem fs = map.get(key);
if (onlyAutomatic && !toAutoClose.contains(key)) {
continue;
}
//remove from cache
map.remove(key);
toAutoClose.remove(key);
if (fs != null) {
try {
fs.close();
}
catch(IOException ioe) {
exceptions.add(ioe);
}
}
}
if (!exceptions.isEmpty()) {
throw MultipleIOException.createIOException(exceptions);
}
}
private class ClientFinalizer implements Runnable {
@Override
public synchronized void run() {
try {
closeAll(true);
} catch (IOException e) {
LOGGER.info("FileSystem.Cache.closeAll() threw an exception:\n" + e);
}
}
}
synchronized void closeAll(UserGroupInformation ugi) throws IOException {
List<FileSystem> targetFSList = new ArrayList<>(map.entrySet().size());
//Make a pass over the list and collect the FileSystems to close
//we cannot close inline since close() removes the entry from the Map
for (Map.Entry<Key, FileSystem> entry : map.entrySet()) {
final Key key = entry.getKey();
final FileSystem fs = entry.getValue();
if (ugi.equals(key.ugi) && fs != null) {
targetFSList.add(fs);
}
}
List<IOException> exceptions = new ArrayList<>();
//now make a pass over the target list and close each
for (FileSystem fs : targetFSList) {
try {
fs.close();
}
catch(IOException ioe) {
exceptions.add(ioe);
}
}
if (!exceptions.isEmpty()) {
throw MultipleIOException.createIOException(exceptions);
}
}
/** FileSystem.Cache.Key */
static class Key {
final String scheme;
final String authority;
final UserGroupInformation ugi;
final long unique; // an artificial way to make a key unique
Key(URI uri, Configuration conf) throws IOException {
this(uri, conf, 0);
}
Key(URI uri, Configuration conf, long unique) throws IOException {
scheme = uri.getScheme()==null ?
"" : StringUtils.toLowerCase(uri.getScheme());
authority = uri.getAuthority()==null ?
"" : StringUtils.toLowerCase(uri.getAuthority());
this.unique = unique;
this.ugi = UserGroupInformation.getCurrentUser();
}
@Override
public int hashCode() {
return (scheme + authority).hashCode() + ugi.hashCode() + (int)unique;
}
static boolean isEqual(Object a, Object b) {
return a == b || (a != null && a.equals(b));
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj instanceof Key) {
Key that = (Key)obj;
return isEqual(this.scheme, that.scheme)
&& isEqual(this.authority, that.authority)
&& isEqual(this.ugi, that.ugi)
&& (this.unique == that.unique);
}
return false;
}
@Override
public String toString() {
return "("+ugi.toString() + ")@" + scheme + "://" + authority;
}
}
}
/**
* Tracks statistics about how many reads, writes, and so forth have been
* done in a FileSystem.
*
* Since there is only one of these objects per FileSystem, there will
* typically be many threads writing to this object. Almost every operation
* on an open file will involve a write to this object. In contrast, reading
* statistics is done infrequently by most programs, and not at all by others.
* Hence, this is optimized for writes.
*
* Each thread writes to its own thread-local area of memory. This removes
* contention and allows us to scale up to many, many threads. To read
* statistics, the reader thread totals up the contents of all of the
* thread-local data areas.
*/
public static final class Statistics {
/**
* Statistics data.
*
* There is only a single writer to thread-local StatisticsData objects.
* Hence, volatile is adequate here-- we do not need AtomicLong or similar
* to prevent lost updates.
* The Java specification guarantees that updates to volatile longs will
* be perceived as atomic with respect to other threads, which is all we
* need.
*/
public static class StatisticsData {
private volatile long bytesRead;
private volatile long bytesWritten;
private volatile int readOps;
private volatile int largeReadOps;
private volatile int writeOps;
private volatile long bytesReadLocalHost;
private volatile long bytesReadDistanceOfOneOrTwo;
private volatile long bytesReadDistanceOfThreeOrFour;
private volatile long bytesReadDistanceOfFiveOrLarger;
private volatile long bytesReadErasureCoded;
/**
* Add another StatisticsData object to this one.
*/
void add(StatisticsData other) {
this.bytesRead += other.bytesRead;
this.bytesWritten += other.bytesWritten;
this.readOps += other.readOps;
this.largeReadOps += other.largeReadOps;
this.writeOps += other.writeOps;
this.bytesReadLocalHost += other.bytesReadLocalHost;
this.bytesReadDistanceOfOneOrTwo += other.bytesReadDistanceOfOneOrTwo;
this.bytesReadDistanceOfThreeOrFour +=
other.bytesReadDistanceOfThreeOrFour;
this.bytesReadDistanceOfFiveOrLarger +=
other.bytesReadDistanceOfFiveOrLarger;
this.bytesReadErasureCoded += other.bytesReadErasureCoded;
}
/**
* Negate the values of all statistics.
*/
void negate() {
this.bytesRead = -this.bytesRead;
this.bytesWritten = -this.bytesWritten;
this.readOps = -this.readOps;
this.largeReadOps = -this.largeReadOps;
this.writeOps = -this.writeOps;
this.bytesReadLocalHost = -this.bytesReadLocalHost;
this.bytesReadDistanceOfOneOrTwo = -this.bytesReadDistanceOfOneOrTwo;
this.bytesReadDistanceOfThreeOrFour =
-this.bytesReadDistanceOfThreeOrFour;
this.bytesReadDistanceOfFiveOrLarger =
-this.bytesReadDistanceOfFiveOrLarger;
this.bytesReadErasureCoded = -this.bytesReadErasureCoded;
}
@Override
public String toString() {
return bytesRead + " bytes read, " + bytesWritten + " bytes written, "
+ readOps + " read ops, " + largeReadOps + " large read ops, "
+ writeOps + " write ops";
}
public long getBytesRead() {
return bytesRead;
}
public long getBytesWritten() {
return bytesWritten;
}
public int getReadOps() {
return readOps;
}
public int getLargeReadOps() {
return largeReadOps;
}
public int getWriteOps() {
return writeOps;
}
public long getBytesReadLocalHost() {
return bytesReadLocalHost;
}
public long getBytesReadDistanceOfOneOrTwo() {
return bytesReadDistanceOfOneOrTwo;
}
public long getBytesReadDistanceOfThreeOrFour() {
return bytesReadDistanceOfThreeOrFour;
}
public long getBytesReadDistanceOfFiveOrLarger() {
return bytesReadDistanceOfFiveOrLarger;
}
public long getBytesReadErasureCoded() {
return bytesReadErasureCoded;
}
}
private interface StatisticsAggregator<T> {
void accept(StatisticsData data);
T aggregate();
}
private final String scheme;
/**
* rootData is data that doesn't belong to any thread, but will be added
* to the totals. This is useful for making copies of Statistics objects,
* and for storing data that pertains to threads that have been garbage
* collected. Protected by the Statistics lock.
*/
private final StatisticsData rootData;
/**
* Thread-local data.
*/
@SuppressWarnings("ThreadLocalNotStaticFinal")
private final ThreadLocal<StatisticsData> threadData;
/**
* Set of all thread-local data areas. Protected by the Statistics lock.
* The references to the statistics data are kept using weak references
* to the associated threads. Proper clean-up is performed by the cleaner
* thread when the threads are garbage collected.
*/
private final Set<StatisticsDataReference> allData;
/**
* Global reference queue and a cleaner thread that manage statistics data
* references from all filesystem instances.
*/
private static final ReferenceQueue<Thread> STATS_DATA_REF_QUEUE;
private static final Thread STATS_DATA_CLEANER;
static {
STATS_DATA_REF_QUEUE = new ReferenceQueue<>();
// start a single daemon cleaner thread
STATS_DATA_CLEANER = new Thread(new StatisticsDataReferenceCleaner());
STATS_DATA_CLEANER.
setName(StatisticsDataReferenceCleaner.class.getName());
STATS_DATA_CLEANER.setDaemon(true);
STATS_DATA_CLEANER.start();
}
public Statistics(String scheme) {
this.scheme = scheme;
this.rootData = new StatisticsData();
this.threadData = new ThreadLocal<>();
this.allData = new HashSet<>();
}
/**
* Copy constructor.
*
* @param other The input Statistics object which is cloned.
*/
public Statistics(Statistics other) {
this.scheme = other.scheme;
this.rootData = new StatisticsData();
other.visitAll(new StatisticsAggregator<Void>() {
@Override
public void accept(StatisticsData data) {
rootData.add(data);
}
public Void aggregate() {
return null;
}
});
this.threadData = new ThreadLocal<>();
this.allData = new HashSet<>();
}
/**
* A weak reference to a thread that also includes the data associated
* with that thread. On the thread being garbage collected, it is enqueued
* to the reference queue for clean-up.
*/
private final class StatisticsDataReference extends WeakReference<Thread> {
private final StatisticsData data;
private StatisticsDataReference(StatisticsData data, Thread thread) {
super(thread, STATS_DATA_REF_QUEUE);
this.data = data;
}
public StatisticsData getData() {
return data;
}
/**
* Performs clean-up action when the associated thread is garbage
* collected.
*/
public void cleanUp() {
// use the statistics lock for safety
synchronized (Statistics.this) {
/*
* If the thread that created this thread-local data no longer exists,
* remove the StatisticsData from our list and fold the values into
* rootData.
*/
rootData.add(data);
allData.remove(this);
}
}
}
/**
* Background action to act on references being removed.
*/
private static class StatisticsDataReferenceCleaner implements Runnable {
@Override
public void run() {
while (!Thread.interrupted()) {
try {
StatisticsDataReference ref =
(StatisticsDataReference)STATS_DATA_REF_QUEUE.remove();
ref.cleanUp();
} catch (InterruptedException ie) {
LOGGER.warn("Cleaner thread interrupted, will stop", ie);
Thread.currentThread().interrupt();
} catch (Throwable th) {
LOGGER.warn("Exception in the cleaner thread but it will" +
" continue to run", th);
}
}
}
}
/**
* Get or create the thread-local data associated with the current thread.
* @return statistics data.
*/
public StatisticsData getThreadStatistics() {
StatisticsData data = threadData.get();
if (data == null) {
data = new StatisticsData();
threadData.set(data);
StatisticsDataReference ref =
new StatisticsDataReference(data, Thread.currentThread());
synchronized(this) {
allData.add(ref);
}
}
return data;
}
/**
* Increment the bytes read in the statistics.
* @param newBytes the additional bytes read
*/
public void incrementBytesRead(long newBytes) {
getThreadStatistics().bytesRead += newBytes;
}
/**
* Increment the bytes written in the statistics.
* @param newBytes the additional bytes written
*/
public void incrementBytesWritten(long newBytes) {
getThreadStatistics().bytesWritten += newBytes;
}
/**
* Increment the number of read operations.
* @param count number of read operations
*/
public void incrementReadOps(int count) {
getThreadStatistics().readOps += count;
}
/**
* Increment the number of large read operations.
* @param count number of large read operations
*/
public void incrementLargeReadOps(int count) {
getThreadStatistics().largeReadOps += count;
}
/**
* Increment the number of write operations.
* @param count number of write operations
*/
public void incrementWriteOps(int count) {
getThreadStatistics().writeOps += count;
}
/**
* Increment the bytes read on erasure-coded files in the statistics.
* @param newBytes the additional bytes read
*/
public void incrementBytesReadErasureCoded(long newBytes) {
getThreadStatistics().bytesReadErasureCoded += newBytes;
}
/**
* Increment the bytes read by the network distance in the statistics
* In the common network topology setup, distance value should be an even
* number such as 0, 2, 4, 6. To make it more general, we group distance
* by {1, 2}, {3, 4} and {5 and beyond} for accounting.
* @param distance the network distance
* @param newBytes the additional bytes read
*/
public void incrementBytesReadByDistance(int distance, long newBytes) {
switch (distance) {
case 0:
getThreadStatistics().bytesReadLocalHost += newBytes;
break;
case 1:
case 2:
getThreadStatistics().bytesReadDistanceOfOneOrTwo += newBytes;
break;
case 3:
case 4:
getThreadStatistics().bytesReadDistanceOfThreeOrFour += newBytes;
break;
default:
getThreadStatistics().bytesReadDistanceOfFiveOrLarger += newBytes;
break;
}
}
/**
* Apply the given aggregator to all StatisticsData objects associated with
* this Statistics object.
*
* For each StatisticsData object, we will call accept on the visitor.
* Finally, at the end, we will call aggregate to get the final total.
*
* @param visitor to use.
* @return The total.
*/
private synchronized <T> T visitAll(StatisticsAggregator<T> visitor) {
visitor.accept(rootData);
for (StatisticsDataReference ref: allData) {
StatisticsData data = ref.getData();
visitor.accept(data);
}
return visitor.aggregate();
}
/**
* Get the total number of bytes read.
* @return the number of bytes
*/
public long getBytesRead() {
return visitAll(new StatisticsAggregator<Long>() {
private long bytesRead = 0;
@Override
public void accept(StatisticsData data) {
bytesRead += data.bytesRead;
}
public Long aggregate() {
return bytesRead;
}
});
}
/**
* Get the total number of bytes written.
* @return the number of bytes
*/
public long getBytesWritten() {
return visitAll(new StatisticsAggregator<Long>() {
private long bytesWritten = 0;
@Override
public void accept(StatisticsData data) {
bytesWritten += data.bytesWritten;
}
public Long aggregate() {
return bytesWritten;
}
});
}
/**
* Get the number of file system read operations such as list files.
* @return number of read operations
*/
public int getReadOps() {
return visitAll(new StatisticsAggregator<Integer>() {
private int readOps = 0;
@Override
public void accept(StatisticsData data) {
readOps += data.readOps;
readOps += data.largeReadOps;
}
public Integer aggregate() {
return readOps;
}
});
}
/**
* Get the number of large file system read operations such as list files
* under a large directory.
* @return number of large read operations
*/
public int getLargeReadOps() {
return visitAll(new StatisticsAggregator<Integer>() {
private int largeReadOps = 0;
@Override
public void accept(StatisticsData data) {
largeReadOps += data.largeReadOps;
}
public Integer aggregate() {
return largeReadOps;
}
});
}
/**
* Get the number of file system write operations such as create, append
* rename etc.
* @return number of write operations
*/
public int getWriteOps() {
return visitAll(new StatisticsAggregator<Integer>() {
private int writeOps = 0;
@Override
public void accept(StatisticsData data) {
writeOps += data.writeOps;
}
public Integer aggregate() {
return writeOps;
}
});
}
/**
* In the common network topology setup, distance value should be an even
* number such as 0, 2, 4, 6. To make it more general, we group distance
* by {1, 2}, {3, 4} and {5 and beyond} for accounting. So if the caller
* ask for bytes read for distance 2, the function will return the value
* for group {1, 2}.
* @param distance the network distance
* @return the total number of bytes read by the network distance
*/
public long getBytesReadByDistance(int distance) {
long bytesRead;
switch (distance) {
case 0:
bytesRead = getData().getBytesReadLocalHost();
break;
case 1:
case 2:
bytesRead = getData().getBytesReadDistanceOfOneOrTwo();
break;
case 3:
case 4:
bytesRead = getData().getBytesReadDistanceOfThreeOrFour();
break;
default:
bytesRead = getData().getBytesReadDistanceOfFiveOrLarger();
break;
}
return bytesRead;
}
/**
* Get all statistics data.
* MR or other frameworks can use the method to get all statistics at once.
* @return the StatisticsData
*/
public StatisticsData getData() {
return visitAll(new StatisticsAggregator<StatisticsData>() {
private StatisticsData all = new StatisticsData();
@Override
public void accept(StatisticsData data) {
all.add(data);
}
public StatisticsData aggregate() {
return all;
}
});
}
/**
* Get the total number of bytes read on erasure-coded files.
* @return the number of bytes
*/
public long getBytesReadErasureCoded() {
return visitAll(new StatisticsAggregator<Long>() {
private long bytesReadErasureCoded = 0;
@Override
public void accept(StatisticsData data) {
bytesReadErasureCoded += data.bytesReadErasureCoded;
}
public Long aggregate() {
return bytesReadErasureCoded;
}
});
}
@Override
public String toString() {
return visitAll(new StatisticsAggregator<String>() {
private StatisticsData total = new StatisticsData();
@Override
public void accept(StatisticsData data) {
total.add(data);
}
public String aggregate() {
return total.toString();
}
});
}
/**
* Resets all statistics to 0.
*
* In order to reset, we add up all the thread-local statistics data, and
* set rootData to the negative of that.
*
* This may seem like a counterintuitive way to reset the statistics. Why
* can't we just zero out all the thread-local data? Well, thread-local
* data can only be modified by the thread that owns it. If we tried to
* modify the thread-local data from this thread, our modification might get
* interleaved with a read-modify-write operation done by the thread that
* owns the data. That would result in our update getting lost.
*
* The approach used here avoids this problem because it only ever reads
* (not writes) the thread-local data. Both reads and writes to rootData
* are done under the lock, so we're free to modify rootData from any thread
* that holds the lock.
*/
public void reset() {
visitAll(new StatisticsAggregator<Void>() {
private StatisticsData total = new StatisticsData();
@Override
public void accept(StatisticsData data) {
total.add(data);
}
public Void aggregate() {
total.negate();
rootData.add(total);
return null;
}
});
}
/**
* Get the uri scheme associated with this statistics object.
* @return the schema associated with this set of statistics
*/
public String getScheme() {
return scheme;
}
@VisibleForTesting
synchronized int getAllThreadLocalDataSize() {
return allData.size();
}
}
/**
* Get the Map of Statistics object indexed by URI Scheme.
* @return a Map having a key as URI scheme and value as Statistics object
* @deprecated use {@link #getGlobalStorageStatistics()}
*/
@Deprecated
public static synchronized Map<String, Statistics> getStatistics() {
Map<String, Statistics> result = new HashMap<>();
for(Statistics stat: statisticsTable.values()) {
result.put(stat.getScheme(), stat);
}
return result;
}
/**
* Return the FileSystem classes that have Statistics.
* @deprecated use {@link #getGlobalStorageStatistics()}
* @return statistics lists.
*/
@Deprecated
public static synchronized List<Statistics> getAllStatistics() {
return new ArrayList<>(statisticsTable.values());
}
/**
* Get the statistics for a particular file system.
* @param scheme scheme.
* @param cls the class to lookup
* @return a statistics object
* @deprecated use {@link #getGlobalStorageStatistics()}
*/
@Deprecated
public static synchronized Statistics getStatistics(final String scheme,
Class<? extends FileSystem> cls) {
checkArgument(scheme != null,
"No statistics is allowed for a file system with null scheme!");
Statistics result = statisticsTable.get(cls);
if (result == null) {
final Statistics newStats = new Statistics(scheme);
statisticsTable.put(cls, newStats);
result = newStats;
GlobalStorageStatistics.INSTANCE.put(scheme,
new StorageStatisticsProvider() {
@Override
public StorageStatistics provide() {
return new FileSystemStorageStatistics(scheme, newStats);
}
});
}
return result;
}
/**
* Reset all statistics for all file systems.
*/
public static synchronized void clearStatistics() {
GlobalStorageStatistics.INSTANCE.reset();
}
/**
* Print all statistics for all file systems to {@code System.out}
* @throws IOException If an I/O error occurred.
*/
public static synchronized
void printStatistics() throws IOException {
for (Map.Entry<Class<? extends FileSystem>, Statistics> pair:
statisticsTable.entrySet()) {
System.out.println(" FileSystem " + pair.getKey().getName() +
": " + pair.getValue());
}
}
// Symlinks are temporarily disabled - see HADOOP-10020 and HADOOP-10052
private static boolean symlinksEnabled = false;
@VisibleForTesting
public static boolean areSymlinksEnabled() {
return symlinksEnabled;
}
@VisibleForTesting
public static void enableSymlinks() {
symlinksEnabled = true;
}
/**
* Get the StorageStatistics for this FileSystem object. These statistics are
* per-instance. They are not shared with any other FileSystem object.
*
* <p>This is a default method which is intended to be overridden by
* subclasses. The default implementation returns an empty storage statistics
* object.</p>
*
* @return The StorageStatistics for this FileSystem instance.
* Will never be null.
*/
public StorageStatistics getStorageStatistics() {
return new EmptyStorageStatistics(getUri().toString());
}
/**
* Get the global storage statistics.
* @return global storage statistics.
*/
public static GlobalStorageStatistics getGlobalStorageStatistics() {
return GlobalStorageStatistics.INSTANCE;
}
/**
* Create instance of the standard FSDataOutputStreamBuilder for the
* given filesystem and path.
* @param fileSystem owner
* @param path path to create
* @return a builder.
*/
@InterfaceStability.Unstable
protected static FSDataOutputStreamBuilder createDataOutputStreamBuilder(
@Nonnull final FileSystem fileSystem,
@Nonnull final Path path) {
return new FileSystemDataOutputStreamBuilder(fileSystem, path);
}
/**
* Standard implementation of the FSDataOutputStreamBuilder; invokes
* create/createNonRecursive or Append depending upon the options.
*/
private static final class FileSystemDataOutputStreamBuilder extends
FSDataOutputStreamBuilder<FSDataOutputStream,
FileSystemDataOutputStreamBuilder> {
/**
* Constructor.
* @param fileSystem owner
* @param p path to create
*/
private FileSystemDataOutputStreamBuilder(FileSystem fileSystem, Path p) {
super(fileSystem, p);
}
@Override
public FSDataOutputStream build() throws IOException {
rejectUnknownMandatoryKeys(Collections.emptySet(),
" for " + getPath());
if (getFlags().contains(CreateFlag.CREATE) ||
getFlags().contains(CreateFlag.OVERWRITE)) {
if (isRecursive()) {
return getFS().create(getPath(), getPermission(), getFlags(),
getBufferSize(), getReplication(), getBlockSize(), getProgress(),
getChecksumOpt());
} else {
return getFS().createNonRecursive(getPath(), getPermission(),
getFlags(), getBufferSize(), getReplication(), getBlockSize(),
getProgress());
}
} else if (getFlags().contains(CreateFlag.APPEND)) {
return getFS().append(getPath(), getBufferSize(), getProgress());
}
throw new PathIOException(getPath().toString(),
"Must specify either create, overwrite or append");
}
@Override
public FileSystemDataOutputStreamBuilder getThisBuilder() {
return this;
}
}
/**
* Create a new FSDataOutputStreamBuilder for the file with path.
* Files are overwritten by default.
*
* @param path file path
* @return a FSDataOutputStreamBuilder object to build the file
*
* HADOOP-14384. Temporarily reduce the visibility of method before the
* builder interface becomes stable.
*/
public FSDataOutputStreamBuilder createFile(Path path) {
return createDataOutputStreamBuilder(this, path)
.create().overwrite(true);
}
/**
* Create a Builder to append a file.
* @param path file path.
* @return a {@link FSDataOutputStreamBuilder} to build file append request.
*/
public FSDataOutputStreamBuilder appendFile(Path path) {
return createDataOutputStreamBuilder(this, path).append();
}
/**
* Open a file for reading through a builder API.
* Ultimately calls {@link #open(Path, int)} unless a subclass
* executes the open command differently.
*
* The semantics of this call are therefore the same as that of
* {@link #open(Path, int)} with one special point: it is in
* {@code FSDataInputStreamBuilder.build()} in which the open operation
* takes place -it is there where all preconditions to the operation
* are checked.
* @param path file path
* @return a FSDataInputStreamBuilder object to build the input stream
* @throws IOException if some early checks cause IO failures.
* @throws UnsupportedOperationException if support is checked early.
*/
@InterfaceStability.Unstable
public FutureDataInputStreamBuilder openFile(Path path)
throws IOException, UnsupportedOperationException {
return createDataInputStreamBuilder(this, path).getThisBuilder();
}
/**
* Open a file for reading through a builder API.
* Ultimately calls {@link #open(PathHandle, int)} unless a subclass
* executes the open command differently.
*
* If PathHandles are unsupported, this may fail in the
* {@code FSDataInputStreamBuilder.build()} command,
* rather than in this {@code openFile()} operation.
* @param pathHandle path handle.
* @return a FSDataInputStreamBuilder object to build the input stream
* @throws IOException if some early checks cause IO failures.
* @throws UnsupportedOperationException if support is checked early.
*/
@InterfaceStability.Unstable
public FutureDataInputStreamBuilder openFile(PathHandle pathHandle)
throws IOException, UnsupportedOperationException {
return createDataInputStreamBuilder(this, pathHandle)
.getThisBuilder();
}
/**
* Execute the actual open file operation.
*
* This is invoked from {@code FSDataInputStreamBuilder.build()}
* and from {@link DelegateToFileSystem} and is where
* the action of opening the file should begin.
*
* The base implementation performs a blocking
* call to {@link #open(Path, int)} in this call;
* the actual outcome is in the returned {@code CompletableFuture}.
* This avoids having to create some thread pool, while still
* setting up the expectation that the {@code get()} call
* is needed to evaluate the result.
* @param path path to the file
* @param parameters open file parameters from the builder.
* @return a future which will evaluate to the opened file.
* @throws IOException failure to resolve the link.
* @throws IllegalArgumentException unknown mandatory key
*/
protected CompletableFuture<FSDataInputStream> openFileWithOptions(
final Path path,
final OpenFileParameters parameters) throws IOException {
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
parameters.getMandatoryKeys(),
Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS,
"for " + path);
return LambdaUtils.eval(
new CompletableFuture<>(), () ->
open(path, parameters.getBufferSize()));
}
/**
* Execute the actual open file operation.
* The base implementation performs a blocking
* call to {@link #open(Path, int)} in this call;
* the actual outcome is in the returned {@code CompletableFuture}.
* This avoids having to create some thread pool, while still
* setting up the expectation that the {@code get()} call
* is needed to evaluate the result.
* @param pathHandle path to the file
* @param parameters open file parameters from the builder.
* @return a future which will evaluate to the opened file.
* @throws IOException failure to resolve the link.
* @throws IllegalArgumentException unknown mandatory key
* @throws UnsupportedOperationException PathHandles are not supported.
* This may be deferred until the future is evaluated.
*/
protected CompletableFuture<FSDataInputStream> openFileWithOptions(
final PathHandle pathHandle,
final OpenFileParameters parameters) throws IOException {
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
parameters.getMandatoryKeys(),
Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS, "");
CompletableFuture<FSDataInputStream> result = new CompletableFuture<>();
try {
result.complete(open(pathHandle, parameters.getBufferSize()));
} catch (UnsupportedOperationException tx) {
// fail fast here
throw tx;
} catch (Throwable tx) {
// fail lazily here to ensure callers expect all File IO operations to
// surface later
result.completeExceptionally(tx);
}
return result;
}
/**
* Helper method that throws an {@link UnsupportedOperationException} for the
* current {@link FileSystem} method being called.
*/
private void methodNotSupported() {
// The order of the stacktrace elements is (from top to bottom):
// - java.lang.Thread.getStackTrace
// - org.apache.hadoop.fs.FileSystem.methodNotSupported
// - <the FileSystem method>
// therefore, to find out the current method name, we use the element at
// index 2.
String name = Thread.currentThread().getStackTrace()[2].getMethodName();
throw new UnsupportedOperationException(getClass().getCanonicalName() +
" does not support method " + name);
}
/**
* Create instance of the standard {@link FSDataInputStreamBuilder} for the
* given filesystem and path.
* @param fileSystem owner
* @param path path to read
* @return a builder.
*/
@InterfaceAudience.LimitedPrivate("Filesystems")
@InterfaceStability.Unstable
protected static FSDataInputStreamBuilder createDataInputStreamBuilder(
@Nonnull final FileSystem fileSystem,
@Nonnull final Path path) {
return new FSDataInputStreamBuilder(fileSystem, path);
}
/**
* Create instance of the standard {@link FSDataInputStreamBuilder} for the
* given filesystem and path handle.
* @param fileSystem owner
* @param pathHandle path handle of file to open.
* @return a builder.
*/
@InterfaceAudience.LimitedPrivate("Filesystems")
@InterfaceStability.Unstable
protected static FSDataInputStreamBuilder createDataInputStreamBuilder(
@Nonnull final FileSystem fileSystem,
@Nonnull final PathHandle pathHandle) {
return new FSDataInputStreamBuilder(fileSystem, pathHandle);
}
/**
* Builder returned for {@code #openFile(Path)}
* and {@code #openFile(PathHandle)}.
*/
private static class FSDataInputStreamBuilder
extends FutureDataInputStreamBuilderImpl
implements FutureDataInputStreamBuilder {
/**
* Path Constructor.
* @param fileSystem owner
* @param path path to open.
*/
protected FSDataInputStreamBuilder(
@Nonnull final FileSystem fileSystem,
@Nonnull final Path path) {
super(fileSystem, path);
}
/**
* Construct from a path handle.
* @param fileSystem owner
* @param pathHandle path handle of file to open.
*/
protected FSDataInputStreamBuilder(
@Nonnull final FileSystem fileSystem,
@Nonnull final PathHandle pathHandle) {
super(fileSystem, pathHandle);
}
/**
* Perform the open operation.
* Returns a future which, when get() or a chained completion
* operation is invoked, will supply the input stream of the file
* referenced by the path/path handle.
* @return a future to the input stream.
* @throws IOException early failure to open
* @throws UnsupportedOperationException if the specific operation
* is not supported.
* @throws IllegalArgumentException if the parameters are not valid.
*/
@Override
public CompletableFuture<FSDataInputStream> build() throws IOException {
Optional<Path> optionalPath = getOptionalPath();
OpenFileParameters parameters = new OpenFileParameters()
.withMandatoryKeys(getMandatoryKeys())
.withOptionalKeys(getOptionalKeys())
.withOptions(getOptions())
.withStatus(super.getStatus())
.withBufferSize(
getOptions().getInt(FS_OPTION_OPENFILE_BUFFER_SIZE, getBufferSize()));
if(optionalPath.isPresent()) {
return getFS().openFileWithOptions(optionalPath.get(),
parameters);
} else {
return getFS().openFileWithOptions(getPathHandle(),
parameters);
}
}
}
/**
* Create a multipart uploader.
* @param basePath file path under which all files are uploaded
* @return a MultipartUploaderBuilder object to build the uploader
* @throws IOException if some early checks cause IO failures.
* @throws UnsupportedOperationException if support is checked early.
*/
@InterfaceStability.Unstable
public MultipartUploaderBuilder createMultipartUploader(Path basePath)
throws IOException {
methodNotSupported();
return null;
}
}
相关信息
相关文章
hadoop BatchListingOperations 源码
hadoop BatchedRemoteIterator 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦