hadoop CosNFileSystem 源码
hadoop CosNFileSystem 代码
文件路径:/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/CosNFileSystem.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.cosn;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.Progressable;
/**
* The core CosN Filesystem implementation.
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
public class CosNFileSystem extends FileSystem {
// Logger shared by this class (package-private visibility).
static final Logger LOG = LoggerFactory.getLogger(CosNFileSystem.class);
// URI scheme returned by getScheme().
public static final String SCHEME = "cosn";
// Separator used when building object keys; same value as Path.SEPARATOR.
public static final String PATH_DELIMITER = Path.SEPARATOR;
// Filesystem URI (scheme + authority); assigned in initialize().
private URI uri;
// Host component of the URI passed to initialize().
private String bucket;
// Backing object store; injected through the constructor or created in
// initialize() when none was supplied.
private NativeFileSystemStore store;
// Base directory against which makeAbsolute() resolves relative paths.
private Path workingDir;
// Owner and group reported in every FileStatus; overwritten in initialize().
private String owner = "Unknown";
private String group = "Unknown";
// Executor handed to CosNOutputStream and CosNInputStream.
private ExecutorService boundedIOThreadPool;
// Executor that runs CosNCopyFileTask instances during directory copies.
private ExecutorService boundedCopyThreadPool;
/**
 * Creates an instance without a store; a default store is created later by
 * {@link #initialize(URI, Configuration)}.
 */
public CosNFileSystem() {
}
/**
 * Creates an instance backed by the given store.
 *
 * @param store the store to use; {@link #initialize(URI, Configuration)}
 *              only builds a default store when this is null
 */
public CosNFileSystem(NativeFileSystemStore store) {
this.store = store;
}
/**
 * Reports the URI scheme handled by this filesystem.
 *
 * @return the value of {@link #SCHEME}, i.e. <code>cosn</code>
 */
@Override
public String getScheme() {
  return SCHEME;
}
/**
 * Initializes this filesystem for the given URI.
 *
 * <p>Creates the default store when none was injected, initializes the
 * store, derives the filesystem URI and working directory, records the
 * owner/group, initializes the shared {@code BufferPool}, and builds the
 * two bounded thread pools used for transfers and copies.
 *
 * @param name the filesystem URI; its host is used as the bucket name
 * @param conf the configuration to read pool sizes and keep-alive from
 * @throws IOException if the superclass or the store fails to initialize
 */
@Override
public void initialize(URI name, Configuration conf) throws IOException {
  super.initialize(name, conf);
  this.bucket = name.getHost();
  if (this.store == null) {
    this.store = createDefaultStore(conf);
  }
  this.store.initialize(name, conf);
  setConf(conf);
  this.uri = URI.create(name.getScheme() + "://" + name.getAuthority());
  // NOTE(review): getWorkingDirectory() is still unassigned at this point;
  // this relies on "/user/<name>" being absolute - confirm.
  this.workingDir = new Path("/user",
      System.getProperty("user.name")).makeQualified(
      this.uri,
      this.getWorkingDirectory());
  this.owner = getOwnerId();
  this.group = getGroupId();
  LOG.debug("owner:{}, group:{}", owner, group);
  BufferPool.getInstance().initialize(this.getConf());

  // initialize the thread pool
  int uploadThreadPoolSize = this.getConf().getInt(
      CosNConfigKeys.UPLOAD_THREAD_POOL_SIZE_KEY,
      CosNConfigKeys.DEFAULT_UPLOAD_THREAD_POOL_SIZE
  );
  int readAheadPoolSize = this.getConf().getInt(
      CosNConfigKeys.READ_AHEAD_QUEUE_SIZE,
      CosNConfigKeys.DEFAULT_READ_AHEAD_QUEUE_SIZE
  );
  int ioThreadPoolSize = uploadThreadPoolSize + readAheadPoolSize / 3;
  long threadKeepAlive = this.getConf().getLong(
      CosNConfigKeys.THREAD_KEEP_ALIVE_TIME_KEY,
      CosNConfigKeys.DEFAULT_THREAD_KEEP_ALIVE_TIME
  );
  // The first argument is the number of worker threads; clamp it to at
  // least one so a small configured size cannot make pool creation fail.
  this.boundedIOThreadPool = BlockingThreadPoolExecutorService.newInstance(
      Math.max(1, ioThreadPoolSize / 2), ioThreadPoolSize,
      threadKeepAlive, TimeUnit.SECONDS,
      "cos-transfer-thread-pool");

  int copyThreadPoolSize = this.getConf().getInt(
      CosNConfigKeys.COPY_THREAD_POOL_SIZE_KEY,
      CosNConfigKeys.DEFAULT_COPY_THREAD_POOL_SIZE
  );
  // Use the configured size for the worker-thread count. Previously the
  // compile-time default was passed here, so the configuration key only
  // changed the queue length and never the number of copy threads.
  this.boundedCopyThreadPool = BlockingThreadPoolExecutorService.newInstance(
      Math.max(1, copyThreadPoolSize), copyThreadPoolSize,
      60L, TimeUnit.SECONDS,
      "cos-copy-thread-pool");
}
/**
 * Builds the default store: a {@code CosNativeFileSystemStore} wrapped in a
 * retry proxy.
 *
 * <p>Only the methods named {@code storeFile} and {@code rename} get a
 * retry policy; for those, an {@link IOException} is retried a configured
 * number of times with a fixed sleep, and any other exception fails at once.
 *
 * @param conf configuration supplying the retry count and interval
 * @return the proxied store
 */
