hadoop Chunk 源码

  • 2022-10-20
  • 浏览 (332)

haddop Chunk 代码

文件路径:/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/file/tfile/Chunk.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 * http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.hadoop.io.file.tfile;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
 * Several related classes to support chunk-encoded sub-streams on top of a
 * regular stream.
 */
final class Chunk {

  /**
   * Prevent the instantiation of class.
   */
  private Chunk() {
    // nothing
  }

  /**
   * Decoding a chain of chunks encoded through ChunkEncoder or
   * SingleChunkEncoder.
   */
  static public class ChunkDecoder extends InputStream {
    private DataInputStream in = null;
    private boolean lastChunk;
    private int remain = 0;
    private boolean closed;

    public ChunkDecoder() {
      lastChunk = true;
      closed = true;
    }

    public void reset(DataInputStream downStream) {
      // no need to wind forward the old input.
      in = downStream;
      lastChunk = false;
      remain = 0;
      closed = false;
    }

    /**
     * Constructor
     * 
     * @param in
     *          The source input stream which contains chunk-encoded data
     *          stream.
     */
    public ChunkDecoder(DataInputStream in) {
      this.in = in;
      lastChunk = false;
      closed = false;
    }

    /**
     * Have we reached the last chunk.
     * 
     * @return true if we have reached the last chunk.
     * @throws java.io.IOException
     */
    public boolean isLastChunk() throws IOException {
      checkEOF();
      return lastChunk;
    }

    /**
     * How many bytes remain in the current chunk?
     * 
     * @return remaining bytes left in the current chunk.
     * @throws java.io.IOException
     */
    public int getRemain() throws IOException {
      checkEOF();
      return remain;
    }

    /**
     * Reading the length of next chunk.
     * 
     * @throws java.io.IOException
     *           when no more data is available.
     */
    private void readLength() throws IOException {
      remain = Utils.readVInt(in);
      if (remain >= 0) {
        lastChunk = true;
      } else {
        remain = -remain;
      }
    }

    /**
     * Check whether we reach the end of the stream.
     * 
     * @return false if the chunk encoded stream has more data to read (in which
     *         case available() will be greater than 0); true otherwise.
     * @throws java.io.IOException
     *           on I/O errors.
     */
    private boolean checkEOF() throws IOException {
      if (isClosed()) return true;
      while (true) {
        if (remain > 0) return false;
        if (lastChunk) return true;
        readLength();
      }
    }

    @Override
    /*
     * This method never blocks the caller. Returning 0 does not mean we reach
     * the end of the stream.
     */
    public int available() {
      return remain;
    }

    @Override
    public int read() throws IOException {
      if (checkEOF()) return -1;
      int ret = in.read();
      if (ret < 0) throw new IOException("Corrupted chunk encoding stream");
      --remain;
      return ret;
    }

    @Override
    public int read(byte[] b) throws IOException {
      return read(b, 0, b.length);
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
      if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
        throw new IndexOutOfBoundsException();
      }

      if (!checkEOF()) {
        int n = Math.min(remain, len);
        int ret = in.read(b, off, n);
        if (ret < 0) throw new IOException("Corrupted chunk encoding stream");
        remain -= ret;
        return ret;
      }
      return -1;
    }

    @Override
    public long skip(long n) throws IOException {
      if (!checkEOF()) {
        long ret = in.skip(Math.min(remain, n));
        remain -= ret;
        return ret;
      }
      return 0;
    }

    @Override
    public boolean markSupported() {
      return false;
    }

    public boolean isClosed() {
      return closed;
    }

