hadoop LocalDirAllocator source code

  • 2022-10-20
  • Views (431)

hadoop LocalDirAllocator code

File path: /hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs;

import java.io.*;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hadoop.util.*;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** An implementation of a round-robin scheme for disk allocation when
 * creating files. It keeps track of which disk was last allocated for a
 * file write. For the current request, the next disk in the set is
 * allocated if its free space is sufficient to accommodate the file being
 * created. If the space requirement cannot be met, the next disk in order
 * is tried, and so on, until a disk with sufficient capacity is found.
 * Once a disk with sufficient space is identified, a check is done to make
 * sure that the disk is writable. There is also an API that does not take
 * the space requirement into consideration and just checks whether the
 * disk under consideration is writable (it should be used for cases where
 * the file size is not known a priori). Another API reads a path that was
 * created earlier; it works by scanning all the disks for the input
 * pathname.
 * This implementation also provides the functionality of having multiple 
 * allocators per JVM (one for each unique functionality or context, like 
 * mapred, dfs-client, etc.). It ensures that there is only one instance of
 * an allocator per context per JVM.
 * Note:
 * 1. The contexts referred to above are actually the configuration items defined
 * in the Configuration class like "mapred.local.dir" (for which we want to 
 * control the dir allocations). The context-strings are exactly those 
 * configuration items.
 * 2. This implementation does not take into consideration cases where
 * a disk becomes read-only or goes out of space while a file is being written
 * to (disks are shared between multiple processes, and so the latter situation
 * is probable).
 * 3. In the class implementation, "Disk" is referred to as "Dir", which
 * actually points to the configured directory on the Disk which will be the
 * parent for all file write/read allocations.
 */
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Unstable
public class LocalDirAllocator {
  
  //A Map from the config item names like "mapred.local.dir"
  //to the instance of the AllocatorPerContext. This
  //is a static object to make sure there exists exactly one instance per JVM
  private static Map<String, AllocatorPerContext> contexts =
                 new TreeMap<String, AllocatorPerContext>();
  private String contextCfgItemName;

  /** Used when size of file to be allocated is unknown. */
  public static final int SIZE_UNKNOWN = -1;

  private final DiskValidator diskValidator;

  /**
   * Create an allocator object.
   * @param contextCfgItemName contextCfgItemName.
   */
  public LocalDirAllocator(String contextCfgItemName) {
    this.contextCfgItemName = contextCfgItemName;
    try {
      this.diskValidator = DiskValidatorFactory.getInstance(
              BasicDiskValidator.NAME);
    } catch (DiskErrorException e) {
      throw new RuntimeException(e);
    }
  }

  public LocalDirAllocator(String contextCfgItemName,
          DiskValidator diskValidator) {
    this.contextCfgItemName = contextCfgItemName;
    this.diskValidator = diskValidator;
  }
  
  /** This method must be used to obtain the dir allocation context for a 
   * particular value of the context name. The context name must be an item
   * defined in the Configuration object for which we want to control the 
   * dir allocations (e.g., <code>mapred.local.dir</code>). The method will
   * create a context for that name if it doesn't already exist.
   */
  private AllocatorPerContext obtainContext(String contextCfgItemName) {
    synchronized (contexts) {
      AllocatorPerContext l = contexts.get(contextCfgItemName);
      if (l == null) {
        contexts.put(contextCfgItemName, 
                    (l = new AllocatorPerContext(contextCfgItemName,
                            diskValidator)));
      }
      return l;
    }
  }
  
  /** Get a path from the local FS. This method should be used if the size of 
   *  the file is not known a priori. We go round-robin over the set of disks
   *  (via the configured dirs) and return the first complete path where
   *  we could create the parent directory of the passed path. 
   *  @param pathStr the requested path (this will be created on the first 
   *  available disk)
   *  @param conf the Configuration object
   *  @return the complete path to the file on a local disk
   *  @throws IOException raised on errors performing I/O.
   */
  public Path getLocalPathForWrite(String pathStr, 
      Configuration conf) throws IOException {
    return getLocalPathForWrite(pathStr, SIZE_UNKNOWN, conf);
  }
  
  /** Get a path from the local FS. Pass size as 
   *  SIZE_UNKNOWN if not known a priori. We
   *  round-robin over the set of disks (via the configured dirs) and return
   *  the first complete path which has enough space.
   *  @param pathStr the requested path (this will be created on the first 
   *  available disk)
   *  @param size the size of the file that is going to be written
   *  @param conf the Configuration object
   *  @return the complete path to the file on a local disk
   *  @throws IOException raised on errors performing I/O.
   */
  public Path getLocalPathForWrite(String pathStr, long size, 
      Configuration conf) throws IOException {
    return getLocalPathForWrite(pathStr, size, conf, true);
  }
  
  /** Get a path from the local FS. Pass size as 
   *  SIZE_UNKNOWN if not known a priori. We
   *  round-robin over the set of disks (via the configured dirs) and return
   *  the first complete path which has enough space.
   *  @param pathStr the requested path (this will be created on the first 
   *  available disk)
   *  @param size the size of the file that is going to be written
   *  @param conf the Configuration object
   *  @param checkWrite ensure that the path is writable
   *  @return the complete path to the file on a local disk
   *  @throws IOException raised on errors performing I/O.
   */
  public Path getLocalPathForWrite(String pathStr, long size, 
                                   Configuration conf,
                                   boolean checkWrite) throws IOException {
    AllocatorPerContext context = obtainContext(contextCfgItemName);
    return context.getLocalPathForWrite(pathStr, size, conf, checkWrite);
  }
  
  /** Get a path from the local FS for reading. We search through all the
   *  configured dirs for the file's existence and return the complete
   *  path to the file when we find one 
   *  @param pathStr the requested file (this will be searched)
   *  @param conf the Configuration object
   *  @return the complete path to the file on a local disk
   *  @throws IOException raised on errors performing I/O.
   */
  public Path getLocalPathToRead(String pathStr, 
      Configuration conf) throws IOException {
    AllocatorPerContext context = obtainContext(contextCfgItemName);
    return context.getLocalPathToRead(pathStr, conf);
  }
  
  /**
   * Get all of the paths that currently exist in the working directories.
   * @param pathStr the path underneath the roots
   * @param conf the configuration to look up the roots in
   * @return all of the paths that exist under any of the roots
   * @throws IOException raised on errors performing I/O.
   */
  public Iterable<Path> getAllLocalPathsToRead(String pathStr, 
                                               Configuration conf
                                               ) throws IOException {
    AllocatorPerContext context;
    synchronized (this) {
      context = obtainContext(contextCfgItemName);
    }
    return context.getAllLocalPathsToRead(pathStr, conf);    
  }

  /** Creates a temporary file in the local FS. Pass size as -1 if not known 
   *  a priori. We round-robin over the set of disks (via the configured dirs)
   *  and select the first complete path which has enough space. A file is
   *  created on this directory. The file is guaranteed to go away when the
   *  JVM exits.
   *  @param pathStr prefix for the temporary file
   *  @param size the size of the file that is going to be written
   *  @param conf the Configuration object
   *  @return a unique temporary file
   *  @throws IOException raised on errors performing I/O.
   */
  public File createTmpFileForWrite(String pathStr, long size, 
      Configuration conf) throws IOException {
    AllocatorPerContext context = obtainContext(contextCfgItemName);
    return context.createTmpFileForWrite(pathStr, size, conf);
  }
  
  /**
   * Method to check whether a context is valid.
   * @param contextCfgItemName contextCfgItemName.
   * @return true/false
   */
  public static boolean isContextValid(String contextCfgItemName) {
    synchronized (contexts) {
      return contexts.containsKey(contextCfgItemName);
    }
  }
  
  /**
   * Removes the context from the context config items.
   * 
   * @param contextCfgItemName contextCfgItemName.
   */
  @Deprecated
  @InterfaceAudience.LimitedPrivate({"MapReduce"})
  public static void removeContext(String contextCfgItemName) {
    synchronized (contexts) {
      contexts.remove(contextCfgItemName);
    }
  }
    
  /**
   *  We search through all the configured dirs for the file's existence
   *  and return true when we find it.
   *  @param pathStr the requested file (this will be searched)
   *  @param conf the Configuration object
   *  @return true if the file exists, false otherwise
   */
  public boolean ifExists(String pathStr, Configuration conf) {
    AllocatorPerContext context = obtainContext(contextCfgItemName);
    return context.ifExists(pathStr, conf);
  }

  /**
   * Get the current directory index for the given configuration item.
   * @return the current directory index for the given configuration item.
   */
  int getCurrentDirectoryIndex() {
    AllocatorPerContext context = obtainContext(contextCfgItemName);
    return context.getCurrentDirectoryIndex();
  }
  
  private static class AllocatorPerContext {

    private static final Logger LOG =
        LoggerFactory.getLogger(AllocatorPerContext.class);

    private Random dirIndexRandomizer = new Random();
    private String contextCfgItemName;

    // NOTE: the context must be accessed via a local reference as it
    //       may be updated at any time to reference a different context
    private AtomicReference<Context> currentContext;
    private final DiskValidator diskValidator;

    private static class Context {
      private AtomicInteger dirNumLastAccessed = new AtomicInteger(0);
      private FileSystem localFS;
      private DF[] dirDF;
      private Path[] localDirs;
      private String savedLocalDirs;

      public int getAndIncrDirNumLastAccessed() {
        return getAndIncrDirNumLastAccessed(1);
      }

      public int getAndIncrDirNumLastAccessed(int delta) {
        if (localDirs.length < 2 || delta == 0) {
          return dirNumLastAccessed.get();
        }
        int oldval, newval;
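        // CAS retry loop: advance the shared index by delta, wrapping
        // modulo the number of dirs; the pre-increment value is returned
        // so the caller knows which directory to start searching from.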
        do {
          oldval = dirNumLastAccessed.get();
          newval = (oldval + delta) % localDirs.length;
        } while (!dirNumLastAccessed.compareAndSet(oldval, newval));
        return oldval;
      }
    }

    public AllocatorPerContext(String contextCfgItemName,
            DiskValidator diskValidator) {
      this.contextCfgItemName = contextCfgItemName;
      this.currentContext = new AtomicReference<Context>(new Context());
      this.diskValidator = diskValidator;
    }

    /** This method gets called every time before any read/write to make sure
     * that any change to localDirs is reflected immediately.
     */
    private Context confChanged(Configuration conf)
        throws IOException {
      Context ctx = currentContext.get();
      String newLocalDirs = conf.get(contextCfgItemName);
      if (null == newLocalDirs) {
        throw new IOException(contextCfgItemName + " not configured");
      }
      if (!newLocalDirs.equals(ctx.savedLocalDirs)) {
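        // The dir list changed: rebuild state into a fresh Context and
        // publish it atomically below (copy-on-write), so concurrent
        // readers keep seeing a consistent snapshot of dirs and DF objects.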
        ctx = new Context();
        String[] dirStrings = StringUtils.getTrimmedStrings(newLocalDirs);
        ctx.localFS = FileSystem.getLocal(conf);
        int numDirs = dirStrings.length;
        ArrayList<Path> dirs = new ArrayList<Path>(numDirs);
        ArrayList<DF> dfList = new ArrayList<DF>(numDirs);
        for (int i = 0; i < numDirs; i++) {
          try {
            // filter problematic directories
            Path tmpDir = new Path(dirStrings[i]);
            if (ctx.localFS.mkdirs(tmpDir) || ctx.localFS.exists(tmpDir)) {
              try {
                File tmpFile = tmpDir.isAbsolute()
                    ? new File(ctx.localFS.makeQualified(tmpDir).toUri())
                    : new File(dirStrings[i]);

                diskValidator.checkStatus(tmpFile);
                dirs.add(new Path(tmpFile.getPath()));
                dfList.add(new DF(tmpFile, 30000));
              } catch (DiskErrorException de) {
                LOG.warn(dirStrings[i] + " is not writable\n", de);
              }
            } else {
              LOG.warn("Failed to create " + dirStrings[i]);
            }
          } catch (IOException ie) { 
            LOG.warn("Failed to create " + dirStrings[i] + ": " +
                ie.getMessage() + "\n", ie);
          } //ignore
        }
        ctx.localDirs = dirs.toArray(new Path[dirs.size()]);
        ctx.dirDF = dfList.toArray(new DF[dirs.size()]);
        ctx.savedLocalDirs = newLocalDirs;

        if (dirs.size() > 0) {
          // randomize the first disk picked in the round-robin selection
          ctx.dirNumLastAccessed.set(dirIndexRandomizer.nextInt(dirs.size()));
        }

        currentContext.set(ctx);
      }

      return ctx;
    }

    private Path createPath(Path dir, String path,
        boolean checkWrite) throws IOException {
      Path file = new Path(dir, path);
      if (checkWrite) {
        //check whether we are able to create a directory here. If the disk
        //happens to be RDONLY we will fail
        try {
          diskValidator.checkStatus(new File(file.getParent().toUri().getPath()));
          return file;
        } catch (DiskErrorException d) {
          LOG.warn("Disk Error Exception: ", d);
          return null;
        }
      }
      return file;
    }

    /**
     * Get the current directory index.
     * @return the current directory index.
     */
    int getCurrentDirectoryIndex() {
      return currentContext.get().dirNumLastAccessed.get();
    }

    /** Get a path from the local FS. If size is known, we go
     *  round-robin over the set of disks (via the configured dirs) and return
     *  the first complete path which has enough space.
     *  
     *  If size is not known, use roulette selection -- pick directories
     *  with probability proportional to their available space.
     */
    public Path getLocalPathForWrite(String pathStr, long size,
        Configuration conf, boolean checkWrite) throws IOException {
      Context ctx = confChanged(conf);
      int numDirs = ctx.localDirs.length;
      int numDirsSearched = 0;
      // Max capacity in any directory
      long maxCapacity = 0;
      String errorText = null;
      IOException diskException = null;
      //remove the leading slash from the path (to make sure that the uri
      //resolution results in a valid path on the dir being checked)
      if (pathStr.startsWith("/")) {
        pathStr = pathStr.substring(1);
      }
      Path returnPath = null;
      
      if (size == SIZE_UNKNOWN) {
        // do roulette selection: pick a dir with probability
        // proportional to its available space
        long[] availableOnDisk = new long[ctx.dirDF.length];
        long totalAvailable = 0;

        // build the "roulette wheel"
        for (int i = 0; i < ctx.dirDF.length; ++i) {
          availableOnDisk[i] = ctx.dirDF[i].getAvailable();
          totalAvailable += availableOnDisk[i];
        }

        if (totalAvailable == 0) {
          throw new DiskErrorException(
              "No space available in any of the local directories.");
        }

        // Keep rolling the wheel till we get a valid path
        Random r = new Random();
        while (numDirsSearched < numDirs && returnPath == null) {
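          // nextLong() >>> 1 clears the sign bit, so randomPosition is
          // non-negative; the loop below walks the wheel segments until
          // the position falls inside one directory's share of space.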
          long randomPosition = (r.nextLong() >>> 1) % totalAvailable;
          int dir = 0;
          while (randomPosition > availableOnDisk[dir]) {
            randomPosition -= availableOnDisk[dir];
            dir++;
          }
          ctx.dirNumLastAccessed.set(dir);
          returnPath = createPath(ctx.localDirs[dir], pathStr, checkWrite);
          if (returnPath == null) {
            totalAvailable -= availableOnDisk[dir];
            availableOnDisk[dir] = 0; // skip this disk
            numDirsSearched++;
          }
        }
      } else {
        // Start linear search with random increment if possible
        int randomInc = 1;
        if (numDirs > 2) {
          randomInc += dirIndexRandomizer.nextInt(numDirs - 1);
        }
        int dirNum = ctx.getAndIncrDirNumLastAccessed(randomInc);
        while (numDirsSearched < numDirs) {
          long capacity = ctx.dirDF[dirNum].getAvailable();
          if (capacity > maxCapacity) {
            maxCapacity = capacity;
          }
          if (capacity > size) {
            try {
              returnPath = createPath(ctx.localDirs[dirNum], pathStr,
                  checkWrite);
            } catch (IOException e) {
              errorText = e.getMessage();
              diskException = e;
              LOG.debug("DiskException caught for dir {}", ctx.localDirs[dirNum], e);
            }
            if (returnPath != null) {
              ctx.getAndIncrDirNumLastAccessed(numDirsSearched);
              break;
            }
          }
          dirNum++;
          dirNum = dirNum % numDirs;
          numDirsSearched++;
        }
      }
      if (returnPath != null) {
        return returnPath;
      }
      
      //no path found
      String newErrorText = "Could not find any valid local directory for " +
          pathStr + " with requested size " + size +
          " as the max capacity in any directory is " + maxCapacity;
      if (errorText != null) {
        newErrorText = newErrorText + " due to " + errorText;
      }
      throw new DiskErrorException(newErrorText, diskException);
    }

    /** Creates a file on the local FS. Pass size as
     * {@link LocalDirAllocator#SIZE_UNKNOWN} if not known a priori. We
     *  round-robin over the set of disks (via the configured dirs) and return
     *  a file on the first path which has enough space. The file is guaranteed
     *  to go away when the JVM exits.
     */
    public File createTmpFileForWrite(String pathStr, long size, 
        Configuration conf) throws IOException {

      // find an appropriate directory
      Path path = getLocalPathForWrite(pathStr, size, conf, true);
      File dir = new File(path.getParent().toUri().getPath());
      String prefix = path.getName();

      // create a temp file on this directory
      File result = File.createTempFile(prefix, null, dir);
      result.deleteOnExit();
      return result;
    }

    /** Get a path from the local FS for reading. We search through all the
     *  configured dirs for the file's existence and return the complete
     *  path to the file when we find one 
     */
    public Path getLocalPathToRead(String pathStr,
        Configuration conf) throws IOException {
      Context ctx = confChanged(conf);
      int numDirs = ctx.localDirs.length;
      int numDirsSearched = 0;
      //remove the leading slash from the path (to make sure that the uri
      //resolution results in a valid path on the dir being checked)
      if (pathStr.startsWith("/")) {
        pathStr = pathStr.substring(1);
      }
      while (numDirsSearched < numDirs) {
        Path file = new Path(ctx.localDirs[numDirsSearched], pathStr);
        if (ctx.localFS.exists(file)) {
          return file;
        }
        numDirsSearched++;
      }

      //no path found
      throw new DiskErrorException("Could not find " + pathStr + " in any of" +
          " the configured local directories");
    }

    private static class PathIterator implements Iterator<Path>, Iterable<Path> {
      private final FileSystem fs;
      private final String pathStr;
      private int i = 0;
      private final Path[] rootDirs;
      private Path next = null;

      private PathIterator(FileSystem fs, String pathStr, Path[] rootDirs)
          throws IOException {
        this.fs = fs;
        this.pathStr = pathStr;
        this.rootDirs = rootDirs;
        advance();
      }

      @Override
      public boolean hasNext() {
        return next != null;
      }

      private void advance() throws IOException {
        while (i < rootDirs.length) {
          next = new Path(rootDirs[i++], pathStr);
          if (fs.exists(next)) {
            return;
          }
        }
        next = null;
      }

      @Override
      public Path next() {
        final Path result = next;
        try {
          advance();
        } catch (IOException ie) {
          throw new RuntimeException("Can't check existence of " + next, ie);
        }
        if (result == null) {
          throw new NoSuchElementException();
        }
        return result;
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException("read only iterator");
      }

      @Override
      public Iterator<Path> iterator() {
        return this;
      }
    }

    /**
     * Get all of the paths that currently exist in the working directories.
     * @param pathStr the path underneath the roots
     * @param conf the configuration to look up the roots in
     * @return all of the paths that exist under any of the roots
     * @throws IOException raised on errors performing I/O.
     */
    Iterable<Path> getAllLocalPathsToRead(String pathStr,
        Configuration conf) throws IOException {
      Context ctx = confChanged(conf);
      if (pathStr.startsWith("/")) {
        pathStr = pathStr.substring(1);
      }
      return new PathIterator(ctx.localFS, pathStr, ctx.localDirs);
    }

    /** We search through all the configured dirs for the file's existence
     *  and return true when we find one 
     */
    public boolean ifExists(String pathStr, Configuration conf) {
      Context ctx = currentContext.get();
      try {
        int numDirs = ctx.localDirs.length;
        int numDirsSearched = 0;
        //remove the leading slash from the path (to make sure that the uri
        //resolution results in a valid path on the dir being checked)
        if (pathStr.startsWith("/")) {
          pathStr = pathStr.substring(1);
        }
        while (numDirsSearched < numDirs) {
          Path file = new Path(ctx.localDirs[numDirsSearched], pathStr);
          if (ctx.localFS.exists(file)) {
            return true;
          }
          numDirsSearched++;
        }
      } catch (IOException e) {
        // IGNORE and try again
      }
      return false;
    }
  }
}
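
For reference, here is a minimal, self-contained usage sketch. It is not part of the Hadoop source above; the context key "test.local.dir" and the /tmp directory values are illustrative assumptions.

import java.io.File;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;

public class LocalDirAllocatorExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical context key and directories, for illustration only.
    conf.set("test.local.dir", "/tmp/disk1,/tmp/disk2");

    LocalDirAllocator allocator = new LocalDirAllocator("test.local.dir");

    // Known size: linear round-robin search for a dir with enough space.
    Path p1 = allocator.getLocalPathForWrite("job_1/spill0.out",
        64 * 1024 * 1024, conf);
    new File(p1.toUri().getPath()).createNewFile();

    // Unknown size: roulette selection weighted by available space.
    Path p2 = allocator.getLocalPathForWrite("job_1/spill1.out", conf);

    // Temp-file variant: the file is removed when the JVM exits.
    File tmp = allocator.createTmpFileForWrite("job_1/tmp", 1024, conf);

    // Reading back scans every configured dir for the relative path.
    Path r = allocator.getLocalPathToRead("job_1/spill0.out", conf);
    System.out.println(p1 + "\n" + p2 + "\n" + tmp + "\n" + r);
  }
}

With a known size the allocator advances a shared round-robin pointer (randomized on first use); with SIZE_UNKNOWN it picks a directory with probability proportional to its free space, as implemented in AllocatorPerContext.getLocalPathForWrite above.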

Related information

hadoop source directory

Related articles

hadoop Abortable source code

hadoop AbstractFileSystem source code

hadoop AvroFSInput source code

hadoop BBPartHandle source code

hadoop BBUploadHandle source code

hadoop BatchListingOperations source code

hadoop BatchedRemoteIterator source code

hadoop BlockLocation source code

hadoop BlockStoragePolicySpi source code

hadoop BufferedFSInputStream source code
