hadoop IOStatisticsBinding 源码

  • 2022-10-20
  • 浏览 (531)

haddop IOStatisticsBinding 代码

文件路径:/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.statistics.impl;

import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

import org.apache.hadoop.classification.VisibleForTesting;

import org.apache.hadoop.fs.StorageStatistics;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.MeanStatistic;
import org.apache.hadoop.util.functional.CallableRaisingIOE;
import org.apache.hadoop.util.functional.ConsumerRaisingIOE;
import org.apache.hadoop.util.functional.FunctionRaisingIOE;
import org.apache.hadoop.util.functional.InvocationRaisingIOE;

import static org.apache.hadoop.fs.statistics.IOStatistics.MIN_UNSET_VALUE;
import static org.apache.hadoop.fs.statistics.impl.StubDurationTracker.STUB_DURATION_TRACKER;

/**
 * Support for implementing IOStatistics interfaces.
 */
public final class IOStatisticsBinding {

  /** Pattern used for each entry. */
  public static final String ENTRY_PATTERN = "(%s=%s)";

  /** String to return when a source is null. */
  @VisibleForTesting
  public static final String NULL_SOURCE = "()";

  private IOStatisticsBinding() {
  }

  /**
   * Create  IOStatistics from a storage statistics instance.
   *
   * This will be updated as the storage statistics change.
   * @param storageStatistics source data.
   * @return an IO statistics source.
   */
  public static IOStatistics fromStorageStatistics(
      StorageStatistics storageStatistics) {
    DynamicIOStatisticsBuilder builder = dynamicIOStatistics();
    Iterator<StorageStatistics.LongStatistic> it = storageStatistics
        .getLongStatistics();
    while (it.hasNext()) {
      StorageStatistics.LongStatistic next = it.next();
      builder.withLongFunctionCounter(next.getName(),
          k -> storageStatistics.getLong(k));
    }
    return builder.build();
  }

  /**
   * Create a builder for dynamic IO Statistics.
   * @return a builder to be completed.
   */
  public static DynamicIOStatisticsBuilder dynamicIOStatistics() {
    return new DynamicIOStatisticsBuilder();
  }

  /**
   * Get the shared instance of the immutable empty statistics
   * object.
   * @return an empty statistics object.
   */
  public static IOStatistics emptyStatistics() {
    return EmptyIOStatistics.getInstance();
  }

  /**
   * Get the shared instance of the immutable empty statistics
   * store.
   * @return an empty statistics object.
   */
  public static IOStatisticsStore emptyStatisticsStore() {
    return EmptyIOStatisticsStore.getInstance();
  }

  /**
   * Take an IOStatistics instance and wrap it in a source.
   * @param statistics statistics.
   * @return a source which will return the values
   */
  public static IOStatisticsSource wrap(IOStatistics statistics) {
    return new SourceWrappedStatistics(statistics);
  }

  /**
   * Create a builder for an {@link IOStatisticsStore}.
   *
   * @return a builder instance.
   */
  public static IOStatisticsStoreBuilder iostatisticsStore() {
    return new IOStatisticsStoreBuilderImpl();
  }

  /**
   * Convert an entry to the string format used in logging.
   *
   * @param entry entry to evaluate
   * @param <E> entry type
   * @return formatted string
   */
  public static <E> String entryToString(
      final Map.Entry<String, E> entry) {
    return entryToString(entry.getKey(), entry.getValue());
  }

  /**
   * Convert entry values to the string format used in logging.
   *
   * @param <E> type of values.
   * @param name statistic name
   * @param value stat value
   * @return formatted string
   */
  public static <E> String entryToString(
      final String name, final E value) {
    return String.format(
        ENTRY_PATTERN,
        name,
        value);
  }