    @Override
    public void close() throws IOException {
      if (closed == false) {
        try {
          while (!checkEOF()) {
            skip(Integer.MAX_VALUE);
          }
        } finally {
          closed = true;
        }
      }
    }
  }

  /**
   * Chunk Encoder. Encoding the output data into a chain of chunks in the
   * following sequences: -len1, byte[len1], -len2, byte[len2], ... len_n,
   * byte[len_n]. Where len1, len2, ..., len_n are the lengths of the data
   * chunks. Non-terminal chunks have their lengths negated. Non-terminal chunks
   * cannot have length 0. All lengths are in the range of 0 to
   * Integer.MAX_VALUE and are encoded in Utils.VInt format.
   */
  static public class ChunkEncoder extends OutputStream {
    /**
     * The data output stream it connects to.
     */
    private DataOutputStream out;

    /**
     * The internal buffer that is only used when we do not know the advertised
     * size.
     */
    private byte buf[];

    /**
     * The number of valid bytes in the buffer. This value is always in the
     * range <tt>0</tt> through <tt>buf.length</tt>; elements <tt>buf[0]</tt>
     * through <tt>buf[count-1]</tt> contain valid byte data.
     */
    private int count;

    /**
     * Constructor.
     * 
     * @param out
     *          the underlying output stream.
     * @param buf
     *          user-supplied buffer. The buffer would be used exclusively by
     *          the ChunkEncoder during its life cycle.
     */
    public ChunkEncoder(DataOutputStream out, byte[] buf) {
      this.out = out;
      this.buf = buf;
      this.count = 0;
    }

    /**
     * Write out a chunk.
     * 
     * @param chunk
     *          The chunk buffer.
     * @param offset
     *          Offset to chunk buffer for the beginning of chunk.
     * @param len
     * @param last
     *          Is this the last call to flushBuffer?
     */
    private void writeChunk(byte[] chunk, int offset, int len, boolean last)
        throws IOException {
      if (last) { // always write out the length for the last chunk.
        Utils.writeVInt(out, len);
        if (len > 0) {
          out.write(chunk, offset, len);
        }
      } else {
        if (len > 0) {
          Utils.writeVInt(out, -len);
          out.write(chunk, offset, len);
        }
      }
    }

    /**
     * Write out a chunk that is a concatenation of the internal buffer plus
     * user supplied data. This will never be the last block.
     * 
     * @param data
     *          User supplied data buffer.
     * @param offset
     *          Offset to user data buffer.
     * @param len
     *          User data buffer size.
     */
    private void writeBufData(byte[] data, int offset, int len)
        throws IOException {
      if (count + len > 0) {
        Utils.writeVInt(out, -(count + len));
        out.write(buf, 0, count);
        count = 0;
        out.write(data, offset, len);
      }
    }

    /**
     * Flush the internal buffer.
     * 
     * Is this the last call to flushBuffer?
     * 
     * @throws java.io.IOException
     */
    private void flushBuffer() throws IOException {
      if (count > 0) {
        writeChunk(buf, 0, count, false);
        count = 0;
      }
    }

    @Override
    public void write(int b) throws IOException {
      if (count >= buf.length) {
        flushBuffer();
      }
      buf[count++] = (byte) b;
    }

    @Override
    public void write(byte b[]) throws IOException {
      write(b, 0, b.length);
    }

    @Override
    public void write(byte b[], int off, int len) throws IOException {
      if ((len + count) >= buf.length) {
        /*
         * If the input data do not fit in buffer, flush the output buffer and
         * then write the data directly. In this way buffered streams will
         * cascade harmlessly.
         */
        writeBufData(b, off, len);
        return;
      }

      System.arraycopy(b, off, buf, count, len);
      count += len;
    }

    @Override
    public void flush() throws IOException {
      flushBuffer();
      out.flush();
    }

    @Override
    public void close() throws IOException {
      if (buf != null) {
        try {
          writeChunk(buf, 0, count, true);
        } finally {
          buf = null;
          out = null;
        }
      }
    }
  }

  /**
   * Encode the whole stream as a single chunk. Expecting to know the size of
   * the chunk up-front.
   */
  static public class SingleChunkEncoder extends OutputStream {
    /**
     * The data output stream it connects to.
     */
    private final DataOutputStream out;

    /**
     * The remaining bytes to be written.
     */
    private int remain;
    private boolean closed = false;

    /**
     * Constructor.
     * 
     * @param out
     *          the underlying output stream.
     * @param size
     *          The total # of bytes to be written as a single chunk.
     * @throws java.io.IOException
     *           if an I/O error occurs.
     */
    public SingleChunkEncoder(DataOutputStream out, int size)
        throws IOException {
      this.out = out;
      this.remain = size;
      Utils.writeVInt(out, size);
    }

    @Override
    public void write(int b) throws IOException {
      if (remain > 0) {
        out.write(b);
        --remain;
      } else {
        throw new IOException("Writing more bytes than advertised size.");
      }
    }

    @Override
    public void write(byte b[]) throws IOException {
      write(b, 0, b.length);
    }

    @Override
    public void write(byte b[], int off, int len) throws IOException {
      if (remain >= len) {
        out.write(b, off, len);
        remain -= len;
      } else {
        throw new IOException("Writing more bytes than advertised size.");
      }
    }

    @Override
    public void flush() throws IOException {
      out.flush();
    }

    @Override
    public void close() throws IOException {
      if (closed == true) {
        return;
      }

      try {
        if (remain > 0) {
          throw new IOException("Writing less bytes than advertised size.");
        }
      } finally {
        closed = true;
      }
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop BCFile 源码

hadoop BoundedRangeFileInputStream 源码

hadoop ByteArray 源码

hadoop CompareUtils 源码

hadoop Compression 源码

hadoop MetaBlockAlreadyExists 源码

hadoop MetaBlockDoesNotExist 源码

hadoop RawComparable 源码

hadoop SimpleBufferedOutputStream 源码

hadoop TFile 源码

0  赞