hadoop IOStatisticsBinding 源码
haddop IOStatisticsBinding 代码
文件路径:/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.statistics.impl;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.StorageStatistics;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.MeanStatistic;
import org.apache.hadoop.util.functional.CallableRaisingIOE;
import org.apache.hadoop.util.functional.ConsumerRaisingIOE;
import org.apache.hadoop.util.functional.FunctionRaisingIOE;
import org.apache.hadoop.util.functional.InvocationRaisingIOE;
import static org.apache.hadoop.fs.statistics.IOStatistics.MIN_UNSET_VALUE;
import static org.apache.hadoop.fs.statistics.impl.StubDurationTracker.STUB_DURATION_TRACKER;
/**
* Support for implementing IOStatistics interfaces.
*/
public final class IOStatisticsBinding {
/** Pattern used for each entry. */
public static final String ENTRY_PATTERN = "(%s=%s)";
/** String to return when a source is null. */
@VisibleForTesting
public static final String NULL_SOURCE = "()";
private IOStatisticsBinding() {
}
/**
* Create IOStatistics from a storage statistics instance.
*
* This will be updated as the storage statistics change.
* @param storageStatistics source data.
* @return an IO statistics source.
*/
public static IOStatistics fromStorageStatistics(
StorageStatistics storageStatistics) {
DynamicIOStatisticsBuilder builder = dynamicIOStatistics();
Iterator<StorageStatistics.LongStatistic> it = storageStatistics
.getLongStatistics();
while (it.hasNext()) {
StorageStatistics.LongStatistic next = it.next();
builder.withLongFunctionCounter(next.getName(),
k -> storageStatistics.getLong(k));
}
return builder.build();
}
/**
* Create a builder for dynamic IO Statistics.
* @return a builder to be completed.
*/
public static DynamicIOStatisticsBuilder dynamicIOStatistics() {
return new DynamicIOStatisticsBuilder();
}
/**
* Get the shared instance of the immutable empty statistics
* object.
* @return an empty statistics object.
*/
public static IOStatistics emptyStatistics() {
return EmptyIOStatistics.getInstance();
}
/**
* Get the shared instance of the immutable empty statistics
* store.
* @return an empty statistics object.
*/
public static IOStatisticsStore emptyStatisticsStore() {
return EmptyIOStatisticsStore.getInstance();
}
/**
* Take an IOStatistics instance and wrap it in a source.
* @param statistics statistics.
* @return a source which will return the values
*/
public static IOStatisticsSource wrap(IOStatistics statistics) {
return new SourceWrappedStatistics(statistics);
}
/**
* Create a builder for an {@link IOStatisticsStore}.
*
* @return a builder instance.
*/
public static IOStatisticsStoreBuilder iostatisticsStore() {
return new IOStatisticsStoreBuilderImpl();
}
/**
* Convert an entry to the string format used in logging.
*
* @param entry entry to evaluate
* @param <E> entry type
* @return formatted string
*/
public static <E> String entryToString(
final Map.Entry<String, E> entry) {
return entryToString(entry.getKey(), entry.getValue());
}
/**
* Convert entry values to the string format used in logging.
*
* @param <E> type of values.
* @param name statistic name
* @param value stat value
* @return formatted string
*/
public static <E> String entryToString(
final String name, final E value) {
return String.format(
ENTRY_PATTERN,
name,
value);
}
/**
* Copy into the dest map all the source entries.
* The destination is cleared first.
* @param <E> entry type
* @param dest destination of the copy
* @param source source
* @param copyFn function to copy entries
* @return the destination.
*/
private static <E> Map<String, E> copyMap(
Map<String, E> dest,
Map<String, E> source,
Function<E, E> copyFn) {
// we have to clone the values so that they aren't
// bound to the original values
dest.clear();
source.entrySet()
.forEach(entry ->
dest.put(entry.getKey(), copyFn.apply(entry.getValue())));
return dest;
}
/**
* A passthrough copy operation suitable for immutable
* types, including numbers.
*
* @param <E> type of values.
* @param src source object
* @return the source object
*/
public static <E extends Serializable> E passthroughFn(E src) {
return src;
}
/**
* Take a snapshot of a supplied map, where the copy option simply
* uses the existing value.
*
* For this to be safe, the map must refer to immutable objects.
* @param source source map
* @param <E> type of values.
* @return a new map referencing the same values.
*/
public static <E extends Serializable> Map<String, E> snapshotMap(
Map<String, E> source) {
return snapshotMap(source,
IOStatisticsBinding::passthroughFn);
}
/**
* Take a snapshot of a supplied map, using the copy function
* to replicate the source values.
* @param source source map
* @param copyFn function to copy the value
* @param <E> type of values.
* @return a concurrent hash map referencing the same values.
*/
public static <E extends Serializable>
ConcurrentHashMap<String, E> snapshotMap(
Map<String, E> source,
Function<E, E> copyFn) {
ConcurrentHashMap<String, E> dest = new ConcurrentHashMap<>();
copyMap(dest, source, copyFn);
return dest;
}
/**
* Aggregate two maps so that the destination.
* @param <E> type of values
* @param dest destination map.
* @param other other map
* @param aggregateFn function to aggregate the values.
* @param copyFn function to copy the value
*/
public static <E> void aggregateMaps(
Map<String, E> dest,
Map<String, E> other,
BiFunction<E, E, E> aggregateFn,
Function<E, E> copyFn) {
// scan through the other hand map; copy
// any values not in the left map,
// aggregate those for which there is already
// an entry
other.entrySet().forEach(entry -> {
String key = entry.getKey();
E rVal = entry.getValue();
E lVal = dest.get(key);
if (lVal == null) {
dest.put(key, copyFn.apply(rVal));
} else {
dest.put(key, aggregateFn.apply(lVal, rVal));
}
});
}
/**
* Aggregate two counters.
* @param l left value
* @param r right value
* @return the aggregate value
*/
public static Long aggregateCounters(Long l, Long r) {
return Math.max(l, 0) + Math.max(r, 0);
}
/**
* Add two gauges.
* @param l left value
* @param r right value
* @return aggregate value
*/
public static Long aggregateGauges(Long l, Long r) {
return l + r;
}
/**
* Aggregate two minimum values.
* @param l left
* @param r right
* @return the new minimum.
*/
public static Long aggregateMinimums(Long l, Long r) {
if (l == MIN_UNSET_VALUE) {
return r;
} else if (r == MIN_UNSET_VALUE) {
return l;
} else {
return Math.min(l, r);
}
}
/**
* Aggregate two maximum values.
* @param l left
* @param r right
* @return the new minimum.
*/
public static Long aggregateMaximums(Long l, Long r) {
if (l == MIN_UNSET_VALUE) {
return r;
} else if (r == MIN_UNSET_VALUE) {
return l;
} else {
return Math.max(l, r);
}
}
/**
* Aggregate the mean statistics.
* This returns a new instance.
* @param l left value
* @param r right value
* @return aggregate value
*/
public static MeanStatistic aggregateMeanStatistics(
MeanStatistic l, MeanStatistic r) {
MeanStatistic res = l.copy();
res.add(r);
return res;
}
/**
* Update a maximum value tracked in an atomic long.
* This is thread safe -it uses compareAndSet to ensure
* that Thread T1 whose sample is greater than the current
* value never overwrites an update from thread T2 whose
* sample was also higher -and which completed first.
* @param dest destination for all changes.
* @param sample sample to update.
*/
public static void maybeUpdateMaximum(AtomicLong dest, long sample) {
boolean done;
do {
long current = dest.get();
if (sample > current) {
done = dest.compareAndSet(current, sample);
} else {
done = true;
}
} while (!done);
}
/**
* Update a maximum value tracked in an atomic long.
* This is thread safe -it uses compareAndSet to ensure
* that Thread T1 whose sample is greater than the current
* value never overwrites an update from thread T2 whose
* sample was also higher -and which completed first.
* @param dest destination for all changes.
* @param sample sample to update.
*/
public static void maybeUpdateMinimum(AtomicLong dest, long sample) {
boolean done;
do {
long current = dest.get();
if (current == MIN_UNSET_VALUE || sample < current) {
done = dest.compareAndSet(current, sample);
} else {
done = true;
}
} while (!done);
}
/**
* Given an IOException raising function/lambda expression,
* return a new one which wraps the inner and tracks
* the duration of the operation, including whether
* it passes/fails.
* @param factory factory of duration trackers
* @param statistic statistic key
* @param inputFn input function
* @param <A> type of argument to the input function.
* @param <B> return type.
* @return a new function which tracks duration and failure.
*/
public static <A, B> FunctionRaisingIOE<A, B> trackFunctionDuration(
@Nullable DurationTrackerFactory factory,
String statistic,
FunctionRaisingIOE<A, B> inputFn) {
return (x) -> {
// create the tracker outside try-with-resources so
// that failures can be set in the catcher.
DurationTracker tracker = createTracker(factory, statistic);
try {
// exec the input function and return its value
return inputFn.apply(x);
} catch (IOException | RuntimeException e) {
// input function failed: note it
tracker.failed();
// and rethrow
throw e;
} finally {
// update the tracker.
// this is called after the catch() call would have
// set the failed flag.
tracker.close();
}
};
}
/**
* Given a java function/lambda expression,
* return a new one which wraps the inner and tracks
* the duration of the operation, including whether
* it passes/fails.
* @param factory factory of duration trackers
* @param statistic statistic key
* @param inputFn input function
* @param <A> type of argument to the input function.
* @param <B> return type.
* @return a new function which tracks duration and failure.
*/
public static <A, B> Function<A, B> trackJavaFunctionDuration(
@Nullable DurationTrackerFactory factory,
String statistic,
Function<A, B> inputFn) {
return (x) -> {
// create the tracker outside try-with-resources so
// that failures can be set in the catcher.
DurationTracker tracker = createTracker(factory, statistic);
try {
// exec the input function and return its value
return inputFn.apply(x);
} catch (RuntimeException e) {
// input function failed: note it
tracker.failed();
// and rethrow
throw e;
} finally {
// update the tracker.
// this is called after the catch() call would have
// set the failed flag.
tracker.close();
}
};
}
/**
* Given an IOException raising callable/lambda expression,
* execute it and update the relevant statistic.
* @param factory factory of duration trackers
* @param statistic statistic key
* @param input input callable.
* @param <B> return type.
* @return the result of the operation.
* @throws IOException raised on errors performing I/O.
*/
public static <B> B trackDuration(
DurationTrackerFactory factory,
String statistic,
CallableRaisingIOE<B> input) throws IOException {
return trackDurationOfOperation(factory, statistic, input).apply();
}
/**
* Given an IOException raising callable/lambda expression,
* execute it and update the relevant statistic.
* @param factory factory of duration trackers
* @param statistic statistic key
* @param input input callable.
* @throws IOException IO failure.
*/
public static void trackDurationOfInvocation(
DurationTrackerFactory factory,
String statistic,
InvocationRaisingIOE input) throws IOException {
measureDurationOfInvocation(factory, statistic, input);
}
/**
* Given an IOException raising callable/lambda expression,
* execute it and update the relevant statistic,
* returning the measured duration.
*
* {@link #trackDurationOfInvocation(DurationTrackerFactory, String, InvocationRaisingIOE)}
* with the duration returned for logging etc.; added as a new
* method to avoid linking problems with any code calling the existing
* method.
*
* @param factory factory of duration trackers
* @param statistic statistic key
* @param input input callable.
* @return the duration of the operation, as measured by the duration tracker.
* @throws IOException IO failure.
*/
public static Duration measureDurationOfInvocation(
DurationTrackerFactory factory,
String statistic,
InvocationRaisingIOE input) throws IOException {
// create the tracker outside try-with-resources so
// that failures can be set in the catcher.
DurationTracker tracker = createTracker(factory, statistic);
try {
// exec the input function and return its value
input.apply();
} catch (IOException | RuntimeException e) {
// input function failed: note it
tracker.failed();
// and rethrow
throw e;
} finally {
// update the tracker.
// this is called after the catch() call would have
// set the failed flag.
tracker.close();
}
return tracker.asDuration();
}
/**
* Given an IOException raising callable/lambda expression,
* return a new one which wraps the inner and tracks
* the duration of the operation, including whether
* it passes/fails.
* @param factory factory of duration trackers
* @param statistic statistic key
* @param input input callable.
* @param <B> return type.
* @return a new callable which tracks duration and failure.
*/
public static <B> CallableRaisingIOE<B> trackDurationOfOperation(
@Nullable DurationTrackerFactory factory,
String statistic,
CallableRaisingIOE<B> input) {
return () -> {
// create the tracker outside try-with-resources so
// that failures can be set in the catcher.
DurationTracker tracker = createTracker(factory, statistic);
return invokeTrackingDuration(tracker, input);
};
}
/**
* Given an IOException raising callable/lambda expression,
* execute it, updating the tracker on success/failure.
* @param tracker duration tracker.
* @param input input callable.
* @param <B> return type.
* @return the result of the invocation
* @throws IOException on failure.
*/
public static <B> B invokeTrackingDuration(
final DurationTracker tracker,
final CallableRaisingIOE<B> input)
throws IOException {
try {
// exec the input function and return its value
return input.apply();
} catch (IOException | RuntimeException e) {
// input function failed: note it
tracker.failed();
// and rethrow
throw e;
} finally {
// update the tracker.
// this is called after the catch() call would have
// set the failed flag.
tracker.close();
}
}
/**
* Given an IOException raising Consumer,
* return a new one which wraps the inner and tracks
* the duration of the operation, including whether
* it passes/fails.
* @param factory factory of duration trackers
* @param statistic statistic key
* @param input input callable.
* @param <B> return type.
* @return a new consumer which tracks duration and failure.
*/
public static <B> ConsumerRaisingIOE<B> trackDurationConsumer(
@Nullable DurationTrackerFactory factory,
String statistic,
ConsumerRaisingIOE<B> input) {
return (B t) -> {
// create the tracker outside try-with-resources so
// that failures can be set in the catcher.
DurationTracker tracker = createTracker(factory, statistic);
try {
// exec the input function and return its value
input.accept(t);
} catch (IOException | RuntimeException e) {
// input function failed: note it
tracker.failed();
// and rethrow
throw e;
} finally {
// update the tracker.
// this is called after the catch() call would have
// set the failed flag.
tracker.close();
}
};
}
/**
* Given a callable/lambda expression,
* return a new one which wraps the inner and tracks
* the duration of the operation, including whether
* it passes/fails.
* @param factory factory of duration trackers
* @param statistic statistic key
* @param input input callable.
* @param <B> return type.
* @return a new callable which tracks duration and failure.
*/
public static <B> Callable<B> trackDurationOfCallable(
@Nullable DurationTrackerFactory factory,
String statistic,
Callable<B> input) {
return () -> {
// create the tracker outside try-with-resources so
// that failures can be set in the catcher.
DurationTracker tracker = createTracker(factory, statistic);
try {
// exec the input function and return its value
return input.call();
} catch (RuntimeException e) {
// input function failed: note it
tracker.failed();
// and rethrow
throw e;
} finally {
// update the tracker.
// this is called after any catch() call will have
// set the failed flag.
tracker.close();
}
};
}
/**
* Given a Java supplier, evaluate it while
* tracking the duration of the operation and success/failure.
* @param factory factory of duration trackers
* @param statistic statistic key
* @param input input callable.
* @param <B> return type.
* @return the output of the supplier.
*/
public static <B> B trackDurationOfSupplier(
@Nullable DurationTrackerFactory factory,
String statistic,
Supplier<B> input) {
// create the tracker outside try-with-resources so
// that failures can be set in the catcher.
DurationTracker tracker = createTracker(factory, statistic);
try {
// exec the input function and return its value
return input.get();
} catch (RuntimeException e) {
// input function failed: note it
tracker.failed();
// and rethrow
throw e;
} finally {
// update the tracker.
// this is called after any catch() call will have
// set the failed flag.
tracker.close();
}
}
/**
* Create the tracker. If the factory is null, a stub
* tracker is returned.
* @param factory tracker factory
* @param statistic statistic to track
* @return a duration tracker.
*/
public static DurationTracker createTracker(
@Nullable final DurationTrackerFactory factory,
final String statistic) {
return factory != null
? factory.trackDuration(statistic)
: STUB_DURATION_TRACKER;
}
/**
* Create a DurationTrackerFactory which aggregates the tracking
* of two other factories.
* @param first first tracker factory
* @param second second tracker factory
* @return a factory
*/
public static DurationTrackerFactory pairedTrackerFactory(
final DurationTrackerFactory first,
final DurationTrackerFactory second) {
return new PairedDurationTrackerFactory(first, second);
}
/**
* Publish the IOStatistics as a set of storage statistics.
* This is dynamic.
* @param name storage statistics name.
* @param scheme FS scheme; may be null.
* @param source IOStatistics source.
* @return a dynamic storage statistics object.
*/
public static StorageStatistics publishAsStorageStatistics(
String name, String scheme, IOStatistics source) {
return new StorageStatisticsFromIOStatistics(name, scheme, source);
}
}
相关信息
相关文章
hadoop AbstractIOStatisticsImpl 源码
hadoop DynamicIOStatisticsBuilder 源码
hadoop EmptyIOStatisticsContextImpl 源码
hadoop EmptyIOStatisticsStore 源码
hadoop EvaluatingStatisticsMap 源码
hadoop IOStatisticsContextImpl 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