  /**
   * Copy into the dest map all the source entries.
   * The destination is cleared first.
   * @param <E> entry type
   * @param dest destination of the copy
   * @param source source
   * @param copyFn function to copy entries
   * @return the destination.
   */
  private static <E> Map<String, E> copyMap(
      Map<String, E> dest,
      Map<String, E> source,
      Function<E, E> copyFn) {
    // we have to clone the values so that they aren't
    // bound to the original values
    dest.clear();
    source.entrySet()
        .forEach(entry ->
            dest.put(entry.getKey(), copyFn.apply(entry.getValue())));
    return dest;
  }

  /**
   * A passthrough copy operation suitable for immutable
   * types, including numbers.
   *
   * @param <E> type of values.
   * @param src source object
   * @return the source object
   */
  public static <E extends Serializable> E passthroughFn(E src) {
    return src;
  }

  /**
   * Take a snapshot of a supplied map, where the copy option simply
   * uses the existing value.
   *
   * For this to be safe, the map must refer to immutable objects.
   * @param source source map
   * @param <E> type of values.
   * @return a new map referencing the same values.
   */
  public static <E extends Serializable> Map<String, E> snapshotMap(
      Map<String, E> source) {
    return snapshotMap(source,
        IOStatisticsBinding::passthroughFn);
  }

  /**
   * Take a snapshot of a supplied map, using the copy function
   * to replicate the source values.
   * @param source source map
   * @param copyFn function to copy the value
   * @param <E> type of values.
   * @return a concurrent hash map referencing the same values.
   */
  public static <E extends Serializable>
      ConcurrentHashMap<String, E> snapshotMap(
          Map<String, E> source,
          Function<E, E> copyFn) {
    ConcurrentHashMap<String, E> dest = new ConcurrentHashMap<>();
    copyMap(dest, source, copyFn);
    return dest;
  }

  /**
   * Aggregate two maps so that the destination.
   * @param <E> type of values
   * @param dest destination map.
   * @param other other map
   * @param aggregateFn function to aggregate the values.
   * @param copyFn function to copy the value
   */
  public static <E> void aggregateMaps(
      Map<String, E> dest,
      Map<String, E> other,
      BiFunction<E, E, E> aggregateFn,
      Function<E, E> copyFn) {
    // scan through the other hand map; copy
    // any values not in the left map,
    // aggregate those for which there is already
    // an entry
    other.entrySet().forEach(entry -> {
      String key = entry.getKey();
      E rVal = entry.getValue();
      E lVal = dest.get(key);
      if (lVal == null) {
        dest.put(key, copyFn.apply(rVal));
      } else {
        dest.put(key, aggregateFn.apply(lVal, rVal));
      }
    });
  }

  /**
   * Aggregate two counters.
   * @param l left value
   * @param r right value
   * @return the aggregate value
   */
  public static Long aggregateCounters(Long l, Long r) {
    return Math.max(l, 0) + Math.max(r, 0);
  }

  /**
   * Add two gauges.
   * @param l left value
   * @param r right value
   * @return aggregate value
   */
  public static Long aggregateGauges(Long l, Long r) {
    return l + r;
  }


  /**
   * Aggregate two minimum values.
   * @param l left
   * @param r right
   * @return the new minimum.
   */
  public static Long aggregateMinimums(Long l, Long r) {
    if (l == MIN_UNSET_VALUE) {
      return r;
    } else if (r == MIN_UNSET_VALUE) {
      return l;
    } else {
      return Math.min(l, r);
    }
  }

  /**
   * Aggregate two maximum values.
   * @param l left
   * @param r right
   * @return the new minimum.
   */
  public static Long aggregateMaximums(Long l, Long r) {
    if (l == MIN_UNSET_VALUE) {
      return r;
    } else if (r == MIN_UNSET_VALUE) {
      return l;
    } else {
      return Math.max(l, r);
    }
  }

  /**
   * Aggregate the mean statistics.
   * This returns a new instance.
   * @param l left value
   * @param r right value
   * @return aggregate value
   */
  public static MeanStatistic aggregateMeanStatistics(
      MeanStatistic l, MeanStatistic r) {
    MeanStatistic res = l.copy();
    res.add(r);
    return res;
  }