private static NativeFileSystemStore createDefaultStore(Configuration conf) {
  NativeFileSystemStore rawStore = new CosNativeFileSystemStore();

  int maxRetries = conf.getInt(
      CosNConfigKeys.COSN_MAX_RETRIES_KEY,
      CosNConfigKeys.DEFAULT_MAX_RETRIES);
  long retryInterval = conf.getLong(
      CosNConfigKeys.COSN_RETRY_INTERVAL_KEY,
      CosNConfigKeys.DEFAULT_RETRY_INTERVAL);
  RetryPolicy ioRetryPolicy =
      RetryPolicies.retryUpToMaximumCountWithFixedSleep(
          maxRetries, retryInterval, TimeUnit.SECONDS);

  Map<Class<? extends Exception>, RetryPolicy> policyByException =
      new HashMap<>();
  policyByException.put(IOException.class, ioRetryPolicy);
  RetryPolicy retryOnIoError = RetryPolicies.retryByException(
      RetryPolicies.TRY_ONCE_THEN_FAIL, policyByException);

  Map<String, RetryPolicy> policyByMethod = new HashMap<>();
  for (String methodName : new String[] {"storeFile", "rename"}) {
    policyByMethod.put(methodName, retryOnIoError);
  }

  Object proxy = RetryProxy.create(
      NativeFileSystemStore.class, rawStore, policyByMethod);
  return (NativeFileSystemStore) proxy;
}
/**
 * Returns the owner name that initialize() stores for use in FileStatus
 * objects.
 *
 * @return the value of the {@code user.name} system property
 */
private String getOwnerId() {
return System.getProperty("user.name");
}
/**
 * Returns the group name that initialize() stores for use in FileStatus
 * objects. The same system property as the owner is used.
 *
 * @return the value of the {@code user.name} system property
 */
private String getGroupId() {
return System.getProperty("user.name");
}
/**
 * Runs the {@code id} command for the current JVM user and returns its raw
 * standard output.
 *
 * <p>The command is started with an explicit argument list so the user name
 * is passed as a single argument, the output is drained before waiting for
 * the process to exit, and the stream is always closed.
 *
 * @param getOwnerId true to run {@code id -u}, false to run {@code id -g}
 * @return the unmodified standard output of the command, or an empty string
 *         if the user name is unavailable, the command cannot be run, or
 *         the calling thread is interrupted
 */
private String getOwnerInfo(boolean getOwnerId) {
  String userName = System.getProperty("user.name");
  if (userName == null) {
    return "";
  }
  ProcessBuilder builder =
      new ProcessBuilder("id", getOwnerId ? "-u" : "-g", userName);
  StringBuilder strBuffer = new StringBuilder();
  try {
    Process child = builder.start();
    // Read before waitFor() so the child can never block on a full pipe.
    try (InputStream in = child.getInputStream()) {
      int c;
      while ((c = in.read()) != -1) {
        strBuffer.append((char) c);
      }
    }
    child.waitFor();
  } catch (IOException e) {
    LOG.error("Getting owner info occurs a exception", e);
    return "";
  } catch (InterruptedException e) {
    // Preserve the interrupt for callers further up the stack.
    Thread.currentThread().interrupt();
    LOG.error("Getting owner info occurs a exception", e);
    return "";
  }
  return strBuffer.toString();
}
/**
 * Converts a path into the object key used with the store.
 *
 * @param path an absolute path, or a URI with a scheme and an empty path
 * @return an empty string for a scheme-qualified URI with no path (for
 *         example cosn://mybucket); otherwise the URI path with one
 *         trailing slash removed, unless that slash is the first slash in
 *         the string
 * @throws IllegalArgumentException if the path is not absolute
 */
private static String pathToKey(Path path) {
  URI pathUri = path.toUri();
  boolean bucketOnly =
      pathUri.getScheme() != null && pathUri.getPath().isEmpty();
  if (bucketOnly) {
    // allow uris without trailing slash after bucket to refer to root,
    // like cosn://mybucket
    return "";
  }
  if (!path.isAbsolute()) {
    throw new IllegalArgumentException("Path must be absolute: " + path);
  }
  String key = pathUri.getPath();
  int lastIndex = key.length() - 1;
  boolean stripTrailingSlash = key.endsWith("/") && key.indexOf("/") != lastIndex;
  return stripTrailingSlash ? key.substring(0, lastIndex) : key;
}
/**
 * Converts an object key into a path, prepending a slash when the key does
 * not already start with the path delimiter.
 *
 * @param key the object key
 * @return the corresponding path
 */
