hadoop Chunk 源码

  • 2022-10-20
  • 浏览 (375)

hadoop Chunk 代码


/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.hadoop.io.file.tfile;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
 * Several related classes to support chunk-encoded sub-streams on top of a
 * regular stream.
 */
final class Chunk {

   * Prevent the instantiation of class.
  private Chunk() {
    // nothing

   * Decoding a chain of chunks encoded through ChunkEncoder or
   * SingleChunkEncoder.
  static public class ChunkDecoder extends InputStream {
    private DataInputStream in = null;
    private boolean lastChunk;
    private int remain = 0;
    private boolean closed;

    public ChunkDecoder() {
      lastChunk = true;
      closed = true;

    public void reset(DataInputStream downStream) {
      // no need to wind forward the old input.
      in = downStream;
      lastChunk = false;
      remain = 0;
      closed = false;

     * Constructor
     * @param in
     *          The source input stream which contains chunk-encoded data
     *          stream.
    public ChunkDecoder(DataInputStream in) {
      this.in = in;
      lastChunk = false;
      closed = false;

     * Have we reached the last chunk.
     * @return true if we have reached the last chunk.
     * @throws java.io.IOException
    public boolean isLastChunk() throws IOException {
      return lastChunk;

     * How many bytes remain in the current chunk?
     * @return remaining bytes left in the current chunk.
     * @throws java.io.IOException
    public int getRemain() throws IOException {
      return remain;

     * Reading the length of next chunk.
     * @throws java.io.IOException
     *           when no more data is available.
    private void readLength() throws IOException {
      remain = Utils.readVInt(in);
      if (remain >= 0) {
        lastChunk = true;
      } else {
        remain = -remain;

     * Check whether we reach the end of the stream.
     * @return false if the chunk encoded stream has more data to read (in which
     *         case available() will be greater than 0); true otherwise.
     * @throws java.io.IOException
     *           on I/O errors.
    private boolean checkEOF() throws IOException {
      if (isClosed()) return true;
      while (true) {
        if (remain > 0) return false;
        if (lastChunk) return true;

     * This method never blocks the caller. Returning 0 does not mean we reach
     * the end of the stream.
    public int available() {
      return remain;

    public int read() throws IOException {
      if (checkEOF()) return -1;
      int ret = in.read();
      if (ret < 0) throw new IOException("Corrupted chunk encoding stream");
      return ret;

    public int read(byte[] b) throws IOException {
      return read(b, 0, b.length);

    public int read(byte[] b, int off, int len) throws IOException {
      if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
        throw new IndexOutOfBoundsException();

      if (!checkEOF()) {
        int n = Math.min(remain, len);
        int ret = in.read(b, off, n);
        if (ret < 0) throw new IOException("Corrupted chunk encoding stream");
        remain -= ret;
        return ret;
      return -1;

    public long skip(long n) throws IOException {
      if (!checkEOF()) {
        long ret = in.skip(Math.min(remain, n));
        remain -= ret;
        return ret;
      return 0;

    public boolean markSupported() {
      return false;

    public boolean isClosed() {
      return closed;

    public void close() throws IOException {
      if (closed == false) {
        try {
          while (!checkEOF()) {
        } finally {
          closed = true;

   * Chunk Encoder. Encoding the output data into a chain of chunks in the
   * following sequences: -len1, byte[len1], -len2, byte[len2], ... len_n,
   * byte[len_n]. Where len1, len2, ..., len_n are the lengths of the data
   * chunks. Non-terminal chunks have their lengths negated. Non-terminal chunks
   * cannot have length 0. All lengths are in the range of 0 to
   * Integer.MAX_VALUE and are encoded in Utils.VInt format.
  static public class ChunkEncoder extends OutputStream {
     * The data output stream it connects to.
    private DataOutputStream out;

     * The internal buffer that is only used when we do not know the advertised
     * size.
    private byte buf[];

     * The number of valid bytes in the buffer. This value is always in the
     * range <tt>0</tt> through <tt>buf.length</tt>; elements <tt>buf[0]</tt>
     * through <tt>buf[count-1]</tt> contain valid byte data.
    private int count;

     * Constructor.
     * @param out
     *          the underlying output stream.
     * @param buf
     *          user-supplied buffer. The buffer would be used exclusively by
     *          the ChunkEncoder during its life cycle.
    public ChunkEncoder(DataOutputStream out, byte[] buf) {
      this.out = out;
      this.buf = buf;
      this.count = 0;

     * Write out a chunk.
     * @param chunk
     *          The chunk buffer.
     * @param offset
     *          Offset to chunk buffer for the beginning of chunk.
     * @param len
     * @param last
     *          Is this the last call to flushBuffer?
    private void writeChunk(byte[] chunk, int offset, int len, boolean last)
        throws IOException {
      if (last) { // always write out the length for the last chunk.
        Utils.writeVInt(out, len);
        if (len > 0) {
          out.write(chunk, offset, len);
      } else {
        if (len > 0) {
          Utils.writeVInt(out, -len);
          out.write(chunk, offset, len);

     * Write out a chunk that is a concatenation of the internal buffer plus
     * user supplied data. This will never be the last block.
     * @param data
     *          User supplied data buffer.
     * @param offset
     *          Offset to user data buffer.
     * @param len
     *          User data buffer size.
    private void writeBufData(byte[] data, int offset, int len)
        throws IOException {
      if (count + len > 0) {
        Utils.writeVInt(out, -(count + len));
        out.write(buf, 0, count);
        count = 0;
        out.write(data, offset, len);

     * Flush the internal buffer.
     * Is this the last call to flushBuffer?
     * @throws java.io.IOException
    private void flushBuffer() throws IOException {
      if (count > 0) {
        writeChunk(buf, 0, count, false);
        count = 0;

    public void write(int b) throws IOException {
      if (count >= buf.length) {
      buf[count++] = (byte) b;

    public void write(byte b[]) throws IOException {
      write(b, 0, b.length);

    public void write(byte b[], int off, int len) throws IOException {
      if ((len + count) >= buf.length) {
         * If the input data do not fit in buffer, flush the output buffer and
         * then write the data directly. In this way buffered streams will
         * cascade harmlessly.
        writeBufData(b, off, len);

      System.arraycopy(b, off, buf, count, len);
      count += len;

    public void flush() throws IOException {

    public void close() throws IOException {
      if (buf != null) {
        try {
          writeChunk(buf, 0, count, true);
        } finally {
          buf = null;
          out = null;

   * Encode the whole stream as a single chunk. Expecting to know the size of
   * the chunk up-front.
  static public class SingleChunkEncoder extends OutputStream {
     * The data output stream it connects to.
    private final DataOutputStream out;

     * The remaining bytes to be written.
    private int remain;
    private boolean closed = false;

     * Constructor.
     * @param out
     *          the underlying output stream.
     * @param size
     *          The total # of bytes to be written as a single chunk.
     * @throws java.io.IOException
     *           if an I/O error occurs.
    public SingleChunkEncoder(DataOutputStream out, int size)
        throws IOException {
      this.out = out;
      this.remain = size;
      Utils.writeVInt(out, size);

    public void write(int b) throws IOException {
      if (remain > 0) {
      } else {
        throw new IOException("Writing more bytes than advertised size.");

    public void write(byte b[]) throws IOException {
      write(b, 0, b.length);

    public void write(byte b[], int off, int len) throws IOException {
      if (remain >= len) {
        out.write(b, off, len);
        remain -= len;
      } else {
        throw new IOException("Writing more bytes than advertised size.");

    public void flush() throws IOException {

    public void close() throws IOException {
      if (closed == true) {

      try {
        if (remain > 0) {
          throw new IOException("Writing less bytes than advertised size.");
      } finally {
        closed = true;


hadoop 源码目录


hadoop BCFile 源码

hadoop BoundedRangeFileInputStream 源码

hadoop ByteArray 源码

hadoop CompareUtils 源码

hadoop Compression 源码

hadoop MetaBlockAlreadyExists 源码

hadoop MetaBlockDoesNotExist 源码

hadoop RawComparable 源码

hadoop SimpleBufferedOutputStream 源码

hadoop TFile 源码

0  赞