  /**
   * Update a maximum value tracked in an atomic long.
   * This is thread safe -it uses compareAndSet to ensure
   * that Thread T1 whose sample is greater than the current
   * value never overwrites an update from thread T2 whose
   * sample was also higher -and which completed first.
   * @param dest destination for all changes.
   * @param sample sample to update.
   */
  public static void maybeUpdateMaximum(AtomicLong dest, long sample) {
    boolean done;
    do {
      long current = dest.get();
      if (sample > current) {
        done = dest.compareAndSet(current, sample);
      } else {
        done = true;
      }
    } while (!done);
  }

  /**
   * Update a maximum value tracked in an atomic long.
   * This is thread safe -it uses compareAndSet to ensure
   * that Thread T1 whose sample is greater than the current
   * value never overwrites an update from thread T2 whose
   * sample was also higher -and which completed first.
   * @param dest destination for all changes.
   * @param sample sample to update.
   */
  public static void maybeUpdateMinimum(AtomicLong dest, long sample) {
    boolean done;
    do {
      long current = dest.get();
      if (current == MIN_UNSET_VALUE || sample < current) {
        done = dest.compareAndSet(current, sample);
      } else {
        done = true;
      }
    } while (!done);
  }

  /**
   * Given an IOException raising function/lambda expression,
   * return a new one which wraps the inner and tracks
   * the duration of the operation, including whether
   * it passes/fails.
   * @param factory factory of duration trackers
   * @param statistic statistic key
   * @param inputFn input function
   * @param <A> type of argument to the input function.
   * @param <B> return type.
   * @return a new function which tracks duration and failure.
   */
  public static <A, B> FunctionRaisingIOE<A, B> trackFunctionDuration(
      @Nullable DurationTrackerFactory factory,
      String statistic,
      FunctionRaisingIOE<A, B> inputFn) {
    return (x) -> {
      // create the tracker outside try-with-resources so
      // that failures can be set in the catcher.
      DurationTracker tracker = createTracker(factory, statistic);
      try {
        // exec the input function and return its value
        return inputFn.apply(x);
      } catch (IOException | RuntimeException e) {
        // input function failed: note it
        tracker.failed();
        // and rethrow
        throw e;
      } finally {
        // update the tracker.
        // this is called after the catch() call would have
        // set the failed flag.
        tracker.close();
      }
    };
  }

  /**
   * Given a java function/lambda expression,
   * return a new one which wraps the inner and tracks
   * the duration of the operation, including whether
   * it passes/fails.
   * @param factory factory of duration trackers
   * @param statistic statistic key
   * @param inputFn input function
   * @param <A> type of argument to the input function.
   * @param <B> return type.
   * @return a new function which tracks duration and failure.
   */
  public static <A, B> Function<A, B> trackJavaFunctionDuration(
      @Nullable DurationTrackerFactory factory,
      String statistic,
      Function<A, B> inputFn) {
    return (x) -> {
      // create the tracker outside try-with-resources so
      // that failures can be set in the catcher.
      DurationTracker tracker = createTracker(factory, statistic);
      try {
        // exec the input function and return its value
        return inputFn.apply(x);
      } catch (RuntimeException e) {
        // input function failed: note it
        tracker.failed();
        // and rethrow
        throw e;
      } finally {
        // update the tracker.
        // this is called after the catch() call would have
        // set the failed flag.
        tracker.close();
      }
    };
  }

  /**
   * Given an IOException raising callable/lambda expression,
   * execute it and update the relevant statistic.
   * @param factory factory of duration trackers
   * @param statistic statistic key
   * @param input input callable.
   * @param <B> return type.
   * @return the result of the operation.
   * @throws IOException raised on errors performing I/O.
   */
  public static <B> B trackDuration(
      DurationTrackerFactory factory,
      String statistic,
      CallableRaisingIOE<B> input) throws IOException {
    return trackDurationOfOperation(factory, statistic, input).apply();
  }

