Hadoop SequenceFile Source Code

  • 2022-10-20

Hadoop SequenceFile code

File path: /hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
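
Before the full listing, a minimal usage sketch may help orient the reading. It uses the option-based createWriter factory that the class Javadoc below recommends; the path, key/value types, and record contents are illustrative assumptions, not taken from the listing itself.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path("/tmp/demo.seq");    // illustrative path

    // Write a few key/value pairs with block compression.
    try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
        SequenceFile.Writer.file(path),
        SequenceFile.Writer.keyClass(IntWritable.class),
        SequenceFile.Writer.valueClass(Text.class),
        SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK))) {
      for (int i = 0; i < 100; i++) {
        writer.append(new IntWritable(i), new Text("value-" + i));
      }
    }

    // Read every pair back in file order.
    try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
        SequenceFile.Reader.file(path))) {
      IntWritable key = new IntWritable();
      Text value = new Text();
      while (reader.next(key, value)) {
        System.out.println(key + "\t" + value);
      }
    }
  }
}

The full source follows.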

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.rmi.server.UID;
import java.security.MessageDigest;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Options;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.MergeSort;
import org.apache.hadoop.util.PriorityQueue;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_SEQFILE_COMPRESS_BLOCKSIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_SEQFILE_COMPRESS_BLOCKSIZE_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_SKIP_CHECKSUM_ERRORS_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_SKIP_CHECKSUM_ERRORS_KEY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;

/** 
 * <code>SequenceFile</code>s are flat files consisting of binary key/value 
 * pairs.
 * 
 * <p><code>SequenceFile</code> provides {@link SequenceFile.Writer},
 * {@link SequenceFile.Reader} and {@link Sorter} classes for writing,
 * reading and sorting respectively.</p>
 * 
 * There are three <code>SequenceFile</code> <code>Writer</code>s based on the 
 * {@link CompressionType} used to compress key/value pairs:
 * <ol>
 *   <li>
 *   <code>Writer</code> : Uncompressed records.
 *   </li>
 *   <li>
 *   <code>RecordCompressWriter</code> : Record-compressed files, only compress 
 *                                       values.
 *   </li>
 *   <li>
 *   <code>BlockCompressWriter</code> : Block-compressed files, both keys &amp;
 *                                      values are collected in 'blocks' 
 *                                      separately and compressed. The size of 
 *                                      the 'block' is configurable.
 * </ol>
 * 
 * <p>The actual compression algorithm used to compress key and/or values can be
 * specified by using the appropriate {@link CompressionCodec}.</p>
 * 
 * <p>The recommended way is to use the static <tt>createWriter</tt> methods
 * provided by the <code>SequenceFile</code> to chose the preferred format.</p>
 *
 * <p>The {@link SequenceFile.Reader} acts as the bridge and can read any of the
 * above <code>SequenceFile</code> formats.</p>
 *
 * <h3 id="Formats">SequenceFile Formats</h3>
 * 
 * <p>Essentially there are 3 different formats for <code>SequenceFile</code>s
 * depending on the <code>CompressionType</code> specified. All of them share a
 * <a href="#Header">common header</a> described below.
 * 
 * <h4 id="Header">SequenceFile Header</h4>
 * <ul>
 *   <li>
 *   version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of actual 
 *             version number (e.g. SEQ4 or SEQ6)
 *   </li>
 *   <li>
 *   keyClassName -key class
 *   </li>
 *   <li>
 *   valueClassName - value class
 *   </li>
 *   <li>
 *   compression - A boolean which specifies if compression is turned on for 
 *                 keys/values in this file.
 *   </li>
 *   <li>
 *   blockCompression - A boolean which specifies if block-compression is 
 *                      turned on for keys/values in this file.
 *   </li>
 *   <li>
 *   compression codec - <code>CompressionCodec</code> class which is used for  
 *                       compression of keys and/or values (if compression is 
 *                       enabled).
 *   </li>
 *   <li>
 *   metadata - {@link Metadata} for this file.
 *   </li>
 *   <li>
 *   sync - A sync marker to denote end of the header.
 *   </li>
 * </ul>
 * 
 * <h5>Uncompressed SequenceFile Format</h5>
 * <ul>
 * <li>
 * <a href="#Header">Header</a>
 * </li>
 * <li>
 * Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li>Value</li>
 *   </ul>
 * </li>
 * <li>
 * A sync-marker every few <code>100</code> kilobytes or so.
 * </li>
 * </ul>
 *
 * <h5>Record-Compressed SequenceFile Format</h5>
 * <ul>
 * <li>
 * <a href="#Header">Header</a>
 * </li>
 * <li>
 * Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li><i>Compressed</i> Value</li>
 *   </ul>
 * </li>
 * <li>
 * A sync-marker every few <code>100</code> kilobytes or so.
 * </li>
 * </ul>
 * 
 * <h5>Block-Compressed SequenceFile Format</h5>
 * <ul>
 * <li>
 * <a href="#Header">Header</a>
 * </li>
 * <li>
 * Record <i>Block</i>
 *   <ul>
 *     <li>Uncompressed number of records in the block</li>
 *     <li>Compressed key-lengths block-size</li>
 *     <li>Compressed key-lengths block</li>
 *     <li>Compressed keys block-size</li>
 *     <li>Compressed keys block</li>
 *     <li>Compressed value-lengths block-size</li>
 *     <li>Compressed value-lengths block</li>
 *     <li>Compressed values block-size</li>
 *     <li>Compressed values block</li>
 *   </ul>
 * </li>
 * <li>
 * A sync-marker every block.
 * </li>
 * </ul>
 * 
 * <p>The compressed blocks of key lengths and value lengths consist of the 
 * actual lengths of individual keys/values encoded in ZeroCompressedInteger 
 * format.</p>
 * 
 * @see CompressionCodec
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFile {
  private static final Logger LOG = LoggerFactory.getLogger(SequenceFile.class);

  private SequenceFile() {}                         // no public ctor

  private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
  private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
  private static final byte VERSION_WITH_METADATA = (byte)6;
  private static byte[] VERSION = new byte[] {
    (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
  };

  private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
  private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash 
  private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash

  /**
   * The number of bytes between sync points. 100 KB, default.
   * Computed as 5 KB * 20 = 100 KB
   */
  public static final int SYNC_INTERVAL = 5 * 1024 * SYNC_SIZE; // 5KB*(16+4)

  /** 
   * The compression type used to compress key/value pairs in the 
   * {@link SequenceFile}.
   * 
   * @see SequenceFile.Writer
   */
  public enum CompressionType {
    /** Do not compress records. */
    NONE, 
    /** Compress values only, each separately. */
    RECORD,
    /** Compress sequences of records together in blocks. */
    BLOCK
  }

  /**
   * Get the compression type for the reduce outputs
   * @param job the job config to look in
   * @return the kind of compression to use
   */
  static public CompressionType getDefaultCompressionType(Configuration job) {
    String name = job.get("io.seqfile.compression.type");
    return name == null ? CompressionType.RECORD : 
      CompressionType.valueOf(name);
  }
  
  /**
   * Set the default compression type for sequence files.
   * @param job the configuration to modify
   * @param val the new compression type (none, block, record)
   */
  static public void setDefaultCompressionType(Configuration job, 
                                               CompressionType val) {
    job.set("io.seqfile.compression.type", val.toString());
  }
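
  // Illustrative usage (not part of the upstream source): the two helpers above
  // simply read and write the "io.seqfile.compression.type" key on a Configuration.
  //
  //   Configuration conf = new Configuration();
  //   SequenceFile.setDefaultCompressionType(conf, CompressionType.BLOCK);
  //   CompressionType kind = SequenceFile.getDefaultCompressionType(conf); // BLOCK
  //   // With the key unset, getDefaultCompressionType falls back to RECORD.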

  /**
   * Create a new Writer with the given options.
   * @param conf the configuration to use
   * @param opts the options to create the file with
   * @return a new Writer
   * @throws IOException raised on errors performing I/O.
   */
  public static Writer createWriter(Configuration conf, Writer.Option... opts
                                    ) throws IOException {
    Writer.CompressionOption compressionOption = 
      Options.getOption(Writer.CompressionOption.class, opts);
    CompressionType kind;
    if (compressionOption != null) {
      kind = compressionOption.getValue();
    } else {
      kind = getDefaultCompressionType(conf);
      opts = Options.prependOptions(opts, Writer.compression(kind));
    }
    switch (kind) {
      default:
      case NONE:
        return new Writer(conf, opts);
      case RECORD:
        return new RecordCompressWriter(conf, opts);
      case BLOCK:
        return new BlockCompressWriter(conf, opts);
    }
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem. 
   * @param conf The configuration.
   * @param name The name of the file. 
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException raised on errors performing I/O.
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer 
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass));
  }
  
  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem. 
   * @param conf The configuration.
   * @param name The name of the file. 
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException raised on errors performing I/O.
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer 
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, 
                 CompressionType compressionType) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass), 
                        Writer.compression(compressionType));
  }
  
  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem. 
   * @param conf The configuration.
   * @param name The name of the file. 
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException raised on errors performing I/O.
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, CompressionType compressionType,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass), 
                        Writer.compression(compressionType),
                        Writer.progressable(progress));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem. 
   * @param conf The configuration.
   * @param name The name of the file. 
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException raised on errors performing I/O.
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer 
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, CompressionType compressionType, 
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass), 
                        Writer.compression(compressionType, codec));
  }
  
  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem. 
   * @param conf The configuration.
   * @param name The name of the file. 
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException raised on errors performing I/O.
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, 
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlaying outputstream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException raised on errors performing I/O.
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, int bufferSize,
                 short replication, long blockSize,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass), 
                        Writer.bufferSize(bufferSize), 
                        Writer.replication(replication),
                        Writer.blockSize(blockSize),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlaying outputstream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param createParent create parent directory if non-existent
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException raised on errors performing I/O.
   */
  @Deprecated
  public static Writer
  createWriter(FileSystem fs, Configuration conf, Path name,
               Class keyClass, Class valClass, int bufferSize,
               short replication, long blockSize, boolean createParent,
               CompressionType compressionType, CompressionCodec codec,
               Metadata metadata) throws IOException {
    return createWriter(FileContext.getFileContext(fs.getUri(), conf),
        conf, name, keyClass, valClass, compressionType, codec,
        metadata, EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE),
        CreateOpts.bufferSize(bufferSize),
        createParent ? CreateOpts.createParent()
                     : CreateOpts.donotCreateParent(),
        CreateOpts.repFac(replication),
        CreateOpts.blockSize(blockSize)
      );
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fc The context for the specified file.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @param createFlag gives the semantics of create: overwrite, append etc.
   * @param opts file creation options; see {@link CreateOpts}.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException raised on errors performing I/O.
   */
  public static Writer
  createWriter(FileContext fc, Configuration conf, Path name,
               Class keyClass, Class valClass,
               CompressionType compressionType, CompressionCodec codec,
               Metadata metadata,
               final EnumSet<CreateFlag> createFlag, CreateOpts... opts)
               throws IOException {
    return createWriter(conf, fc.create(name, createFlag, opts),
          keyClass, valClass, compressionType, codec, metadata).ownStream();
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem. 
   * @param conf The configuration.
   * @param name The name of the file. 
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException raised on errors performing I/O.
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, 
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress));
  }

  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException raised on errors performing I/O.
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out, 
                 Class keyClass, Class valClass,
                 CompressionType compressionType,
                 CompressionCodec codec, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass), 
                        Writer.compression(compressionType, codec),
                        Writer.metadata(metadata));
  }
  
  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException raised on errors performing I/O.
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out, 
                 Class keyClass, Class valClass, CompressionType compressionType,
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec));
  }
  

  /** The interface to 'raw' values of SequenceFiles. */
  public static interface ValueBytes {

    /**
     * Writes the uncompressed bytes to the outStream.
     * @param outStream : Stream to write uncompressed bytes into.
     * @throws IOException raised on errors performing I/O.
     */
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException;

    /**
     * Write compressed bytes to outStream.
     * Note: that it will NOT compress the bytes if they are not compressed.
     * @param outStream : Stream to write compressed bytes into.
     * @throws IllegalArgumentException an illegal or inappropriate argument.
     * @throws IOException raised on errors performing I/O.
     */
    public void writeCompressedBytes(DataOutputStream outStream) 
      throws IllegalArgumentException, IOException;

    /**
     * @return Size of stored data.
     */
    public int getSize();
  }
  
  private static class UncompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;
    
    private UncompressedBytes() {
      data = null;
      dataSize = 0;
    }
    
    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        data = new byte[Math.max(length, data.length * 2)];
      }
      dataSize = -1;
      in.readFully(data, 0, length);
      dataSize = length;
    }
    
    @Override
    public int getSize() {
      return dataSize;
    }
    
    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      outStream.write(data, 0, dataSize);
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream) 
      throws IllegalArgumentException, IOException {
      throw 
        new IllegalArgumentException("UncompressedBytes cannot be compressed!");
    }

  } // UncompressedBytes
  
  private static class CompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;
    DataInputBuffer rawData = null;
    CompressionCodec codec = null;
    CompressionInputStream decompressedStream = null;

    private CompressedBytes(CompressionCodec codec) {
      data = null;
      dataSize = 0;
      this.codec = codec;
    }

    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        data = new byte[Math.max(length, data.length * 2)];
      } 
      dataSize = -1;
      in.readFully(data, 0, length);
      dataSize = length;
    }
    
    @Override
    public int getSize() {
      return dataSize;
    }
    
    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      if (decompressedStream == null) {
        rawData = new DataInputBuffer();
        decompressedStream = codec.createInputStream(rawData);
      } else {
        decompressedStream.resetState();
      }
      rawData.reset(data, 0, dataSize);

      byte[] buffer = new byte[8192];
      int bytesRead = 0;
      while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) {
        outStream.write(buffer, 0, bytesRead);
      }
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream) 
      throws IllegalArgumentException, IOException {
      outStream.write(data, 0, dataSize);
    }

  } // CompressedBytes
  
  /**
   * The class encapsulating with the metadata of a file.
   * The metadata of a file is a list of attribute name/value
   * pairs of Text type.
   *
   */
  public static class Metadata implements Writable {

    private TreeMap<Text, Text> theMetadata;
    
    public Metadata() {
      this(new TreeMap<Text, Text>());
    }
    
    public Metadata(TreeMap<Text, Text> arg) {
      if (arg == null) {
        this.theMetadata = new TreeMap<Text, Text>();
      } else {
        this.theMetadata = arg;
      }
    }
    
    public Text get(Text name) {
      return this.theMetadata.get(name);
    }
    
    public void set(Text name, Text value) {
      this.theMetadata.put(name, value);
    }
    
    public TreeMap<Text, Text> getMetadata() {
      return new TreeMap<Text, Text>(this.theMetadata);
    }
    
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(this.theMetadata.size());
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        en.getKey().write(out);
        en.getValue().write(out);
      }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      int sz = in.readInt();
      if (sz < 0) throw new IOException("Invalid size: " + sz + " for file metadata object");
      this.theMetadata = new TreeMap<Text, Text>();
      for (int i = 0; i < sz; i++) {
        Text key = new Text();
        Text val = new Text();
        key.readFields(in);
        val.readFields(in);
        this.theMetadata.put(key, val);
      }    
    }

    @Override
    public boolean equals(Object other) {
      if (other == null) {
        return false;
      }
      if (other.getClass() != this.getClass()) {
        return false;
      } else {
        return equals((Metadata)other);
      }
    }
    
    public boolean equals(Metadata other) {
      if (other == null) return false;
      if (this.theMetadata.size() != other.theMetadata.size()) {
        return false;
      }
      Iterator<Map.Entry<Text, Text>> iter1 =
        this.theMetadata.entrySet().iterator();
      Iterator<Map.Entry<Text, Text>> iter2 =
        other.theMetadata.entrySet().iterator();
      while (iter1.hasNext() && iter2.hasNext()) {
        Map.Entry<Text, Text> en1 = iter1.next();
        Map.Entry<Text, Text> en2 = iter2.next();
        if (!en1.getKey().equals(en2.getKey())) {
          return false;
        }
        if (!en1.getValue().equals(en2.getValue())) {
          return false;
        }
      }
      if (iter1.hasNext() || iter2.hasNext()) {
        return false;
      }
      return true;
    }

    @Override
    public int hashCode() {
      assert false : "hashCode not designed";
      return 42; // any arbitrary constant will do 
    }
    
    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("size: ").append(this.theMetadata.size()).append("\n");
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        sb.append("\t").append(en.getKey().toString()).append("\t")
            .append(en.getValue().toString()).append("\n");
      }
      return sb.toString();
    }
  }
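
  // Illustrative usage (not part of the upstream source): Metadata is a
  // Text-to-Text map that is written into the file header when the file is
  // created and can be supplied through Writer.metadata(...).
  //
  //   SequenceFile.Metadata meta = new SequenceFile.Metadata();
  //   meta.set(new Text("created-by"), new Text("example"));
  //   SequenceFile.Writer w = SequenceFile.createWriter(conf,
  //       SequenceFile.Writer.file(path),
  //       SequenceFile.Writer.keyClass(Text.class),
  //       SequenceFile.Writer.valueClass(Text.class),
  //       SequenceFile.Writer.metadata(meta));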
  
  /** Write key/value pairs to a sequence-format file. */
  public static class Writer implements java.io.Closeable, Syncable,
                  Flushable, StreamCapabilities {
    private Configuration conf;
    FSDataOutputStream out;
    boolean ownOutputStream = true;
    DataOutputBuffer buffer = new DataOutputBuffer();

    Class keyClass;
    Class valClass;

    private final CompressionType compress;
    CompressionCodec codec = null;
    CompressionOutputStream deflateFilter = null;
    DataOutputStream deflateOut = null;
    Metadata metadata = null;
    Compressor compressor = null;

    private boolean appendMode = false;

    protected Serializer keySerializer;
    protected Serializer uncompressedValSerializer;
    protected Serializer compressedValSerializer;
    
    // Insert a globally unique 16-byte value every few entries, so that one
    // can seek into the middle of a file and then synchronize with record
    // starts and ends by scanning for this value.
    long lastSyncPos;                     // position of last sync
    byte[] sync;                          // 16 random bytes
    @VisibleForTesting
    int syncInterval;

    {
      try {                                       
        MessageDigest digester = MessageDigest.getInstance("MD5");
        long time = Time.now();
        digester.update((new UID()+"@"+time).getBytes(StandardCharsets.UTF_8));
        sync = digester.digest();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

    public static interface Option {}
    
    static class FileOption extends Options.PathOption 
                                    implements Option {
      FileOption(Path path) {
        super(path);
      }
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter methods
     * that take FileSystem.
     */
    @Deprecated
    private static class FileSystemOption implements Option {
      private final FileSystem value;
      protected FileSystemOption(FileSystem value) {
        this.value = value;
      }
      public FileSystem getValue() {
        return value;
      }
    }

    static class StreamOption extends Options.FSDataOutputStreamOption 
                              implements Option {
      StreamOption(FSDataOutputStream stream) {
        super(stream);
      }
    }

    static class BufferSizeOption extends Options.IntegerOption
                                  implements Option {
      BufferSizeOption(int value) {
        super(value);
      }
    }
    
    static class BlockSizeOption extends Options.LongOption implements Option {
      BlockSizeOption(long value) {
        super(value);
      }
    }

    static class ReplicationOption extends Options.IntegerOption
                                   implements Option {
      ReplicationOption(int value) {
        super(value);
      }
    }

    static class AppendIfExistsOption extends Options.BooleanOption implements
        Option {
      AppendIfExistsOption(boolean value) {
        super(value);
      }
    }

    static class KeyClassOption extends Options.ClassOption implements Option {
      KeyClassOption(Class<?> value) {
        super(value);
      }
    }

    static class ValueClassOption extends Options.ClassOption
                                          implements Option {
      ValueClassOption(Class<?> value) {
        super(value);
      }
    }

    static class MetadataOption implements Option {
      private final Metadata value;
      MetadataOption(Metadata value) {
        this.value = value;
      }
      Metadata getValue() {
        return value;
      }
    }

    static class ProgressableOption extends Options.ProgressableOption
                                    implements Option {
      ProgressableOption(Progressable value) {
        super(value);
      }
    }

    private static class CompressionOption implements Option {
      private final CompressionType value;
      private final CompressionCodec codec;
      CompressionOption(CompressionType value) {
        this(value, null);
      }
      CompressionOption(CompressionType value, CompressionCodec codec) {
        this.value = value;
        this.codec = (CompressionType.NONE != value && null == codec)
          ? new DefaultCodec()
          : codec;
      }
      CompressionType getValue() {
        return value;
      }
      CompressionCodec getCodec() {
        return codec;
      }
    }

    public static Option file(Path value) {
      return new FileOption(value);
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter methods
     * that take FileSystem.
     */
    @Deprecated
    private static Option filesystem(FileSystem fs) {
      return new SequenceFile.Writer.FileSystemOption(fs);
    }

    private static class SyncIntervalOption extends Options.IntegerOption
        implements Option {
      SyncIntervalOption(int val) {
        // If a negative sync interval is provided,
        // fall back to the default sync interval.
        super(val < 0 ? SYNC_INTERVAL : val);
      }
    }

    public static Option bufferSize(int value) {
      return new BufferSizeOption(value);
    }
    
    public static Option stream(FSDataOutputStream value) {
      return new StreamOption(value);
    }
    
    public static Option replication(short value) {
      return new ReplicationOption(value);
    }
    
    public static Option appendIfExists(boolean value) {
      return new AppendIfExistsOption(value);
    }

    public static Option blockSize(long value) {
      return new BlockSizeOption(value);
    }
    
    public static Option progressable(Progressable value) {
      return new ProgressableOption(value);
    }

    public static Option keyClass(Class<?> value) {
      return new KeyClassOption(value);
    }
    
    public static Option valueClass(Class<?> value) {
      return new ValueClassOption(value);
    }
    
    public static Option metadata(Metadata value) {
      return new MetadataOption(value);
    }

    public static Option compression(CompressionType value) {
      return new CompressionOption(value);
    }

    public static Option compression(CompressionType value,
        CompressionCodec codec) {
      return new CompressionOption(value, codec);
    }

    public static Option syncInterval(int value) {
      return new SyncIntervalOption(value);
    }

    /**
     * Construct a uncompressed writer from a set of options.
     * @param conf the configuration to use
     * @param opts the options used when creating the writer
     * @throws IOException if it fails
     */
    Writer(Configuration conf, 
           Option... opts) throws IOException {
      BlockSizeOption blockSizeOption = 
        Options.getOption(BlockSizeOption.class, opts);
      BufferSizeOption bufferSizeOption = 
        Options.getOption(BufferSizeOption.class, opts);
      ReplicationOption replicationOption = 
        Options.getOption(ReplicationOption.class, opts);
      ProgressableOption progressOption = 
        Options.getOption(ProgressableOption.class, opts);
      FileOption fileOption = Options.getOption(FileOption.class, opts);
      AppendIfExistsOption appendIfExistsOption = Options.getOption(
          AppendIfExistsOption.class, opts);
      FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts);
      StreamOption streamOption = Options.getOption(StreamOption.class, opts);
      KeyClassOption keyClassOption = 
        Options.getOption(KeyClassOption.class, opts);
      ValueClassOption valueClassOption = 
        Options.getOption(ValueClassOption.class, opts);
      MetadataOption metadataOption = 
        Options.getOption(MetadataOption.class, opts);
      CompressionOption compressionTypeOption =
        Options.getOption(CompressionOption.class, opts);
      SyncIntervalOption syncIntervalOption =
          Options.getOption(SyncIntervalOption.class, opts);
      // check consistency of options
      if ((fileOption == null) == (streamOption == null)) {
        throw new IllegalArgumentException("file or stream must be specified");
      }
      if (fileOption == null && (blockSizeOption != null ||
                                 bufferSizeOption != null ||
                                 replicationOption != null ||
                                 progressOption != null)) {
        throw new IllegalArgumentException("file modifier options not " +
                                           "compatible with stream");
      }

      FSDataOutputStream out;
      boolean ownStream = fileOption != null;
      if (ownStream) {
        Path p = fileOption.getValue();
        FileSystem fs;
        if (fsOption != null) {
          fs = fsOption.getValue();
        } else {
          fs = p.getFileSystem(conf);
        }
        int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
          bufferSizeOption.getValue();
        short replication = replicationOption == null ? 
          fs.getDefaultReplication(p) :
          (short) replicationOption.getValue();
        long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) :
          blockSizeOption.getValue();
        Progressable progress = progressOption == null ? null :
          progressOption.getValue();

        if (appendIfExistsOption != null && appendIfExistsOption.getValue()
            && fs.exists(p)) {

          // Read the file and verify header details
          SequenceFile.Reader reader = new SequenceFile.Reader(conf,
              SequenceFile.Reader.file(p), new Reader.OnlyHeaderOption());
          try {

            if (keyClassOption.getValue() != reader.getKeyClass()
                || valueClassOption.getValue() != reader.getValueClass()) {
              throw new IllegalArgumentException(
                  "Key/value class provided does not match the file");
            }

            if (reader.getVersion() != VERSION[3]) {
              throw new VersionMismatchException(VERSION[3],
                  reader.getVersion());
            }

            if (metadataOption != null) {
              LOG.info("MetaData Option is ignored during append");
            }
            metadataOption = (MetadataOption) SequenceFile.Writer
                .metadata(reader.getMetadata());

            CompressionOption readerCompressionOption = new CompressionOption(
                reader.getCompressionType(), reader.getCompressionCodec());

            // Codec comparison will be ignored if the compression is NONE
            if (readerCompressionOption.value != compressionTypeOption.value
                || (readerCompressionOption.value != CompressionType.NONE
                    && readerCompressionOption.codec
                        .getClass() != compressionTypeOption.codec
                            .getClass())) {
              throw new IllegalArgumentException(
                  "Compression option provided does not match the file");
            }

            sync = reader.getSync();

          } finally {
            reader.close();
          }

          out = fs.append(p, bufferSize, progress);
          this.appendMode = true;
        } else {
          out = fs
              .create(p, true, bufferSize, replication, blockSize, progress);
        }
      } else {
        out = streamOption.getValue();
      }
      Class<?> keyClass = keyClassOption == null ?
          Object.class : keyClassOption.getValue();
      Class<?> valueClass = valueClassOption == null ?
          Object.class : valueClassOption.getValue();
      Metadata metadata = metadataOption == null ?
          new Metadata() : metadataOption.getValue();
      this.compress = compressionTypeOption.getValue();
      final CompressionCodec codec = compressionTypeOption.getCodec();
      this.syncInterval = (syncIntervalOption == null) ?
          SYNC_INTERVAL :
          syncIntervalOption.getValue();
      init(
          conf, out, ownStream, keyClass, valueClass,
          codec, metadata, syncInterval);
    }

    /**
     * Create the named file.
     * @deprecated Use 
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
     *   instead.
     * @param fs input filesystem.
     * @param conf input configuration.
     * @param name input name.
     * @param keyClass input keyClass.
     * @param valClass input valClass.
     * @throws IOException raised on errors performing I/O.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name, 
                  Class keyClass, Class valClass) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name), true, keyClass, valClass, null, 
           new Metadata(), SYNC_INTERVAL);
    }
    
    /**
     * Create the named file with write-progress reporter.
     * @deprecated Use 
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
     *   instead.
     * @param fs input filesystem.
     * @param conf input configuration.
     * @param name input name.
     * @param keyClass input keyClass.
     * @param valClass input valClass.
     * @param progress input progress.
     * @param metadata input metadata.
     * @throws IOException raised on errors performing I/O.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name, 
                  Class keyClass, Class valClass,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name, progress), true, keyClass, valClass,
           null, metadata, SYNC_INTERVAL);
    }
    
    /**
     * Create the named file with write-progress reporter.
     * @deprecated Use 
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
     *   instead.
     * @param fs input filesystem.
     * @param conf input configuration.
     * @param name input name.
     * @param keyClass input keyClass.
     * @param valClass input valClass.
     * @param bufferSize input bufferSize.
     * @param replication input replication.
     * @param blockSize input blockSize.
     * @param progress input progress.
     * @param metadata input metadata.
     * @throws IOException raised on errors performing I/O.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass,
                  int bufferSize, short replication, long blockSize,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf,
           fs.create(name, true, bufferSize, replication, blockSize, progress),
           true, keyClass, valClass, null, metadata, SYNC_INTERVAL);
    }

    boolean isCompressed() { return compress != CompressionType.NONE; }
    boolean isBlockCompressed() { return compress == CompressionType.BLOCK; }
    
    Writer ownStream() { this.ownOutputStream = true; return this;  }

    /** Write and flush the file header. */
    private void writeFileHeader() 
      throws IOException {
      out.write(VERSION);
      Text.writeString(out, keyClass.getName());
      Text.writeString(out, valClass.getName());
      
      out.writeBoolean(this.isCompressed());
      out.writeBoolean(this.isBlockCompressed());
      
      if (this.isCompressed()) {
        Text.writeString(out, (codec.getClass()).getName());
      }
      this.metadata.write(out);
      out.write(sync);                       // write the sync bytes
      out.flush();                           // flush header
    }

    /** Initialize. */
    @SuppressWarnings("unchecked")
    void init(Configuration config, FSDataOutputStream outStream,
              boolean ownStream, Class key, Class val,
              CompressionCodec compCodec, Metadata meta,
              int syncIntervalVal)
      throws IOException {
      this.conf = config;
      this.out = outStream;
      this.ownOutputStream = ownStream;
      this.keyClass = key;
      this.valClass = val;
      this.codec = compCodec;
      this.metadata = meta;
      this.syncInterval = syncIntervalVal;
      SerializationFactory serializationFactory =
          new SerializationFactory(config);
      this.keySerializer = serializationFactory.getSerializer(keyClass);
      if (this.keySerializer == null) {
        throw new IOException(
            "Could not find a serializer for the Key class: '"
                + keyClass.getCanonicalName() + "'. "
                + "Please ensure that the configuration '" +
                CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                + "properly configured, if you're using"
                + "custom serialization.");
      }
      this.keySerializer.open(buffer);
      this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
      if (this.uncompressedValSerializer == null) {
        throw new IOException(
            "Could not find a serializer for the Value class: '"
                + valClass.getCanonicalName() + "'. "
                + "Please ensure that the configuration '" +
                CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                + "properly configured, if you're using"
                + "custom serialization.");
      }
      this.uncompressedValSerializer.open(buffer);
      if (this.codec != null) {
        ReflectionUtils.setConf(this.codec, this.conf);
        this.compressor = CodecPool.getCompressor(this.codec);
        this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
        this.deflateOut = 
          new DataOutputStream(new BufferedOutputStream(deflateFilter));
        this.compressedValSerializer = serializationFactory.getSerializer(valClass);
        if (this.compressedValSerializer == null) {
          throw new IOException(
              "Could not find a serializer for the Value class: '"
                  + valClass.getCanonicalName() + "'. "
                  + "Please ensure that the configuration '" +
                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                  + "properly configured, if you're using"
                  + "custom serialization.");
        }
        this.compressedValSerializer.open(deflateOut);
      }

      if (appendMode) {
        sync();
      } else {
        writeFileHeader();
      }
    }
    
    /** @return Returns the class of keys in this file. */
    public Class getKeyClass() { return keyClass; }

    /** @return Returns the class of values in this file. */
    public Class getValueClass() { return valClass; }

    /** @return Returns the compression codec of data in this file. */
    public CompressionCodec getCompressionCodec() { return codec; }
    
    /**
     * create a sync point.
     * @throws IOException raised on errors performing I/O.
     */
    public void sync() throws IOException {
      if (sync != null && lastSyncPos != out.getPos()) {
        out.writeInt(SYNC_ESCAPE);                // mark the start of the sync
        out.write(sync);                          // write sync
        lastSyncPos = out.getPos();               // update lastSyncPos
      }
    }

    /**
     * flush all currently written data to the file system.
     * @deprecated Use {@link #hsync()} or {@link #hflush()} instead
     * @throws IOException raised on errors performing I/O.
     */
    @Deprecated
    public void syncFs() throws IOException {
      if (out != null) {
        out.hflush();  // flush contents to file system
      }
    }

    @Override
    public void hsync() throws IOException {
      if (out != null) {
        out.hsync();
      }
    }

    @Override
    public void hflush() throws IOException {
      if (out != null) {
        out.hflush();
      }
    }

    @Override
    public void flush() throws IOException {
      if (out != null) {
        out.flush();
      }
    }

    @Override
    public boolean hasCapability(String capability) {
      if (out !=null && capability != null) {
        return out.hasCapability(capability);
      }
      return false;
    }
    
    /** Returns the configuration of this file. */
    Configuration getConf() { return conf; }
    
    /** Close the file. */
    @Override
    public synchronized void close() throws IOException {
      keySerializer.close();
      uncompressedValSerializer.close();
      if (compressedValSerializer != null) {
        compressedValSerializer.close();
      }

      CodecPool.returnCompressor(compressor);
      compressor = null;
      
      if (out != null) {
        
        // Close the underlying stream iff we own it...
        if (ownOutputStream) {
          out.close();
        } else {
          out.flush();
        }
        out = null;
      }
    }

    synchronized void checkAndWriteSync() throws IOException {
      if (sync != null &&
          out.getPos() >= lastSyncPos+this.syncInterval) { // time to emit sync
        sync();
      }
    }

    /**
     * Append a key/value pair.
     * @param key input Writable key.
     * @param val input Writable val.
     * @throws IOException raised on errors performing I/O.
     */
    public void append(Writable key, Writable val)
      throws IOException {
      append((Object) key, (Object) val);
    }

    /**
     * Append a key/value pair.
     * @param key input Object key.
     * @param val input Object val.
     * @throws IOException raised on errors performing I/O.
     */
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);

      buffer.reset();

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Append the 'value'
      if (compress == CompressionType.RECORD) {
        deflateFilter.resetState();
        compressedValSerializer.serialize(val);
        deflateOut.flush();
        deflateFilter.finish();
      } else {
        uncompressedValSerializer.serialize(val);
      }

      // Write the record out
      checkAndWriteSync();                                // sync
      out.writeInt(buffer.getLength());                   // total record length
      out.writeInt(keyLength);                            // key portion length
      out.write(buffer.getData(), 0, buffer.getLength()); // data
    }

    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + keyLength);

      int valLength = val.getSize();

      checkAndWriteSync();
      
      out.writeInt(keyLength+valLength);          // total record length
      out.writeInt(keyLength);                    // key portion length
      out.write(keyData, keyOffset, keyLength);   // key
      val.writeUncompressedBytes(out);            // value
    }

    /** @return Returns the current length of the output file.
     *
     * <p>This always returns a synchronized position.  In other words,
     * immediately after calling {@link SequenceFile.Reader#seek(long)} with a position
     * returned by this method, {@link SequenceFile.Reader#next(Writable)} may be called.  However
     * the key may be earlier in the file than key last written when this
     * method was called (e.g., with block-compression, it may be the first key
     * in the block that was being written when this method was called).</p>
     *
     * @throws IOException raised on errors performing I/O.
     */
    public synchronized long getLength() throws IOException {
      return out.getPos();
    }
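
    // Illustrative usage (not part of the upstream source): because getLength()
    // returns a synchronized position, it can later be passed to
    // SequenceFile.Reader#seek(long) and reading resumed at a record boundary.
    //
    //   long pos = writer.getLength();     // while writing
    //   ...
    //   reader.seek(pos);                  // later, on a Reader over the same file
    //   while (reader.next(key, value)) { ... }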

  } // class Writer

  /** Write key/compressed-value pairs to a sequence-format file. */
  static class RecordCompressWriter extends Writer {
    
    RecordCompressWriter(Configuration conf, 
                         Option... options) throws IOException {
      super(conf, options);
    }

    /** Append a key/value pair. */
    @Override
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);

      buffer.reset();

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Compress 'value' and append it
      deflateFilter.resetState();
      compressedValSerializer.serialize(val);
      deflateOut.flush();
      deflateFilter.finish();

      // Write the record out
      checkAndWriteSync();                                // sync
      out.writeInt(buffer.getLength());                   // total record length
      out.writeInt(keyLength);                            // key portion length
      out.write(buffer.getData(), 0, buffer.getLength()); // data
    }

    /** Append a key/value pair. */
    @Override
    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {

      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + keyLength);

      int valLength = val.getSize();
      
      checkAndWriteSync();                        // sync
      out.writeInt(keyLength+valLength);          // total record length
      out.writeInt(keyLength);                    // key portion length
      out.write(keyData, keyOffset, keyLength);   // 'key' data
      val.writeCompressedBytes(out);              // 'value' data
    }
    
  } // RecordCompressionWriter

  /** Write compressed key/value blocks to a sequence-format file. */
  static class BlockCompressWriter extends Writer {
    
    private int noBufferedRecords = 0;
    
    private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
    private DataOutputBuffer keyBuffer = new DataOutputBuffer();

    private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
    private DataOutputBuffer valBuffer = new DataOutputBuffer();

    private final int compressionBlockSize;
    
    BlockCompressWriter(Configuration conf,
                        Option... options) throws IOException {
      super(conf, options);
      compressionBlockSize = 
        conf.getInt(IO_SEQFILE_COMPRESS_BLOCKSIZE_KEY,
            IO_SEQFILE_COMPRESS_BLOCKSIZE_DEFAULT
        );
      keySerializer.close();
      keySerializer.open(keyBuffer);
      uncompressedValSerializer.close();
      uncompressedValSerializer.open(valBuffer);
    }

    /** Workhorse to check and write out compressed data/lengths */
    private synchronized 
      void writeBuffer(DataOutputBuffer uncompressedDataBuffer) 
      throws IOException {
      deflateFilter.resetState();
      buffer.reset();
      deflateOut.write(uncompressedDataBuffer.getData(), 0, 
                       uncompressedDataBuffer.getLength());
      deflateOut.flush();
      deflateFilter.finish();
      
      WritableUtils.writeVInt(out, buffer.getLength());
      out.write(buffer.getData(), 0, buffer.getLength());
    }
    
    /** Compress and flush contents to dfs */
    @Override
    public synchronized void sync() throws IOException {
      if (noBufferedRecords > 0) {
        super.sync();
        
        // No. of records
        WritableUtils.writeVInt(out, noBufferedRecords);
        
        // Write 'keys' and lengths
        writeBuffer(keyLenBuffer);
        writeBuffer(keyBuffer);
        
        // Write 'values' and lengths
        writeBuffer(valLenBuffer);
        writeBuffer(valBuffer);
        
        // Flush the file-stream
        out.flush();
        
        // Reset internal states
        keyLenBuffer.reset();
        keyBuffer.reset();
        valLenBuffer.reset();
        valBuffer.reset();
        noBufferedRecords = 0;
      }
      
    }
    
    /** Close the file. */
    @Override
    public synchronized void close() throws IOException {
      if (out != null) {
        sync();
      }
      super.close();
    }

    /** Append a key/value pair. */
    @Override
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key+" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val+" is not "+valClass);

      // Save key/value into respective buffers 
      int oldKeyLength = keyBuffer.getLength();
      keySerializer.serialize(key);
      int keyLength = keyBuffer.getLength() - oldKeyLength;
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);
      WritableUtils.writeVInt(keyLenBuffer, keyLength);

      int oldValLength = valBuffer.getLength();
      uncompressedValSerializer.serialize(val);
      int valLength = valBuffer.getLength() - oldValLength;
      WritableUtils.writeVInt(valLenBuffer, valLength);
      
      // Added another key/value pair
      ++noBufferedRecords;
      
      // Compress and flush?
      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
      if (currentBlockSize >= compressionBlockSize) {
        sync();
      }
    }
    
    /** Append a key/value pair. */
    @Override
    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {
      
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed");

      int valLength = val.getSize();
      
      // Save key/value data in relevant buffers
      WritableUtils.writeVInt(keyLenBuffer, keyLength);
      keyBuffer.write(keyData, keyOffset, keyLength);
      WritableUtils.writeVInt(valLenBuffer, valLength);
      val.writeUncompressedBytes(valBuffer);

      // Added another key/value pair
      ++noBufferedRecords;

      // Compress and flush?
      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); 
      if (currentBlockSize >= compressionBlockSize) {
        sync();
      }
    }
  
  } // BlockCompressionWriter
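
  // Editor's sketch (illustrative, not in the original file): BlockCompressWriter
  // buffers records and flushes one compressed block (sync marker, record count,
  // then compressed key-length/key/value-length/value buffers) once the buffered
  // key+value bytes reach io.seqfile.compress.blocksize.  The helper name and the
  // 1 MB block size below are hypothetical choices.
  private static Writer exampleBlockCompressedWriter(Configuration conf, Path file)
      throws IOException {
    conf.setInt(IO_SEQFILE_COMPRESS_BLOCKSIZE_KEY, 1024 * 1024); // ~1 MB blocks
    return createWriter(conf,
        Writer.file(file),
        Writer.keyClass(Text.class),
        Writer.valueClass(BytesWritable.class),
        Writer.compression(CompressionType.BLOCK, new DefaultCodec()));
  }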

  /** Get the configured buffer size */
  private static int getBufferSize(Configuration conf) {
    return conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT);
  }

  /** Reads key/value pairs from a sequence-format file. */
  public static class Reader implements java.io.Closeable {
    private String filename;
    private FSDataInputStream in;
    private DataOutputBuffer outBuf = new DataOutputBuffer();

    private byte version;

    private String keyClassName;
    private String valClassName;
    private Class keyClass;
    private Class valClass;

    private CompressionCodec codec = null;
    private Metadata metadata = null;
    
    private byte[] sync = new byte[SYNC_HASH_SIZE];
    private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
    private boolean syncSeen;

    private long headerEnd;
    private long end;
    private int keyLength;
    private int recordLength;

    private boolean decompress;
    private boolean blockCompressed;
    
    private Configuration conf;

    private int noBufferedRecords = 0;
    private boolean lazyDecompress = true;
    private boolean valuesDecompressed = true;
    
    private int noBufferedKeys = 0;
    private int noBufferedValues = 0;
    
    private DataInputBuffer keyLenBuffer = null;
    private CompressionInputStream keyLenInFilter = null;
    private DataInputStream keyLenIn = null;
    private Decompressor keyLenDecompressor = null;
    private DataInputBuffer keyBuffer = null;
    private CompressionInputStream keyInFilter = null;
    private DataInputStream keyIn = null;
    private Decompressor keyDecompressor = null;

    private DataInputBuffer valLenBuffer = null;
    private CompressionInputStream valLenInFilter = null;
    private DataInputStream valLenIn = null;
    private Decompressor valLenDecompressor = null;
    private DataInputBuffer valBuffer = null;
    private CompressionInputStream valInFilter = null;
    private DataInputStream valIn = null;
    private Decompressor valDecompressor = null;
    
    private Deserializer keyDeserializer;
    private Deserializer valDeserializer;

    /**
     * A tag interface for all of the Reader options
     */
    public static interface Option {}
    
    /**
     * Create an option to specify the path name of the sequence file.
     * @param value the path to read
     * @return a new option
     */
    public static Option file(Path value) {
      return new FileOption(value);
    }
    
    /**
     * Create an option to specify the stream with the sequence file.
     * @param value the stream to read.
     * @return a new option
     */
    public static Option stream(FSDataInputStream value) {
      return new InputStreamOption(value);
    }
    
    /**
     * Create an option to specify the starting byte to read.
     * @param value the number of bytes to skip over
     * @return a new option
     */
    public static Option start(long value) {
      return new StartOption(value);
    }
    
    /**
     * Create an option to specify the number of bytes to read.
     * @param value the number of bytes to read
     * @return a new option
     */
    public static Option length(long value) {
      return new LengthOption(value);
    }
    
    /**
     * Create an option with the buffer size for reading the given pathname.
     * @param value the number of bytes to buffer
     * @return a new option
     */
    public static Option bufferSize(int value) {
      return new BufferSizeOption(value);
    }

    private static class FileOption extends Options.PathOption 
                                    implements Option {
      private FileOption(Path value) {
        super(value);
      }
    }
    
    private static class InputStreamOption
        extends Options.FSDataInputStreamOption 
        implements Option {
      private InputStreamOption(FSDataInputStream value) {
        super(value);
      }
    }

    private static class StartOption extends Options.LongOption
                                     implements Option {
      private StartOption(long value) {
        super(value);
      }
    }

    private static class LengthOption extends Options.LongOption
                                      implements Option {
      private LengthOption(long value) {
        super(value);
      }
    }

    private static class BufferSizeOption extends Options.IntegerOption
                                      implements Option {
      private BufferSizeOption(int value) {
        super(value);
      }
    }

    // only used directly
    private static class OnlyHeaderOption extends Options.BooleanOption 
                                          implements Option {
      private OnlyHeaderOption() {
        super(true);
      }
    }

    public Reader(Configuration conf, Option... opts) throws IOException {
      // Look up the options, these are null if not set
      FileOption fileOpt = Options.getOption(FileOption.class, opts);
      InputStreamOption streamOpt = 
        Options.getOption(InputStreamOption.class, opts);
      StartOption startOpt = Options.getOption(StartOption.class, opts);
      LengthOption lenOpt = Options.getOption(LengthOption.class, opts);
      BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts);
      OnlyHeaderOption headerOnly = 
        Options.getOption(OnlyHeaderOption.class, opts);
      // check for consistency
      if ((fileOpt == null) == (streamOpt == null)) {
        throw new 
          IllegalArgumentException("File or stream option must be specified");
      }
      if (fileOpt == null && bufOpt != null) {
        throw new IllegalArgumentException("buffer size can only be set when" +
                                           " a file is specified.");
      }
      // figure out the real values
      Path filename = null;
      FSDataInputStream file;
      final long len;
      if (fileOpt != null) {
        filename = fileOpt.getValue();
        FileSystem fs = filename.getFileSystem(conf);
        int bufSize = bufOpt == null ? getBufferSize(conf): bufOpt.getValue();
        len = null == lenOpt
          ? fs.getFileStatus(filename).getLen()
          : lenOpt.getValue();
        file = openFile(fs, filename, bufSize, len);
      } else {
        len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue();
        file = streamOpt.getValue();
      }
      long start = startOpt == null ? 0 : startOpt.getValue();
      // really set up
      initialize(filename, file, start, len, conf, headerOnly != null);
    }

    /**
     * Construct a reader by opening a file from the given file system.
     * @param fs The file system used to open the file.
     * @param file The file being read.
     * @param conf Configuration
     * @throws IOException raised on errors performing I/O.
     * @deprecated Use Reader(Configuration, Option...) instead.
     */
    @Deprecated
    public Reader(FileSystem fs, Path file, 
                  Configuration conf) throws IOException {
      this(conf, file(fs.makeQualified(file)));
    }

    /**
     * Construct a reader by the given input stream.
     * @param in An input stream.
     * @param buffersize unused
     * @param start The starting position.
     * @param length The length being read.
     * @param conf Configuration
     * @throws IOException raised on errors performing I/O.
     * @deprecated Use Reader(Configuration, Reader.Option...) instead.
     */
    @Deprecated
    public Reader(FSDataInputStream in, int buffersize,
        long start, long length, Configuration conf) throws IOException {
      this(conf, stream(in), start(start), length(length));
    }

    /** Common work of the constructors. */
    private void initialize(Path filename, FSDataInputStream in,
                            long start, long length, Configuration conf,
                            boolean tempReader) throws IOException {
      if (in == null) {
        throw new IllegalArgumentException("in == null");
      }
      this.filename = filename == null ? "<unknown>" : filename.toString();
      this.in = in;
      this.conf = conf;
      boolean succeeded = false;
      try {
        seek(start);
        this.end = this.in.getPos() + length;
        // if it wrapped around, use the max
        if (end < length) {
          end = Long.MAX_VALUE;
        }
        init(tempReader);
        succeeded = true;
      } finally {
        if (!succeeded) {
          IOUtils.cleanupWithLogger(LOG, this.in);
        }
      }
    }

    /**
     * Override this method to specialize the type of
     * {@link FSDataInputStream} returned.
     * @param fs The file system used to open the file.
     * @param file The file being read.
     * @param bufferSize The buffer size used to read the file.
     * @param length The length being read if it is {@literal >=} 0.
     *               Otherwise, the length is not available.
     * @return The opened stream.
     * @throws IOException raised on errors performing I/O.
     */
    protected FSDataInputStream openFile(FileSystem fs, Path file,
        int bufferSize, long length) throws IOException {
      FutureDataInputStreamBuilder builder = fs.openFile(file)
          .opt(FS_OPTION_OPENFILE_READ_POLICY,
              FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
          .opt(FS_OPTION_OPENFILE_BUFFER_SIZE, bufferSize);
      if (length >= 0) {
        builder.opt(FS_OPTION_OPENFILE_LENGTH, length);
      }
      return awaitFuture(builder.build());
    }
    
    /**
     * Initialize the {@link Reader}.
     * @param tempReader <code>true</code> if we are constructing a temporary
     *                   reader (see {@link SequenceFile.Sorter#cloneFileAttributes}),
     *                   and hence do not initialize every component;
     *                   <code>false</code> otherwise.
     * @throws IOException raised on errors performing I/O.
     */
    private void init(boolean tempReader) throws IOException {
      byte[] versionBlock = new byte[VERSION.length];
      String exceptionMsg = this + " not a SequenceFile";

      // Try to read sequence file header.
      try {
        in.readFully(versionBlock);
      } catch (EOFException e) {
        throw new EOFException(exceptionMsg);
      }

      if ((versionBlock[0] != VERSION[0]) ||
          (versionBlock[1] != VERSION[1]) ||
          (versionBlock[2] != VERSION[2])) {
        throw new IOException(this + " not a SequenceFile");
      }

      // Set 'version'
      version = versionBlock[3];
      if (version > VERSION[3]) {
        throw new VersionMismatchException(VERSION[3], version);
      }

      if (version < BLOCK_COMPRESS_VERSION) {
        UTF8 className = new UTF8();

        className.readFields(in);
        keyClassName = className.toStringChecked(); // key class name

        className.readFields(in);
        valClassName = className.toStringChecked(); // val class name
      } else {
        keyClassName = Text.readString(in);
        valClassName = Text.readString(in);
      }

      if (version > 2) {                          // if version > 2
        this.decompress = in.readBoolean();       // is compressed?
      } else {
        decompress = false;
      }

      if (version >= BLOCK_COMPRESS_VERSION) {    // if version >= 4
        this.blockCompressed = in.readBoolean();  // is block-compressed?
      } else {
        blockCompressed = false;
      }
      
      // if version >= 5
      // setup the compression codec
      if (decompress) {
        if (version >= CUSTOM_COMPRESS_VERSION) {
          String codecClassname = Text.readString(in);
          try {
            Class<? extends CompressionCodec> codecClass
              = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class);
            this.codec = ReflectionUtils.newInstance(codecClass, conf);
          } catch (ClassNotFoundException cnfe) {
            throw new IllegalArgumentException("Unknown codec: " + 
                                               codecClassname, cnfe);
          }
        } else {
          codec = new DefaultCodec();
          ((Configurable)codec).setConf(conf);
        }
      }
      
      this.metadata = new Metadata();
      if (version >= VERSION_WITH_METADATA) {    // if version >= 6
        this.metadata.readFields(in);
      }
      
      if (version > 1) {                          // if version > 1
        in.readFully(sync);                       // read sync bytes
        headerEnd = in.getPos();                  // record end of header
      }
      
      // Initialize... *not* if we are constructing a temporary Reader
      if (!tempReader) {
        valBuffer = new DataInputBuffer();
        if (decompress) {
          valDecompressor = CodecPool.getDecompressor(codec);
          valInFilter = codec.createInputStream(valBuffer, valDecompressor);
          valIn = new DataInputStream(valInFilter);
        } else {
          valIn = valBuffer;
        }

        if (blockCompressed) {
          keyLenBuffer = new DataInputBuffer();
          keyBuffer = new DataInputBuffer();
          valLenBuffer = new DataInputBuffer();

          keyLenDecompressor = CodecPool.getDecompressor(codec);
          keyLenInFilter = codec.createInputStream(keyLenBuffer, 
                                                   keyLenDecompressor);
          keyLenIn = new DataInputStream(keyLenInFilter);

          keyDecompressor = CodecPool.getDecompressor(codec);
          keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor);
          keyIn = new DataInputStream(keyInFilter);

          valLenDecompressor = CodecPool.getDecompressor(codec);
          valLenInFilter = codec.createInputStream(valLenBuffer, 
                                                   valLenDecompressor);
          valLenIn = new DataInputStream(valLenInFilter);
        }
        
        SerializationFactory serializationFactory =
          new SerializationFactory(conf);
        this.keyDeserializer =
          getDeserializer(serializationFactory, getKeyClass());
        if (this.keyDeserializer == null) {
          throw new IOException(
              "Could not find a deserializer for the Key class: '"
                  + getKeyClass().getCanonicalName() + "'. "
                  + "Please ensure that the configuration '" +
                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                  + "properly configured, if you're using "
                  + "custom serialization.");
        }
        if (!blockCompressed) {
          this.keyDeserializer.open(valBuffer);
        } else {
          this.keyDeserializer.open(keyIn);
        }
        this.valDeserializer =
          getDeserializer(serializationFactory, getValueClass());
        if (this.valDeserializer == null) {
          throw new IOException(
              "Could not find a deserializer for the Value class: '"
                  + getValueClass().getCanonicalName() + "'. "
                  + "Please ensure that the configuration '" +
                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                  + "properly configured, if you're using "
                  + "custom serialization.");
        }
        this.valDeserializer.open(valIn);
      }
    }
    
    @SuppressWarnings("unchecked")
    private Deserializer getDeserializer(SerializationFactory sf, Class c) {
      return sf.getDeserializer(c);
    }
    
    /** Close the file. */
    @Override
    public synchronized void close() throws IOException {
      // Return the decompressors to the pool
      CodecPool.returnDecompressor(keyLenDecompressor);
      CodecPool.returnDecompressor(keyDecompressor);
      CodecPool.returnDecompressor(valLenDecompressor);
      CodecPool.returnDecompressor(valDecompressor);
      keyLenDecompressor = keyDecompressor = null;
      valLenDecompressor = valDecompressor = null;
      
      if (keyDeserializer != null) {
        keyDeserializer.close();
      }
      if (valDeserializer != null) {
        valDeserializer.close();
      }
      
      // Close the input-stream
      in.close();
    }

    /** @return Returns the name of the key class. */
    public String getKeyClassName() {
      return keyClassName;
    }

    /** @return Returns the class of keys in this file. */
    public synchronized Class<?> getKeyClass() {
      if (null == keyClass) {
        try {
          keyClass = WritableName.getClass(getKeyClassName(), conf);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
      return keyClass;
    }

    /** @return Returns the name of the value class. */
    public String getValueClassName() {
      return valClassName;
    }

    /** @return Returns the class of values in this file. */
    public synchronized Class<?> getValueClass() {
      if (null == valClass) {
        try {
          valClass = WritableName.getClass(getValueClassName(), conf);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
      return valClass;
    }

    /**
     * Returns true if values are compressed.
     * @return true if values are compressed, false otherwise.
     */
    public boolean isCompressed() { return decompress; }
    
    /**
     * Returns true if records are block-compressed.
     * @return true if records are block-compressed, false otherwise.
     */
    public boolean isBlockCompressed() { return blockCompressed; }
    
    /**
     * Returns the compression codec of data in this file.
     * @return CompressionCodec.
     */
    public CompressionCodec getCompressionCodec() { return codec; }
    
    private byte[] getSync() {
      return sync;
    }

    private byte getVersion() {
      return version;
    }

    /**
     * Get the compression type for this file.
     * @return the compression type
     */
    public CompressionType getCompressionType() {
      if (decompress) {
        return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD;
      } else {
        return CompressionType.NONE;
      }
    }

    /**
     * Returns the metadata object of the file.
     * @return metadata.
     */
    public Metadata getMetadata() {
      return this.metadata;
    }
    
    /** Returns the configuration used for this file. */
    Configuration getConf() { return conf; }
    
    /** Read a compressed buffer */
    private synchronized void readBuffer(DataInputBuffer buffer, 
                                         CompressionInputStream filter) throws IOException {
      // Read data into a temporary buffer
      DataOutputBuffer dataBuffer = new DataOutputBuffer();

      try {
        int dataBufferLength = WritableUtils.readVInt(in);
        dataBuffer.write(in, dataBufferLength);
      
        // Set up 'buffer' connected to the input-stream
        buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
      } finally {
        dataBuffer.close();
      }

      // Reset the codec
      filter.resetState();
    }
    
    /** Read the next 'compressed' block */
    private synchronized void readBlock() throws IOException {
      // Check if we need to throw away a whole block of 
      // 'values' due to 'lazy decompression' 
      if (lazyDecompress && !valuesDecompressed) {
        in.seek(WritableUtils.readVInt(in)+in.getPos());
        in.seek(WritableUtils.readVInt(in)+in.getPos());
      }
      
      // Reset internal states
      noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0;
      valuesDecompressed = false;

      //Process sync
      if (sync != null) {
        in.readInt();
        in.readFully(syncCheck);                // read syncCheck
        if (!Arrays.equals(sync, syncCheck))    // check it
          throw new IOException("File is corrupt!");
      }
      syncSeen = true;

      // Read number of records in this block
      noBufferedRecords = WritableUtils.readVInt(in);
      
      // Read key lengths and keys
      readBuffer(keyLenBuffer, keyLenInFilter);
      readBuffer(keyBuffer, keyInFilter);
      noBufferedKeys = noBufferedRecords;
      
      // Read value lengths and values
      if (!lazyDecompress) {
        readBuffer(valLenBuffer, valLenInFilter);
        readBuffer(valBuffer, valInFilter);
        noBufferedValues = noBufferedRecords;
        valuesDecompressed = true;
      }
    }

    /** 
     * Position valLenIn/valIn to the 'value' 
     * corresponding to the 'current' key 
     */
    private synchronized void seekToCurrentValue() throws IOException {
      if (!blockCompressed) {
        if (decompress) {
          valInFilter.resetState();
        }
        valBuffer.reset();
      } else {
        // Check if this is the first value in the 'block' to be read
        if (lazyDecompress && !valuesDecompressed) {
          // Read the value lengths and values
          readBuffer(valLenBuffer, valLenInFilter);
          readBuffer(valBuffer, valInFilter);
          noBufferedValues = noBufferedRecords;
          valuesDecompressed = true;
        }
        
        // Calculate the no. of bytes to skip
        // Note: 'current' key has already been read!
        int skipValBytes = 0;
        int currentKey = noBufferedKeys + 1;          
        for (int i=noBufferedValues; i > currentKey; --i) {
          skipValBytes += WritableUtils.readVInt(valLenIn);
          --noBufferedValues;
        }
        
        // Skip to the 'val' corresponding to 'current' key
        if (skipValBytes > 0) {
          if (valIn.skipBytes(skipValBytes) != skipValBytes) {
            throw new IOException("Failed to seek to " + currentKey + 
                                  "(th) value!");
          }
        }
      }
    }

    /**
     * Get the 'value' corresponding to the last read 'key'.
     * @param val : The 'value' to be read.
     * @throws IOException raised on errors performing I/O.
     */
    public synchronized void getCurrentValue(Writable val) 
      throws IOException {
      if (val instanceof Configurable) {
        ((Configurable) val).setConf(this.conf);
      }

      // Position stream to 'current' value
      seekToCurrentValue();

      if (!blockCompressed) {
        val.readFields(valIn);
        
        if (valIn.read() > 0) {
          LOG.info("available bytes: " + valIn.available());
          throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
                                + " bytes, should read " +
                                (valBuffer.getLength()-keyLength));
        }
      } else {
        // Get the value
        int valLength = WritableUtils.readVInt(valLenIn);
        val.readFields(valIn);
        
        // Read another compressed 'value'
        --noBufferedValues;
        
        // Sanity check
        if ((valLength < 0) && LOG.isDebugEnabled()) {
          LOG.debug(val + " is a zero-length value");
        }
      }

    }
    
    /**
     * Get the 'value' corresponding to the last read 'key'.
     *
     * @param val the object to read the 'value' into.
     * @return the 'value' that was read.
     * @throws IOException raised on errors performing I/O.
     */
    public synchronized Object getCurrentValue(Object val) 
      throws IOException {
      if (val instanceof Configurable) {
        ((Configurable) val).setConf(this.conf);
      }

      // Position stream to 'current' value
      seekToCurrentValue();

      if (!blockCompressed) {
        val = deserializeValue(val);
        
        if (valIn.read() > 0) {
          LOG.info("available bytes: " + valIn.available());
          throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
                                + " bytes, should read " +
                                (valBuffer.getLength()-keyLength));
        }
      } else {
        // Get the value
        int valLength = WritableUtils.readVInt(valLenIn);
        val = deserializeValue(val);
        
        // Read another compressed 'value'
        --noBufferedValues;
        
        // Sanity check
        if ((valLength < 0) && LOG.isDebugEnabled()) {
          LOG.debug(val + " is a zero-length value");
        }
      }
      return val;

    }

    @SuppressWarnings("unchecked")
    private Object deserializeValue(Object val) throws IOException {
      return valDeserializer.deserialize(val);
    }
    
    /**
     * Read the next key in the file into <code>key</code>, skipping its
     * value.
     *
     * @param key the key to read into.
     * @return true if another entry exists, false at end of file.
     * @throws IOException raised on errors performing I/O.
     */
    public synchronized boolean next(Writable key) throws IOException {
      if (key.getClass() != getKeyClass())
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);

      if (!blockCompressed) {
        outBuf.reset();
        
        keyLength = next(outBuf);
        if (keyLength < 0)
          return false;
        
        valBuffer.reset(outBuf.getData(), outBuf.getLength());
        
        key.readFields(valBuffer);
        valBuffer.mark(0);
        if (valBuffer.getPosition() != keyLength)
          throw new IOException(key + " read " + valBuffer.getPosition()
                                + " bytes, should read " + keyLength);
      } else {
        //Reset syncSeen
        syncSeen = false;
        
        if (noBufferedKeys == 0) {
          try {
            readBlock();
          } catch (EOFException eof) {
            return false;
          }
        }
        
        int keyLength = WritableUtils.readVInt(keyLenIn);
        
        // Sanity check
        if (keyLength < 0) {
          return false;
        }
        
        //Read another compressed 'key'
        key.readFields(keyIn);
        --noBufferedKeys;
      }

      return true;
    }

    /**
     * Read the next key/value pair in the file into <code>key</code> and
     * <code>val</code>.
     *
     * @param key the key to read into.
     * @param val the value to read into.
     * @return true if such a pair exists, false at end of file.
     * @throws IOException raised on errors performing I/O.
     */
    public synchronized boolean next(Writable key, Writable val)
      throws IOException {
      if (val.getClass() != getValueClass())
        throw new IOException("wrong value class: "+val+" is not "+valClass);

      boolean more = next(key);
      
      if (more) {
        getCurrentValue(val);
      }

      return more;
    }
    
    /**
     * Read and return the next record length, potentially skipping over 
     * a sync block.
     * @return the length of the next record or -1 if there is no next record
     * @throws IOException
     */
    private synchronized int readRecordLength() throws IOException {
      if (in.getPos() >= end) {
        return -1;
      }      
      int length = in.readInt();
      if (version > 1 && sync != null &&
          length == SYNC_ESCAPE) {              // process a sync entry
        in.readFully(syncCheck);                // read syncCheck
        if (!Arrays.equals(sync, syncCheck))    // check it
          throw new IOException("File is corrupt!");
        syncSeen = true;
        if (in.getPos() >= end) {
          return -1;
        }
        length = in.readInt();                  // re-read length
      } else {
        syncSeen = false;
      }
      
      return length;
    }
    
    /** Read the next key/value pair in the file into <code>buffer</code>.
     * Returns the length of the key read, or -1 if at end of file.  The length
     * of the value may be computed by calling buffer.getLength() before and
     * after calls to this method.
     * @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */
    @Deprecated
    synchronized int next(DataOutputBuffer buffer) throws IOException {
      // Unsupported for block-compressed sequence files
      if (blockCompressed) {
        throw new IOException("Unsupported call for block-compressed" +
                              " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");
      }
      try {
        int length = readRecordLength();
        if (length == -1) {
          return -1;
        }
        int keyLength = in.readInt();
        buffer.write(in, length);
        return keyLength;
      } catch (ChecksumException e) {             // checksum failure
        handleChecksumException(e);
        return next(buffer);
      }
    }

    public ValueBytes createValueBytes() {
      ValueBytes val = null;
      if (!decompress || blockCompressed) {
        val = new UncompressedBytes();
      } else {
        val = new CompressedBytes(codec);
      }
      return val;
    }

    /**
     * Read 'raw' records.
     * @param key - The buffer into which the key is read
     * @param val - The 'raw' value
     * @return Returns the total record length or -1 for end of file
     * @throws IOException raised on errors performing I/O.
     */
    public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) 
      throws IOException {
      if (!blockCompressed) {
        int length = readRecordLength();
        if (length == -1) {
          return -1;
        }
        int keyLength = in.readInt();
        int valLength = length - keyLength;
        key.write(in, keyLength);
        if (decompress) {
          CompressedBytes value = (CompressedBytes)val;
          value.reset(in, valLength);
        } else {
          UncompressedBytes value = (UncompressedBytes)val;
          value.reset(in, valLength);
        }
        
        return length;
      } else {
        //Reset syncSeen
        syncSeen = false;
        
        // Read 'key'
        if (noBufferedKeys == 0) {
          if (in.getPos() >= end) 
            return -1;

          try { 
            readBlock();
          } catch (EOFException eof) {
            return -1;
          }
        }
        int keyLength = WritableUtils.readVInt(keyLenIn);
        if (keyLength < 0) {
          throw new IOException("zero length key found!");
        }
        key.write(keyIn, keyLength);
        --noBufferedKeys;
        
        // Read raw 'value'
        seekToCurrentValue();
        int valLength = WritableUtils.readVInt(valLenIn);
        UncompressedBytes rawValue = (UncompressedBytes)val;
        rawValue.reset(valIn, valLength);
        --noBufferedValues;
        
        return (keyLength+valLength);
      }
      
    }
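
    // Editor's sketch (not part of the original class): nextRaw() together with
    // createValueBytes() lets callers copy records without deserializing them,
    // which is how Sorter.SortPass consumes its inputs further below.  The method
    // name is hypothetical, and it assumes 'writer' was created with compression
    // settings compatible with this file (see Writer.appendRaw).
    private void exampleRawCopy(Writer writer) throws IOException {
      DataOutputBuffer rawKey = new DataOutputBuffer();
      ValueBytes rawValue = createValueBytes();
      while (nextRaw(rawKey, rawValue) != -1) {
        // rawKey now holds exactly the serialized key bytes of this record.
        writer.appendRaw(rawKey.getData(), 0, rawKey.getLength(), rawValue);
        rawKey.reset();                         // reuse the key buffer per record
      }
    }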

    /**
     * Read 'raw' keys.
     * @param key - The buffer into which the key is read
     * @return Returns the key length or -1 for end of file
     * @throws IOException raised on errors performing I/O.
     */
    public synchronized int nextRawKey(DataOutputBuffer key) 
      throws IOException {
      if (!blockCompressed) {
        recordLength = readRecordLength();
        if (recordLength == -1) {
          return -1;
        }
        keyLength = in.readInt();
        key.write(in, keyLength);
        return keyLength;
      } else {
        //Reset syncSeen
        syncSeen = false;
        
        // Read 'key'
        if (noBufferedKeys == 0) {
          if (in.getPos() >= end) 
            return -1;

          try { 
            readBlock();
          } catch (EOFException eof) {
            return -1;
          }
        }
        int keyLength = WritableUtils.readVInt(keyLenIn);
        if (keyLength < 0) {
          throw new IOException("zero length key found!");
        }
        key.write(keyIn, keyLength);
        --noBufferedKeys;
        
        return keyLength;
      }
      
    }

    /**
     * Read the next key in the file, skipping its
     * value.
     *
     * @param key the key object to reuse, or null to allocate a new one.
     * @return the key that was read, or null at end of file.
     * @throws IOException raised on errors performing I/O.
     */
    public synchronized Object next(Object key) throws IOException {
      if (key != null && key.getClass() != getKeyClass()) {
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      }

      if (!blockCompressed) {
        outBuf.reset();
        
        keyLength = next(outBuf);
        if (keyLength < 0)
          return null;
        
        valBuffer.reset(outBuf.getData(), outBuf.getLength());
        
        key = deserializeKey(key);
        valBuffer.mark(0);
        if (valBuffer.getPosition() != keyLength)
          throw new IOException(key + " read " + valBuffer.getPosition()
                                + " bytes, should read " + keyLength);
      } else {
        //Reset syncSeen
        syncSeen = false;
        
        if (noBufferedKeys == 0) {
          try {
            readBlock();
          } catch (EOFException eof) {
            return null;
          }
        }
        
        int keyLength = WritableUtils.readVInt(keyLenIn);
        
        // Sanity check
        if (keyLength < 0) {
          return null;
        }
        
        //Read another compressed 'key'
        key = deserializeKey(key);
        --noBufferedKeys;
      }

      return key;
    }

    @SuppressWarnings("unchecked")
    private Object deserializeKey(Object key) throws IOException {
      return keyDeserializer.deserialize(key);
    }

    /**
     * Read 'raw' values.
     * @param val - The 'raw' value
     * @return Returns the value length
     * @throws IOException raised on errors performing I/O.
     */
    public synchronized int nextRawValue(ValueBytes val) 
      throws IOException {
      
      // Position stream to current value
      seekToCurrentValue();
 
      if (!blockCompressed) {
        int valLength = recordLength - keyLength;
        if (decompress) {
          CompressedBytes value = (CompressedBytes)val;
          value.reset(in, valLength);
        } else {
          UncompressedBytes value = (UncompressedBytes)val;
          value.reset(in, valLength);
        }
         
        return valLength;
      } else {
        int valLength = WritableUtils.readVInt(valLenIn);
        UncompressedBytes rawValue = (UncompressedBytes)val;
        rawValue.reset(valIn, valLength);
        --noBufferedValues;
        return valLength;
      }
      
    }

    private void handleChecksumException(ChecksumException e)
      throws IOException {
      if (this.conf.getBoolean(
          IO_SKIP_CHECKSUM_ERRORS_KEY, IO_SKIP_CHECKSUM_ERRORS_DEFAULT)) {
        LOG.warn("Bad checksum at "+getPosition()+". Skipping entries.");
        sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512));
      } else {
        throw e;
      }
    }

    /** Disables sync. Often invoked for tmp files. */
    synchronized void ignoreSync() {
      sync = null;
    }
    
    /**
     * Set the current byte position in the input file.
     *
     * <p>The position passed must be a position returned by {@link
     * SequenceFile.Writer#getLength()} when writing this file.  To seek to an arbitrary
     * position, use {@link SequenceFile.Reader#sync(long)}. </p>
     *
     * @param position input position.
     * @throws IOException raised on errors performing I/O.
     */
    public synchronized void seek(long position) throws IOException {
      in.seek(position);
      if (blockCompressed) {                      // trigger block read
        noBufferedKeys = 0;
        valuesDecompressed = true;
      }
    }

    /**
     * Seek to the next sync mark past a given position.
     * @param position position.
     * @throws IOException raised on errors performing I/O.
     */
    public synchronized void sync(long position) throws IOException {
      if (position+SYNC_SIZE >= end) {
        seek(end);
        return;
      }

      if (position < headerEnd) {
        // seek directly to first record
        in.seek(headerEnd);
        // note the sync marker "seen" in the header
        syncSeen = true;
        return;
      }

      try {
        seek(position+4);                         // skip escape
        in.readFully(syncCheck);
        int syncLen = sync.length;
        for (int i = 0; in.getPos() < end; i++) {
          int j = 0;
          for (; j < syncLen; j++) {
            if (sync[j] != syncCheck[(i+j)%syncLen])
              break;
          }
          if (j == syncLen) {
            in.seek(in.getPos() - SYNC_SIZE);     // position before sync
            return;
          }
          syncCheck[i%syncLen] = in.readByte();
        }
      } catch (ChecksumException e) {             // checksum failure
        handleChecksumException(e);
      }
    }
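
    // Editor's sketch (not part of the original class): the usual split-reading
    // idiom built on sync()/syncSeen()/getPosition().  A caller aligns the reader
    // to the first sync mark at or after its split start and stops at the first
    // record that begins past the split end.  The method name and the key/value
    // reuse strategy are hypothetical.
    private void exampleReadSplit(long splitStart, long splitEnd,
        Writable key, Writable value) throws IOException {
      sync(splitStart);                         // seek to the next sync mark
      boolean more = true;
      while (more) {
        more = next(key, value);
        if (more && getPosition() >= splitEnd && syncSeen()) {
          more = false;                         // record belongs to the next split
        }
        if (more) {
          // ... process (key, value) here ...
        }
      }
    }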

    /** @return Returns true iff the previous call to next passed a sync mark.*/
    public synchronized boolean syncSeen() { return syncSeen; }

    /**
     * @return Return the current byte position in the input file.
     * @throws IOException raised on errors performing I/O.
     */
    public synchronized long getPosition() throws IOException {
      return in.getPos();
    }

    /** Returns the name of the file. */
    @Override
    public String toString() {
      return filename;
    }

  }
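
  // Editor's sketch (not part of the original Hadoop source): reading records
  // back through the option-based Reader constructor, mirroring the Writer
  // example earlier in this listing.  The method name and path are hypothetical
  // and assume the file was written with Text keys and IntWritable values.
  private static void exampleReaderUsage(Configuration conf) throws IOException {
    Path file = new Path("/tmp/example.seq");                // hypothetical path
    try (Reader reader = new Reader(conf, Reader.file(file))) {
      LOG.debug("key class = " + reader.getKeyClassName()
          + ", compression = " + reader.getCompressionType());
      Text key = new Text();
      IntWritable value = new IntWritable();
      while (reader.next(key, value)) {
        // next(key, value) deserializes one record; syncSeen() reports whether
        // a sync marker immediately preceded it.
        LOG.debug(key + " -> " + value + (reader.syncSeen() ? " (after sync)" : ""));
      }
    }
  }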

  /** Sorts key/value pairs in a sequence-format file.
   *
   * <p>For best performance, applications should make sure that the {@link
   * Writable#readFields(DataInput)} implementation of their keys is
   * very efficient.  In particular, it should avoid allocating memory.
   */
  public static class Sorter {

    private RawComparator comparator;

    private MergeSort mergeSort; //the implementation of merge sort
    
    private Path[] inFiles;                     // when merging or sorting

    private Path outFile;

    private int memory; // bytes
    private int factor; // merged per pass

    private FileSystem fs = null;

    private Class keyClass;
    private Class valClass;

    private Configuration conf;
    private Metadata metadata;
    
    private Progressable progressable = null;

    /**
     * Sort and merge files containing the named classes.
     * @param fs input FileSystem.
     * @param keyClass input keyClass.
     * @param valClass input valClass.
     * @param conf input Configuration.
     */
    public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass,
                  Class valClass, Configuration conf)  {
      this(fs, WritableComparator.get(keyClass, conf), keyClass, valClass, conf);
    }

    /**
     * Sort and merge using an arbitrary {@link RawComparator}.
     * @param fs input FileSystem.
     * @param comparator input RawComparator.
     * @param keyClass input keyClass.
     * @param valClass input valClass.
     * @param conf input Configuration.
     */
    public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 
                  Class valClass, Configuration conf) {
      this(fs, comparator, keyClass, valClass, conf, new Metadata());
    }

    /**
     * Sort and merge using an arbitrary {@link RawComparator}.
     * @param fs input FileSystem.
     * @param comparator input RawComparator.
     * @param keyClass input keyClass.
     * @param valClass input valClass.
     * @param conf input Configuration.
     * @param metadata input metadata.
     */
    @SuppressWarnings("deprecation")
    public Sorter(FileSystem fs, RawComparator comparator, Class keyClass,
                  Class valClass, Configuration conf, Metadata metadata) {
      this.fs = fs;
      this.comparator = comparator;
      this.keyClass = keyClass;
      this.valClass = valClass;
      // Fall back on the deprecated MB and Factor keys until they are
      // removed permanently.
      if (conf.get(CommonConfigurationKeys.IO_SORT_MB_KEY) != null) {
        this.memory = conf.getInt(CommonConfigurationKeys.IO_SORT_MB_KEY,
          CommonConfigurationKeys.SEQ_IO_SORT_MB_DEFAULT) * 1024 * 1024;
      } else {
        this.memory = conf.getInt(CommonConfigurationKeys.SEQ_IO_SORT_MB_KEY,
          CommonConfigurationKeys.SEQ_IO_SORT_MB_DEFAULT) * 1024 * 1024;
      }
      if (conf.get(CommonConfigurationKeys.IO_SORT_FACTOR_KEY) != null) {
        this.factor = conf.getInt(CommonConfigurationKeys.IO_SORT_FACTOR_KEY,
            CommonConfigurationKeys.SEQ_IO_SORT_FACTOR_DEFAULT);
      } else {
        this.factor = conf.getInt(
            CommonConfigurationKeys.SEQ_IO_SORT_FACTOR_KEY,
            CommonConfigurationKeys.SEQ_IO_SORT_FACTOR_DEFAULT);
      }
      this.conf = conf;
      this.metadata = metadata;
    }

    /**
     * Set the number of streams to merge at once.
     * @param factor factor.
     */
    public void setFactor(int factor) { this.factor = factor; }

    /** @return Get the number of streams to merge at once.*/
    public int getFactor() { return factor; }

    /**
     * Set the total amount of buffer memory, in bytes.
     * @param memory buffer memory.
     */
    public void setMemory(int memory) { this.memory = memory; }

    /** @return Get the total amount of buffer memory, in bytes.*/
    public int getMemory() { return memory; }

    /**
     * Set the progressable object in order to report progress.
     * @param progressable input Progressable.
     */
    public void setProgressable(Progressable progressable) {
      this.progressable = progressable;
    }
    
    /** 
     * Perform a file sort from a set of input files into an output file.
     * @param inFiles the files to be sorted
     * @param outFile the sorted output file
     * @param deleteInput should the input files be deleted as they are read?
     * @throws IOException raised on errors performing I/O.
     */
    public void sort(Path[] inFiles, Path outFile,
                     boolean deleteInput) throws IOException {
      if (fs.exists(outFile)) {
        throw new IOException("already exists: " + outFile);
      }

      this.inFiles = inFiles;
      this.outFile = outFile;

      int segments = sortPass(deleteInput);
      if (segments > 1) {
        mergePass(outFile.getParent());
      }
    }

    /** 
     * Perform a file sort from a set of input files and return an iterator.
     * @param inFiles the files to be sorted
     * @param tempDir the directory where temp files are created during sort
     * @param deleteInput should the input files be deleted as they are read?
     * @return a RawKeyValueIterator over the sorted records
     * @throws IOException raised on errors performing I/O.
     */
    public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir, 
                                              boolean deleteInput) throws IOException {
      Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2");
      if (fs.exists(outFile)) {
        throw new IOException("already exists: " + outFile);
      }
      this.inFiles = inFiles;
      //outFile will basically be used as prefix for temp files in the cases
      //where sort outputs multiple sorted segments. For the single segment
      //case, the outputFile itself will contain the sorted data for that
      //segment
      this.outFile = outFile;

      int segments = sortPass(deleteInput);
      if (segments > 1)
        return merge(outFile.suffix(".0"), outFile.suffix(".0.index"), 
                     tempDir);
      else if (segments == 1)
        return merge(new Path[]{outFile}, true, tempDir);
      else return null;
    }

    /**
     * The backwards compatible interface to sort.
     * @param inFile the input file to sort.
     * @param outFile the sorted output file.
     * @throws IOException raised on errors performing I/O.
     */
    public void sort(Path inFile, Path outFile) throws IOException {
      sort(new Path[]{inFile}, outFile, false);
    }
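
    // Editor's sketch (not part of the original class): the typical call sequence
    // for an external sort of one or more sequence files.  The method name, the
    // memory budget and the merge factor below are hypothetical choices.
    private static void exampleSorterUsage(FileSystem fs, Configuration conf,
        Path[] unsorted, Path sorted) throws IOException {
      Sorter sorter = new Sorter(fs, Text.class, IntWritable.class, conf);
      sorter.setMemory(8 * 1024 * 1024);        // ~8 MB in-memory sort buffer
      sorter.setFactor(10);                     // merge at most 10 segments per pass
      sorter.sort(unsorted, sorted, false);     // keep the input files
    }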
    
    private int sortPass(boolean deleteInput) throws IOException {
      if(LOG.isDebugEnabled()) {
        LOG.debug("running sort pass");
      }
      SortPass sortPass = new SortPass();         // make the SortPass
      sortPass.setProgressable(progressable);
      mergeSort = new MergeSort(sortPass.new SeqFileComparator());
      try {
        return sortPass.run(deleteInput);         // run it
      } finally {
        sortPass.close();                         // close it
      }
    }

    private class SortPass {
      private int memoryLimit = memory/4;
      private int recordLimit = 1000000;
      
      private DataOutputBuffer rawKeys = new DataOutputBuffer();
      private byte[] rawBuffer;

      private int[] keyOffsets = new int[1024];
      private int[] pointers = new int[keyOffsets.length];
      private int[] pointersCopy = new int[keyOffsets.length];
      private int[] keyLengths = new int[keyOffsets.length];
      private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length];
      
      private ArrayList segmentLengths = new ArrayList();
      
      private Reader in = null;
      private FSDataOutputStream out = null;
      private FSDataOutputStream indexOut = null;
      private Path outName;

      private Progressable progressable = null;

      public int run(boolean deleteInput) throws IOException {
        int segments = 0;
        int currentFile = 0;
        boolean atEof = (currentFile >= inFiles.length);
        CompressionType compressionType;
        CompressionCodec codec = null;
        segmentLengths.clear();
        if (atEof) {
          return 0;
        }
        
        // Initialize
        in = new Reader(fs, inFiles[currentFile], conf);
        compressionType = in.getCompressionType();
        codec = in.getCompressionCodec();
        
        for (int i=0; i < rawValues.length; ++i) {
          rawValues[i] = null;
        }
        
        while (!atEof) {
          int count = 0;
          int bytesProcessed = 0;
          rawKeys.reset();
          while (!atEof && 
                 bytesProcessed < memoryLimit && count < recordLimit) {

            // Read a record into buffer
            // Note: Attempt to re-use 'rawValue' as far as possible
            int keyOffset = rawKeys.getLength();       
            ValueBytes rawValue = 
              (count == keyOffsets.length || rawValues[count] == null) ? 
              in.createValueBytes() : 
              rawValues[count];
            int recordLength = in.nextRaw(rawKeys, rawValue);
            if (recordLength == -1) {
              in.close();
              if (deleteInput) {
                fs.delete(inFiles[currentFile], true);
              }
              currentFile += 1;
              atEof = currentFile >= inFiles.length;
              if (!atEof) {
                in = new Reader(fs, inFiles[currentFile], conf);
              } else {
                in = null;
              }
              continue;
            }

            int keyLength = rawKeys.getLength() - keyOffset;

            if (count == keyOffsets.length)
              grow();

            keyOffsets[count] = keyOffset;                // update pointers
            pointers[count] = count;
            keyLengths[count] = keyLength;
            rawValues[count] = rawValue;

            bytesProcessed += recordLength; 
            count++;
          }

          // buffer is full -- sort & flush it
          if(LOG.isDebugEnabled()) {
            LOG.debug("flushing segment " + segments);
          }
          rawBuffer = rawKeys.getData();
          sort(count);
          // indicate we're making progress
          if (progressable != null) {
            progressable.progress();
          }
          flush(count, bytesProcessed, compressionType, codec, 
                segments==0 && atEof);
          segments++;
        }
        return segments;
      }

      public void close() throws IOException {
        if (in != null) {
          in.close();
        }
        if (out != null) {
          out.close();
        }
        if (indexOut != null) {
          indexOut.close();
        }
      }

      private void grow() {
        int newLength = keyOffsets.length * 3 / 2;
        keyOffsets = grow(keyOffsets, newLength);
        pointers = grow(pointers, newLength);
        pointersCopy = new int[newLength];
        keyLengths = grow(keyLengths, newLength);
        rawValues = grow(rawValues, newLength);
      }

      private int[] grow(int[] old, int newLength) {
        int[] result = new int[newLength];
        System.arraycopy(old, 0, result, 0, old.length);
        return result;
      }
      
      private ValueBytes[] grow(ValueBytes[] old, int newLength) {
        ValueBytes[] result = new ValueBytes[newLength];
        System.arraycopy(old, 0, result, 0, old.length);
        for (int i=old.length; i < newLength; ++i) {
          result[i] = null;
        }
        return result;
      }

      private void flush(int count, int bytesProcessed, 
                         CompressionType compressionType, 
                         CompressionCodec codec, 
                         boolean done) throws IOException {
        if (out == null) {
          outName = done ? outFile : outFile.suffix(".0");
          out = fs.create(outName);
          if (!done) {
            indexOut = fs.create(outName.suffix(".index"));
          }
        }

        long segmentStart = out.getPos();
        Writer writer = createWriter(conf, Writer.stream(out), 
            Writer.keyClass(keyClass), Writer.valueClass(valClass),
            Writer.compression(compressionType, codec),
            Writer.metadata(done ? metadata : new Metadata()));
        
        if (!done) {
          writer.sync = null;                     // disable sync on temp files
        }

        for (int i = 0; i < count; i++) {         // write in sorted order
          int p = pointers[i];
          writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]);
        }
        writer.close();
        
        if (!done) {
          // Save the segment length
          WritableUtils.writeVLong(indexOut, segmentStart);
          WritableUtils.writeVLong(indexOut, (out.getPos()-segmentStart));
          indexOut.flush();
        }
      }

      private void sort(int count) {
        System.arraycopy(pointers, 0, pointersCopy, 0, count);
        mergeSort.mergeSort(pointersCopy, pointers, 0, count);
      }
      class SeqFileComparator implements Comparator<IntWritable> {
        @Override
        public int compare(IntWritable I, IntWritable J) {
          return comparator.compare(rawBuffer, keyOffsets[I.get()], 
                                    keyLengths[I.get()], rawBuffer, 
                                    keyOffsets[J.get()], keyLengths[J.get()]);
        }
      }
      
      /** set the progressable object in order to report progress */
      public void setProgressable(Progressable progressable)
      {
        this.progressable = progressable;
      }
      
    } // SequenceFile.Sorter.SortPass

    /** The interface to iterate over raw keys/values of SequenceFiles. */
    public static interface RawKeyValueIterator {
      /**
       * Gets the current raw key.
       * @return DataOutputBuffer
       * @throws IOException raised on errors performing I/O.
       */
      DataOutputBuffer getKey() throws IOException; 
      /**
       * Gets the current raw value.
       * @return ValueBytes 
       * @throws IOException raised on errors performing I/O.
       */
      ValueBytes getValue() throws IOException; 
      /**
       * Sets up the current key and value (for getKey and getValue).
       * @return true if there exists a key/value, false otherwise 
       * @throws IOException raised on errors performing I/O.
       */
      boolean next() throws IOException;
      /**
       * closes the iterator so that the underlying streams can be closed.
       * @throws IOException raised on errors performing I/O.
       */
      void close() throws IOException;
      /**
       * @return the Progress object; its float value (0.0 - 1.0) indicates
       * the fraction of bytes processed by the iterator so far.
       */
      Progress getProgress();
    }    
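
    /*
     * A minimal consumption sketch for the interface above, assuming a Sorter
     * "sorter", a List<SegmentDescriptor> "segments", a temp directory "tmpDir"
     * and an already-created Writer "writer" (all hypothetical names);
     * writeFile() further below follows the same pattern:
     *
     *   RawKeyValueIterator iter = sorter.merge(segments, tmpDir);
     *   try {
     *     while (iter.next()) {
     *       DataOutputBuffer key = iter.getKey();
     *       writer.appendRaw(key.getData(), 0, key.getLength(), iter.getValue());
     *     }
     *   } finally {
     *     iter.close();
     *   }
     */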
    
    /**
     * Merges the list of segments of type <code>SegmentDescriptor</code>
     * @param segments the list of SegmentDescriptors
     * @param tmpDir the directory to write temporary files into
     * @return RawKeyValueIterator
     * @throws IOException raised on errors performing I/O.
     */
    public RawKeyValueIterator merge(List <SegmentDescriptor> segments, 
                                     Path tmpDir) 
      throws IOException {
      // pass in object to report progress, if present
      MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable);
      return mQueue.merge();
    }

    /**
     * Merges the contents of the files passed in Path[], using the merge
     * factor that has already been set.
     * @param inNames the array of path names
     * @param deleteInputs true if the input files should be deleted when no
     * longer needed
     * @param tmpDir the directory to write temporary files into
     * @return RawKeyValueIterator
     * @throws IOException raised on errors performing I/O.
     */
    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
                                     Path tmpDir) 
      throws IOException {
      return merge(inNames, deleteInputs, 
                   (inNames.length < factor) ? inNames.length : factor,
                   tmpDir);
    }

    /**
     * Merges the contents of the files passed in Path[].
     * @param inNames the array of path names
     * @param deleteInputs true if the input files should be deleted when no
     * longer needed
     * @param factor the factor that will be used as the maximum merge fan-in
     * @param tmpDir the directory to write temporary files into
     * @return RawKeyValueIterator
     * @throws IOException raised on errors performing I/O.
     */
    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
                                     int factor, Path tmpDir) 
      throws IOException {
      //get the segments from inNames
      ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
      for (int i = 0; i < inNames.length; i++) {
        SegmentDescriptor s = new SegmentDescriptor(0,
            fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
        s.preserveInput(!deleteInputs);
        s.doSync();
        a.add(s);
      }
      this.factor = factor;
      MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable);
      return mQueue.merge();
    }

    /**
     * Merges the contents of the files passed in Path[].
     * @param inNames the array of path names
     * @param tempDir the directory for creating temp files during merge
     * @param deleteInputs true if the input files should be deleted when no
     * longer needed
     * @return RawKeyValueIterator
     * @throws IOException raised on errors performing I/O.
     */
    public RawKeyValueIterator merge(Path [] inNames, Path tempDir, 
                                     boolean deleteInputs) 
      throws IOException {
      //outFile will basically be used as prefix for temp files for the
      //intermediate merge outputs           
      this.outFile = new Path(tempDir + Path.SEPARATOR + "merged");
      //get the segments from inNames
      ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
      for (int i = 0; i < inNames.length; i++) {
        SegmentDescriptor s = new SegmentDescriptor(0,
            fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
        s.preserveInput(!deleteInputs);
        s.doSync();
        a.add(s);
      }
      factor = (inNames.length < factor) ? inNames.length : factor;
      // pass in object to report progress, if present
      MergeQueue mQueue = new MergeQueue(a, tempDir, progressable);
      return mQueue.merge();
    }

    /**
     * Clones the attributes (like compression) of the input file and creates
     * a corresponding Writer.
     * @param inputFile the path of the input file whose attributes should be 
     * cloned
     * @param outputFile the path of the output file 
     * @param prog the Progressable to report status during the file write
     * @return Writer
     * @throws IOException raised on errors performing I/O.
     */
    public Writer cloneFileAttributes(Path inputFile, Path outputFile, 
                                      Progressable prog) throws IOException {
      Reader reader = new Reader(conf,
                                 Reader.file(inputFile),
                                 new Reader.OnlyHeaderOption());
      CompressionType compress = reader.getCompressionType();
      CompressionCodec codec = reader.getCompressionCodec();
      reader.close();

      Writer writer = createWriter(conf, 
                                   Writer.file(outputFile), 
                                   Writer.keyClass(keyClass), 
                                   Writer.valueClass(valClass), 
                                   Writer.compression(compress, codec), 
                                   Writer.progressable(prog));
      return writer;
    }

    /**
     * Writes records from RawKeyValueIterator into a file represented by the 
     * passed writer.
     * @param records the RawKeyValueIterator
     * @param writer the Writer created earlier 
     * @throws IOException raised on errors performing I/O.
     */
    public void writeFile(RawKeyValueIterator records, Writer writer) 
      throws IOException {
      while(records.next()) {
        writer.appendRaw(records.getKey().getData(), 0, 
                         records.getKey().getLength(), records.getValue());
      }
      writer.sync();
    }
        
    /** Merge the provided files.
     * @param inFiles the array of input path names
     * @param outFile the final output file
     * @throws IOException raised on errors performing I/O.
     */
    public void merge(Path[] inFiles, Path outFile) throws IOException {
      if (fs.exists(outFile)) {
        throw new IOException("already exists: " + outFile);
      }
      RawKeyValueIterator r = merge(inFiles, false, outFile.getParent());
      Writer writer = cloneFileAttributes(inFiles[0], outFile, null);
      
      writeFile(r, writer);

      writer.close();
    }

    /** sort calls this to generate the final merged output */
    private int mergePass(Path tmpDir) throws IOException {
      if(LOG.isDebugEnabled()) {
        LOG.debug("running merge pass");
      }
      Writer writer = cloneFileAttributes(
                                          outFile.suffix(".0"), outFile, null);
      RawKeyValueIterator r = merge(outFile.suffix(".0"), 
                                    outFile.suffix(".0.index"), tmpDir);
      writeFile(r, writer);

      writer.close();
      return 0;
    }

    /** Used by mergePass to merge the output of the sort
     * @param inName the name of the input file containing sorted segments
     * @param indexIn the offsets of the sorted segments
     * @param tmpDir the relative directory to store intermediate results in
     * @return RawKeyValueIterator
     * @throws IOException
     */
    private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir) 
      throws IOException {
      //get the segments from indexIn
      //we create a SegmentContainer so that we can track segments belonging to
      //inName and delete inName as soon as we see that we have looked at all
      //the contained segments during the merge process & hence don't need 
      //them anymore
      SegmentContainer container = new SegmentContainer(inName, indexIn);
      MergeQueue mQueue = new MergeQueue(container.getSegmentList(), tmpDir, progressable);
      return mQueue.merge();
    }
    
    /** This class implements the core of the merge logic */
    private class MergeQueue extends PriorityQueue 
      implements RawKeyValueIterator {
      private boolean compress;
      private boolean blockCompress;
      private DataOutputBuffer rawKey = new DataOutputBuffer();
      private ValueBytes rawValue;
      private long totalBytesProcessed;
      private float progPerByte;
      private Progress mergeProgress = new Progress();
      private Path tmpDir;
      private Progressable progress = null; //handle to the progress reporting object
      private SegmentDescriptor minSegment;
      
      //a TreeMap used to store the segments sorted by size (segment offset and
      //segment path name are used to break ties between segments of the same size)
      private Map<SegmentDescriptor, Void> sortedSegmentSizes =
        new TreeMap<SegmentDescriptor, Void>();
            
      @SuppressWarnings("unchecked")
      public void put(SegmentDescriptor stream) throws IOException {
        if (size() == 0) {
          compress = stream.in.isCompressed();
          blockCompress = stream.in.isBlockCompressed();
        } else if (compress != stream.in.isCompressed() || 
                   blockCompress != stream.in.isBlockCompressed()) {
          throw new IOException("All merged files must be compressed or not.");
        } 
        super.put(stream);
      }
      
      /**
       * A queue of file segments to merge
       * @param segments the file segments to merge
       * @param tmpDir a relative local directory to save intermediate files in
       * @param progress the reference to the Progressable object
       */
      public MergeQueue(List <SegmentDescriptor> segments,
          Path tmpDir, Progressable progress) {
        int size = segments.size();
        for (int i = 0; i < size; i++) {
          sortedSegmentSizes.put(segments.get(i), null);
        }
        this.tmpDir = tmpDir;
        this.progress = progress;
      }
      @Override
      protected boolean lessThan(Object a, Object b) {
        // indicate we're making progress
        if (progress != null) {
          progress.progress();
        }
        SegmentDescriptor msa = (SegmentDescriptor)a;
        SegmentDescriptor msb = (SegmentDescriptor)b;
        return comparator.compare(msa.getKey().getData(), 0, 
                                  msa.getKey().getLength(), msb.getKey().getData(), 0, 
                                  msb.getKey().getLength()) < 0;
      }
      @Override
      public void close() throws IOException {
        SegmentDescriptor ms;                           // close inputs
        while ((ms = (SegmentDescriptor)pop()) != null) {
          ms.cleanup();
        }
        minSegment = null;
      }
      @Override
      public DataOutputBuffer getKey() throws IOException {
        return rawKey;
      }
      @Override
      public ValueBytes getValue() throws IOException {
        return rawValue;
      }
      @Override
      public boolean next() throws IOException {
        if (size() == 0)
          return false;
        if (minSegment != null) {
          //minSegment is non-null for all invocations of next except the first
          //one. For the first invocation, the priority queue is ready for use
          //but for the subsequent invocations, first adjust the queue 
          adjustPriorityQueue(minSegment);
          if (size() == 0) {
            minSegment = null;
            return false;
          }
        }
        minSegment = (SegmentDescriptor)top();
        long startPos = minSegment.in.getPosition(); // Current position in stream
        //save the raw key reference
        rawKey = minSegment.getKey();
        //load the raw value. Re-use the existing rawValue buffer
        if (rawValue == null) {
          rawValue = minSegment.in.createValueBytes();
        }
        minSegment.nextRawValue(rawValue);
        long endPos = minSegment.in.getPosition(); // End position after reading value
        updateProgress(endPos - startPos);
        return true;
      }
      
      @Override
      public Progress getProgress() {
        return mergeProgress; 
      }

      private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{
        long startPos = ms.in.getPosition(); // Current position in stream
        boolean hasNext = ms.nextRawKey();
        long endPos = ms.in.getPosition(); // End position after reading key
        updateProgress(endPos - startPos);
        if (hasNext) {
          adjustTop();
        } else {
          pop();
          ms.cleanup();
        }
      }

      private void updateProgress(long bytesProcessed) {
        totalBytesProcessed += bytesProcessed;
        if (progPerByte > 0) {
          mergeProgress.set(totalBytesProcessed * progPerByte);
        }
      }
      
      /** This is the single level merge that is called multiple times 
       * depending on the factor size and the number of segments
       * @return RawKeyValueIterator
       * @throws IOException
       */
      public RawKeyValueIterator merge() throws IOException {
        //create the MergeStreams from the sorted map created in the constructor
        //and dump the final output to a file
        int numSegments = sortedSegmentSizes.size();
        int origFactor = factor;
        int passNo = 1;
        LocalDirAllocator lDirAlloc = new LocalDirAllocator("io.seqfile.local.dir");
        do {
          //get the factor for this pass of merge
          factor = getPassFactor(passNo, numSegments);
          List<SegmentDescriptor> segmentsToMerge =
            new ArrayList<SegmentDescriptor>();
          int segmentsConsidered = 0;
          int numSegmentsToConsider = factor;
          while (true) {
            //extract the smallest 'factor' number of segment pointers from the 
            //TreeMap. Call cleanup on the empty segments (no key/value data)
            SegmentDescriptor[] mStream = 
              getSegmentDescriptors(numSegmentsToConsider);
            for (int i = 0; i < mStream.length; i++) {
              if (mStream[i].nextRawKey()) {
                segmentsToMerge.add(mStream[i]);
                segmentsConsidered++;
                // Count the fact that we read some bytes in calling nextRawKey()
                updateProgress(mStream[i].in.getPosition());
              }
              else {
                mStream[i].cleanup();
                numSegments--; //we ignore this segment for the merge
              }
            }
            //if we have the desired number of segments
            //or looked at all available segments, we break
            if (segmentsConsidered == factor || 
                sortedSegmentSizes.size() == 0) {
              break;
            }
              
            numSegmentsToConsider = factor - segmentsConsidered;
          }
          //feed the streams to the priority queue
          initialize(segmentsToMerge.size()); clear();
          for (int i = 0; i < segmentsToMerge.size(); i++) {
            put(segmentsToMerge.get(i));
          }
          //if we have fewer segments remaining than the factor, just return the
          //iterator; otherwise do another single-level merge
          if (numSegments <= factor) {
            //calculate the length of the remaining segments. Required for 
            //calculating the merge progress
            long totalBytes = 0;
            for (int i = 0; i < segmentsToMerge.size(); i++) {
              totalBytes += segmentsToMerge.get(i).segmentLength;
            }
            if (totalBytes != 0) //being paranoid
              progPerByte = 1.0f / (float)totalBytes;
            //reset factor to what it originally was
            factor = origFactor;
            return this;
          } else {
            //we want to spread the creation of temp files on multiple disks if 
            //available under the space constraints
            long approxOutputSize = 0; 
            for (SegmentDescriptor s : segmentsToMerge) {
              approxOutputSize += s.segmentLength + 
                                  ChecksumFileSystem.getApproxChkSumLength(
                                  s.segmentLength);
            }
            Path tmpFilename = 
              new Path(tmpDir, "intermediate").suffix("." + passNo);

            Path outputFile =  lDirAlloc.getLocalPathForWrite(
                                                tmpFilename.toString(),
                                                approxOutputSize, conf);
            if(LOG.isDebugEnabled()) { 
              LOG.debug("writing intermediate results to " + outputFile);
            }
            Writer writer = cloneFileAttributes(
                                                fs.makeQualified(segmentsToMerge.get(0).segmentPathName), 
                                                fs.makeQualified(outputFile), null);
            writer.sync = null; //disable sync for temp files
            writeFile(this, writer);
            writer.close();
            
            //we finished one single level merge; now clean up the priority 
            //queue
            this.close();
            
            SegmentDescriptor tempSegment = 
              new SegmentDescriptor(0,
                  fs.getFileStatus(outputFile).getLen(), outputFile);
            //put the segment back in the TreeMap
            sortedSegmentSizes.put(tempSegment, null);
            numSegments = sortedSegmentSizes.size();
            passNo++;
          }
          //we are worried about only the first pass merge factor. So reset the 
          //factor to what it originally was
          factor = origFactor;
        } while(true);
      }
  
      //Hadoop-591
      public int getPassFactor(int passNo, int numSegments) {
        if (passNo > 1 || numSegments <= factor || factor == 1) 
          return factor;
        int mod = (numSegments - 1) % (factor - 1);
        if (mod == 0)
          return factor;
        return mod + 1;
      }
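
      /*
       * Worked example (hypothetical numbers, per HADOOP-591): with factor = 10
       * and numSegments = 23, mod = (23 - 1) % (10 - 1) = 4, so the first pass
       * merges only mod + 1 = 5 segments, leaving 23 - 5 + 1 = 19 segments;
       * since (19 - 1) % (10 - 1) == 0, every later pass is a full 10-way merge.
       */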
      
      /** Return (& remove) the requested number of segment descriptors from the
       * sorted map.
       */
      public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) {
        if (numDescriptors > sortedSegmentSizes.size())
          numDescriptors = sortedSegmentSizes.size();
        SegmentDescriptor[] SegmentDescriptors = 
          new SegmentDescriptor[numDescriptors];
        Iterator iter = sortedSegmentSizes.keySet().iterator();
        int i = 0;
        while (i < numDescriptors) {
          SegmentDescriptors[i++] = (SegmentDescriptor)iter.next();
          iter.remove();
        }
        return SegmentDescriptors;
      }
    } // SequenceFile.Sorter.MergeQueue

    /** This class defines a merge segment. This class can be subclassed to 
     * provide a customized cleanup method implementation. In this 
     * implementation, cleanup closes the file handle and deletes the file 
     */
    public class SegmentDescriptor implements Comparable {
      
      long segmentOffset; //the start of the segment in the file
      long segmentLength; //the length of the segment
      Path segmentPathName; //the path name of the file containing the segment
      boolean ignoreSync = true; //set to true for temp files
      private Reader in = null; 
      private DataOutputBuffer rawKey = null; //this will hold the current key
      private boolean preserveInput = false; //delete input segment files?
      
      /** Constructs a segment
       * @param segmentOffset the offset of the segment in the file
       * @param segmentLength the length of the segment
       * @param segmentPathName the path name of the file containing the segment
       */
      public SegmentDescriptor (long segmentOffset, long segmentLength, 
                                Path segmentPathName) {
        this.segmentOffset = segmentOffset;
        this.segmentLength = segmentLength;
        this.segmentPathName = segmentPathName;
      }
      
      /** Do the sync checks. */
      public void doSync() {ignoreSync = false;}
      
      /**
       * Sets whether the input segment file should be preserved when it is no
       * longer needed.
       * @param preserve true to keep the input file; false to delete it
       */
      public void preserveInput(boolean preserve) {
        preserveInput = preserve;
      }

      public boolean shouldPreserveInput() {
        return preserveInput;
      }
      
      @Override
      public int compareTo(Object o) {
        SegmentDescriptor that = (SegmentDescriptor)o;
        if (this.segmentLength != that.segmentLength) {
          return (this.segmentLength < that.segmentLength ? -1 : 1);
        }
        if (this.segmentOffset != that.segmentOffset) {
          return (this.segmentOffset < that.segmentOffset ? -1 : 1);
        }
        return (this.segmentPathName.toString()).
          compareTo(that.segmentPathName.toString());
      }

      @Override
      public boolean equals(Object o) {
        if (!(o instanceof SegmentDescriptor)) {
          return false;
        }
        SegmentDescriptor that = (SegmentDescriptor)o;
        if (this.segmentLength == that.segmentLength &&
            this.segmentOffset == that.segmentOffset &&
            this.segmentPathName.toString().equals(
              that.segmentPathName.toString())) {
          return true;
        }
        return false;
      }

      @Override
      public int hashCode() {
        return 37 * 17 + (int) (segmentOffset^(segmentOffset>>>32));
      }

      /**
       * Fills up the rawKey object with the key returned by the Reader.
       * @return true if there is a key returned; false, otherwise
       * @throws IOException raised on errors performing I/O.
       */
      public boolean nextRawKey() throws IOException {
        if (in == null) {
          int bufferSize = getBufferSize(conf); 
          Reader reader = new Reader(conf,
                                     Reader.file(segmentPathName), 
                                     Reader.bufferSize(bufferSize),
                                     Reader.start(segmentOffset), 
                                     Reader.length(segmentLength));
        
          //sometimes we ignore syncs especially for temp merge files
          if (ignoreSync) reader.ignoreSync();

          if (reader.getKeyClass() != keyClass)
            throw new IOException("wrong key class: " + reader.getKeyClass() +
                                  " is not " + keyClass);
          if (reader.getValueClass() != valClass)
            throw new IOException("wrong value class: "+reader.getValueClass()+
                                  " is not " + valClass);
          this.in = reader;
          rawKey = new DataOutputBuffer();
        }
        rawKey.reset();
        int keyLength = 
          in.nextRawKey(rawKey);
        return (keyLength >= 0);
      }

      /**
       * Fills up the passed rawValue with the value corresponding to the key
       * read earlier.
       * @param rawValue input ValueBytes rawValue.
       * @return the length of the value
       * @throws IOException raised on errors performing I/O.
       */
      public int nextRawValue(ValueBytes rawValue) throws IOException {
        int valLength = in.nextRawValue(rawValue);
        return valLength;
      }
      
      /** @return the stored rawKey */
      public DataOutputBuffer getKey() {
        return rawKey;
      }
      
      /** closes the underlying reader */
      private void close() throws IOException {
        this.in.close();
        this.in = null;
      }

      /**
       * The default cleanup. Subclasses can override this with a custom
       * cleanup.
       * @throws IOException raised on errors performing I/O.
       */
      public void cleanup() throws IOException {
        close();
        if (!preserveInput) {
          fs.delete(segmentPathName, true);
        }
      }
    } // SequenceFile.Sorter.SegmentDescriptor
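
    /*
     * A minimal subclassing sketch (hypothetical, inside Sorter's scope) of the
     * customized-cleanup extension point described above; LinkedSegmentsDescriptor
     * below follows the same pattern for its own bookkeeping:
     *
     *   class LoggingSegmentDescriptor extends SegmentDescriptor {
     *     LoggingSegmentDescriptor(long off, long len, Path path) {
     *       super(off, len, path);
     *     }
     *     @Override
     *     public void cleanup() throws IOException {
     *       LOG.debug("finished with segment " + segmentPathName);
     *       super.cleanup();  // closes the reader, deletes the file if not preserved
     *     }
     *   }
     */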
    
    /** This class provisions multiple segments contained within a single
     *  file
     */
    private class LinkedSegmentsDescriptor extends SegmentDescriptor {

      SegmentContainer parentContainer = null;

      /** Constructs a segment
       * @param segmentOffset the offset of the segment in the file
       * @param segmentLength the length of the segment
       * @param segmentPathName the path name of the file containing the segment
       * @param parent the parent SegmentContainer that holds the segment
       */
      public LinkedSegmentsDescriptor (long segmentOffset, long segmentLength, 
                                       Path segmentPathName, SegmentContainer parent) {
        super(segmentOffset, segmentLength, segmentPathName);
        this.parentContainer = parent;
      }
      /** The default cleanup. Subclasses can override this with a custom 
       * cleanup 
       */
      @Override
      public void cleanup() throws IOException {
        super.close();
        if (super.shouldPreserveInput()) return;
        parentContainer.cleanup();
      }
      
      @Override
      public boolean equals(Object o) {
        if (!(o instanceof LinkedSegmentsDescriptor)) {
          return false;
        }
        return super.equals(o);
      }
    } //SequenceFile.Sorter.LinkedSegmentsDescriptor

    /** The class that defines a container for segments to be merged. Primarily
     * required to delete temp files as soon as all the contained segments
     * have been looked at */
    private class SegmentContainer {
      private int numSegmentsCleanedUp = 0; //track the no. of segment cleanups
      private int numSegmentsContained; //# of segments contained
      private Path inName; //input file from where segments are created
      
      //the list of segments read from the file
      private ArrayList <SegmentDescriptor> segments = 
        new ArrayList <SegmentDescriptor>();
      /** This constructor is there primarily to serve the sort routine that 
       * generates a single output file with an associated index file */
      public SegmentContainer(Path inName, Path indexIn) throws IOException {
        //get the segments from indexIn
        FSDataInputStream fsIndexIn = fs.open(indexIn);
        long end = fs.getFileStatus(indexIn).getLen();
        while (fsIndexIn.getPos() < end) {
          long segmentOffset = WritableUtils.readVLong(fsIndexIn);
          long segmentLength = WritableUtils.readVLong(fsIndexIn);
          Path segmentName = inName;
          segments.add(new LinkedSegmentsDescriptor(segmentOffset, 
                                                    segmentLength, segmentName, this));
        }
        fsIndexIn.close();
        fs.delete(indexIn, true);
        numSegmentsContained = segments.size();
        this.inName = inName;
      }

      public List <SegmentDescriptor> getSegmentList() {
        return segments;
      }
      public void cleanup() throws IOException {
        numSegmentsCleanedUp++;
        if (numSegmentsCleanedUp == numSegmentsContained) {
          fs.delete(inName, true);
        }
      }
    } //SequenceFile.Sorter.SegmentContainer

  } // SequenceFile.Sorter

} // SequenceFile
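
The Sorter defined above can be driven directly. The sketch below is a minimal, illustrative example; the file paths, the Text key/value types and the local FileSystem are assumptions for illustration, not taken from the source. It sorts one SequenceFile and then merges two already-sorted files into a new one.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileSorterExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);

    // key and value classes must match the files being sorted/merged
    SequenceFile.Sorter sorter =
        new SequenceFile.Sorter(fs, Text.class, Text.class, conf);

    // sort a single SequenceFile into a new, sorted file
    sorter.sort(new Path("/tmp/unsorted.seq"), new Path("/tmp/sorted.seq"));

    // merge several already-sorted files; the output file must not exist yet
    sorter.merge(new Path[] { new Path("/tmp/sorted-0.seq"),
                              new Path("/tmp/sorted-1.seq") },
                 new Path("/tmp/merged.seq"));
  }
}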

Related information

hadoop source code directory

Related articles

hadoop AbstractMapWritable source code

hadoop ArrayFile source code

hadoop ArrayPrimitiveWritable source code

hadoop ArrayWritable source code

hadoop BinaryComparable source code

hadoop BloomMapFile source code

hadoop BooleanWritable source code

hadoop BoundedByteArrayOutputStream source code

hadoop ByteBufferPool source code

hadoop ByteWritable source code
