hadoop TeraScheduler 源码

  • 2022-10-20
  • 浏览 (154)

haddop TeraScheduler 代码

文件路径:/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraScheduler.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.examples.terasort;

import java.io.*;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;

class TeraScheduler {
  private static final Logger LOG =
      LoggerFactory.getLogger(TeraScheduler.class);
  private Split[] splits;
  private List<Host> hosts = new ArrayList<Host>();
  private int slotsPerHost;
  private int remainingSplits = 0;
  private FileSplit[] realSplits = null;

  static class Split {
    String filename;
    boolean isAssigned = false;
    List<Host> locations = new ArrayList<Host>();
    Split(String filename) {
      this.filename = filename;
    }
    public String toString() {
      StringBuffer result = new StringBuffer();
      result.append(filename);
      result.append(" on ");
      for(Host host: locations) {
        result.append(host.hostname);
        result.append(", ");
      }
      return result.toString();
    }
  }
  static class Host {
    String hostname;
    List<Split> splits = new ArrayList<Split>();
    Host(String hostname) {
      this.hostname = hostname;
    }
    public String toString() {
      StringBuffer result = new StringBuffer();
      result.append(splits.size());
      result.append(" ");
      result.append(hostname);
      return result.toString();
    }
  }

  List<String> readFile(String filename) throws IOException {
    List<String> result = new ArrayList<String>(10000);
    try (BufferedReader in = new BufferedReader(
        new InputStreamReader(new FileInputStream(filename), Charsets.UTF_8))) {
      String line = in.readLine();
      while (line != null) {
        result.add(line);
        line = in.readLine();
      }
    }
    return result;
  }

  public TeraScheduler(String splitFilename, 
                       String nodeFilename) throws IOException {
    slotsPerHost = 4;
    // get the hosts
    Map<String, Host> hostIds = new HashMap<String,Host>();
    for(String hostName: readFile(nodeFilename)) {
      Host host = new Host(hostName);
      hosts.add(host);
      hostIds.put(hostName, host);
    }
    // read the blocks
    List<String> splitLines = readFile(splitFilename);
    splits = new Split[splitLines.size()];
    remainingSplits = 0;
    for(String line: splitLines) {
      StringTokenizer itr = new StringTokenizer(line);
      Split newSplit = new Split(itr.nextToken());
      splits[remainingSplits++] = newSplit;
      while (itr.hasMoreTokens()) {
        Host host = hostIds.get(itr.nextToken());
        newSplit.locations.add(host);
        host.splits.add(newSplit);
      }
    }
  }

  public TeraScheduler(FileSplit[] realSplits,
                       Configuration conf) throws IOException {
    this.realSplits = realSplits;
    this.slotsPerHost = conf.getInt(TTConfig.TT_MAP_SLOTS, 4);
    Map<String, Host> hostTable = new HashMap<String, Host>();
    splits = new Split[realSplits.length];
    for(FileSplit realSplit: realSplits) {
      Split split = new Split(realSplit.getPath().toString());
      splits[remainingSplits++] = split;
      for(String hostname: realSplit.getLocations()) {
        Host host = hostTable.get(hostname);
        if (host == null) {
          host = new Host(hostname);
          hostTable.put(hostname, host);
          hosts.add(host);
        }
        host.splits.add(split);
        split.locations.add(host);
      }
    }
  }

  Host pickBestHost() {
    Host result = null;
    int splits = Integer.MAX_VALUE;
    for(Host host: hosts) {
      if (host.splits.size() < splits) {
        result = host;
        splits = host.splits.size();
      }
    }
    if (result != null) {
      hosts.remove(result);
      LOG.debug("picking " + result);
    }
    return result;
  }

  void pickBestSplits(Host host) {
    int tasksToPick = Math.min(slotsPerHost, 
                               (int) Math.ceil((double) remainingSplits / 
                                               hosts.size()));
    Split[] best = new Split[tasksToPick];
    for(Split cur: host.splits) {
      LOG.debug("  examine: " + cur.filename + " " + cur.locations.size());
      int i = 0;
      while (i < tasksToPick && best[i] != null && 
             best[i].locations.size() <= cur.locations.size()) {
        i += 1;
      }
      if (i < tasksToPick) {
        for(int j = tasksToPick - 1; j > i; --j) {
          best[j] = best[j-1];
        }
        best[i] = cur;
      }
    }
    // for the chosen blocks, remove them from the other locations
    for(int i=0; i < tasksToPick; ++i) {
      if (best[i] != null) {
        LOG.debug(" best: " + best[i].filename);
        for (Host other: best[i].locations) {
          other.splits.remove(best[i]);
        }
        best[i].locations.clear();
        best[i].locations.add(host);
        best[i].isAssigned = true;
        remainingSplits -= 1;
      }
    }
    // for the non-chosen blocks, remove this host
    for(Split cur: host.splits) {
      if (!cur.isAssigned) {
        cur.locations.remove(host);
      }
    }
  }
  
  void solve() throws IOException {
    Host host = pickBestHost();
    while (host != null) {
      pickBestSplits(host);
      host = pickBestHost();
    }
  }

  /**
   * Solve the schedule and modify the FileSplit array to reflect the new
   * schedule. It will move placed splits to front and unplacable splits
   * to the end.
   * @return a new list of FileSplits that are modified to have the
   *    best host as the only host.
   * @throws IOException
   */
  public List<InputSplit> getNewFileSplits() throws IOException {
    solve();
    FileSplit[] result = new FileSplit[realSplits.length];
    int left = 0;
    int right = realSplits.length - 1;
    for(int i=0; i < splits.length; ++i) {
      if (splits[i].isAssigned) {
        // copy the split and fix up the locations
        String[] newLocations = {splits[i].locations.get(0).hostname};
        realSplits[i] = new FileSplit(realSplits[i].getPath(),
            realSplits[i].getStart(), realSplits[i].getLength(), newLocations);
        result[left++] = realSplits[i];
      } else {
        result[right--] = realSplits[i];
      }
    }
    List<InputSplit> ret = new ArrayList<InputSplit>();
    for (FileSplit fs : result) {
      ret.add(fs);
    }
    return ret;
  }

  public static void main(String[] args) throws IOException {
    TeraScheduler problem = new TeraScheduler("block-loc.txt", "nodes");
    for(Host host: problem.hosts) {
      System.out.println(host);
    }
    LOG.info("starting solve");
    problem.solve();
    List<Split> leftOvers = new ArrayList<Split>();
    for(int i=0; i < problem.splits.length; ++i) {
      if (problem.splits[i].isAssigned) {
        System.out.println("sched: " + problem.splits[i]);        
      } else {
        leftOvers.add(problem.splits[i]);
      }
    }
    for(Split cur: leftOvers) {
      System.out.println("left: " + cur);
    }
    System.out.println("left over: " + leftOvers.size());
    LOG.info("done");
  }

}

相关信息

hadoop 源码目录

相关文章

hadoop GenSort 源码

hadoop Random16 源码

hadoop TeraChecksum 源码

hadoop TeraGen 源码

hadoop TeraInputFormat 源码

hadoop TeraOutputFormat 源码

hadoop TeraSort 源码

hadoop TeraSortConfigKeys 源码

hadoop TeraValidate 源码

hadoop Unsigned16 源码

0  赞