  /**
   * Given an IOException raising callable/lambda expression,
   * execute it and update the relevant statistic.
   * @param factory factory of duration trackers
   * @param statistic statistic key
   * @param input input callable.
   * @throws IOException IO failure.
   */
  public static void trackDurationOfInvocation(
      DurationTrackerFactory factory,
      String statistic,
      InvocationRaisingIOE input) throws IOException {

    measureDurationOfInvocation(factory, statistic, input);
  }

  /**
   * Given an IOException raising callable/lambda expression,
   * execute it and update the relevant statistic,
   * returning the measured duration.
   *
   * {@link #trackDurationOfInvocation(DurationTrackerFactory, String, InvocationRaisingIOE)}
   * with the duration returned for logging etc.; added as a new
   * method to avoid linking problems with any code calling the existing
   * method.
   *
   * @param factory factory of duration trackers
   * @param statistic statistic key
   * @param input input callable.
   * @return the duration of the operation, as measured by the duration tracker.
   * @throws IOException IO failure.
   */
  public static Duration measureDurationOfInvocation(
      DurationTrackerFactory factory,
      String statistic,
      InvocationRaisingIOE input) throws IOException {

    // create the tracker outside try-with-resources so
    // that failures can be set in the catcher.
    DurationTracker tracker = createTracker(factory, statistic);
    try {
      // exec the input function and return its value
      input.apply();
    } catch (IOException | RuntimeException e) {
      // input function failed: note it
      tracker.failed();
      // and rethrow
      throw e;
    } finally {
      // update the tracker.
      // this is called after the catch() call would have
      // set the failed flag.
      tracker.close();
    }
    return tracker.asDuration();
  }

  /**
   * Given an IOException raising callable/lambda expression,
   * return a new one which wraps the inner and tracks
   * the duration of the operation, including whether
   * it passes/fails.
   * @param factory factory of duration trackers
   * @param statistic statistic key
   * @param input input callable.
   * @param <B> return type.
   * @return a new callable which tracks duration and failure.
   */
  public static <B> CallableRaisingIOE<B> trackDurationOfOperation(
      @Nullable DurationTrackerFactory factory,
      String statistic,
      CallableRaisingIOE<B> input) {
    return () -> {
      // create the tracker outside try-with-resources so
      // that failures can be set in the catcher.
      DurationTracker tracker = createTracker(factory, statistic);
      return invokeTrackingDuration(tracker, input);
    };
  }

  /**
   * Given an IOException raising callable/lambda expression,
   * execute it, updating the tracker on success/failure.
   * @param tracker duration tracker.
   * @param input input callable.
   * @param <B> return type.
   * @return the result of the invocation
   * @throws IOException on failure.
   */
  public static <B> B invokeTrackingDuration(
      final DurationTracker tracker,
      final CallableRaisingIOE<B> input)
      throws IOException {
    try {
      // exec the input function and return its value
      return input.apply();
    } catch (IOException | RuntimeException e) {
      // input function failed: note it
      tracker.failed();
      // and rethrow
      throw e;
    } finally {
      // update the tracker.
      // this is called after the catch() call would have
      // set the failed flag.
      tracker.close();
    }
  }

  /**
   * Given an IOException raising Consumer,
   * return a new one which wraps the inner and tracks
   * the duration of the operation, including whether
   * it passes/fails.
   * @param factory factory of duration trackers
   * @param statistic statistic key
   * @param input input callable.
   * @param <B> return type.
   * @return a new consumer which tracks duration and failure.
   */
  public static <B> ConsumerRaisingIOE<B> trackDurationConsumer(
      @Nullable DurationTrackerFactory factory,
      String statistic,
      ConsumerRaisingIOE<B> input) {
    return (B t) -> {
      // create the tracker outside try-with-resources so
      // that failures can be set in the catcher.
      DurationTracker tracker = createTracker(factory, statistic);
      try {
        // exec the input function and return its value
        input.accept(t);
      } catch (IOException | RuntimeException e) {
        // input function failed: note it
        tracker.failed();
        // and rethrow
        throw e;
      } finally {
        // update the tracker.
        // this is called after the catch() call would have
        // set the failed flag.
        tracker.close();
      }
    };
  }

