hadoop BZip2Codec 源码

  • 2022-10-20
  • 浏览 (435)

hadoop BZip2Codec 代码

文件路径:/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io.compress;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.compress.bzip2.BZip2Constants;
import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream;
import org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream;
import org.apache.hadoop.io.compress.bzip2.Bzip2Factory;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;

/**
 * This class provides output and input streams for bzip2 compression
 * and decompression.  It uses the native bzip2 library on the system
 * if possible, else it uses a pure-Java implementation of the bzip2
 * algorithm.  The configuration parameter
 * io.compression.codec.bzip2.library can be used to control this
 * behavior.
 *
 * In the pure-Java mode, the Compressor and Decompressor interfaces
 * are not implemented.  Therefore, in that mode, those methods of
 * CompressionCodec which have a Compressor or Decompressor type
 * argument, throw UnsupportedOperationException.
 *
 * Currently, support for splittability is available only in the
 * pure-Java mode; therefore, if a SplitCompressionInputStream is
 * requested, the pure-Java implementation is used, regardless of the
 * setting of the configuration parameter mentioned above.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class BZip2Codec implements Configurable, SplittableCompressionCodec {

  // Leading magic characters of a bzip2 stream.  writeHeader() emits them and
  // BZip2CompressionInputStream strips them when they are present.
  private static final String HEADER = "BZ";
  private static final int HEADER_LEN = HEADER.length();
  // The characters that follow HEADER in the stream header.  Only the length
  // of this string is used in this file (to know how many bytes to strip or
  // skip in BYBLOCK mode); the value itself is never compared.
  private static final String SUB_HEADER = "h9";
  private static final int SUB_HEADER_LEN = SUB_HEADER.length();

  // Configuration supplied through setConf(); null until that call is made.
  private Configuration conf;
  
  /**
   * Set the configuration to be used by this object.  The stored value is
   * later handed to {@code Bzip2Factory} and consulted for the I/O buffer
   * size by the stream factory methods of this class.
   *
   * @param conf the configuration object.
   */
  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }
  
  /**
   * Return the configuration used by this object.
   *
   * @return the configuration object last passed to
   *         {@link #setConf(Configuration)}, or {@code null} if it was
   *         never called.
   */
  @Override
  public Configuration getConf() {
    return conf;
  }
  
  /**
   * Creates a new instance of BZip2Codec.  No configuration is attached
   * until {@link #setConf(Configuration)} is called.
   */
  public BZip2Codec() { }

  /**
   * Builds a {@link CompressionOutputStream} on top of the given
   * {@link OutputStream}.  Construction is delegated to
   * {@code CompressionCodec.Util.createOutputStreamWithCodecPool}, which
   * receives this codec and its configuration.
   *
   * @param out        the location for the final output stream
   * @return a stream the user can write uncompressed data to, to have it
   *         compressed
   * @throws IOException raised on errors performing I/O.
   */
  @Override
  public CompressionOutputStream createOutputStream(OutputStream out)
      throws IOException {
    final CompressionOutputStream compressed =
        CompressionCodec.Util.createOutputStreamWithCodecPool(this, conf, out);
    return compressed;
  }

  /**
   * Builds a {@link CompressionOutputStream} on top of the given
   * {@link OutputStream}.  When {@code Bzip2Factory} reports that the native
   * bzip2 library is loaded, the supplied {@link Compressor} is wrapped in a
   * {@code CompressorStream} sized by the configured I/O buffer size;
   * otherwise the pure-Java stream is returned and the compressor is not
   * consulted.
   *
   * @param out        the location for the final output stream
   * @param compressor compressor to use
   * @return a stream the user can write uncompressed data to, to have it
   *         compressed
   * @throws IOException raised on errors performing I/O.
   */
  @Override
  public CompressionOutputStream createOutputStream(OutputStream out,
      Compressor compressor) throws IOException {
    if (!Bzip2Factory.isNativeBzip2Loaded(conf)) {
      // Pure-Java path: the compressor argument plays no part here.
      return new BZip2CompressionOutputStream(out);
    }
    final int bufferSize =
        conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT);
    return new CompressorStream(out, compressor, bufferSize);
  }

  /**
   * Get the type of {@link Compressor} needed by this {@link CompressionCodec}.
   * The lookup is delegated to {@code Bzip2Factory}, which is given this
   * codec's configuration.
   *
   * @return the type of compressor needed by this codec.
   */
  @Override
  public Class<? extends Compressor> getCompressorType() {
    return Bzip2Factory.getBzip2CompressorType(conf);
  }

  /**
   * Create a new {@link Compressor} for use by this {@link CompressionCodec}.
   * The instance is obtained from {@code Bzip2Factory}, which is given this
   * codec's configuration.
   *
   * @return a new compressor for use by this codec
   */
  @Override
  public Compressor createCompressor() {
    return Bzip2Factory.getBzip2Compressor(conf);
  }

  /**
   * Builds a {@link CompressionInputStream} that reads compressed bytes from
   * the given input stream and yields uncompressed data.  Construction is
   * delegated to {@code CompressionCodec.Util.createInputStreamWithCodecPool},
   * which receives this codec and its configuration.
   *
   * @param in the stream to read compressed bytes from
   * @return a stream to read uncompressed bytes from
   * @throws IOException raised on errors performing I/O.
   */
  @Override
  public CompressionInputStream createInputStream(InputStream in)
      throws IOException {
    final CompressionInputStream decompressed =
        CompressionCodec.Util.createInputStreamWithCodecPool(this, conf, in);
    return decompressed;
  }

  /**
   * Builds a {@link CompressionInputStream} that reads from the given
   * {@link InputStream} and yields uncompressed data.  When
   * {@code Bzip2Factory} reports that the native bzip2 library is loaded, the
   * supplied {@link Decompressor} is wrapped in a {@code DecompressorStream}
   * sized by the configured I/O buffer size; otherwise the pure-Java stream
   * is returned in BYBLOCK mode covering the whole input, and the
   * decompressor is not consulted.
   *
   * @param in           the stream to read compressed bytes from
   * @param decompressor decompressor to use
   * @return a stream to read uncompressed bytes from
   * @throws IOException raised on errors performing I/O.
   */
  @Override
  public CompressionInputStream createInputStream(InputStream in,
      Decompressor decompressor) throws IOException {
    if (!Bzip2Factory.isNativeBzip2Loaded(conf)) {
      // Pure-Java path: read the entire stream, block by block.
      return new BZip2CompressionInputStream(
          in, 0L, Long.MAX_VALUE, READ_MODE.BYBLOCK);
    }
    final int bufferSize =
        conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT);
    return new DecompressorStream(in, decompressor, bufferSize);
  }

  /**
   * Creates a CompressionInputStream to be used to read off uncompressed data
   * in one of the two reading modes, i.e. Continuous or Blocked reading modes.
   * The input is first positioned at {@code start} and then always wrapped in
   * the pure-Java stream implementation.
   *
   * @param seekableIn The InputStream; it must also implement
   *                   {@link Seekable}
   * @param decompressor not referenced by this method
   * @param start The start offset into the compressed stream
   * @param end The end offset into the compressed stream
   * @param readMode Controls whether progress is reported continuously or
   *                 only at block boundaries.
   *
   * @return CompressionInputStream for BZip2 aligned at block boundaries
   * @throws IOException if {@code seekableIn} is not {@link Seekable}, or on
   *                     errors while seeking or initializing the stream.
   */
  public SplitCompressionInputStream createInputStream(InputStream seekableIn,
      Decompressor decompressor, long start, long end, READ_MODE readMode)
      throws IOException {

    final boolean canSeek = seekableIn instanceof Seekable;
    if (!canSeek) {
      throw new IOException("seekableIn must be an instance of " +
          Seekable.class.getName());
    }

    final Seekable positioned = (Seekable) seekableIn;
    positioned.seek(start);
    return new BZip2CompressionInputStream(seekableIn, start, end, readMode);
  }

  /**
   * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}.
   * The lookup is delegated to {@code Bzip2Factory}, which is given this
   * codec's configuration.
   *
   * @return the type of decompressor needed by this codec.
   */
  @Override
  public Class<? extends Decompressor> getDecompressorType() {
    return Bzip2Factory.getBzip2DecompressorType(conf);
  }

  /**
   * Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
   * The instance is obtained from {@code Bzip2Factory}, which is given this
   * codec's configuration.
   *
   * @return a new decompressor for use by this codec
   */
  @Override
  public Decompressor createDecompressor() {
    return Bzip2Factory.getBzip2Decompressor(conf);
  }

  /**
   * .bz2 is recognized as the default extension for compressed BZip2 files.
   * The value is taken from {@code CodecConstants.BZIP2_CODEC_EXTENSION}.
   *
   * @return A String telling the default bzip2 file extension
   */
  @Override
  public String getDefaultExtension() {
    return CodecConstants.BZIP2_CODEC_EXTENSION;
  }

  private static class BZip2CompressionOutputStream extends
      CompressionOutputStream {

    // class data starts here//
    private CBZip2OutputStream output;
    private boolean needsReset; 
    // class data ends here//

    public BZip2CompressionOutputStream(OutputStream out)
        throws IOException {
      super(out);
      needsReset = true;
    }

    private void writeStreamHeader() throws IOException {
      if (super.out != null) {
        writeHeader(out);
      }
    }

    public void finish() throws IOException {
      if (needsReset) {
        // In the case that nothing is written to this stream, we still need to
        // write out the header before closing, otherwise the stream won't be
        // recognized by BZip2CompressionInputStream.
        internalReset();
      }
      this.output.finish();
      needsReset = true;
    }

    private void internalReset() throws IOException {
      if (needsReset) {
        needsReset = false;
        writeStreamHeader();
        this.output = new CBZip2OutputStream(out);
      }
    }    
    
    public void resetState() throws IOException {
      // Cannot write to out at this point because out might not be ready
      // yet, as in SequenceFile.Writer implementation.
      needsReset = true;
    }

    public void write(int b) throws IOException {
      if (needsReset) {
        internalReset();
      }
      this.output.write(b);
    }

    public void write(byte[] b, int off, int len) throws IOException {
      if (needsReset) {
        internalReset();
      }
      this.output.write(b, off, len);
    }

    public void close() throws IOException {
      try {
        super.close();
      } finally {
        output.close();
      }
    }

  }// end of class BZip2CompressionOutputStream

  /**
   * This class is capable of decompressing BZip2 data in two modes:
   * CONTINUOUS and BYBLOCK.  BYBLOCK mode makes it possible to
   * start decompression at any arbitrary position in the stream.
   *
   * This facility can therefore be used to parallelize decompression
   * of a large BZip2 file for performance reasons.  (This is exactly
   * what the Hadoop framework does; see LineRecordReader for an
   * example.)  One can logically break the file into
   * chunks for parallel processing.  These "splits" should be like
   * default Hadoop splits (e.g. as in the FileInputFormat getSplits method).
   * This code is designed and tested for FileInputFormat's way
   * of splitting only.
   */

  private static class BZip2CompressionInputStream extends
      SplitCompressionInputStream {

    // class data starts here//
    // Decoder producing the uncompressed bytes; replaced by internalReset().
    private CBZip2InputStream input;
    // Set by resetState() and by a successful close(); while true, the next
    // read rebuilds 'input' and close() does nothing.
    boolean needsReset;
    // Buffered view over the wrapped stream; its mark/reset support is used
    // while probing for the stream header.
    private BufferedInputStream bufferedIn;
    // True once readStreamHeader() has consumed the leading "BZ" characters.
    private boolean isHeaderStripped = false;
    // True once readStreamHeader() has also consumed the characters that
    // follow "BZ" (only attempted in BYBLOCK mode).
    private boolean isSubHeaderStripped = false;
    // Reading mode handed to the decoder; overwritten by the constructor.
    private READ_MODE readMode = READ_MODE.CONTINUOUS;
    // Position captured from super.getPos() in the constructor.
    private long startingPos = 0L;
    // False until the first read(byte[], int, int) has completed; cleared
    // again by internalReset().
    private boolean didInitialRead;

    // Following state machine handles different states of compressed stream
    // position
    // HOLD : Don't advertise compressed stream position
    // ADVERTISE : Read 1 more character and advertise stream position
    // See more comments about it before updatePos method.
    private enum POS_ADVERTISEMENT_STATE_MACHINE {
      HOLD, ADVERTISE
    };

    // Current advertisement state; read(byte[], int, int) switches it to
    // ADVERTISE on an end-of-block result and back to HOLD after updating
    // the position.
    POS_ADVERTISEMENT_STATE_MACHINE posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD;
    // Value returned by getPos(); only written by updatePos().
    long compressedStreamPosition = 0;

    // class data ends here//

    /**
     * Creates a stream that reads the whole of {@code in} (offsets 0 to
     * {@code Long.MAX_VALUE}) in CONTINUOUS mode.
     *
     * @param in the stream to read compressed bytes from
     * @throws IOException raised on errors performing I/O.
     */
    public BZip2CompressionInputStream(InputStream in) throws IOException {
      this(in, 0L, Long.MAX_VALUE, READ_MODE.CONTINUOUS);
    }

    /**
     * Creates a stream over {@code in} for the byte range {@code start} to
     * {@code end}.  Depending on the starting position, the stream header is
     * either stripped (start of file) or skipped over (BYBLOCK mode starting
     * inside the header), and the decoder is told how many bytes were consumed
     * before it was created.
     *
     * @param in       the stream to read compressed bytes from
     * @param start    start offset, passed to the superclass
     * @param end      end offset, passed to the superclass
     * @param readMode reading mode handed to the decoder
     * @throws IOException raised on errors performing I/O.
     */
    public BZip2CompressionInputStream(InputStream in, long start, long end,
        READ_MODE readMode) throws IOException {
      super(in, start, end);
      needsReset = false;
      bufferedIn = new BufferedInputStream(super.in);
      // NOTE(review): presumably equals 'start' as recorded by the superclass
      // constructor -- confirm against SplitCompressionInputStream.
      this.startingPos = super.getPos();
      this.readMode = readMode;
      long numSkipped = 0;
      if (this.startingPos == 0) {
        // We only strip header if it is start of file
        bufferedIn = readStreamHeader();
      } else if (this.readMode == READ_MODE.BYBLOCK  &&
          this.startingPos <= HEADER_LEN + SUB_HEADER_LEN) {
        // When we're in BYBLOCK mode and the start position is > 0
        // and <= HEADER_LEN + SUB_HEADER_LEN, we should skip to after
        // start of the first bz2 block to avoid duplicated records.
        // The target is one byte past the header and sub-header.
        numSkipped = HEADER_LEN + SUB_HEADER_LEN + 1 - this.startingPos;
        long skipBytes = numSkipped;
        while (skipBytes > 0) {
          long s = bufferedIn.skip(skipBytes);
          if (s > 0) {
            skipBytes -= s;
          } else {
            // skip() made no progress; a single-byte read tells end of
            // stream apart from a skip that merely returned nothing.
            if (bufferedIn.read() == -1) {
              break; // end of the split
            } else {
              skipBytes--;
            }
          }
        }
        // NOTE(review): if the loop stopped early at end of stream,
        // numSkipped still holds the intended count, not the actual one.
      }
      input = new CBZip2InputStream(bufferedIn, readMode);
      // Account for every byte consumed above before the decoder existed.
      if (this.isHeaderStripped) {
        input.updateReportedByteCount(HEADER_LEN);
      }

      if (this.isSubHeaderStripped) {
        input.updateReportedByteCount(SUB_HEADER_LEN);
      }

      if (numSkipped > 0) {
        input.updateReportedByteCount((int) numSkipped);
      }

      // To avoid dropped records, not advertising a new byte position
      // when we are in BYBLOCK mode and the start position is 0
      if (!(this.readMode == READ_MODE.BYBLOCK && this.startingPos == 0)) {
        this.updatePos(false);
      }
    }

    /**
     * Strips the stream header from {@link #bufferedIn} when it is present.
     * If the stream does not start with the "BZ" characters, the bytes that
     * were probed are pushed back via mark/reset and nothing is stripped.  In
     * BYBLOCK mode the characters following "BZ" are consumed as well.
     *
     * Header bytes are read in a loop because a single
     * {@code BufferedInputStream.read(byte[], int, int)} call may return
     * fewer bytes than requested; a header delivered in pieces would
     * otherwise be mistaken for a missing one.
     *
     * @return the buffered stream, positioned after whatever was stripped
     * @throws IOException raised on errors performing I/O.
     */
    private BufferedInputStream readStreamHeader() throws IOException {
      // We are flexible enough to allow the compressed stream not to
      // start with the header of BZ. So it works fine either we have
      // the header or not.
      if (super.in != null) {
        bufferedIn.mark(HEADER_LEN);
        byte[] headerBytes = new byte[HEADER_LEN];
        int headerRead = readHeaderBytes(headerBytes, HEADER_LEN);
        if (headerRead > 0) {
          String header = new String(headerBytes, StandardCharsets.UTF_8);
          if (headerRead < HEADER_LEN || header.compareTo(HEADER) != 0) {
            // Not a "BZ" header (or the stream ended early): un-read it.
            bufferedIn.reset();
          } else {
            this.isHeaderStripped = true;
            // In case of BYBLOCK mode, we also want to strip off
            // remaining two character of the header.
            if (this.readMode == READ_MODE.BYBLOCK) {
              int subHeaderRead = readHeaderBytes(headerBytes, SUB_HEADER_LEN);
              if (subHeaderRead > 0) {
                this.isSubHeaderStripped = true;
              }
            }
          }
        }
      }

      if (bufferedIn == null) {
        throw new IOException("Failed to read bzip2 stream.");
      }

      return bufferedIn;

    }// end of method

    /**
     * Reads from {@link #bufferedIn} until {@code len} bytes have been stored
     * at the start of {@code buf} or the stream stops delivering data.
     *
     * @param buf destination buffer; must hold at least {@code len} bytes
     * @param len number of bytes wanted
     * @return the number of bytes actually stored, 0 at immediate end of stream
     * @throws IOException raised on errors performing I/O.
     */
    private int readHeaderBytes(byte[] buf, int len) throws IOException {
      int total = 0;
      while (total < len) {
        int n = bufferedIn.read(buf, total, len - total);
        if (n <= 0) {
          break; // end of stream, or no progress possible
        }
        total += n;
      }
      return total;
    }

    /**
     * Closes the decoder and then the superclass stream.  The call does
     * nothing while {@code needsReset} is set, which is the case after a
     * previous successful close and also after {@link #resetState()} until
     * the next read.
     *
     * @throws IOException raised on errors performing I/O.
     */
    public void close() throws IOException {
      if (needsReset) {
        return;
      }
      try {
        input.close();
        needsReset = true;
      } finally {
        super.close();
      }
    }

    /**
     * Reads uncompressed bytes and, as a side effect, updates the advertised
     * compressed stream position exactly when the client of this code has
     * read off at least one byte past any BZip2 end of block marker.
     *
     * This mechanism is very helpful to deal with data level record
     * boundaries. Please see the constructor and next methods of
     * org.apache.hadoop.mapred.LineRecordReader as an example usage of this
     * feature.  We elaborate it with an example in the following:
     *
     * Assume two different scenarios of the BZip2 compressed stream, where
     * [m] represents end of block, \n is the line delimiter and . represents
     * compressed data.
     *
     * ............[m]......\n.......
     *
     * ..........\n[m]......\n.......
     *
     * Assume that end is right after [m].  In the first case the reading
     * will stop at \n and there is no need to read one more line.  (The
     * reason for reading one more line in the next() method is explained in
     * LineRecordReader.)  In the second example LineRecordReader needs to
     * read one more line (till the second \n).  Since BZip2Codec only
     * updates the position at least one byte past a marker, it is
     * straightforward to differentiate between the two cases mentioned.
     *
     * @param b   destination buffer
     * @param off offset in {@code b} at which data is stored
     * @param len maximum number of bytes to read
     * @return the value returned by the underlying decoder's read
     * @throws IOException raised on errors performing I/O.
     * @throws NullPointerException if {@code b} is null
     * @throws IndexOutOfBoundsException if {@code off} or {@code len} is
     *         negative, or {@code len} exceeds the space after {@code off}
     */

    public int read(byte[] b, int off, int len) throws IOException {
      if (b == null) {
        throw new NullPointerException();
      }
      final boolean rangeOk = off >= 0 && len >= 0 && len <= b.length - off;
      if (!rangeOk) {
        throw new IndexOutOfBoundsException();
      }
      if (len == 0) {
        return 0;
      }
      if (needsReset) {
        internalReset();
      }

      // When startingPos > 0, the stream should be initialized at the end of
      // one block (which would correspond to be the start of another block).
      // Thus, the initial read would technically be reading one byte past a
      // BZip2 end of block marker. To be consistent, we should also be
      // updating the position to be one byte after the end of a block on the
      // initial read.
      final boolean startsAtBlockEnd =
          readMode == READ_MODE.BYBLOCK && startingPos > 0 && !didInitialRead;

      int outcome;
      if (startsAtBlockEnd) {
        outcome = BZip2Constants.END_OF_BLOCK;
      } else {
        outcome = input.read(b, off, len);
      }

      if (outcome == BZip2Constants.END_OF_BLOCK) {
        posSM = POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE;
      }

      if (posSM == POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE) {
        outcome = input.read(b, off, 1);
        // This is the precise time to update compressed stream position
        // to the client of this code.
        updatePos(true);
        posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD;
      }

      didInitialRead = true;
      return outcome;
    }

    /**
     * Reads a single uncompressed byte by delegating to
     * {@link #read(byte[], int, int)} with a one-byte buffer.
     *
     * @return the byte as a value in the range 0-255, or the negative result
     *         of the underlying read when no byte was delivered
     * @throws IOException raised on errors performing I/O.
     */
    public int read() throws IOException {
      final byte[] single = new byte[1];
      final int status = this.read(single, 0, 1);
      if (status < 0) {
        return status;
      }
      return single[0] & 0xff;
    }

    /**
     * Performs a pending reset: re-probes the stream header and replaces the
     * decoder with a fresh one in the current read mode.  Does nothing when
     * no reset is pending.
     *
     * @throws IOException raised on errors performing I/O.
     */
    private void internalReset() throws IOException {
      if (!needsReset) {
        return;
      }
      needsReset = false;
      input = new CBZip2InputStream(readStreamHeader(), readMode);
      didInitialRead = false;
    }
    
    /**
     * Requests a reset of the decoder.  Only a flag is set here; the header
     * is re-read and the decoder rebuilt by the next call to
     * {@link #read(byte[], int, int)}.
     *
     * @throws IOException never thrown by this implementation
     */
    public void resetState() throws IOException {
      // Cannot read from bufferedIn at this point because bufferedIn
      // might not be ready yet, as in SequenceFile.Reader implementation.
      needsReset = true;
    }

    /**
     * Returns the compressed stream position most recently recorded by
     * {@code updatePos}.
     *
     * @return the advertised position in the compressed stream
     */
    public long getPos() {
      return compressedStreamPosition;
    }

    /*
     * As the comments before the read method explain, the compressed
     * stream position is advertised once at least one byte past an
     * end of block (EOB) has been read off.  There is one exception
     * to this rule: when the stream is constructed, the position is
     * advertised exactly at EOB.  The shouldAddOn flag captures that
     * exception: true adds the extra byte, false does not.
     */
    private void updatePos(boolean shouldAddOn) {
      long position = startingPos + input.getProcessedByteCount();
      if (shouldAddOn) {
        position += 1;
      }
      compressedStreamPosition = position;
    }

  }// end of BZip2CompressionInputStream

  /**
   * Writes the identifying characters BZ that a compressed bzip2 stream
   * should start with.  CBZip2OutputStream leaves this to its caller, i.e.
   * this class.
   *
   * @param out the stream that receives the header bytes
   * @throws IOException if the bytes cannot be written
   */
  @VisibleForTesting
  public static void writeHeader(OutputStream out) throws IOException {
    final byte[] magic = HEADER.getBytes(StandardCharsets.UTF_8);
    out.write(magic);
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AlreadyClosedException 源码

hadoop BlockCompressorStream 源码

hadoop BlockDecompressorStream 源码

hadoop CodecConstants 源码

hadoop CodecPool 源码

hadoop CompressionCodec 源码

hadoop CompressionCodecFactory 源码

hadoop CompressionInputStream 源码

hadoop CompressionOutputStream 源码

hadoop Compressor 源码

0  赞