hadoop AbstractCounters 源码

  • 2022-10-20
  • 浏览 (158)

haddop AbstractCounters 代码


 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *     http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * See the License for the specific language governing permissions and
 * limitations under the License.

package org.apache.hadoop.mapreduce.counters;

import static org.apache.hadoop.mapreduce.counters.CounterGroupFactory.getFrameworkGroupId;
import static org.apache.hadoop.mapreduce.counters.CounterGroupFactory.isFrameworkGroup;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.FileSystemCounter;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.util.StringInterner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables;
import org.apache.hadoop.thirdparty.com.google.common.collect.Iterators;
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;

 * An abstract class to provide common implementation for the Counters
 * container in both mapred and mapreduce packages.
 * @param <C> type of counter inside the counters
 * @param <G> type of group inside the counters
public abstract class AbstractCounters<C extends Counter,
                                       G extends CounterGroupBase<C>>
    implements Writable, Iterable<G> {

  protected static final Logger LOG =

   * A cache from enum values to the associated counter.
  private final Map<Enum<?>, C> cache = Maps.newIdentityHashMap();
  //framework & fs groups
  private final Map<String, G> fgroups = new ConcurrentSkipListMap<String, G>();
  // other groups
  private final Map<String, G> groups = new ConcurrentSkipListMap<String, G>();
  private final CounterGroupFactory<C, G> groupFactory;

  // For framework counter serialization without strings
  enum GroupType { FRAMEWORK, FILESYSTEM };

  // Writes only framework and fs counters if false.
  private boolean writeAllCounters = true;

  private static final Map<String, String> legacyMap = Maps.newHashMap();
  static {
    legacyMap.put("FileSystemCounters", FileSystemCounter.class.getName());

  private final Limits limits = new Limits();

  public AbstractCounters(CounterGroupFactory<C, G> gf) {
    groupFactory = gf;

   * Construct from another counters object.
   * @param <C1> type of the other counter
   * @param <G1> type of the other counter group
   * @param counters the counters object to copy
   * @param groupFactory the factory for new groups
  public <C1 extends Counter, G1 extends CounterGroupBase<C1>>
  AbstractCounters(AbstractCounters<C1, G1> counters,
                   CounterGroupFactory<C, G> groupFactory) {
    this.groupFactory = groupFactory;
    for(G1 group: counters) {
      String name = group.getName();
      G newGroup = groupFactory.newGroup(name, group.getDisplayName(), limits);
      (isFrameworkGroup(name) ? fgroups : groups).put(name, newGroup);
      for(Counter counter: group) {
        newGroup.addCounter(counter.getName(), counter.getDisplayName(),

  /** Add a group.
   * @param group object to add
   * @return the group
  public synchronized G addGroup(G group) {
    String name = group.getName();
    if (isFrameworkGroup(name)) {
      fgroups.put(name, group);
    } else {
      limits.checkGroups(groups.size() + 1);
      groups.put(name, group);
    return group;

   * Add a new group
   * @param name of the group
   * @param displayName of the group
   * @return the group
  public G addGroup(String name, String displayName) {
    return addGroup(groupFactory.newGroup(name, displayName, limits));

   * Find a counter, create one if necessary
   * @param groupName of the counter
   * @param counterName name of the counter
   * @return the matching counter
  public C findCounter(String groupName, String counterName) {
    G grp = getGroup(groupName);
    return grp.findCounter(counterName);

   * Find the counter for the given enum. The same enum will always return the
   * same counter.
   * @param key the counter key
   * @return the matching counter object
  public synchronized C findCounter(Enum<?> key) {
    C counter = cache.get(key);
    if (counter == null) {
      counter = findCounter(key.getDeclaringClass().getName(), key.name());
      cache.put(key, counter);
    return counter;

   * Find the file system counter for the given scheme and enum.
   * @param scheme of the file system
   * @param key the enum of the counter
   * @return the file system counter
  public synchronized C findCounter(String scheme, FileSystemCounter key) {
    return ((FileSystemCounterGroup<C>) getGroup(
        findCounter(scheme, key);

   * Returns the names of all counter classes.
   * @return Set of counter names.
  public synchronized Iterable<String> getGroupNames() {
    HashSet<String> deprecated = new HashSet<String>();
    for(Map.Entry<String, String> entry : legacyMap.entrySet()) {
      String newGroup = entry.getValue();
      boolean isFGroup = isFrameworkGroup(newGroup);
      if(isFGroup ? fgroups.containsKey(newGroup) : groups.containsKey(newGroup)) {
    return Iterables.concat(fgroups.keySet(), groups.keySet(), deprecated);

  public Iterator<G> iterator() {
    return Iterators.concat(fgroups.values().iterator(),

   * Returns the named counter group, or an empty group if there is none
   * with the specified name.
   * @param groupName name of the group
   * @return the group
  public synchronized G getGroup(String groupName) {

    // filterGroupName
    boolean groupNameInLegacyMap = true;
    String newGroupName = legacyMap.get(groupName);
    if (newGroupName == null) {
      groupNameInLegacyMap = false;
      newGroupName = Limits.filterGroupName(groupName);

    boolean isFGroup = isFrameworkGroup(newGroupName);
    G group = isFGroup ? fgroups.get(newGroupName) : groups.get(newGroupName);
    if (group == null) {
      group = groupFactory.newGroup(newGroupName, limits);
      if (isFGroup) {
        fgroups.put(newGroupName, group);
      } else {
        limits.checkGroups(groups.size() + 1);
        groups.put(newGroupName, group);
      if (groupNameInLegacyMap) {
        LOG.warn("Group " + groupName + " is deprecated. Use " + newGroupName
            + " instead");
    return group;

   * Returns the total number of counters, by summing the number of counters
   * in each group.
   * @return the total number of counters
  public synchronized int countCounters() {
    int result = 0;
    for (G group : this) {
      result += group.size();
    return result;

   * Write the set of groups.
   * Counters ::= version #fgroups (groupId, group)* #groups (group)*
  public synchronized void write(DataOutput out) throws IOException {
    WritableUtils.writeVInt(out, groupFactory.version());
    WritableUtils.writeVInt(out, fgroups.size());  // framework groups first
    for (G group : fgroups.values()) {
      if (group.getUnderlyingGroup() instanceof FrameworkCounterGroup<?, ?>) {
        WritableUtils.writeVInt(out, GroupType.FRAMEWORK.ordinal());
        WritableUtils.writeVInt(out, getFrameworkGroupId(group.getName()));
      } else if (group.getUnderlyingGroup() instanceof FileSystemCounterGroup<?>) {
        WritableUtils.writeVInt(out, GroupType.FILESYSTEM.ordinal());
    if (writeAllCounters) {
      WritableUtils.writeVInt(out, groups.size());
      for (G group : groups.values()) {
        Text.writeString(out, group.getName());
    } else {
      WritableUtils.writeVInt(out, 0);

  public synchronized void readFields(DataInput in) throws IOException {
    int version = WritableUtils.readVInt(in);
    if (version != groupFactory.version()) {
      throw new IOException("Counters version mismatch, expected "+
          groupFactory.version() +" got "+ version);
    int numFGroups = WritableUtils.readVInt(in);
    GroupType[] groupTypes = GroupType.values();
    while (numFGroups-- > 0) {
      GroupType groupType = groupTypes[WritableUtils.readVInt(in)];
      G group;
      switch (groupType) {
        case FILESYSTEM: // with nothing
          group = groupFactory.newFileSystemGroup();
        case FRAMEWORK:  // with group id
          group = groupFactory.newFrameworkGroup(WritableUtils.readVInt(in));
        default: // Silence dumb compiler, as it would've thrown earlier
          throw new IOException("Unexpected counter group type: "+ groupType);
      fgroups.put(group.getName(), group);
    int numGroups = WritableUtils.readVInt(in);
    while (numGroups-- > 0) {
      limits.checkGroups(groups.size() + 1);
      G group = groupFactory.newGenericGroup(
          StringInterner.weakIntern(Text.readString(in)), null, limits);
      groups.put(group.getName(), group);

   * Return textual representation of the counter values.
   * @return the string
  public synchronized String toString() {
    StringBuilder sb = new StringBuilder("Counters: " + countCounters());
    for (G group: this) {
      for (Counter counter: group) {
    return sb.toString();

   * Increments multiple counters by their amounts in another Counters
   * instance.
   * @param other the other Counters instance
  public synchronized void incrAllCounters(AbstractCounters<C, G> other) {
    for(G right : other) {
      String groupName = right.getName();
      G left = (isFrameworkGroup(groupName) ? fgroups : groups).get(groupName);
      if (left == null) {
        left = addGroup(groupName, right.getDisplayName());

  public boolean equals(Object genericRight) {
    if (genericRight instanceof AbstractCounters<?, ?>) {
      return Iterators.elementsEqual(iterator(),
          ((AbstractCounters<C, G>)genericRight).iterator());
    return false;

  public int hashCode() {
    return groups.hashCode();

   * Set the "writeAllCounters" option to true or false
   * @param send  if true all counters would be serialized, otherwise only
   *              framework counters would be serialized in
   *              {@link #write(DataOutput)}
  public void setWriteAllCounters(boolean send) {
    writeAllCounters = send;

   * Get the "writeAllCounters" option
   * @return true of all counters would serialized
  public boolean getWriteAllCounters() {
    return writeAllCounters;

  public Limits limits() {
    return limits;


hadoop 源码目录


hadoop AbstractCounter 源码

hadoop AbstractCounterGroup 源码

hadoop CounterGroupBase 源码

hadoop CounterGroupFactory 源码

hadoop FileSystemCounterGroup 源码

hadoop FrameworkCounterGroup 源码

hadoop GenericCounter 源码

hadoop LimitExceededException 源码

hadoop Limits 源码

hadoop package-info 源码

0  赞