hadoop Cluster 源码

  • 2022-10-20
  • 浏览 (184)

hadoop Cluster 代码

文件路径:/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Provides a way to access information about the map/reduce cluster.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Cluster {
  
  /**
   * JobTracker states; this is the type returned by
   * {@link #getJobTrackerStatus()}.
   */
  @InterfaceStability.Evolving
  public enum JobTrackerStatus {INITIALIZING, RUNNING};

  // Provider that produced 'client'; assigned in initialize() and used by
  // close() to release the client.
  private ClientProtocolProvider clientProtocolProvider;
  // Protocol handle that the query methods of this class delegate to.
  private ClientProtocol client;
  // User captured at construction time via UserGroupInformation.getCurrentUser().
  private UserGroupInformation ugi;
  // Configuration supplied to the constructor.
  private Configuration conf;
  // Resolved on first use by getFileSystem().
  private FileSystem fs = null;
  // The three paths below are filled in on first request by getSystemDir(),
  // getStagingAreaDir() and getJobHistoryUrl() respectively.
  private Path sysDir = null;
  private Path stagingAreaDir = null;
  private Path jobHistoryDir = null;
  private static final Logger LOG =
      LoggerFactory.getLogger(Cluster.class);

  // Source of ClientProtocolProvider implementations. It is also the monitor
  // that initProviderList() synchronizes on while iterating it.
  @VisibleForTesting
  static Iterable<ClientProtocolProvider> frameworkLoader =
      ServiceLoader.load(ClientProtocolProvider.class);
  // Per-instance snapshot of the providers; null until initProviderList() runs.
  private volatile List<ClientProtocolProvider> providerList = null;

  /**
   * Populates {@link #providerList} from {@link #frameworkLoader} if it has
   * not been populated yet.
   *
   * The list is re-checked after acquiring the {@code frameworkLoader}
   * monitor so the loader is iterated at most once per instance. A
   * {@link ServiceConfigurationError} raised while iterating is logged and
   * the providers collected up to that point are kept.
   */
  private void initProviderList() {
    if (providerList != null) {
      return;
    }
    synchronized (frameworkLoader) {
      if (providerList != null) {
        // Filled in while this thread was waiting for the monitor.
        return;
      }
      final List<ClientProtocolProvider> discovered =
          new ArrayList<ClientProtocolProvider>();
      try {
        for (ClientProtocolProvider candidate : frameworkLoader) {
          discovered.add(candidate);
        }
      } catch (ServiceConfigurationError sce) {
        LOG.info("Failed to instantiate ClientProtocolProvider, please "
                     + "check the /META-INF/services/org.apache."
                     + "hadoop.mapreduce.protocol.ClientProtocolProvider "
                     + "files on the classpath", sce);
      }
      // Publish only after the list is fully built.
      providerList = discovered;
    }
  }

  // Runs once when this class is initialized, before any Cluster instance
  // can be constructed.
  static {
    ConfigUtil.loadResources();
  }
  
  /**
   * Creates a cluster handle without an explicit job tracker address;
   * equivalent to {@code new Cluster(null, conf)}.
   *
   * @param conf configuration used to select and create the client protocol
   * @throws IOException propagated from the two-argument constructor
   */
  public Cluster(Configuration conf) throws IOException {
    this(null, conf);
  }

  /**
   * Creates a cluster handle: stores the configuration, captures the current
   * user, then selects a client protocol provider.
   *
   * @param jobTrackAddr address passed to the providers, or {@code null} to
   *        let each provider create its client from {@code conf} alone
   * @param conf configuration used to select and create the client protocol
   * @throws IOException propagated from resolving the current user or from
   *         provider selection
   */
  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
      throws IOException {
    this.conf = conf;
    this.ugi = UserGroupInformation.getCurrentUser();
    initialize(jobTrackAddr, conf);
  }
  
  /**
   * Picks the first {@link ClientProtocolProvider} that yields a non-null
   * {@link ClientProtocol} and records both on this instance.
   *
   * Providers are tried in {@link #providerList} order. A provider that
   * throws is logged and its failure is attached as a suppressed exception
   * to the {@link IOException} thrown when no provider succeeds.
   *
   * @param jobTrackAddr address handed to each provider, or {@code null} to
   *        use the configuration-only {@code create} overload
   * @param conf configuration handed to each provider
   * @throws IOException if no provider produced a client
   */
  private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {

    initProviderList();
    final IOException initEx = new IOException(
        "Cannot initialize Cluster. Please check your configuration for "
            + MRConfig.FRAMEWORK_NAME
            + " and the correspond server addresses.");
    if (jobTrackAddr != null) {
      LOG.info("Initializing cluster for Job Tracker={}", jobTrackAddr);
    }
    for (ClientProtocolProvider provider : providerList) {
      final String providerName = provider.getClass().getName();
      // Parameterized messages avoid building the string when DEBUG is off.
      LOG.debug("Trying ClientProtocolProvider : {}", providerName);
      ClientProtocol clientProtocol = null;
      try {
        if (jobTrackAddr == null) {
          clientProtocol = provider.create(conf);
        } else {
          clientProtocol = provider.create(jobTrackAddr, conf);
        }

        if (clientProtocol != null) {
          clientProtocolProvider = provider;
          client = clientProtocol;
          LOG.debug("Picked {} as the ClientProtocolProvider", providerName);
          break;
        } else {
          LOG.debug("Cannot pick {} as the ClientProtocolProvider"
              + " - returned null protocol", providerName);
        }
      } catch (Exception e) {
        // Keep going: another provider may still be usable. The failure is
        // retained on initEx in case none is.
        final String errMsg = "Failed to use " + providerName
            + " due to error: ";
        initEx.addSuppressed(new IOException(errMsg, e));
        LOG.info(errMsg, e);
      }
    }

    if (null == clientProtocolProvider || null == client) {
      throw initEx;
    }
  }

  /**
   * Package-private accessor for the client protocol held by this cluster.
   *
   * @return the {@link ClientProtocol} stored in the {@code client} field
   */
  ClientProtocol getClient() {
    return client;
  }
  
  /**
   * Package-private accessor for the configuration given at construction.
   *
   * @return the {@link Configuration} stored in the {@code conf} field
   */
  Configuration getConf() {
    return conf;
  }
  
  /**
   * Close the <code>Cluster</code> by asking the selected
   * {@link ClientProtocolProvider} to close the client it created.
   *
   * @throws IOException propagated from the provider's close call
   */
  public synchronized void close() throws IOException {
    clientProtocolProvider.close(client);
  }

  /**
   * Builds one {@link Job} per status entry, in the same order, loading each
   * job's {@link JobConf} from the job file named by its status.
   *
   * @param stats statuses to convert
   * @return array of jobs with the same length as {@code stats}
   * @throws IOException propagated from {@code Job.getInstance}
   */
  private Job[] getJobs(JobStatus[] stats) throws IOException {
    final Job[] result = new Job[stats.length];
    for (int i = 0; i < stats.length; i++) {
      final JobStatus current = stats[i];
      final JobConf jobConf = new JobConf(current.getJobFile());
      result[i] = Job.getInstance(this, current, jobConf);
    }
    return result;
  }

  /**
   * Get the file system where job-specific files are stored.
   *
   * The file system is resolved on first call, as the user captured at
   * construction time, from the client's system directory, and then cached.
   * 
   * @return object of FileSystem
   * @throws IOException propagated from the lookup
   * @throws InterruptedException declared for compatibility; an interruption
   *         during the lookup surfaces as a {@link RuntimeException} with
   *         the thread's interrupt status restored
   */
  public synchronized FileSystem getFileSystem() 
      throws IOException, InterruptedException {
    if (this.fs == null) {
      try {
        this.fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
          @Override
          public FileSystem run() throws IOException, InterruptedException {
            final Path sysDir = new Path(client.getSystemDir());
            return sysDir.getFileSystem(getConf());
          }
        });
      } catch (InterruptedException e) {
        // Restore the interrupt status so callers further up the stack can
        // still observe the interruption after it is wrapped below.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      }
    }
    return fs;
  }

  /**
   * Look up a job by its id.
   * 
   * @param jobId id of the job to look up
   * @return the matching {@link Job}, or {@code null} when the client
   *         reports no status for the id or the job file cannot be found
   * @throws IOException propagated from the client or from building the job
   * @throws InterruptedException propagated from the client
   */
  public Job getJob(JobID jobId) throws IOException, InterruptedException {
    final JobStatus status = client.getJobStatus(jobId);
    if (status == null) {
      return null;
    }

    JobConf jobConf;
    try {
      jobConf = new JobConf(status.getJobFile());
    } catch (RuntimeException ex) {
      // Any failure other than a missing job file is a genuine error.
      if (!(ex.getCause() instanceof FileNotFoundException)) {
        throw ex;
      }
      // A missing job file is treated as "job not found".
      return null;
    }
    return Job.getInstance(this, status, jobConf);
  }
  
  /**
   * Get all the queues in cluster, as returned by the client protocol.
   * 
   * @return array of {@link QueueInfo}
   * @throws IOException propagated from the client call
   * @throws InterruptedException propagated from the client call
   */
  public QueueInfo[] getQueues() throws IOException, InterruptedException {
    return client.getQueues();
  }
  
  /**
   * Get queue information for the specified name, as returned by the client
   * protocol.
   * 
   * @param name name of the queue to look up
   * @return object of {@link QueueInfo}
   * @throws IOException propagated from the client call
   * @throws InterruptedException propagated from the client call
   */
  public QueueInfo getQueue(String name) 
      throws IOException, InterruptedException {
    return client.getQueue(name);
  }

  /**
   * Get log parameters for the specified jobID or taskAttemptID, as returned
   * by the client protocol.
   *
   * @param jobID the job id.
   * @param taskAttemptID the task attempt id. Optional.
   * @return the LogParams
   * @throws IOException propagated from the client call
   * @throws InterruptedException propagated from the client call
   */
  public LogParams getLogParams(JobID jobID, TaskAttemptID taskAttemptID)
      throws IOException, InterruptedException {
    return client.getLogFileParams(jobID, taskAttemptID);
  }

  /**
   * Get current cluster status, i.e. the metrics reported by the client
   * protocol.
   * 
   * @return object of {@link ClusterMetrics}
   * @throws IOException propagated from the client call
   * @throws InterruptedException propagated from the client call
   */
  public ClusterMetrics getClusterStatus() throws IOException, InterruptedException {
    return client.getClusterMetrics();
  }
  
  /**
   * Get all active trackers in the cluster, as returned by the client
   * protocol.
   * 
   * @return array of {@link TaskTrackerInfo}
   * @throws IOException propagated from the client call
   * @throws InterruptedException propagated from the client call
   */
  public TaskTrackerInfo[] getActiveTaskTrackers() 
      throws IOException, InterruptedException  {
    return client.getActiveTrackers();
  }
  
  /**
   * Get blacklisted trackers, as returned by the client protocol.
   * 
   * @return array of {@link TaskTrackerInfo}
   * @throws IOException propagated from the client call
   * @throws InterruptedException propagated from the client call
   */
  public TaskTrackerInfo[] getBlackListedTaskTrackers() 
      throws IOException, InterruptedException  {
    return client.getBlacklistedTrackers();
  }
  
  /**
   * Get all the jobs in cluster. Each status returned by the client is
   * converted into a {@link Job}, which involves loading that job's
   * configuration from its job file.
   * 
   * @return array of {@link Job}
   * @throws IOException propagated from the client call or job construction
   * @throws InterruptedException propagated from the client call
   * @deprecated Use {@link #getAllJobStatuses()} instead.
   */
  @Deprecated
  public Job[] getAllJobs() throws IOException, InterruptedException {
    return getJobs(client.getAllJobs());
  }

  /**
   * Get job status for all jobs in the cluster, returned as-is from the
   * client protocol without building {@link Job} objects.
   *
   * @return job status for all jobs in cluster
   * @throws IOException propagated from the client call
   * @throws InterruptedException propagated from the client call
   */
  public JobStatus[] getAllJobStatuses() throws IOException, InterruptedException {
    return client.getAllJobs();
  }

  /**
   * Grab the jobtracker system directory path where job-specific files will
   * be placed. The path is fetched from the client on first call and cached.
   * 
   * @return the system directory where job-specific files are to be placed.
   * @throws IOException propagated from the client call
   * @throws InterruptedException propagated from the client call
   */
  public Path getSystemDir() throws IOException, InterruptedException {
    if (sysDir != null) {
      return sysDir;
    }
    sysDir = new Path(client.getSystemDir());
    return sysDir;
  }
  
  /**
   * Grab the jobtracker's view of the staging directory path where
   * job-specific files will be placed. The path is fetched from the client
   * on first call and cached.
   * 
   * @return the staging directory where job-specific files are to be placed.
   * @throws IOException propagated from the client call
   * @throws InterruptedException propagated from the client call
   */
  public Path getStagingAreaDir() throws IOException, InterruptedException {
    if (stagingAreaDir != null) {
      return stagingAreaDir;
    }
    stagingAreaDir = new Path(client.getStagingAreaDir());
    return stagingAreaDir;
  }

  /**
   * Get the job history file path for a given job id. The job history file at
   * this path may or may not be existing depending on the job completion
   * state. The file is present only for the completed jobs.
   *
   * The path is the cached job history directory joined with
   * {@code <jobId>_<short user name>}.
   *
   * @param jobId the JobID of the job submitted by the current user.
   * @return the file path of the job history file
   * @throws IOException propagated from the client call
   * @throws InterruptedException propagated from the client call
   */
  public String getJobHistoryUrl(JobID jobId) throws IOException, 
    InterruptedException {
    if (jobHistoryDir == null) {
      // First request: ask the client where history files live.
      jobHistoryDir = new Path(client.getJobHistoryDir());
    }
    final String historyFileName =
        jobId.toString() + "_" + ugi.getShortUserName();
    final Path historyFile = new Path(jobHistoryDir, historyFileName);
    return historyFile.toString();
  }

  /**
   * Gets the Queue ACLs for current user, as returned by the client protocol.
   *
   * @return array of QueueAclsInfo object for current user.
   * @throws IOException propagated from the client call
   * @throws InterruptedException propagated from the client call
   */
  public QueueAclsInfo[] getQueueAclsForCurrentUser() 
      throws IOException, InterruptedException  {
    return client.getQueueAclsForCurrentUser();
  }

  /**
   * Gets the root level queues, as returned by the client protocol.
   *
   * @return array of {@link QueueInfo} objects.
   * @throws IOException propagated from the client call
   * @throws InterruptedException propagated from the client call
   */
  public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
    return client.getRootQueues();
  }
  
  /**
   * Returns immediate children of queueName, as returned by the client
   * protocol.
   *
   * @param queueName name of the parent queue
   * @return array of {@link QueueInfo} which are children of queueName
   * @throws IOException propagated from the client call
   * @throws InterruptedException propagated from the client call
   */
  public QueueInfo[] getChildQueues(String queueName) 
      throws IOException, InterruptedException {
    return client.getChildQueues(queueName);
  }
  
  /**
   * Get the JobTracker's status, as returned by the client protocol.
   * 
   * @return {@link JobTrackerStatus} of the JobTracker
   * @throws IOException propagated from the client call
   * @throws InterruptedException propagated from the client call
   */
  public JobTrackerStatus getJobTrackerStatus() throws IOException,
      InterruptedException {
    return client.getJobTrackerStatus();
  }
  
  /**
   * Get the tasktracker expiry interval for the cluster, as returned by the
   * client protocol.
   *
   * @return the expiry interval in msec
   * @throws IOException propagated from the client call
   * @throws InterruptedException propagated from the client call
   */
  public long getTaskTrackerExpiryInterval() throws IOException,
      InterruptedException {
    return client.getTaskTrackerExpiryInterval();
  }

  /**
   * Get a delegation token for the user from the JobTracker.
   *
   * @param renewer the user who can renew the token
   * @return the new token, exactly as returned by the client protocol
   * @throws IOException propagated from the client call
   * @throws InterruptedException propagated from the client call
   */
  public Token<DelegationTokenIdentifier> 
      getDelegationToken(Text renewer) throws IOException, InterruptedException{
    // client has already set the service
    return client.getDelegationToken(renewer);
  }

  /**
   * Renew a delegation token by calling {@link Token#renew} with this
   * cluster's configuration.
   *
   * @param token the token to renew
   * @return the new expiration time
   * @throws InvalidToken propagated from the token's renew call
   * @throws IOException propagated from the token's renew call
   * @throws InterruptedException propagated from the token's renew call
   * @deprecated Use {@link Token#renew} instead
   */
  @Deprecated
  public long renewDelegationToken(Token<DelegationTokenIdentifier> token
                                   ) throws InvalidToken, IOException,
                                            InterruptedException {
    return token.renew(getConf());
  }

  /**
   * Cancel a delegation token by calling {@link Token#cancel} with this
   * cluster's configuration.
   *
   * @param token the token to cancel
   * @throws IOException propagated from the token's cancel call
   * @throws InterruptedException propagated from the token's cancel call
   * @deprecated Use {@link Token#cancel} instead
   */
  @Deprecated
  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
                                    ) throws IOException,
                                             InterruptedException {
    token.cancel(getConf());
  }

}

相关信息

hadoop 源码目录

相关文章

hadoop ClusterMetrics 源码

hadoop ContextFactory 源码

hadoop Counter 源码

hadoop CounterGroup 源码

hadoop Counters 源码

hadoop CryptoUtils 源码

hadoop CustomJobEndNotifier 源码

hadoop FileSystemCounter 源码

hadoop ID 源码

hadoop InputFormat 源码

0  赞