hadoop CryptoInputStream 源码

  • 2022-10-20
  • 浏览 (406)

haddop CryptoInputStream 代码

文件路径:/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.crypto;

import java.io.EOFException;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.security.GeneralSecurityException;
import java.util.EnumSet;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ByteBufferPositionedReadable;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
import org.apache.hadoop.fs.HasFileDescriptor;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.StreamCapabilitiesPolicy;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.util.StringUtils;

import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;

/**
 * CryptoInputStream decrypts data. It is not thread-safe. AES CTR mode is
 * required in order to ensure that the plain text and cipher text have a 1:1
 * mapping. The decryption is buffer based. The key points of the decryption
 * are (1) calculating the counter and (2) padding through stream position:
 * <p>
 * counter = base + pos/(algorithm blocksize); 
 * padding = pos%(algorithm blocksize); 
 * <p>
 * The underlying stream offset is maintained as state.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class CryptoInputStream extends FilterInputStream implements 
    Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor, 
    CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess, 
    ReadableByteChannel, CanUnbuffer, StreamCapabilities,
    ByteBufferPositionedReadable, IOStatisticsSource {
  private final byte[] oneByteBuf = new byte[1];
  private final CryptoCodec codec;
  private final Decryptor decryptor;
  private final int bufferSize;
  
  /**
   * Input data buffer. The data starts at inBuffer.position() and ends at 
   * to inBuffer.limit().
   */
  private ByteBuffer inBuffer;
  
  /**
   * The decrypted data buffer. The data starts at outBuffer.position() and 
   * ends at outBuffer.limit();
   */
  private ByteBuffer outBuffer;
  private long streamOffset = 0; // Underlying stream offset.
  
  /**
   * Whether the underlying stream supports 
   * {@link org.apache.hadoop.fs.ByteBufferReadable}
   */
  private Boolean usingByteBufferRead = null;
  
  /**
   * Padding = pos%(algorithm blocksize); Padding is put into {@link #inBuffer} 
   * before any other data goes in. The purpose of padding is to put the input 
   * data at proper position.
   */
  private byte padding;
  private boolean closed;
  private final byte[] key;
  private final byte[] initIV;
  private byte[] iv;
  private final boolean isByteBufferReadable;
  private final boolean isReadableByteChannel;
  
  /** DirectBuffer pool */
  private final Queue<ByteBuffer> bufferPool = 
      new ConcurrentLinkedQueue<ByteBuffer>();
  /** Decryptor pool */
  private final Queue<Decryptor> decryptorPool = 
      new ConcurrentLinkedQueue<Decryptor>();
  
  public CryptoInputStream(InputStream in, CryptoCodec codec, 
      int bufferSize, byte[] key, byte[] iv) throws IOException {
    this(in, codec, bufferSize, key, iv, 
        CryptoStreamUtils.getInputStreamOffset(in));
  }
  
  public CryptoInputStream(InputStream in, CryptoCodec codec,
      int bufferSize, byte[] key, byte[] iv, long streamOffset) throws IOException {
    super(in);
    CryptoStreamUtils.checkCodec(codec);
    this.bufferSize = CryptoStreamUtils.checkBufferSize(codec, bufferSize);
    this.codec = codec;
    this.key = key.clone();
    this.initIV = iv.clone();
    this.iv = iv.clone();
    this.streamOffset = streamOffset;
    isByteBufferReadable = in instanceof ByteBufferReadable;
    isReadableByteChannel = in instanceof ReadableByteChannel;
    inBuffer = ByteBuffer.allocateDirect(this.bufferSize);
    outBuffer = ByteBuffer.allocateDirect(this.bufferSize);
    decryptor = getDecryptor();
    resetStreamOffset(streamOffset);
  }
  
  public CryptoInputStream(InputStream in, CryptoCodec codec,
      byte[] key, byte[] iv) throws IOException {
    this(in, codec, CryptoStreamUtils.getBufferSize(codec.getConf()), key, iv);
  }
  
  public InputStream getWrappedStream() {
    return in;
  }
  
  /**
   * Decryption is buffer based.
   * If there is data in {@link #outBuffer}, then read it out of this buffer.
   * If there is no data in {@link #outBuffer}, then read more from the 
   * underlying stream and do the decryption.
   * @param b the buffer into which the decrypted data is read.
   * @param off the buffer offset.
   * @param len the maximum number of decrypted data bytes to read.
   * @return int the total number of decrypted data bytes read into the buffer.
   * @throws IOException raised on errors performing I/O.
   */
  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    checkStream();
    if (b == null) {
      throw new NullPointerException();
    } else if (off < 0 || len < 0 || len > b.length - off) {
      throw new IndexOutOfBoundsException();
    } else if (len == 0) {
      return 0;
    }
    
    final int remaining = outBuffer.remaining();
    if (remaining > 0) {
      int n = Math.min(len, remaining);
      outBuffer.get(b, off, n);
      return n;
    } else {
      int n = 0;
      
      /*
       * Check whether the underlying stream is {@link ByteBufferReadable},
       * it can avoid bytes copy.
       */
      if (usingByteBufferRead == null) {
        if (isByteBufferReadable || isReadableByteChannel) {
          try {
            n = isByteBufferReadable ? 
                ((ByteBufferReadable) in).read(inBuffer) : 
                  ((ReadableByteChannel) in).read(inBuffer);
            usingByteBufferRead = Boolean.TRUE;
          } catch (UnsupportedOperationException e) {
            usingByteBufferRead = Boolean.FALSE;
          }
        } else {
          usingByteBufferRead = Boolean.FALSE;
        }
        if (!usingByteBufferRead) {
          n = readFromUnderlyingStream(inBuffer);
        }
      } else {
        if (usingByteBufferRead) {
          n = isByteBufferReadable ? ((ByteBufferReadable) in).read(inBuffer) : 
                ((ReadableByteChannel) in).read(inBuffer);
        } else {
          n = readFromUnderlyingStream(inBuffer);
        }
      }
      if (n <= 0) {
        return n;
      }
      
      streamOffset += n; // Read n bytes
      decrypt(decryptor, inBuffer, outBuffer, padding);
      padding = afterDecryption(decryptor, inBuffer, streamOffset, iv);
      n = Math.min(len, outBuffer.remaining());
      outBuffer.get(b, off, n);
      return n;
    }
  }
  
  /** Read data from underlying stream. */
  private int readFromUnderlyingStream(ByteBuffer inBuffer) throws IOException {
    final int toRead = inBuffer.remaining();
    final byte[] tmp = getTmpBuf();
    final int n = in.read(tmp, 0, toRead);
    if (n > 0) {
      inBuffer.put(tmp, 0, n);
    }
    return n;
  }
  
  private byte[] tmpBuf;
  private byte[] getTmpBuf() {
    if (tmpBuf == null) {
      tmpBuf = new byte[bufferSize];
    }
    return tmpBuf;
  }
  
  /**
   * Do the decryption using inBuffer as input and outBuffer as output.
   * Upon return, inBuffer is cleared; the decrypted data starts at 
   * outBuffer.position() and ends at outBuffer.limit();
   */
  private void decrypt(Decryptor decryptor, ByteBuffer inBuffer, 
      ByteBuffer outBuffer, byte padding) throws IOException {
    Preconditions.checkState(inBuffer.position() >= padding);
    if(inBuffer.position() == padding) {
      // There is no real data in inBuffer.
      return;
    }
    inBuffer.flip();
    outBuffer.clear();
    decryptor.decrypt(inBuffer, outBuffer);
    inBuffer.clear();
    outBuffer.flip();
    if (padding > 0) {
      /*
       * The plain text and cipher text have a 1:1 mapping, they start at the 
       * same position.
       */
      outBuffer.position(padding);
    }
  }
  
  /**
   * This method is executed immediately after decryption. Check whether 
   * decryptor should be updated and recalculate padding if needed. 
   */
  private byte afterDecryption(Decryptor decryptor, ByteBuffer inBuffer, 
      long position, byte[] iv) throws IOException {
    byte padding = 0;
    if (decryptor.isContextReset()) {
      /*
       * This code is generally not executed since the decryptor usually 
       * maintains decryption context (e.g. the counter) internally. However, 
       * some implementations can't maintain context so a re-init is necessary 
       * after each decryption call.
       */
      updateDecryptor(decryptor, position, iv);
      padding = getPadding(position);
      inBuffer.position(padding);
    }
    return padding;
  }
  
  private long getCounter(long position) {
    return position / codec.getCipherSuite().getAlgorithmBlockSize();
  }
  
  private byte getPadding(long position) {
    return (byte)(position % codec.getCipherSuite().getAlgorithmBlockSize());
  }
  
  /** Calculate the counter and iv, update the decryptor. */
  private void updateDecryptor(Decryptor decryptor, long position, byte[] iv) 
      throws IOException {
    final long counter = getCounter(position);
    codec.calculateIV(initIV, counter, iv);
    decryptor.init(key, iv);
  }
  
  /**
   * Reset the underlying stream offset; clear {@link #inBuffer} and 
   * {@link #outBuffer}. This Typically happens during {@link #seek(long)} 
   * or {@link #skip(long)}.
   */
  private void resetStreamOffset(long offset) throws IOException {
    streamOffset = offset;
    inBuffer.clear();
    outBuffer.clear();
    outBuffer.limit(0);
    updateDecryptor(decryptor, offset, iv);
    padding = getPadding(offset);
    inBuffer.position(padding); // Set proper position for input data.
  }
  
  @Override
  public synchronized void close() throws IOException {
    if (closed) {
      return;
    }
    
    super.close();
    freeBuffers();
    codec.close();
    closed = true;
  }
  
  /** Positioned read. It is thread-safe */
  @Override
  public int read(long position, byte[] buffer, int offset, int length)
      throws IOException {
    checkStream();
    if (!(in instanceof PositionedReadable)) {
      throw new UnsupportedOperationException(in.getClass().getCanonicalName()
          + " does not support positioned read.");
    }
    final int n = ((PositionedReadable) in).read(position, buffer, offset,
        length);
    if (n > 0) {
      // This operation does not change the current offset of the file
      decrypt(position, buffer, offset, n);
    }

    return n;
  }

  /**
   * Positioned read using {@link ByteBuffer}s. This method is thread-safe.
   */
  @Override
  public int read(long position, final ByteBuffer buf)
      throws IOException {
    checkStream();
    if (!(in instanceof ByteBufferPositionedReadable)) {
      throw new UnsupportedOperationException(in.getClass().getCanonicalName()
          + " does not support positioned reads with byte buffers.");
    }
    int bufPos = buf.position();
    final int n = ((ByteBufferPositionedReadable) in).read(position, buf);
    if (n > 0) {
      // This operation does not change the current offset of the file
      decrypt(position, buf, n, bufPos);
    }

    return n;
  }

  /**
   * Positioned readFully using {@link ByteBuffer}s. This method is thread-safe.
   */
  @Override
  public void readFully(long position, final ByteBuffer buf)
      throws IOException {
    checkStream();
    if (!(in instanceof ByteBufferPositionedReadable)) {
      throw new UnsupportedOperationException(in.getClass().getCanonicalName()
          + " does not support positioned reads with byte buffers.");
    }
    int bufPos = buf.position();
    ((ByteBufferPositionedReadable) in).readFully(position, buf);
    final int n = buf.position() - bufPos;
    if (n > 0) {
      // This operation does not change the current offset of the file
      decrypt(position, buf, n, bufPos);
    }
  }

  /**
   * Decrypt length bytes in buffer starting at offset. Output is also put 
   * into buffer starting at offset. It is thread-safe.
   */
  private void decrypt(long position, byte[] buffer, int offset, int length) 
      throws IOException {
    ByteBuffer localInBuffer = null;
    ByteBuffer localOutBuffer = null;
    Decryptor decryptor = null;
    try {
      localInBuffer = getBuffer();
      localOutBuffer = getBuffer();
      decryptor = getDecryptor();
      byte[] iv = initIV.clone();
      updateDecryptor(decryptor, position, iv);
      byte padding = getPadding(position);
      localInBuffer.position(padding); // Set proper position for input data.
      
      int n = 0;
      while (n < length) {
        int toDecrypt = Math.min(length - n, localInBuffer.remaining());
        localInBuffer.put(buffer, offset + n, toDecrypt);
        // Do decryption
        decrypt(decryptor, localInBuffer, localOutBuffer, padding);
        
        localOutBuffer.get(buffer, offset + n, toDecrypt);
        n += toDecrypt;
        padding = afterDecryption(decryptor, localInBuffer, position + n, iv);
      }
    } finally {
      returnBuffer(localInBuffer);
      returnBuffer(localOutBuffer);
      returnDecryptor(decryptor);
    }
  }

  /**
   * Decrypts the given {@link ByteBuffer} in place. {@code length} bytes are
   * decrypted from {@code buf} starting at {@code start}.
   * {@code buf.position()} and {@code buf.limit()} are unchanged after this
   * method returns. This method is thread-safe.
   *
   * <p>
   *   This method decrypts the input buf chunk-by-chunk and writes the
   *   decrypted output back into the input buf. It uses two local buffers
   *   taken from the {@link #bufferPool} to assist in this process: one is
   *   designated as the input buffer and it stores a single chunk of the
   *   given buf, the other is designated as the output buffer, which stores
   *   the output of decrypting the input buffer. Both buffers are of size
   *   {@link #bufferSize}.
   * </p>
   *
   * <p>
   *   Decryption is done by using a {@link Decryptor} and the
   *   {@link #decrypt(Decryptor, ByteBuffer, ByteBuffer, byte)} method. Once
   *   the decrypted data is written into the output buffer, is is copied back
   *   into buf. Both buffers are returned back into the pool once the entire
   *   buf is decrypted.
   * </p>
   *
   * @param filePosition the current position of the file being read
   * @param buf the {@link ByteBuffer} to decrypt
   * @param length the number of bytes in {@code buf} to decrypt
   * @param start the position in {@code buf} to start decrypting data from
   */
  private void decrypt(long filePosition, ByteBuffer buf, int length, int start)
          throws IOException {
    ByteBuffer localInBuffer = null;
    ByteBuffer localOutBuffer = null;

    // Duplicate the buffer so we don't have to worry about resetting the
    // original position and limit at the end of the method
    buf = buf.duplicate();

    int decryptedBytes = 0;
    Decryptor localDecryptor = null;
    try {
      localInBuffer = getBuffer();
      localOutBuffer = getBuffer();
      localDecryptor = getDecryptor();
      byte[] localIV = initIV.clone();
      updateDecryptor(localDecryptor, filePosition, localIV);
      byte localPadding = getPadding(filePosition);
      // Set proper filePosition for inputdata.
      localInBuffer.position(localPadding);

      while (decryptedBytes < length) {
        buf.position(start + decryptedBytes);
        buf.limit(start + decryptedBytes +
                Math.min(length - decryptedBytes, localInBuffer.remaining()));
        localInBuffer.put(buf);
        // Do decryption
        try {
          decrypt(localDecryptor, localInBuffer, localOutBuffer, localPadding);
          buf.position(start + decryptedBytes);
          buf.limit(start + length);
          decryptedBytes += localOutBuffer.remaining();
          buf.put(localOutBuffer);
        } finally {
          localPadding = afterDecryption(localDecryptor, localInBuffer,
                                         filePosition + length, localIV);
        }
      }
    } finally {
      returnBuffer(localInBuffer);
      returnBuffer(localOutBuffer);
      returnDecryptor(localDecryptor);
    }
  }

  /** Positioned read fully. It is thread-safe */
  @Override
  public void readFully(long position, byte[] buffer, int offset, int length)
      throws IOException {
    checkStream();
    if (!(in instanceof PositionedReadable)) {
      throw new UnsupportedOperationException(in.getClass().getCanonicalName()
          + " does not support positioned readFully.");
    }
    ((PositionedReadable) in).readFully(position, buffer, offset, length);
    if (length > 0) {
      // This operation does not change the current offset of the file
      decrypt(position, buffer, offset, length);
    }
  }

  @Override
  public void readFully(long position, byte[] buffer) throws IOException {
    readFully(position, buffer, 0, buffer.length);
  }

  /** Seek to a position. */
  @Override
  public void seek(long pos) throws IOException {
    if (pos < 0) {
      throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
    }
    checkStream();
    /*
     * If data of target pos in the underlying stream has already been read
     * and decrypted in outBuffer, we just need to re-position outBuffer.
     */
    if (pos <= streamOffset && pos >= (streamOffset - outBuffer.remaining())) {
      int forward = (int) (pos - (streamOffset - outBuffer.remaining()));
      if (forward > 0) {
        outBuffer.position(outBuffer.position() + forward);
      }
    } else {
      if (!(in instanceof Seekable)) {
        throw new UnsupportedOperationException(in.getClass().getCanonicalName()
            + " does not support seek.");
      }
      ((Seekable) in).seek(pos);
      resetStreamOffset(pos);
    }
  }
  
  /** Skip n bytes */
  @Override
  public long skip(long n) throws IOException {
    Preconditions.checkArgument(n >= 0, "Negative skip length.");
    checkStream();
    
    if (n == 0) {
      return 0;
    } else if (n <= outBuffer.remaining()) {
      int pos = outBuffer.position() + (int) n;
      outBuffer.position(pos);
      return n;
    } else {
      /*
       * Subtract outBuffer.remaining() to see how many bytes we need to 
       * skip in the underlying stream. Add outBuffer.remaining() to the 
       * actual number of skipped bytes in the underlying stream to get the 
       * number of skipped bytes from the user's point of view.
       */
      n -= outBuffer.remaining();
      long skipped = in.skip(n);
      if (skipped < 0) {
        skipped = 0;
      }
      long pos = streamOffset + skipped;
      skipped += outBuffer.remaining();
      resetStreamOffset(pos);
      return skipped;
    }
  }

  /** Get underlying stream position. */
  @Override
  public long getPos() throws IOException {
    checkStream();
    // Equals: ((Seekable) in).getPos() - outBuffer.remaining()
    return streamOffset - outBuffer.remaining();
  }
  
  /** ByteBuffer read. */
  @Override
  public int read(ByteBuffer buf) throws IOException {
    checkStream();
    if (isByteBufferReadable || isReadableByteChannel) {
      final int unread = outBuffer.remaining();
      if (unread > 0) { // Have unread decrypted data in buffer.
        int toRead = buf.remaining();
        if (toRead <= unread) {
          final int limit = outBuffer.limit();
          outBuffer.limit(outBuffer.position() + toRead);
          buf.put(outBuffer);
          outBuffer.limit(limit);
          return toRead;
        } else {
          buf.put(outBuffer);
        }
      }
      
      final int pos = buf.position();
      final int n = isByteBufferReadable ? ((ByteBufferReadable) in).read(buf) : 
            ((ReadableByteChannel) in).read(buf);
      if (n > 0) {
        streamOffset += n; // Read n bytes
        decrypt(buf, n, pos);
      }
      
      if (n >= 0) {
        return unread + n;
      } else {
        if (unread == 0) {
          return -1;
        } else {
          return unread;
        }
      }
    } else {
      int n = 0;
      if (buf.hasArray()) {
        n = read(buf.array(), buf.position(), buf.remaining());
        if (n > 0) {
          buf.position(buf.position() + n);
        }
      } else {
        byte[] tmp = new byte[buf.remaining()];
        n = read(tmp);
        if (n > 0) {
          buf.put(tmp, 0, n);
        }
      }
      return n;
    }
  }
  
  /**
   * Decrypts the given {@link ByteBuffer} in place. {@code length} bytes are
   * decrypted from {@code buf} starting at {@code start}.
   * {@code buf.position()} and {@code buf.limit()} are unchanged after this
   * method returns.
   *
   * @see #decrypt(long, ByteBuffer, int, int)
   */
  private void decrypt(ByteBuffer buf, int length, int start)
      throws IOException {
    buf = buf.duplicate();
    int decryptedBytes = 0;
    while (decryptedBytes < length) {
      buf.position(start + decryptedBytes);
      buf.limit(start + decryptedBytes +
              Math.min(length - decryptedBytes, inBuffer.remaining()));
      inBuffer.put(buf);
      // Do decryption
      try {
        decrypt(decryptor, inBuffer, outBuffer, padding);
        buf.position(start + decryptedBytes);
        buf.limit(start + length);
        decryptedBytes += outBuffer.remaining();
        buf.put(outBuffer);
      } finally {
        padding = afterDecryption(decryptor, inBuffer,
                streamOffset - (length - decryptedBytes), iv);
      }
    }
  }
  
  @Override
  public int available() throws IOException {
    checkStream();
    
    return in.available() + outBuffer.remaining();
  }

  @Override
  public boolean markSupported() {
    return false;
  }
  
  @Override
  public void mark(int readLimit) {
  }
  
  @Override
  public void reset() throws IOException {
    throw new IOException("Mark/reset not supported");
  }

  @Override
  public boolean seekToNewSource(long targetPos) throws IOException {
    Preconditions.checkArgument(targetPos >= 0, 
        "Cannot seek to negative offset.");
    checkStream();
    if (!(in instanceof Seekable)) {
      throw new UnsupportedOperationException(in.getClass().getCanonicalName()
          + " does not support seekToNewSource.");
    }
    boolean result = ((Seekable) in).seekToNewSource(targetPos);
    resetStreamOffset(targetPos);
    return result;
  }

  @Override
  public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
      EnumSet<ReadOption> opts) throws IOException,
      UnsupportedOperationException {
    checkStream();
    if (outBuffer.remaining() > 0) {
      if (!(in instanceof Seekable)) {
        throw new UnsupportedOperationException(in.getClass().getCanonicalName()
            + " does not support seek.");
      }
      // Have some decrypted data unread, need to reset.
      ((Seekable) in).seek(getPos());
      resetStreamOffset(getPos());
    }
    if (!(in instanceof HasEnhancedByteBufferAccess)) {
      throw new UnsupportedOperationException(in.getClass().getCanonicalName()
          + " does not support enhanced byte buffer access.");
    }
    final ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).
        read(bufferPool, maxLength, opts);
    if (buffer != null) {
      final int n = buffer.remaining();
      if (n > 0) {
        streamOffset += buffer.remaining(); // Read n bytes
        final int pos = buffer.position();
        decrypt(buffer, n, pos);
      }
    }
    return buffer;
  }

  @Override
  public void releaseBuffer(ByteBuffer buffer) {
    if (!(in instanceof HasEnhancedByteBufferAccess)) {
      throw new UnsupportedOperationException(in.getClass().getCanonicalName()
          + " does not support release buffer.");
    }
    ((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer);
  }

  @Override
  public void setReadahead(Long readahead) throws IOException,
      UnsupportedOperationException {
    if (!(in instanceof CanSetReadahead)) {
      throw new UnsupportedOperationException(in.getClass().getCanonicalName()
          + " does not support setting the readahead caching strategy.");
    }
    ((CanSetReadahead) in).setReadahead(readahead);
  }

  @Override
  public void setDropBehind(Boolean dropCache) throws IOException,
      UnsupportedOperationException {
    if (!(in instanceof CanSetReadahead)) {
      throw new UnsupportedOperationException(in.getClass().getCanonicalName()
          + " stream does not support setting the drop-behind caching"
          + " setting.");
    }
    ((CanSetDropBehind) in).setDropBehind(dropCache);
  }

  @Override
  public FileDescriptor getFileDescriptor() throws IOException {
    if (in instanceof HasFileDescriptor) {
      return ((HasFileDescriptor) in).getFileDescriptor();
    } else if (in instanceof FileInputStream) {
      return ((FileInputStream) in).getFD();
    } else {
      return null;
    }
  }
  
  @Override
  public int read() throws IOException {
    return (read(oneByteBuf, 0, 1) == -1) ? -1 : (oneByteBuf[0] & 0xff);
  }
  
  private void checkStream() throws IOException {
    if (closed) {
      throw new IOException("Stream closed");
    }
  }
  
  /** Get direct buffer from pool */
  private ByteBuffer getBuffer() {
    ByteBuffer buffer = bufferPool.poll();
    if (buffer == null) {
      buffer = ByteBuffer.allocateDirect(bufferSize);
    }
    
    return buffer;
  }
  
  /** Return direct buffer to pool */
  private void returnBuffer(ByteBuffer buf) {
    if (buf != null) {
      buf.clear();
      bufferPool.add(buf);
    }
  }
  
  /** Forcibly free the direct buffers. */
  private void freeBuffers() {
    CryptoStreamUtils.freeDB(inBuffer);
    CryptoStreamUtils.freeDB(outBuffer);
    cleanBufferPool();
  }
  
  /** Clean direct buffer pool */
  private void cleanBufferPool() {
    ByteBuffer buf;
    while ((buf = bufferPool.poll()) != null) {
      CryptoStreamUtils.freeDB(buf);
    }
  }
  
  /** Get decryptor from pool */
  private Decryptor getDecryptor() throws IOException {
    Decryptor decryptor = decryptorPool.poll();
    if (decryptor == null) {
      try {
        decryptor = codec.createDecryptor();
      } catch (GeneralSecurityException e) {
        throw new IOException(e);
      }
    }
    
    return decryptor;
  }
  
  /** Return decryptor to pool */
  private void returnDecryptor(Decryptor decryptor) {
    if (decryptor != null) {
      decryptorPool.add(decryptor);
    }
  }

  @Override
  public boolean isOpen() {
    return !closed;
  }

  private void cleanDecryptorPool() {
    decryptorPool.clear();
  }

  @Override
  public void unbuffer() {
    cleanBufferPool();
    cleanDecryptorPool();
    StreamCapabilitiesPolicy.unbuffer(in);
  }

  @Override
  public boolean hasCapability(String capability) {
    switch (StringUtils.toLowerCase(capability)) {
    case StreamCapabilities.UNBUFFER:
      return true;
    case StreamCapabilities.READAHEAD:
    case StreamCapabilities.DROPBEHIND:
    case StreamCapabilities.READBYTEBUFFER:
    case StreamCapabilities.PREADBYTEBUFFER:
      if (!(in instanceof StreamCapabilities)) {
        throw new UnsupportedOperationException(in.getClass().getCanonicalName()
          + " does not expose its stream capabilities.");
      }
      return ((StreamCapabilities) in).hasCapability(capability);
    case StreamCapabilities.IOSTATISTICS:
      return (in instanceof StreamCapabilities)
          && ((StreamCapabilities) in).hasCapability(capability);
    default:
      return false;
    }
  }

  @Override
  public IOStatistics getIOStatistics() {
    return retrieveIOStatistics(in);
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop CipherOption 源码

hadoop CipherSuite 源码

hadoop CryptoCodec 源码

hadoop CryptoOutputStream 源码

hadoop CryptoProtocolVersion 源码

hadoop CryptoStreamUtils 源码

hadoop Decryptor 源码

hadoop Encryptor 源码

hadoop JceAesCtrCryptoCodec 源码

hadoop JceCtrCryptoCodec 源码

0  赞