hadoop MutableRollingAverages 源码

  • 2022-10-20
  • 浏览 (307)

haddop MutableRollingAverages 代码

文件路径:/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRollingAverages.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.metrics2.lib;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.util.Time;

import javax.annotation.Nullable;

import static org.apache.hadoop.metrics2.lib.Interns.*;

/**
 * <p>
 * This class maintains a group of rolling average metrics. It implements the
 * algorithm of rolling average, i.e. a number of sliding windows are kept to
 * roll over and evict old subsets of samples. Each window has a subset of
 * samples in a stream, where sub-sum and sub-total are collected. All sub-sums
 * and sub-totals in all windows will be aggregated to final-sum and final-total
 * used to compute final average, which is called rolling average.
 * </p>
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class MutableRollingAverages extends MutableMetric implements Closeable {

  private MutableRatesWithAggregation innerMetrics =
      new MutableRatesWithAggregation();

  @VisibleForTesting
  static final ScheduledExecutorService SCHEDULER = Executors
      .newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true)
          .setNameFormat("MutableRollingAverages-%d").build());

  private ScheduledFuture<?> scheduledTask = null;

  @Nullable
  private Map<String, MutableRate> currentSnapshot;

  private final String avgInfoNameTemplate;
  private final String avgInfoDescTemplate;
  private int numWindows;

  /**
   * This class maintains sub-sum and sub-total of SampleStat.
   */
  private static class SumAndCount {
    private final double sum;
    private final long count;
    private final long snapshotTimeStamp;

    /**
     * Constructor for {@link SumAndCount}.
     *
     * @param sum sub-sum in sliding windows
     * @param count sub-total in sliding windows
     * @param snapshotTimeStamp when is a new SampleStat snapshot.
     */
    SumAndCount(final double sum, final long count,
        final long snapshotTimeStamp) {
      this.sum = sum;
      this.count = count;
      this.snapshotTimeStamp = snapshotTimeStamp;
    }

    public double getSum() {
      return sum;
    }

    public long getCount() {
      return count;
    }

    public long getSnapshotTimeStamp() {
      return snapshotTimeStamp;
    }
  }

  /**
   * <p>
   * key: metric name
   * </p>
   * <p>
   * value: deque where sub-sums and sub-totals for sliding windows are
   * maintained.
   * </p>
   */
  private Map<String, LinkedBlockingDeque<SumAndCount>> averages =
      new ConcurrentHashMap<>();

  private static final long WINDOW_SIZE_MS_DEFAULT = 300_000;
  private static final int NUM_WINDOWS_DEFAULT = 36;

  /**
   * Time duration after which a record is considered stale.
   * {@link MutableRollingAverages} should be time-sensitive, and it should use
   * the time window length(i.e. NUM_WINDOWS_DEFAULT * WINDOW_SIZE_MS_DEFAULT)
   * as the valid time to make sure some too old record won't be use to compute
   * average.
   */
  private long recordValidityMs =
      NUM_WINDOWS_DEFAULT * WINDOW_SIZE_MS_DEFAULT;

  /**
   * Constructor for {@link MutableRollingAverages}.
   * @param metricValueName input metricValueName.
   */
  public MutableRollingAverages(String metricValueName) {
    if (metricValueName == null) {
      metricValueName = "";
    }
    avgInfoNameTemplate = "[%s]" + "RollingAvg" +
        StringUtils.capitalize(metricValueName);
    avgInfoDescTemplate = "Rolling average " +
        StringUtils.uncapitalize(metricValueName) +" for "+ "%s";
    numWindows = NUM_WINDOWS_DEFAULT;
    scheduledTask = SCHEDULER.scheduleAtFixedRate(new RatesRoller(this),
        WINDOW_SIZE_MS_DEFAULT, WINDOW_SIZE_MS_DEFAULT, TimeUnit.MILLISECONDS);
  }

  /**
   * This method is for testing only to replace the scheduledTask.
   */
  @VisibleForTesting
  synchronized void replaceScheduledTask(int windows, long interval,
                                         TimeUnit timeUnit) {
    numWindows = windows;
    scheduledTask.cancel(true);
    scheduledTask = SCHEDULER.scheduleAtFixedRate(new RatesRoller(this),
        interval, interval, timeUnit);
  }

  @Override
  public synchronized void snapshot(MetricsRecordBuilder builder, boolean all) {
    if (all || changed()) {
      for (final Entry<String, LinkedBlockingDeque<SumAndCount>> entry
          : averages.entrySet()) {
        final String name = entry.getKey();
        final MetricsInfo avgInfo = info(
            String.format(avgInfoNameTemplate, StringUtils.capitalize(name)),
            String.format(avgInfoDescTemplate, StringUtils.uncapitalize(name)));
        double totalSum = 0;
        long totalCount = 0;

        for (final SumAndCount sumAndCount : entry.getValue()) {
          if (Time.monotonicNow() - sumAndCount.getSnapshotTimeStamp()
              < recordValidityMs) {
            totalCount += sumAndCount.getCount();
            totalSum += sumAndCount.getSum();
          }
        }

        if (totalCount != 0) {
          builder.addGauge(avgInfo, totalSum / totalCount);
        }
      }
      if (changed()) {
        clearChanged();
      }
    }
  }

  /**
   * Collects states maintained in {@link ThreadLocal}, if any.
   */
  public void collectThreadLocalStates() {
    innerMetrics.collectThreadLocalStates();
  }

  /**
   * @param name
   *          name of metric
   * @param value
   *          value of metric
   */
  public void add(final String name, final long value) {
    innerMetrics.add(name, value);
  }

  private static class RatesRoller implements Runnable {
    private final MutableRollingAverages parent;

    RatesRoller(final MutableRollingAverages parent) {
      this.parent = parent;
    }

    @Override
    public void run() {
      synchronized (parent) {
        final MetricsCollectorImpl mc = new MetricsCollectorImpl();
        final MetricsRecordBuilder rb = mc.addRecord("RatesRoller");
        /**
         * snapshot all metrics regardless of being changed or not, in case no
         * ops since last snapshot, we will get 0.
         */
        parent.innerMetrics.snapshot(rb, true);
        Preconditions.checkState(mc.getRecords().size() == 1,
            "There must be only one record and it's named with 'RatesRoller'");

        parent.currentSnapshot = parent.innerMetrics.getGlobalMetrics();
        parent.rollOverAvgs();
      }
      parent.setChanged();
    }
  }

  /**
   * Iterates over snapshot to capture all Avg metrics into rolling structure
   * {@link MutableRollingAverages#averages}.
   */
  private synchronized void rollOverAvgs() {
    if (currentSnapshot == null) {
      return;
    }

    for (Map.Entry<String, MutableRate> entry : currentSnapshot.entrySet()) {
      final MutableRate rate = entry.getValue();
      final LinkedBlockingDeque<SumAndCount> deque = averages.computeIfAbsent(
          entry.getKey(),
          new Function<String, LinkedBlockingDeque<SumAndCount>>() {
            @Override
            public LinkedBlockingDeque<SumAndCount> apply(String k) {
              return new LinkedBlockingDeque<>(numWindows);
            }
          });
      final SumAndCount sumAndCount = new SumAndCount(
          rate.lastStat().total(),
          rate.lastStat().numSamples(),
          rate.getSnapshotTimeStamp());
      /* put newest sum and count to the end */
      if (!deque.offerLast(sumAndCount)) {
        deque.pollFirst();
        deque.offerLast(sumAndCount);
      }
    }

    setChanged();
  }

  @Override
  public void close() throws IOException {
    if (scheduledTask != null) {
      scheduledTask.cancel(false);
    }
    scheduledTask = null;
  }

  /**
   * Retrieve a map of metric name {@literal ->} (aggregate).
   * Filter out entries that don't have at least minSamples.
   *
   * @param minSamples input minSamples.
   * @return a map of peer DataNode Id to the average latency to that
   *         node seen over the measurement period.
   */
  public synchronized Map<String, Double> getStats(long minSamples) {
    final Map<String, Double> stats = new HashMap<>();

    for (final Entry<String, LinkedBlockingDeque<SumAndCount>> entry
        : averages.entrySet()) {
      final String name = entry.getKey();
      double totalSum = 0;
      long totalCount = 0;

      for (final SumAndCount sumAndCount : entry.getValue()) {
        if (Time.monotonicNow() - sumAndCount.getSnapshotTimeStamp()
            < recordValidityMs) {
          totalCount += sumAndCount.getCount();
          totalSum += sumAndCount.getSum();
        }
      }

      if (totalCount > minSamples) {
        stats.put(name, totalSum / totalCount);
      }
    }
    return stats;
  }

  /**
   * Use for test only.
   * @param value input value.
   */
  @VisibleForTesting
  public synchronized void setRecordValidityMs(long value) {
    this.recordValidityMs = value;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop DefaultMetricsFactory 源码

hadoop DefaultMetricsSystem 源码

hadoop Interns 源码

hadoop MethodMetric 源码

hadoop MetricsAnnotations 源码

hadoop MetricsInfoImpl 源码

hadoop MetricsRegistry 源码

hadoop MetricsSourceBuilder 源码

hadoop MutableCounter 源码

hadoop MutableCounterInt 源码

0  赞