hadoop Configuration 源码

  • 2022-10-20
  • 浏览 (400)

haddop Configuration 代码

文件路径:/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.conf;

import com.ctc.wstx.api.ReaderConfig;
import com.ctc.wstx.io.StreamBootstrapper;
import com.ctc.wstx.io.SystemId;
import com.ctc.wstx.stax.WstxInputFactory;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;

import java.io.BufferedInputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Reader;
import java.io.Writer;
import java.lang.ref.WeakReference;
import java.net.InetSocketAddress;
import java.net.JarURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLConnection;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import javax.annotation.Nullable;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;

import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
import org.apache.commons.collections.map.UnmodifiableMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.alias.CredentialProvider;
import org.apache.hadoop.security.alias.CredentialProvider.CredentialEntry;
import org.apache.hadoop.security.alias.CredentialProviderFactory;
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.XMLUtils;

import org.codehaus.stax2.XMLStreamReader2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;

import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;

/**
 * Provides access to configuration parameters.
 *
 * <h3 id="Resources">Resources</h3>
 *
 * <p>Configurations are specified by resources. A resource contains a set of
 * name/value pairs as XML data. Each resource is named by either a 
 * <code>String</code> or by a {@link Path}. If named by a <code>String</code>, 
 * then the classpath is examined for a file with that name.  If named by a 
 * <code>Path</code>, then the local filesystem is examined directly, without 
 * referring to the classpath.
 *
 * <p>Unless explicitly turned off, Hadoop by default specifies two 
 * resources, loaded in-order from the classpath: <ol>
 * <li><tt>
 * <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">
 * core-default.xml</a></tt>: Read-only defaults for hadoop.</li>
 * <li><tt>core-site.xml</tt>: Site-specific configuration for a given hadoop
 * installation.</li>
 * </ol>
 * Applications may add additional resources, which are loaded
 * subsequent to these resources in the order they are added.
 * 
 * <h4 id="FinalParams">Final Parameters</h4>
 *
 * <p>Configuration parameters may be declared <i>final</i>. 
 * Once a resource declares a value final, no subsequently-loaded 
 * resource can alter that value.  
 * For example, one might define a final parameter with:
 * <pre><code>
 *  &lt;property&gt;
 *    &lt;name&gt;dfs.hosts.include&lt;/name&gt;
 *    &lt;value&gt;/etc/hadoop/conf/hosts.include&lt;/value&gt;
 *    <b>&lt;final&gt;true&lt;/final&gt;</b>
 *  &lt;/property&gt;</code></pre>
 *
 * Administrators typically define parameters as final in 
 * <tt>core-site.xml</tt> for values that user applications may not alter.
 *
 * <h4 id="VariableExpansion">Variable Expansion</h4>
 *
 * <p>Value strings are first processed for <i>variable expansion</i>. The
 * available properties are:<ol>
 * <li>Other properties defined in this Configuration; and, if a name is
 * undefined here,</li>
 * <li>Environment variables in {@link System#getenv()} if a name starts with
 * "env.", or</li>
 * <li>Properties in {@link System#getProperties()}.</li>
 * </ol>
 *
 * <p>For example, if a configuration resource contains the following property
 * definitions: 
 * <pre><code>
 *  &lt;property&gt;
 *    &lt;name&gt;basedir&lt;/name&gt;
 *    &lt;value&gt;/user/${<i>user.name</i>}&lt;/value&gt;
 *  &lt;/property&gt;
 *  
 *  &lt;property&gt;
 *    &lt;name&gt;tempdir&lt;/name&gt;
 *    &lt;value&gt;${<i>basedir</i>}/tmp&lt;/value&gt;
 *  &lt;/property&gt;
 *
 *  &lt;property&gt;
 *    &lt;name&gt;otherdir&lt;/name&gt;
 *    &lt;value&gt;${<i>env.BASE_DIR</i>}/other&lt;/value&gt;
 *  &lt;/property&gt;
 *  </code></pre>
 *
 * <p>When <tt>conf.get("tempdir")</tt> is called, then <tt>${<i>basedir</i>}</tt>
 * will be resolved to another property in this Configuration, while
 * <tt>${<i>user.name</i>}</tt> would then ordinarily be resolved to the value
 * of the System property with that name.
 * <p>When <tt>conf.get("otherdir")</tt> is called, then <tt>${<i>env.BASE_DIR</i>}</tt>
 * will be resolved to the value of the <tt>${<i>BASE_DIR</i>}</tt> environment variable.
 * It supports <tt>${<i>env.NAME:-default</i>}</tt> and <tt>${<i>env.NAME-default</i>}</tt> notations.
 * The former is resolved to "default" if <tt>${<i>NAME</i>}</tt> environment variable is undefined
 * or its value is empty.
 * The latter behaves the same way only if <tt>${<i>NAME</i>}</tt> is undefined.
 * <p>By default, warnings will be given to any deprecated configuration 
 * parameters and these are suppressible by configuring
 * <tt>log4j.logger.org.apache.hadoop.conf.Configuration.deprecation</tt> in
 * log4j.properties file.
 *
 * <h4 id="Tags">Tags</h4>
 *
 * <p>Optionally we can tag related properties together by using tag
 * attributes. System tags are defined by hadoop.tags.system property. Users
 * can define there own custom tags in  hadoop.tags.custom property.
 *
 * <p>For example, we can tag existing property as:
 * <pre><code>
 *  &lt;property&gt;
 *    &lt;name&gt;dfs.replication&lt;/name&gt;
 *    &lt;value&gt;3&lt;/value&gt;
 *    &lt;tag&gt;HDFS,REQUIRED&lt;/tag&gt;
 *  &lt;/property&gt;
 *
 *  &lt;property&gt;
 *    &lt;name&gt;dfs.data.transfer.protection&lt;/name&gt;
 *    &lt;value&gt;3&lt;/value&gt;
 *    &lt;tag&gt;HDFS,SECURITY&lt;/tag&gt;
 *  &lt;/property&gt;
 * </code></pre>
 * <p> Properties marked with tags can be retrieved with <tt>conf
 * .getAllPropertiesByTag("HDFS")</tt> or <tt>conf.getAllPropertiesByTags
 * (Arrays.asList("YARN","SECURITY"))</tt>.</p>
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Configuration implements Iterable<Map.Entry<String,String>>,
                                      Writable {
  private static final Logger LOG =
      LoggerFactory.getLogger(Configuration.class);

  private static final Logger LOG_DEPRECATION =
      LoggerFactory.getLogger(
          "org.apache.hadoop.conf.Configuration.deprecation");
  private static final Set<String> TAGS = ConcurrentHashMap.newKeySet();

  private boolean quietmode = true;

  private static final String DEFAULT_STRING_CHECK =
    "testingforemptydefaultvalue";

  private static boolean restrictSystemPropsDefault = false;
  private boolean restrictSystemProps = restrictSystemPropsDefault;
  private boolean allowNullValueProperties = false;

  private static class Resource {
    private final Object resource;
    private final String name;
    private final boolean restrictParser;
    
    public Resource(Object resource) {
      this(resource, resource.toString());
    }

    public Resource(Object resource, boolean useRestrictedParser) {
      this(resource, resource.toString(), useRestrictedParser);
    }

    public Resource(Object resource, String name) {
      this(resource, name, getRestrictParserDefault(resource));
    }

    public Resource(Object resource, String name, boolean restrictParser) {
      this.resource = resource;
      this.name = name;
      this.restrictParser = restrictParser;
    }
    
    public String getName(){
      return name;
    }
    
    public Object getResource() {
      return resource;
    }

    public boolean isParserRestricted() {
      return restrictParser;
    }

    @Override
    public String toString() {
      return name;
    }

    private static boolean getRestrictParserDefault(Object resource) {
      if (resource instanceof String || !UserGroupInformation.isInitialized()) {
        return false;
      }
      UserGroupInformation user;
      try {
        user = UserGroupInformation.getCurrentUser();
      } catch (IOException e) {
        throw new RuntimeException("Unable to determine current user", e);
      }
      return user.getRealUser() != null;
    }
  }
  
  /**
   * List of configuration resources.
   */
  private ArrayList<Resource> resources = new ArrayList<Resource>();
  
  /**
   * The value reported as the setting resource when a key is set
   * by code rather than a file resource by dumpConfiguration.
   */
  static final String UNKNOWN_RESOURCE = "Unknown";


  /**
   * List of configuration parameters marked <b>final</b>. 
   */
  private Set<String> finalParameters = Collections.newSetFromMap(
      new ConcurrentHashMap<String, Boolean>());
  
  private boolean loadDefaults = true;

  /**
   * Configuration objects.
   */
  private static final WeakHashMap<Configuration,Object> REGISTRY = 
    new WeakHashMap<Configuration,Object>();

  /**
   * Map to hold properties by there tag groupings.
   */
  private final Map<String, Properties> propertyTagsMap =
      new ConcurrentHashMap<>();

  /**
   * List of default Resources. Resources are loaded in the order of the list 
   * entries
   */
  private static final CopyOnWriteArrayList<String> defaultResources =
    new CopyOnWriteArrayList<String>();

  private static final Map<ClassLoader, Map<String, WeakReference<Class<?>>>>
    CACHE_CLASSES = new WeakHashMap<ClassLoader, Map<String, WeakReference<Class<?>>>>();

  /**
   * Sentinel value to store negative cache results in {@link #CACHE_CLASSES}.
   */
  private static final Class<?> NEGATIVE_CACHE_SENTINEL =
    NegativeCacheSentinel.class;

  /**
   * Stores the mapping of key to the resource which modifies or loads 
   * the key most recently. Created lazily to avoid wasting memory.
   */
  private volatile Map<String, String[]> updatingResource;

  /**
   * Specify exact input factory to avoid time finding correct one.
   * Factory is reusable across un-synchronized threads once initialized
   */
  private static final WstxInputFactory XML_INPUT_FACTORY =
      new WstxInputFactory();

  /**
   * Class to keep the information about the keys which replace the deprecated
   * ones.
   * 
   * This class stores the new keys which replace the deprecated keys and also
   * gives a provision to have a custom message for each of the deprecated key
   * that is being replaced. It also provides method to get the appropriate
   * warning message which can be logged whenever the deprecated key is used.
   */
  private static class DeprecatedKeyInfo {
    private final String[] newKeys;
    private final String customMessage;
    private final AtomicBoolean accessed = new AtomicBoolean(false);

    DeprecatedKeyInfo(String[] newKeys, String customMessage) {
      this.newKeys = newKeys;
      this.customMessage = customMessage;
    }

    private final String getWarningMessage(String key) {
      return getWarningMessage(key, null);
    }

    /**
     * Method to provide the warning message. It gives the custom message if
     * non-null, and default message otherwise.
     * @param key the associated deprecated key.
     * @param source the property source.
     * @return message that is to be logged when a deprecated key is used.
     */
    private String getWarningMessage(String key, String source) {
      String warningMessage;
      if(customMessage == null) {
        StringBuilder message = new StringBuilder(key);
        if (source != null) {
          message.append(" in " + source);
        }
        message.append(" is deprecated. Instead, use ");
        for (int i = 0; i < newKeys.length; i++) {
          message.append(newKeys[i]);
          if(i != newKeys.length-1) {
            message.append(", ");
          }
        }
        warningMessage = message.toString();
      }
      else {
        warningMessage = customMessage;
      }
      return warningMessage;
    }

    boolean getAndSetAccessed() {
      return accessed.getAndSet(true);
    }

    public void clearAccessed() {
      accessed.set(false);
    }
  }
  
  /**
   * A pending addition to the global set of deprecated keys.
   */
  public static class DeprecationDelta {
    private final String key;
    private final String[] newKeys;
    private final String customMessage;

    DeprecationDelta(String key, String[] newKeys, String customMessage) {
      Preconditions.checkNotNull(key);
      Preconditions.checkNotNull(newKeys);
      Preconditions.checkArgument(newKeys.length > 0);
      this.key = key;
      this.newKeys = newKeys;
      this.customMessage = customMessage;
    }

    public DeprecationDelta(String key, String newKey, String customMessage) {
      this(key, new String[] { newKey }, customMessage);
    }

    public DeprecationDelta(String key, String newKey) {
      this(key, new String[] { newKey }, null);
    }

    public String getKey() {
      return key;
    }

    public String[] getNewKeys() {
      return newKeys;
    }

    public String getCustomMessage() {
      return customMessage;
    }
  }

  /**
   * The set of all keys which are deprecated.
   *
   * DeprecationContext objects are immutable.
   */
  private static class DeprecationContext {
    /**
     * Stores the deprecated keys, the new keys which replace the deprecated keys
     * and custom message(if any provided).
     */
    private final Map<String, DeprecatedKeyInfo> deprecatedKeyMap;

    /**
     * Stores a mapping from superseding keys to the keys which they deprecate.
     */
    private final Map<String, String> reverseDeprecatedKeyMap;

    /**
     * Create a new DeprecationContext by copying a previous DeprecationContext
     * and adding some deltas.
     *
     * @param other   The previous deprecation context to copy, or null to start
     *                from nothing.
     * @param deltas  The deltas to apply.
     */
    @SuppressWarnings("unchecked")
    DeprecationContext(DeprecationContext other, DeprecationDelta[] deltas) {
      HashMap<String, DeprecatedKeyInfo> newDeprecatedKeyMap = 
        new HashMap<String, DeprecatedKeyInfo>();
      HashMap<String, String> newReverseDeprecatedKeyMap =
        new HashMap<String, String>();
      if (other != null) {
        for (Entry<String, DeprecatedKeyInfo> entry :
            other.deprecatedKeyMap.entrySet()) {
          newDeprecatedKeyMap.put(entry.getKey(), entry.getValue());
        }
        for (Entry<String, String> entry :
            other.reverseDeprecatedKeyMap.entrySet()) {
          newReverseDeprecatedKeyMap.put(entry.getKey(), entry.getValue());
        }
      }
      for (DeprecationDelta delta : deltas) {
        if (!newDeprecatedKeyMap.containsKey(delta.getKey())) {
          DeprecatedKeyInfo newKeyInfo =
            new DeprecatedKeyInfo(delta.getNewKeys(), delta.getCustomMessage());
          newDeprecatedKeyMap.put(delta.key, newKeyInfo);
          for (String newKey : delta.getNewKeys()) {
            newReverseDeprecatedKeyMap.put(newKey, delta.key);
          }
        }
      }
      this.deprecatedKeyMap =
        UnmodifiableMap.decorate(newDeprecatedKeyMap);
      this.reverseDeprecatedKeyMap =
        UnmodifiableMap.decorate(newReverseDeprecatedKeyMap);
    }

    Map<String, DeprecatedKeyInfo> getDeprecatedKeyMap() {
      return deprecatedKeyMap;
    }

    Map<String, String> getReverseDeprecatedKeyMap() {
      return reverseDeprecatedKeyMap;
    }
  }
  
  private static DeprecationDelta[] defaultDeprecations = 
    new DeprecationDelta[] {
      new DeprecationDelta("topology.script.file.name", 
        CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY),
      new DeprecationDelta("topology.script.number.args", 
        CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_NUMBER_ARGS_KEY),
      new DeprecationDelta("hadoop.configured.node.mapping", 
        CommonConfigurationKeys.NET_TOPOLOGY_CONFIGURED_NODE_MAPPING_KEY),
      new DeprecationDelta("topology.node.switch.mapping.impl", 
        CommonConfigurationKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY),
      new DeprecationDelta("dfs.df.interval", 
        CommonConfigurationKeys.FS_DF_INTERVAL_KEY),
      new DeprecationDelta("fs.default.name", 
        CommonConfigurationKeys.FS_DEFAULT_NAME_KEY),
      new DeprecationDelta("dfs.umaskmode",
        CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY),
      new DeprecationDelta("dfs.nfs.exports.allowed.hosts",
          CommonConfigurationKeys.NFS_EXPORTS_ALLOWED_HOSTS_KEY)
    };

  /**
   * The global DeprecationContext.
   */
  private static AtomicReference<DeprecationContext> deprecationContext =
      new AtomicReference<DeprecationContext>(
          new DeprecationContext(null, defaultDeprecations));

  /**
   * Adds a set of deprecated keys to the global deprecations.
   *
   * This method is lockless.  It works by means of creating a new
   * DeprecationContext based on the old one, and then atomically swapping in
   * the new context.  If someone else updated the context in between us reading
   * the old context and swapping in the new one, we try again until we win the
   * race.
   *
   * @param deltas   The deprecations to add.
   */
  public static void addDeprecations(DeprecationDelta[] deltas) {
    DeprecationContext prev, next;
    do {
      prev = deprecationContext.get();
      next = new DeprecationContext(prev, deltas);
    } while (!deprecationContext.compareAndSet(prev, next));
  }

  /**
   * Adds the deprecated key to the global deprecation map.
   * It does not override any existing entries in the deprecation map.
   * This is to be used only by the developers in order to add deprecation of
   * keys, and attempts to call this method after loading resources once,
   * would lead to <tt>UnsupportedOperationException</tt>
   * 
   * If a key is deprecated in favor of multiple keys, they are all treated as 
   * aliases of each other, and setting any one of them resets all the others 
   * to the new value.
   *
   * If you have multiple deprecation entries to add, it is more efficient to
   * use #addDeprecations(DeprecationDelta[] deltas) instead.
   * 
   * @param key to be deprecated
   * @param newKeys list of keys that take up the values of deprecated key
   * @param customMessage depcrication message
   * @deprecated use {@link #addDeprecation(String key, String newKey,
      String customMessage)} instead
   */
  @Deprecated
  public static void addDeprecation(String key, String[] newKeys,
      String customMessage) {
    addDeprecations(new DeprecationDelta[] {
      new DeprecationDelta(key, newKeys, customMessage)
    });
  }

  /**
   * Adds the deprecated key to the global deprecation map.
   * It does not override any existing entries in the deprecation map.
   * This is to be used only by the developers in order to add deprecation of
   * keys, and attempts to call this method after loading resources once,
   * would lead to <tt>UnsupportedOperationException</tt>
   * 
   * If you have multiple deprecation entries to add, it is more efficient to
   * use #addDeprecations(DeprecationDelta[] deltas) instead.
   *
   * @param key to be deprecated
   * @param newKey key that take up the values of deprecated key
   * @param customMessage deprecation message
   */
  public static void addDeprecation(String key, String newKey,
	      String customMessage) {
	  addDeprecation(key, new String[] {newKey}, customMessage);
  }

  /**
   * Adds the deprecated key to the global deprecation map when no custom
   * message is provided.
   * It does not override any existing entries in the deprecation map.
   * This is to be used only by the developers in order to add deprecation of
   * keys, and attempts to call this method after loading resources once,
   * would lead to <tt>UnsupportedOperationException</tt>
   * 
   * If a key is deprecated in favor of multiple keys, they are all treated as 
   * aliases of each other, and setting any one of them resets all the others 
   * to the new value.
   * 
   * If you have multiple deprecation entries to add, it is more efficient to
   * use #addDeprecations(DeprecationDelta[] deltas) instead.
   *
   * @param key Key that is to be deprecated
   * @param newKeys list of keys that take up the values of deprecated key
   * @deprecated use {@link #addDeprecation(String key, String newKey)} instead
   */
  @Deprecated
  public static void addDeprecation(String key, String[] newKeys) {
    addDeprecation(key, newKeys, null);
  }
  
  /**
   * Adds the deprecated key to the global deprecation map when no custom
   * message is provided.
   * It does not override any existing entries in the deprecation map.
   * This is to be used only by the developers in order to add deprecation of
   * keys, and attempts to call this method after loading resources once,
   * would lead to <tt>UnsupportedOperationException</tt>
   * 
   * If you have multiple deprecation entries to add, it is more efficient to
   * use #addDeprecations(DeprecationDelta[] deltas) instead.
   *
   * @param key Key that is to be deprecated
   * @param newKey key that takes up the value of deprecated key
   */
  public static void addDeprecation(String key, String newKey) {
    addDeprecation(key, new String[] {newKey}, null);
  }
  
  /**
   * checks whether the given <code>key</code> is deprecated.
   * 
   * @param key the parameter which is to be checked for deprecation
   * @return <code>true</code> if the key is deprecated and 
   *         <code>false</code> otherwise.
   */
  public static boolean isDeprecated(String key) {
    return deprecationContext.get().getDeprecatedKeyMap().containsKey(key);
  }

  private static String getDeprecatedKey(String key) {
    return deprecationContext.get().getReverseDeprecatedKeyMap().get(key);
  }

  private static DeprecatedKeyInfo getDeprecatedKeyInfo(String key) {
    return deprecationContext.get().getDeprecatedKeyMap().get(key);
  }

  /**
   * Sets all deprecated properties that are not currently set but have a
   * corresponding new property that is set. Useful for iterating the
   * properties when all deprecated properties for currently set properties
   * need to be present.
   */
  public void setDeprecatedProperties() {
    DeprecationContext deprecations = deprecationContext.get();
    Properties props = getProps();
    Properties overlay = getOverlay();
    for (Map.Entry<String, DeprecatedKeyInfo> entry :
        deprecations.getDeprecatedKeyMap().entrySet()) {
      String depKey = entry.getKey();
      if (!overlay.contains(depKey)) {
        for (String newKey : entry.getValue().newKeys) {
          String val = overlay.getProperty(newKey);
          if (val != null) {
            props.setProperty(depKey, val);
            overlay.setProperty(depKey, val);
            break;
          }
        }
      }
    }
  }

  /**
   * Checks for the presence of the property <code>name</code> in the
   * deprecation map. Returns the first of the list of new keys if present
   * in the deprecation map or the <code>name</code> itself. If the property
   * is not presently set but the property map contains an entry for the
   * deprecated key, the value of the deprecated key is set as the value for
   * the provided property name.
   *
   * Also updates properties and overlays with deprecated keys, if the new
   * key does not already exist.
   *
   * @param deprecations deprecation context
   * @param name the property name
   * @return the first property in the list of properties mapping
   *         the <code>name</code> or the <code>name</code> itself.
   */
  private String[] handleDeprecation(DeprecationContext deprecations,
                                     String name) {
    if (null != name) {
      name = name.trim();
    }
    // Initialize the return value with requested name
    String[] names = new String[]{name};
    // Deprecated keys are logged once and an updated names are returned
    DeprecatedKeyInfo keyInfo = deprecations.getDeprecatedKeyMap().get(name);
    if (keyInfo != null) {
      if (!keyInfo.getAndSetAccessed()) {
        logDeprecation(keyInfo.getWarningMessage(name));
      }
      // Override return value for deprecated keys
      names = keyInfo.newKeys;
    }

    // Update properties with deprecated key if already loaded and new
    // deprecation has been added
    updatePropertiesWithDeprecatedKeys(deprecations, names);

    // If there are no overlay values we can return early
    Properties overlayProperties = getOverlay();
    if (overlayProperties.isEmpty()) {
      return names;
    }
    // Update properties and overlays with reverse lookup values
    for (String n : names) {
      String deprecatedKey = deprecations.getReverseDeprecatedKeyMap().get(n);
      if (deprecatedKey != null && !overlayProperties.containsKey(n)) {
        String deprecatedValue = overlayProperties.getProperty(deprecatedKey);
        if (deprecatedValue != null) {
          getProps().setProperty(n, deprecatedValue);
          overlayProperties.setProperty(n, deprecatedValue);
        }
      }
    }
    return names;
  }

  private void updatePropertiesWithDeprecatedKeys(
      DeprecationContext deprecations, String[] newNames) {
    for (String newName : newNames) {
      String deprecatedKey = deprecations.getReverseDeprecatedKeyMap().get(newName);
      if (deprecatedKey != null && !getProps().containsKey(newName)) {
        String deprecatedValue = getProps().getProperty(deprecatedKey);
        if (deprecatedValue != null) {
          getProps().setProperty(newName, deprecatedValue);
        }
      }
    }
  }
 
  private void handleDeprecation() {
    LOG.debug("Handling deprecation for all properties in config...");
    DeprecationContext deprecations = deprecationContext.get();
    Set<Object> keys = new HashSet<>();
    keys.addAll(getProps().keySet());
    for (Object item: keys) {
      LOG.debug("Handling deprecation for " + (String)item);
      handleDeprecation(deprecations, (String)item);
    }
  }
 
  static {
    // Add default resources
    addDefaultResource("core-default.xml");
    addDefaultResource("core-site.xml");

    // print deprecation warning if hadoop-site.xml is found in classpath
    ClassLoader cL = Thread.currentThread().getContextClassLoader();
    if (cL == null) {
      cL = Configuration.class.getClassLoader();
    }
    if (cL.getResource("hadoop-site.xml") != null) {
      LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. " +
          "Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, "
          + "mapred-site.xml and hdfs-site.xml to override properties of " +
          "core-default.xml, mapred-default.xml and hdfs-default.xml " +
          "respectively");
      addDefaultResource("hadoop-site.xml");
    }
  }

  private Properties properties;
  private Properties overlay;
  private ClassLoader classLoader;
  {
    classLoader = Thread.currentThread().getContextClassLoader();
    if (classLoader == null) {
      classLoader = Configuration.class.getClassLoader();
    }
  }
  
  /** A new configuration. */
  public Configuration() {
    this(true);
  }

  /** A new configuration where the behavior of reading from the default 
   * resources can be turned off.
   * 
   * If the parameter {@code loadDefaults} is false, the new instance
   * will not load resources from the default files. 
   * @param loadDefaults specifies whether to load from the default files
   */
  public Configuration(boolean loadDefaults) {
    this.loadDefaults = loadDefaults;

    synchronized(Configuration.class) {
      REGISTRY.put(this, null);
    }
  }
  
  /** 
   * A new configuration with the same settings cloned from another.
   * 
   * @param other the configuration from which to clone settings.
   */
  @SuppressWarnings("unchecked")
  public Configuration(Configuration other) {
    synchronized(other) {
      // Make sure we clone a finalized state
      // Resources like input streams can be processed only once
      other.getProps();
      this.resources = (ArrayList<Resource>) other.resources.clone();
      if (other.properties != null) {
        this.properties = (Properties)other.properties.clone();
      }

      if (other.overlay!=null) {
        this.overlay = (Properties)other.overlay.clone();
      }

      this.restrictSystemProps = other.restrictSystemProps;
      if (other.updatingResource != null) {
        this.updatingResource = new ConcurrentHashMap<String, String[]>(
           other.updatingResource);
      }
      this.finalParameters = Collections.newSetFromMap(
          new ConcurrentHashMap<String, Boolean>());
      this.finalParameters.addAll(other.finalParameters);
      this.propertyTagsMap.putAll(other.propertyTagsMap);
    }

    synchronized(Configuration.class) {
      REGISTRY.put(this, null);
    }
    this.classLoader = other.classLoader;
    this.loadDefaults = other.loadDefaults;
    setQuietMode(other.getQuietMode());
  }

  /**
   * Reload existing configuration instances.
   */
  public static synchronized void reloadExistingConfigurations() {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Reloading " + REGISTRY.keySet().size()
          + " existing configurations");
    }
    for (Configuration conf : REGISTRY.keySet()) {
      conf.reloadConfiguration();
    }
  }

  /**
   * Add a default resource. Resources are loaded in the order of the resources 
   * added.
   * @param name file name. File should be present in the classpath.
   */
  public static synchronized void addDefaultResource(String name) {
    if(!defaultResources.contains(name)) {
      defaultResources.add(name);
      for(Configuration conf : REGISTRY.keySet()) {
        if(conf.loadDefaults) {
          conf.reloadConfiguration();
        }
      }
    }
  }

  public static void setRestrictSystemPropertiesDefault(boolean val) {
    restrictSystemPropsDefault = val;
  }

  public void setRestrictSystemProperties(boolean val) {
    this.restrictSystemProps = val;
  }

  /**
   * Add a configuration resource. 
   * 
   * The properties of this resource will override properties of previously 
   * added resources, unless they were marked <a href="#Final">final</a>. 
   * 
   * @param name resource to be added, the classpath is examined for a file 
   *             with that name.
   */
  public void addResource(String name) {
    addResourceObject(new Resource(name));
  }

  public void addResource(String name, boolean restrictedParser) {
    addResourceObject(new Resource(name, restrictedParser));
  }

  /**
   * Add a configuration resource. 
   * 
   * The properties of this resource will override properties of previously 
   * added resources, unless they were marked <a href="#Final">final</a>. 
   * 
   * @param url url of the resource to be added, the local filesystem is 
   *            examined directly to find the resource, without referring to 
   *            the classpath.
   */
  public void addResource(URL url) {
    addResourceObject(new Resource(url));
  }

  public void addResource(URL url, boolean restrictedParser) {
    addResourceObject(new Resource(url, restrictedParser));
  }

  /**
   * Add a configuration resource. 
   * 
   * The properties of this resource will override properties of previously 
   * added resources, unless they were marked <a href="#Final">final</a>. 
   * 
   * @param file file-path of resource to be added, the local filesystem is
   *             examined directly to find the resource, without referring to 
   *             the classpath.
   */
  public void addResource(Path file) {
    addResourceObject(new Resource(file));
  }

  public void addResource(Path file, boolean restrictedParser) {
    addResourceObject(new Resource(file, restrictedParser));
  }

  /**
   * Add a configuration resource. 
   * 
   * The properties of this resource will override properties of previously 
   * added resources, unless they were marked <a href="#Final">final</a>. 
   * 
   * WARNING: The contents of the InputStream will be cached, by this method. 
   * So use this sparingly because it does increase the memory consumption.
   * 
   * @param in InputStream to deserialize the object from. In will be read from
   * when a get or set is called next.  After it is read the stream will be
   * closed. 
   */
  public void addResource(InputStream in) {
    addResourceObject(new Resource(in));
  }

  public void addResource(InputStream in, boolean restrictedParser) {
    addResourceObject(new Resource(in, restrictedParser));
  }

  /**
   * Add a configuration resource. 
   * 
   * The properties of this resource will override properties of previously 
   * added resources, unless they were marked <a href="#Final">final</a>. 
   * 
   * @param in InputStream to deserialize the object from.
   * @param name the name of the resource because InputStream.toString is not
   * very descriptive some times.  
   */
  public void addResource(InputStream in, String name) {
    addResourceObject(new Resource(in, name));
  }

  public void addResource(InputStream in, String name,
      boolean restrictedParser) {
    addResourceObject(new Resource(in, name, restrictedParser));
  }

  /**
   * Add a configuration resource.
   *
   * The properties of this resource will override properties of previously
   * added resources, unless they were marked <a href="#Final">final</a>.
   *
   * @param conf Configuration object from which to load properties
   */
  public void addResource(Configuration conf) {
    addResourceObject(new Resource(conf.getProps(), conf.restrictSystemProps));
  }

  
  
  /**
   * Reload configuration from previously added resources.
   *
   * This method will clear all the configuration read from the added 
   * resources, and final parameters. This will make the resources to 
   * be read again before accessing the values. Values that are added
   * via set methods will overlay values read from the resources.
   */
  public synchronized void reloadConfiguration() {
    properties = null;                            // trigger reload
    finalParameters.clear();                      // clear site-limits
  }

  private synchronized void addResourceObject(Resource resource) {
    resources.add(resource);                      // add to resources
    restrictSystemProps |= resource.isParserRestricted();
    loadProps(properties, resources.size() - 1, false);
  }

  private static final int MAX_SUBST = 20;

  private static final int SUB_START_IDX = 0;
  private static final int SUB_END_IDX = SUB_START_IDX + 1;

  /**
   * This is a manual implementation of the following regex
   * "\\$\\{[^\\}\\$\u0020]+\\}". It can be 15x more efficient than
   * a regex matcher as demonstrated by HADOOP-11506. This is noticeable with
   * Hadoop apps building on the assumption Configuration#get is an O(1)
   * hash table lookup, especially when the eval is a long string.
   *
   * @param eval a string that may contain variables requiring expansion.
   * @return a 2-element int array res such that
   * eval.substring(res[0], res[1]) is "var" for the left-most occurrence of
   * ${var} in eval. If no variable is found -1, -1 is returned.
   */
  private static int[] findSubVariable(String eval) {
    int[] result = {-1, -1};

    int matchStart;
    int leftBrace;

    // scanning for a brace first because it's less frequent than $
    // that can occur in nested class names
    //
    match_loop:
    for (matchStart = 1, leftBrace = eval.indexOf('{', matchStart);
         // minimum left brace position (follows '$')
         leftBrace > 0
         // right brace of a smallest valid expression "${c}"
         && leftBrace + "{c".length() < eval.length();
         leftBrace = eval.indexOf('{', matchStart)) {
      int matchedLen = 0;
      if (eval.charAt(leftBrace - 1) == '$') {
        int subStart = leftBrace + 1; // after '{'
        for (int i = subStart; i < eval.length(); i++) {
          switch (eval.charAt(i)) {
            case '}':
              if (matchedLen > 0) { // match
                result[SUB_START_IDX] = subStart;
                result[SUB_END_IDX] = subStart + matchedLen;
                break match_loop;
              }
              // fall through to skip 1 char
            case ' ':
            case '$':
              matchStart = i + 1;
              continue match_loop;
            default:
              matchedLen++;
          }
        }
        // scanned from "${"  to the end of eval, and no reset via ' ', '$':
        //    no match!
        break match_loop;
      } else {
        // not a start of a variable
        //
        matchStart = leftBrace + 1;
      }
    }
    return result;
  }

  /**
   * Provides a public wrapper over substituteVars in order to avoid compatibility issues.
   * See HADOOP-18021 for further details.
   *
   * @param expr the literal value of a config key
   * @return null if expr is null, otherwise the value resulting from expanding
   * expr using the algorithm above.
   * @throws IllegalArgumentException when more than
   * {@link Configuration#MAX_SUBST} replacements are required
   */
  public String substituteCommonVariables(String expr) {
    return substituteVars(expr);
  }

  /**
   * Attempts to repeatedly expand the value {@code expr} by replacing the
   * left-most substring of the form "${var}" in the following precedence order
   * <ol>
   *   <li>by the value of the environment variable "var" if defined</li>
   *   <li>by the value of the Java system property "var" if defined</li>
   *   <li>by the value of the configuration key "var" if defined</li>
   * </ol>
   *
   * If var is unbounded the current state of expansion "prefix${var}suffix" is
   * returned.
   * <p>
   * This function also detects self-referential substitutions, i.e.
   * <pre>
   *   {@code
   *   foo.bar = ${foo.bar}
   *   }
   * </pre>
   * If a cycle is detected then the original expr is returned. Loops
   * involving multiple substitutions are not detected.
   *
   * In order not to introduce breaking changes (as Oozie for example contains a method with the
   * same name and same signature) do not make this method public, use substituteCommonVariables
   * in this case.
   *
   * @param expr the literal value of a config key
   * @return null if expr is null, otherwise the value resulting from expanding
   * expr using the algorithm above.
   * @throws IllegalArgumentException when more than
   * {@link Configuration#MAX_SUBST} replacements are required
   */
  private String substituteVars(String expr) {
    if (expr == null) {
      return null;
    }
    String eval = expr;
    for(int s = 0; s < MAX_SUBST; s++) {
      final int[] varBounds = findSubVariable(eval);
      if (varBounds[SUB_START_IDX] == -1) {
        return eval;
      }
      final String var = eval.substring(varBounds[SUB_START_IDX],
          varBounds[SUB_END_IDX]);
      String val = null;
      try {
        // evaluate system properties or environment variables even when
        // the configuration is restricted -the restrictions are enforced
        // in the getenv/getProperty calls
        if (var.startsWith("env.") && 4 < var.length()) {
          String v = var.substring(4);
          int i = 0;
          for (; i < v.length(); i++) {
            char c = v.charAt(i);
            if (c == ':' && i < v.length() - 1 && v.charAt(i + 1) == '-') {
              val = getenv(v.substring(0, i));
              if (val == null || val.length() == 0) {
                val = v.substring(i + 2);
              }
              break;
            } else if (c == '-') {
              val = getenv(v.substring(0, i));
              if (val == null) {
                val = v.substring(i + 1);
              }
              break;
            }
          }
          if (i == v.length()) {
            val = getenv(v);
          }
        } else {
          val = getProperty(var);
        }
      } catch (SecurityException se) {
        LOG.warn("Unexpected SecurityException in Configuration", se);
      }
      if (val == null) {
        val = getRaw(var);
      }
      if (val == null) {
        return eval; // return literal ${var}: var is unbound
      }

      final int dollar = varBounds[SUB_START_IDX] - "${".length();
      final int afterRightBrace = varBounds[SUB_END_IDX] + "}".length();
      final String refVar = eval.substring(dollar, afterRightBrace);

      // detect self-referential values
      if (val.contains(refVar)) {
        return expr; // return original expression if there is a loop
      }

      // substitute
      eval = eval.substring(0, dollar)
             + val
             + eval.substring(afterRightBrace);
    }
    throw new IllegalStateException("Variable substitution depth too large: " 
                                    + MAX_SUBST + " " + expr);
  }

  /**
   * Get the environment variable value if
   * {@link #restrictSystemProps} does not block this.
   * @param name environment variable name.
   * @return the value or null if either it is unset or access forbidden.
   */
  String getenv(String name) {
    if (!restrictSystemProps) {
      return System.getenv(name);
    } else {
      return null;
    }
  }

  /**
   * Get a system property value if
   * {@link #restrictSystemProps} does not block this.
   * @param key property key
   * @return the value or null if either it is unset or access forbidden.
   */
  String getProperty(String key) {
    if (!restrictSystemProps) {
      return System.getProperty(key);
    } else {
      return null;
    }
  }

  /**
   * Get the value of the <code>name</code> property, <code>null</code> if
   * no such property exists. If the key is deprecated, it returns the value of
   * the first key which replaces the deprecated key and is not null.
   * 
   * Values are processed for <a href="#VariableExpansion">variable expansion</a> 
   * before being returned.
   *
   * As a side effect get loads the properties from the sources if called for
   * the first time as a lazy init.
   * 
   * @param name the property name, will be trimmed before get value.
   * @return the value of the <code>name</code> or its replacing property, 
   *         or null if no such property exists.
   */
  public String get(String name) {
    String[] names = handleDeprecation(deprecationContext.get(), name);
    String result = null;
    for(String n : names) {
      result = substituteVars(getProps().getProperty(n));
    }
    return result;
  }

  /**
   * Set Configuration to allow keys without values during setup.  Intended
   * for use during testing.
   *
   * @param val If true, will allow Configuration to store keys without values
   */
  @VisibleForTesting
  public void setAllowNullValueProperties( boolean val ) {
    this.allowNullValueProperties = val;
  }

  public void setRestrictSystemProps(boolean val) {
    this.restrictSystemProps = val;
  }

  /**
   * Return existence of the <code>name</code> property, but only for
   * names which have no valid value, usually non-existent or commented
   * out in XML.
   *
   * @param name the property name
   * @return true if the property <code>name</code> exists without value
   */
  @VisibleForTesting
  public boolean onlyKeyExists(String name) {
    String[] names = handleDeprecation(deprecationContext.get(), name);
    for(String n : names) {
      if ( getProps().getProperty(n,DEFAULT_STRING_CHECK)
               .equals(DEFAULT_STRING_CHECK) ) {
        return true;
      }
    }
    return false;
  }

  /**
   * Get the value of the <code>name</code> property as a trimmed <code>String</code>, 
   * <code>null</code> if no such property exists. 
   * If the key is deprecated, it returns the value of
   * the first key which replaces the deprecated key and is not null
   * 
   * Values are processed for <a href="#VariableExpansion">variable expansion</a> 
   * before being returned. 
   * 
   * @param name the property name.
   * @return the value of the <code>name</code> or its replacing property, 
   *         or null if no such property exists.
   */
  public String getTrimmed(String name) {
    String value = get(name);
    
    if (null == value) {
      return null;
    } else {
      return value.trim();
    }
  }
  
  /**
   * Get the value of the <code>name</code> property as a trimmed <code>String</code>, 
   * <code>defaultValue</code> if no such property exists. 
   * See @{Configuration#getTrimmed} for more details.
   * 
   * @param name          the property name.
   * @param defaultValue  the property default value.
   * @return              the value of the <code>name</code> or defaultValue
   *                      if it is not set.
   */
  public String getTrimmed(String name, String defaultValue) {
    String ret = getTrimmed(name);
    return ret == null ? defaultValue : ret;
  }

  /**
   * Get the value of the <code>name</code> property, without doing
   * <a href="#VariableExpansion">variable expansion</a>.If the key is 
   * deprecated, it returns the value of the first key which replaces 
   * the deprecated key and is not null.
   * 
   * @param name the property name.
   * @return the value of the <code>name</code> property or 
   *         its replacing property and null if no such property exists.
   */
  public String getRaw(String name) {
    String[] names = handleDeprecation(deprecationContext.get(), name);
    String result = null;
    for(String n : names) {
      result = getProps().getProperty(n);
    }
    return result;
  }

  /**
   * Returns alternative names (non-deprecated keys or previously-set deprecated keys)
   * for a given non-deprecated key.
   * If the given key is deprecated, return null.
   *
   * @param name property name.
   * @return alternative names.
   */
  private String[] getAlternativeNames(String name) {
    String altNames[] = null;
    DeprecatedKeyInfo keyInfo = null;
    DeprecationContext cur = deprecationContext.get();
    String depKey = cur.getReverseDeprecatedKeyMap().get(name);
    if(depKey != null) {
      keyInfo = cur.getDeprecatedKeyMap().get(depKey);
      if(keyInfo.newKeys.length > 0) {
        if(getProps().containsKey(depKey)) {
          //if deprecated key is previously set explicitly
          List<String> list = new ArrayList<String>();
          list.addAll(Arrays.asList(keyInfo.newKeys));
          list.add(depKey);
          altNames = list.toArray(new String[list.size()]);
        }
        else {
          altNames = keyInfo.newKeys;
        }
      }
    }
    return altNames;
  }

  /** 
   * Set the <code>value</code> of the <code>name</code> property. If 
   * <code>name</code> is deprecated or there is a deprecated name associated to it,
   * it sets the value to both names. Name will be trimmed before put into
   * configuration.
   * 
   * @param name property name.
   * @param value property value.
   */
  public void set(String name, String value) {
    set(name, value, null);
  }
  
  /** 
   * Set the <code>value</code> of the <code>name</code> property. If 
   * <code>name</code> is deprecated, it also sets the <code>value</code> to
   * the keys that replace the deprecated key. Name will be trimmed before put
   * into configuration.
   *
   * @param name property name.
   * @param value property value.
   * @param source the place that this configuration value came from 
   * (For debugging).
   * @throws IllegalArgumentException when the value or name is null.
   */
  public void set(String name, String value, String source) {
    Preconditions.checkArgument(
        name != null,
        "Property name must not be null");
    Preconditions.checkArgument(
        value != null,
        "The value of property %s must not be null", name);
    name = name.trim();
    DeprecationContext deprecations = deprecationContext.get();
    if (deprecations.getDeprecatedKeyMap().isEmpty()) {
      getProps();
    }
    getOverlay().setProperty(name, value);
    getProps().setProperty(name, value);
    String newSource = (source == null ? "programmatically" : source);

    if (!isDeprecated(name)) {
      putIntoUpdatingResource(name, new String[] {newSource});
      String[] altNames = getAlternativeNames(name);
      if(altNames != null) {
        for(String n: altNames) {
          if(!n.equals(name)) {
            getOverlay().setProperty(n, value);
            getProps().setProperty(n, value);
            putIntoUpdatingResource(n, new String[] {newSource});
          }
        }
      }
    }
    else {
      String[] names = handleDeprecation(deprecationContext.get(), name);
      String altSource = "because " + name + " is deprecated";
      for(String n : names) {
        getOverlay().setProperty(n, value);
        getProps().setProperty(n, value);
        putIntoUpdatingResource(n, new String[] {altSource});
      }
    }
  }

  @VisibleForTesting
  void logDeprecation(String message) {
    LOG_DEPRECATION.info(message);
  }

  void logDeprecationOnce(String name, String source) {
    DeprecatedKeyInfo keyInfo = getDeprecatedKeyInfo(name);
    if (keyInfo != null && !keyInfo.getAndSetAccessed()) {
      LOG_DEPRECATION.info(keyInfo.getWarningMessage(name, source));
    }
  }

  /**
   * Unset a previously set property.
   * @param name the property name
   */
  public synchronized void unset(String name) {
    String[] names = null;
    if (!isDeprecated(name)) {
      names = getAlternativeNames(name);
      if(names == null) {
    	  names = new String[]{name};
      }
    }
    else {
      names = handleDeprecation(deprecationContext.get(), name);
    }

    for(String n: names) {
      getOverlay().remove(n);
      getProps().remove(n);
    }
  }

  /**
   * Sets a property if it is currently unset.
   * @param name the property name
   * @param value the new value
   */
  public synchronized void setIfUnset(String name, String value) {
    if (get(name) == null) {
      set(name, value);
    }
  }
  
  private synchronized Properties getOverlay() {
    if (overlay==null){
      overlay=new Properties();
    }
    return overlay;
  }

  /** 
   * Get the value of the <code>name</code>. If the key is deprecated,
   * it returns the value of the first key which replaces the deprecated key
   * and is not null.
   * If no such property exists,
   * then <code>defaultValue</code> is returned.
   * 
   * @param name property name, will be trimmed before get value.
   * @param defaultValue default value.
   * @return property value, or <code>defaultValue</code> if the property 
   *         doesn't exist.                    
   */
  public String get(String name, String defaultValue) {
    String[] names = handleDeprecation(deprecationContext.get(), name);
    String result = null;
    for(String n : names) {
      result = substituteVars(getProps().getProperty(n, defaultValue));
    }
    return result;
  }

  /** 
   * Get the value of the <code>name</code> property as an <code>int</code>.
   *   
   * If no such property exists, the provided default value is returned,
   * or if the specified value is not a valid <code>int</code>,
   * then an error is thrown.
   * 
   * @param name property name.
   * @param defaultValue default value.
   * @throws NumberFormatException when the value is invalid
   * @return property value as an <code>int</code>, 
   *         or <code>defaultValue</code>. 
   */
  public int getInt(String name, int defaultValue) {
    String valueString = getTrimmed(name);
    if (valueString == null)
      return defaultValue;
    String hexString = getHexDigits(valueString);
    if (hexString != null) {
      return Integer.parseInt(hexString, 16);
    }
    return Integer.parseInt(valueString);
  }
  
  /**
   * Get the value of the <code>name</code> property as a set of comma-delimited
   * <code>int</code> values.
   * 
   * If no such property exists, an empty array is returned.
   * 
   * @param name property name
   * @return property value interpreted as an array of comma-delimited
   *         <code>int</code> values
   */
  public int[] getInts(String name) {
    String[] strings = getTrimmedStrings(name);
    int[] ints = new int[strings.length];
    for (int i = 0; i < strings.length; i++) {
      ints[i] = Integer.parseInt(strings[i]);
    }
    return ints;
  }

  /** 
   * Set the value of the <code>name</code> property to an <code>int</code>.
   * 
   * @param name property name.
   * @param value <code>int</code> value of the property.
   */
  public void setInt(String name, int value) {
    set(name, Integer.toString(value));
  }


  /** 
   * Get the value of the <code>name</code> property as a <code>long</code>.  
   * If no such property exists, the provided default value is returned,
   * or if the specified value is not a valid <code>long</code>,
   * then an error is thrown.
   * 
   * @param name property name.
   * @param defaultValue default value.
   * @throws NumberFormatException when the value is invalid
   * @return property value as a <code>long</code>, 
   *         or <code>defaultValue</code>. 
   */
  public long getLong(String name, long defaultValue) {
    String valueString = getTrimmed(name);
    if (valueString == null)
      return defaultValue;
    String hexString = getHexDigits(valueString);
    if (hexString != null) {
      return Long.parseLong(hexString, 16);
    }
    return Long.parseLong(valueString);
  }

  /**
   * Get the value of the <code>name</code> property as a <code>long</code> or
   * human readable format. If no such property exists, the provided default
   * value is returned, or if the specified value is not a valid
   * <code>long</code> or human readable format, then an error is thrown. You
   * can use the following suffix (case insensitive): k(kilo), m(mega), g(giga),
   * t(tera), p(peta), e(exa)
   *
   * @param name property name.
   * @param defaultValue default value.
   * @throws NumberFormatException when the value is invalid
   * @return property value as a <code>long</code>,
   *         or <code>defaultValue</code>.
   */
  public long getLongBytes(String name, long defaultValue) {
    String valueString = getTrimmed(name);
    if (valueString == null)
      return defaultValue;
    return StringUtils.TraditionalBinaryPrefix.string2long(valueString);
  }

  private String getHexDigits(String value) {
    boolean negative = false;
    String str = value;
    String hexString = null;
    if (value.startsWith("-")) {
      negative = true;
      str = value.substring(1);
    }
    if (str.startsWith("0x") || str.startsWith("0X")) {
      hexString = str.substring(2);
      if (negative) {
        hexString = "-" + hexString;
      }
      return hexString;
    }
    return null;
  }
  
  /** 
   * Set the value of the <code>name</code> property to a <code>long</code>.
   * 
   * @param name property name.
   * @param value <code>long</code> value of the property.
   */
  public void setLong(String name, long value) {
    set(name, Long.toString(value));
  }

  /** 
   * Get the value of the <code>name</code> property as a <code>float</code>.  
   * If no such property exists, the provided default value is returned,
   * or if the specified value is not a valid <code>float</code>,
   * then an error is thrown.
   *
   * @param name property name.
   * @param defaultValue default value.
   * @throws NumberFormatException when the value is invalid
   * @return property value as a <code>float</code>, 
   *         or <code>defaultValue</code>. 
   */
  public float getFloat(String name, float defaultValue) {
    String valueString = getTrimmed(name);
    if (valueString == null)
      return defaultValue;
    return Float.parseFloat(valueString);
  }

  /**
   * Set the value of the <code>name</code> property to a <code>float</code>.
   * 
   * @param name property name.
   * @param value property value.
   */
  public void setFloat(String name, float value) {
    set(name,Float.toString(value));
  }

  /** 
   * Get the value of the <code>name</code> property as a <code>double</code>.  
   * If no such property exists, the provided default value is returned,
   * or if the specified value is not a valid <code>double</code>,
   * then an error is thrown.
   *
   * @param name property name.
   * @param defaultValue default value.
   * @throws NumberFormatException when the value is invalid
   * @return property value as a <code>double</code>, 
   *         or <code>defaultValue</code>. 
   */
  public double getDouble(String name, double defaultValue) {
    String valueString = getTrimmed(name);
    if (valueString == null)
      return defaultValue;
    return Double.parseDouble(valueString);
  }

  /**
   * Set the value of the <code>name</code> property to a <code>double</code>.
   * 
   * @param name property name.
   * @param value property value.
   */
  public void setDouble(String name, double value) {
    set(name,Double.toString(value));
  }
 
  /** 
   * Get the value of the <code>name</code> property as a <code>boolean</code>.  
   * If no such property is specified, or if the specified value is not a valid
   * <code>boolean</code>, then <code>defaultValue</code> is returned.
   * 
   * @param name property name.
   * @param defaultValue default value.
   * @return property value as a <code>boolean</code>, 
   *         or <code>defaultValue</code>. 
   */
  public boolean getBoolean(String name, boolean defaultValue) {
    String valueString = getTrimmed(name);
    if (null == valueString || valueString.isEmpty()) {
      return defaultValue;
    }

    if (StringUtils.equalsIgnoreCase("true", valueString))
      return true;
    else if (StringUtils.equalsIgnoreCase("false", valueString))
      return false;
    else {
      LOG.warn("Invalid value for boolean: " + valueString +
               ", choose default value: " + defaultValue + " for " + name);
      return defaultValue;
    }
  }

  /** 
   * Set the value of the <code>name</code> property to a <code>boolean</code>.
   * 
   * @param name property name.
   * @param value <code>boolean</code> value of the property.
   */
  public void setBoolean(String name, boolean value) {
    set(name, Boolean.toString(value));
  }

  /**
   * Set the given property, if it is currently unset.
   * @param name property name
   * @param value new value
   */
  public void setBooleanIfUnset(String name, boolean value) {
    setIfUnset(name, Boolean.toString(value));
  }

  /**
   * Set the value of the <code>name</code> property to the given type. This
   * is equivalent to <code>set(&lt;name&gt;, value.toString())</code>.
   * @param name property name
   * @param value new value
   * @param <T> enumeration type
   */
  public <T extends Enum<T>> void setEnum(String name, T value) {
    set(name, value.toString());
  }

  /**
   * Return value matching this enumerated type.
   * Note that the returned value is trimmed by this method.
   * @param name Property name
   * @param defaultValue Value returned if no mapping exists
   * @param <T> enumeration type
   * @throws IllegalArgumentException If mapping is illegal for the type
   * provided
   * @return enumeration type
   */
  public <T extends Enum<T>> T getEnum(String name, T defaultValue) {
    final String val = getTrimmed(name);
    return null == val
      ? defaultValue
      : Enum.valueOf(defaultValue.getDeclaringClass(), val);
  }

  enum ParsedTimeDuration {
    NS {
      TimeUnit unit() { return TimeUnit.NANOSECONDS; }
      String suffix() { return "ns"; }
    },
    US {
      TimeUnit unit() { return TimeUnit.MICROSECONDS; }
      String suffix() { return "us"; }
    },
    MS {
      TimeUnit unit() { return TimeUnit.MILLISECONDS; }
      String suffix() { return "ms"; }
    },
    S {
      TimeUnit unit() { return TimeUnit.SECONDS; }
      String suffix() { return "s"; }
    },
    M {
      TimeUnit unit() { return TimeUnit.MINUTES; }
      String suffix() { return "m"; }
    },
    H {
      TimeUnit unit() { return TimeUnit.HOURS; }
      String suffix() { return "h"; }
    },
    D {
      TimeUnit unit() { return TimeUnit.DAYS; }
      String suffix() { return "d"; }
    };
    abstract TimeUnit unit();
    abstract String suffix();
    static ParsedTimeDuration unitFor(String s) {
      for (ParsedTimeDuration ptd : values()) {
        // iteration order is in decl order, so SECONDS matched last
        if (s.endsWith(ptd.suffix())) {
          return ptd;
        }
      }
      return null;
    }
    static ParsedTimeDuration unitFor(TimeUnit unit) {
      for (ParsedTimeDuration ptd : values()) {
        if (ptd.unit() == unit) {
          return ptd;
        }
      }
      return null;
    }
  }

  /**
   * Set the value of <code>name</code> to the given time duration. This
   * is equivalent to <code>set(&lt;name&gt;, value + &lt;time suffix&gt;)</code>.
   * @param name Property name
   * @param value Time duration
   * @param unit Unit of time
   */
  public void setTimeDuration(String name, long value, TimeUnit unit) {
    set(name, value + ParsedTimeDuration.unitFor(unit).suffix());
  }

  /**
   * Return time duration in the given time unit. Valid units are encoded in
   * properties as suffixes: nanoseconds (ns), microseconds (us), milliseconds
   * (ms), seconds (s), minutes (m), hours (h), and days (d).
   *
   * @param name Property name
   * @param defaultValue Value returned if no mapping exists.
   * @param unit Unit to convert the stored property, if it exists.
   * @throws NumberFormatException If the property stripped of its unit is not
   *         a number
   * @return time duration in given time unit
   */
  public long getTimeDuration(String name, long defaultValue, TimeUnit unit) {
    return getTimeDuration(name, defaultValue, unit, unit);
  }

  public long getTimeDuration(String name, String defaultValue, TimeUnit unit) {
    return getTimeDuration(name, defaultValue, unit, unit);
  }

  /**
   * Return time duration in the given time unit. Valid units are encoded in
   * properties as suffixes: nanoseconds (ns), microseconds (us), milliseconds
   * (ms), seconds (s), minutes (m), hours (h), and days (d). If no unit is
   * provided, the default unit is applied.
   *
   * @param name Property name
   * @param defaultValue Value returned if no mapping exists.
   * @param defaultUnit Default time unit if no valid suffix is provided.
   * @param returnUnit The unit used for the returned value.
   * @throws NumberFormatException If the property stripped of its unit is not
   *         a number
   * @return time duration in given time unit
   */
  public long getTimeDuration(String name, long defaultValue,
      TimeUnit defaultUnit, TimeUnit returnUnit) {
    String vStr = get(name);
    if (null == vStr) {
      return returnUnit.convert(defaultValue, defaultUnit);
    } else {
      return getTimeDurationHelper(name, vStr, defaultUnit, returnUnit);
    }
  }

  public long getTimeDuration(String name, String defaultValue,
      TimeUnit defaultUnit, TimeUnit returnUnit) {
    String vStr = get(name);
    if (null == vStr) {
      return getTimeDurationHelper(name, defaultValue, defaultUnit, returnUnit);
    } else {
      return getTimeDurationHelper(name, vStr, defaultUnit, returnUnit);
    }
  }

  /**
   * Return time duration in the given time unit. Valid units are encoded in
   * properties as suffixes: nanoseconds (ns), microseconds (us), milliseconds
   * (ms), seconds (s), minutes (m), hours (h), and days (d).
   *
   * @param name Property name
   * @param vStr The string value with time unit suffix to be converted.
   * @param unit Unit to convert the stored property, if it exists.
   * @return time duration in given time unit.
   */
  public long getTimeDurationHelper(String name, String vStr, TimeUnit unit) {
    return getTimeDurationHelper(name, vStr, unit, unit);
  }

  /**
   * Return time duration in the given time unit. Valid units are encoded in
   * properties as suffixes: nanoseconds (ns), microseconds (us), milliseconds
   * (ms), seconds (s), minutes (m), hours (h), and days (d).
   *
   * @param name Property name
   * @param vStr The string value with time unit suffix to be converted.
   * @param defaultUnit Unit to convert the stored property, if it exists.
   * @param returnUnit Unit for the returned value.
   * @return time duration in given time unit.
   */
  private long getTimeDurationHelper(String name, String vStr,
      TimeUnit defaultUnit, TimeUnit returnUnit) {
    vStr = vStr.trim();
    vStr = StringUtils.toLowerCase(vStr);
    ParsedTimeDuration vUnit = ParsedTimeDuration.unitFor(vStr);
    if (null == vUnit) {
      vUnit = ParsedTimeDuration.unitFor(defaultUnit);
    } else {
      vStr = vStr.substring(0, vStr.lastIndexOf(vUnit.suffix()));
    }

    long raw = Long.parseLong(vStr);
    long converted = returnUnit.convert(raw, vUnit.unit());
    if (vUnit.unit().convert(converted, returnUnit) < raw) {
      logDeprecation("Possible loss of precision converting " + vStr
          + vUnit.suffix() + " to " + returnUnit + " for " + name);
    }
    return converted;
  }

  public long[] getTimeDurations(String name, TimeUnit unit) {
    String[] strings = getTrimmedStrings(name);
    long[] durations = new long[strings.length];
    for (int i = 0; i < strings.length; i++) {
      durations[i] = getTimeDurationHelper(name, strings[i], unit);
    }
    return durations;
  }
  /**
   * Gets the Storage Size from the config, or returns the defaultValue. The
   * unit of return value is specified in target unit.
   *
   * @param name - Key Name
   * @param defaultValue - Default Value -- e.g. 100MB
   * @param targetUnit - The units that we want result to be in.
   * @return double -- formatted in target Units
   */
  public double getStorageSize(String name, String defaultValue,
      StorageUnit targetUnit) {
    Preconditions.checkState(isNotBlank(name), "Key cannot be blank.");
    String vString = get(name);
    if (isBlank(vString)) {
      vString = defaultValue;
    }

    // Please note: There is a bit of subtlety here. If the user specifies
    // the default unit as "1GB", but the requested unit is MB, we will return
    // the format in MB even thought the default string is specified in GB.

    // Converts a string like "1GB" to to unit specified in targetUnit.

    StorageSize measure = StorageSize.parse(vString);
    return convertStorageUnit(measure.getValue(), measure.getUnit(),
        targetUnit);
  }

  /**
   * Gets storage size from a config file.
   *
   * @param name - Key to read.
   * @param defaultValue - The default value to return in case the key is
   * not present.
   * @param targetUnit - The Storage unit that should be used
   * for the return value.
   * @return - double value in the Storage Unit specified.
   */
  public double getStorageSize(String name, double defaultValue,
      StorageUnit targetUnit) {
    Preconditions.checkNotNull(targetUnit, "Conversion unit cannot be null.");
    Preconditions.checkState(isNotBlank(name), "Name cannot be blank.");
    String vString = get(name);
    if (isBlank(vString)) {
      return targetUnit.getDefault(defaultValue);
    }

    StorageSize measure = StorageSize.parse(vString);
    return convertStorageUnit(measure.getValue(), measure.getUnit(),
        targetUnit);

  }

  /**
   * Sets Storage Size for the specified key.
   *
   * @param name - Key to set.
   * @param value - The numeric value to set.
   * @param unit - Storage Unit to be used.
   */
  public void setStorageSize(String name, double value, StorageUnit unit) {
    set(name, value + unit.getShortName());
  }

  /**
   * convert the value from one storage unit to another.
   *
   * @param value - value
   * @param sourceUnit - Source unit to convert from
   * @param targetUnit - target unit.
   * @return double.
   */
  private double convertStorageUnit(double value, StorageUnit sourceUnit,
      StorageUnit targetUnit) {
    double byteValue = sourceUnit.toBytes(value);
    return targetUnit.fromBytes(byteValue);
  }

  /**
   * Get the value of the <code>name</code> property as a <code>Pattern</code>.
   * If no such property is specified, or if the specified value is not a valid
   * <code>Pattern</code>, then <code>DefaultValue</code> is returned.
   * Note that the returned value is NOT trimmed by this method.
   *
   * @param name property name
   * @param defaultValue default value
   * @return property value as a compiled Pattern, or defaultValue
   */
  public Pattern getPattern(String name, Pattern defaultValue) {
    String valString = get(name);
    if (null == valString || valString.isEmpty()) {
      return defaultValue;
    }
    try {
      return Pattern.compile(valString);
    } catch (PatternSyntaxException pse) {
      LOG.warn("Regular expression '" + valString + "' for property '" +
               name + "' not valid. Using default", pse);
      return defaultValue;
    }
  }

  /**
   * Set the given property to <code>Pattern</code>.
   * If the pattern is passed as null, sets the empty pattern which results in
   * further calls to getPattern(...) returning the default value.
   *
   * @param name property name
   * @param pattern new value
   */
  public void setPattern(String name, Pattern pattern) {
    assert pattern != null : "Pattern cannot be null";
    set(name, pattern.pattern());
  }

  /**
   * Gets information about why a property was set.  Typically this is the 
   * path to the resource objects (file, URL, etc.) the property came from, but
   * it can also indicate that it was set programmatically, or because of the
   * command line.
   *
   * @param name - The property name to get the source of.
   * @return null - If the property or its source wasn't found. Otherwise, 
   * returns a list of the sources of the resource.  The older sources are
   * the first ones in the list.  So for example if a configuration is set from
   * the command line, and then written out to a file that is read back in the
   * first entry would indicate that it was set from the command line, while
   * the second one would indicate the file that the new configuration was read
   * in from.
   */
  @InterfaceStability.Unstable
  public synchronized String[] getPropertySources(String name) {
    if (properties == null) {
      // If properties is null, it means a resource was newly added
      // but the props were cleared so as to load it upon future
      // requests. So lets force a load by asking a properties list.
      getProps();
    }
    // Return a null right away if our properties still
    // haven't loaded or the resource mapping isn't defined
    if (properties == null || updatingResource == null) {
      return null;
    } else {
      String[] source = updatingResource.get(name);
      if(source == null) {
        return null;
      } else {
        return Arrays.copyOf(source, source.length);
      }
    }
  }

  /**
   * A class that represents a set of positive integer ranges. It parses 
   * strings of the form: "2-3,5,7-" where ranges are separated by comma and 
   * the lower/upper bounds are separated by dash. Either the lower or upper 
   * bound may be omitted meaning all values up to or over. So the string 
   * above means 2, 3, 5, and 7, 8, 9, ...
   */
  public static class IntegerRanges implements Iterable<Integer>{
    private static class Range {
      int start;
      int end;
    }
    
    private static class RangeNumberIterator implements Iterator<Integer> {
      Iterator<Range> internal;
      int at;
      int end;

      public RangeNumberIterator(List<Range> ranges) {
        if (ranges != null) {
          internal = ranges.iterator();
        }
        at = -1;
        end = -2;
      }
      
      @Override
      public boolean hasNext() {
        if (at <= end) {
          return true;
        } else if (internal != null){
          return internal.hasNext();
        }
        return false;
      }

      @Override
      public Integer next() {
        if (at <= end) {
          at++;
          return at - 1;
        } else if (internal != null){
          Range found = internal.next();
          if (found != null) {
            at = found.start;
            end = found.end;
            at++;
            return at - 1;
          }
        }
        return null;
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
    };

    List<Range> ranges = new ArrayList<Range>();
    
    public IntegerRanges() {
    }
    
    public IntegerRanges(String newValue) {
      StringTokenizer itr = new StringTokenizer(newValue, ",");
      while (itr.hasMoreTokens()) {
        String rng = itr.nextToken().trim();
        String[] parts = rng.split("-", 3);
        if (parts.length < 1 || parts.length > 2) {
          throw new IllegalArgumentException("integer range badly formed: " + 
                                             rng);
        }
        Range r = new Range();
        r.start = convertToInt(parts[0], 0);
        if (parts.length == 2) {
          r.end = convertToInt(parts[1], Integer.MAX_VALUE);
        } else {
          r.end = r.start;
        }
        if (r.start > r.end) {
          throw new IllegalArgumentException("IntegerRange from " + r.start + 
                                             " to " + r.end + " is invalid");
        }
        ranges.add(r);
      }
    }

    /**
     * Convert a string to an int treating empty strings as the default value.
     * @param value the string value
     * @param defaultValue the value for if the string is empty
     * @return the desired integer
     */
    private static int convertToInt(String value, int defaultValue) {
      String trim = value.trim();
      if (trim.length() == 0) {
        return defaultValue;
      }
      return Integer.parseInt(trim);
    }

    /**
     * Is the given value in the set of ranges.
     * @param value the value to check
     * @return is the value in the ranges?
     */
    public boolean isIncluded(int value) {
      for(Range r: ranges) {
        if (r.start <= value && value <= r.end) {
          return true;
        }
      }
      return false;
    }
    
    /**
     * @return true if there are no values in this range, else false.
     */
    public boolean isEmpty() {
      return ranges == null || ranges.isEmpty();
    }
    
    @Override
    public String toString() {
      StringBuilder result = new StringBuilder();
      boolean first = true;
      for(Range r: ranges) {
        if (first) {
          first = false;
        } else {
          result.append(',');
        }
        result.append(r.start);
        result.append('-');
        result.append(r.end);
      }
      return result.toString();
    }

    /**
     * Get range start for the first integer range.
     * @return range start.
     */
    public int getRangeStart() {
      if (ranges == null || ranges.isEmpty()) {
        return -1;
      }
      Range r = ranges.get(0);
      return r.start;
    }

    @Override
    public Iterator<Integer> iterator() {
      return new RangeNumberIterator(ranges);
    }
    
  }

  /**
   * Parse the given attribute as a set of integer ranges.
   * @param name the attribute name
   * @param defaultValue the default value if it is not set
   * @return a new set of ranges from the configured value
   */
  public IntegerRanges getRange(String name, String defaultValue) {
    return new IntegerRanges(get(name, defaultValue));
  }

  /** 
   * Get the comma delimited values of the <code>name</code> property as 
   * a collection of <code>String</code>s.  
   * If no such property is specified then empty collection is returned.
   * <p>
   * This is an optimized version of {@link #getStrings(String)}
   * 
   * @param name property name.
   * @return property value as a collection of <code>String</code>s. 
   */
  public Collection<String> getStringCollection(String name) {
    String valueString = get(name);
    return StringUtils.getStringCollection(valueString);
  }

  /** 
   * Get the comma delimited values of the <code>name</code> property as 
   * an array of <code>String</code>s.  
   * If no such property is specified then <code>null</code> is returned.
   * 
   * @param name property name.
   * @return property value as an array of <code>String</code>s, 
   *         or <code>null</code>. 
   */
  public String[] getStrings(String name) {
    String valueString = get(name);
    return StringUtils.getStrings(valueString);
  }

  /** 
   * Get the comma delimited values of the <code>name</code> property as 
   * an array of <code>String</code>s.  
   * If no such property is specified then default value is returned.
   * 
   * @param name property name.
   * @param defaultValue The default value
   * @return property value as an array of <code>String</code>s, 
   *         or default value. 
   */
  public String[] getStrings(String name, String... defaultValue) {
    String valueString = get(name);
    if (valueString == null) {
      return defaultValue;
    } else {
      return StringUtils.getStrings(valueString);
    }
  }
  
  /** 
   * Get the comma delimited values of the <code>name</code> property as 
   * a collection of <code>String</code>s, trimmed of the leading and trailing whitespace.  
   * If no such property is specified then empty <code>Collection</code> is returned.
   *
   * @param name property name.
   * @return property value as a collection of <code>String</code>s, or empty <code>Collection</code> 
   */
  public Collection<String> getTrimmedStringCollection(String name) {
    String valueString = get(name);
    if (null == valueString) {
      Collection<String> empty = new ArrayList<String>();
      return empty;
    }
    return StringUtils.getTrimmedStringCollection(valueString);
  }
  
  /** 
   * Get the comma delimited values of the <code>name</code> property as 
   * an array of <code>String</code>s, trimmed of the leading and trailing whitespace.
   * If no such property is specified then an empty array is returned.
   * 
   * @param name property name.
   * @return property value as an array of trimmed <code>String</code>s, 
   *         or empty array. 
   */
  public String[] getTrimmedStrings(String name) {
    String valueString = get(name);
    return StringUtils.getTrimmedStrings(valueString);
  }

  /** 
   * Get the comma delimited values of the <code>name</code> property as 
   * an array of <code>String</code>s, trimmed of the leading and trailing whitespace.
   * If no such property is specified then default value is returned.
   * 
   * @param name property name.
   * @param defaultValue The default value
   * @return property value as an array of trimmed <code>String</code>s, 
   *         or default value. 
   */
  public String[] getTrimmedStrings(String name, String... defaultValue) {
    String valueString = get(name);
    if (null == valueString) {
      return defaultValue;
    } else {
      return StringUtils.getTrimmedStrings(valueString);
    }
  }

  /** 
   * Set the array of string values for the <code>name</code> property as 
   * as comma delimited values.  
   * 
   * @param name property name.
   * @param values The values
   */
  public void setStrings(String name, String... values) {
    set(name, StringUtils.arrayToString(values));
  }

  /**
   * Get the value for a known password configuration element.
   * In order to enable the elimination of clear text passwords in config,
   * this method attempts to resolve the property name as an alias through
   * the CredentialProvider API and conditionally fallsback to config.
   * @param name property name
   * @return password
   * @throws IOException when error in fetching password
   */
  public char[] getPassword(String name) throws IOException {
    char[] pass = null;

    pass = getPasswordFromCredentialProviders(name);

    if (pass == null) {
      pass = getPasswordFromConfig(name);
    }

    return pass;
  }

  /**
   * Get the credential entry by name from a credential provider.
   *
   * Handle key deprecation.
   *
   * @param provider a credential provider
   * @param name alias of the credential
   * @return the credential entry or null if not found
   */
  private CredentialEntry getCredentialEntry(CredentialProvider provider,
                                             String name) throws IOException {
    CredentialEntry entry = provider.getCredentialEntry(name);
    if (entry != null) {
      return entry;
    }

    // The old name is stored in the credential provider.
    String oldName = getDeprecatedKey(name);
    if (oldName != null) {
      entry = provider.getCredentialEntry(oldName);
      if (entry != null) {
        logDeprecationOnce(oldName, provider.toString());
        return entry;
      }
    }

    // The name is deprecated.
    DeprecatedKeyInfo keyInfo = getDeprecatedKeyInfo(name);
    if (keyInfo != null && keyInfo.newKeys != null) {
      for (String newName : keyInfo.newKeys) {
        entry = provider.getCredentialEntry(newName);
        if (entry != null) {
          logDeprecationOnce(name, null);
          return entry;
        }
      }
    }

    return null;
  }

  /**
   * Try and resolve the provided element name as a credential provider
   * alias.
   * @param name alias of the provisioned credential
   * @return password or null if not found
   * @throws IOException when error in fetching password
   */
  public char[] getPasswordFromCredentialProviders(String name)
      throws IOException {
    char[] pass = null;
    try {
      List<CredentialProvider> providers =
          CredentialProviderFactory.getProviders(this);

      if (providers != null) {
        for (CredentialProvider provider : providers) {
          try {
            CredentialEntry entry = getCredentialEntry(provider, name);
            if (entry != null) {
              pass = entry.getCredential();
              break;
            }
          }
          catch (IOException ioe) {
            throw new IOException("Can't get key " + name + " from key provider" +
            		"of type: " + provider.getClass().getName() + ".", ioe);
          }
        }
      }
    }
    catch (IOException ioe) {
      throw new IOException("Configuration problem with provider path.", ioe);
    }

    return pass;
  }

  /**
   * Fallback to clear text passwords in configuration.
   * @param name the property name.
   * @return clear text password or null
   */
  protected char[] getPasswordFromConfig(String name) {
    char[] pass = null;
    if (getBoolean(CredentialProvider.CLEAR_TEXT_FALLBACK,
        CommonConfigurationKeysPublic.
            HADOOP_SECURITY_CREDENTIAL_CLEAR_TEXT_FALLBACK_DEFAULT)) {
      String passStr = get(name);
      if (passStr != null) {
        pass = passStr.toCharArray();
      }
    }
    return pass;
  }

  /**
   * Get the socket address for <code>hostProperty</code> as a
   * <code>InetSocketAddress</code>. If <code>hostProperty</code> is
   * <code>null</code>, <code>addressProperty</code> will be used. This
   * is useful for cases where we want to differentiate between host
   * bind address and address clients should use to establish connection.
   *
   * @param hostProperty bind host property name.
   * @param addressProperty address property name.
   * @param defaultAddressValue the default value
   * @param defaultPort the default port
   * @return InetSocketAddress
   */
  public InetSocketAddress getSocketAddr(
      String hostProperty,
      String addressProperty,
      String defaultAddressValue,
      int defaultPort) {

    InetSocketAddress bindAddr = getSocketAddr(
      addressProperty, defaultAddressValue, defaultPort);

    final String host = get(hostProperty);

    if (host == null || host.isEmpty()) {
      return bindAddr;
    }

    return NetUtils.createSocketAddr(
        host, bindAddr.getPort(), hostProperty);
  }

  /**
   * Get the socket address for <code>name</code> property as a
   * <code>InetSocketAddress</code>.
   * @param name property name.
   * @param defaultAddress the default value
   * @param defaultPort the default port
   * @return InetSocketAddress
   */
  public InetSocketAddress getSocketAddr(
      String name, String defaultAddress, int defaultPort) {
    final String address = getTrimmed(name, defaultAddress);
    return NetUtils.createSocketAddr(address, defaultPort, name);
  }

  /**
   * Set the socket address for the <code>name</code> property as
   * a <code>host:port</code>.
   * @param name property name.
   * @param addr inetSocketAddress addr.
   */
  public void setSocketAddr(String name, InetSocketAddress addr) {
    set(name, NetUtils.getHostPortString(addr));
  }

  /**
   * Set the socket address a client can use to connect for the
   * <code>name</code> property as a <code>host:port</code>.  The wildcard
   * address is replaced with the local host's address. If the host and address
   * properties are configured the host component of the address will be combined
   * with the port component of the addr to generate the address.  This is to allow
   * optional control over which host name is used in multi-home bind-host
   * cases where a host can have multiple names
   * @param hostProperty the bind-host configuration name
   * @param addressProperty the service address configuration name
   * @param defaultAddressValue the service default address configuration value
   * @param addr InetSocketAddress of the service listener
   * @return InetSocketAddress for clients to connect
   */
  public InetSocketAddress updateConnectAddr(
      String hostProperty,
      String addressProperty,
      String defaultAddressValue,
      InetSocketAddress addr) {

    final String host = get(hostProperty);
    final String connectHostPort = getTrimmed(addressProperty, defaultAddressValue);

    if (host == null || host.isEmpty() || connectHostPort == null || connectHostPort.isEmpty()) {
      //not our case, fall back to original logic
      return updateConnectAddr(addressProperty, addr);
    }

    final String connectHost = connectHostPort.split(":")[0];
    // Create connect address using client address hostname and server port.
    return updateConnectAddr(addressProperty, NetUtils.createSocketAddrForHost(
        connectHost, addr.getPort()));
  }
  
  /**
   * Set the socket address a client can use to connect for the
   * <code>name</code> property as a <code>host:port</code>.  The wildcard
   * address is replaced with the local host's address.
   * @param name property name.
   * @param addr InetSocketAddress of a listener to store in the given property
   * @return InetSocketAddress for clients to connect
   */
  public InetSocketAddress updateConnectAddr(String name,
                                             InetSocketAddress addr) {
    final InetSocketAddress connectAddr = NetUtils.getConnectAddress(addr);
    setSocketAddr(name, connectAddr);
    return connectAddr;
  }
  
  /**
   * Load a class by name.
   * 
   * @param name the class name.
   * @return the class object.
   * @throws ClassNotFoundException if the class is not found.
   */
  public Class<?> getClassByName(String name) throws ClassNotFoundException {
    Class<?> ret = getClassByNameOrNull(name);
    if (ret == null) {
      throw new ClassNotFoundException("Class " + name + " not found");
    }
    return ret;
  }
  
  /**
   * Load a class by name, returning null rather than throwing an exception
   * if it couldn't be loaded. This is to avoid the overhead of creating
   * an exception.
   * 
   * @param name the class name
   * @return the class object, or null if it could not be found.
   */
  public Class<?> getClassByNameOrNull(String name) {
    Map<String, WeakReference<Class<?>>> map;
    
    synchronized (CACHE_CLASSES) {
      map = CACHE_CLASSES.get(classLoader);
      if (map == null) {
        map = Collections.synchronizedMap(
          new WeakHashMap<String, WeakReference<Class<?>>>());
        CACHE_CLASSES.put(classLoader, map);
      }
    }

    Class<?> clazz = null;
    WeakReference<Class<?>> ref = map.get(name); 
    if (ref != null) {
       clazz = ref.get();
    }
     
    if (clazz == null) {
      try {
        clazz = Class.forName(name, true, classLoader);
      } catch (ClassNotFoundException e) {
        // Leave a marker that the class isn't found
        map.put(name, new WeakReference<Class<?>>(NEGATIVE_CACHE_SENTINEL));
        return null;
      }
      // two putters can race here, but they'll put the same class
      map.put(name, new WeakReference<Class<?>>(clazz));
      return clazz;
    } else if (clazz == NEGATIVE_CACHE_SENTINEL) {
      return null; // not found
    } else {
      // cache hit
      return clazz;
    }
  }

  /** 
   * Get the value of the <code>name</code> property
   * as an array of <code>Class</code>.
   * The value of the property specifies a list of comma separated class names.  
   * If no such property is specified, then <code>defaultValue</code> is 
   * returned.
   * 
   * @param name the property name.
   * @param defaultValue default value.
   * @return property value as a <code>Class[]</code>, 
   *         or <code>defaultValue</code>. 
   */
  public Class<?>[] getClasses(String name, Class<?> ... defaultValue) {
    String valueString = getRaw(name);
    if (null == valueString) {
      return defaultValue;
    }
    String[] classnames = getTrimmedStrings(name);
    try {
      Class<?>[] classes = new Class<?>[classnames.length];
      for(int i = 0; i < classnames.length; i++) {
        classes[i] = getClassByName(classnames[i]);
      }
      return classes;
    } catch (ClassNotFoundException e) {
      throw new RuntimeException(e);
    }
  }

  /** 
   * Get the value of the <code>name</code> property as a <code>Class</code>.  
   * If no such property is specified, then <code>defaultValue</code> is 
   * returned.
   * 
   * @param name the conf key name.
   * @param defaultValue default value.
   * @return property value as a <code>Class</code>, 
   *         or <code>defaultValue</code>. 
   */
  public Class<?> getClass(String name, Class<?> defaultValue) {
    String valueString = getTrimmed(name);
    if (valueString == null)
      return defaultValue;
    try {
      return getClassByName(valueString);
    } catch (ClassNotFoundException e) {
      throw new RuntimeException(e);
    }
  }

  /** 
   * Get the value of the <code>name</code> property as a <code>Class</code>
   * implementing the interface specified by <code>xface</code>.
   *   
   * If no such property is specified, then <code>defaultValue</code> is 
   * returned.
   * 
   * An exception is thrown if the returned class does not implement the named
   * interface. 
   * 
   * @param name the conf key name.
   * @param defaultValue default value.
   * @param xface the interface implemented by the named class.
   * @param <U> Interface class type.
   * @return property value as a <code>Class</code>, 
   *         or <code>defaultValue</code>.
   */
  public <U> Class<? extends U> getClass(String name, 
                                         Class<? extends U> defaultValue, 
                                         Class<U> xface) {
    try {
      Class<?> theClass = getClass(name, defaultValue);
      if (theClass != null && !xface.isAssignableFrom(theClass))
        throw new RuntimeException(theClass+" not "+xface.getName());
      else if (theClass != null)
        return theClass.asSubclass(xface);
      else
        return null;
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Get the value of the <code>name</code> property as a <code>List</code>
   * of objects implementing the interface specified by <code>xface</code>.
   * 
   * An exception is thrown if any of the classes does not exist, or if it does
   * not implement the named interface.
   * 
   * @param name the property name.
   * @param xface the interface implemented by the classes named by
   *        <code>name</code>.
   * @param <U> Interface class type.
   * @return a <code>List</code> of objects implementing <code>xface</code>.
   */
  @SuppressWarnings("unchecked")
  public <U> List<U> getInstances(String name, Class<U> xface) {
    List<U> ret = new ArrayList<U>();
    Class<?>[] classes = getClasses(name);
    for (Class<?> cl: classes) {
      if (!xface.isAssignableFrom(cl)) {
        throw new RuntimeException(cl + " does not implement " + xface);
      }
      ret.add((U)ReflectionUtils.newInstance(cl, this));
    }
    return ret;
  }

  /** 
   * Set the value of the <code>name</code> property to the name of a 
   * <code>theClass</code> implementing the given interface <code>xface</code>.
   * 
   * An exception is thrown if <code>theClass</code> does not implement the 
   * interface <code>xface</code>. 
   * 
   * @param name property name.
   * @param theClass property value.
   * @param xface the interface implemented by the named class.
   */
  public void setClass(String name, Class<?> theClass, Class<?> xface) {
    if (!xface.isAssignableFrom(theClass))
      throw new RuntimeException(theClass+" not "+xface.getName());
    set(name, theClass.getName());
  }

  /**
   * Get a local file under a directory named by <i>dirsProp</i> with
   * the given <i>path</i>.  If <i>dirsProp</i> contains multiple directories,
   * then one is chosen based on <i>path</i>'s hash code.  If the selected
   * directory does not exist, an attempt is made to create it.
   *
   * @param dirsProp directory in which to locate the file.
   * @param path file-path.
   * @return local file under the directory with the given path.
   * @throws IOException raised on errors performing I/O.
   */
  public Path getLocalPath(String dirsProp, String path)
    throws IOException {
    String[] dirs = getTrimmedStrings(dirsProp);
    int hashCode = path.hashCode();
    FileSystem fs = FileSystem.getLocal(this);
    for (int i = 0; i < dirs.length; i++) {  // try each local dir
      int index = (hashCode+i & Integer.MAX_VALUE) % dirs.length;
      Path file = new Path(dirs[index], path);
      Path dir = file.getParent();
      if (fs.mkdirs(dir) || fs.exists(dir)) {
        return file;
      }
    }
    LOG.warn("Could not make " + path + 
             " in local directories from " + dirsProp);
    for(int i=0; i < dirs.length; i++) {
      int index = (hashCode+i & Integer.MAX_VALUE) % dirs.length;
      LOG.warn(dirsProp + "[" + index + "]=" + dirs[index]);
    }
    throw new IOException("No valid local directories in property: "+dirsProp);
  }

  /**
   * Get a local file name under a directory named in <i>dirsProp</i> with
   * the given <i>path</i>.  If <i>dirsProp</i> contains multiple directories,
   * then one is chosen based on <i>path</i>'s hash code.  If the selected
   * directory does not exist, an attempt is made to create it.
   *
   * @param dirsProp directory in which to locate the file.
   * @param path file-path.
   * @return local file under the directory with the given path.
   * @throws IOException raised on errors performing I/O.
   */
  public File getFile(String dirsProp, String path)
    throws IOException {
    String[] dirs = getTrimmedStrings(dirsProp);
    int hashCode = path.hashCode();
    for (int i = 0; i < dirs.length; i++) {  // try each local dir
      int index = (hashCode+i & Integer.MAX_VALUE) % dirs.length;
      File file = new File(dirs[index], path);
      File dir = file.getParentFile();
      if (dir.exists() || dir.mkdirs()) {
        return file;
      }
    }
    throw new IOException("No valid local directories in property: "+dirsProp);
  }

  /** 
   * Get the {@link URL} for the named resource.
   * 
   * @param name resource name.
   * @return the url for the named resource.
   */
  public URL getResource(String name) {
    return classLoader.getResource(name);
  }
  
  /** 
   * Get an input stream attached to the configuration resource with the
   * given <code>name</code>.
   * 
   * @param name configuration resource name.
   * @return an input stream attached to the resource.
   */
  public InputStream getConfResourceAsInputStream(String name) {
    try {
      URL url= getResource(name);

      if (url == null) {
        LOG.info(name + " not found");
        return null;
      } else {
        LOG.info("found resource " + name + " at " + url);
      }

      return url.openStream();
    } catch (Exception e) {
      return null;
    }
  }

  /** 
   * Get a {@link Reader} attached to the configuration resource with the
   * given <code>name</code>.
   * 
   * @param name configuration resource name.
   * @return a reader attached to the resource.
   */
  public Reader getConfResourceAsReader(String name) {
    try {
      URL url= getResource(name);

      if (url == null) {
        LOG.info(name + " not found");
        return null;
      } else {
        LOG.info("found resource " + name + " at " + url);
      }

      return new InputStreamReader(url.openStream(), Charsets.UTF_8);
    } catch (Exception e) {
      return null;
    }
  }

  /**
   * Get the set of parameters marked final.
   *
   * @return final parameter set.
   */
  public Set<String> getFinalParameters() {
    Set<String> setFinalParams = Collections.newSetFromMap(
        new ConcurrentHashMap<String, Boolean>());
    setFinalParams.addAll(finalParameters);
    return setFinalParams;
  }

  protected synchronized Properties getProps() {
    if (properties == null) {
      properties = new Properties();
      loadProps(properties, 0, true);
    }
    return properties;
  }

  /**
   * Loads the resource at a given index into the properties.
   * @param props the object containing the loaded properties.
   * @param startIdx the index where the new resource has been added.
   * @param fullReload flag whether we do complete reload of the conf instead
   *                   of just loading the new resource.
   */
  private synchronized void loadProps(final Properties props,
      final int startIdx, final boolean fullReload) {
    if (props != null) {
      Map<String, String[]> backup =
          updatingResource != null
              ? new ConcurrentHashMap<>(updatingResource) : null;
      loadResources(props, resources, startIdx, fullReload, quietmode);
      if (overlay != null) {
        props.putAll(overlay);
        if (backup != null) {
          for (Map.Entry<Object, Object> item : overlay.entrySet()) {
            String key = (String) item.getKey();
            String[] source = backup.get(key);
            if (source != null) {
              updatingResource.put(key, source);
            }
          }
        }
      }
    }
  }

  /**
   * Return the number of keys in the configuration.
   *
   * @return number of keys in the configuration.
   */
  public int size() {
    return getProps().size();
  }

  /**
   * Clears all keys from the configuration.
   */
  public void clear() {
    getProps().clear();
    getOverlay().clear();
  }

  /**
   * Get an {@link Iterator} to go through the list of <code>String</code> 
   * key-value pairs in the configuration.
   * 
   * @return an iterator over the entries.
   */
  @Override
  public Iterator<Map.Entry<String, String>> iterator() {
    // Get a copy of just the string to string pairs. After the old object
    // methods that allow non-strings to be put into configurations are removed,
    // we could replace properties with a Map<String,String> and get rid of this
    // code.
    Properties props = getProps();
    Map<String, String> result = new HashMap<>();
    synchronized (props) {
      for (Map.Entry<Object, Object> item : props.entrySet()) {
        if (item.getKey() instanceof String && item.getValue() instanceof String) {
          result.put((String) item.getKey(), (String) item.getValue());
        }
      }
    }
    return result.entrySet().iterator();
  }

  /**
   * Constructs a mapping of configuration and includes all properties that
   * start with the specified configuration prefix.  Property names in the
   * mapping are trimmed to remove the configuration prefix.
   *
   * @param confPrefix configuration prefix
   * @return mapping of configuration properties with prefix stripped
   */
  public Map<String, String> getPropsWithPrefix(String confPrefix) {
    Properties props = getProps();
    Map<String, String> configMap = new HashMap<>();
    for (String name : props.stringPropertyNames()) {
      if (name.startsWith(confPrefix)) {
        String value = get(name);
        String keyName = name.substring(confPrefix.length());
        configMap.put(keyName, value);
      }
    }
    return configMap;
  }

  private XMLStreamReader parse(URL url, boolean restricted)
      throws IOException, XMLStreamException {
    if (!quietmode) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("parsing URL " + url);
      }
    }
    if (url == null) {
      return null;
    }

    URLConnection connection = url.openConnection();
    if (connection instanceof JarURLConnection) {
      // Disable caching for JarURLConnection to avoid sharing JarFile
      // with other users.
      connection.setUseCaches(false);
    }
    return parse(connection.getInputStream(), url.toString(), restricted);
  }

  private XMLStreamReader parse(InputStream is, String systemIdStr,
      boolean restricted) throws IOException, XMLStreamException {
    if (!quietmode) {
      LOG.debug("parsing input stream " + is);
    }
    if (is == null) {
      return null;
    }
    SystemId systemId = SystemId.construct(systemIdStr);
    ReaderConfig readerConfig = XML_INPUT_FACTORY.createPrivateConfig();
    if (restricted) {
      readerConfig.setProperty(XMLInputFactory.SUPPORT_DTD, false);
    }
    return XML_INPUT_FACTORY.createSR(readerConfig, systemId,
        StreamBootstrapper.getInstance(null, systemId, is), false, true);
  }

  private void loadResources(Properties properties,
                             ArrayList<Resource> resources,
                             int startIdx,
                             boolean fullReload,
                             boolean quiet) {
    if(loadDefaults && fullReload) {
      for (String resource : defaultResources) {
        loadResource(properties, new Resource(resource, false), quiet);
      }
    }
    
    for (int i = startIdx; i < resources.size(); i++) {
      Resource ret = loadResource(properties, resources.get(i), quiet);
      if (ret != null) {
        resources.set(i, ret);
      }
    }
    this.addTags(properties);
  }
  
  private Resource loadResource(Properties properties,
                                Resource wrapper, boolean quiet) {
    String name = UNKNOWN_RESOURCE;
    try {
      Object resource = wrapper.getResource();
      name = wrapper.getName();
      boolean returnCachedProperties = false;

      if (resource instanceof InputStream) {
        returnCachedProperties = true;
      } else if (resource instanceof Properties) {
        overlay(properties, (Properties)resource);
      }

      XMLStreamReader2 reader = getStreamReader(wrapper, quiet);
      if (reader == null) {
        if (quiet) {
          return null;
        }
        throw new RuntimeException(resource + " not found");
      }
      Properties toAddTo = properties;
      if(returnCachedProperties) {
        toAddTo = new Properties();
      }

      List<ParsedItem> items = new Parser(reader, wrapper, quiet).parse();
      for (ParsedItem item : items) {
        loadProperty(toAddTo, item.name, item.key, item.value,
            item.isFinal, item.sources);
      }
      reader.close();

      if (returnCachedProperties) {
        overlay(properties, toAddTo);
        return new Resource(toAddTo, name, wrapper.isParserRestricted());
      }
      return null;
    } catch (IOException e) {
      LOG.error("error parsing conf " + name, e);
      throw new RuntimeException(e);
    } catch (XMLStreamException e) {
      LOG.error("error parsing conf " + name, e);
      throw new RuntimeException(e);
    }
  }

  private XMLStreamReader2 getStreamReader(Resource wrapper, boolean quiet)
      throws XMLStreamException, IOException {
    Object resource = wrapper.getResource();
    boolean isRestricted = wrapper.isParserRestricted();
    XMLStreamReader2 reader = null;
    if (resource instanceof URL) {                  // an URL resource
      reader  = (XMLStreamReader2)parse((URL)resource, isRestricted);
    } else if (resource instanceof String) {        // a CLASSPATH resource
      URL url = getResource((String)resource);
      reader = (XMLStreamReader2)parse(url, isRestricted);
    } else if (resource instanceof Path) {          // a file resource
      // Can't use FileSystem API or we get an infinite loop
      // since FileSystem uses Configuration API.  Use java.io.File instead.
      File file = new File(((Path)resource).toUri().getPath())
        .getAbsoluteFile();
      if (file.exists()) {
        if (!quiet) {
          LOG.debug("parsing File " + file);
        }
        reader = (XMLStreamReader2)parse(new BufferedInputStream(
            Files.newInputStream(file.toPath())), ((Path) resource).toString(),
            isRestricted);
      }
    } else if (resource instanceof InputStream) {
      reader = (XMLStreamReader2)parse((InputStream)resource, null,
          isRestricted);
    }
    return reader;
  }

  private static class ParsedItem {
    String name;
    String key;
    String value;
    boolean isFinal;
    String[] sources;

    ParsedItem(String name, String key, String value,
        boolean isFinal, String[] sources) {
      this.name = name;
      this.key = key;
      this.value = value;
      this.isFinal = isFinal;
      this.sources = sources;
    }
  }

  /**
   * Parser to consume SAX stream of XML elements from a Configuration.
   */
  private class Parser {
    private final XMLStreamReader2 reader;
    private final Resource wrapper;
    private final String name;
    private final String[] nameSingletonArray;
    private final boolean isRestricted;
    private final boolean quiet;

    DeprecationContext deprecations = deprecationContext.get();

    private StringBuilder token = new StringBuilder();
    private String confName = null;
    private String confValue = null;
    private String confInclude = null;
    private String confTag = null;
    private boolean confFinal = false;
    private boolean fallbackAllowed = false;
    private boolean fallbackEntered = false;
    private boolean parseToken = false;
    private List<String> confSource = new ArrayList<>();
    private List<ParsedItem> results = new ArrayList<>();

    Parser(XMLStreamReader2 reader,
           Resource wrapper,
           boolean quiet) {
      this.reader = reader;
      this.wrapper = wrapper;
      this.name = wrapper.getName();
      this.nameSingletonArray = new String[]{ name };
      this.isRestricted = wrapper.isParserRestricted();
      this.quiet = quiet;

    }

    List<ParsedItem> parse() throws IOException, XMLStreamException {
      while (reader.hasNext()) {
        parseNext();
      }
      return results;
    }

    private void handleStartElement() throws XMLStreamException, IOException {
      switch (reader.getLocalName()) {
      case "property":
        handleStartProperty();
        break;

      case "name":
      case "value":
      case "final":
      case "source":
      case "tag":
        parseToken = true;
        token.setLength(0);
        break;
      case "include":
        handleInclude();
        break;
      case "fallback":
        fallbackEntered = true;
        break;
      case "configuration":
        break;
      default:
        break;
      }
    }

    private void handleStartProperty() {
      confName = null;
      confValue = null;
      confFinal = false;
      confTag = null;
      confSource.clear();

      // First test for short format configuration
      int attrCount = reader.getAttributeCount();
      for (int i = 0; i < attrCount; i++) {
        String propertyAttr = reader.getAttributeLocalName(i);
        if ("name".equals(propertyAttr)) {
          confName = StringInterner.weakIntern(
              reader.getAttributeValue(i));
        } else if ("value".equals(propertyAttr)) {
          confValue = StringInterner.weakIntern(
              reader.getAttributeValue(i));
        } else if ("final".equals(propertyAttr)) {
          confFinal = "true".equals(reader.getAttributeValue(i));
        } else if ("source".equals(propertyAttr)) {
          confSource.add(StringInterner.weakIntern(
              reader.getAttributeValue(i)));
        } else if ("tag".equals(propertyAttr)) {
          confTag = StringInterner
              .weakIntern(reader.getAttributeValue(i));
        }
      }
    }

    private void handleInclude() throws XMLStreamException, IOException {
      // Determine href for xi:include
      confInclude = null;
      int attrCount = reader.getAttributeCount();
      List<ParsedItem> items;
      for (int i = 0; i < attrCount; i++) {
        String attrName = reader.getAttributeLocalName(i);
        if ("href".equals(attrName)) {
          confInclude = reader.getAttributeValue(i);
        }
      }
      if (confInclude == null) {
        return;
      }
      if (isRestricted) {
        throw new RuntimeException("Error parsing resource " + wrapper
            + ": XInclude is not supported for restricted resources");
      }
      // Determine if the included resource is a classpath resource
      // otherwise fallback to a file resource
      // xi:include are treated as inline and retain current source
      URL include = getResource(confInclude);
      if (include != null) {
        Resource classpathResource = new Resource(include, name,
            wrapper.isParserRestricted());
        // This is only called recursively while the lock is already held
        // by this thread, but synchronizing avoids a findbugs warning.
        synchronized (Configuration.this) {
          XMLStreamReader2 includeReader =
              getStreamReader(classpathResource, quiet);
          if (includeReader == null) {
            throw new RuntimeException(classpathResource + " not found");
          }
          items = new Parser(includeReader, classpathResource, quiet).parse();
        }
      } else {
        URL url;
        try {
          url = new URL(confInclude);
          url.openConnection().connect();
        } catch (IOException ioe) {
          File href = new File(confInclude);
          if (!href.isAbsolute()) {
            // Included resources are relative to the current resource
            File baseFile;

            try {
              baseFile = new File(new URI(name));
            } catch (IllegalArgumentException | URISyntaxException e) {
              baseFile = new File(name);
            }

            baseFile = baseFile.getParentFile();
            href = new File(baseFile, href.getPath());
          }
          if (!href.exists()) {
            // Resource errors are non-fatal iff there is 1 xi:fallback
            fallbackAllowed = true;
            return;
          }
          url = href.toURI().toURL();
        }
        Resource uriResource = new Resource(url, name,
            wrapper.isParserRestricted());
        // This is only called recursively while the lock is already held
        // by this thread, but synchronizing avoids a findbugs warning.
        synchronized (Configuration.this) {
          XMLStreamReader2 includeReader =
              getStreamReader(uriResource, quiet);
          if (includeReader == null) {
            throw new RuntimeException(uriResource + " not found");
          }
          items = new Parser(includeReader, uriResource, quiet).parse();
        }
      }
      results.addAll(items);
    }

    void handleEndElement() throws IOException {
      String tokenStr = token.toString();
      switch (reader.getLocalName()) {
      case "name":
        if (token.length() > 0) {
          confName = StringInterner.weakIntern(tokenStr.trim());
        }
        break;
      case "value":
        if (token.length() > 0) {
          confValue = StringInterner.weakIntern(tokenStr);
        }
        break;
      case "final":
        confFinal = "true".equals(tokenStr);
        break;
      case "source":
        confSource.add(StringInterner.weakIntern(tokenStr));
        break;
      case "tag":
        if (token.length() > 0) {
          confTag = StringInterner.weakIntern(tokenStr);
        }
        break;
      case "include":
        if (fallbackAllowed && !fallbackEntered) {
          throw new IOException("Fetch fail on include for '"
              + confInclude + "' with no fallback while loading '"
              + name + "'");
        }
        fallbackAllowed = false;
        fallbackEntered = false;
        break;
      case "property":
        handleEndProperty();
        break;
      default:
        break;
      }
    }

    void handleEndProperty() {
      if (confName == null || (!fallbackAllowed && fallbackEntered)) {
        return;
      }
      String[] confSourceArray;
      if (confSource.isEmpty()) {
        confSourceArray = nameSingletonArray;
      } else {
        confSource.add(name);
        confSourceArray = confSource.toArray(new String[confSource.size()]);
      }

      // Read tags and put them in propertyTagsMap
      if (confTag != null) {
        readTagFromConfig(confTag, confName, confValue, confSourceArray);
      }

      DeprecatedKeyInfo keyInfo =
          deprecations.getDeprecatedKeyMap().get(confName);

      if (keyInfo != null) {
        keyInfo.clearAccessed();
        for (String key : keyInfo.newKeys) {
          // update new keys with deprecated key's value
          results.add(new ParsedItem(
              name, key, confValue, confFinal, confSourceArray));
        }
      } else {
        results.add(new ParsedItem(name, confName, confValue, confFinal,
            confSourceArray));
      }
    }

    void parseNext() throws IOException, XMLStreamException {
      switch (reader.next()) {
      case XMLStreamConstants.START_ELEMENT:
        handleStartElement();
        break;
      case XMLStreamConstants.CHARACTERS:
      case XMLStreamConstants.CDATA:
        if (parseToken) {
          char[] text = reader.getTextCharacters();
          token.append(text, reader.getTextStart(), reader.getTextLength());
        }
        break;
      case XMLStreamConstants.END_ELEMENT:
        handleEndElement();
        break;
      default:
        break;
      }
    }
  }

  /**
   * Add tags defined in HADOOP_TAGS_SYSTEM, HADOOP_TAGS_CUSTOM.
   * @param prop properties.
   */
  public void addTags(Properties prop) {
    // Get all system tags
    try {
      if (prop.containsKey(CommonConfigurationKeys.HADOOP_TAGS_SYSTEM)) {
        String systemTags = prop.getProperty(CommonConfigurationKeys
            .HADOOP_TAGS_SYSTEM);
        TAGS.addAll(Arrays.asList(systemTags.split(",")));
      }
      // Get all custom tags
      if (prop.containsKey(CommonConfigurationKeys.HADOOP_TAGS_CUSTOM)) {
        String customTags = prop.getProperty(CommonConfigurationKeys
            .HADOOP_TAGS_CUSTOM);
        TAGS.addAll(Arrays.asList(customTags.split(",")));
      }

      if (prop.containsKey(CommonConfigurationKeys.HADOOP_SYSTEM_TAGS)) {
        String systemTags = prop.getProperty(CommonConfigurationKeys
            .HADOOP_SYSTEM_TAGS);
        TAGS.addAll(Arrays.asList(systemTags.split(",")));
      }
      // Get all custom tags
      if (prop.containsKey(CommonConfigurationKeys.HADOOP_CUSTOM_TAGS)) {
        String customTags = prop.getProperty(CommonConfigurationKeys
            .HADOOP_CUSTOM_TAGS);
        TAGS.addAll(Arrays.asList(customTags.split(",")));
      }

    } catch (Exception ex) {
      LOG.trace("Error adding tags in configuration", ex);
    }

  }

  /**
   * Read the values passed as tags and store them in a
   * map for later retrieval.
   * @param attributeValue
   * @param confName
   * @param confValue
   * @param confSource
   */
  private void readTagFromConfig(String attributeValue, String confName, String
      confValue, String[] confSource) {
    for (String tagStr : attributeValue.split(",")) {
      try {
        tagStr = tagStr.trim();
        // Handle property with no/null value
        if (confValue == null) {
          confValue = "";
        }
        if (propertyTagsMap.containsKey(tagStr)) {
          propertyTagsMap.get(tagStr).setProperty(confName, confValue);
        } else {
          Properties props = new Properties();
          props.setProperty(confName, confValue);
          propertyTagsMap.put(tagStr, props);
        }
      } catch (Exception ex) {
        // Log the exception at trace level.
        LOG.trace("Tag '{}' for property:{} Source:{}", tagStr, confName,
            confSource, ex);
      }
    }
  }

  private void overlay(Properties to, Properties from) {
    synchronized (from) {
      for (Entry<Object, Object> entry : from.entrySet()) {
        to.put(entry.getKey(), entry.getValue());
      }
    }
  }

  private void loadProperty(Properties properties, String name, String attr,
      String value, boolean finalParameter, String[] source) {
    if (value != null || allowNullValueProperties) {
      if (value == null) {
        value = DEFAULT_STRING_CHECK;
      }
      if (!finalParameters.contains(attr)) {
        properties.setProperty(attr, value);
        if (source != null) {
          putIntoUpdatingResource(attr, source);
        }
      } else {
        // This is a final parameter so check for overrides.
        checkForOverride(this.properties, name, attr, value);
        if (this.properties != properties) {
          checkForOverride(properties, name, attr, value);
        }
      }
    }
    if (finalParameter && attr != null) {
      finalParameters.add(attr);
    }
  }

  /**
   * Print a warning if a property with a given name already exists with a
   * different value.
   */
  private void checkForOverride(Properties properties, String name, String attr, String value) {
    String propertyValue = properties.getProperty(attr);
    if (propertyValue != null && !propertyValue.equals(value)) {
      LOG.warn(name + ":an attempt to override final parameter: " + attr
          + ";  Ignoring.");
    }
  }

  /**
   * Write out the non-default properties in this configuration to the given
   * {@link OutputStream} using UTF-8 encoding.
   *
   * @param out the output stream to write to.
   * @throws IOException raised on errors performing I/O.
   */
  public void writeXml(OutputStream out) throws IOException {
    writeXml(new OutputStreamWriter(out, "UTF-8"));
  }

  public void writeXml(Writer out) throws IOException {
    writeXml(null, out);
  }

  /**
   * Write out the non-default properties in this configuration to the
   * given {@link Writer}.
   * <ul>
   * <li>
   * When property name is not empty and the property exists in the
   * configuration, this method writes the property and its attributes
   * to the {@link Writer}.
   * </li>
   *
   * <li>
   * When property name is null or empty, this method writes all the
   * configuration properties and their attributes to the {@link Writer}.
   * </li>
   *
   * <li>
   * When property name is not empty but the property doesn't exist in
   * the configuration, this method throws an {@link IllegalArgumentException}.
   * </li>
   * </ul>
   * @param propertyName xml property name.
   * @param out the writer to write to.
   * @param config configuration.
   * @throws IOException raised on errors performing I/O.
   */
  public void writeXml(@Nullable String propertyName, Writer out, Configuration config)
      throws IOException, IllegalArgumentException {
    ConfigRedactor redactor = config != null ? new ConfigRedactor(this) : null;
    Document doc = asXmlDocument(propertyName, redactor);

    try {
      DOMSource source = new DOMSource(doc);
      StreamResult result = new StreamResult(out);
      TransformerFactory transFactory = XMLUtils.newSecureTransformerFactory();
      Transformer transformer = transFactory.newTransformer();

      // Important to not hold Configuration log while writing result, since
      // 'out' may be an HDFS stream which needs to lock this configuration
      // from another thread.
      transformer.transform(source, result);
    } catch (TransformerException te) {
      throw new IOException(te);
    }
  }

  public void writeXml(@Nullable String propertyName, Writer out)
      throws IOException, IllegalArgumentException {
    writeXml(propertyName, out, null);
  }

  /**
   * Return the XML DOM corresponding to this Configuration.
   */
  private synchronized Document asXmlDocument(@Nullable String propertyName,
      ConfigRedactor redactor) throws IOException, IllegalArgumentException {
    Document doc;
    try {
      doc = DocumentBuilderFactory
          .newInstance()
          .newDocumentBuilder()
          .newDocument();
    } catch (ParserConfigurationException pe) {
      throw new IOException(pe);
    }

    Element conf = doc.createElement("configuration");
    doc.appendChild(conf);
    conf.appendChild(doc.createTextNode("\n"));
    handleDeprecation(); //ensure properties is set and deprecation is handled

    if(!Strings.isNullOrEmpty(propertyName)) {
      if (!properties.containsKey(propertyName)) {
        // given property not found, illegal argument
        throw new IllegalArgumentException("Property " +
            propertyName + " not found");
      } else {
        // given property is found, write single property
        appendXMLProperty(doc, conf, propertyName, redactor);
        conf.appendChild(doc.createTextNode("\n"));
      }
    } else {
      // append all elements
      for (Enumeration<Object> e = properties.keys(); e.hasMoreElements();) {
        appendXMLProperty(doc, conf, (String)e.nextElement(), redactor);
        conf.appendChild(doc.createTextNode("\n"));
      }
    }
    return doc;
  }

  /**
   *  Append a property with its attributes to a given {#link Document}
   *  if the property is found in configuration.
   *
   * @param doc
   * @param conf
   * @param propertyName
   */
  private synchronized void appendXMLProperty(Document doc, Element conf,
      String propertyName, ConfigRedactor redactor) {
    // skip writing if given property name is empty or null
    if (!Strings.isNullOrEmpty(propertyName)) {
      String value = properties.getProperty(propertyName);
      if (value != null) {
        Element propNode = doc.createElement("property");
        conf.appendChild(propNode);

        Element nameNode = doc.createElement("name");
        nameNode.appendChild(doc.createTextNode(propertyName));
        propNode.appendChild(nameNode);

        Element valueNode = doc.createElement("value");
        String propertyValue = properties.getProperty(propertyName);
        if (redactor != null) {
          propertyValue = redactor.redactXml(propertyName, propertyValue);
        }
        valueNode.appendChild(doc.createTextNode(propertyValue));
        propNode.appendChild(valueNode);

        Element finalNode = doc.createElement("final");
        finalNode.appendChild(doc.createTextNode(
            String.valueOf(finalParameters.contains(propertyName))));
        propNode.appendChild(finalNode);

        if (updatingResource != null) {
          String[] sources = updatingResource.get(propertyName);
          if(sources != null) {
            for(String s : sources) {
              Element sourceNode = doc.createElement("source");
              sourceNode.appendChild(doc.createTextNode(s));
              propNode.appendChild(sourceNode);
            }
          }
        }
      }
    }
  }

  /**
   *  Writes properties and their attributes (final and resource)
   *  to the given {@link Writer}.
   *  <ul>
   *  <li>
   *  When propertyName is not empty, and the property exists
   *  in the configuration, the format of the output would be,
   *  <pre>
   *  {
   *    "property": {
   *      "key" : "key1",
   *      "value" : "value1",
   *      "isFinal" : "key1.isFinal",
   *      "resource" : "key1.resource"
   *    }
   *  }
   *  </pre>
   *  </li>
   *
   *  <li>
   *  When propertyName is null or empty, it behaves same as
   *  {@link #dumpConfiguration(Configuration, Writer)}, the
   *  output would be,
   *  <pre>
   *  { "properties" :
   *      [ { key : "key1",
   *          value : "value1",
   *          isFinal : "key1.isFinal",
   *          resource : "key1.resource" },
   *        { key : "key2",
   *          value : "value2",
   *          isFinal : "ke2.isFinal",
   *          resource : "key2.resource" }
   *       ]
   *   }
   *  </pre>
   *  </li>
   *
   *  <li>
   *  When propertyName is not empty, and the property is not
   *  found in the configuration, this method will throw an
   *  {@link IllegalArgumentException}.
   *  </li>
   *  </ul>
   *  <p>
   * @param config the configuration
   * @param propertyName property name
   * @param out the Writer to write to
   * @throws IOException raised on errors performing I/O.
   * @throws IllegalArgumentException when property name is not
   *   empty and the property is not found in configuration
   **/
  public static void dumpConfiguration(Configuration config,
      String propertyName, Writer out) throws IOException {
    if(Strings.isNullOrEmpty(propertyName)) {
      dumpConfiguration(config, out);
    } else if (Strings.isNullOrEmpty(config.get(propertyName))) {
      throw new IllegalArgumentException("Property " +
          propertyName + " not found");
    } else {
      JsonFactory dumpFactory = new JsonFactory();
      JsonGenerator dumpGenerator = dumpFactory.createGenerator(out);
      dumpGenerator.writeStartObject();
      dumpGenerator.writeFieldName("property");
      appendJSONProperty(dumpGenerator, config, propertyName,
          new ConfigRedactor(config));
      dumpGenerator.writeEndObject();
      dumpGenerator.flush();
    }
  }

  /**
   *  Writes out all properties and their attributes (final and resource) to
   *  the given {@link Writer}, the format of the output would be,
   *
   *  <pre>
   *  { "properties" :
   *      [ { key : "key1",
   *          value : "value1",
   *          isFinal : "key1.isFinal",
   *          resource : "key1.resource" },
   *        { key : "key2",
   *          value : "value2",
   *          isFinal : "ke2.isFinal",
   *          resource : "key2.resource" }
   *       ]
   *   }
   *  </pre>
   *
   *  It does not output the properties of the configuration object which
   *  is loaded from an input stream.
   *  <p>
   *
   * @param config the configuration
   * @param out the Writer to write to
   * @throws IOException raised on errors performing I/O.
   */
  public static void dumpConfiguration(Configuration config,
      Writer out) throws IOException {
    JsonFactory dumpFactory = new JsonFactory();
    JsonGenerator dumpGenerator = dumpFactory.createGenerator(out);
    dumpGenerator.writeStartObject();
    dumpGenerator.writeFieldName("properties");
    dumpGenerator.writeStartArray();
    dumpGenerator.flush();
    ConfigRedactor redactor = new ConfigRedactor(config);
    synchronized (config) {
      for (Map.Entry<Object,Object> item: config.getProps().entrySet()) {
        appendJSONProperty(dumpGenerator, config, item.getKey().toString(),
            redactor);
      }
    }
    dumpGenerator.writeEndArray();
    dumpGenerator.writeEndObject();
    dumpGenerator.flush();
  }

  /**
   * Write property and its attributes as json format to given
   * {@link JsonGenerator}.
   *
   * @param jsonGen json writer
   * @param config configuration
   * @param name property name
   * @throws IOException raised on errors performing I/O.
   */
  private static void appendJSONProperty(JsonGenerator jsonGen,
      Configuration config, String name, ConfigRedactor redactor)
      throws IOException {
    // skip writing if given property name is empty or null
    if(!Strings.isNullOrEmpty(name) && jsonGen != null) {
      jsonGen.writeStartObject();
      jsonGen.writeStringField("key", name);
      jsonGen.writeStringField("value",
          redactor.redact(name, config.get(name)));
      jsonGen.writeBooleanField("isFinal",
          config.finalParameters.contains(name));
      String[] resources = config.updatingResource != null ?
          config.updatingResource.get(name) : null;
      String resource = UNKNOWN_RESOURCE;
      if (resources != null && resources.length > 0) {
        resource = resources[0];
      }
      jsonGen.writeStringField("resource", resource);
      jsonGen.writeEndObject();
    }
  }

  /**
   * Get the {@link ClassLoader} for this job.
   *
   * @return the correct class loader.
   */
  public ClassLoader getClassLoader() {
    return classLoader;
  }
  
  /**
   * Set the class loader that will be used to load the various objects.
   * 
   * @param classLoader the new class loader.
   */
  public void setClassLoader(ClassLoader classLoader) {
    this.classLoader = classLoader;
  }
  
  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder();
    sb.append("Configuration: ");
    if(loadDefaults) {
      toString(defaultResources, sb);
      if(resources.size()>0) {
        sb.append(", ");
      }
    }
    toString(resources, sb);
    return sb.toString();
  }
  
  private <T> void toString(List<T> resources, StringBuilder sb) {
    ListIterator<T> i = resources.listIterator();
    while (i.hasNext()) {
      if (i.nextIndex() != 0) {
        sb.append(", ");
      }
      sb.append(i.next());
    }
  }

  /** 
   * Set the quietness-mode. 
   * 
   * In the quiet-mode, error and informational messages might not be logged.
   * 
   * @param quietmode <code>true</code> to set quiet-mode on, <code>false</code>
   *              to turn it off.
   */
  public synchronized void setQuietMode(boolean quietmode) {
    this.quietmode = quietmode;
  }

  synchronized boolean getQuietMode() {
    return this.quietmode;
  }
  
  /** For debugging.  List non-default properties to the terminal and exit.
   * @param args the argument to be parsed.
   * @throws Exception exception.
   */
  public static void main(String[] args) throws Exception {
    new Configuration().writeXml(System.out);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    clear();
    int size = WritableUtils.readVInt(in);
    for(int i=0; i < size; ++i) {
      String key = org.apache.hadoop.io.Text.readString(in);
      String value = org.apache.hadoop.io.Text.readString(in);
      set(key, value); 
      String sources[] = WritableUtils.readCompressedStringArray(in);
      if (sources != null) {
        putIntoUpdatingResource(key, sources);
      }
    }
  }

  //@Override
  @Override
  public void write(DataOutput out) throws IOException {
    Properties props = getProps();
    WritableUtils.writeVInt(out, props.size());
    for(Map.Entry<Object, Object> item: props.entrySet()) {
      org.apache.hadoop.io.Text.writeString(out, (String) item.getKey());
      org.apache.hadoop.io.Text.writeString(out, (String) item.getValue());
      WritableUtils.writeCompressedStringArray(out, updatingResource != null ?
          updatingResource.get(item.getKey()) : null);
    }
  }
  
  /**
   * get keys matching the the regex.
   * @param regex the regex to match against.
   * @return {@literal Map<String,String>} with matching keys
   */
  public Map<String,String> getValByRegex(String regex) {
    Pattern p = Pattern.compile(regex);

    Map<String,String> result = new HashMap<String,String>();
    List<String> resultKeys = new ArrayList<>();
    Matcher m;

    for(Map.Entry<Object,Object> item: getProps().entrySet()) {
      if (item.getKey() instanceof String && 
          item.getValue() instanceof String) {
        m = p.matcher((String)item.getKey());
        if(m.find()) { // match
          resultKeys.add((String) item.getKey());
        }
      }
    }
    resultKeys.forEach(item ->
            result.put(item, substituteVars(getProps().getProperty(item))));
    return result;
  }

  /**
   * A unique class which is used as a sentinel value in the caching
   * for getClassByName. {@link Configuration#getClassByNameOrNull(String)}
   */
  private static abstract class NegativeCacheSentinel {}

  public static void dumpDeprecatedKeys() {
    DeprecationContext deprecations = deprecationContext.get();
    for (Map.Entry<String, DeprecatedKeyInfo> entry :
        deprecations.getDeprecatedKeyMap().entrySet()) {
      StringBuilder newKeys = new StringBuilder();
      for (String newKey : entry.getValue().newKeys) {
        newKeys.append(newKey).append("\t");
      }
      System.out.println(entry.getKey() + "\t" + newKeys.toString());
    }
  }

  /**
   * Returns whether or not a deprecated name has been warned. If the name is not
   * deprecated then always return false
   * @param name proprties.
   * @return true if name is a warned deprecation.
   */
  public static boolean hasWarnedDeprecation(String name) {
    DeprecationContext deprecations = deprecationContext.get();
    if(deprecations.getDeprecatedKeyMap().containsKey(name)) {
      if(deprecations.getDeprecatedKeyMap().get(name).accessed.get()) {
        return true;
      }
    }
    return false;
  }

  /**
   * Get all properties belonging to tag.
   * @param tag tag
   * @return Properties with matching tag
   */
  public Properties getAllPropertiesByTag(final String tag) {
    Properties props = new Properties();
    if (propertyTagsMap.containsKey(tag)) {
      props.putAll(propertyTagsMap.get(tag));
    }
    return props;
  }

  /**
   * Get all properties belonging to list of input tags. Calls
   * getAllPropertiesByTag internally.
   * @param tagList list of input tags
   * @return Properties with matching tags
   */
  public Properties getAllPropertiesByTags(final List<String> tagList) {
    Properties prop = new Properties();
    for (String tag : tagList) {
      prop.putAll(this.getAllPropertiesByTag(tag));
    }
    return prop;
  }

  /**
   * Get Property tag Enum corresponding to given source.
   *
   * @param tagStr String representation of Enum
   * @return true if tagStr is a valid tag
   */
  public boolean isPropertyTag(String tagStr) {
    return this.TAGS.contains(tagStr);
  }

  private void putIntoUpdatingResource(String key, String[] value) {
    Map<String, String[]> localUR = updatingResource;
    if (localUR == null) {
      synchronized (this) {
        localUR = updatingResource;
        if (localUR == null) {
          updatingResource = localUR = new ConcurrentHashMap<>(8);
        }
      }
    }
    localUR.put(key, value);
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop ConfServlet 源码

hadoop ConfigRedactor 源码

hadoop Configurable 源码

hadoop ConfigurationWithLogging 源码

hadoop Configured 源码

hadoop Reconfigurable 源码

hadoop ReconfigurableBase 源码

hadoop ReconfigurationException 源码

hadoop ReconfigurationServlet 源码

hadoop ReconfigurationTaskStatus 源码

0  赞