hadoop LocalJobRunner 源码

  • 2022-10-20
  • 浏览 (130)

haddop LocalJobRunner 代码

文件路径:/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.crypto.KeyGenerator;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.CryptoUtils;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;

import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Implements MapReduce locally, in-process, for debugging. */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class LocalJobRunner implements ClientProtocol {
  public static final Logger LOG =
      LoggerFactory.getLogger(LocalJobRunner.class);

  /** The maximum number of map tasks to run in parallel in LocalJobRunner */
  public static final String LOCAL_MAX_MAPS =
    "mapreduce.local.map.tasks.maximum";

  /** The maximum number of reduce tasks to run in parallel in LocalJobRunner */
  public static final String LOCAL_MAX_REDUCES =
    "mapreduce.local.reduce.tasks.maximum";

  public static final String INTERMEDIATE_DATA_ENCRYPTION_ALGO = "HmacSHA1";

  private FileSystem fs;
  private HashMap<JobID, Job> jobs = new HashMap<JobID, Job>();
  private JobConf conf;
  private AtomicInteger map_tasks = new AtomicInteger(0);
  private AtomicInteger reduce_tasks = new AtomicInteger(0);
  final Random rand = new Random();
  
  private LocalJobRunnerMetrics myMetrics = null;

  private static final String jobDir =  "localRunner/";

  public long getProtocolVersion(String protocol, long clientVersion) {
    return ClientProtocol.versionID;
  }

  @Override
  public ProtocolSignature getProtocolSignature(String protocol,
      long clientVersion, int clientMethodsHash) throws IOException {
    return ProtocolSignature.getProtocolSignature(
        this, protocol, clientVersion, clientMethodsHash);
  }

  private class Job extends Thread implements TaskUmbilicalProtocol {
    // The job directory on the system: JobClient places job configurations here.
    // This is analogous to JobTracker's system directory.
    private Path systemJobDir;
    private Path systemJobFile;
    
    // The job directory for the task.  Analagous to a task's job directory.
    private Path localJobDir;
    private Path localJobFile;

    private JobID id;
    private JobConf job;

    private int numMapTasks;
    private int numReduceTasks;
    private float [] partialMapProgress;
    private float [] partialReduceProgress;
    private Counters [] mapCounters;
    private Counters [] reduceCounters;

    private JobStatus status;
    private List<TaskAttemptID> mapIds = Collections.synchronizedList(
        new ArrayList<TaskAttemptID>());

    private JobProfile profile;
    private FileSystem localFs;
    boolean killed = false;
    
    private LocalDistributedCacheManager localDistributedCacheManager;

    public long getProtocolVersion(String protocol, long clientVersion) {
      return TaskUmbilicalProtocol.versionID;
    }
    
    @Override
    public ProtocolSignature getProtocolSignature(String protocol,
        long clientVersion, int clientMethodsHash) throws IOException {
      return ProtocolSignature.getProtocolSignature(
          this, protocol, clientVersion, clientMethodsHash);
    }

    public Job(JobID jobid, String jobSubmitDir) throws IOException {
      this.systemJobDir = new Path(jobSubmitDir);
      this.systemJobFile = new Path(systemJobDir, "job.xml");
      this.id = jobid;
      JobConf conf = new JobConf(systemJobFile);
      this.localFs = FileSystem.getLocal(conf);
      String user = UserGroupInformation.getCurrentUser().getShortUserName();
      this.localJobDir = localFs.makeQualified(new Path(
          new Path(conf.getLocalPath(jobDir), user), jobid.toString()));
      this.localJobFile = new Path(this.localJobDir, id + ".xml");

      // Manage the distributed cache.  If there are files to be copied,
      // this will trigger localFile to be re-written again.
      localDistributedCacheManager = new LocalDistributedCacheManager();
      localDistributedCacheManager.setup(conf, jobid);
      
      // Write out configuration file.  Instead of copying it from
      // systemJobFile, we re-write it, since setup(), above, may have
      // updated it.
      OutputStream out = localFs.create(localJobFile);
      try {
        conf.writeXml(out);
      } finally {
        out.close();
      }
      this.job = new JobConf(localJobFile);

      // Job (the current object) is a Thread, so we wrap its class loader.
      if (localDistributedCacheManager.hasLocalClasspaths()) {
        setContextClassLoader(localDistributedCacheManager.makeClassLoader(
                getContextClassLoader()));
      }
      
      profile = new JobProfile(job.getUser(), id, systemJobFile.toString(), 
                               "http://localhost:8080/", job.getJobName());
      status = new JobStatus(id, 0.0f, 0.0f, JobStatus.RUNNING, 
          profile.getUser(), profile.getJobName(), profile.getJobFile(), 
          profile.getURL().toString());

      jobs.put(id, this);

      if (CryptoUtils.isEncryptedSpillEnabled(job)) {
        try {
          int keyLen = conf.getInt(
              MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
              MRJobConfig
                  .DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS);
          KeyGenerator keyGen =
              KeyGenerator.getInstance(INTERMEDIATE_DATA_ENCRYPTION_ALGO);
          keyGen.init(keyLen);
          Credentials creds =
              UserGroupInformation.getCurrentUser().getCredentials();
          TokenCache.setEncryptedSpillKey(keyGen.generateKey().getEncoded(),
              creds);
          UserGroupInformation.getCurrentUser().addCredentials(creds);
        } catch (NoSuchAlgorithmException e) {
          throw new IOException("Error generating encrypted spill key", e);
        }
      }

      this.start();
    }

    protected abstract class RunnableWithThrowable implements Runnable {
      public volatile Throwable storedException;
    }

    /**
     * A Runnable instance that handles a map task to be run by an executor.
     */
    protected class MapTaskRunnable extends RunnableWithThrowable {
      private final int taskId;
      private final TaskSplitMetaInfo info;
      private final JobID jobId;
      private final JobConf localConf;

      // This is a reference to a shared object passed in by the
      // external context; this delivers state to the reducers regarding
      // where to fetch mapper outputs.
      private final Map<TaskAttemptID, MapOutputFile> mapOutputFiles;

      public MapTaskRunnable(TaskSplitMetaInfo info, int taskId, JobID jobId,
          Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {
        this.info = info;
        this.taskId = taskId;
        this.mapOutputFiles = mapOutputFiles;
        this.jobId = jobId;
        this.localConf = new JobConf(job);
      }

      public void run() {
        try {
          TaskAttemptID mapId = new TaskAttemptID(new TaskID(
              jobId, TaskType.MAP, taskId), 0);
          LOG.info("Starting task: " + mapId);
          mapIds.add(mapId);
          MapTask map = new MapTask(systemJobFile.toString(), mapId, taskId,
            info.getSplitIndex(), 1);
          map.setUser(UserGroupInformation.getCurrentUser().
              getShortUserName());
          setupChildMapredLocalDirs(map, localConf);

          MapOutputFile mapOutput = new MROutputFiles();
          mapOutput.setConf(localConf);
          mapOutputFiles.put(mapId, mapOutput);

          map.setJobFile(localJobFile.toString());
          localConf.setUser(map.getUser());
          map.localizeConfiguration(localConf);
          map.setConf(localConf);
          try {
            map_tasks.getAndIncrement();
            myMetrics.launchMap(mapId);
            map.run(localConf, Job.this);
            myMetrics.completeMap(mapId);
          } finally {
            map_tasks.getAndDecrement();
          }

          LOG.info("Finishing task: " + mapId);
        } catch (Throwable e) {
          this.storedException = e;
        }
      }
    }

    /**
     * Create Runnables to encapsulate map tasks for use by the executor
     * service.
     * @param taskInfo Info about the map task splits
     * @param jobId the job id
     * @param mapOutputFiles a mapping from task attempts to output files
     * @return a List of Runnables, one per map task.
     */
    protected List<RunnableWithThrowable> getMapTaskRunnables(
        TaskSplitMetaInfo [] taskInfo, JobID jobId,
        Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {

      int numTasks = 0;
      ArrayList<RunnableWithThrowable> list =
          new ArrayList<RunnableWithThrowable>();
      for (TaskSplitMetaInfo task : taskInfo) {
        list.add(new MapTaskRunnable(task, numTasks++, jobId,
            mapOutputFiles));
      }

      return list;
    }

    protected class ReduceTaskRunnable extends RunnableWithThrowable {
      private final int taskId;
      private final JobID jobId;
      private final JobConf localConf;

      // This is a reference to a shared object passed in by the
      // external context; this delivers state to the reducers regarding
      // where to fetch mapper outputs.
      private final Map<TaskAttemptID, MapOutputFile> mapOutputFiles;

      public ReduceTaskRunnable(int taskId, JobID jobId,
          Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {
        this.taskId = taskId;
        this.jobId = jobId;
        this.mapOutputFiles = mapOutputFiles;
        this.localConf = new JobConf(job);
        this.localConf.set("mapreduce.jobtracker.address", "local");
      }

      public void run() {
        try {
          TaskAttemptID reduceId = new TaskAttemptID(new TaskID(
              jobId, TaskType.REDUCE, taskId), 0);
          LOG.info("Starting task: " + reduceId);

          ReduceTask reduce = new ReduceTask(systemJobFile.toString(),
              reduceId, taskId, mapIds.size(), 1);
          reduce.setUser(UserGroupInformation.getCurrentUser().
              getShortUserName());
          setupChildMapredLocalDirs(reduce, localConf);
          reduce.setLocalMapFiles(mapOutputFiles);

          if (!Job.this.isInterrupted()) {
            reduce.setJobFile(localJobFile.toString());
            localConf.setUser(reduce.getUser());
            reduce.localizeConfiguration(localConf);
            reduce.setConf(localConf);
            try {
              reduce_tasks.getAndIncrement();
              myMetrics.launchReduce(reduce.getTaskID());
              reduce.run(localConf, Job.this);
              myMetrics.completeReduce(reduce.getTaskID());
            } finally {
              reduce_tasks.getAndDecrement();
            }

            LOG.info("Finishing task: " + reduceId);
          } else {
            throw new InterruptedException();
          }
        } catch (Throwable t) {
          // store this to be rethrown in the initial thread context.
          this.storedException = t;
        }
      }
    }

    /**
     * Create Runnables to encapsulate reduce tasks for use by the executor
     * service.
     * @param jobId the job id
     * @param mapOutputFiles a mapping from task attempts to output files
     * @return a List of Runnables, one per reduce task.
     */
    protected List<RunnableWithThrowable> getReduceTaskRunnables(
        JobID jobId, Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {

      int taskId = 0;
      ArrayList<RunnableWithThrowable> list =
          new ArrayList<RunnableWithThrowable>();
      for (int i = 0; i < this.numReduceTasks; i++) {
        list.add(new ReduceTaskRunnable(taskId++, jobId, mapOutputFiles));
      }

      return list;
    }

    /**
     * Initialize the counters that will hold partial-progress from
     * the various task attempts.
     * @param numMaps the number of map tasks in this job.
     */
    private synchronized void initCounters(int numMaps, int numReduces) {
      // Initialize state trackers for all map tasks.
      this.partialMapProgress = new float[numMaps];
      this.mapCounters = new Counters[numMaps];
      for (int i = 0; i < numMaps; i++) {
        this.mapCounters[i] = new Counters();
      }

      this.partialReduceProgress = new float[numReduces];
      this.reduceCounters = new Counters[numReduces];
      for (int i = 0; i < numReduces; i++) {
        this.reduceCounters[i] = new Counters();
      }

      this.numMapTasks = numMaps;
      this.numReduceTasks = numReduces;
    }

    /**
     * Creates the executor service used to run map tasks.
     *
     * @return an ExecutorService instance that handles map tasks
     */
    protected synchronized ExecutorService createMapExecutor() {

      // Determine the size of the thread pool to use
      int maxMapThreads = job.getInt(LOCAL_MAX_MAPS, 1);
      if (maxMapThreads < 1) {
        throw new IllegalArgumentException(
            "Configured " + LOCAL_MAX_MAPS + " must be >= 1");
      }
      maxMapThreads = Math.min(maxMapThreads, this.numMapTasks);
      maxMapThreads = Math.max(maxMapThreads, 1); // In case of no tasks.

      LOG.debug("Starting mapper thread pool executor.");
      LOG.debug("Max local threads: " + maxMapThreads);
      LOG.debug("Map tasks to process: " + this.numMapTasks);

      // Create a new executor service to drain the work queue.
      ThreadFactory tf = new ThreadFactoryBuilder()
        .setNameFormat("LocalJobRunner Map Task Executor #%d")
        .build();
      ExecutorService executor = HadoopExecutors.newFixedThreadPool(
          maxMapThreads, tf);

      return executor;
    }
    
    /**
     * Creates the executor service used to run reduce tasks.
     *
     * @return an ExecutorService instance that handles reduce tasks
     */
    protected synchronized ExecutorService createReduceExecutor() {

      // Determine the size of the thread pool to use
      int maxReduceThreads = job.getInt(LOCAL_MAX_REDUCES, 1);
      if (maxReduceThreads < 1) {
        throw new IllegalArgumentException(
            "Configured " + LOCAL_MAX_REDUCES + " must be >= 1");
      }
      maxReduceThreads = Math.min(maxReduceThreads, this.numReduceTasks);
      maxReduceThreads = Math.max(maxReduceThreads, 1); // In case of no tasks.

      LOG.debug("Starting reduce thread pool executor.");
      LOG.debug("Max local threads: " + maxReduceThreads);
      LOG.debug("Reduce tasks to process: " + this.numReduceTasks);

      // Create a new executor service to drain the work queue.
      ExecutorService executor = HadoopExecutors.newFixedThreadPool(
          maxReduceThreads);

      return executor;
    }

    /** Run a set of tasks and waits for them to complete. */
    private void runTasks(List<RunnableWithThrowable> runnables,
        ExecutorService service, String taskType) throws Exception {
      // Start populating the executor with work units.
      // They may begin running immediately (in other threads).
      for (Runnable r : runnables) {
        service.submit(r);
      }

      try {
        service.shutdown(); // Instructs queue to drain.

        // Wait for tasks to finish; do not use a time-based timeout.
        // (See http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6179024)
        LOG.info("Waiting for " + taskType + " tasks");
        service.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
      } catch (InterruptedException ie) {
        // Cancel all threads.
        service.shutdownNow();
        throw ie;
      }

      LOG.info(taskType + " task executor complete.");

      // After waiting for the tasks to complete, if any of these
      // have thrown an exception, rethrow it now in the main thread context.
      for (RunnableWithThrowable r : runnables) {
        if (r.storedException != null) {
          throw new Exception(r.storedException);
        }
      }
    }

    private org.apache.hadoop.mapreduce.OutputCommitter 
    createOutputCommitter(boolean newApiCommitter, JobID jobId, Configuration conf) throws Exception {
      org.apache.hadoop.mapreduce.OutputCommitter committer = null;

      LOG.info("OutputCommitter set in config "
          + conf.get("mapred.output.committer.class"));

      if (newApiCommitter) {
        org.apache.hadoop.mapreduce.TaskID taskId =
            new org.apache.hadoop.mapreduce.TaskID(jobId, TaskType.MAP, 0);
        org.apache.hadoop.mapreduce.TaskAttemptID taskAttemptID =
            new org.apache.hadoop.mapreduce.TaskAttemptID(taskId, 0);
        org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = 
            new TaskAttemptContextImpl(conf, taskAttemptID);
        OutputFormat outputFormat =
          ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), conf);
        committer = outputFormat.getOutputCommitter(taskContext);
      } else {
        committer = ReflectionUtils.newInstance(conf.getClass(
            "mapred.output.committer.class", FileOutputCommitter.class,
            org.apache.hadoop.mapred.OutputCommitter.class), conf);
      }
      LOG.info("OutputCommitter is " + committer.getClass().getName());
      return committer;
    }

    @Override
    public void run() {
      JobID jobId = profile.getJobID();
      JobContext jContext = new JobContextImpl(job, jobId);
      
      org.apache.hadoop.mapreduce.OutputCommitter outputCommitter = null;
      try {
        outputCommitter = createOutputCommitter(conf.getUseNewMapper(), jobId, conf);
      } catch (Exception e) {
        LOG.info("Failed to createOutputCommitter", e);
        return;
      }
      
      try {
        TaskSplitMetaInfo[] taskSplitMetaInfos = 
          SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);

        int numReduceTasks = job.getNumReduceTasks();
        outputCommitter.setupJob(jContext);
        status.setSetupProgress(1.0f);

        Map<TaskAttemptID, MapOutputFile> mapOutputFiles =
            Collections.synchronizedMap(new HashMap<TaskAttemptID, MapOutputFile>());
        
        List<RunnableWithThrowable> mapRunnables = getMapTaskRunnables(
            taskSplitMetaInfos, jobId, mapOutputFiles);
              
        initCounters(mapRunnables.size(), numReduceTasks);
        ExecutorService mapService = createMapExecutor();
        runTasks(mapRunnables, mapService, "map");

        try {
          if (numReduceTasks > 0) {
            List<RunnableWithThrowable> reduceRunnables = getReduceTaskRunnables(
                jobId, mapOutputFiles);
            ExecutorService reduceService = createReduceExecutor();
            runTasks(reduceRunnables, reduceService, "reduce");
          }
        } finally {
          for (MapOutputFile output : mapOutputFiles.values()) {
            output.removeAll();
          }
        }
        // delete the temporary directory in output directory
        outputCommitter.commitJob(jContext);
        status.setCleanupProgress(1.0f);

        if (killed) {
          this.status.setRunState(JobStatus.KILLED);
        } else {
          this.status.setRunState(JobStatus.SUCCEEDED);
        }

        JobEndNotifier.localRunnerNotification(job, status);
      } catch (Throwable t) {
        try {
          outputCommitter.abortJob(jContext, 
            org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
        } catch (IOException ioe) {
          LOG.info("Error cleaning up job:" + id);
        }
        status.setCleanupProgress(1.0f);
        if (killed) {
          this.status.setRunState(JobStatus.KILLED);
        } else {
          this.status.setRunState(JobStatus.FAILED);
        }
        LOG.warn(id.toString(), t);

        JobEndNotifier.localRunnerNotification(job, status);

      } finally {
        try {
          try {
            // Cleanup distributed cache
            localDistributedCacheManager.close();
          } finally {
            try {
              fs.delete(systemJobFile.getParent(), true); // delete submit dir
            } finally {
              localFs.delete(localJobFile, true);         // delete local copy
            }
          }
        } catch (IOException e) {
          LOG.warn("Error cleaning up "+id+": "+e);
        }
      }
    }

    // TaskUmbilicalProtocol methods

    @Override
    public JvmTask getTask(JvmContext context) { return null; }
    
    @Override
    public synchronized AMFeedback statusUpdate(TaskAttemptID taskId,
        TaskStatus taskStatus) throws IOException, InterruptedException {
      AMFeedback feedback = new AMFeedback();
      feedback.setTaskFound(true);
      if (null == taskStatus) {
        return feedback;
      }
      // Serialize as we would if distributed in order to make deep copy
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      DataOutputStream dos = new DataOutputStream(baos);
      taskStatus.write(dos);
      dos.close();
      taskStatus = TaskStatus.createTaskStatus(taskStatus.getIsMap());
      taskStatus.readFields(new DataInputStream(
          new ByteArrayInputStream(baos.toByteArray())));
      
      LOG.info(taskStatus.getStateString());
      int mapTaskIndex = mapIds.indexOf(taskId);
      if (mapTaskIndex >= 0) {
        // mapping
        float numTasks = (float) this.numMapTasks;

        partialMapProgress[mapTaskIndex] = taskStatus.getProgress();
        mapCounters[mapTaskIndex] = taskStatus.getCounters();

        float partialProgress = 0.0f;
        for (float f : partialMapProgress) {
          partialProgress += f;
        }
        status.setMapProgress(partialProgress / numTasks);
      } else {
        // reducing
        int reduceTaskIndex = taskId.getTaskID().getId();
        float numTasks = (float) this.numReduceTasks;

        partialReduceProgress[reduceTaskIndex] = taskStatus.getProgress();
        reduceCounters[reduceTaskIndex] = taskStatus.getCounters();

        float partialProgress = 0.0f;
        for (float f : partialReduceProgress) {
          partialProgress += f;
        }
        status.setReduceProgress(partialProgress / numTasks);
      }

      // ignore phase
      return feedback;
    }

    /** Return the current values of the counters for this job,
     * including tasks that are in progress.
     */
    public synchronized Counters getCurrentCounters() {
      if (null == mapCounters) {
        // Counters not yet initialized for job.
        return new Counters();
      }

      Counters current = new Counters();
      for (Counters c : mapCounters) {
        current = Counters.sum(current, c);
      }

      if (null != reduceCounters && reduceCounters.length > 0) {
        for (Counters c : reduceCounters) {
          current = Counters.sum(current, c);
        }
      }

      return current;
    }

    /**
     * Task is reporting that it is in commit_pending
     * and it is waiting for the commit Response
     */
    public void commitPending(TaskAttemptID taskid,
                              TaskStatus taskStatus) 
    throws IOException, InterruptedException {
      statusUpdate(taskid, taskStatus);
    }

    @Override
    public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) {
      // Ignore for now
    }
    
    @Override
    public void reportNextRecordRange(TaskAttemptID taskid, 
        SortedRanges.Range range) throws IOException {
      LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
    }

    @Override
    public boolean canCommit(TaskAttemptID taskid) 
    throws IOException {
      return true;
    }
    
    @Override
    public void done(TaskAttemptID taskId) throws IOException {
      int taskIndex = mapIds.indexOf(taskId);
      if (taskIndex >= 0) {                       // mapping
        status.setMapProgress(1.0f);
      } else {
        status.setReduceProgress(1.0f);
      }
    }

    @Override
    public synchronized void fsError(TaskAttemptID taskId, String message) 
    throws IOException {
      LOG.error("FSError: "+ message + "from task: " + taskId);
    }

    @Override
    public void shuffleError(TaskAttemptID taskId, String message) throws IOException {
      LOG.error("shuffleError: "+ message + "from task: " + taskId);
    }
    
    public synchronized void fatalError(TaskAttemptID taskId, String msg, boolean fastFail)
    throws IOException {
      LOG.error("Fatal: "+ msg + " from task: " + taskId + " fast fail: " + fastFail);
    }
    
    @Override
    public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId, 
        int fromEventId, int maxLocs, TaskAttemptID id) throws IOException {
      return new MapTaskCompletionEventsUpdate(
        org.apache.hadoop.mapred.TaskCompletionEvent.EMPTY_ARRAY, false);
    }

    @Override
    public void preempted(TaskAttemptID taskId, TaskStatus taskStatus)
        throws IOException, InterruptedException {
      // ignore
    }

    @Override
    public TaskCheckpointID getCheckpointID(TaskID taskId) {
      // ignore
      return null;
    }

    @Override
    public void setCheckpointID(TaskID downgrade, TaskCheckpointID cid) {
      // ignore
    }

  }

  public LocalJobRunner(Configuration conf) throws IOException {
    this(new JobConf(conf));
  }

  @Deprecated
  public LocalJobRunner(JobConf conf) throws IOException {
    this.fs = FileSystem.getLocal(conf);
    this.conf = conf;
    myMetrics = LocalJobRunnerMetrics.create();
  }

  // JobSubmissionProtocol methods

  private static int jobid = 0;
  // used for making sure that local jobs run in different jvms don't
  // collide on staging or job directories
  private int randid;
  
  public synchronized org.apache.hadoop.mapreduce.JobID getNewJobID() {
    return new org.apache.hadoop.mapreduce.JobID("local" + randid, ++jobid);
  }

  public org.apache.hadoop.mapreduce.JobStatus submitJob(
      org.apache.hadoop.mapreduce.JobID jobid, String jobSubmitDir,
      Credentials credentials) throws IOException {
    Job job = new Job(JobID.downgrade(jobid), jobSubmitDir);
    job.job.setCredentials(credentials);
    return job.status;

  }

  public void killJob(org.apache.hadoop.mapreduce.JobID id) {
    jobs.get(JobID.downgrade(id)).killed = true;
    jobs.get(JobID.downgrade(id)).interrupt();
  }

  public void setJobPriority(org.apache.hadoop.mapreduce.JobID id,
      String jp) throws IOException {
    throw new UnsupportedOperationException("Changing job priority " +
                      "in LocalJobRunner is not supported.");
  }
  
  /** Throws {@link UnsupportedOperationException} */
  public boolean killTask(org.apache.hadoop.mapreduce.TaskAttemptID taskId,
      boolean shouldFail) throws IOException {
    throw new UnsupportedOperationException("Killing tasks in " +
    "LocalJobRunner is not supported");
  }

  public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(
      org.apache.hadoop.mapreduce.JobID id, TaskType type) {
    return new org.apache.hadoop.mapreduce.TaskReport[0];
  }

  public org.apache.hadoop.mapreduce.JobStatus getJobStatus(
      org.apache.hadoop.mapreduce.JobID id) {
    Job job = jobs.get(JobID.downgrade(id));
    if(job != null)
      return job.status;
    else 
      return null;
  }
  
  public org.apache.hadoop.mapreduce.Counters getJobCounters(
      org.apache.hadoop.mapreduce.JobID id) {
    Job job = jobs.get(JobID.downgrade(id));

    return new org.apache.hadoop.mapreduce.Counters(job.getCurrentCounters());
  }

  public String getFilesystemName() throws IOException {
    return fs.getUri().toString();
  }
  
  public ClusterMetrics getClusterMetrics() {
    int numMapTasks = map_tasks.get();
    int numReduceTasks = reduce_tasks.get();
    return new ClusterMetrics(numMapTasks, numReduceTasks, numMapTasks,
        numReduceTasks, 0, 0, 1, 1, jobs.size(), 1, 0, 0);
  }

  public JobTrackerStatus getJobTrackerStatus() {
    return JobTrackerStatus.RUNNING;
  }

  public long getTaskTrackerExpiryInterval() throws IOException, InterruptedException {
    return 0;
  }

  /** 
   * Get all active trackers in cluster. 
   * @return array of TaskTrackerInfo
   */
  public TaskTrackerInfo[] getActiveTrackers() 
      throws IOException, InterruptedException {
    return new TaskTrackerInfo[0];
  }

  /** 
   * Get all blacklisted trackers in cluster. 
   * @return array of TaskTrackerInfo
   */
  public TaskTrackerInfo[] getBlacklistedTrackers() 
      throws IOException, InterruptedException {
    return new TaskTrackerInfo[0];
  }

  public TaskCompletionEvent[] getTaskCompletionEvents(
      org.apache.hadoop.mapreduce.JobID jobid
      , int fromEventId, int maxEvents) throws IOException {
    return TaskCompletionEvent.EMPTY_ARRAY;
  }
  
  public org.apache.hadoop.mapreduce.JobStatus[] getAllJobs() {return null;}

  
  /**
   * Returns the diagnostic information for a particular task in the given job.
   * To be implemented
   */
  public String[] getTaskDiagnostics(
      org.apache.hadoop.mapreduce.TaskAttemptID taskid) throws IOException{
	  return new String [0];
  }

  /**
   * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getSystemDir()
   */
  public String getSystemDir() {
    Path sysDir = new Path(
      conf.get(JTConfig.JT_SYSTEM_DIR, "/tmp/hadoop/mapred/system"));  
    return fs.makeQualified(sysDir).toString();
  }

  /**
   * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getQueueAdmins(String)
   */
  public AccessControlList getQueueAdmins(String queueName) throws IOException {
	  return new AccessControlList(" ");// no queue admins for local job runner
  }

  /**
   * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getStagingAreaDir()
   */
  public String getStagingAreaDir() throws IOException {
    Path stagingRootDir = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT, 
        "/tmp/hadoop/mapred/staging"));
    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
    String user;
    randid = rand.nextInt(Integer.MAX_VALUE);
    if (ugi != null) {
      user = ugi.getShortUserName() + randid;
    } else {
      user = "dummy" + randid;
    }
    return fs.makeQualified(new Path(stagingRootDir, user+"/.staging")).toString();
  }
  
  public String getJobHistoryDir() {
    return null;
  }

  @Override
  public QueueInfo[] getChildQueues(String queueName) throws IOException {
    return null;
  }

  @Override
  public QueueInfo[] getRootQueues() throws IOException {
    return null;
  }

  @Override
  public QueueInfo[] getQueues() throws IOException {
    return null;
  }


  @Override
  public QueueInfo getQueue(String queue) throws IOException {
    return null;
  }

  @Override
  public org.apache.hadoop.mapreduce.QueueAclsInfo[] 
      getQueueAclsForCurrentUser() throws IOException{
    return null;
  }

  /**
   * Set the max number of map tasks to run concurrently in the LocalJobRunner.
   * @param job the job to configure
   * @param maxMaps the maximum number of map tasks to allow.
   */
  public static void setLocalMaxRunningMaps(
      org.apache.hadoop.mapreduce.JobContext job,
      int maxMaps) {
    job.getConfiguration().setInt(LOCAL_MAX_MAPS, maxMaps);
  }

  /**
   * @return the max number of map tasks to run concurrently in the
   * LocalJobRunner.
   */
  public static int getLocalMaxRunningMaps(
      org.apache.hadoop.mapreduce.JobContext job) {
    return job.getConfiguration().getInt(LOCAL_MAX_MAPS, 1);
  }


  /**
   * Set the max number of reduce tasks to run concurrently in the LocalJobRunner.
   * @param job the job to configure
   * @param maxReduces the maximum number of reduce tasks to allow.
   */
  public static void setLocalMaxRunningReduces(
      org.apache.hadoop.mapreduce.JobContext job,
      int maxReduces) {
    job.getConfiguration().setInt(LOCAL_MAX_REDUCES, maxReduces);
  }

  /**
   * @return the max number of reduce tasks to run concurrently in the
   * LocalJobRunner.
   */
  public static int getLocalMaxRunningReduces(
      org.apache.hadoop.mapreduce.JobContext job) {
    return job.getConfiguration().getInt(LOCAL_MAX_REDUCES, 1);
  }

  @Override
  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
                                       ) throws IOException,
                                                InterruptedException {
  }

  @Override
  public Token<DelegationTokenIdentifier> 
     getDelegationToken(Text renewer) throws IOException, InterruptedException {
    return null;
  }

  @Override
  public long renewDelegationToken(Token<DelegationTokenIdentifier> token
                                      ) throws IOException,InterruptedException{
    return 0;
  }

  @Override
  public LogParams getLogFileParams(org.apache.hadoop.mapreduce.JobID jobID,
      org.apache.hadoop.mapreduce.TaskAttemptID taskAttemptID)
      throws IOException, InterruptedException {
    throw new UnsupportedOperationException("Not supported");
  }
  
  static void setupChildMapredLocalDirs(Task t, JobConf conf) {
    String[] localDirs = conf.getTrimmedStrings(MRConfig.LOCAL_DIR);
    String jobId = t.getJobID().toString();
    String taskId = t.getTaskID().toString();
    boolean isCleanup = t.isTaskCleanupTask();
    String user = t.getUser();
    StringBuffer childMapredLocalDir =
        new StringBuffer(localDirs[0] + Path.SEPARATOR
            + getLocalTaskDir(user, jobId, taskId, isCleanup));
    for (int i = 1; i < localDirs.length; i++) {
      childMapredLocalDir.append("," + localDirs[i] + Path.SEPARATOR
          + getLocalTaskDir(user, jobId, taskId, isCleanup));
    }
    LOG.debug(MRConfig.LOCAL_DIR + " for child : " + childMapredLocalDir);
    conf.set(MRConfig.LOCAL_DIR, childMapredLocalDir.toString());
  }
  
  static final String TASK_CLEANUP_SUFFIX = ".cleanup";
  static final String JOBCACHE = "jobcache";
  
  static String getLocalTaskDir(String user, String jobid, String taskid,
      boolean isCleanupAttempt) {
    String taskDir = jobDir + Path.SEPARATOR + user + Path.SEPARATOR + JOBCACHE
      + Path.SEPARATOR + jobid + Path.SEPARATOR + taskid;
    if (isCleanupAttempt) {
      taskDir = taskDir + TASK_CLEANUP_SUFFIX;
    }
    return taskDir;
  }
  
  
}

相关信息

hadoop 源码目录

相关文章

hadoop LocalClientProtocolProvider 源码

hadoop LocalDistributedCacheManager 源码

hadoop LocalJobRunnerMetrics 源码

0  赞