private static Path keyToPath(String key) {
  String rooted = key.startsWith(PATH_DELIMITER) ? key : "/" + key;
  return new Path(rooted);
}
/**
 * Resolves a path against the working directory when it is not absolute.
 *
 * @param path the path to resolve
 * @return {@code path} itself if absolute, otherwise a new path under the
 *         working directory
 */
private Path makeAbsolute(Path path) {
  return path.isAbsolute() ? path : new Path(workingDir, path);
}
/**
 * This optional operation is not yet supported.
 *
 * @param f ignored
 * @param bufferSize ignored
 * @param progress ignored
 * @return never returns normally
 * @throws IOException always, with the message "Not supported"
 */
@Override
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {
throw new IOException("Not supported");
}
/**
 * Opens an output stream that writes a new object for the given path.
 *
 * @param f the file to create
 * @param permission not used by this implementation
 * @param overwrite whether an existing file may be replaced
 * @param bufferSize not used by this implementation
 * @param replication not used by this implementation
 * @param blockSize passed through to the underlying CosNOutputStream
 * @param progress not used by this implementation
 * @return a stream backed by a CosNOutputStream
 * @throws FileAlreadyExistsException if the path is a directory, or is an
 *         existing file and {@code overwrite} is false
 * @throws IOException if the status lookup or stream creation fails
 */
@Override
public FSDataOutputStream create(Path f, FsPermission permission,
    boolean overwrite, int bufferSize, short replication, long blockSize,
    Progressable progress) throws IOException {
  try {
    FileStatus existing = getFileStatus(f);
    if (existing.isDirectory()) {
      throw new FileAlreadyExistsException(f + " is a directory");
    }
    if (!overwrite) {
      // path references a file and overwrite is disabled
      throw new FileAlreadyExistsException(f + " already exists");
    }
  } catch (FileNotFoundException e) {
    LOG.debug("Creating a new file: [{}] in COS.", f);
  }

  String key = pathToKey(makeAbsolute(f));
  CosNOutputStream cosOut = new CosNOutputStream(
      getConf(), store, key, blockSize, this.boundedIOThreadPool);
  return new FSDataOutputStream(cosOut, statistics);
}
/**
 * Decides the result of a delete request that targets the root directory.
 *
 * @param isEmptyDir whether the root currently has no entries
 * @param recursive whether the delete was requested recursively
 * @return true when the root is empty; false when it is non-empty and the
 *         request is recursive
 * @throws PathIOException when the root is non-empty and the request is
 *         not recursive
 */
private boolean rejectRootDirectoryDelete(boolean isEmptyDir,
    boolean recursive) throws PathIOException {
  if (!isEmptyDir && !recursive) {
    throw new PathIOException(this.bucket, "Can not delete root path");
  }
  return isEmptyDir;
}
/**
 * Creates a file only if its parent, when it has one, is an existing
 * directory, then delegates to
 * {@link #create(Path, FsPermission, boolean, int, short, long, Progressable)}.
 *
 * @param flags creation flags; only {@link CreateFlag#OVERWRITE} is examined
 * @throws FileAlreadyExistsException if the parent exists but is not a
 *         directory
 * @throws IOException if the parent lookup or the create call fails
 */
