hadoop InodeTree 源码
haddop InodeTree 代码
文件路径:/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.viewfs;
import java.util.Collection;
import java.util.Comparator;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
import org.apache.hadoop.util.Preconditions;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* InodeTree implements a mount-table as a tree of inodes.
* It is used to implement ViewFs and ViewFileSystem.
* In order to use it the caller must subclass it and implement
* the abstract methods {@link #getTargetFileSystem(INodeDir)}, etc.
*
* The mountable is initialized from the config variables as
* specified in {@link ViewFs}
*
* @param <T> is AbstractFileSystem or FileSystem
*
* The two main methods are
* {@link #InodeTree(Configuration, String, URI, boolean)} // constructor
* {@link #resolve(String, boolean)}
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public abstract class InodeTree<T> {
private static final Logger LOGGER =
LoggerFactory.getLogger(InodeTree.class.getName());
enum ResultKind {
INTERNAL_DIR,
EXTERNAL_DIR
}
static final Path SlashPath = new Path("/");
// the root of the mount table
private final INode<T> root;
// the fallback filesystem
private INodeLink<T> rootFallbackLink;
// the homedir for this mount table
private final String homedirPrefix;
private List<MountPoint<T>> mountPoints = new ArrayList<MountPoint<T>>();
private List<RegexMountPoint<T>> regexMountPointList =
new ArrayList<RegexMountPoint<T>>();
private final boolean isNestedMountPointSupported;
public static class MountPoint<T> {
String src;
INodeLink<T> target;
MountPoint(String srcPath, INodeLink<T> mountLink) {
src = srcPath;
target = mountLink;
}
/**
* Returns the source of mount point.
* @return The source
*/
public String getSource() {
return this.src;
}
/**
* Returns the target INode link.
* @return The target INode link
*/
public INodeLink<T> getTarget() {
return this.target;
}
}
/**
* Breaks file path into component names.
* @param path
* @return array of names component names
*/
static String[] breakIntoPathComponents(final String path) {
return path == null ? null : path.split(Path.SEPARATOR);
}
/**
* Internal class for INode tree.
* @param <T>
*/
abstract static class INode<T> {
final String fullPath; // the full path to the root
public INode(String pathToNode, UserGroupInformation aUgi) {
fullPath = pathToNode;
}
// INode forming the internal mount table directory tree
// for ViewFileSystem. This internal directory tree is
// constructed based on the mount table config entries
// and is read only.
abstract boolean isInternalDir();
// INode linking to another filesystem. Represented
// via mount table link config entries.
boolean isLink() {
return !isInternalDir();
}
/**
* Return the link if isLink.
* @return will return null, for non links.
*/
INodeLink<T> getLink() {
return null;
}
}
/**
* Internal class to represent an internal dir of the mount table.
* @param <T>
*/
static class INodeDir<T> extends INode<T> {
private final Map<String, INode<T>> children = new HashMap<>();
private T internalDirFs = null; //filesystem of this internal directory
private boolean isRoot = false;
private INodeLink<T> fallbackLink = null;
INodeDir(final String pathToNode, final UserGroupInformation aUgi) {
super(pathToNode, aUgi);
}
@Override
boolean isInternalDir() {
return true;
}
T getInternalDirFs() {
return internalDirFs;
}
void setInternalDirFs(T internalDirFs) {
this.internalDirFs = internalDirFs;
}
void setRoot(boolean root) {
isRoot = root;
}
boolean isRoot() {
return isRoot;
}
INodeLink<T> getFallbackLink() {
return fallbackLink;
}
void addFallbackLink(INodeLink<T> link) throws IOException {
if (!isRoot) {
throw new IOException("Fallback link can only be added for root");
}
this.fallbackLink = link;
}
Map<String, INode<T>> getChildren() {
return Collections.unmodifiableMap(children);
}
INode<T> resolveInternal(final String pathComponent) {
return children.get(pathComponent);
}
INodeDir<T> addDir(final String pathComponent,
final UserGroupInformation aUgi) throws FileAlreadyExistsException {
if (children.containsKey(pathComponent)) {
throw new FileAlreadyExistsException();
}
final INodeDir<T> newDir = new INodeDir<T>(fullPath +
(isRoot() ? "" : "/") + pathComponent, aUgi);
children.put(pathComponent, newDir);
return newDir;
}
void addLink(final String pathComponent, final INodeLink<T> link)
throws FileAlreadyExistsException {
if (children.containsKey(pathComponent)) {
throw new FileAlreadyExistsException();
}
children.put(pathComponent, link);
}
void addDirLink(final String pathComponent, final INodeDirLink<T> dirLink) {
children.put(pathComponent, dirLink);
}
}
/**
* Internal class to represent an INodeDir which also contains a INodeLink. This is used to support nested mount points
* where an INode is internalDir but points to a mount link. The class is a subclass of INodeDir and the semantics are
* as follows:
* isLink(): true
* isInternalDir(): true
* @param <T>
*/
static class INodeDirLink<T> extends INodeDir<T> {
/**
* INodeLink wrapped in the INodeDir
*/
private final INodeLink<T> link;
INodeDirLink(String pathToNode, UserGroupInformation aUgi, INodeLink<T> link) {
super(pathToNode, aUgi);
this.link = link;
}
@Override
INodeLink<T> getLink() {
return link;
}
/**
* True because the INodeDirLink also contains a INodeLink
*/
@Override
boolean isLink() {
return true;
}
/**
* True because the INodeDirLink is internal node
*/
@Override
boolean isInternalDir() {
return true;
}
}
/**
* Mount table link type.
*/
enum LinkType {
/**
* Link entry pointing to a single filesystem uri.
* Config prefix: fs.viewfs.mounttable.<mnt_tbl_name>.link.<link_name>
* Refer: {@link Constants#CONFIG_VIEWFS_LINK}
*/
SINGLE,
/**
* Fallback filesystem for the paths not mounted by
* any single link entries.
* Config prefix: fs.viewfs.mounttable.<mnt_tbl_name>.linkFallback
* Refer: {@link Constants#CONFIG_VIEWFS_LINK_FALLBACK}
*/
SINGLE_FALLBACK,
/**
* Link entry pointing to an union of two or more filesystem uris.
* Config prefix: fs.viewfs.mounttable.<mnt_tbl_name>.linkMerge.<link_name>
* Refer: {@link Constants#CONFIG_VIEWFS_LINK_MERGE}
*/
MERGE,
/**
* Link entry for merging mount table's root with the
* root of another filesystem.
* Config prefix: fs.viewfs.mounttable.<mnt_tbl_name>.linkMergeSlash
* Refer: {@link Constants#CONFIG_VIEWFS_LINK_MERGE_SLASH}
*/
MERGE_SLASH,
/**
* Link entry to write to multiple filesystems and read
* from the closest filesystem.
* Config prefix: fs.viewfs.mounttable.<mnt_tbl_name>.linkNfly
* Refer: {@link Constants#CONFIG_VIEWFS_LINK_NFLY}
*/
NFLY,
/**
* Link entry which source are regex exrepssions and target refer matched
* group from source
* Config prefix: fs.viewfs.mounttable.<mnt_tbl_name>.linkRegex
* Refer: {@link Constants#CONFIG_VIEWFS_LINK_REGEX}
*/
REGEX;
}
/**
* An internal class to represent a mount link.
* A mount link can be single dir link or a merge dir link.
* A merge dir link is a merge (junction) of links to dirs:
* example : merge of 2 dirs
* /users -> hdfs:nn1//users
* /users -> hdfs:nn2//users
*
* For a merge, each target is checked to be dir when created but if target
* is changed later it is then ignored (a dir with null entries)
*/
public static class INodeLink<T> extends INode<T> {
final String[] targetDirLinkList;
private T targetFileSystem; // file system object created from the link.
// Function to initialize file system. Only applicable for simple links
private Function<URI, T> fileSystemInitMethod;
private final Object lock = new Object();
/**
* Construct a mergeLink or nfly.
*/
INodeLink(final String pathToNode, final UserGroupInformation aUgi,
final T targetMergeFs, final String[] aTargetDirLinkList) {
super(pathToNode, aUgi);
targetFileSystem = targetMergeFs;
targetDirLinkList = aTargetDirLinkList;
}
/**
* Construct a simple link (i.e. not a mergeLink).
*/
INodeLink(final String pathToNode, final UserGroupInformation aUgi,
Function<URI, T> createFileSystemMethod,
final String aTargetDirLink) throws URISyntaxException {
super(pathToNode, aUgi);
targetFileSystem = null;
targetDirLinkList = new String[1];
targetDirLinkList[0] = new URI(aTargetDirLink).toString();
this.fileSystemInitMethod = createFileSystemMethod;
}
/**
* Get the target of the link. If a merge link then it returned
* as "," separated URI list.
*
* @return the path.
*/
public Path getTargetLink() {
StringBuilder result = new StringBuilder(targetDirLinkList[0].toString());
// If merge link, use "," as separator between the merged URIs
for (int i = 1; i < targetDirLinkList.length; ++i) {
result.append(',').append(targetDirLinkList[i].toString());
}
return new Path(result.toString());
}
@Override
boolean isInternalDir() {
return false;
}
@Override
INodeLink<T> getLink() {
return this;
}
/**
* Get the instance of FileSystem to use, creating one if needed.
* @return An Initialized instance of T
* @throws IOException raised on errors performing I/O.
*/
public T getTargetFileSystem() throws IOException {
if (targetFileSystem != null) {
return targetFileSystem;
}
// For non NFLY and MERGE links, we initialize the FileSystem when the
// corresponding mount path is accessed.
if (targetDirLinkList.length == 1) {
synchronized (lock) {
if (targetFileSystem != null) {
return targetFileSystem;
}
targetFileSystem =
fileSystemInitMethod.apply(URI.create(targetDirLinkList[0]));
if (targetFileSystem == null) {
throw new IOException(
"Could not initialize target File System for URI : " +
targetDirLinkList[0]);
}
}
}
return targetFileSystem;
}
}
private void createLink(final String src, final String target,
final LinkType linkType, final String settings,
final UserGroupInformation aUgi,
final Configuration config)
throws URISyntaxException, IOException,
FileAlreadyExistsException, UnsupportedFileSystemException {
// Validate that src is valid absolute path
final Path srcPath = new Path(src);
if (!srcPath.isAbsoluteAndSchemeAuthorityNull()) {
throw new IOException("ViewFs: Non absolute mount name in config:" + src);
}
final String[] srcPaths = breakIntoPathComponents(src);
// Make sure root is of INodeDir type before
// adding any regular links to it.
Preconditions.checkState(root.isInternalDir());
INodeDir<T> curInode = getRootDir();
int i;
// Ignore first initial slash, process all except last component
for (i = 1; i < srcPaths.length - 1; i++) {
final String iPath = srcPaths[i];
INode<T> nextInode = curInode.resolveInternal(iPath);
if (nextInode == null) {
INodeDir<T> newDir = curInode.addDir(iPath, aUgi);
newDir.setInternalDirFs(getTargetFileSystem(newDir));
nextInode = newDir;
}
if (!nextInode.isInternalDir()) {
if (isNestedMountPointSupported) {
// nested mount detected, add a new INodeDirLink that wraps existing INodeLink to INodeTree and override existing INodelink
INodeDirLink<T> dirLink = new INodeDirLink<T>(nextInode.fullPath, aUgi, (INodeLink<T>) nextInode);
curInode.addDirLink(iPath, dirLink);
curInode = dirLink;
} else {
// Error - expected a dir but got a link
throw new FileAlreadyExistsException("Path " + nextInode.fullPath +
" already exists as link");
}
} else {
assert(nextInode.isInternalDir());
curInode = (INodeDir<T>) nextInode;
}
}
// Now process the last component
// Add the link in 2 cases: does not exist or a link exists
String iPath = srcPaths[i];// last component
if (curInode.resolveInternal(iPath) != null) {
// directory/link already exists
StringBuilder strB = new StringBuilder(srcPaths[0]);
for (int j = 1; j <= i; ++j) {
strB.append('/').append(srcPaths[j]);
}
throw new FileAlreadyExistsException("Path " + strB +
" already exists as dir; cannot create link here");
}
final INodeLink<T> newLink;
final String fullPath = curInode.fullPath + (curInode == root ? "" : "/")
+ iPath;
switch (linkType) {
case SINGLE:
newLink = new INodeLink<T>(fullPath, aUgi,
initAndGetTargetFs(), target);
break;
case SINGLE_FALLBACK:
case MERGE_SLASH:
// Link fallback and link merge slash configuration
// are handled specially at InodeTree.
throw new IllegalArgumentException("Unexpected linkType: " + linkType);
case MERGE:
case NFLY:
final String[] targetUris = StringUtils.getStrings(target);
newLink = new INodeLink<T>(fullPath, aUgi,
getTargetFileSystem(settings, StringUtils.stringToURI(targetUris)),
targetUris);
break;
default:
throw new IllegalArgumentException(linkType + ": Infeasible linkType");
}
curInode.addLink(iPath, newLink);
mountPoints.add(new MountPoint<T>(src, newLink));
}
/**
* The user of this class must subclass and implement the following
* 3 abstract methods.
* @return Function.
*/
protected abstract Function<URI, T> initAndGetTargetFs();
protected abstract T getTargetFileSystem(INodeDir<T> dir)
throws URISyntaxException, IOException;
protected abstract T getTargetFileSystem(String settings, URI[] mergeFsURIs)
throws UnsupportedFileSystemException, URISyntaxException, IOException;
private INodeDir<T> getRootDir() {
Preconditions.checkState(root.isInternalDir());
return (INodeDir<T>)root;
}
private INodeLink<T> getRootLink() {
Preconditions.checkState(!root.isInternalDir());
return (INodeLink<T>)root;
}
private boolean hasFallbackLink() {
return rootFallbackLink != null;
}
/**
* @return true if the root represented as internalDir. In LinkMergeSlash,
* there will be root to root mapping. So, root does not represent as
* internalDir.
*/
public boolean isRootInternalDir() {
return root.isInternalDir();
}
public INodeLink<T> getRootFallbackLink() {
Preconditions.checkState(root.isInternalDir());
return rootFallbackLink;
}
/**
* An internal class representing the ViewFileSystem mount table
* link entries and their attributes.
* @see LinkType
*/
private static class LinkEntry {
private final String src;
private final String target;
private final LinkType linkType;
private final String settings;
private final UserGroupInformation ugi;
private final Configuration config;
LinkEntry(String src, String target, LinkType linkType, String settings,
UserGroupInformation ugi, Configuration config) {
this.src = src;
this.target = target;
this.linkType = linkType;
this.settings = settings;
this.ugi = ugi;
this.config = config;
}
String getSrc() {
return src;
}
String getTarget() {
return target;
}
LinkType getLinkType() {
return linkType;
}
boolean isLinkType(LinkType type) {
return this.linkType == type;
}
String getSettings() {
return settings;
}
UserGroupInformation getUgi() {
return ugi;
}
Configuration getConfig() {
return config;
}
}
/**
* Create Inode Tree from the specified mount-table specified in Config.
*
* @param config the mount table keys are prefixed with
* FsConstants.CONFIG_VIEWFS_PREFIX.
* @param viewName the name of the mount table
* if null use defaultMT name.
* @param theUri heUri.
* @param initingUriAsFallbackOnNoMounts initingUriAsFallbackOnNoMounts.
* @throws UnsupportedFileSystemException file system for <code>uri</code> is
* not found.
* @throws URISyntaxException if the URI does not have an authority
* it is badly formed.
* @throws FileAlreadyExistsException there is a file at the path specified
* or is discovered on one of its ancestors.
* @throws IOException raised on errors performing I/O.
*/
protected InodeTree(final Configuration config, final String viewName,
final URI theUri, boolean initingUriAsFallbackOnNoMounts)
throws UnsupportedFileSystemException, URISyntaxException,
FileAlreadyExistsException, IOException {
String mountTableName = viewName;
if (mountTableName == null) {
mountTableName = ConfigUtil.getDefaultMountTableName(config);
}
homedirPrefix = ConfigUtil.getHomeDirValue(config, mountTableName);
isNestedMountPointSupported = ConfigUtil.isNestedMountPointSupported(config);
boolean isMergeSlashConfigured = false;
String mergeSlashTarget = null;
List<LinkEntry> linkEntries = new LinkedList<>();
final String mountTablePrefix =
Constants.CONFIG_VIEWFS_PREFIX + "." + mountTableName + ".";
final String linkPrefix = Constants.CONFIG_VIEWFS_LINK + ".";
final String linkFallbackPrefix = Constants.CONFIG_VIEWFS_LINK_FALLBACK;
final String linkMergePrefix = Constants.CONFIG_VIEWFS_LINK_MERGE + ".";
final String linkMergeSlashPrefix =
Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH;
boolean gotMountTableEntry = false;
final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
for (Entry<String, String> si : config) {
final String key = si.getKey();
if (!key.startsWith(mountTablePrefix)) {
continue;
}
gotMountTableEntry = true;
LinkType linkType;
String src = key.substring(mountTablePrefix.length());
String settings = null;
if (src.startsWith(linkPrefix)) {
src = src.substring(linkPrefix.length());
if (src.equals(SlashPath.toString())) {
throw new UnsupportedFileSystemException("Unexpected mount table "
+ "link entry '" + key + "'. Use "
+ Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH + " instead!");
}
linkType = LinkType.SINGLE;
} else if (src.startsWith(linkFallbackPrefix)) {
checkMntEntryKeyEqualsTarget(src, linkFallbackPrefix);
linkType = LinkType.SINGLE_FALLBACK;
} else if (src.startsWith(linkMergePrefix)) { // A merge link
src = src.substring(linkMergePrefix.length());
linkType = LinkType.MERGE;
} else if (src.startsWith(linkMergeSlashPrefix)) {
// This is a LinkMergeSlash entry. This entry should
// not have any additional source path.
checkMntEntryKeyEqualsTarget(src, linkMergeSlashPrefix);
linkType = LinkType.MERGE_SLASH;
} else if (src.startsWith(Constants.CONFIG_VIEWFS_LINK_NFLY)) {
// prefix.settings.src
src = src.substring(Constants.CONFIG_VIEWFS_LINK_NFLY.length() + 1);
// settings.src
settings = src.substring(0, src.indexOf('.'));
// settings
// settings.src
src = src.substring(settings.length() + 1);
// src
linkType = LinkType.NFLY;
} else if (src.startsWith(Constants.CONFIG_VIEWFS_LINK_REGEX)) {
linkEntries.add(
buildLinkRegexEntry(config, ugi, src, si.getValue()));
continue;
} else if (src.startsWith(Constants.CONFIG_VIEWFS_HOMEDIR)) {
// ignore - we set home dir from config
continue;
} else {
throw new IOException("ViewFs: Cannot initialize: Invalid entry in " +
"Mount table in config: " + src);
}
final String target = si.getValue();
if (linkType != LinkType.MERGE_SLASH) {
if (isMergeSlashConfigured) {
throw new IOException("Mount table " + mountTableName
+ " has already been configured with a merge slash link. "
+ "A regular link should not be added.");
}
linkEntries.add(
new LinkEntry(src, target, linkType, settings, ugi, config));
} else {
if (!linkEntries.isEmpty()) {
throw new IOException("Mount table " + mountTableName
+ " has already been configured with regular links. "
+ "A merge slash link should not be configured.");
}
if (isMergeSlashConfigured) {
throw new IOException("Mount table " + mountTableName
+ " has already been configured with a merge slash link. "
+ "Multiple merge slash links for the same mount table is "
+ "not allowed.");
}
isMergeSlashConfigured = true;
mergeSlashTarget = target;
}
} // End of for loop.
if (isMergeSlashConfigured) {
Preconditions.checkNotNull(mergeSlashTarget);
root = new INodeLink<T>(mountTableName, ugi,
initAndGetTargetFs(), mergeSlashTarget);
mountPoints.add(new MountPoint<T>("/", (INodeLink<T>) root));
rootFallbackLink = null;
} else {
root = new INodeDir<T>("/", UserGroupInformation.getCurrentUser());
getRootDir().setInternalDirFs(getTargetFileSystem(getRootDir()));
getRootDir().setRoot(true);
INodeLink<T> fallbackLink = null;
for (LinkEntry le : getLinkEntries(linkEntries)) {
switch (le.getLinkType()) {
case SINGLE_FALLBACK:
if (fallbackLink != null) {
throw new IOException("Mount table " + mountTableName
+ " has already been configured with a link fallback. "
+ "Multiple fallback links for the same mount table is "
+ "not allowed.");
}
fallbackLink = new INodeLink<T>(mountTableName, ugi,
initAndGetTargetFs(), le.getTarget());
continue;
case REGEX:
addRegexMountEntry(le);
continue;
default:
createLink(le.getSrc(), le.getTarget(), le.getLinkType(),
le.getSettings(), le.getUgi(), le.getConfig());
}
}
rootFallbackLink = fallbackLink;
getRootDir().addFallbackLink(rootFallbackLink);
}
if (!gotMountTableEntry) {
if (!initingUriAsFallbackOnNoMounts) {
throw new IOException(new StringBuilder(
"ViewFs: Cannot initialize: Empty Mount table in config for ")
.append(theUri.getScheme()).append("://").append(mountTableName)
.append("/").toString());
}
FileSystem.LOG
.info("Empty mount table detected for {} and considering itself "
+ "as a linkFallback.", theUri);
rootFallbackLink = new INodeLink<T>(mountTableName, ugi,
initAndGetTargetFs(), theUri.toString());
getRootDir().addFallbackLink(rootFallbackLink);
}
}
/**
* Get collection of linkEntry. Sort mount point based on alphabetical order of the src paths.
* The purpose is to group nested paths(shortest path always comes first) during INodeTree creation.
* E.g. /foo is nested with /foo/bar so an INodeDirLink will be created at /foo.
* @param linkEntries input linkEntries
* @return sorted linkEntries
*/
private Collection<LinkEntry> getLinkEntries(List<LinkEntry> linkEntries) {
Set<LinkEntry> sortedLinkEntries = new TreeSet<>(new Comparator<LinkEntry>() {
@Override
public int compare(LinkEntry o1, LinkEntry o2) {
if (o1 == null) {
return -1;
}
if (o2 == null) {
return 1;
}
String src1 = o1.getSrc();
String src2= o2.getSrc();
return src1.compareTo(src2);
}
});
sortedLinkEntries.addAll(linkEntries);
return sortedLinkEntries;
}
private void checkMntEntryKeyEqualsTarget(
String mntEntryKey, String targetMntEntryKey) throws IOException {
if (!mntEntryKey.equals(targetMntEntryKey)) {
throw new IOException("ViewFs: Mount points initialization error." +
" Invalid " + targetMntEntryKey +
" entry in config: " + mntEntryKey);
}
}
private void addRegexMountEntry(LinkEntry le) throws IOException {
LOGGER.info("Add regex mount point:" + le.getSrc()
+ ", target:" + le.getTarget()
+ ", interceptor settings:" + le.getSettings());
RegexMountPoint regexMountPoint =
new RegexMountPoint<T>(
this, le.getSrc(), le.getTarget(), le.getSettings());
regexMountPoint.initialize();
regexMountPointList.add(regexMountPoint);
}
private LinkEntry buildLinkRegexEntry(
Configuration config, UserGroupInformation ugi,
String mntEntryStrippedKey, String mntEntryValue) {
String linkKeyPath = null;
String settings = null;
final String linkRegexPrefix = Constants.CONFIG_VIEWFS_LINK_REGEX + ".";
// settings#.linkKey
String settingsAndLinkKeyPath =
mntEntryStrippedKey.substring(linkRegexPrefix.length());
int settingLinkKeySepIndex = settingsAndLinkKeyPath
.indexOf(RegexMountPoint.SETTING_SRCREGEX_SEP);
if (settingLinkKeySepIndex == -1) {
// There's no settings
linkKeyPath = settingsAndLinkKeyPath;
settings = null;
} else {
// settings#.linkKey style configuration
// settings from settings#.linkKey
settings =
settingsAndLinkKeyPath.substring(0, settingLinkKeySepIndex);
// linkKeyPath
linkKeyPath = settingsAndLinkKeyPath.substring(
settings.length() + RegexMountPoint.SETTING_SRCREGEX_SEP
.length());
}
return new LinkEntry(
linkKeyPath, mntEntryValue, LinkType.REGEX, settings, ugi, config);
}
/**
* Resolve returns ResolveResult.
* The caller can continue the resolution of the remainingPath
* in the targetFileSystem.
*
* If the input pathname leads to link to another file system then
* the targetFileSystem is the one denoted by the link (except it is
* file system chrooted to link target.
* If the input pathname leads to an internal mount-table entry then
* the target file system is one that represents the internal inode.
*/
public static class ResolveResult<T> {
final ResultKind kind;
final T targetFileSystem;
final String resolvedPath;
final Path remainingPath; // to resolve in the target FileSystem
private final boolean isLastInternalDirLink;
ResolveResult(final ResultKind k, final T targetFs, final String resolveP,
final Path remainingP, boolean isLastIntenalDirLink) {
kind = k;
targetFileSystem = targetFs;
resolvedPath = resolveP;
remainingPath = remainingP;
this.isLastInternalDirLink = isLastIntenalDirLink;
}
// Internal dir path resolution completed within the mount table
boolean isInternalDir() {
return (kind == ResultKind.INTERNAL_DIR);
}
// Indicates whether the internal dir path resolution completed at the link
// or resolved due to fallback.
boolean isLastInternalDirLink() {
return this.isLastInternalDirLink;
}
}
/**
* Resolve the pathname p relative to root InodeDir.
* @param p - input path
* @param resolveLastComponent resolveLastComponent.
* @return ResolveResult which allows further resolution of the remaining path
* @throws IOException raised on errors performing I/O.
*/
public ResolveResult<T> resolve(final String p, final boolean resolveLastComponent)
throws IOException {
ResolveResult<T> resolveResult = null;
String[] path = breakIntoPathComponents(p);
if (path.length <= 1) { // special case for when path is "/"
T targetFs = root.isInternalDir() ?
getRootDir().getInternalDirFs()
: getRootLink().getTargetFileSystem();
resolveResult = new ResolveResult<T>(ResultKind.INTERNAL_DIR,
targetFs, root.fullPath, SlashPath, false);
return resolveResult;
}
/**
* linkMergeSlash has been configured. The root of this mount table has
* been linked to the root directory of a file system.
* The first non-slash path component should be name of the mount table.
*/
if (!root.isInternalDir()) {
Path remainingPath;
StringBuilder remainingPathStr = new StringBuilder();
// ignore first slash
for (int i = 1; i < path.length; i++) {
remainingPathStr.append("/").append(path[i]);
}
remainingPath = new Path(remainingPathStr.toString());
resolveResult = new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
getRootLink().getTargetFileSystem(), root.fullPath, remainingPath,
true);
return resolveResult;
}
Preconditions.checkState(root.isInternalDir());
INodeDir<T> curInode = getRootDir();
// Try to resolve path in the regex mount point
resolveResult = tryResolveInRegexMountpoint(p, resolveLastComponent);
if (resolveResult != null) {
return resolveResult;
}
int i;
INodeDirLink<T> lastResolvedDirLink = null;
int lastResolvedDirLinkIndex = -1;
// ignore first slash
for (i = 1; i < path.length - (resolveLastComponent ? 0 : 1); i++) {
INode<T> nextInode = curInode.resolveInternal(path[i]);
if (nextInode == null) {
// first resolve to dirlink for nested mount point
if (isNestedMountPointSupported && lastResolvedDirLink != null) {
return new ResolveResult<T>(ResultKind.EXTERNAL_DIR, lastResolvedDirLink.getLink().getTargetFileSystem(),
lastResolvedDirLink.fullPath, getRemainingPath(path, i),true);
}
if (hasFallbackLink()) {
resolveResult = new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
getRootFallbackLink().getTargetFileSystem(), root.fullPath,
new Path(p), false);
return resolveResult;
} else {
StringBuilder failedAt = new StringBuilder(path[0]);
for (int j = 1; j <= i; ++j) {
failedAt.append('/').append(path[j]);
}
throw (new FileNotFoundException(
"File/Directory does not exist: " + failedAt.toString()));
}
}
if (!nextInode.isInternalDir()) {
final INodeLink<T> link = (INodeLink<T>) nextInode;
final Path remainingPath = getRemainingPath(path, i + 1);
resolveResult = new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
link.getTargetFileSystem(), nextInode.fullPath, remainingPath,
true);
return resolveResult;
} else {
curInode = (INodeDir<T>) nextInode;
// track last resolved nest mount point.
if (isNestedMountPointSupported && nextInode.isLink()) {
lastResolvedDirLink = (INodeDirLink<T>) nextInode;
lastResolvedDirLinkIndex = i;
}
}
}
Path remainingPath;
if (isNestedMountPointSupported && lastResolvedDirLink != null) {
remainingPath = getRemainingPath(path, lastResolvedDirLinkIndex + 1);
resolveResult = new ResolveResult<T>(ResultKind.EXTERNAL_DIR, lastResolvedDirLink.getLink().getTargetFileSystem(),
lastResolvedDirLink.fullPath, remainingPath,true);
} else {
remainingPath = resolveLastComponent ? SlashPath : getRemainingPath(path, i);
resolveResult = new ResolveResult<T>(ResultKind.INTERNAL_DIR, curInode.getInternalDirFs(),
curInode.fullPath, remainingPath, false);
}
return resolveResult;
}
/**
* Return remaining path from specified index to the end of the path array.
* @param path An array of path components split by slash
* @param startIndex the specified start index of the path array
* @return remaining path.
*/
private Path getRemainingPath(String[] path, int startIndex) {
Path remainingPath;
if (startIndex >= path.length) {
remainingPath = SlashPath;
} else {
StringBuilder remainingPathStr = new StringBuilder();
for (int j = startIndex; j < path.length; j++) {
remainingPathStr.append("/").append(path[j]);
}
remainingPath = new Path(remainingPathStr.toString());
}
return remainingPath;
}
/**
* Walk through all regex mount points to see
* whether the path match any regex expressions.
* E.g. link: ^/user/(?<username>\\w+) => s3://$user.apache.com/_${user}
* srcPath: is /user/hadoop/dir1
* resolveLastComponent: true
* then return value is s3://hadoop.apache.com/_hadoop
*
* @param srcPath srcPath.
* @param resolveLastComponent resolveLastComponent.
* @return ResolveResult.
*/
protected ResolveResult<T> tryResolveInRegexMountpoint(final String srcPath,
final boolean resolveLastComponent) {
for (RegexMountPoint regexMountPoint : regexMountPointList) {
ResolveResult resolveResult =
regexMountPoint.resolve(srcPath, resolveLastComponent);
if (resolveResult != null) {
return resolveResult;
}
}
return null;
}
/**
* Build resolve result.
* Here's an example
* Mountpoint: fs.viewfs.mounttable.mt
* .linkRegex.replaceresolveddstpath:_:-#.^/user/(??<username>\w+)
* Value: /targetTestRoot/$username
* Dir path to test:
* viewfs://mt/user/hadoop_user1/hadoop_dir1
* Expect path: /targetTestRoot/hadoop-user1/hadoop_dir1
* resolvedPathStr: /user/hadoop_user1
* targetOfResolvedPathStr: /targetTestRoot/hadoop-user1
* remainingPath: /hadoop_dir1
*
* @param resultKind resultKind.
* @param resolvedPathStr resolvedPathStr.
* @param targetOfResolvedPathStr targetOfResolvedPathStr.
* @param remainingPath remainingPath.
* @return targetFileSystem or null on exceptions.
*/
protected ResolveResult<T> buildResolveResultForRegexMountPoint(
ResultKind resultKind, String resolvedPathStr,
String targetOfResolvedPathStr, Path remainingPath) {
try {
T targetFs = initAndGetTargetFs()
.apply(new URI(targetOfResolvedPathStr));
if (targetFs == null) {
LOGGER.error(String.format(
"Not able to initialize target file system."
+ " ResultKind:%s, resolvedPathStr:%s,"
+ " targetOfResolvedPathStr:%s, remainingPath:%s,"
+ " will return null.",
resultKind, resolvedPathStr, targetOfResolvedPathStr,
remainingPath));
return null;
}
return new ResolveResult<T>(resultKind, targetFs, resolvedPathStr,
remainingPath, true);
} catch (URISyntaxException uex) {
LOGGER.error(String.format(
"Got Exception while build resolve result."
+ " ResultKind:%s, resolvedPathStr:%s,"
+ " targetOfResolvedPathStr:%s, remainingPath:%s,"
+ " will return null.",
resultKind, resolvedPathStr, targetOfResolvedPathStr, remainingPath),
uex);
return null;
}
}
public List<MountPoint<T>> getMountPoints() {
return mountPoints;
}
/**
*
* @return home dir value from mount table; null if no config value
* was found.
*/
public String getHomeDirPrefixValue() {
return homedirPrefix;
}
}
相关信息
相关文章
hadoop HCFSMountTableConfigLoader 源码
hadoop MountTableConfigLoader 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