hadoop Command 源码

  • 2022-10-20
  • 浏览 (444)

haddop Command 代码

文件路径:/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Command.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs.shell;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.PrintStream;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathNotFoundException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.util.functional.RemoteIterators.cleanupRemoteIterator;

/**
 * An abstract class for the execution of a file system command
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving

abstract public class Command extends Configured {
  /** field name indicating the default name of the command */
  public static final String COMMAND_NAME_FIELD = "NAME";
  /** field name indicating the command's usage switches and arguments format */
  public static final String COMMAND_USAGE_FIELD = "USAGE";
  /** field name indicating the command's long description */
  public static final String COMMAND_DESCRIPTION_FIELD = "DESCRIPTION";
    
  protected String[] args;
  protected String name;
  protected int exitCode = 0;
  protected int numErrors = 0;
  protected boolean recursive = false;
  private int depth = 0;
  protected ArrayList<Exception> exceptions = new ArrayList<Exception>();

  private static final Logger LOG = LoggerFactory.getLogger(Command.class);

  /** allows stdout to be captured if necessary */
  public PrintStream out = System.out;
  /** allows stderr to be captured if necessary */
  public PrintStream err = System.err;
  /** allows the command factory to be used if necessary */
  private CommandFactory commandFactory = null;

  /** Constructor */
  protected Command() {
    out = System.out;
    err = System.err;
  }
  
  /**
   * Constructor.
   *
   * @param conf configuration.
   */
  protected Command(Configuration conf) {
    super(conf);
  }
  
  /** @return the command's name excluding the leading character - */
  abstract public String getCommandName();
  
  protected void setRecursive(boolean flag) {
    recursive = flag;
  }
  
  protected boolean isRecursive() {
    return recursive;
  }

  protected int getDepth() {
    return depth;
  }
  
  /** 
   * Execute the command on the input path
   * 
   * @param path the input path
   * @throws IOException if any error occurs
   */
  abstract protected void run(Path path) throws IOException;

  /**
   * Execute the command on the input path data. Commands can override to make
   * use of the resolved filesystem.
   * @param pathData The input path with resolved filesystem
   * @throws IOException raised on errors performing I/O.
   */
  protected void run(PathData pathData) throws IOException {
    run(pathData.path);
  }

  /** 
   * For each source path, execute the command
   * 
   * @return 0 if it runs successfully; -1 if it fails
   */
  public int runAll() {
    int exitCode = 0;
    for (String src : args) {
      try {
        PathData[] srcs = PathData.expandAsGlob(src, getConf());
        for (PathData s : srcs) {
          run(s);
        }
      } catch (IOException e) {
        exitCode = -1;
        displayError(e);
      }
    }
    return exitCode;
  }

  /**
   * sets the command factory for later use.
   * @param factory factory.
   */
  public void setCommandFactory(CommandFactory factory) {
    this.commandFactory = factory;
  }

  /**
   * retrieves the command factory.
   *
   * @return command factory.
   */
  protected CommandFactory getCommandFactory() {
    return this.commandFactory;
  }

  /**
   * Invokes the command handler.  The default behavior is to process options,
   * expand arguments, and then process each argument.
   * <pre>
   * run
   * |{@literal ->} {@link #processOptions(LinkedList)}
   * \{@literal ->} {@link #processRawArguments(LinkedList)}
   *      |{@literal ->} {@link #expandArguments(LinkedList)}
   *      |   \{@literal ->} {@link #expandArgument(String)}*
   *      \{@literal ->} {@link #processArguments(LinkedList)}
   *          |{@literal ->} {@link #processArgument(PathData)}*
   *          |   |{@literal ->} {@link #processPathArgument(PathData)}
   *          |   \{@literal ->} {@link #processPaths(PathData, PathData...)}
   *          |        \{@literal ->} {@link #processPath(PathData)}*
   *          \{@literal ->} {@link #processNonexistentPath(PathData)}
   * </pre>
   * Most commands will chose to implement just
   * {@link #processOptions(LinkedList)} and {@link #processPath(PathData)}
   * 
   * @param argv the list of command line arguments
   * @return the exit code for the command
   * @throws IllegalArgumentException if called with invalid arguments
   */
  public int run(String...argv) {
    LinkedList<String> args = new LinkedList<String>(Arrays.asList(argv));
    try {
      if (isDeprecated()) {
        displayWarning(
            "DEPRECATED: Please use '"+ getReplacementCommand() + "' instead.");
      }
      processOptions(args);
      processRawArguments(args);
    } catch (CommandInterruptException e) {
      displayError("Interrupted");
      return 130;
    } catch (IOException e) {
      displayError(e);
    }
    
    return (numErrors == 0) ? exitCode : exitCodeForError();
  }

  /**
   * The exit code to be returned if any errors occur during execution.
   * This method is needed to account for the inconsistency in the exit
   * codes returned by various commands.
   * @return a non-zero exit code
   */
  protected int exitCodeForError() { return 1; }
  
  /**
   * Must be implemented by commands to process the command line flags and
   * check the bounds of the remaining arguments.  If an
   * IllegalArgumentException is thrown, the FsShell object will print the
   * short usage of the command.
   * @param args the command line arguments
   * @throws IOException raised on errors performing I/O.
   */
  protected void processOptions(LinkedList<String> args) throws IOException {}

  /**
   * Allows commands that don't use paths to handle the raw arguments.
   * Default behavior is to expand the arguments via
   * {@link #expandArguments(LinkedList)} and pass the resulting list to
   * {@link #processArguments(LinkedList)} 
   * @param args the list of argument strings
   * @throws IOException raised on errors performing I/O.
   */
  protected void processRawArguments(LinkedList<String> args)
  throws IOException {
    processArguments(expandArguments(args));
  }

  /**
   *  Expands a list of arguments into {@link PathData} objects.  The default
   *  behavior is to call {@link #expandArgument(String)} on each element
   *  which by default globs the argument.  The loop catches IOExceptions,
   *  increments the error count, and displays the exception.
   * @param args strings to expand into {@link PathData} objects
   * @return list of all {@link PathData} objects the arguments
   * @throws IOException if anything goes wrong...
   */
  protected LinkedList<PathData> expandArguments(LinkedList<String> args)
  throws IOException {
    LinkedList<PathData> expandedArgs = new LinkedList<PathData>();
    for (String arg : args) {
      try {
        expandedArgs.addAll(expandArgument(arg));
      } catch (IOException e) { // other exceptions are probably nasty
        displayError(e);
      }
    }
    return expandedArgs;
  }

  /**
   * Expand the given argument into a list of {@link PathData} objects.
   * The default behavior is to expand globs.  Commands may override to
   * perform other expansions on an argument.
   * @param arg string pattern to expand
   * @return list of {@link PathData} objects
   * @throws IOException if anything goes wrong...
   */
  protected List<PathData> expandArgument(String arg) throws IOException {
    PathData[] items = PathData.expandAsGlob(arg, getConf());
    if (items.length == 0) {
      // it's a glob that failed to match
      throw new PathNotFoundException(arg);
    }
    return Arrays.asList(items);
  }

  /**
   *  Processes the command's list of expanded arguments.
   *  {@link #processArgument(PathData)} will be invoked with each item
   *  in the list.  The loop catches IOExceptions, increments the error
   *  count, and displays the exception.
   *  @param args a list of {@link PathData} to process
   *  @throws IOException if anything goes wrong... 
   */
  protected void processArguments(LinkedList<PathData> args)
  throws IOException {
    for (PathData arg : args) {
      try {
        processArgument(arg);
      } catch (IOException e) {
        displayError(e);
      }
    }
  }

  /**
   * Processes a {@link PathData} item, calling
   * {@link #processPathArgument(PathData)} or
   * {@link #processNonexistentPath(PathData)} on each item.
   * @param item {@link PathData} item to process
   * @throws IOException if anything goes wrong...
   */
  protected void processArgument(PathData item) throws IOException {
    if (item.exists) {
      processPathArgument(item);
    } else {
      processNonexistentPath(item);
    }
  }

  /**
   *  This is the last chance to modify an argument before going into the
   *  (possibly) recursive {@link #processPaths(PathData, PathData...)}
   *  {@literal ->} {@link #processPath(PathData)} loop.  Ex.  ls and du use
   *  this to expand out directories.
   *  @param item a {@link PathData} representing a path which exists
   *  @throws IOException if anything goes wrong... 
   */
  protected void processPathArgument(PathData item) throws IOException {
    // null indicates that the call is not via recursion, ie. there is
    // no parent directory that was expanded
    depth = 0;
    processPaths(null, item);
  }
  
  /**
   *  Provides a hook for handling paths that don't exist.  By default it
   *  will throw an exception.  Primarily overriden by commands that create
   *  paths such as mkdir or touch.
   *  @param item the {@link PathData} that doesn't exist
   *  @throws FileNotFoundException if arg is a path and it doesn't exist
   *  @throws IOException if anything else goes wrong... 
   */
  protected void processNonexistentPath(PathData item) throws IOException {
    throw new PathNotFoundException(item.toString());
  }

  /**
   *  Iterates over the given expanded paths and invokes
   *  {@link #processPath(PathData)} on each element.  If "recursive" is true,
   *  will do a post-visit DFS on directories.
   *  @param parent if called via a recurse, will be the parent dir, else null
   *  @param items a list of {@link PathData} objects to process
   *  @throws IOException if anything goes wrong...
   */
  protected void processPaths(PathData parent, PathData ... items)
  throws IOException {
    for (PathData item : items) {
      try {
        processPathInternal(item);
      } catch (IOException e) {
        displayError(e);
      }
    }
  }

  /**
   * Iterates over the given expanded paths and invokes
   * {@link #processPath(PathData)} on each element. If "recursive" is true,
   * will do a post-visit DFS on directories.
   * @param parent if called via a recurse, will be the parent dir, else null
   * @param itemsIterator a iterator of {@link PathData} objects to process
   * @throws IOException if anything goes wrong...
   */
  protected void processPaths(PathData parent,
      RemoteIterator<PathData> itemsIterator) throws IOException {
    int groupSize = getListingGroupSize();
    if (groupSize == 0) {
      // No grouping of contents required.
      while (itemsIterator.hasNext()) {
        processPaths(parent, itemsIterator.next());
      }
    } else {
      List<PathData> items = new ArrayList<PathData>(groupSize);
      while (itemsIterator.hasNext()) {
        items.add(itemsIterator.next());
        if (!itemsIterator.hasNext() || items.size() == groupSize) {
          processPaths(parent, items.toArray(new PathData[items.size()]));
          items.clear();
        }
      }
    }
    cleanupRemoteIterator(itemsIterator);
  }

  private void processPathInternal(PathData item) throws IOException {
    processPath(item);
    if (recursive && isPathRecursable(item)) {
      recursePath(item);
    }
    postProcessPath(item);
  }

  /**
   * Whether the directory listing for a path should be sorted.?
   * @return true/false.
   */
  protected boolean isSorted() {
    return false;
  }

  /**
   * While using iterator method for listing for a path, whether to group items
   * and process as array? If so what is the size of array?
   * @return size of the grouping array.
   */
  protected int getListingGroupSize() {
    return 0;
  }

  /**
   * Determines whether a {@link PathData} item is recursable. Default
   * implementation is to recurse directories but can be overridden to recurse
   * through symbolic links.
   *
   * @param item
   *          a {@link PathData} object
   * @return true if the item is recursable, false otherwise
   * @throws IOException
   *           if anything goes wrong in the user-implementation
   */
  protected boolean isPathRecursable(PathData item) throws IOException {
    return item.stat.isDirectory();
  }

  /**
   * Hook for commands to implement an operation to be applied on each
   * path for the command.  Note implementation of this method is optional
   * if earlier methods in the chain handle the operation.
   * @param item a {@link PathData} object
   * @throws RuntimeException if invoked but not implemented
   * @throws IOException if anything else goes wrong in the user-implementation
   */  
  protected void processPath(PathData item) throws IOException {
    throw new RuntimeException("processPath() is not implemented");    
  }

  /**
   * Hook for commands to implement an operation to be applied on each
   * path for the command after being processed successfully
   * @param item a {@link PathData} object
   * @throws IOException if anything goes wrong...
   */
  protected void postProcessPath(PathData item) throws IOException {    
  }

  /**
   *  Gets the directory listing for a path and invokes
   *  {@link #processPaths(PathData, PathData...)}
   *  @param item {@link PathData} for directory to recurse into
   *  @throws IOException if anything goes wrong...
   */
  protected void recursePath(PathData item) throws IOException {
    try {
      depth++;
      if (isSorted()) {
        // use the non-iterative method for listing because explicit sorting is
        // required. Iterators not guaranteed to return sorted elements
        processPaths(item, item.getDirectoryContents());
      } else {
        processPaths(item, item.getDirectoryContentsIterator());
      }
    } finally {
      depth--;
    }
  }

  /**
   * Display an exception prefaced with the command name.  Also increments
   * the error count for the command which will result in a non-zero exit
   * code.
   * @param e exception to display
   */
  public void displayError(Exception e) {
    // build up a list of exceptions that occurred
    exceptions.add(e);
    // use runtime so it rips up through the stack and exits out 
    if (e instanceof InterruptedIOException) {
      throw new CommandInterruptException();
    }
    LOG.debug("{} failure", getName(), e);
    String errorMessage = e.getLocalizedMessage();
    if (errorMessage == null) {
      // this is an unexpected condition, so dump the whole exception since
      // it's probably a nasty internal error where the backtrace would be
      // useful
      errorMessage = StringUtils.stringifyException(e);
      LOG.debug(errorMessage);
    } else {
      errorMessage = errorMessage.split("\n", 2)[0];
    }
    displayError(errorMessage);
  }
  
  /**
   * Display an error string prefaced with the command name.  Also increments
   * the error count for the command which will result in a non-zero exit
   * code.
   * @param message error message to display
   */
  public void displayError(String message) {
    numErrors++;
    displayWarning(message);
  }
  
  /**
   * Display an warning string prefaced with the command name.
   * @param message warning message to display
   */
  public void displayWarning(String message) {
    err.println(getName() + ": " + message);
  }
  
  /**
   * The name of the command.  Will first try to use the assigned name
   * else fallback to the command's preferred name
   * @return name of the command
   */
  public String getName() {
    return (name == null)
      ? getCommandField(COMMAND_NAME_FIELD)
      : name.startsWith("-") ? name.substring(1) : name;
  }

  /**
   * Define the name of the command.
   * @param name as invoked
   */
  public void setName(String name) {
    this.name = name;
  }
  
  /**
   * The short usage suitable for the synopsis
   * @return "name options"
   */
  public String getUsage() {
    String cmd = "-" + getName();
    String usage = isDeprecated() ? "" : getCommandField(COMMAND_USAGE_FIELD);
    return usage.isEmpty() ? cmd : cmd + " " + usage; 
  }

  /**
   * The long usage suitable for help output
   * @return text of the usage
   */
  public String getDescription() {
    return isDeprecated()
      ? "(DEPRECATED) Same as '" + getReplacementCommand() + "'"
      : getCommandField(COMMAND_DESCRIPTION_FIELD);
  }

  /**
   * Is the command deprecated?
   * @return boolean
   */
  public final boolean isDeprecated() {
    return (getReplacementCommand() != null);
  }
  
  /**
   * The replacement for a deprecated command
   * @return null if not deprecated, else alternative command
   */
  public String getReplacementCommand() {
    return null;
  }

  /**
   * Get a public static class field
   * @param field the field to retrieve
   * @return String of the field
   */
  private String getCommandField(String field) {
    String value;
    try {
      Field f = this.getClass().getDeclaredField(field);
      f.setAccessible(true);
      value = f.get(this).toString();
    } catch (Exception e) {
      throw new RuntimeException(
          "failed to get " + this.getClass().getSimpleName()+"."+field, e);
    }
    return value;
  }
  
  @SuppressWarnings("serial")
  static class CommandInterruptException extends RuntimeException {}
}

相关信息

hadoop 源码目录

相关文章

hadoop AclCommands 源码

hadoop CommandFactory 源码

hadoop CommandFormat 源码

hadoop CommandUtils 源码

hadoop CommandWithDestination 源码

hadoop Concat 源码

hadoop CopyCommandWithMultiThread 源码

hadoop CopyCommands 源码

hadoop Count 源码

hadoop Delete 源码

0  赞