@Override
public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
    EnumSet<CreateFlag> flags, int bufferSize, short replication,
    long blockSize, Progressable progress) throws IOException {
  Path parent = f.getParent();
  if (parent != null && !getFileStatus(parent).isDirectory()) {
    throw new FileAlreadyExistsException("Not a directory: " + parent);
  }
  boolean overwrite = flags.contains(CreateFlag.OVERWRITE);
  return create(f, permission, overwrite,
      bufferSize, replication, blockSize, progress);
}
/**
 * Deletes a file, or a directory together with everything stored under it.
 *
 * <p>The root directory itself is never removed; a request against it is
 * answered by {@code rejectRootDirectoryDelete}. Before anything is removed
 * the parent directory marker is (re)created via {@code createParent}.
 *
 * @param f the path to delete
 * @param recursive whether a non-empty directory may be deleted
 * @return false if the path does not exist (or for a recursive request on a
 *         non-empty root); true otherwise
 * @throws IOException if a non-empty directory is deleted non-recursively,
 *         or a store operation fails
 */
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
  LOG.debug("Ready to delete path: [{}]. recursive: [{}].", f, recursive);
  FileStatus status;
  try {
    status = getFileStatus(f);
  } catch (FileNotFoundException e) {
    LOG.debug("Ready to delete the file: [{}], but it does not exist.", f);
    return false;
  }

  String key = pathToKey(makeAbsolute(f));
  // pathToKey yields "" for a bucket-only URI (cosn://bucket) and "/" for an
  // explicit root path; both denote the root, matching how getFileStatus
  // and listStatus treat an empty key.
  if (key.isEmpty() || key.equals(PATH_DELIMITER)) {
    FileStatus[] fileStatuses = listStatus(f);
    return this.rejectRootDirectoryDelete(
        fileStatuses.length == 0, recursive);
  }

  if (!status.isDirectory()) {
    LOG.debug("Delete the file: {}", f);
    createParent(f);
    store.delete(key);
    return true;
  }

  if (!key.endsWith(PATH_DELIMITER)) {
    key += PATH_DELIMITER;
  }
  if (!recursive && listStatus(f).length > 0) {
    String errMsg = String.format("Can not delete the directory: [%s], as"
        + " it is not empty and option recursive is false.", f);
    throw new IOException(errMsg);
  }

  createParent(f);

  // Remove everything under the directory prefix, one listing page at a time.
  String priorLastKey = null;
  do {
    PartialListing listing = store.list(
        key,
        Constants.COS_MAX_LISTING_LENGTH,
        priorLastKey,
        true);
    for (FileMetadata file : listing.getFiles()) {
      store.delete(file.getKey());
    }
    for (FileMetadata commonPrefix : listing.getCommonPrefixes()) {
      store.delete(commonPrefix.getKey());
    }
    priorLastKey = listing.getPriorLastKey();
  } while (priorLastKey != null);

  // Best effort: a failure to remove the directory marker itself is logged
  // but does not fail the delete.
  try {
    store.delete(key);
  } catch (Exception e) {
    LOG.error("Deleting the COS key: [{}] occurs an exception.", key, e);
  }
  return true;
}
/**
 * Returns the status of a path.
 *
 * <p>An empty key is reported as the root directory. Otherwise the store's
 * metadata for the key decides between file and directory; when there is no
 * metadata, the path is reported as a directory if listing the key with a
 * trailing delimiter returns at least one file or common prefix.
 *
 * @param f the path to look up
 * @return the status of the file or directory
 * @throws FileNotFoundException if neither metadata nor listing finds it
 * @throws IOException if a store call fails
 */
@Override
public FileStatus getFileStatus(Path f) throws IOException {
  Path absolutePath = makeAbsolute(f);
  String key = pathToKey(absolutePath);

  if (key.isEmpty()) {
    // root always exists
    return newDirectory(absolutePath);
  }

  LOG.debug("Call the getFileStatus to obtain the metadata for "
      + "the file: [{}].", f);

  FileMetadata meta = store.retrieveMetadata(key);
  if (meta != null) {
    if (!meta.isFile()) {
      LOG.debug("Path: [{}] is a dir. COS key: [{}]", f, key);
      return newDirectory(meta, absolutePath);
    }
    LOG.debug("Path: [{}] is a file. COS key: [{}]", f, key);
    return newFile(meta, absolutePath);
  }

  // A directory in the object store is only a common key prefix, so its
  // existence has to be probed by listing under "<key>/".
  String dirKey = key.endsWith(PATH_DELIMITER) ? key : key + PATH_DELIMITER;
  LOG.debug("List COS key: [{}] to check the existence of the path.", dirKey);
  PartialListing listing = store.list(dirKey, 1);
  boolean hasEntries = listing.getFiles().length > 0
      || listing.getCommonPrefixes().length > 0;
  if (!hasEntries) {
    throw new FileNotFoundException(
        "No such file or directory '" + absolutePath + "'");
  }
  LOG.debug("Path: [{}] is a directory. COS key: [{}]", f, dirKey);
  return newDirectory(absolutePath);
}
/**
 * Returns the filesystem URI assigned in initialize().
 *
 * @return the URI built from the scheme and authority of the initialize()
 *         argument, or null before initialize() has run
 */
@Override
public URI getUri() {
return uri;
}
/**
 * Lists the direct children of a directory, or returns the file itself.
 *
 * <p>
 * If <code>f</code> is a file, this method will make a single call to COS.
 * If <code>f</code> is a directory,
 * this method will make a maximum of ( <i>n</i> / 199) + 2 calls to cos,
 * where <i>n</i> is the total number of files
 * and directories contained directly in <code>f</code>.
 * </p>
 *
 * @param f the path to list
 * @return the statuses collected in a sorted set; a single-element array
 *         when {@code f} is a file
 * @throws IOException if the status lookup or a listing call fails
 */
