hadoop SocketOutputStream 源码

  • 2022-10-20
  • 浏览 (310)

haddop SocketOutputStream 代码

文件路径:/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketOutputStream.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.net;

import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.WritableByteChannel;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.LongWritable;

/**
 * This implements an output stream that can have a timeout while writing.
 * This sets non-blocking flag on the socket channel.
 * So after creating this object , read() on 
 * {@link Socket#getInputStream()} and write() on 
 * {@link Socket#getOutputStream()} on the associated socket will throw 
 * llegalBlockingModeException.
 * Please use {@link SocketInputStream} for reading.
 */
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Unstable
public class SocketOutputStream extends OutputStream 
                                implements WritableByteChannel {                                
  
  private Writer writer;
  
  private static class Writer extends SocketIOWithTimeout {
    WritableByteChannel channel;
    
    Writer(WritableByteChannel channel, long timeout) throws IOException {
      super((SelectableChannel)channel, timeout);
      this.channel = channel;
    }
    
    @Override
    int performIO(ByteBuffer buf) throws IOException {
      return channel.write(buf);
    }
  }
  
  /**
   * Create a new ouput stream with the given timeout. If the timeout
   * is zero, it will be treated as infinite timeout. The socket's
   * channel will be configured to be non-blocking.
   * 
   * @param channel 
   *        Channel for writing, should also be a {@link SelectableChannel}.  
   *        The channel will be configured to be non-blocking.
   * @param timeout timeout in milliseconds. must not be negative.
   * @throws IOException raised on errors performing I/O.
   */
  public SocketOutputStream(WritableByteChannel channel, long timeout) 
                                                         throws IOException {
    SocketIOWithTimeout.checkChannelValidity(channel);
    writer = new Writer(channel, timeout);
  }
  
  /**
   * Same as SocketOutputStream(socket.getChannel(), timeout):<br><br>
   * 
   * Create a new ouput stream with the given timeout. If the timeout
   * is zero, it will be treated as infinite timeout. The socket's
   * channel will be configured to be non-blocking.
   * 
   * @see SocketOutputStream#SocketOutputStream(WritableByteChannel, long)
   *  
   * @param socket should have a channel associated with it.
   * @param timeout timeout timeout in milliseconds. must not be negative.
   * @throws IOException raised on errors performing I/O.
   */
  public SocketOutputStream(Socket socket, long timeout) 
                                         throws IOException {
    this(socket.getChannel(), timeout);
  }
  
  @Override
  public void write(int b) throws IOException {
    /* If we need to, we can optimize this allocation.
     * probably no need to optimize or encourage single byte writes.
     */
    byte[] buf = new byte[1];
    buf[0] = (byte)b;
    write(buf, 0, 1);
  }
  
  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    ByteBuffer buf = ByteBuffer.wrap(b, off, len);
    while (buf.hasRemaining()) {
      try {
        if (write(buf) < 0) {
          throw new IOException("The stream is closed");
        }
      } catch (IOException e) {
        /* Unlike read, write can not inform user of partial writes.
         * So will close this if there was a partial write.
         */
        if (buf.capacity() > buf.remaining()) {
          writer.close();
        }
        throw e;
      }
    }
  }

  @Override
  public synchronized void close() throws IOException {
    /* close the channel since Socket.getOuputStream().close() 
     * closes the socket.
     */
    writer.channel.close();
    writer.close();
  }

  /**
   * @return Returns underlying channel used by this stream.
   * This is useful in certain cases like channel for 
   * {@link FileChannel#transferTo(long, long, WritableByteChannel)}
   */
  public WritableByteChannel getChannel() {
    return writer.channel; 
  }

  //WritableByteChannle interface 
  
  @Override
  public boolean isOpen() {
    return writer.isOpen();
  }

  @Override
  public int write(ByteBuffer src) throws IOException {
    return writer.doIO(src, SelectionKey.OP_WRITE);
  }
  
  /**
   * waits for the underlying channel to be ready for writing.
   * The timeout specified for this stream applies to this wait.
   *
   * @throws SocketTimeoutException 
   *         if select on the channel times out.
   * @throws IOException
   *         if any other I/O error occurs. 
   */
  public void waitForWritable() throws IOException {
    writer.waitForIO(SelectionKey.OP_WRITE);
  }
  
  /**
   * Transfers data from FileChannel using 
   * {@link FileChannel#transferTo(long, long, WritableByteChannel)}.
   * Updates <code>waitForWritableTime</code> and <code>transferToTime</code>
   * with the time spent blocked on the network and the time spent transferring
   * data from disk to network respectively.
   * 
   * Similar to readFully(), this waits till requested amount of 
   * data is transfered.
   * 
   * @param fileCh FileChannel to transfer data from.
   * @param position position within the channel where the transfer begins
   * @param count number of bytes to transfer.
   * @param waitForWritableTime nanoseconds spent waiting for the socket 
   *        to become writable
   * @param transferToTime nanoseconds spent transferring data
   * 
   * @throws EOFException 
   *         If end of input file is reached before requested number of 
   *         bytes are transfered.
   *
   * @throws SocketTimeoutException 
   *         If this channel blocks transfer longer than timeout for 
   *         this stream.
   *          
   * @throws IOException Includes any exception thrown by 
   *         {@link FileChannel#transferTo(long, long, WritableByteChannel)}. 
   */
  public void transferToFully(FileChannel fileCh, long position, int count,
      LongWritable waitForWritableTime,
      LongWritable transferToTime) throws IOException {
    long waitTime = 0;
    long transferTime = 0;
    while (count > 0) {
      /* 
       * Ideally we should wait after transferTo returns 0. But because of
       * a bug in JRE on Linux (http://bugs.sun.com/view_bug.do?bug_id=5103988),
       * which throws an exception instead of returning 0, we wait for the
       * channel to be writable before writing to it. If you ever see 
       * IOException with message "Resource temporarily unavailable" 
       * thrown here, please let us know.
       * 
       * Once we move to JAVA SE 7, wait should be moved to correct place.
       */
      long start = System.nanoTime();
      waitForWritable();
      long wait = System.nanoTime();

      int nTransfered = (int) fileCh.transferTo(position, count, getChannel());
      
      if (nTransfered == 0) {
        //check if end of file is reached.
        if (position >= fileCh.size()) {
          throw new EOFException("EOF Reached. file size is " + fileCh.size() + 
                                 " and " + count + " more bytes left to be " +
                                 "transfered.");
        }
        //otherwise assume the socket is full.
        //waitForWritable(); // see comment above.
      } else if (nTransfered < 0) {
        throw new IOException("Unexpected return of " + nTransfered + 
                              " from transferTo()");
      } else {
        position += nTransfered;
        count -= nTransfered;
      }
      long transfer = System.nanoTime();
      waitTime += wait - start;
      transferTime += transfer - wait;
    }
    
    if (waitForWritableTime != null) {
      waitForWritableTime.set(waitTime);
    }
    if (transferToTime != null) {
      transferToTime.set(transferTime);
    }
  }

  /**
   * Call
   * {@link #transferToFully(FileChannel, long, int, LongWritable, LongWritable)
   * }
   * with null <code>waitForWritableTime</code> and <code>transferToTime</code>.
   *
   * @param fileCh input fileCh.
   * @param position input position.
   * @param count input count.
   * @throws IOException raised on errors performing I/O.
   */
  public void transferToFully(FileChannel fileCh, long position, int count)
      throws IOException {
    transferToFully(fileCh, position, count, null, null);
  }

  public void setTimeout(int timeoutMs) {
    writer.setTimeout(timeoutMs);
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AbstractDNSToSwitchMapping 源码

hadoop CachedDNSToSwitchMapping 源码

hadoop ConnectTimeoutException 源码

hadoop DNS 源码

hadoop DNSDomainNameResolver 源码

hadoop DNSToSwitchMapping 源码

hadoop DNSToSwitchMappingWithDependency 源码

hadoop DomainNameResolver 源码

hadoop DomainNameResolverFactory 源码

hadoop InnerNode 源码

0  赞