Hadoop Job Source Code

  • 2022-10-20

Hadoop Job source code

File path: /hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
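Before the full source, here is a minimal WordCount-style driver sketch showing how the Job API below is typically used; the mapper/reducer logic and the input/output paths are illustrative placeholders, not part of Job.java itself.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
      extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // getInstance() copies conf; no cluster connection is made until submit time
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // waitForCompletion(true) submits the job and polls/prints progress until it finishes
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

The full Job.java source follows.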

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ReservationId;

import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The job submitter's view of the Job.
 * 
 * <p>It allows the user to configure the
 * job, submit it, control its execution, and query the state. The set methods
 * only work until the job is submitted, afterwards they will throw an 
 * IllegalStateException. </p>
 * 
 * <p>
 * Normally the user creates the application, describes various facets of the
 * job via {@link Job} and then submits the job and monitors its progress.</p>
 * 
 * <p>Here is an example on how to submit a job:</p>
 * <p><blockquote><pre>
 *     // Create a new Job
 *     Job job = Job.getInstance();
 *     job.setJarByClass(MyJob.class);
 *     
 *     // Specify various job-specific parameters     
 *     job.setJobName("myjob");
 *     
 *     job.setInputPath(new Path("in"));
 *     job.setOutputPath(new Path("out"));
 *     
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     // Submit the job, then poll for progress until the job is complete
 *     job.waitForCompletion(true);
 * </pre></blockquote>
 * 
 * 
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Job extends JobContextImpl implements JobContext, AutoCloseable {
  private static final Logger LOG = LoggerFactory.getLogger(Job.class);

  @InterfaceStability.Evolving
  public enum JobState {DEFINE, RUNNING};
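  /** Maximum age (in milliseconds) of a cached JobStatus before
   *  ensureFreshStatus() refreshes it from the cluster. */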
  private static final long MAX_JOBSTATUS_AGE = 1000 * 2;
  public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
  /** Key in mapred-*.xml that sets completionPollIntervalMillis */
  public static final String COMPLETION_POLL_INTERVAL_KEY = 
    "mapreduce.client.completion.pollinterval";
  
  /** Default completionPollIntervalMillis is 5000 ms. */
  static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
  /** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */
  public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY =
    "mapreduce.client.progressmonitor.pollinterval";
  /** Default progMonitorPollIntervalMillis is 1000 ms. */
  static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;

  public static final String USED_GENERIC_PARSER = 
      "mapreduce.client.genericoptionsparser.used";
  public static final String SUBMIT_REPLICATION = 
      "mapreduce.client.submit.file.replication";
  public static final int DEFAULT_SUBMIT_REPLICATION = 10;
  public static final String USE_WILDCARD_FOR_LIBJARS =
      "mapreduce.client.libjars.wildcard";
  public static final boolean DEFAULT_USE_WILDCARD_FOR_LIBJARS = true;

  @InterfaceStability.Evolving
  public enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }

  static {
    ConfigUtil.loadResources();
  }

  private JobState state = JobState.DEFINE;
  private JobStatus status;
  private long statustime;
  private Cluster cluster;
  private ReservationId reservationId;

  /**
   * @deprecated Use {@link #getInstance()}
   */
  @Deprecated
  public Job() throws IOException {
    this(new JobConf(new Configuration()));
  }

  /**
   * @deprecated Use {@link #getInstance(Configuration)}
   */
  @Deprecated
  public Job(Configuration conf) throws IOException {
    this(new JobConf(conf));
  }

  /**
   * @deprecated Use {@link #getInstance(Configuration, String)}
   */
  @Deprecated
  public Job(Configuration conf, String jobName) throws IOException {
    this(new JobConf(conf));
    setJobName(jobName);
  }

  Job(JobConf conf) throws IOException {
    super(conf, null);
    // propagate existing user credentials to job
    this.credentials.mergeAll(this.ugi.getCredentials());
    this.cluster = null;
  }

  Job(JobStatus status, JobConf conf) throws IOException {
    this(conf);
    setJobID(status.getJobID());
    this.status = status;
    state = JobState.RUNNING;
  }

      
  /**
   * Creates a new {@link Job} with no particular {@link Cluster} .
   * A Cluster will be created with a generic {@link Configuration}.
   * 
   * @return the {@link Job} , with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance() throws IOException {
    // create with a null Cluster
    return getInstance(new Configuration());
  }
      
  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a 
   * given {@link Configuration}.
   * 
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
   * that any necessary internal modifications do not reflect on the incoming 
   * parameter.
   * 
   * A Cluster will be created from the conf parameter only when it's needed.
   * 
   * @param conf the configuration
   * @return the {@link Job} , with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf) throws IOException {
    // create with a null Cluster
    JobConf jobConf = new JobConf(conf);
    return new Job(jobConf);
  }

      
  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a given jobName.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
   * that any necessary internal modifications do not reflect on the incoming 
   * parameter.
   * 
   * @param conf the configuration
   * @return the {@link Job} , with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf, String jobName)
           throws IOException {
    // create with a null Cluster
    Job result = getInstance(conf);
    result.setJobName(jobName);
    return result;
  }
  
  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   * 
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
   * that any necessary internal modifications do not reflect on the incoming 
   * parameter.
   * 
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job} , with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(JobStatus status, Configuration conf) 
  throws IOException {
    return new Job(status, new JobConf(conf));
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
   * that any necessary internal modifications do not reflect on the incoming 
   * parameter.
   * 
   * @param ignored
   * @return the {@link Job} , with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance()}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored) throws IOException {
    return getInstance();
  }
  
  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration}.
   * A Cluster will be created from the conf parameter only when it's needed.
   * 
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
   * that any necessary internal modifications do not reflect on the incoming 
   * parameter.
   * 
   * @param ignored
   * @param conf job configuration
   * @return the {@link Job} , with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance(Configuration)}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored, Configuration conf) 
      throws IOException {
    return getInstance(conf);
  }
  
  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   * 
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
   * that any necessary internal modifications do not reflect on the incoming 
   * parameter.
   * 
   * @param cluster cluster
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job} , with no connection to a cluster yet.
   * @throws IOException
   */
  @Private
  public static Job getInstance(Cluster cluster, JobStatus status, 
      Configuration conf) throws IOException {
    Job job = getInstance(status, conf);
    job.setCluster(cluster);
    return job;
  }

  private void ensureState(JobState state) throws IllegalStateException {
    if (state != this.state) {
      throw new IllegalStateException("Job in state "+ this.state + 
                                      " instead of " + state);
    }

    if (state == JobState.RUNNING && cluster == null) {
      throw new IllegalStateException
        ("Job in state " + this.state
         + ", but it isn't attached to any job tracker!");
    }
  }

  /**
   * Some methods rely on having a recent job status object.  Refresh
   * it, if necessary
   */
  synchronized void ensureFreshStatus() 
      throws IOException {
    if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
      updateStatus();
    }
  }
    
  /** Some methods need to update status immediately. So, refresh
   * immediately
   * @throws IOException
   */
  synchronized void updateStatus() throws IOException {
    try {
      this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        @Override
        public JobStatus run() throws IOException, InterruptedException {
          return cluster.getClient().getJobStatus(getJobID());
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
    if (this.status == null) {
      throw new IOException("Job status not available ");
    }
    this.statustime = System.currentTimeMillis();
  }
  
  public JobStatus getStatus() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status;
  }

  /**
   * Returns the current state of the Job.
   * 
   * @return JobStatus#State
   * @throws IOException
   * @throws InterruptedException
   */
  public JobStatus.State getJobState() 
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState();
  }
  
  /**
   * Get the URL where some job progress information will be displayed.
   * 
   * @return the URL where some job progress information will be displayed.
   */
  public String getTrackingURL(){
    ensureState(JobState.RUNNING);
    return status.getTrackingUrl().toString();
  }

  /**
   * Get the path of the submitted job configuration.
   * 
   * @return the path of the submitted job configuration.
   */
  public String getJobFile() {
    ensureState(JobState.RUNNING);
    return status.getJobFile();
  }

  /**
   * Get start time of the job.
   * 
   * @return the start time of the job
   */
  public long getStartTime() {
    ensureState(JobState.RUNNING);
    return status.getStartTime();
  }

  /**
   * Get finish time of the job.
   * 
   * @return the finish time of the job
   */
  public long getFinishTime() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getFinishTime();
  }

  /**
   * Get scheduling info of the job.
   * 
   * @return the scheduling info of the job
   */
  public String getSchedulingInfo() {
    ensureState(JobState.RUNNING);
    return status.getSchedulingInfo();
  }

  /**
   * Get the priority of the job.
   * 
   * @return the priority info of the job
   */
  public JobPriority getPriority() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getPriority();
  }

  /**
   * The user-specified job name.
   */
  public String getJobName() {
    if (state == JobState.DEFINE || status == null) {
      return super.getJobName();
    }
    ensureState(JobState.RUNNING);
    return status.getJobName();
  }

  public String getHistoryUrl() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getHistoryFile();
  }

  public boolean isRetired() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isRetired();
  }
  
  @Private
  public Cluster getCluster() {
    return cluster;
  }

  /** Only for mocks in unit tests. */
  @Private
  private void setCluster(Cluster cluster) {
    this.cluster = cluster;
  }

  /**
   * Dump stats to screen.
   */
  @Override
  public String toString() {
    ensureState(JobState.RUNNING);
    String reasonforFailure = " ";
    int numMaps = 0;
    int numReduces = 0;
    try {
      updateStatus();
      if (status.getState().equals(JobStatus.State.FAILED))
        reasonforFailure = getTaskFailureEventString();
      numMaps = getTaskReports(TaskType.MAP).length;
      numReduces = getTaskReports(TaskType.REDUCE).length;
    } catch (IOException e) {
    } catch (InterruptedException ie) {
    }
    StringBuffer sb = new StringBuffer();
    sb.append("Job: ").append(status.getJobID()).append("\n");
    sb.append("Job File: ").append(status.getJobFile()).append("\n");
    sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
    sb.append("\n");
    sb.append("Uber job : ").append(status.isUber()).append("\n");
    sb.append("Number of maps: ").append(numMaps).append("\n");
    sb.append("Number of reduces: ").append(numReduces).append("\n");
    sb.append("map() completion: ");
    sb.append(status.getMapProgress()).append("\n");
    sb.append("reduce() completion: ");
    sb.append(status.getReduceProgress()).append("\n");
    sb.append("Job state: ");
    sb.append(status.getState()).append("\n");
    sb.append("retired: ").append(status.isRetired()).append("\n");
    sb.append("reason for failure: ").append(reasonforFailure);
    return sb.toString();
  }

  /**
   * @return taskid which caused job failure
   * @throws IOException
   * @throws InterruptedException
   */
  String getTaskFailureEventString() throws IOException,
      InterruptedException {
    int failCount = 1;
    TaskCompletionEvent lastEvent = null;
    TaskCompletionEvent[] events = ugi.doAs(new 
        PrivilegedExceptionAction<TaskCompletionEvent[]>() {
          @Override
          public TaskCompletionEvent[] run() throws IOException,
          InterruptedException {
            return cluster.getClient().getTaskCompletionEvents(
                status.getJobID(), 0, 10);
          }
        });
    for (TaskCompletionEvent event : events) {
      if (event.getStatus().equals(TaskCompletionEvent.Status.FAILED)) {
        failCount++;
        lastEvent = event;
      }
    }
    if (lastEvent == null) {
      return "There are no failed tasks for the job. "
          + "Job is failed due to some other reason and reason "
          + "can be found in the logs.";
    }
    String[] taskAttemptID = lastEvent.getTaskAttemptId().toString().split("_", 2);
    String taskID = taskAttemptID[1].substring(0, taskAttemptID[1].length()-2);
    return (" task " + taskID + " failed " +
      failCount + " times " + "For details check tasktracker at: " +
      lastEvent.getTaskTrackerHttp());
  }

  /**
   * Get the information of the current state of the tasks of a job.
   * 
   * @param type Type of the task
   * @return the list of task reports for the given task type.
   * @throws IOException
   */
  public TaskReport[] getTaskReports(TaskType type) 
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    final TaskType tmpType = type;
    return ugi.doAs(new PrivilegedExceptionAction<TaskReport[]>() {
      public TaskReport[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskReports(getJobID(), tmpType);
      }
    });
  }

  /**
   * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0 
   * and 1.0.  When all map tasks have completed, the function returns 1.0.
   * 
   * @return the progress of the job's map-tasks.
   * @throws IOException
   */
  public float mapProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getMapProgress();
  }

  /**
   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0 
   * and 1.0.  When all reduce tasks have completed, the function returns 1.0.
   * 
   * @return the progress of the job's reduce-tasks.
   * @throws IOException
   */
  public float reduceProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getReduceProgress();
  }

  /**
   * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0 
   * and 1.0.  When all cleanup tasks have completed, the function returns 1.0.
   * 
   * @return the progress of the job's cleanup-tasks.
   * @throws IOException
   */
  public float cleanupProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getCleanupProgress();
  }

  /**
   * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0 
   * and 1.0.  When all setup tasks have completed, the function returns 1.0.
   * 
   * @return the progress of the job's setup-tasks.
   * @throws IOException
   */
  public float setupProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getSetupProgress();
  }

  /**
   * Check if the job is finished or not. 
   * This is a non-blocking call.
   * 
   * @return <code>true</code> if the job is complete, else <code>false</code>.
   * @throws IOException
   */
  public boolean isComplete() throws IOException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isJobComplete();
  }

  /**
   * Check if the job completed successfully. 
   * 
   * @return <code>true</code> if the job succeeded, else <code>false</code>.
   * @throws IOException
   */
  public boolean isSuccessful() throws IOException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.getState() == JobStatus.State.SUCCEEDED;
  }

  /**
   * Kill the running job.  Blocks until all job tasks have been
   * killed as well.  If the job is no longer running, it simply returns.
   * 
   * @throws IOException
   */
  public void killJob() throws IOException {
    ensureState(JobState.RUNNING);
    try {
      cluster.getClient().killJob(getJobID());
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Set the priority of a running job.
   * @param jobPriority the new priority for the job.
   * @throws IOException
   */
  public void setPriority(JobPriority jobPriority) throws IOException,
      InterruptedException {
    if (state == JobState.DEFINE) {
      if (jobPriority == JobPriority.UNDEFINED_PRIORITY) {
        conf.setJobPriorityAsInteger(convertPriorityToInteger(jobPriority));
      } else {
        conf.setJobPriority(org.apache.hadoop.mapred.JobPriority
            .valueOf(jobPriority.name()));
      }
    } else {
      ensureState(JobState.RUNNING);
      final int tmpPriority = convertPriorityToInteger(jobPriority);
      ugi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws IOException, InterruptedException {
          cluster.getClient()
              .setJobPriority(getJobID(), Integer.toString(tmpPriority));
          return null;
        }
      });
    }
  }

  /**
   * Set the priority of a running job.
   *
   * @param jobPriority
   *          the new priority for the job.
   * @throws IOException
   */
  public void setPriorityAsInteger(int jobPriority) throws IOException,
      InterruptedException {
    if (state == JobState.DEFINE) {
      conf.setJobPriorityAsInteger(jobPriority);
    } else {
      ensureState(JobState.RUNNING);
      final int tmpPriority = jobPriority;
      ugi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws IOException, InterruptedException {
          cluster.getClient()
              .setJobPriority(getJobID(), Integer.toString(tmpPriority));
          return null;
        }
      });
    }
  }

  private int convertPriorityToInteger(JobPriority jobPriority) {
    switch (jobPriority) {
    case VERY_HIGH :
      return 5;
    case HIGH :
      return 4;
    case NORMAL :
      return 3;
    case LOW :
      return 2;
    case VERY_LOW :
      return 1;
    case DEFAULT :
      return 0;
    default:
      break;
    }
    // For UNDEFINED_PRIORITY, we can set it to default for better handling
    return 0;
  }

  /**
   * Get events indicating completion (success/failure) of component tasks.
   *  
   * @param startFrom index to start fetching events from
   * @param numEvents number of events to fetch
   * @return an array of {@link TaskCompletionEvent}s
   * @throws IOException
   */
  public TaskCompletionEvent[] getTaskCompletionEvents(final int startFrom,
      final int numEvents) throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<TaskCompletionEvent[]>() {
      @Override
      public TaskCompletionEvent[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskCompletionEvents(getJobID(),
            startFrom, numEvents); 
      }
    });
  }

  /**
   * Get events indicating completion (success/failure) of component tasks.
   *  
   * @param startFrom index to start fetching events from
   * @return an array of {@link org.apache.hadoop.mapred.TaskCompletionEvent}s
   * @throws IOException
   */
  public org.apache.hadoop.mapred.TaskCompletionEvent[]
    getTaskCompletionEvents(final int startFrom) throws IOException {
    try {
      TaskCompletionEvent[] events = getTaskCompletionEvents(startFrom, 10);
      org.apache.hadoop.mapred.TaskCompletionEvent[] retEvents =
          new org.apache.hadoop.mapred.TaskCompletionEvent[events.length];
      for (int i = 0; i < events.length; i++) {
        retEvents[i] = org.apache.hadoop.mapred.TaskCompletionEvent.downgrade
            (events[i]);
      }
      return retEvents;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Kill indicated task attempt.
   * @param taskId the id of the task to kill.
   * @param shouldFail if <code>true</code> the task is failed and added
   *                   to failed tasks list, otherwise it is just killed,
   *                   w/o affecting job failure status.
   */
  @Private
  public boolean killTask(final TaskAttemptID taskId,
                          final boolean shouldFail) throws IOException {
    ensureState(JobState.RUNNING);
    try {
      return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
        public Boolean run() throws IOException, InterruptedException {
          return cluster.getClient().killTask(taskId, shouldFail);
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Kill indicated task attempt.
   * 
   * @param taskId the id of the task to be terminated.
   * @throws IOException
   */
  public void killTask(final TaskAttemptID taskId)
      throws IOException {
    killTask(taskId, false);
  }

  /**
   * Fail indicated task attempt.
   * 
   * @param taskId the id of the task to be terminated.
   * @throws IOException
   */
  public void failTask(final TaskAttemptID taskId)
      throws IOException {
    killTask(taskId, true);
  }

  /**
   * Gets the counters for this job. May return null if the job has been
   * retired and the job is no longer in the completed job store.
   * 
   * @return the counters for this job.
   * @throws IOException
   */
  public Counters getCounters() 
      throws IOException {
    ensureState(JobState.RUNNING);
    try {
      return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
        @Override
        public Counters run() throws IOException, InterruptedException {
          return cluster.getClient().getJobCounters(getJobID());
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets the diagnostic messages for a given task attempt.
   * @param taskid
   * @return the list of diagnostic messages for the task
   * @throws IOException
   */
  public String[] getTaskDiagnostics(final TaskAttemptID taskid) 
      throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    return ugi.doAs(new PrivilegedExceptionAction<String[]>() {
      @Override
      public String[] run() throws IOException, InterruptedException {
        return cluster.getClient().getTaskDiagnostics(taskid);
      }
    });
  }

  /**
   * Set the number of reduce tasks for the job.
   * @param tasks the number of reduce tasks
   * @throws IllegalStateException if the job is submitted
   */
  public void setNumReduceTasks(int tasks) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setNumReduceTasks(tasks);
  }

  /**
   * Set the current working directory for the default file system.
   * 
   * @param dir the new current working directory.
   * @throws IllegalStateException if the job is submitted
   */
  public void setWorkingDirectory(Path dir) throws IOException {
    ensureState(JobState.DEFINE);
    conf.setWorkingDirectory(dir);
  }

  /**
   * Set the {@link InputFormat} for the job.
   * @param cls the <code>InputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setInputFormatClass(Class<? extends InputFormat> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, 
                  InputFormat.class);
  }

  /**
   * Set the {@link OutputFormat} for the job.
   * @param cls the <code>OutputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputFormatClass(Class<? extends OutputFormat> cls
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, 
                  OutputFormat.class);
  }

  /**
   * Set the {@link Mapper} for the job.
   * @param cls the <code>Mapper</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapperClass(Class<? extends Mapper> cls
                             ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
  }

  /**
   * Set the Jar by finding where a given class came from.
   * @param cls the example class
   */
  public void setJarByClass(Class<?> cls) {
    ensureState(JobState.DEFINE);
    conf.setJarByClass(cls);
  }

  /**
   * Set the job jar 
   */
  public void setJar(String jar) {
    ensureState(JobState.DEFINE);
    conf.setJar(jar);
  }

  /**
   * Set the reported username for this job.
   * 
   * @param user the username for this job.
   */
  public void setUser(String user) {
    ensureState(JobState.DEFINE);
    conf.setUser(user);
  }

  /**
   * Set the combiner class for the job.
   * @param cls the combiner to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerClass(Class<? extends Reducer> cls
                               ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Reducer} for the job.
   * @param cls the <code>Reducer</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setReducerClass(Class<? extends Reducer> cls
                              ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Partitioner} for the job.
   * @param cls the <code>Partitioner</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setPartitionerClass(Class<? extends Partitioner> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(PARTITIONER_CLASS_ATTR, cls, 
                  Partitioner.class);
  }

  /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different than the final output
   * value class.
   * 
   * @param theClass the map output key class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputKeyClass(Class<?> theClass
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputKeyClass(theClass);
  }

  /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different than the final output
   * value class.
   * 
   * @param theClass the map output value class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputValueClass(Class<?> theClass
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputValueClass(theClass);
  }

  /**
   * Set the key class for the job output data.
   * 
   * @param theClass the key class for the job output data.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputKeyClass(Class<?> theClass
                                ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyClass(theClass);
  }

  /**
   * Set the value class for job outputs.
   * 
   * @param theClass the value class for job outputs.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputValueClass(Class<?> theClass
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueClass(theClass);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to combiner,
   * {@link Reducer#reduce(Object, Iterable,
   * org.apache.hadoop.mapreduce.Reducer.Context)}
   *
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerKeyGroupingComparatorClass(
      Class<? extends RawComparator> cls) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setCombinerKeyGroupingComparator(cls);
  }

  /**
   * Define the comparator that controls how the keys are sorted before they
   * are passed to the {@link Reducer}.
   * @param cls the raw comparator
   * @throws IllegalStateException if the job is submitted
   * @see #setCombinerKeyGroupingComparatorClass(Class)
   */
  public void setSortComparatorClass(Class<? extends RawComparator> cls
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyComparatorClass(cls);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to 
   * {@link Reducer#reduce(Object, Iterable, 
   *                       org.apache.hadoop.mapreduce.Reducer.Context)}
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   * @see #setCombinerKeyGroupingComparatorClass(Class)
   */
  public void setGroupingComparatorClass(Class<? extends RawComparator> cls
                                         ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueGroupingComparator(cls);
  }

  /**
   * Set the user-specified job name.
   * 
   * @param name the job's new name.
   * @throws IllegalStateException if the job is submitted
   */
  public void setJobName(String name) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setJobName(name);
  }

  /**
   * Turn speculative execution on or off for this job. 
   * 
   * @param speculativeExecution <code>true</code> if speculative execution 
   *                             should be turned on, else <code>false</code>.
   */
  public void setSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for map tasks. 
   * 
   * @param speculativeExecution <code>true</code> if speculative execution 
   *                             should be turned on for map tasks,
   *                             else <code>false</code>.
   */
  public void setMapSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setMapSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for reduce tasks. 
   * 
   * @param speculativeExecution <code>true</code> if speculative execution 
   *                             should be turned on for reduce tasks,
   *                             else <code>false</code>.
   */
  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setReduceSpeculativeExecution(speculativeExecution);
  }

  /**
   * Specify whether job-setup and job-cleanup is needed for the job 
   * 
   * @param needed If <code>true</code>, job-setup and job-cleanup will be
   *               considered from {@link OutputCommitter} 
   *               else ignored.
   */
  public void setJobSetupCleanupNeeded(boolean needed) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(SETUP_CLEANUP_NEEDED, needed);
  }

  /**
   * Set the given set of archives
   * @param archives The list of archives that need to be localized
   */
  public void setCacheArchives(URI[] archives) {
    ensureState(JobState.DEFINE);
    setCacheArchives(archives, conf);
  }

  /**
   * Set the configuration with the given set of archives.
   *
   * @param archives The list of archives that need to be localized.
   * @param conf Configuration which will be changed.
   */
  public static void setCacheArchives(URI[] archives, Configuration conf) {
    String cacheArchives = StringUtils.uriToString(archives);
    conf.set(MRJobConfig.CACHE_ARCHIVES, cacheArchives);
  }

  /**
   * Set the given set of files
   * @param files The list of files that need to be localized
   */
  public void setCacheFiles(URI[] files) {
    ensureState(JobState.DEFINE);
    setCacheFiles(files, conf);
  }

  /**
   * Set the configuration with the given set of files.
   *
   * @param files The list of files that need to be localized.
   * @param conf Configuration which will be changed.
   */
  public static void setCacheFiles(URI[] files, Configuration conf) {
    String cacheFiles = StringUtils.uriToString(files);
    conf.set(MRJobConfig.CACHE_FILES, cacheFiles);
  }

  /**
   * Add an archive to be localized.
   * @param uri The uri of the cache to be localized
   */
  public void addCacheArchive(URI uri) {
    ensureState(JobState.DEFINE);
    addCacheArchive(uri, conf);
  }

  /**
   * Add an archive to be localized to the conf.
   *
   * @param uri  The uri of the cache to be localized.
   * @param conf Configuration to add the cache to.
   */
  public static void addCacheArchive(URI uri, Configuration conf) {
    String archives = conf.get(MRJobConfig.CACHE_ARCHIVES);
    conf.set(MRJobConfig.CACHE_ARCHIVES,
        archives == null ? uri.toString() : archives + "," + uri.toString());
  }

  /**
   * Add a file to be localized
   * @param uri The uri of the cache to be localized
   */
  public void addCacheFile(URI uri) {
    ensureState(JobState.DEFINE);
    addCacheFile(uri, conf);
  }

  /**
   * Add a file to be localized to the conf. The localized file will be
   * downloaded to the execution node(s), and a link will be created to the
   * file from the job's working directory. If the last part of URI's path name
   * is "*", then the entire parent directory will be localized and links
   * will be created from the job's working directory to each file in the
   * parent directory.
   * <p>
   * The access permissions of the file will determine whether the localized
   * file will be shared across jobs. If the file is not readable by other or
   * if any of its parent directories is not executable by other, then the
   * file will not be shared. In the case of a path that ends in "/*",
   * sharing of the localized files will be determined solely from the
   * access permissions of the parent directories. The access permissions of
   * the individual files will be ignored.
   *
   * @param uri  The uri of the cache to be localized.
   * @param conf Configuration to add the cache to.
   */
  public static void addCacheFile(URI uri, Configuration conf) {
    String files = conf.get(MRJobConfig.CACHE_FILES);
    conf.set(MRJobConfig.CACHE_FILES,
        files == null ? uri.toString() : files + "," + uri.toString());
  }
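
  // Illustrative usage (the URIs are placeholders): localize a single file, and a
  // whole directory via the trailing "/*" form described above:
  //   Job.addCacheFile(new URI("hdfs:///apps/shared/stopwords.txt"), conf);
  //   Job.addCacheFile(new URI("hdfs:///apps/shared/dictionaries/*"), conf);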

  /**
   * Add a file path to the current set of classpath entries. It adds the file
   * to the cache as well.
   * 
   * Files added with this method will not be unpacked while being added to the
   * classpath.
   * To add archives to classpath, use the {@link #addArchiveToClassPath(Path)}
   * method instead.
   *
   * @param file Path of the file to be added
   */
  public void addFileToClassPath(Path file)
    throws IOException {
    ensureState(JobState.DEFINE);
    addFileToClassPath(file, conf, file.getFileSystem(conf));
  }

  /**
   * Add a file path to the current set of classpath entries. The file will
   * also be added to the cache.
   *
   * @param file Path of the file to be added.
   * @param conf Configuration that contains the classpath setting.
   * @param fs FileSystem with respect to which {@code file} should be interpreted.
   */
  public static void addFileToClassPath(Path file, Configuration conf, FileSystem fs) {
    addFileToClassPath(file, conf, fs, true);
  }

  /**
   * Add a file path to the current set of classpath entries. The file will
   * also be added to the cache if {@code addToCache} is true.
   *
   * @param file Path of the file to be added.
   * @param conf Configuration that contains the classpath setting.
   * @param fs FileSystem with respect to which {@code file} should be interpreted.
   * @param addToCache Whether the file should also be added to the cache list.
   */
  public static void addFileToClassPath(Path file, Configuration conf, FileSystem fs,
      boolean addToCache) {
    String classpath = conf.get(MRJobConfig.CLASSPATH_FILES);
    conf.set(MRJobConfig.CLASSPATH_FILES,
        classpath == null ? file.toString() : classpath + "," + file.toString());
    if (addToCache) {
      URI uri = fs.makeQualified(file).toUri();
      Job.addCacheFile(uri, conf);
    }
  }
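
  // Illustrative usage (the path is a placeholder): put a jar on the task classpath
  // and localize it through the distributed cache in one call:
  //   Path jar = new Path("hdfs:///apps/libs/parser.jar");
  //   Job.addFileToClassPath(jar, conf, jar.getFileSystem(conf));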

  /**
   * Add an archive path to the current set of classpath entries. It adds the
   * archive to cache as well.
   * 
   * Archive files will be unpacked and added to the classpath
   * when being distributed.
   *
   * @param archive Path of the archive to be added
   */
  public void addArchiveToClassPath(Path archive)
    throws IOException {
    ensureState(JobState.DEFINE);
    addArchiveToClassPath(archive, conf, archive.getFileSystem(conf));
  }

  /**
   * Add an archive path to the current set of classpath entries. It adds the
   * archive to cache as well.
   *
   * @param archive Path of the archive to be added.
   * @param conf Configuration that contains the classpath setting.
   * @param fs FileSystem with respect to which {@code archive} should be interpreted.
   */
  public static void addArchiveToClassPath(Path archive, Configuration conf, FileSystem fs) {
    String classpath = conf.get(MRJobConfig.CLASSPATH_ARCHIVES);
    conf.set(MRJobConfig.CLASSPATH_ARCHIVES,
        classpath == null ? archive.toString() : classpath + "," + archive.toString());
    URI uri = fs.makeQualified(archive).toUri();
    Job.addCacheArchive(uri, conf);
  }

  /**
   * Originally intended to enable symlinks, but currently symlinks cannot be
   * disabled.
   */
  @Deprecated
  public void createSymlink() {
    ensureState(JobState.DEFINE);
    DistributedCache.createSymlink(conf);
  }
  
  /** 
   * Expert: Set the number of maximum attempts that will be made to run a
   * map task.
   * 
   * @param n the number of attempts per map task.
   */
  public void setMaxMapAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxMapAttempts(n);
  }

  /** 
   * Expert: Set the number of maximum attempts that will be made to run a
   * reduce task.
   * 
   * @param n the number of attempts per reduce task.
   */
  public void setMaxReduceAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxReduceAttempts(n);
  }

  /**
   * Set whether the system should collect profiler information for some of 
   * the tasks in this job? The information is stored in the user log 
   * directory.
   * @param newValue true means it should be gathered
   */
  public void setProfileEnabled(boolean newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileEnabled(newValue);
  }

  /**
   * Set the profiler configuration arguments. If the string contains a '%s' it
   * will be replaced with the name of the profiling output file when the task
   * runs.
   *
   * This value is passed to the task child JVM on the command line.
   *
   * @param value the configuration string
   */
  public void setProfileParams(String value) {
    ensureState(JobState.DEFINE);
    conf.setProfileParams(value);
  }

  /**
   * Set the ranges of maps or reduces to profile. setProfileEnabled(true) 
   * must also be called.
   * @param newValue a set of integer ranges of the map ids
   */
  public void setProfileTaskRange(boolean isMap, String newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileTaskRange(isMap, newValue);
  }

  private void ensureNotSet(String attr, String msg) throws IOException {
    if (conf.get(attr) != null) {
      throw new IOException(attr + " is incompatible with " + msg + " mode.");
    }    
  }
  
  /**
   * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
   * tokens upon job completion. Defaults to true.
   */
  public void setCancelDelegationTokenUponJobCompletion(boolean value) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
  }

  /**
   * Default to the new APIs unless they are explicitly set or the old mapper or
   * reduce attributes are used.
   * @throws IOException if the configuration is inconsistent
   */
  private void setUseNewAPI() throws IOException {
    int numReduces = conf.getNumReduceTasks();
    String oldMapperClass = "mapred.mapper.class";
    String oldReduceClass = "mapred.reducer.class";
    conf.setBooleanIfUnset("mapred.mapper.new-api",
                           conf.get(oldMapperClass) == null);
    if (conf.getUseNewMapper()) {
      String mode = "new map API";
      ensureNotSet("mapred.input.format.class", mode);
      ensureNotSet(oldMapperClass, mode);
      if (numReduces != 0) {
        ensureNotSet("mapred.partitioner.class", mode);
       } else {
        ensureNotSet("mapred.output.format.class", mode);
      }      
    } else {
      String mode = "map compatibility";
      ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode);
      ensureNotSet(MAP_CLASS_ATTR, mode);
      if (numReduces != 0) {
        ensureNotSet(PARTITIONER_CLASS_ATTR, mode);
       } else {
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
      }
    }
    if (numReduces != 0) {
      conf.setBooleanIfUnset("mapred.reducer.new-api",
                             conf.get(oldReduceClass) == null);
      if (conf.getUseNewReducer()) {
        String mode = "new reduce API";
        ensureNotSet("mapred.output.format.class", mode);
        ensureNotSet(oldReduceClass, mode);   
      } else {
        String mode = "reduce compatibility";
        ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
        ensureNotSet(REDUCE_CLASS_ATTR, mode);   
      }
    }   
  }
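
  // For example: if a job sets the old-API "mapred.mapper.class", the new API is
  // turned off for maps, and additionally having "mapreduce.job.map.class" set then
  // fails here with "... is incompatible with map compatibility mode."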

  /**
   * Add a file to job config for shared cache processing. If shared cache is
   * enabled, it will return true, otherwise, return false. We don't check with
   * SCM here given application might not be able to provide the job id;
   * ClientSCMProtocol.use requires the application id. Job Submitter will read
   * the files from job config and take care of things.
   *
   * @param resource The resource that Job Submitter will process later using
   *          shared cache.
   * @param conf Configuration to add the resource to
   * @return whether the resource has been added to the configuration
   */
  @Unstable
  public static boolean addFileToSharedCache(URI resource, Configuration conf) {
    SharedCacheConfig scConfig = new SharedCacheConfig();
    scConfig.init(conf);
    if (scConfig.isSharedCacheFilesEnabled()) {
      String files = conf.get(MRJobConfig.FILES_FOR_SHARED_CACHE);
      conf.set(
          MRJobConfig.FILES_FOR_SHARED_CACHE,
          files == null ? resource.toString() : files + ","
              + resource.toString());
      return true;
    } else {
      return false;
    }
  }

  /**
   * Add a file to job config for shared cache processing. If shared cache is
   * enabled, it will return true, otherwise, return false. We don't check with
   * SCM here given application might not be able to provide the job id;
   * ClientSCMProtocol.use requires the application id. Job Submitter will read
   * the files from job config and take care of things. Job Submitter will also
   * add the file to classpath. Intended to be used by user code.
   *
   * @param resource The resource that Job Submitter will process later using
   *          shared cache.
   * @param conf Configuration to add the resource to
   * @return whether the resource has been added to the configuration
   */
  @Unstable
  public static boolean addFileToSharedCacheAndClasspath(URI resource,
      Configuration conf) {
    SharedCacheConfig scConfig = new SharedCacheConfig();
    scConfig.init(conf);
    if (scConfig.isSharedCacheLibjarsEnabled()) {
      String files =
          conf.get(MRJobConfig.FILES_FOR_CLASSPATH_AND_SHARED_CACHE);
      conf.set(
          MRJobConfig.FILES_FOR_CLASSPATH_AND_SHARED_CACHE,
          files == null ? resource.toString() : files + ","
              + resource.toString());
      return true;
    } else {
      return false;
    }
  }

  /**
   * Add an archive to job config for shared cache processing. If shared cache
   * is enabled, it will return true, otherwise, return false. We don't check
   * with SCM here given application might not be able to provide the job id;
   * ClientSCMProtocol.use requires the application id. Job Submitter will read
   * the files from job config and take care of things. Intended to be used by
   * user code.
   *
   * @param resource The resource that Job Submitter will process later using
   *          shared cache.
   * @param conf Configuration to add the resource to
   * @return whether the resource has been added to the configuration
   */
  @Unstable
  public static boolean addArchiveToSharedCache(URI resource,
      Configuration conf) {
    SharedCacheConfig scConfig = new SharedCacheConfig();
    scConfig.init(conf);
    if (scConfig.isSharedCacheArchivesEnabled()) {
      String files = conf.get(MRJobConfig.ARCHIVES_FOR_SHARED_CACHE);
      conf.set(
          MRJobConfig.ARCHIVES_FOR_SHARED_CACHE,
          files == null ? resource.toString() : files + ","
              + resource.toString());
      return true;
    } else {
      return false;
    }
  }

  /**
   * This is to set the shared cache upload policies for files. If the parameter
   * was previously set, this method will replace the old value with the new
   * provided map.
   *
   * @param conf Configuration which stores the shared cache upload policies
   * @param policies A map containing the shared cache upload policies for a set
   *          of resources. The key is the url of the resource and the value is
   *          the upload policy. True if it should be uploaded, false otherwise.
   */
  @Unstable
  public static void setFileSharedCacheUploadPolicies(Configuration conf,
      Map<String, Boolean> policies) {
    setSharedCacheUploadPolicies(conf, policies, true);
  }

  /**
   * This is to set the shared cache upload policies for archives. If the
   * parameter was previously set, this method will replace the old value with
   * the new provided map.
   *
   * @param conf Configuration which stores the shared cache upload policies
   * @param policies A map containing the shared cache upload policies for a set
   *          of resources. The key is the url of the resource and the value is
   *          the upload policy. True if it should be uploaded, false otherwise.
   */
  @Unstable
  public static void setArchiveSharedCacheUploadPolicies(Configuration conf,
      Map<String, Boolean> policies) {
    setSharedCacheUploadPolicies(conf, policies, false);
  }

  // We use a double colon because a colon is a reserved character in a URI and
  // there should not be two colons next to each other.
  private static final String DELIM = "::";

  /**
   * Set the shared cache upload policies config parameter. This is done by
   * serializing the provided map of shared cache upload policies into a config
   * parameter. If the parameter was previously set, this method will replace
   * the old value with the new provided map.
   *
   * @param conf Configuration which stores the shared cache upload policies
   * @param policies A map containing the shared cache upload policies for a set
   *          of resources. The key is the url of the resource and the value is
   *          the upload policy. True if it should be uploaded, false otherwise.
   * @param areFiles True if these policies are for files, false if they are for
   *          archives.
   */
  private static void setSharedCacheUploadPolicies(Configuration conf,
      Map<String, Boolean> policies, boolean areFiles) {
    String confParam = areFiles ?
        MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES :
        MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES;
    // If no policy is provided, we will reset the config by setting an empty
    // string value. In other words, cleaning up existing policies. This is
    // useful when we try to clean up shared cache upload policies for
    // non-application master tasks. See MAPREDUCE-7294 for details.
    if (policies == null || policies.size() == 0) {
      conf.set(confParam, "");
      return;
    }
    StringBuilder sb = new StringBuilder();
    policies.forEach((k,v) -> sb.append(k).append(DELIM).append(v).append(","));
    sb.deleteCharAt(sb.length() - 1);
    conf.set(confParam, sb.toString());
  }
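
  // For example, a policies map of {"hdfs:///libs/a.jar" -> true,
  // "hdfs:///libs/b.jar" -> false} (placeholder URLs) is stored as the single
  // config value "hdfs:///libs/a.jar::true,hdfs:///libs/b.jar::false".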

  /**
   * Deserialize a map of shared cache upload policies from a config parameter.
   *
   * @param conf Configuration which stores the shared cache upload policies
   * @param areFiles True if these policies are for files, false if they are for
   *          archives.
   * @return A map containing the shared cache upload policies for a set of
   *         resources. The key is the url of the resource and the value is the
   *         upload policy. True if it should be uploaded, false otherwise.
   */
  private static Map<String, Boolean> getSharedCacheUploadPolicies(
      Configuration conf, boolean areFiles) {
    String confParam =
        areFiles ? MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES
            : MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES;
    Collection<String> policies = conf.getStringCollection(confParam);
    String[] policy;
    Map<String, Boolean> policyMap = new LinkedHashMap<String, Boolean>();
    for (String s : policies) {
      policy = s.split(DELIM);
      if (policy.length != 2) {
        LOG.error(confParam
            + " is mis-formatted, returning empty shared cache upload policies."
            + " Error on [" + s + "]");
        return new LinkedHashMap<String, Boolean>();
      }
      policyMap.put(policy[0], Boolean.parseBoolean(policy[1]));
    }
    return policyMap;
  }

  /**
   * This is to get the shared cache upload policies for files.
   *
   * @param conf Configuration which stores the shared cache upload policies
   * @return A map containing the shared cache upload policies for a set of
   *         resources. The key is the url of the resource and the value is the
   *         upload policy. True if it should be uploaded, false otherwise.
   */
  @Unstable
  public static Map<String, Boolean> getFileSharedCacheUploadPolicies(
      Configuration conf) {
    return getSharedCacheUploadPolicies(conf, true);
  }

  /**
   * This is to get the shared cache upload policies for archives.
   *
   * @param conf Configuration which stores the shared cache upload policies
   * @return A map containing the shared cache upload policies for a set of
   *         resources. The key is the url of the resource and the value is the
   *         upload policy. True if it should be uploaded, false otherwise.
   */
  @Unstable
  public static Map<String, Boolean> getArchiveSharedCacheUploadPolicies(
      Configuration conf) {
    return getSharedCacheUploadPolicies(conf, false);
  }

  /** Only for mocking via unit tests. */
  @Private
  @VisibleForTesting
  synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster = 
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException, 
                                 ClassNotFoundException {
                     return new Cluster(getConfiguration());
                   }
                 });
    }
  }

  boolean isConnected() {
    return cluster != null;
  }

  /** Only for mocking via unit tests. */
  @Private
  @VisibleForTesting
  JobSubmitter getJobSubmitter(FileSystem fs,
      ClientProtocol submitClient) throws IOException {
    return new JobSubmitter(fs, submitClient);
  }
  /**
   * Submit the job to the cluster and return immediately.
   * @throws IOException
   */
  public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
  }
  
  /**
   * Submit the job to the cluster and wait for it to finish.
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the 
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }
  
  /**
   * Monitor a job and print status in real-time as progress is made and tasks 
   * fail.
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   */
  public boolean monitorAndPrintJob() 
      throws IOException, InterruptedException {
    String lastReport = null;
    Job.TaskStatusFilter filter;
    Configuration clientConf = getConfiguration();
    filter = Job.getTaskOutputFilter(clientConf);
    JobID jobId = getJobID();
    LOG.info("Running job: " + jobId);
    int eventCounter = 0;
    boolean profiling = getProfileEnabled();
    IntegerRanges mapRanges = getProfileTaskRange(true);
    IntegerRanges reduceRanges = getProfileTaskRange(false);
    int progMonitorPollIntervalMillis = 
      Job.getProgressPollInterval(clientConf);
    /* make sure to report full progress after the job is done */
    boolean reportedAfterCompletion = false;
    boolean reportedUberMode = false;
    while (!isComplete() || !reportedAfterCompletion) {
      if (isComplete()) {
        reportedAfterCompletion = true;
      } else {
        Thread.sleep(progMonitorPollIntervalMillis);
      }
      if (status.getState() == JobStatus.State.PREP) {
        continue;
      }      
      if (!reportedUberMode) {
        reportedUberMode = true;
        LOG.info("Job " + jobId + " running in uber mode : " + isUber());
      }      
      String report = 
        (" map " + StringUtils.formatPercent(mapProgress(), 0)+
            " reduce " + 
            StringUtils.formatPercent(reduceProgress(), 0));
      if (!report.equals(lastReport)) {
        LOG.info(report);
        lastReport = report;
      }

      TaskCompletionEvent[] events = 
        getTaskCompletionEvents(eventCounter, 10); 
      eventCounter += events.length;
      printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
    }
    boolean success = isSuccessful();
    if (success) {
      LOG.info("Job " + jobId + " completed successfully");
    } else {
      LOG.info("Job " + jobId + " failed with state " + status.getState() + 
          " due to: " + status.getFailureInfo());
    }
    Counters counters = getCounters();
    if (counters != null) {
      LOG.info(counters.toString());
    }
    return success;
  }

  private void printTaskEvents(TaskCompletionEvent[] events,
      Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges,
      IntegerRanges reduceRanges) throws IOException, InterruptedException {
    for (TaskCompletionEvent event : events) {
      switch (filter) {
      case NONE:
        break;
      case SUCCEEDED:
        if (event.getStatus() == 
          TaskCompletionEvent.Status.SUCCEEDED) {
          LOG.info(event.toString());
        }
        break; 
      case FAILED:
        if (event.getStatus() == 
          TaskCompletionEvent.Status.FAILED) {
          LOG.info(event.toString());
          // Displaying the task diagnostic information
          TaskAttemptID taskId = event.getTaskAttemptId();
          String[] taskDiagnostics = getTaskDiagnostics(taskId); 
          if (taskDiagnostics != null) {
            for (String diagnostics : taskDiagnostics) {
              System.err.println(diagnostics);
            }
          }
        }
        break; 
      case KILLED:
        if (event.getStatus() == TaskCompletionEvent.Status.KILLED){
          LOG.info(event.toString());
        }
        break; 
      case ALL:
        LOG.info(event.toString());
        break;
      }
    }
  }

  /** The interval at which monitorAndPrintJob() prints status */
  public static int getProgressPollInterval(Configuration conf) {
    // Read progress monitor poll interval from config. Default is 1 second.
    int progMonitorPollIntervalMillis = conf.getInt(
      PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL);
    if (progMonitorPollIntervalMillis < 1) {
      LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY + 
        " has been set to an invalid value; "
        + " replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
      progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
    }
    return progMonitorPollIntervalMillis;
  }

  /** The interval at which waitForCompletion() should check. */
  public static int getCompletionPollInterval(Configuration conf) {
    int completionPollIntervalMillis = conf.getInt(
      COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
    if (completionPollIntervalMillis < 1) { 
      LOG.warn(COMPLETION_POLL_INTERVAL_KEY + 
       " has been set to an invalid value; "
       + "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL);
      completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL;
    }
    return completionPollIntervalMillis;
  }

  /**
   * Get the task output filter.
   * 
   * @param conf the configuration.
   * @return the filter level.
   */
  public static TaskStatusFilter getTaskOutputFilter(Configuration conf) {
    return TaskStatusFilter.valueOf(conf.get(Job.OUTPUT_FILTER, "FAILED"));
  }

  /**
   * Modify the Configuration to set the task output filter.
   * 
   * @param conf the Configuration to modify.
   * @param newValue the value to set.
   */
  public static void setTaskOutputFilter(Configuration conf, 
      TaskStatusFilter newValue) {
    conf.set(Job.OUTPUT_FILTER, newValue.toString());
  }

  public boolean isUber() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    updateStatus();
    return status.isUber();
  }

  /**
   * Get the reservation to which the job is submitted, if any.
   *
   * @return the identifier of the job's reservation, or null if the job does
   *         not have any reservation associated with it
   */
  public ReservationId getReservationId() {
    return reservationId;
  }

  /**
   * Set the reservation to which the job is submitted.
   *
   * @param reservationId the reservationId to set
   */
  public void setReservationId(ReservationId reservationId) {
    this.reservationId = reservationId;
  }
  
  /**
   * Close the <code>Job</code>.
   * @throws IOException if the cluster connection cannot be closed.
   */
  @Override
  public void close() throws IOException {
    if (cluster != null) {
      cluster.close();
      cluster = null;
    }
  }
}
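
Below is a minimal usage sketch of the submission and monitoring APIs listed above (submit(), getTrackingURL(), getCompletionPollInterval(), isComplete(), isSuccessful(), setTaskOutputFilter() and close()). The job name and the choice of submit() plus manual polling instead of waitForCompletion(true) are illustrative assumptions, not the only way to drive the class.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SubmitAndMonitor {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Log every task-completion event, not only failed ones (FAILED is the default filter).
    Job.setTaskOutputFilter(conf, Job.TaskStatusFilter.ALL);

    try (Job job = Job.getInstance(conf, "submit-and-monitor-demo")) {
      // submit() returns immediately; waitForCompletion(true) would instead
      // call monitorAndPrintJob() and block until the job finishes.
      job.submit();
      System.out.println("Tracking URL: " + job.getTrackingURL());

      // Poll at the same interval waitForCompletion(false) uses internally.
      int pollMillis = Job.getCompletionPollInterval(conf);
      while (!job.isComplete()) {
        Thread.sleep(pollMillis);
      }
      System.out.println("Succeeded: " + job.isSuccessful());
    } // try-with-resources invokes close(), releasing the Cluster connection
  }
}

The shared-cache upload-policy getters shown above (getFileSharedCacheUploadPolicies() and getArchiveSharedCacheUploadPolicies()) are marked @Unstable and parse a serialized map out of the Configuration. The sketch below assumes the corresponding public setter, setFileSharedCacheUploadPolicies(Configuration, Map), which is defined earlier in Job.java but not shown in this excerpt; the paths and policy values are illustrative.

import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SharedCachePolicyDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // true  -> the resource may be uploaded to the shared cache
    // false -> the resource must not be uploaded
    Map<String, Boolean> policies = new LinkedHashMap<>();
    policies.put("hdfs:///libs/common.jar", Boolean.TRUE);
    policies.put("hdfs:///libs/private.jar", Boolean.FALSE);
    Job.setFileSharedCacheUploadPolicies(conf, policies);

    // getFileSharedCacheUploadPolicies() parses the serialized value back into
    // a map; a malformed entry makes it log an error and return an empty map.
    for (Map.Entry<String, Boolean> e
        : Job.getFileSharedCacheUploadPolicies(conf).entrySet()) {
      System.out.println(e.getKey() + " -> upload=" + e.getValue());
    }
  }
}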

Related information

hadoop source code directory

Related articles

hadoop Cluster source code

hadoop ClusterMetrics source code

hadoop ContextFactory source code

hadoop Counter source code

hadoop CounterGroup source code

hadoop Counters source code

hadoop CryptoUtils source code

hadoop CustomJobEndNotifier source code

hadoop FileSystemCounter source code

hadoop ID source code