@Override
public FileStatus[] listStatus(Path f) throws IOException {
  Path absolutePath = makeAbsolute(f);
  String key = pathToKey(absolutePath);

  if (key.length() > 0) {
    FileStatus self = this.getFileStatus(f);
    if (self.isFile()) {
      return new FileStatus[]{self};
    }
  }

  String dirKey = key.endsWith(PATH_DELIMITER) ? key : key + PATH_DELIMITER;
  URI baseUri = absolutePath.toUri();
  Set<FileStatus> entries = new TreeSet<>();

  String marker = null;
  do {
    PartialListing page = store.list(
        dirKey, Constants.COS_MAX_LISTING_LENGTH, marker, false);

    for (FileMetadata fileMeta : page.getFiles()) {
      String fileKey = fileMeta.getKey();
      Path filePath = keyToPath(fileKey);
      if (fileKey.equals(dirKey)) {
        // this is just the directory we have been asked to list.
        LOG.debug("The file list contains the COS key [{}] to be listed.",
            dirKey);
        continue;
      }
      entries.add(newFile(fileMeta, filePath));
    }

    for (FileMetadata prefixMeta : page.getCommonPrefixes()) {
      Path prefixPath = keyToPath(prefixMeta.getKey());
      String relative = baseUri.relativize(prefixPath.toUri()).getPath();
      Path childDir = new Path(absolutePath, relative);
      entries.add(newDirectory(prefixMeta, childDir));
    }

    marker = page.getPriorLastKey();
  } while (marker != null);

  return entries.toArray(new FileStatus[0]);
}
/**
 * Builds the status of a file from its store metadata.
 *
 * @param meta supplies the length and modification time
 * @param path the file's path; qualified against this filesystem
 * @return a file status with replication 1 and the default block size
 */
private FileStatus newFile(FileMetadata meta, Path path) {
  Path qualified =
      path.makeQualified(this.getUri(), this.getWorkingDirectory());
  long length = meta.getLength();
  long modified = meta.getLastModified();
  return new FileStatus(length, false, 1, getDefaultBlockSize(),
      modified, 0, null, this.owner, this.group, qualified);
}
/**
 * Builds the status of a directory for which no metadata is available.
 *
 * @param path the directory's path; qualified against this filesystem
 * @return a directory status with zero length and zero modification time
 */
private FileStatus newDirectory(Path path) {
  Path qualified =
      path.makeQualified(this.getUri(), this.getWorkingDirectory());
  return new FileStatus(0, true, 1, 0, 0, 0, null,
      this.owner, this.group, qualified);
}
/**
 * Builds the status of a directory, taking the modification time from the
 * metadata when it is present.
 *
 * @param meta the directory's metadata, or null
 * @param path the directory's path; qualified against this filesystem
 * @return a directory status
 */
private FileStatus newDirectory(FileMetadata meta, Path path) {
  if (meta != null) {
    Path qualified =
        path.makeQualified(this.getUri(), this.getWorkingDirectory());
    long modified = meta.getLastModified();
    return new FileStatus(0, true, 1, 0, modified,
        0, null, this.owner, this.group, qualified);
  }
  return newDirectory(path);
}
/**
 * Validate the path from the bottom up.
 *
 * <p>Walks the ancestors of {@code path}, nearest first, and stops at the
 * first one that exists. A path without a parent (the root) has nothing to
 * validate and returns immediately.
 *
 * @param path The path to be validated
 * @throws FileAlreadyExistsException The nearest existing ancestor is a file
 * @throws IOException Getting the file status of an ancestor
 *                     occurs an IOException.
 */
private void validatePath(Path path) throws IOException {
  for (Path parent = path.getParent();
       parent != null;
       parent = parent.getParent()) {
    try {
      FileStatus fileStatus = getFileStatus(parent);
      if (fileStatus.isDirectory()) {
        return;
      }
      throw new FileAlreadyExistsException(String.format(
          "Can't make directory for path '%s', it is a file.", parent));
    } catch (FileNotFoundException e) {
      // Report the ancestor that is actually missing, then keep climbing.
      LOG.debug("The Path: [{}] does not exist.", parent);
    }
  }
}
/**
 * Creates a directory, doing nothing when it already exists.
 *
 * @param f the directory to create
 * @param permission passed through to {@link #mkDirRecursively}
 * @return true if the directory already exists; otherwise the result of
 *         {@link #mkDirRecursively}
 * @throws FileAlreadyExistsException if {@code f} is an existing file, or
 *         the ancestor validation finds a file
 * @throws IOException if a status lookup or store call fails
 */
