Hadoop TFile Source Code

  • 2022-10-20

Hadoop TFile code

File path: /hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/TFile.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 * http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.hadoop.io.file.tfile;

import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Comparator;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.BoundedByteArrayOutputStream;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.file.tfile.BCFile.Reader.BlockReader;
import org.apache.hadoop.io.file.tfile.BCFile.Writer.BlockAppender;
import org.apache.hadoop.io.file.tfile.Chunk.ChunkDecoder;
import org.apache.hadoop.io.file.tfile.Chunk.ChunkEncoder;
import org.apache.hadoop.io.file.tfile.CompareUtils.BytesComparator;
import org.apache.hadoop.io.file.tfile.CompareUtils.MemcmpRawComparator;
import org.apache.hadoop.io.file.tfile.Utils.Version;
import org.apache.hadoop.io.serializer.JavaSerializationComparator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A TFile is a container of key-value pairs. Both keys and values are type-less
 * bytes. Keys are restricted to 64KB; value length is not restricted
 * (practically limited to the available disk storage). TFile further provides
 * the following features:
 * <ul>
 * <li>Block Compression.
 * <li>Named meta data blocks.
 * <li>Sorted or unsorted keys.
 * <li>Seek by key or by file offset.
 * </ul>
 * The memory footprint of a TFile includes the following:
 * <ul>
 * <li>Some constant overhead of reading or writing a compressed block.
 * <ul>
 * <li>Each compressed block requires one compression/decompression codec for
 * I/O.
 * <li>Temporary space to buffer the key.
 * <li>Temporary space to buffer the value (for TFile.Writer only). Values are
 * chunk encoded, so that we buffer at most one chunk of user data. By default,
 * the chunk buffer is 1MB. Reading chunked value does not require additional
 * memory.
 * </ul>
 * <li>TFile index, which is proportional to the total number of Data Blocks.
 * The total amount of memory needed to hold the index can be estimated as
 * (56+AvgKeySize)*NumBlocks.
 * <li>MetaBlock index, which is proportional to the total number of Meta
 * Blocks. The total amount of memory needed to hold the index for Meta Blocks
 * can be estimated as (40+AvgMetaBlockName)*NumMetaBlock.
 * </ul>
 * <p>
 * The behavior of TFile can be customized by the following variables through
 * Configuration:
 * <ul>
 * <li><b>tfile.io.chunk.size</b>: Value chunk size. Integer (in bytes). Defaults
 * to 1MB. Values with a length less than the chunk size are guaranteed to have a
 * known value length at read time (See
 * {@link TFile.Reader.Scanner.Entry#isValueLengthKnown()}).
 * <li><b>tfile.fs.output.buffer.size</b>: Buffer size used for
 * FSDataOutputStream. Integer (in bytes). Defaults to 256KB.
 * <li><b>tfile.fs.input.buffer.size</b>: Buffer size used for
 * FSDataInputStream. Integer (in bytes). Defaults to 256KB.
 * </ul>
 * <p>
 * Suggestions on performance optimization.
 * <ul>
 * <li>Minimum block size. We recommend a setting of minimum block size between
 * 256KB and 1MB for general usage. Larger block sizes are preferred if files are
 * primarily for sequential access. However, they lead to inefficient random
 * access (because there is more data to decompress). Smaller blocks are good
 * for random access, but require more memory to hold the block index, and may
 * be slower to create (because we must flush the compressor stream at the
 * conclusion of each data block, which leads to an FS I/O flush). Further, due
 * to the internal caching in the compression codec, the smallest possible block
 * size is around 20KB-30KB.
 * <li>The current implementation does not offer true multi-threading for
 * reading. The implementation uses FSDataInputStream seek()+read(), which is
 * shown to be much faster than the positioned-read call in single-threaded
 * mode. However, it also means that if multiple threads attempt to access the
 * same TFile (using multiple scanners) simultaneously, the actual I/O is
 * carried out sequentially even if they access different DFS blocks.
 * <li>Compression codec. Use "none" if the data is not very compressible (by
 * compressible, I mean a compression ratio of at least 2:1). Generally, use
 * "lzo" as the starting point for experimenting. "gz" offers a slightly better
 * compression ratio than "lzo", but requires 4x CPU to compress and 2x CPU to
 * decompress, compared to "lzo".
 * <li>File system buffering. If the underlying FSDataInputStream and
 * FSDataOutputStream are already adequately buffered, or if applications
 * read/write keys and values in large buffers, we can reduce the sizes of
 * input/output buffering in the TFile layer by setting the configuration
 * parameters "tfile.fs.input.buffer.size" and "tfile.fs.output.buffer.size".
 * </ul>
 * 
 * Some design rationale behind TFile can be found at <a
 * href="https://issues.apache.org/jira/browse/HADOOP-3315">Hadoop-3315</a>.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class TFile {
  static final Logger LOG = LoggerFactory.getLogger(TFile.class);

  private static final String CHUNK_BUF_SIZE_ATTR = "tfile.io.chunk.size";
  private static final String FS_INPUT_BUF_SIZE_ATTR =
      "tfile.fs.input.buffer.size";
  private static final String FS_OUTPUT_BUF_SIZE_ATTR =
      "tfile.fs.output.buffer.size";

  static int getChunkBufferSize(Configuration conf) {
    int ret = conf.getInt(CHUNK_BUF_SIZE_ATTR, 1024 * 1024);
    return (ret > 0) ? ret : 1024 * 1024;
  }

  static int getFSInputBufferSize(Configuration conf) {
    return conf.getInt(FS_INPUT_BUF_SIZE_ATTR, 256 * 1024);
  }

  static int getFSOutputBufferSize(Configuration conf) {
    return conf.getInt(FS_OUTPUT_BUF_SIZE_ATTR, 256 * 1024);
  }
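
  /*
   * Usage sketch (illustrative, not from the upstream source): the three
   * tfile.* knobs read by the getters above can be tuned through the
   * Configuration before creating a Writer or Reader. The values shown are
   * example assumptions, not recommended settings.
   *
   *   Configuration conf = new Configuration();
   *   conf.setInt("tfile.io.chunk.size", 1024 * 1024);        // value chunk size
   *   conf.setInt("tfile.fs.output.buffer.size", 256 * 1024); // FSDataOutputStream buffer
   *   conf.setInt("tfile.fs.input.buffer.size", 256 * 1024);  // FSDataInputStream buffer
   */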

  private static final int MAX_KEY_SIZE = 64 * 1024; // 64KB
  static final Version API_VERSION = new Version((short) 1, (short) 0);

  /** compression: gzip */
  public static final String COMPRESSION_GZ = "gz";
  /** compression: lzo */
  public static final String COMPRESSION_LZO = "lzo";
  /** compression: none */
  public static final String COMPRESSION_NONE = "none";
  /** comparator: memcmp */
  public static final String COMPARATOR_MEMCMP = "memcmp";
  /** comparator prefix: java class */
  public static final String COMPARATOR_JCLASS = "jclass:";

  /**
   * Make a raw comparator from a string name.
   * 
   * @param name
   *          Comparator name
   * @return A RawComparable comparator.
   */
  static public Comparator<RawComparable> makeComparator(String name) {
    return TFileMeta.makeComparator(name);
  }

  // Prevent the instantiation of TFiles
  private TFile() {
    // nothing
  }

  /**
   * Get names of supported compression algorithms. The names are acceptable by
   * TFile.Writer.
   * 
   * @return Array of strings, each represents a supported compression
   *         algorithm. Currently, the following compression algorithms are
   *         supported.
   *         <ul>
   *         <li>"none" - No compression.
   *         <li>"lzo" - LZO compression.
   *         <li>"gz" - GZIP compression.
   *         </ul>
   */
  public static String[] getSupportedCompressionAlgorithms() {
    return Compression.getSupportedAlgorithms();
  }

  /**
   * TFile Writer.
   */
  @InterfaceStability.Evolving
  public static class Writer implements Closeable {
    // minimum compressed size for a block.
    private final int sizeMinBlock;

    // Meta blocks.
    final TFileIndex tfileIndex;
    final TFileMeta tfileMeta;

    // reference to the underlying BCFile.
    private BCFile.Writer writerBCF;

    // current data block appender.
    BlockAppender blkAppender;
    long blkRecordCount;

    // buffers for caching the key.
    BoundedByteArrayOutputStream currentKeyBufferOS;
    BoundedByteArrayOutputStream lastKeyBufferOS;

    // buffer used by chunk codec
    private byte[] valueBuffer;

    /**
     * Writer states. The state always transitions in a cycle: READY -> IN_KEY ->
     * END_KEY -> IN_VALUE -> READY.
     */
    private enum State {
      READY, // Ready to start a new key-value pair insertion.
      IN_KEY, // In the middle of key insertion.
      END_KEY, // Key insertion complete, ready to insert value.
      IN_VALUE, // In value insertion.
      // ERROR, // Error encountered, cannot continue.
      CLOSED, // TFile already closed.
    };

    // current state of Writer.
    State state = State.READY;
    Configuration conf;
    long errorCount = 0;

    /**
     * Constructor
     * 
     * @param fsdos
     *          output stream for writing. Must be at position 0.
     * @param minBlockSize
     *          Minimum compressed block size in bytes. A compression block will
     *          not be closed until it reaches this size except for the last
     *          block.
     * @param compressName
     *          Name of the compression algorithm. Must be one of the strings
     *          returned by {@link TFile#getSupportedCompressionAlgorithms()}.
     * @param comparator
     *          Leave comparator as null or empty string if TFile is not sorted.
     *          Otherwise, provide the string name for the comparison algorithm
     *          for keys. Two kinds of comparators are supported.
     *          <ul>
     *          <li>Algorithmic comparator: binary comparators that are language
     *          independent. Currently, only "memcmp" is supported.
     *          <li>Language-specific comparator: binary comparators that can
     *          only be constructed in a specific language. For Java, the syntax
     *          is "jclass:", followed by the class name of the RawComparator.
     *          Currently, we only support RawComparators that can be
     *          constructed through the default constructor (with no
     *          parameters). Parameterized RawComparators such as
     *          {@link WritableComparator} or
     *          {@link JavaSerializationComparator} may not be directly used.
     *          One should write a wrapper class that inherits from such classes
     *          and use its default constructor to perform proper
     *          initialization.
     *          </ul>
     * @param conf
     *          The configuration object.
     * @throws IOException raised on errors performing I/O.
     */
    public Writer(FSDataOutputStream fsdos, int minBlockSize,
        String compressName, String comparator, Configuration conf)
        throws IOException {
      sizeMinBlock = minBlockSize;
      tfileMeta = new TFileMeta(comparator);
      tfileIndex = new TFileIndex(tfileMeta.getComparator());

      writerBCF = new BCFile.Writer(fsdos, compressName, conf);
      currentKeyBufferOS = new BoundedByteArrayOutputStream(MAX_KEY_SIZE);
      lastKeyBufferOS = new BoundedByteArrayOutputStream(MAX_KEY_SIZE);
      this.conf = conf;
    }
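
    /*
     * Usage sketch (illustrative, not from the upstream source): constructing
     * a sorted, gzip-compressed Writer on top of a Hadoop FileSystem. The path
     * and minimum block size below are made-up examples.
     *
     *   FileSystem fs = FileSystem.get(conf);
     *   FSDataOutputStream fsdos = fs.create(new Path("/tmp/example.tfile"));
     *   TFile.Writer writer = new TFile.Writer(fsdos, 256 * 1024,
     *       TFile.COMPRESSION_GZ, TFile.COMPARATOR_MEMCMP, conf);
     *   // Custom comparator: TFile.COMPARATOR_JCLASS + MyRawComparator.class.getName()
     *   // Unsorted TFile: pass null (or "") as the comparator.
     */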

    /**
     * Close the Writer. Resources will be released regardless of the exceptions
     * being thrown. Future close calls will have no effect.
     * 
     * The underlying FSDataOutputStream is not closed.
     */
    @Override
    public void close() throws IOException {
      if ((state == State.CLOSED)) {
        return;
      }
      try {
        // First try the normal finish.
        // Terminate upon the first Exception.
        if (errorCount == 0) {
          if (state != State.READY) {
            throw new IllegalStateException(
                "Cannot close TFile in the middle of key-value insertion.");
          }

          finishDataBlock(true);

          // first, write out data:TFile.meta
          BlockAppender outMeta =
              writerBCF
                  .prepareMetaBlock(TFileMeta.BLOCK_NAME, COMPRESSION_NONE);
          try {
            tfileMeta.write(outMeta);
          } finally {
            outMeta.close();
          }

          // second, write out data:TFile.index
          BlockAppender outIndex =
              writerBCF.prepareMetaBlock(TFileIndex.BLOCK_NAME);
          try {
            tfileIndex.write(outIndex);
          } finally {
            outIndex.close();
          }

          writerBCF.close();
        }
      } finally {
        IOUtils.cleanupWithLogger(LOG, blkAppender, writerBCF);
        blkAppender = null;
        writerBCF = null;
        state = State.CLOSED;
      }
    }

    /**
     * Adding a new key-value pair to the TFile. This is synonymous to
     * append(key, 0, key.length, value, 0, value.length)
     * 
     * @param key
     *          Buffer for key.
     * @param value
     *          Buffer for value.
     * @throws IOException raised on errors performing I/O.
     */
    public void append(byte[] key, byte[] value) throws IOException {
      append(key, 0, key.length, value, 0, value.length);
    }

    /**
     * Adding a new key-value pair to TFile.
     * 
     * @param key
     *          buffer for key.
     * @param koff
     *          offset in key buffer.
     * @param klen
     *          length of key.
     * @param value
     *          buffer for value.
     * @param voff
     *          offset in value buffer.
     * @param vlen
     *          length of value.
     * @throws IOException
     *           Upon IO errors.
     *           <p>
     *           If an exception is thrown, the TFile will be in an inconsistent
     *           state. The only legitimate call after that would be close
     */
    public void append(byte[] key, int koff, int klen, byte[] value, int voff,
        int vlen) throws IOException {
      if ((koff | klen | (koff + klen) | (key.length - (koff + klen))) < 0) {
        throw new IndexOutOfBoundsException(
            "Bad key buffer offset-length combination.");
      }

      if ((voff | vlen | (voff + vlen) | (value.length - (voff + vlen))) < 0) {
        throw new IndexOutOfBoundsException(
            "Bad value buffer offset-length combination.");
      }

      try {
        DataOutputStream dosKey = prepareAppendKey(klen);
        try {
          ++errorCount;
          dosKey.write(key, koff, klen);
          --errorCount;
        } finally {
          dosKey.close();
        }

        DataOutputStream dosValue = prepareAppendValue(vlen);
        try {
          ++errorCount;
          dosValue.write(value, voff, vlen);
          --errorCount;
        } finally {
          dosValue.close();
        }
      } finally {
        state = State.READY;
      }
    }
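
    /*
     * Usage sketch (illustrative, not from the upstream source): appending
     * whole key-value pairs. For a sorted TFile the keys must arrive in
     * non-decreasing order, otherwise append() throws. Sample keys and values
     * are made up.
     *
     *   writer.append("row-001".getBytes(), "value one".getBytes());
     *   writer.append("row-002".getBytes(), "value two".getBytes());
     *   writer.close(); // does not close the underlying FSDataOutputStream
     *   fsdos.close();
     */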

    /**
     * Helper class to register key after close call on key append stream.
     */
    private class KeyRegister extends DataOutputStream {
      private final int expectedLength;
      private boolean closed = false;

      public KeyRegister(int len) {
        super(currentKeyBufferOS);
        if (len >= 0) {
          currentKeyBufferOS.reset(len);
        } else {
          currentKeyBufferOS.reset();
        }
        expectedLength = len;
      }

      @Override
      public void close() throws IOException {
        if (closed == true) {
          return;
        }

        try {
          ++errorCount;
          byte[] key = currentKeyBufferOS.getBuffer();
          int len = currentKeyBufferOS.size();
          /**
           * verify length.
           */
          if (expectedLength >= 0 && expectedLength != len) {
            throw new IOException("Incorrect key length: expected="
                + expectedLength + " actual=" + len);
          }

          Utils.writeVInt(blkAppender, len);
          blkAppender.write(key, 0, len);
          if (tfileIndex.getFirstKey() == null) {
            tfileIndex.setFirstKey(key, 0, len);
          }

          if (tfileMeta.isSorted() && tfileMeta.getRecordCount()>0) {
            byte[] lastKey = lastKeyBufferOS.getBuffer();
            int lastLen = lastKeyBufferOS.size();
            if (tfileMeta.getComparator().compare(key, 0, len, lastKey, 0,
                lastLen) < 0) {
              throw new IOException("Keys are not added in sorted order");
            }
          }

          BoundedByteArrayOutputStream tmp = currentKeyBufferOS;
          currentKeyBufferOS = lastKeyBufferOS;
          lastKeyBufferOS = tmp;
          --errorCount;
        } finally {
          closed = true;
          state = State.END_KEY;
        }
      }
    }

    /**
     * Helper class to register value after close call on value append stream.
     */
    private class ValueRegister extends DataOutputStream {
      private boolean closed = false;

      public ValueRegister(OutputStream os) {
        super(os);
      }

      // Avoiding flushing call to down stream.
      @Override
      public void flush() {
        // do nothing
      }

      @Override
      public void close() throws IOException {
        if (closed == true) {
          return;
        }

        try {
          ++errorCount;
          super.close();
          blkRecordCount++;
          // bump up the total record count in the whole file
          tfileMeta.incRecordCount();
          finishDataBlock(false);
          --errorCount;
        } finally {
          closed = true;
          state = State.READY;
        }
      }
    }

    /**
     * Obtain an output stream for writing a key into TFile. This may only be
     * called when there is no active Key appending stream or value appending
     * stream.
     * 
     * @param length
     *          The expected length of the key. If length of the key is not
     *          known, set length = -1. Otherwise, the application must write
     *          exactly as many bytes as specified here before calling close on
     *          the returned output stream.
     * @return The key appending output stream.
     * @throws IOException raised on errors performing I/O.
     * 
     */
    public DataOutputStream prepareAppendKey(int length) throws IOException {
      if (state != State.READY) {
        throw new IllegalStateException("Incorrect state to start a new key: "
            + state.name());
      }

      initDataBlock();
      DataOutputStream ret = new KeyRegister(length);
      state = State.IN_KEY;
      return ret;
    }

    /**
     * Obtain an output stream for writing a value into TFile. This may only be
     * called right after a key appending operation (the key append stream must
     * be closed).
     * 
     * @param length
     *          The expected length of the value. If length of the value is not
     *          known, set length = -1. Otherwise, the application must write
     *          exactly as many bytes as specified here before calling close on
     *          the returned output stream. Advertising the value size up-front
     *          guarantees that the value is encoded in one chunk, and avoids
     *          intermediate chunk buffering.
     * @throws IOException raised on errors performing I/O.
     * @return DataOutputStream.
     */
    public DataOutputStream prepareAppendValue(int length) throws IOException {
      if (state != State.END_KEY) {
        throw new IllegalStateException(
            "Incorrect state to start a new value: " + state.name());
      }

      DataOutputStream ret;

      // unknown length
      if (length < 0) {
        if (valueBuffer == null) {
          valueBuffer = new byte[getChunkBufferSize(conf)];
        }
        ret = new ValueRegister(new ChunkEncoder(blkAppender, valueBuffer));
      } else {
        ret =
            new ValueRegister(new Chunk.SingleChunkEncoder(blkAppender, length));
      }

      state = State.IN_VALUE;
      return ret;
    }
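
    /*
     * Usage sketch (illustrative, not from the upstream source): the streaming
     * append path for a value whose length is unknown up front (length = -1),
     * in which case the value is chunk-encoded. copyLargeValueInto() is a
     * hypothetical helper.
     *
     *   DataOutputStream k = writer.prepareAppendKey(-1);
     *   k.write(keyBytes);
     *   k.close();                               // registers the key
     *   DataOutputStream v = writer.prepareAppendValue(-1);
     *   copyLargeValueInto(v);                   // stream an arbitrarily large value
     *   v.close();                               // registers the value
     */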

    /**
     * Obtain an output stream for creating a meta block. This function may not
     * be called when there is a key append stream or value append stream
     * active. No more key-value insertion is allowed after a meta data block
     * has been added to TFile.
     * 
     * @param name
     *          Name of the meta block.
     * @param compressName
     *          Name of the compression algorithm to be used. Must be one of the
     *          strings returned by
     *          {@link TFile#getSupportedCompressionAlgorithms()}.
     * @return A DataOutputStream that can be used to write Meta Block data.
     *         Closing the stream would signal the ending of the block.
     * @throws IOException raised on errors performing I/O.
     * @throws MetaBlockAlreadyExists
     *           the Meta Block with the same name already exists.
     */
    public DataOutputStream prepareMetaBlock(String name, String compressName)
        throws IOException, MetaBlockAlreadyExists {
      if (state != State.READY) {
        throw new IllegalStateException(
            "Incorrect state to start a Meta Block: " + state.name());
      }

      finishDataBlock(true);
      DataOutputStream outputStream =
          writerBCF.prepareMetaBlock(name, compressName);
      return outputStream;
    }

    /**
     * Obtain an output stream for creating a meta block. This function may not
     * be called when there is a key append stream or value append stream
     * active. No more key-value insertion is allowed after a meta data block
     * has been added to TFile. Data will be compressed using the default
     * compressor as defined in Writer's constructor.
     * 
     * @param name
     *          Name of the meta block.
     * @return A DataOutputStream that can be used to write Meta Block data.
     *         Closing the stream would signal the ending of the block.
     * @throws IOException raised on errors performing I/O.
     * @throws MetaBlockAlreadyExists
     *           the Meta Block with the same name already exists.
     */
    public DataOutputStream prepareMetaBlock(String name) throws IOException,
        MetaBlockAlreadyExists {
      if (state != State.READY) {
        throw new IllegalStateException(
            "Incorrect state to start a Meta Block: " + state.name());
      }

      finishDataBlock(true);
      return writerBCF.prepareMetaBlock(name);
    }
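
    /*
     * Usage sketch (illustrative, not from the upstream source): writing a
     * named meta block. The block name and payload are made-up examples; no
     * further key-value insertion is allowed once a meta block has been added.
     *
     *   DataOutputStream meta = writer.prepareMetaBlock("my.app.stats");
     *   meta.writeLong(someRecordCount);   // any application-defined payload
     *   meta.close();                      // closing the stream ends the block
     */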

    /**
     * Check if we need to start a new data block.
     * 
     * @throws IOException
     */
    private void initDataBlock() throws IOException {
      // for each new block, get a new appender
      if (blkAppender == null) {
        blkAppender = writerBCF.prepareDataBlock();
      }
    }

    /**
     * Close the current data block if necessary.
     * 
     * @param bForceFinish
     *          Force the closure regardless of the block size.
     * @throws IOException
     */
    void finishDataBlock(boolean bForceFinish) throws IOException {
      if (blkAppender == null) {
        return;
      }

      // exceeded the size limit, do the compression and finish the block
      if (bForceFinish || blkAppender.getCompressedSize() >= sizeMinBlock) {
        // keep tracks of the last key of each data block, no padding
        // for now
        TFileIndexEntry keyLast =
            new TFileIndexEntry(lastKeyBufferOS.getBuffer(), 0, lastKeyBufferOS
                .size(), blkRecordCount);
        tfileIndex.addEntry(keyLast);
        // close the appender
        blkAppender.close();
        blkAppender = null;
        blkRecordCount = 0;
      }
    }
  }

  /**
   * TFile Reader. Users may only read TFiles by creating TFile.Reader.Scanner
   * objects. A scanner may scan the whole TFile ({@link Reader#createScanner()}),
   * a portion of the TFile based on byte offsets
   * ({@link Reader#createScannerByByteRange(long, long)}), or a portion of the
   * TFile with keys falling in a certain key range (for sorted TFiles only,
   * {@link Reader#createScannerByKey(byte[], byte[])} or
   * {@link Reader#createScannerByKey(RawComparable, RawComparable)}).
   */
  @InterfaceStability.Evolving
  public static class Reader implements Closeable {
    // The underlying BCFile reader.
    final BCFile.Reader readerBCF;

    // TFile index, it is loaded lazily.
    TFileIndex tfileIndex = null;
    final TFileMeta tfileMeta;
    final BytesComparator comparator;

    // global begin and end locations.
    private final Location begin;
    private final Location end;

    /**
     * Location representing a virtual position in the TFile.
     */
    static final class Location implements Comparable<Location>, Cloneable {
      private int blockIndex;
      // distance/offset from the beginning of the block
      private long recordIndex;

      Location(int blockIndex, long recordIndex) {
        set(blockIndex, recordIndex);
      }

      void incRecordIndex() {
        ++recordIndex;
      }

      Location(Location other) {
        set(other);
      }

      int getBlockIndex() {
        return blockIndex;
      }

      long getRecordIndex() {
        return recordIndex;
      }

      void set(int blockIndex, long recordIndex) {
        if ((blockIndex | recordIndex) < 0) {
          throw new IllegalArgumentException(
              "Illegal parameter for BlockLocation.");
        }
        this.blockIndex = blockIndex;
        this.recordIndex = recordIndex;
      }

      void set(Location other) {
        set(other.blockIndex, other.recordIndex);
      }

      /**
       * @see java.lang.Comparable#compareTo(java.lang.Object)
       */
      @Override
      public int compareTo(Location other) {
        return compareTo(other.blockIndex, other.recordIndex);
      }

      int compareTo(int bid, long rid) {
        if (this.blockIndex == bid) {
          long ret = this.recordIndex - rid;
          if (ret > 0) return 1;
          if (ret < 0) return -1;
          return 0;
        }
        return this.blockIndex - bid;
      }

      /**
       * @see java.lang.Object#clone()
       */
      @Override
      protected Location clone() {
        return new Location(blockIndex, recordIndex);
      }

      /**
       * @see java.lang.Object#hashCode()
       */
      @Override
      public int hashCode() {
        final int prime = 31;
        int result = prime + blockIndex;
        result = (int) (prime * result + recordIndex);
        return result;
      }

      /**
       * @see java.lang.Object#equals(java.lang.Object)
       */
      @Override
      public boolean equals(Object obj) {
        if (this == obj) return true;
        if (obj == null) return false;
        if (getClass() != obj.getClass()) return false;
        Location other = (Location) obj;
        if (blockIndex != other.blockIndex) return false;
        if (recordIndex != other.recordIndex) return false;
        return true;
      }
    }

    /**
     * Constructor
     * 
     * @param fsdis
     *          FS input stream of the TFile.
     * @param fileLength
     *          The length of TFile. This is required because we have no easy
     *          way of knowing the actual size of the input file through the
     *          File input stream.
     * @param conf configuration.
     * @throws IOException raised on errors performing I/O.
     */
    public Reader(FSDataInputStream fsdis, long fileLength, Configuration conf)
        throws IOException {
      readerBCF = new BCFile.Reader(fsdis, fileLength, conf);

      // first, read TFile meta
      BlockReader brMeta = readerBCF.getMetaBlock(TFileMeta.BLOCK_NAME);
      try {
        tfileMeta = new TFileMeta(brMeta);
      } finally {
        brMeta.close();
      }

      comparator = tfileMeta.getComparator();
      // Set begin and end locations.
      begin = new Location(0, 0);
      end = new Location(readerBCF.getBlockCount(), 0);
    }
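
    /*
     * Usage sketch (illustrative, not from the upstream source): constructing
     * a Reader. The file length must be supplied explicitly; taking it from
     * FileStatus is one common approach (assumed here, not mandated).
     *
     *   Path path = new Path("/tmp/example.tfile");
     *   FSDataInputStream fsdis = fs.open(path);
     *   long length = fs.getFileStatus(path).getLen();
     *   TFile.Reader reader = new TFile.Reader(fsdis, length, conf);
     */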

    /**
     * Close the reader. The state of the Reader object is undefined after
     * close. Calling close() multiple times has no effect.
     */
    @Override
    public void close() throws IOException {
      readerBCF.close();
    }

    /**
     * Get the begin location of the TFile.
     * 
     * @return If TFile is not empty, the location of the first key-value pair.
     *         Otherwise, it returns end().
     */
    Location begin() {
      return begin;
    }

    /**
     * Get the end location of the TFile.
     * 
     * @return The location right after the last key-value pair in TFile.
     */
    Location end() {
      return end;
    }

    /**
     * Get the string representation of the comparator.
     * 
     * @return If the TFile is not sorted by keys, an empty string will be
     *         returned. Otherwise, the actual comparator string that is
     *         provided during the TFile creation time will be returned.
     */
    public String getComparatorName() {
      return tfileMeta.getComparatorString();
    }

    /**
     * Is the TFile sorted?
     * 
     * @return true if TFile is sorted.
     */
    public boolean isSorted() {
      return tfileMeta.isSorted();
    }

    /**
     * Get the number of key-value pair entries in TFile.
     * 
     * @return the number of key-value pairs in TFile
     */
    public long getEntryCount() {
      return tfileMeta.getRecordCount();
    }

    /**
     * Lazily loading the TFile index.
     * 
     * @throws IOException
     */
    synchronized void checkTFileDataIndex() throws IOException {
      if (tfileIndex == null) {
        BlockReader brIndex = readerBCF.getMetaBlock(TFileIndex.BLOCK_NAME);
        try {
          tfileIndex =
              new TFileIndex(readerBCF.getBlockCount(), brIndex, tfileMeta
                  .getComparator());
        } finally {
          brIndex.close();
        }
      }
    }

    /**
     * Get the first key in the TFile.
     * 
     * @return The first key in the TFile.
     * @throws IOException raised on errors performing I/O.
     */
    public RawComparable getFirstKey() throws IOException {
      checkTFileDataIndex();
      return tfileIndex.getFirstKey();
    }

    /**
     * Get the last key in the TFile.
     * 
     * @return The last key in the TFile.
     * @throws IOException raised on errors performing I/O.
     */
    public RawComparable getLastKey() throws IOException {
      checkTFileDataIndex();
      return tfileIndex.getLastKey();
    }

    /**
     * Get a Comparator object to compare Entries. It is useful when you want to
     * store the entries in a collection (such as a PriorityQueue) and perform
     * sorting or comparison among entries based on the keys without copying out
     * the key.
     * 
     * @return An Entry Comparator.
     */
    public Comparator<Scanner.Entry> getEntryComparator() {
      if (!isSorted()) {
        throw new RuntimeException(
            "Entries are not comparable for unsorted TFiles");
      }

      return new Comparator<Scanner.Entry>() {
        /**
         * Provide a customized comparator for Entries. This is useful if we
         * have a collection of Entry objects. However, if the Entry objects
         * come from different TFiles, users must ensure that those TFiles share
         * the same RawComparator.
         */
        @Override
        public int compare(Scanner.Entry o1, Scanner.Entry o2) {
          return comparator.compare(o1.getKeyBuffer(), 0, o1.getKeyLength(), o2
              .getKeyBuffer(), 0, o2.getKeyLength());
        }
      };
    }

    /**
     * Get an instance of the RawComparator that is constructed based on the
     * string comparator representation.
     * 
     * @return a Comparator that can compare RawComparable's.
     */
    public Comparator<RawComparable> getComparator() {
      return comparator;
    }

    /**
     * Stream access to a meta block.
     * 
     * @param name
     *          The name of the meta block.
     * @return The input stream.
     * @throws IOException
     *           on I/O error.
     * @throws MetaBlockDoesNotExist
     *           If the meta block with the name does not exist.
     */
    public DataInputStream getMetaBlock(String name) throws IOException,
        MetaBlockDoesNotExist {
      return readerBCF.getMetaBlock(name);
    }

    /**
     * If greater is true, returns the beginning location of the block
     * containing the key strictly greater than the input key. If greater is
     * false, returns the beginning location of the block containing the key
     * greater than or equal to the input key.
     * 
     * @param key
     *          the input key
     * @param greater
     *          boolean flag
     * @return the location of the block; or end() if no such block exists.
     * @throws IOException
     */
    Location getBlockContainsKey(RawComparable key, boolean greater)
        throws IOException {
      if (!isSorted()) {
        throw new RuntimeException("Seeking in unsorted TFile");
      }
      checkTFileDataIndex();
      int blkIndex =
          (greater) ? tfileIndex.upperBound(key) : tfileIndex.lowerBound(key);
      if (blkIndex < 0) return end;
      return new Location(blkIndex, 0);
    }

    Location getLocationByRecordNum(long recNum) throws IOException {
      checkTFileDataIndex();
      return tfileIndex.getLocationByRecordNum(recNum);
    }

    long getRecordNumByLocation(Location location) throws IOException {
      checkTFileDataIndex();
      return tfileIndex.getRecordNumByLocation(location);      
    }
    
    int compareKeys(byte[] a, int o1, int l1, byte[] b, int o2, int l2) {
      if (!isSorted()) {
        throw new RuntimeException("Cannot compare keys for unsorted TFiles.");
      }
      return comparator.compare(a, o1, l1, b, o2, l2);
    }

    int compareKeys(RawComparable a, RawComparable b) {
      if (!isSorted()) {
        throw new RuntimeException("Cannot compare keys for unsorted TFiles.");
      }
      return comparator.compare(a, b);
    }

    /**
     * Get the location pointing to the beginning of the first key-value pair in
     * a compressed block whose byte offset in the TFile is greater than or
     * equal to the specified offset.
     * 
     * @param offset
     *          the user supplied offset.
     * @return the location to the corresponding entry; or end() if no such
     *         entry exists.
     */
    Location getLocationNear(long offset) {
      int blockIndex = readerBCF.getBlockIndexNear(offset);
      if (blockIndex == -1) return end;
      return new Location(blockIndex, 0);
    }

    /**
     * Get the RecordNum for the first key-value pair in a compressed block
     * whose byte offset in the TFile is greater than or equal to the specified
     * offset.
     * 
     * @param offset
     *          the user supplied offset.
     * @return the RecordNum to the corresponding entry. If no such entry
     *         exists, it returns the total entry count.
     * @throws IOException raised on errors performing I/O.
     */
    public long getRecordNumNear(long offset) throws IOException {
      return getRecordNumByLocation(getLocationNear(offset));
    }
    
    /**
     * Get a sample key that is within a block whose starting offset is greater
     * than or equal to the specified offset.
     * 
     * @param offset
     *          The file offset.
     * @return the key that fits the requirement; or null if no such key exists
     *         (which could happen if the offset is close to the end of the
     *         TFile).
     * @throws IOException raised on errors performing I/O.
     */
    public RawComparable getKeyNear(long offset) throws IOException {
      int blockIndex = readerBCF.getBlockIndexNear(offset);
      if (blockIndex == -1) return null;
      checkTFileDataIndex();
      return new ByteArray(tfileIndex.getEntry(blockIndex).key);
    }

    /**
     * Get a scanner that can scan the whole TFile.
     * 
     * @return The scanner object. A valid Scanner is always returned even if
     *         the TFile is empty.
     * @throws IOException raised on errors performing I/O.
     */
    public Scanner createScanner() throws IOException {
      return new Scanner(this, begin, end);
    }
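
    /*
     * Usage sketch (illustrative, not from the upstream source): a full scan
     * over the TFile, copying each entry into reusable BytesWritable buffers.
     *
     *   TFile.Reader.Scanner scanner = reader.createScanner();
     *   BytesWritable key = new BytesWritable();
     *   BytesWritable value = new BytesWritable();
     *   try {
     *     while (!scanner.atEnd()) {
     *       scanner.entry().get(key, value);
     *       scanner.advance();
     *     }
     *   } finally {
     *     scanner.close();
     *   }
     */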

    /**
     * Get a scanner that covers a portion of TFile based on byte offsets.
     * 
     * @param offset
     *          The beginning byte offset in the TFile.
     * @param length
     *          The length of the region.
     * @return The actual coverage of the returned scanner tries to match the
     *         specified byte-region but always rounds up to the compression
     *         block boundaries. It is possible that the returned scanner
     *         contains zero key-value pairs even if length is positive.
     * @throws IOException raised on errors performing I/O.
     */
    public Scanner createScannerByByteRange(long offset, long length) throws IOException {
      return new Scanner(this, offset, offset + length);
    }

    /**
     * Get a scanner that covers a portion of TFile based on keys.
     * 
     * @param beginKey
     *          Begin key of the scan (inclusive). If null, scan from the first
     *          key-value entry of the TFile.
     * @param endKey
     *          End key of the scan (exclusive). If null, scan up to the last
     *          key-value entry of the TFile.
     * @return The actual coverage of the returned scanner will cover all keys
     *         greater than or equal to the beginKey and less than the endKey.
     * @throws IOException raised on errors performing I/O.
     * 
     * @deprecated Use {@link #createScannerByKey(byte[], byte[])} instead.
     */
    @Deprecated
    public Scanner createScanner(byte[] beginKey, byte[] endKey)
      throws IOException {
      return createScannerByKey(beginKey, endKey);
    }
    
    /**
     * Get a scanner that covers a portion of TFile based on keys.
     * 
     * @param beginKey
     *          Begin key of the scan (inclusive). If null, scan from the first
     *          key-value entry of the TFile.
     * @param endKey
     *          End key of the scan (exclusive). If null, scan up to the last
     *          key-value entry of the TFile.
     * @return The actual coverage of the returned scanner will cover all keys
     *         greater than or equal to the beginKey and less than the endKey.
     * @throws IOException raised on errors performing I/O.
     */
    public Scanner createScannerByKey(byte[] beginKey, byte[] endKey)
        throws IOException {
      return createScannerByKey((beginKey == null) ? null : new ByteArray(beginKey,
          0, beginKey.length), (endKey == null) ? null : new ByteArray(endKey,
          0, endKey.length));
    }

    /**
     * Get a scanner that covers a specific key range.
     * 
     * @param beginKey
     *          Begin key of the scan (inclusive). If null, scan from the first
     *          key-value entry of the TFile.
     * @param endKey
     *          End key of the scan (exclusive). If null, scan up to the last
     *          key-value entry of the TFile.
     * @return The actual coverage of the returned scanner will cover all keys
     *         greater than or equal to the beginKey and less than the endKey.
     * @throws IOException raised on errors performing I/O.
     * 
     * @deprecated Use {@link #createScannerByKey(RawComparable, RawComparable)}
     *             instead.
     */
    @Deprecated
    public Scanner createScanner(RawComparable beginKey, RawComparable endKey)
        throws IOException {
      return createScannerByKey(beginKey, endKey);
    }

    /**
     * Get a scanner that covers a specific key range.
     * 
     * @param beginKey
     *          Begin key of the scan (inclusive). If null, scan from the first
     *          key-value entry of the TFile.
     * @param endKey
     *          End key of the scan (exclusive). If null, scan up to the last
     *          key-value entry of the TFile.
     * @return The actual coverage of the returned scanner will cover all keys
     *         greater than or equal to the beginKey and less than the endKey.
     * @throws IOException raised on errors performing I/O.
     */
    public Scanner createScannerByKey(RawComparable beginKey, RawComparable endKey)
        throws IOException {
      if ((beginKey != null) && (endKey != null)
          && (compareKeys(beginKey, endKey) >= 0)) {
        return new Scanner(this, beginKey, beginKey);
      }
      return new Scanner(this, beginKey, endKey);
    }
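
    /*
     * Usage sketch (illustrative, not from the upstream source): restricting
     * the scan to a key range on a sorted TFile; the begin key is inclusive,
     * the end key exclusive. The keys are made-up examples.
     *
     *   TFile.Reader.Scanner scanner =
     *       reader.createScannerByKey("row-010".getBytes(), "row-020".getBytes());
     *   for (; !scanner.atEnd(); scanner.advance()) {
     *     // process scanner.entry()
     *   }
     *   scanner.close();
     */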

    /**
     * Create a scanner that covers a range of records.
     * 
     * @param beginRecNum
     *          The RecordNum for the first record (inclusive).
     * @param endRecNum
     *          The RecordNum for the last record (exclusive). To scan the whole
     *          file, either specify endRecNum==-1 or endRecNum==getEntryCount().
     * @return The TFile scanner that covers the specified range of records.
     * @throws IOException raised on errors performing I/O.
     */
    public Scanner createScannerByRecordNum(long beginRecNum, long endRecNum)
        throws IOException {
      if (beginRecNum < 0) beginRecNum = 0;
      if (endRecNum < 0 || endRecNum > getEntryCount()) {
        endRecNum = getEntryCount();
      }
      return new Scanner(this, getLocationByRecordNum(beginRecNum),
          getLocationByRecordNum(endRecNum));
    }

    /**
     * The TFile Scanner. The Scanner has an implicit cursor, which, upon
     * creation, points to the first key-value pair in the scan range. If the
     * scan range is empty, the cursor will point to the end of the scan range.
     * <p>
     * Use {@link Scanner#atEnd()} to test whether the cursor is at the end
     * location of the scanner.
     * <p>
     * Use {@link Scanner#advance()} to move the cursor to the next key-value
     * pair (or end if none exists). Use seekTo methods (
     * {@link Scanner#seekTo(byte[])} or
     * {@link Scanner#seekTo(byte[], int, int)}) to seek to any arbitrary
     * location in the covered range (including backward seeking). Use
     * {@link Scanner#rewind()} to seek back to the beginning of the scanner.
     * Use {@link Scanner#seekToEnd()} to seek to the end of the scanner.
     * <p>
     * Actual keys and values may be obtained through {@link Scanner.Entry}
     * object, which is obtained through {@link Scanner#entry()}.
     */
    public static class Scanner implements Closeable {
      // The underlying TFile reader.
      final Reader reader;
      // current block (null if reaching end)
      private BlockReader blkReader;

      Location beginLocation;
      Location endLocation;
      Location currentLocation;

      // flag to ensure value is only examined once.
      boolean valueChecked = false;
      // reusable buffer for keys.
      final byte[] keyBuffer;
      // length of key, -1 means key is invalid.
      int klen = -1;

      static final int MAX_VAL_TRANSFER_BUF_SIZE = 128 * 1024;
      BytesWritable valTransferBuffer;

      DataInputBuffer keyDataInputStream;
      ChunkDecoder valueBufferInputStream;
      DataInputStream valueDataInputStream;
      // vlen == -1 if unknown.
      int vlen;

      /**
       * Constructor
       * 
       * @param reader
       *          The TFile reader object.
       * @param offBegin
       *          Begin byte-offset of the scan.
       * @param offEnd
       *          End byte-offset of the scan.
       * @throws IOException
       * 
       *           The offsets will be rounded to the beginning of a compressed
       *           block whose offset is greater than or equal to the specified
       *           offset.
       */
      protected Scanner(Reader reader, long offBegin, long offEnd)
          throws IOException {
        this(reader, reader.getLocationNear(offBegin), reader
            .getLocationNear(offEnd));
      }

      /**
       * Constructor
       * 
       * @param reader
       *          The TFile reader object.
       * @param begin
       *          Begin location of the scan.
       * @param end
       *          End location of the scan.
       * @throws IOException
       */
      Scanner(Reader reader, Location begin, Location end) throws IOException {
        this.reader = reader;
        // ensure the TFile index is loaded throughout the life of scanner.
        reader.checkTFileDataIndex();
        beginLocation = begin;
        endLocation = end;

        valTransferBuffer = new BytesWritable();
        // TODO: remember the longest key in a TFile, and use it to replace
        // MAX_KEY_SIZE.
        keyBuffer = new byte[MAX_KEY_SIZE];
        keyDataInputStream = new DataInputBuffer();
        valueBufferInputStream = new ChunkDecoder();
        valueDataInputStream = new DataInputStream(valueBufferInputStream);

        if (beginLocation.compareTo(endLocation) >= 0) {
          currentLocation = new Location(endLocation);
        } else {
          currentLocation = new Location(0, 0);
          initBlock(beginLocation.getBlockIndex());
          inBlockAdvance(beginLocation.getRecordIndex());
        }
      }

      /**
       * Constructor
       * 
       * @param reader
       *          The TFile reader object.
       * @param beginKey
       *          Begin key of the scan. If null, scan from the first
       *          &lt;K, V&gt; entry of the TFile.
       * @param endKey
       *          End key of the scan. If null, scan up to the last &lt;K, V&gt;
       *          entry of the TFile.
       * @throws IOException raised on errors performing I/O.
       */
      protected Scanner(Reader reader, RawComparable beginKey,
          RawComparable endKey) throws IOException {
        this(reader, (beginKey == null) ? reader.begin() : reader
            .getBlockContainsKey(beginKey, false), reader.end());
        if (beginKey != null) {
          inBlockAdvance(beginKey, false);
          beginLocation.set(currentLocation);
        }
        if (endKey != null) {
          seekTo(endKey, false);
          endLocation.set(currentLocation);
          seekTo(beginLocation);
        }
      }

      /**
       * Move the cursor to the first entry whose key is greater than or equal
       * to the input key. Synonymous to seekTo(key, 0, key.length). The entry
       * returned by the previous entry() call will be invalid.
       * 
       * @param key
       *          The input key
       * @return true if we find an equal key.
       * @throws IOException raised on errors performing I/O.
       */
      public boolean seekTo(byte[] key) throws IOException {
        return seekTo(key, 0, key.length);
      }

      /**
       * Move the cursor to the first entry whose key is greater than or equal
       * to the input key. The entry returned by the previous entry() call will
       * be invalid.
       * 
       * @param key
       *          The input key
       * @param keyOffset
       *          offset in the key buffer.
       * @param keyLen
       *          key buffer length.
       * @return true if we find an equal key; false otherwise.
       * @throws IOException raised on errors performing I/O.
       */
      public boolean seekTo(byte[] key, int keyOffset, int keyLen)
          throws IOException {
        return seekTo(new ByteArray(key, keyOffset, keyLen), false);
      }
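
      /*
       * Usage sketch (illustrative, not from the upstream source): point lookup
       * via seekTo() on a sorted TFile, assuming a full-range scanner. The key
       * is a made-up example.
       *
       *   if (scanner.seekTo("row-042".getBytes())) {
       *     BytesWritable v = new BytesWritable();
       *     scanner.entry().getValue(v);   // exact key found
       *   } else {
       *     // cursor is now at the first key greater than "row-042", or at end
       *   }
       */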

      private boolean seekTo(RawComparable key, boolean beyond)
          throws IOException {
        Location l = reader.getBlockContainsKey(key, beyond);
        if (l.compareTo(beginLocation) < 0) {
          l = beginLocation;
        } else if (l.compareTo(endLocation) >= 0) {
          seekTo(endLocation);
          return false;
        }

        // check if what we are seeking is in the later part of the current
        // block.
        if (atEnd() || (l.getBlockIndex() != currentLocation.getBlockIndex())
            || (compareCursorKeyTo(key) >= 0)) {
          // sorry, we must seek to a different location first.
          seekTo(l);
        }

        return inBlockAdvance(key, beyond);
      }

      /**
       * Move the cursor to the new location. The entry returned by the previous
       * entry() call will be invalid.
       * 
       * @param l
       *          new cursor location. It must fall between the begin and end
       *          location of the scanner.
       * @throws IOException
       */
      private void seekTo(Location l) throws IOException {
        if (l.compareTo(beginLocation) < 0) {
          throw new IllegalArgumentException(
              "Attempt to seek before the begin location.");
        }

        if (l.compareTo(endLocation) > 0) {
          throw new IllegalArgumentException(
              "Attempt to seek after the end location.");
        }

        if (l.compareTo(endLocation) == 0) {
          parkCursorAtEnd();
          return;
        }

        if (l.getBlockIndex() != currentLocation.getBlockIndex()) {
          // going to a totally different block
          initBlock(l.getBlockIndex());
        } else {
          if (valueChecked) {
            // may temporarily go beyond the last record in the block (in which
            // case the next if loop will always be true).
            inBlockAdvance(1);
          }
          if (l.getRecordIndex() < currentLocation.getRecordIndex()) {
            initBlock(l.getBlockIndex());
          }
        }

        inBlockAdvance(l.getRecordIndex() - currentLocation.getRecordIndex());

        return;
      }

      /**
       * Rewind to the first entry in the scanner. The entry returned by the
       * previous entry() call will be invalid.
       * 
       * @throws IOException raised on errors performing I/O.
       */
      public void rewind() throws IOException {
        seekTo(beginLocation);
      }

      /**
       * Seek to the end of the scanner. The entry returned by the previous
       * entry() call will be invalid.
       * 
       * @throws IOException raised on errors performing I/O.
       */
      public void seekToEnd() throws IOException {
        parkCursorAtEnd();
      }

      /**
       * Move the cursor to the first entry whose key is greater than or equal
       * to the input key. Synonymous to lowerBound(key, 0, key.length). The
       * entry returned by the previous entry() call will be invalid.
       * 
       * @param key
       *          The input key
       * @throws IOException raised on errors performing I/O.
       */
      public void lowerBound(byte[] key) throws IOException {
        lowerBound(key, 0, key.length);
      }

      /**
       * Move the cursor to the first entry whose key is greater than or equal
       * to the input key. The entry returned by the previous entry() call will
       * be invalid.
       * 
       * @param key
       *          The input key
       * @param keyOffset
       *          offset in the key buffer.
       * @param keyLen
       *          key buffer length.
       * @throws IOException raised on errors performing I/O.
       */
      public void lowerBound(byte[] key, int keyOffset, int keyLen)
          throws IOException {
        seekTo(new ByteArray(key, keyOffset, keyLen), false);
      }

      /**
       * Move the cursor to the first entry whose key is strictly greater than
       * the input key. Synonymous to upperBound(key, 0, key.length). The entry
       * returned by the previous entry() call will be invalid.
       * 
       * @param key
       *          The input key
       * @throws IOException raised on errors performing I/O.
       */
      public void upperBound(byte[] key) throws IOException {
        upperBound(key, 0, key.length);
      }

      /**
       * Move the cursor to the first entry whose key is strictly greater than
       * the input key. The entry returned by the previous entry() call will be
       * invalid.
       * 
       * @param key
       *          The input key
       * @param keyOffset
       *          offset in the key buffer.
       * @param keyLen
       *          key buffer length.
       * @throws IOException raised on errors performing I/O.
       */
      public void upperBound(byte[] key, int keyOffset, int keyLen)
          throws IOException {
        seekTo(new ByteArray(key, keyOffset, keyLen), true);
      }

      /**
       * Move the cursor to the next key-value pair. The entry returned by the
       * previous entry() call will be invalid.
       * 
       * @return true if the cursor successfully moves. False when cursor is
       *         already at the end location and cannot be advanced.
       * @throws IOException raised on errors performing I/O.
       */
      public boolean advance() throws IOException {
        if (atEnd()) {
          return false;
        }

        int curBid = currentLocation.getBlockIndex();
        long curRid = currentLocation.getRecordIndex();
        long entriesInBlock = reader.getBlockEntryCount(curBid);
        if (curRid + 1 >= entriesInBlock) {
          if (endLocation.compareTo(curBid + 1, 0) <= 0) {
            // last entry in TFile.
            parkCursorAtEnd();
          } else {
            // last entry in Block.
            initBlock(curBid + 1);
          }
        } else {
          inBlockAdvance(1);
        }
        return true;
      }

      /**
       * Load a compressed block for reading. Expects blockIndex to be valid.
       * 
       * @throws IOException
       */
      private void initBlock(int blockIndex) throws IOException {
        klen = -1;
        if (blkReader != null) {
          try {
            blkReader.close();
          } finally {
            blkReader = null;
          }
        }
        blkReader = reader.getBlockReader(blockIndex);
        currentLocation.set(blockIndex, 0);
      }

      private void parkCursorAtEnd() throws IOException {
        klen = -1;
        currentLocation.set(endLocation);
        if (blkReader != null) {
          try {
            blkReader.close();
          } finally {
            blkReader = null;
          }
        }
      }

      /**
       * Close the scanner. Release all resources. The behavior of using the
       * scanner after calling close is not defined. The entry returned by the
       * previous entry() call will be invalid.
       */
      @Override
      public void close() throws IOException {
        parkCursorAtEnd();
      }

      /**
       * Is cursor at the end location?
       * 
       * @return true if the cursor is at the end location.
       */
      public boolean atEnd() {
        return (currentLocation.compareTo(endLocation) >= 0);
      }

      /**
       * check whether we have already successfully obtained the key. It also
       * initializes the valueInputStream.
       */
      void checkKey() throws IOException {
        if (klen >= 0) return;
        if (atEnd()) {
          throw new EOFException("No key-value to read");
        }
        klen = -1;
        vlen = -1;
        valueChecked = false;

        klen = Utils.readVInt(blkReader);
        blkReader.readFully(keyBuffer, 0, klen);
        valueBufferInputStream.reset(blkReader);
        if (valueBufferInputStream.isLastChunk()) {
          vlen = valueBufferInputStream.getRemain();
        }
      }

      /**
       * Get an entry to access the key and value.
       * 
       * @return The Entry object to access the key and value.
       * @throws IOException raised on errors performing I/O.
       */
      public Entry entry() throws IOException {
        checkKey();
        return new Entry();
      }

      /**
       * Get the RecordNum corresponding to the entry pointed by the cursor.
       * @return The RecordNum corresponding to the entry pointed by the cursor.
       * @throws IOException raised on errors performing I/O.
       */
      public long getRecordNum() throws IOException {
        return reader.getRecordNumByLocation(currentLocation);
      }
      
      /**
       * Internal API. Comparing the key at cursor to user-specified key.
       * 
       * @param other
       *          user-specified key.
       * @return negative if key at cursor is smaller than user key; 0 if equal;
       *         and positive if key at cursor greater than user key.
       * @throws IOException
       */
      int compareCursorKeyTo(RawComparable other) throws IOException {
        checkKey();
        return reader.compareKeys(keyBuffer, 0, klen, other.buffer(), other
            .offset(), other.size());
      }

      /**
       * Entry to a &lt;Key, Value&gt; pair.
       */
      public class Entry implements Comparable<RawComparable> {
        /**
         * Get the length of the key.
         * 
         * @return the length of the key.
         */
        public int getKeyLength() {
          return klen;
        }

        byte[] getKeyBuffer() {
          return keyBuffer;
        }

        /**
         * Copy the key and value in one shot into BytesWritables. This is
         * equivalent to getKey(key); getValue(value);
         * 
         * @param key
         *          BytesWritable to hold key.
         * @param value
         *          BytesWritable to hold value
         * @throws IOException raised on errors performing I/O.
         */
        public void get(BytesWritable key, BytesWritable value)
            throws IOException {
          getKey(key);
          getValue(value);
        }

        /**
         * Copy the key into BytesWritable. The input BytesWritable will be
         * automatically resized to the actual key size.
         * 
         * @param key
         *          BytesWritable to hold the key.
         * @throws IOException raised on errors performing I/O.
         * @return the length of the key.
         */
        public int getKey(BytesWritable key) throws IOException {
          key.setSize(getKeyLength());
          getKey(key.getBytes());
          return key.getLength();
        }

        /**
         * Copy the value into BytesWritable. The input BytesWritable will be
         * automatically resized to the actual value size. The implementation
         * directly uses the buffer inside BytesWritable for storing the value.
         * The call does not require the value length to be known.
         * 
         * @param value
         *          BytesWritable to hold the value.
         * @throws IOException raised on errors performing I/O.
         * @return the length of the value.
         */
        public long getValue(BytesWritable value) throws IOException {
          DataInputStream dis = getValueStream();
          int size = 0;
          try {
            int remain;
            while ((remain = valueBufferInputStream.getRemain()) > 0) {
              value.setSize(size + remain);
              dis.readFully(value.getBytes(), size, remain);
              size += remain;
            }
            return value.getLength();
          } finally {
            dis.close();
          }
        }

        /**
         * Writing the key to the output stream. This method avoids copying key
         * buffer from Scanner into user buffer, then writing to the output
         * stream.
         * 
         * @param out
         *          The output stream
         * @return the length of the key.
         * @throws IOException raised on errors performing I/O.
         */
        public int writeKey(OutputStream out) throws IOException {
          out.write(keyBuffer, 0, klen);
          return klen;
        }

        /**
         * Writing the value to the output stream. This method avoids copying
         * value data from Scanner into user buffer, then writing to the output
         * stream. It does not require the value length to be known.
         * 
         * @param out
         *          The output stream
         * @return the length of the value
         * @throws IOException raised on errors performing I/O.
         */
        public long writeValue(OutputStream out) throws IOException {
          DataInputStream dis = getValueStream();
          long size = 0;
          try {
            int chunkSize;
            while ((chunkSize = valueBufferInputStream.getRemain()) > 0) {
              chunkSize = Math.min(chunkSize, MAX_VAL_TRANSFER_BUF_SIZE);
              valTransferBuffer.setSize(chunkSize);
              dis.readFully(valTransferBuffer.getBytes(), 0, chunkSize);
              out.write(valTransferBuffer.getBytes(), 0, chunkSize);
              size += chunkSize;
            }
            return size;
          } finally {
            dis.close();
          }
        }

        /**
         * Copy the key into user supplied buffer.
         * 
         * @param buf
         *          The buffer supplied by user. The length of the buffer must
         *          not be shorter than the key length.
         * @return The length of the key.
         * 
         * @throws IOException raised on errors performing I/O.
         */
        public int getKey(byte[] buf) throws IOException {
          return getKey(buf, 0);
        }

        /**
         * Copy the key into user supplied buffer.
         * 
         * @param buf
         *          The buffer supplied by user.
         * @param offset
         *          The starting offset of the user buffer where we should copy
         *          the key into. The key length plus the offset must not be
         *          greater than the buffer length.
         * @return The length of the key.
         * @throws IOException raised on errors performing I/O.
         */
        public int getKey(byte[] buf, int offset) throws IOException {
          if ((offset | (buf.length - offset - klen)) < 0) {
            throw new IndexOutOfBoundsException(
                "Buffer not enough to store the key");
          }
          System.arraycopy(keyBuffer, 0, buf, offset, klen);
          return klen;
        }

        /**
         * Streaming access to the key. Useful for deserializing the key into
         * user objects.
         * 
         * @return The input stream.
         */
        public DataInputStream getKeyStream() {
          keyDataInputStream.reset(keyBuffer, klen);
          return keyDataInputStream;
        }

        /**
         * Get the length of the value. isValueLengthKnown() must be tested
         * true.
         * 
         * @return the length of the value.
         */
        public int getValueLength() {
          if (vlen >= 0) {
            return vlen;
          }

          throw new RuntimeException("Value length unknown.");
        }

        /**
         * Copy value into user-supplied buffer. User supplied buffer must be
         * large enough to hold the whole value. The value part of the key-value
         * pair pointed by the current cursor is not cached and can only be
         * examined once. Calling any of the following functions more than once
         * without moving the cursor will result in exception:
         * {@link #getValue(byte[])}, {@link #getValue(byte[], int)},
         * {@link #getValueStream}.
         *
         * @param buf user-supplied buffer to hold the value.
         * @return the length of the value. Does not require
         *         isValueLengthKnown() to be true.
         * @throws IOException raised on errors performing I/O.
         * 
         */
        public int getValue(byte[] buf) throws IOException {
          return getValue(buf, 0);
        }

        /**
         * Copy value into user-supplied buffer. User supplied buffer must be
         * large enough to hold the whole value (starting from the offset). The
         * value part of the key-value pair pointed by the current cursor is not
         * cached and can only be examined once. Calling any of the following
         * functions more than once without moving the cursor will result in
         * exception: {@link #getValue(byte[])}, {@link #getValue(byte[], int)},
         * {@link #getValueStream}.
         *
         * @param buf user-supplied buffer to hold the value.
         * @param offset starting offset in the buffer.
         * @return the length of the value. Does not require
         *         isValueLengthKnown() to be true.
         * @throws IOException raised on errors performing I/O.
         */
        public int getValue(byte[] buf, int offset) throws IOException {
          DataInputStream dis = getValueStream();
          try {
            if (isValueLengthKnown()) {
              if ((offset | (buf.length - offset - vlen)) < 0) {
                throw new IndexOutOfBoundsException(
                    "Buffer too small to hold value");
              }
              dis.readFully(buf, offset, vlen);
              return vlen;
            }

            int nextOffset = offset;
            while (nextOffset < buf.length) {
              int n = dis.read(buf, nextOffset, buf.length - nextOffset);
              if (n < 0) {
                break;
              }
              nextOffset += n;
            }
            // attempt to read one more byte to determine whether we reached
            // the end or not.
            if (dis.read() >= 0) {
              throw new IndexOutOfBoundsException(
                  "Buffer too small to hold value");
            }
            return nextOffset - offset;
          } finally {
            dis.close();
          }
        }

        /**
         * Stream access to value. The value part of the key-value pair pointed
         * by the current cursor is not cached and can only be examined once.
         * Calling any of the following functions more than once without moving
         * the cursor will result in exception: {@link #getValue(byte[])},
         * {@link #getValue(byte[], int)}, {@link #getValueStream}.
         * 
         * @return The input stream for reading the value.
         * @throws IOException raised on errors performing I/O.
         */
        public DataInputStream getValueStream() throws IOException {
          if (valueChecked) {
            throw new IllegalStateException(
                "Attempt to examine value multiple times.");
          }
          valueChecked = true;
          return valueDataInputStream;
        }

        /**
         * Check whether it is safe to call getValueLength().
         * 
         * @return true if the value length is known beforehand. Values shorter
         *         than the chunk size will always have their lengths known
         *         beforehand. Values that are written out as a whole (with the
         *         length advertised up-front) will also have their lengths
         *         known when read.
         */
        public boolean isValueLengthKnown() {
          return (vlen >= 0);
        }

        /**
         * Compare the entry key to another key. Synonymous to compareTo(buf, 0,
         * buf.length).
         * 
         * @param buf
         *          The key buffer.
         * @return comparison result between the entry key with the input key.
         */
        public int compareTo(byte[] buf) {
          return compareTo(buf, 0, buf.length);
        }

        /**
         * Compare the entry key to another key. Synonymous to compareTo(new
         * ByteArray(buf, offset, length)).
         * 
         * @param buf
         *          The key buffer
         * @param offset
         *          offset into the key buffer.
         * @param length
         *          the length of the key.
         * @return comparison result between the entry key with the input key.
         */
        public int compareTo(byte[] buf, int offset, int length) {
          return compareTo(new ByteArray(buf, offset, length));
        }

        /**
         * Compare an entry with a RawComparable object. This is useful when
         * Entries are stored in a collection, and we want to compare a user
         * supplied key.
         */
        @Override
        public int compareTo(RawComparable key) {
          return reader.compareKeys(keyBuffer, 0, getKeyLength(), key.buffer(),
              key.offset(), key.size());
        }

        /**
         * Compare whether this and other point to the same key value.
         */
        @Override
        public boolean equals(Object other) {
          if (this == other) return true;
          if (!(other instanceof Entry)) return false;
          return ((Entry) other).compareTo(keyBuffer, 0, getKeyLength()) == 0;
        }

        @Override
        public int hashCode() {
          return WritableComparator.hashBytes(keyBuffer, 0, getKeyLength());
        }
      }
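
      // -----------------------------------------------------------------
      // Illustrative sketch (not part of the original TFile.java): reading
      // the key and value through an Entry. The value of the current
      // record can only be examined once, so pick exactly one of the value
      // accessors. "scanner" and "out" (an OutputStream) are assumed to be
      // supplied by the caller.
      //
      //   TFile.Reader.Scanner.Entry e = scanner.entry();
      //   byte[] keyBuf = new byte[e.getKeyLength()];
      //   e.getKey(keyBuf);
      //   if (e.isValueLengthKnown()) {
      //     byte[] valBuf = new byte[e.getValueLength()];
      //     e.getValue(valBuf);       // one-shot copy, length known up-front
      //   } else {
      //     e.writeValue(out);        // stream the value out chunk by chunk
      //   }
      // -----------------------------------------------------------------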

      /**
       * Advance cursor by n positions within the block.
       * 
       * @param n
       *          Number of key-value pairs to skip in block.
       * @throws IOException
       */
      private void inBlockAdvance(long n) throws IOException {
        for (long i = 0; i < n; ++i) {
          checkKey();
          if (!valueBufferInputStream.isClosed()) {
            valueBufferInputStream.close();
          }
          klen = -1;
          currentLocation.incRecordIndex();
        }
      }

      /**
       * Advance cursor in block until we find a key that is greater than or
       * equal to the input key.
       * 
       * @param key
       *          Key to compare.
       * @param greater
       *          advance until we find a key greater than the input key.
       * @return true if we find an equal key.
       * @throws IOException
       */
      private boolean inBlockAdvance(RawComparable key, boolean greater)
          throws IOException {
        int curBid = currentLocation.getBlockIndex();
        long entryInBlock = reader.getBlockEntryCount(curBid);
        if (curBid == endLocation.getBlockIndex()) {
          entryInBlock = endLocation.getRecordIndex();
        }

        while (currentLocation.getRecordIndex() < entryInBlock) {
          int cmp = compareCursorKeyTo(key);
          if (cmp > 0) return false;
          if (cmp == 0 && !greater) return true;
          if (!valueBufferInputStream.isClosed()) {
            valueBufferInputStream.close();
          }
          klen = -1;
          currentLocation.incRecordIndex();
        }

        throw new RuntimeException("Cannot find matching key in block.");
      }
    }

    long getBlockEntryCount(int curBid) {
      return tfileIndex.getEntry(curBid).entries();
    }

    BlockReader getBlockReader(int blockIndex) throws IOException {
      return readerBCF.getDataBlock(blockIndex);
    }
  }

  /**
   * Data structure representing "TFile.meta" meta block.
   */
  static final class TFileMeta {
    final static String BLOCK_NAME = "TFile.meta";
    final Version version;
    private long recordCount;
    private final String strComparator;
    private final BytesComparator comparator;

    // ctor for writes
    public TFileMeta(String comparator) {
      // set fileVersion to API version when we create it.
      version = TFile.API_VERSION;
      recordCount = 0;
      strComparator = (comparator == null) ? "" : comparator;
      this.comparator = makeComparator(strComparator);
    }

    // ctor for reads
    public TFileMeta(DataInput in) throws IOException {
      version = new Version(in);
      if (!version.compatibleWith(TFile.API_VERSION)) {
        throw new RuntimeException("Incompatible TFile fileVersion.");
      }
      recordCount = Utils.readVLong(in);
      strComparator = Utils.readString(in);
      comparator = makeComparator(strComparator);
    }

    @SuppressWarnings("unchecked")
    static BytesComparator makeComparator(String comparator) {
      if (comparator.length() == 0) {
        // unsorted keys
        return null;
      }
      if (comparator.equals(COMPARATOR_MEMCMP)) {
        // default comparator
        return new BytesComparator(new MemcmpRawComparator());
      } else if (comparator.startsWith(COMPARATOR_JCLASS)) {
        String compClassName =
            comparator.substring(COMPARATOR_JCLASS.length()).trim();
        try {
          Class compClass = Class.forName(compClassName);
          // use its default ctor to create an instance
          return new BytesComparator((RawComparator<Object>) compClass
              .newInstance());
        } catch (Exception e) {
          throw new IllegalArgumentException(
              "Failed to instantiate comparator: " + comparator + "("
                  + e.toString() + ")");
        }
      } else {
        throw new IllegalArgumentException("Unsupported comparator: "
            + comparator);
      }
    }
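
    // ---------------------------------------------------------------------
    // Illustrative sketch (not part of the original TFile.java): the
    // comparator strings accepted by makeComparator(), as they would be
    // passed when creating a TFile.Writer. "fsdos" and "conf" are assumed
    // to be supplied by the caller; "org.example.MyRawComparator" is a
    // hypothetical RawComparator implementation with a no-arg constructor.
    //
    //   // raw byte-wise (memcmp) ordering:
    //   new TFile.Writer(fsdos, 64 * 1024, TFile.COMPRESSION_NONE,
    //       TFile.COMPARATOR_MEMCMP, conf);
    //
    //   // ordering defined by a user-supplied RawComparator class:
    //   new TFile.Writer(fsdos, 64 * 1024, TFile.COMPRESSION_NONE,
    //       TFile.COMPARATOR_JCLASS + "org.example.MyRawComparator", conf);
    //
    //   // an empty comparator string means unsorted keys (no seek-by-key).
    // ---------------------------------------------------------------------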

    public void write(DataOutput out) throws IOException {
      TFile.API_VERSION.write(out);
      Utils.writeVLong(out, recordCount);
      Utils.writeString(out, strComparator);
    }

    public long getRecordCount() {
      return recordCount;
    }

    public void incRecordCount() {
      ++recordCount;
    }

    public boolean isSorted() {
      return !strComparator.isEmpty();
    }

    public String getComparatorString() {
      return strComparator;
    }

    public BytesComparator getComparator() {
      return comparator;
    }

    public Version getVersion() {
      return version;
    }
  } // END: class TFileMeta

  /**
   * Data structure representing "TFile.index" meta block.
   */
  static class TFileIndex {
    final static String BLOCK_NAME = "TFile.index";
    private ByteArray firstKey;
    private final ArrayList<TFileIndexEntry> index;
    private final ArrayList<Long> recordNumIndex;
    private final BytesComparator comparator;
    private long sum = 0;
    
    /**
     * For reading from file.
     * 
     * @throws IOException
     */
    public TFileIndex(int entryCount, DataInput in, BytesComparator comparator)
        throws IOException {
      index = new ArrayList<TFileIndexEntry>(entryCount);
      recordNumIndex = new ArrayList<Long>(entryCount);
      int size = Utils.readVInt(in); // size for the first key entry.
      if (size > 0) {
        byte[] buffer = new byte[size];
        in.readFully(buffer);
        DataInputStream firstKeyInputStream =
            new DataInputStream(new ByteArrayInputStream(buffer, 0, size));

        int firstKeyLength = Utils.readVInt(firstKeyInputStream);
        firstKey = new ByteArray(new byte[firstKeyLength]);
        firstKeyInputStream.readFully(firstKey.buffer());

        for (int i = 0; i < entryCount; i++) {
          size = Utils.readVInt(in);
          if (buffer.length < size) {
            buffer = new byte[size];
          }
          in.readFully(buffer, 0, size);
          TFileIndexEntry idx =
              new TFileIndexEntry(new DataInputStream(new ByteArrayInputStream(
                  buffer, 0, size)));
          index.add(idx);
          sum += idx.entries();
          recordNumIndex.add(sum);
        }
      } else {
        if (entryCount != 0) {
          throw new RuntimeException("Internal error");
        }
      }
      this.comparator = comparator;
    }

    /**
     * @param key
     *          input key.
     * @return the ID of the first block that contains key >= input key. Or -1
     *         if no such block exists.
     */
    public int lowerBound(RawComparable key) {
      if (comparator == null) {
        throw new RuntimeException("Cannot search in unsorted TFile");
      }

      if (firstKey == null) {
        return -1; // not found
      }

      int ret = Utils.lowerBound(index, key, comparator);
      if (ret == index.size()) {
        return -1;
      }
      return ret;
    }

    /**
     * @param key
     *          input key.
     * @return the ID of the first block that contains key > input key. Or -1
     *         if no such block exists.
     */
    public int upperBound(RawComparable key) {
      if (comparator == null) {
        throw new RuntimeException("Cannot search in unsorted TFile");
      }

      if (firstKey == null) {
        return -1; // not found
      }

      int ret = Utils.upperBound(index, key, comparator);
      if (ret == index.size()) {
        return -1;
      }
      return ret;
    }
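
    // ---------------------------------------------------------------------
    // Illustrative sketch (not part of the original TFile.java): assuming
    // the index holds one key per data block, ["cat", "dog", "fox"],
    //
    //   lowerBound("dog") == 1   // first block whose index key is >= "dog"
    //   upperBound("dog") == 2   // first block whose index key is  > "dog"
    //   lowerBound("zoo") == -1  // no index key is >= "zoo"
    // ---------------------------------------------------------------------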

    /**
     * For writing to file.
     */
    public TFileIndex(BytesComparator comparator) {
      index = new ArrayList<TFileIndexEntry>();
      recordNumIndex = new ArrayList<Long>();
      this.comparator = comparator;
    }

    public RawComparable getFirstKey() {
      return firstKey;
    }
    
    public Reader.Location getLocationByRecordNum(long recNum) {
      int idx = Utils.upperBound(recordNumIndex, recNum);
      long lastRecNum = (idx == 0)? 0: recordNumIndex.get(idx-1);
      return new Reader.Location(idx, recNum-lastRecNum);
    }

    public long getRecordNumByLocation(Reader.Location location) {
      int blkIndex = location.getBlockIndex();
      long lastRecNum = (blkIndex == 0) ? 0: recordNumIndex.get(blkIndex-1);
      return lastRecNum + location.getRecordIndex();
    }
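
    // ---------------------------------------------------------------------
    // Illustrative sketch (not part of the original TFile.java): how the
    // cumulative recordNumIndex maps between a global record number and a
    // (blockIndex, recordIndex) Location. Suppose three data blocks hold
    // 100, 150 and 200 records, so recordNumIndex == [100, 250, 450].
    //
    //   getLocationByRecordNum(180):
    //     upperBound(recordNumIndex, 180) == 1   // first sum greater than 180
    //     lastRecNum = recordNumIndex.get(0) == 100
    //     => Location(blockIndex = 1, recordIndex = 80)
    //
    //   getRecordNumByLocation(Location(1, 80)):
    //     lastRecNum = recordNumIndex.get(0) == 100
    //     => 100 + 80 == 180
    // ---------------------------------------------------------------------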
    
    public void setFirstKey(byte[] key, int offset, int length) {
      firstKey = new ByteArray(new byte[length]);
      System.arraycopy(key, offset, firstKey.buffer(), 0, length);
    }

    public RawComparable getLastKey() {
      if (index.size() == 0) {
        return null;
      }
      return new ByteArray(index.get(index.size() - 1).buffer());
    }

    public void addEntry(TFileIndexEntry keyEntry) {
      index.add(keyEntry);
      sum += keyEntry.entries();
      recordNumIndex.add(sum);
    }

    public TFileIndexEntry getEntry(int bid) {
      return index.get(bid);
    }

    public void write(DataOutput out) throws IOException {
      if (firstKey == null) {
        Utils.writeVInt(out, 0);
        return;
      }

      DataOutputBuffer dob = new DataOutputBuffer();
      Utils.writeVInt(dob, firstKey.size());
      dob.write(firstKey.buffer());
      Utils.writeVInt(out, dob.size());
      out.write(dob.getData(), 0, dob.getLength());

      for (TFileIndexEntry entry : index) {
        dob.reset();
        entry.write(dob);
        Utils.writeVInt(out, dob.getLength());
        out.write(dob.getData(), 0, dob.getLength());
      }
    }
  }

  /**
   * TFile Data Index entry. We should try to make the memory footprint of each
   * index entry as small as possible.
   */
  static final class TFileIndexEntry implements RawComparable {
    final byte[] key;
    // count of <key, value> entries in the block.
    final long kvEntries;

    public TFileIndexEntry(DataInput in) throws IOException {
      int len = Utils.readVInt(in);
      key = new byte[len];
      in.readFully(key, 0, len);
      kvEntries = Utils.readVLong(in);
    }

    // default entry, without any padding
    public TFileIndexEntry(byte[] newkey, int offset, int len, long entries) {
      key = new byte[len];
      System.arraycopy(newkey, offset, key, 0, len);
      this.kvEntries = entries;
    }

    @Override
    public byte[] buffer() {
      return key;
    }

    @Override
    public int offset() {
      return 0;
    }

    @Override
    public int size() {
      return key.length;
    }

    long entries() {
      return kvEntries;
    }

    public void write(DataOutput out) throws IOException {
      Utils.writeVInt(out, key.length);
      out.write(key, 0, key.length);
      Utils.writeVLong(out, kvEntries);
    }
  }
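
  // -----------------------------------------------------------------------
  // Illustrative sketch (not part of the original TFile.java): the on-disk
  // layout produced by TFileIndexEntry.write() for a block whose index key
  // is "row17" and which holds 1200 key-value pairs.
  //
  //   VInt(5)               // length of the key
  //   'r' 'o' 'w' '1' '7'   // the key bytes
  //   VLong(1200)           // number of <key, value> entries in the block
  //
  // TFileIndexEntry(DataInput) reads back exactly the same three fields in
  // the same order.
  // -----------------------------------------------------------------------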

  /**
   * Dumping the TFile information.
   * 
   * @param args
   *          A list of TFile paths.
   */
  public static void main(String[] args) {
    System.out.printf("TFile Dumper (TFile %s, BCFile %s)%n", TFile.API_VERSION
        .toString(), BCFile.API_VERSION.toString());
    if (args.length == 0) {
      System.out
          .println("Usage: java ... org.apache.hadoop.io.file.tfile.TFile tfile-path [tfile-path ...]");
      System.exit(0);
    }
    Configuration conf = new Configuration();

    for (String file : args) {
      System.out.println("===" + file + "===");
      try {
        TFileDumper.dumpInfo(file, System.out, conf);
      } catch (IOException e) {
        e.printStackTrace(System.err);
      }
    }
  }
}

Related information

hadoop source code directory

Related articles

hadoop BCFile source code

hadoop BoundedRangeFileInputStream source code

hadoop ByteArray source code

hadoop Chunk source code

hadoop CompareUtils source code

hadoop Compression source code

hadoop MetaBlockAlreadyExists source code

hadoop MetaBlockDoesNotExist source code

hadoop RawComparable source code

hadoop SimpleBufferedOutputStream source code