  /**
   * Given a callable/lambda expression,
   * return a new one which wraps the inner and tracks
   * the duration of the operation, including whether
   * it passes/fails.
   * @param factory factory of duration trackers
   * @param statistic statistic key
   * @param input input callable.
   * @param <B> return type.
   * @return a new callable which tracks duration and failure.
   */
  public static <B> Callable<B> trackDurationOfCallable(
      @Nullable DurationTrackerFactory factory,
      String statistic,
      Callable<B> input) {
    return () -> {
      // create the tracker outside try-with-resources so
      // that failures can be set in the catcher.
      DurationTracker tracker = createTracker(factory, statistic);
      try {
        // exec the input function and return its value
        return input.call();
      } catch (RuntimeException e) {
        // input function failed: note it
        tracker.failed();
        // and rethrow
        throw e;
      } finally {
        // update the tracker.
        // this is called after any catch() call will have
        // set the failed flag.
        tracker.close();
      }
    };
  }

  /**
   * Given a Java supplier, evaluate it while
   * tracking the duration of the operation and success/failure.
   * @param factory factory of duration trackers
   * @param statistic statistic key
   * @param input input callable.
   * @param <B> return type.
   * @return the output of the supplier.
   */
  public static <B> B trackDurationOfSupplier(
      @Nullable DurationTrackerFactory factory,
      String statistic,
      Supplier<B> input) {
    // create the tracker outside try-with-resources so
    // that failures can be set in the catcher.
    DurationTracker tracker = createTracker(factory, statistic);
    try {
      // exec the input function and return its value
      return input.get();
    } catch (RuntimeException e) {
      // input function failed: note it
      tracker.failed();
      // and rethrow
      throw e;
    } finally {
      // update the tracker.
      // this is called after any catch() call will have
      // set the failed flag.
      tracker.close();
    }
  }

  /**
   * Create the tracker. If the factory is null, a stub
   * tracker is returned.
   * @param factory tracker factory
   * @param statistic statistic to track
   * @return a duration tracker.
   */
  public static DurationTracker createTracker(
      @Nullable final DurationTrackerFactory factory,
      final String statistic) {
    return factory != null
        ? factory.trackDuration(statistic)
        : STUB_DURATION_TRACKER;
  }

  /**
   * Create a DurationTrackerFactory which aggregates the tracking
   * of two other factories.
   * @param first first tracker factory
   * @param second second tracker factory
   * @return a factory
   */
  public static DurationTrackerFactory pairedTrackerFactory(
      final DurationTrackerFactory first,
      final DurationTrackerFactory second) {
    return new PairedDurationTrackerFactory(first, second);
  }

  /**
   * Publish the IOStatistics as a set of storage statistics.
   * This is dynamic.
   * @param name storage statistics name.
   * @param scheme FS scheme; may be null.
   * @param source IOStatistics source.
   * @return a dynamic storage statistics object.
   */
  public static StorageStatistics publishAsStorageStatistics(
      String name, String scheme, IOStatistics source) {
    return new StorageStatisticsFromIOStatistics(name, scheme, source);
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AbstractIOStatisticsImpl 源码

hadoop DynamicIOStatistics 源码

hadoop DynamicIOStatisticsBuilder 源码

hadoop EmptyIOStatistics 源码

hadoop EmptyIOStatisticsContextImpl 源码

hadoop EmptyIOStatisticsStore 源码

hadoop EvaluatingStatisticsMap 源码

hadoop IOStatisticsContextImpl 源码

hadoop IOStatisticsContextIntegration 源码

hadoop IOStatisticsStore 源码

0  赞