@Override
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
  try {
    if (getFileStatus(f).isDirectory()) {
      return true;
    }
    throw new FileAlreadyExistsException("Path is a file: " + f);
  } catch (FileNotFoundException e) {
    validatePath(f);
  }
  return mkDirRecursively(f, permission);
}
/**
 * Recursively create a directory.
 *
 * <p>Walks from {@code f} up towards the root. Every path on the way that
 * does not exist gets an empty marker object ({@code <key>/}); the walk
 * stops at the first existing directory or at the root. The marker key and
 * messages are derived from the path being examined, and the root is
 * recognised with {@code Path.isRoot()} so scheme-qualified paths are
 * handled as well.
 *
 * @param f Path to the directory; resolved against the working directory
 *          when relative.
 * @param permission Directory permissions. Permission does not work for
 *                   the CosN filesystem currently.
 * @return Return true if the creation was successful, throw a IOException.
 * @throws FileAlreadyExistsException A path on the way to the root is an
 *                                    existing file.
 * @throws IOException An error creating the path.
 */
public boolean mkDirRecursively(Path f, FsPermission permission)
    throws IOException {
  for (Path path = makeAbsolute(f);
       path != null && !path.isRoot();
       path = path.getParent()) {
    try {
      FileStatus fileStatus = getFileStatus(path);
      if (fileStatus.isFile()) {
        throw new FileAlreadyExistsException(
            String.format("Can't make directory for path: %s, "
                + "since it is a file.", path));
      }
      if (fileStatus.isDirectory()) {
        // Everything above an existing directory exists as well.
        break;
      }
    } catch (FileNotFoundException e) {
      LOG.debug("Making dir: [{}] in COS", path);

      String folderPath = pathToKey(path);
      if (!folderPath.endsWith(PATH_DELIMITER)) {
        folderPath += PATH_DELIMITER;
      }
      store.storeEmptyFile(folderPath);
    }
  }
  return true;
}
/**
 * Creates a single directory marker when nothing exists at the path.
 *
 * @param f the directory to create
 * @return always true when no exception is thrown
 * @throws FileAlreadyExistsException if {@code f} is an existing file
 * @throws IOException if the status lookup or the store call fails
 */
private boolean mkdir(Path f) throws IOException {
  try {
    if (getFileStatus(f).isFile()) {
      throw new FileAlreadyExistsException(
          String.format(
              "Can't make directory for path '%s' since it is a file.", f));
    }
  } catch (FileNotFoundException e) {
    LOG.debug("Make directory: [{}] in COS.", f);

    String key = pathToKey(makeAbsolute(f));
    String folderKey =
        key.endsWith(PATH_DELIMITER) ? key : key + PATH_DELIMITER;
    store.storeEmptyFile(folderKey);
  }
  return true;
}
/**
 * Opens an existing file for reading.
 *
 * @param f the file to open
 * @param bufferSize buffer size handed to the BufferedFSInputStream wrapper
 * @return a stream over a buffered CosNInputStream
 * @throws FileNotFoundException if the path does not exist or is a directory
 * @throws IOException if a store call fails
 */
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
  // getFileStatus throws if the file doesn't exist
  FileStatus status = getFileStatus(f);
  if (status.isDirectory()) {
    throw new FileNotFoundException("'" + f + "' is a directory");
  }
  LOG.info("Open the file: [{}] for reading.", f);

  String key = pathToKey(makeAbsolute(f));
  long fileSize = store.getFileLength(key);
  CosNInputStream cosIn = new CosNInputStream(
      this.getConf(), store, statistics, key, fileSize,
      this.boundedIOThreadPool);
  return new FSDataInputStream(new BufferedFSInputStream(cosIn, bufferSize));
}
/**
 * Renames a file or directory by copying it to the destination and then
 * deleting the source.
 *
 * <p>The operation is not atomic. The checks run in this order: the root
 * cannot be renamed; the source must exist; source and destination must
 * differ; the destination must not lie beneath the source; an existing
 * destination file is rejected; an existing destination directory means the
 * source is placed inside it under its own name, provided nothing is listed
 * at that location; a missing destination requires its parent to be an
 * existing directory.
 *
 * @param src the path to rename
 * @param dst the target path, or an existing directory to move into
 * @return false if {@code src} is the root or the copy step reports failure;
 *         otherwise the result of deleting the source recursively
 * @throws FileNotFoundException if the source, or the parent of a missing
 *         destination, does not exist (propagated from getFileStatus)
 * @throws FileAlreadyExistsException if the destination is an existing file,
 *         or the target inside an existing destination directory is occupied
 * @throws IOException if source and destination are equal, the destination
 *         is beneath the source, the destination's parent is a file, or a
 *         store operation fails
 */
