hadoop OBSListing 源码
haddop OBSListing 代码
文件路径:/hadoop-cloud-storage-project/hadoop-huaweicloud/src/main/java/org/apache/hadoop/fs/obs/OBSListing.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.obs;
import com.obs.services.exception.ObsException;
import com.obs.services.model.ListObjectsRequest;
import com.obs.services.model.ObjectListing;
import com.obs.services.model.ObsObject;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.NoSuchElementException;
/**
* OBS listing implementation.
*/
class OBSListing {
/**
* A Path filter which accepts all filenames.
*/
static final PathFilter ACCEPT_ALL =
new PathFilter() {
@Override
public boolean accept(final Path file) {
return true;
}
@Override
public String toString() {
return "ACCEPT_ALL";
}
};
/**
* Class logger.
*/
private static final Logger LOG = LoggerFactory.getLogger(OBSListing.class);
/**
* OBS File System instance.
*/
private final OBSFileSystem owner;
OBSListing(final OBSFileSystem ownerFS) {
this.owner = ownerFS;
}
/**
* Create a FileStatus iterator against a path, with a given list object
* request.
*
* @param listPath path of the listing
* @param request initial request to make
* @param filter the filter on which paths to accept
* @param acceptor the class/predicate to decide which entries to accept in
* the listing based on the full file status.
* @return the iterator
* @throws IOException IO Problems
*/
FileStatusListingIterator createFileStatusListingIterator(
final Path listPath,
final ListObjectsRequest request,
final PathFilter filter,
final FileStatusAcceptor acceptor)
throws IOException {
return new FileStatusListingIterator(
new ObjectListingIterator(listPath, request), filter, acceptor);
}
/**
* Create a located status iterator over a file status iterator.
*
* @param statusIterator an iterator over the remote status entries
* @return a new remote iterator
*/
LocatedFileStatusIterator createLocatedFileStatusIterator(
final RemoteIterator<FileStatus> statusIterator) {
return new LocatedFileStatusIterator(statusIterator);
}
/**
* Interface to implement by the logic deciding whether to accept a summary
* entry or path as a valid file or directory.
*/
interface FileStatusAcceptor {
/**
* Predicate to decide whether or not to accept a summary entry.
*
* @param keyPath qualified path to the entry
* @param summary summary entry
* @return true if the entry is accepted (i.e. that a status entry should be
* generated.
*/
boolean accept(Path keyPath, ObsObject summary);
/**
* Predicate to decide whether or not to accept a prefix.
*
* @param keyPath qualified path to the entry
* @param commonPrefix the prefix
* @return true if the entry is accepted (i.e. that a status entry should be
* generated.)
*/
boolean accept(Path keyPath, String commonPrefix);
}
/**
* A remote iterator which only iterates over a single `LocatedFileStatus`
* value.
*
* <p>If the status value is null, the iterator declares that it has no
* data. This iterator is used to handle
* {@link OBSFileSystem#listStatus(Path)}calls where the path handed in
* refers to a file, not a directory: this is
* the iterator returned.
*/
static final class SingleStatusRemoteIterator
implements RemoteIterator<LocatedFileStatus> {
/**
* The status to return; set to null after the first iteration.
*/
private LocatedFileStatus status;
/**
* Constructor.
*
* @param locatedFileStatus status value: may be null, in which case the
* iterator is empty.
*/
SingleStatusRemoteIterator(final LocatedFileStatus locatedFileStatus) {
this.status = locatedFileStatus;
}
/**
* {@inheritDoc}
*
* @return true if there is a file status to return: this is always false
* for the second iteration, and may be false for the first.
*/
@Override
public boolean hasNext() {
return status != null;
}
/**
* {@inheritDoc}
*
* @return the non-null status element passed in when the instance was
* constructed, if it ha not already been retrieved.
* @throws NoSuchElementException if this is the second call, or it is the
* first call and a null
* {@link LocatedFileStatus}
* entry was passed to the constructor.
*/
@Override
public LocatedFileStatus next() {
if (hasNext()) {
LocatedFileStatus s = this.status;
status = null;
return s;
} else {
throw new NoSuchElementException();
}
}
}
/**
* Accept all entries except the base path and those which map to OBS pseudo
* directory markers.
*/
static class AcceptFilesOnly implements FileStatusAcceptor {
/**
* path to qualify.
*/
private final Path qualifiedPath;
AcceptFilesOnly(final Path path) {
this.qualifiedPath = path;
}
/**
* Reject a summary entry if the key path is the qualified Path, or it ends
* with {@code "_$folder$"}.
*
* @param keyPath key path of the entry
* @param summary summary entry
* @return true if the entry is accepted (i.e. that a status entry should be
* generated.
*/
@Override
public boolean accept(final Path keyPath, final ObsObject summary) {
return !keyPath.equals(qualifiedPath)
&& !summary.getObjectKey()
.endsWith(OBSConstants.OBS_FOLDER_SUFFIX)
&& !OBSCommonUtils.objectRepresentsDirectory(
summary.getObjectKey(),
summary.getMetadata().getContentLength());
}
/**
* Accept no directory paths.
*
* @param keyPath qualified path to the entry
* @param prefix common prefix in listing.
* @return false, always.
*/
@Override
public boolean accept(final Path keyPath, final String prefix) {
return false;
}
}
/**
* Accept all entries except the base path and those which map to OBS pseudo
* directory markers.
*/
static class AcceptAllButSelfAndS3nDirs implements FileStatusAcceptor {
/**
* Base path.
*/
private final Path qualifiedPath;
/**
* Constructor.
*
* @param path an already-qualified path.
*/
AcceptAllButSelfAndS3nDirs(final Path path) {
this.qualifiedPath = path;
}
/**
* Reject a summary entry if the key path is the qualified Path, or it ends
* with {@code "_$folder$"}.
*
* @param keyPath key path of the entry
* @param summary summary entry
* @return true if the entry is accepted (i.e. that a status entry should be
* generated.)
*/
@Override
public boolean accept(final Path keyPath, final ObsObject summary) {
return !keyPath.equals(qualifiedPath) && !summary.getObjectKey()
.endsWith(OBSConstants.OBS_FOLDER_SUFFIX);
}
/**
* Accept all prefixes except the one for the base path, "self".
*
* @param keyPath qualified path to the entry
* @param prefix common prefix in listing.
* @return true if the entry is accepted (i.e. that a status entry should be
* generated.
*/
@Override
public boolean accept(final Path keyPath, final String prefix) {
return !keyPath.equals(qualifiedPath);
}
}
/**
* Wraps up object listing into a remote iterator which will ask for more
* listing data if needed.
*
* <p>This is a complex operation, especially the process to determine if
* there are more entries remaining. If there are no more results remaining in
* the (filtered) results of the current listing request, then another request
* is made
* <i>and those results filtered</i> before the iterator can declare that
* there is more data available.
*
* <p>The need to filter the results precludes the iterator from simply
* declaring that if the {@link ObjectListingIterator#hasNext()} is true then
* there are more results. Instead the next batch of results must be retrieved
* and filtered.
*
* <p>What does this mean? It means that remote requests to retrieve new
* batches of object listings are made in the {@link #hasNext()} call; the
* {@link #next()} call simply returns the filtered results of the last
* listing processed. However, do note that {@link #next()} calls {@link
* #hasNext()} during its operation. This is critical to ensure that a listing
* obtained through a sequence of {@link #next()} will complete with the same
* set of results as a classic {@code while(it.hasNext()} loop.
*
* <p>Thread safety: None.
*/
class FileStatusListingIterator implements RemoteIterator<FileStatus> {
/**
* Source of objects.
*/
private final ObjectListingIterator source;
/**
* Filter of paths from API call.
*/
private final PathFilter filter;
/**
* Filter of entries from file status.
*/
private final FileStatusAcceptor acceptor;
/**
* Request batch size.
*/
private int batchSize;
/**
* Iterator over the current set of results.
*/
private ListIterator<FileStatus> statusBatchIterator;
/**
* Create an iterator over file status entries.
*
* @param listPath the listing iterator from a listObjects call.
* @param pathFilter the filter on which paths to accept
* @param fileStatusAcceptor the class/predicate to decide which entries to
* accept in the listing based on the full file
* status.
* @throws IOException IO Problems
*/
FileStatusListingIterator(
final ObjectListingIterator listPath, final PathFilter pathFilter,
final FileStatusAcceptor fileStatusAcceptor)
throws IOException {
this.source = listPath;
this.filter = pathFilter;
this.acceptor = fileStatusAcceptor;
// build the first set of results. This will not trigger any
// remote IO, assuming the source iterator is in its initial
// iteration
requestNextBatch();
}
/**
* Report whether or not there is new data available. If there is data in
* the local filtered list, return true. Else: request more data util that
* condition is met, or there is no more remote listing data.
*
* @return true if a call to {@link #next()} will succeed.
* @throws IOException on any failure to request next batch
*/
@Override
public boolean hasNext() throws IOException {
return statusBatchIterator.hasNext() || requestNextBatch();
}
@Override
public FileStatus next() throws IOException {
if (!hasNext()) {
throw new NoSuchElementException();
}
return statusBatchIterator.next();
}
/**
* Try to retrieve another batch. Note that for the initial batch, {@link
* ObjectListingIterator} does not generate a request; it simply returns the
* initial set.
*
* @return true if a new batch was created.
* @throws IOException IO problems
*/
private boolean requestNextBatch() throws IOException {
// look for more object listing batches being available
while (source.hasNext()) {
// if available, retrieve it and build the next status
if (buildNextStatusBatch(source.next())) {
// this batch successfully generated entries matching
// the filters/acceptors;
// declare that the request was successful
return true;
} else {
LOG.debug(
"All entries in batch were filtered...continuing");
}
}
// if this code is reached, it means that all remaining
// object lists have been retrieved, and there are no new entries
// to return.
return false;
}
/**
* Build the next status batch from a listing.
*
* @param objects the next object listing
* @return true if this added any entries after filtering
*/
private boolean buildNextStatusBatch(final ObjectListing objects) {
// counters for debug logs
int added = 0;
int ignored = 0;
// list to fill in with results. Initial size will be list maximum.
List<FileStatus> stats =
new ArrayList<>(
objects.getObjects().size() + objects.getCommonPrefixes()
.size());
// objects
for (ObsObject summary : objects.getObjects()) {
String key = summary.getObjectKey();
Path keyPath = OBSCommonUtils.keyToQualifiedPath(owner, key);
if (LOG.isDebugEnabled()) {
LOG.debug("{}: {}", keyPath,
OBSCommonUtils.stringify(summary));
}
// Skip over keys that are ourselves and old OBS _$folder$ files
if (acceptor.accept(keyPath, summary) && filter.accept(
keyPath)) {
FileStatus status =
OBSCommonUtils.createFileStatus(
keyPath, summary,
owner.getDefaultBlockSize(keyPath),
owner.getUsername());
LOG.debug("Adding: {}", status);
stats.add(status);
added++;
} else {
LOG.debug("Ignoring: {}", keyPath);
ignored++;
}
}
// prefixes: always directories
for (ObsObject prefix : objects.getExtenedCommonPrefixes()) {
String key = prefix.getObjectKey();
Path keyPath = OBSCommonUtils.keyToQualifiedPath(owner, key);
if (acceptor.accept(keyPath, key) && filter.accept(keyPath)) {
long lastModified =
prefix.getMetadata().getLastModified() == null
? System.currentTimeMillis()
: OBSCommonUtils.dateToLong(
prefix.getMetadata().getLastModified());
FileStatus status = new OBSFileStatus(keyPath, lastModified,
lastModified, owner.getUsername());
LOG.debug("Adding directory: {}", status);
added++;
stats.add(status);
} else {
LOG.debug("Ignoring directory: {}", keyPath);
ignored++;
}
}
// finish up
batchSize = stats.size();
statusBatchIterator = stats.listIterator();
boolean hasNext = statusBatchIterator.hasNext();
LOG.debug(
"Added {} entries; ignored {}; hasNext={}; hasMoreObjects={}",
added,
ignored,
hasNext,
objects.isTruncated());
return hasNext;
}
/**
* Get the number of entries in the current batch.
*
* @return a number, possibly zero.
*/
public int getBatchSize() {
return batchSize;
}
}
/**
* Wraps up OBS `ListObjects` requests in a remote iterator which will ask for
* more listing data if needed.
*
* <p>That is:
*
* <p>1. The first invocation of the {@link #next()} call will return the
* results of the first request, the one created during the construction of
* the instance.
*
* <p>2. Second and later invocations will continue the ongoing listing,
* calling {@link OBSCommonUtils#continueListObjects} to request the next
* batch of results.
*
* <p>3. The {@link #hasNext()} predicate returns true for the initial call,
* where {@link #next()} will return the initial results. It declares that it
* has future results iff the last executed request was truncated.
*
* <p>Thread safety: none.
*/
class ObjectListingIterator implements RemoteIterator<ObjectListing> {
/**
* The path listed.
*/
private final Path listPath;
/**
* The most recent listing results.
*/
private ObjectListing objects;
/**
* Indicator that this is the first listing.
*/
private boolean firstListing = true;
/**
* Count of how many listings have been requested (including initial
* result).
*/
private int listingCount = 1;
/**
* Maximum keys in a request.
*/
private int maxKeys;
/**
* Constructor -calls {@link OBSCommonUtils#listObjects} on the request to
* populate the initial set of results/fail if there was a problem talking
* to the bucket.
*
* @param path path of the listing
* @param request initial request to make
* @throws IOException on any failure to list objects
*/
ObjectListingIterator(final Path path,
final ListObjectsRequest request)
throws IOException {
this.listPath = path;
this.maxKeys = owner.getMaxKeys();
this.objects = OBSCommonUtils.listObjects(owner, request);
}
/**
* Declare that the iterator has data if it is either is the initial
* iteration or it is a later one and the last listing obtained was
* incomplete.
*/
@Override
public boolean hasNext() {
return firstListing || objects.isTruncated();
}
/**
* Ask for the next listing. For the first invocation, this returns the
* initial set, with no remote IO. For later requests, OBS will be queried,
* hence the calls may block or fail.
*
* @return the next object listing.
* @throws IOException if a query made of OBS fails.
* @throws NoSuchElementException if there is no more data to list.
*/
@Override
public ObjectListing next() throws IOException {
if (firstListing) {
// on the first listing, don't request more data.
// Instead just clear the firstListing flag so that it future
// calls will request new data.
firstListing = false;
} else {
try {
if (!objects.isTruncated()) {
// nothing more to request: fail.
throw new NoSuchElementException(
"No more results in listing of " + listPath);
}
// need to request a new set of objects.
LOG.debug("[{}], Requesting next {} objects under {}",
listingCount, maxKeys, listPath);
objects = OBSCommonUtils.continueListObjects(owner,
objects);
listingCount++;
LOG.debug("New listing status: {}", this);
} catch (ObsException e) {
throw OBSCommonUtils.translateException("listObjects()",
listPath, e);
}
}
return objects;
}
@Override
public String toString() {
return "Object listing iterator against "
+ listPath
+ "; listing count "
+ listingCount
+ "; isTruncated="
+ objects.isTruncated();
}
}
/**
* Take a remote iterator over a set of {@link FileStatus} instances and
* return a remote iterator of {@link LocatedFileStatus} instances.
*/
class LocatedFileStatusIterator
implements RemoteIterator<LocatedFileStatus> {
/**
* File status.
*/
private final RemoteIterator<FileStatus> statusIterator;
/**
* Constructor.
*
* @param statusRemoteIterator an iterator over the remote status entries
*/
LocatedFileStatusIterator(
final RemoteIterator<FileStatus> statusRemoteIterator) {
this.statusIterator = statusRemoteIterator;
}
@Override
public boolean hasNext() throws IOException {
return statusIterator.hasNext();
}
@Override
public LocatedFileStatus next() throws IOException {
return OBSCommonUtils.toLocatedFileStatus(owner,
statusIterator.next());
}
}
}
相关信息
相关文章
hadoop BasicSessionCredential 源码
hadoop DefaultOBSClientFactory 源码
hadoop FileConflictException 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