hadoop StatisticsCollector 源码

  • 2022-10-20
  • 浏览 (184)

hadoop StatisticsCollector 代码

文件路径:/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/StatisticsCollector.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;

import org.apache.hadoop.mapred.StatisticsCollector.Stat.TimeStat;

/**
 * Collects the statistics in time windows.
 */
class StatisticsCollector {

  private static final int DEFAULT_PERIOD = 5;

  static final TimeWindow 
    SINCE_START = new TimeWindow("Since Start", -1, -1);
  
  static final TimeWindow 
    LAST_WEEK = new TimeWindow("Last Week", 7 * 24 * 60 * 60, 60 * 60);
  
  static final TimeWindow 
    LAST_DAY = new TimeWindow("Last Day", 24 * 60 * 60, 60 * 60);
  
  static final TimeWindow 
    LAST_HOUR = new TimeWindow("Last Hour", 60 * 60, 60);
  
  static final TimeWindow 
    LAST_MINUTE = new TimeWindow("Last Minute", 60, 10);

  static final TimeWindow[] DEFAULT_COLLECT_WINDOWS = {
    StatisticsCollector.SINCE_START,
    StatisticsCollector.LAST_DAY,
    StatisticsCollector.LAST_HOUR
    };

  private final int period;
  private boolean started;
  
  private final Map<TimeWindow, StatUpdater> updaters = 
    new LinkedHashMap<TimeWindow, StatUpdater>();
  private final Map<String, Stat> statistics = new HashMap<String, Stat>();

  StatisticsCollector() {
    this(DEFAULT_PERIOD);
  }

  StatisticsCollector(int period) {
    this.period = period;
  }

  synchronized void start() {
    if (started) {
      return;
    }
    Timer timer = new Timer("Timer thread for monitoring ", true);
    TimerTask task = new TimerTask() {
      public void run() {
        update();
      }
    };
    long millis = period * 1000;
    timer.scheduleAtFixedRate(task, millis, millis);
    started = true;
  }

  protected synchronized void update() {
    for (StatUpdater c : updaters.values()) {
      c.update();
    }
  }

  Map<TimeWindow, StatUpdater> getUpdaters() {
    return Collections.unmodifiableMap(updaters);
  }

  Map<String, Stat> getStatistics() {
    return Collections.unmodifiableMap(statistics);
  }

  synchronized Stat createStat(String name) {
    return createStat(name, DEFAULT_COLLECT_WINDOWS);
  }

  synchronized Stat createStat(String name, TimeWindow[] windows) {
    if (statistics.get(name) != null) {
      throw new RuntimeException("Stat with name "+ name + 
          " is already defined");
    }
    Map<TimeWindow, TimeStat> timeStats = 
      new LinkedHashMap<TimeWindow, TimeStat>();
    for (TimeWindow window : windows) {
      StatUpdater collector = updaters.get(window);
      if (collector == null) {
        if(SINCE_START.equals(window)) {
          collector = new StatUpdater();
        } else {
          collector = new TimeWindowStatUpdater(window, period);
        }
        updaters.put(window, collector);
      }
      TimeStat timeStat = new TimeStat();
      collector.addTimeStat(name, timeStat);
      timeStats.put(window, timeStat);
    }

    Stat stat = new Stat(name, timeStats);
    statistics.put(name, stat);
    return stat;
  }

  synchronized Stat removeStat(String name) {
    Stat stat = statistics.remove(name);
    if (stat != null) {
      for (StatUpdater collector : updaters.values()) {
        collector.removeTimeStat(name);
      }
    }
    return stat;
  }

  static class TimeWindow {
    final String name;
    final int windowSize;
    final int updateGranularity;
    TimeWindow(String name, int windowSize, int updateGranularity) {
      if (updateGranularity > windowSize) {
        throw new RuntimeException(
            "Invalid TimeWindow: updateGranularity > windowSize");
      }
      this.name = name;
      this.windowSize = windowSize;
      this.updateGranularity = updateGranularity;
    }

    public int hashCode() {
      return name.hashCode() + updateGranularity + windowSize;
    }

    public boolean equals(Object obj) {
      if (this == obj)
        return true;
      if (obj == null)
        return false;
      if (getClass() != obj.getClass())
        return false;
      final TimeWindow other = (TimeWindow) obj;
      if (name == null) {
        if (other.name != null)
          return false;
      } else if (!name.equals(other.name))
        return false;
      if (updateGranularity != other.updateGranularity)
        return false;
      if (windowSize != other.windowSize)
        return false;
      return true;
    }
  }

  static class Stat {
    final String name;
    private Map<TimeWindow, TimeStat> timeStats;

    private Stat(String name, Map<TimeWindow, TimeStat> timeStats) {
      this.name = name;
      this.timeStats = timeStats;
    }

    public synchronized void inc(int incr) {
      for (TimeStat ts : timeStats.values()) {
        ts.inc(incr);
      }
    }

    public synchronized void inc() {
      inc(1);
    }

    public synchronized Map<TimeWindow, TimeStat> getValues() {
      return Collections.unmodifiableMap(timeStats);
    }

    static class TimeStat {
      private final LinkedList<Integer> buckets = new LinkedList<Integer>();
      private int value;
      private int currentValue;

      public synchronized int getValue() {
        return value;
      }

      private synchronized void inc(int i) {
        currentValue += i;
      }

      private synchronized void addBucket() {
        buckets.addLast(currentValue);
        setValueToCurrent();
      }

      private synchronized void setValueToCurrent() {
        value += currentValue;
        currentValue = 0;
      }

      private synchronized void removeBucket() {
        int removed = buckets.removeFirst();
        value -= removed;
      }
    }
  }

  private static class StatUpdater {

    protected final Map<String, TimeStat> statToCollect = 
      new HashMap<String, TimeStat>();

    synchronized void addTimeStat(String name, TimeStat s) {
      statToCollect.put(name, s);
    }

    synchronized TimeStat removeTimeStat(String name) {
      return statToCollect.remove(name);
    }

    synchronized void update() {
      for (TimeStat stat : statToCollect.values()) {
        stat.setValueToCurrent();
      }
    }
  }

  /**
   * Updates TimeWindow statistics in buckets.
   *
   */
  private static class TimeWindowStatUpdater extends StatUpdater{

    final int collectBuckets;
    final int updatesPerBucket;
    
    private int updates;
    private int buckets;

    TimeWindowStatUpdater(TimeWindow w, int updatePeriod) {
      if (updatePeriod > w.updateGranularity) {
        throw new RuntimeException(
            "Invalid conf: updatePeriod > updateGranularity");
      }
      collectBuckets = w.windowSize / w.updateGranularity;
      updatesPerBucket = w.updateGranularity / updatePeriod;
    }

    synchronized void update() {
      updates++;
      if (updates == updatesPerBucket) {
        for(TimeStat stat : statToCollect.values()) {
          stat.addBucket();
        }
        updates = 0;
        buckets++;
        if (buckets > collectBuckets) {
          for (TimeStat stat : statToCollect.values()) {
            stat.removeBucket();
          }
          buckets--;
        }
      }
    }
  }

}

相关信息

hadoop 源码目录

相关文章

hadoop AMFeedback 源码

hadoop BackupStore 源码

hadoop BasicTypeSorterBase 源码

hadoop BufferSorter 源码

hadoop CleanupQueue 源码

hadoop Clock 源码

hadoop ClusterStatus 源码

hadoop Counters 源码

hadoop CumulativePeriodicStats 源码

hadoop DeprecatedQueueConfigurationParser 源码

0  赞