@Override
public boolean rename(Path src, Path dst) throws IOException {
LOG.debug("Rename source path: [{}] to dest path: [{}].", src, dst);
// Renaming the root directory is not allowed
if (src.isRoot()) {
LOG.debug("Cannot rename the root directory of a filesystem.");
return false;
}
// check the source path whether exists or not
// (getFileStatus throws FileNotFoundException when it does not)
FileStatus srcFileStatus = this.getFileStatus(src);
// Source path and destination path are not allowed to be the same
// NOTE(review): the paths are compared as given, without being made
// absolute or qualified first - confirm callers always pass comparable forms.
if (src.equals(dst)) {
LOG.debug("Source path and dest path refer to "
+ "the same file or directory: [{}].", dst);
throw new IOException("Source path and dest path refer "
+ "the same file or directory");
}
// It is not allowed to rename a parent directory to its subdirectory
// Walk up from dst's parent; the loop ends with dstParentPath == null when
// src is not an ancestor of dst, or with dstParentPath equal to src when
// it is.
Path dstParentPath;
for (dstParentPath = dst.getParent();
null != dstParentPath && !src.equals(dstParentPath);
dstParentPath = dstParentPath.getParent()) {
// Recursively find the common parent path of the source and
// destination paths.
LOG.debug("Recursively find the common parent directory of the source "
+ "and destination paths. The currently found parent path: {}",
dstParentPath);
}
if (null != dstParentPath) {
LOG.debug("It is not allowed to rename a parent directory:[{}] "
+ "to its subdirectory:[{}].", src, dst);
throw new IOException(String.format(
"It is not allowed to rename a parent directory: %s "
+ "to its subdirectory: %s", src, dst));
}
FileStatus dstFileStatus;
try {
dstFileStatus = this.getFileStatus(dst);
// The destination path exists and is a file,
// and the rename operation is not allowed.
if (dstFileStatus.isFile()) {
throw new FileAlreadyExistsException(String.format(
"File: %s already exists", dstFileStatus.getPath()));
} else {
// The destination path is an existing directory,
// and it is checked whether there is a file or directory
// with the same name as the source path under the destination path
// From here on dst refers to <existing dir>/<source name>.
dst = new Path(dst, src.getName());
FileStatus[] statuses;
try {
statuses = this.listStatus(dst);
} catch (FileNotFoundException e) {
statuses = null;
}
if (null != statuses && statuses.length > 0) {
LOG.debug("Cannot rename source file: [{}] to dest file: [{}], "
+ "because the file already exists.", src, dst);
throw new FileAlreadyExistsException(
String.format(
"File: %s already exists", dst
)
);
}
}
} catch (FileNotFoundException e) {
// destination path not exists
// Its parent must exist (otherwise getFileStatus throws again and the
// exception propagates to the caller) and must be a directory.
Path tempDstParentPath = dst.getParent();
FileStatus dstParentStatus = this.getFileStatus(tempDstParentPath);
if (!dstParentStatus.isDirectory()) {
throw new IOException(String.format(
"Cannot rename %s to %s, %s is a file", src, dst, dst.getParent()
));
}
// The default root directory is definitely there.
}
// Copy first; the source is only removed after the copy reported success.
boolean result;
if (srcFileStatus.isDirectory()) {
result = this.copyDirectory(src, dst);
} else {
result = this.copyFile(src, dst);
}
if (!result) {
//Since rename is a non-atomic operation, after copy fails,
// it is not allowed to delete the data of the original path.
return false;
} else {
return this.delete(src, true);
}
}
/**
 * Copies a single object from the source path to the destination path.
 *
 * <p>Both paths are resolved against the working directory before being
 * converted to keys, so relative paths are accepted here just as they are
 * by {@code getFileStatus}.
 *
 * @param srcPath the file to copy
 * @param dstPath the location of the copy
 * @return always true when the store call returns normally
 * @throws IOException if the store copy fails
 */
private boolean copyFile(Path srcPath, Path dstPath) throws IOException {
  String srcKey = pathToKey(makeAbsolute(srcPath));
  String dstKey = pathToKey(makeAbsolute(dstPath));
  this.store.copy(srcKey, dstKey);
  return true;
}
/**
 * Copies every object under the source directory to the destination
 * directory, using the copy thread pool.
 *
 * <p>A marker object is stored for the destination, then one
 * {@code CosNCopyFileTask} is submitted per listed source object. Submission
 * stops as soon as the shared context reports a failed copy. The method then
 * waits until all submitted tasks have finished.
 *
 * @param srcPath the directory to copy
 * @param dstPath the destination directory
 * @return true only if the wait completed and every copy succeeded; false
 *         if a copy failed or the wait was interrupted (the thread's
 *         interrupt status is restored in that case)
 * @throws IOException if the destination lies under the source, or a store
 *         call fails
 */
