hadoop CompressionInputStream 源码

  • 2022-10-20
  • 浏览 (527)

haddop CompressionInputStream 代码

文件路径:/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io.compress;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;

/**
 * A compression input stream.
 *
 * <p>Implementations are assumed to be buffered.  This permits clients to
 * reposition the underlying input stream then call {@link #resetState()},
 * without having to also synchronize client buffers.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class CompressionInputStream extends InputStream
    implements Seekable, IOStatisticsSource {
  /**
   * The input stream to be compressed. 
   */
  protected final InputStream in;
  protected long maxAvailableData = 0L;

  private Decompressor trackedDecompressor;

  /**
   * Create a compression input stream that reads
   * the decompressed bytes from the given stream.
   * 
   * @param in The input stream to be compressed.
   * @throws IOException raised on errors performing I/O.
   */
  protected CompressionInputStream(InputStream in) throws IOException {
    if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) {
        this.maxAvailableData = in.available();
    }
    this.in = in;
  }

  @Override
  public void close() throws IOException {
    try {
      in.close();
    } finally {
      if (trackedDecompressor != null) {
        CodecPool.returnDecompressor(trackedDecompressor);
        trackedDecompressor = null;
      }
    }
  }

  /**
   * Return any IOStatistics provided by the underlying stream.
   * @return IO stats from the inner stream.
   */
  @Override
  public IOStatistics getIOStatistics() {
    return IOStatisticsSupport.retrieveIOStatistics(in);
  }

  /**
   * Read bytes from the stream.
   * Made abstract to prevent leakage to underlying stream.
   */
  @Override
  public abstract int read(byte[] b, int off, int len) throws IOException;

  /**
   * Reset the decompressor to its initial state and discard any buffered data,
   * as the underlying stream may have been repositioned.
   *
   * @throws IOException raised on errors performing I/O.
   */
  public abstract void resetState() throws IOException;
  
  /**
   * This method returns the current position in the stream.
   *
   * @return Current position in stream as a long
   */
  @Override
  public long getPos() throws IOException {
    if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)){
      //This way of getting the current position will not work for file
      //size which can be fit in an int and hence can not be returned by
      //available method.
      return (this.maxAvailableData - this.in.available());
    }
    else{
      return ((Seekable)this.in).getPos();
    }

  }

  /**
   * This method is current not supported.
   *
   * @throws UnsupportedOperationException Unsupported Operation Exception.
   */

  @Override
  public void seek(long pos) throws UnsupportedOperationException {
    throw new UnsupportedOperationException();
  }

  /**
   * This method is current not supported.
   *
   * @throws UnsupportedOperationException Unsupported Operation Exception.
   */
  @Override
  public boolean seekToNewSource(long targetPos) throws UnsupportedOperationException {
    throw new UnsupportedOperationException();
  }

  void setTrackedDecompressor(Decompressor decompressor) {
    trackedDecompressor = decompressor;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AlreadyClosedException 源码

hadoop BZip2Codec 源码

hadoop BlockCompressorStream 源码

hadoop BlockDecompressorStream 源码

hadoop CodecConstants 源码

hadoop CodecPool 源码

hadoop CompressionCodec 源码

hadoop CompressionCodecFactory 源码

hadoop CompressionOutputStream 源码

hadoop Compressor 源码

0  赞