private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException {
  // Resolve relative paths first; pathToKey rejects non-absolute paths.
  String srcKey = pathToKey(makeAbsolute(srcPath));
  if (!srcKey.endsWith(PATH_DELIMITER)) {
    srcKey += PATH_DELIMITER;
  }
  String dstKey = pathToKey(makeAbsolute(dstPath));
  if (!dstKey.endsWith(PATH_DELIMITER)) {
    dstKey += PATH_DELIMITER;
  }

  if (dstKey.startsWith(srcKey)) {
    throw new IOException(
        "can not copy a directory to a subdirectory of self");
  }

  this.store.storeEmptyFile(dstKey);
  CosNCopyFileContext copyFileContext = new CosNCopyFileContext();

  int copiesToFinishes = 0;
  String priorLastKey = null;
  boolean keepSubmitting = true;
  do {
    PartialListing objectList = this.store.list(
        srcKey, Constants.COS_MAX_LISTING_LENGTH, priorLastKey, true);
    for (FileMetadata file : objectList.getFiles()) {
      this.boundedCopyThreadPool.execute(new CosNCopyFileTask(
          this.store,
          file.getKey(),
          dstKey.concat(file.getKey().substring(srcKey.length())),
          copyFileContext));
      copiesToFinishes++;
      if (!copyFileContext.isCopySuccess()) {
        // A copy already failed: stop listing further pages as well.
        keepSubmitting = false;
        break;
      }
    }
    priorLastKey = objectList.getPriorLastKey();
  } while (keepSubmitting && null != priorLastKey);

  boolean interrupted = false;
  copyFileContext.lock();
  try {
    copyFileContext.awaitAllFinish(copiesToFinishes);
  } catch (InterruptedException e) {
    LOG.warn("interrupted when wait copies to finish");
    Thread.currentThread().interrupt();
    interrupted = true;
  } finally {
    // Release the lock taken above (the original re-acquired it here).
    copyFileContext.unlock();
  }

  // An interrupted wait means some copies may still be running, so the
  // directory must not be reported as copied; otherwise the caller would go
  // on to delete the source.
  return !interrupted && copyFileContext.isCopySuccess();
}
/**
 * Stores a directory marker for the parent of the given path, unless the
 * parent is the root or the path has no parent.
 *
 * <p>The path is resolved against the working directory before its parent
 * is converted to a key, so a relative path no longer fails the
 * absolute-path check in {@code pathToKey}.
 *
 * @param path the path whose parent marker should exist
 * @throws IOException if storing the marker fails
 */
private void createParent(Path path) throws IOException {
  Path parent = makeAbsolute(path).getParent();
  if (parent == null) {
    return;
  }
  String parentKey = pathToKey(parent);
  LOG.debug("Create parent key: {}", parentKey);
  if (parentKey.isEmpty() || parentKey.equals(PATH_DELIMITER)) {
    // The root needs no marker object.
    return;
  }
  try {
    store.storeEmptyFile(parentKey + PATH_DELIMITER);
  } catch (IOException e) {
    LOG.debug("Store a empty file in COS failed.", e);
    throw e;
  }
}
/**
 * Returns the block size reported for files of this filesystem.
 *
 * @return the configured value of {@code CosNConfigKeys.COSN_BLOCK_SIZE_KEY},
 *         or {@code CosNConfigKeys.DEFAULT_BLOCK_SIZE} when it is not set
 */
@Override
@SuppressWarnings("deprecation")
public long getDefaultBlockSize() {
  Configuration conf = getConf();
  long blockSize = conf.getLong(
      CosNConfigKeys.COSN_BLOCK_SIZE_KEY,
      CosNConfigKeys.DEFAULT_BLOCK_SIZE);
  return blockSize;
}
/**
 * Set the working directory to the given directory.
 *
 * <p>A relative directory is resolved against the current working directory
 * so that the stored value stays absolute and later relative paths resolve
 * correctly. When no working directory has been set yet, or the argument is
 * null, the value is stored unchanged.
 *
 * @param newDir the new working directory
 */
@Override
public void setWorkingDirectory(Path newDir) {
  if (newDir == null || workingDir == null) {
    workingDir = newDir;
    return;
  }
  workingDir = makeAbsolute(newDir);
}
/**
 * Returns the current working directory.
 *
 * @return the directory assigned by initialize() or setWorkingDirectory();
 *         null before either has run
 */
@Override
public Path getWorkingDirectory() {
return workingDir;
}
/**
 * Returns the canonical service name of this filesystem.
 *
 * @return always null
 */
@Override
public String getCanonicalServiceName() {
// Does not support Token
return null;
}
/**
 * Closes the store, shuts down both thread pools and closes the superclass.
 *
 * <p>Each step runs even if an earlier one throws, and members that were
 * never assigned (for example when initialize() was not called) are
 * skipped.
 *
 * @throws IOException if closing the store or the superclass fails
 */
@Override
public void close() throws IOException {
  try {
    if (this.store != null) {
      this.store.close();
    }
  } finally {
    try {
      if (this.boundedIOThreadPool != null) {
        this.boundedIOThreadPool.shutdown();
      }
    } finally {
      try {
        if (this.boundedCopyThreadPool != null) {
          this.boundedCopyThreadPool.shutdown();
        }
      } finally {
        super.close();
      }
    }
  }
}
}
相关信息
相关文章
hadoop ByteBufferInputStream 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