hadoop DomainSocket 源码

  • 2022-10-20
  • 浏览 (301)

haddop DomainSocket 代码

文件路径:/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.net.unix;

import java.io.Closeable;
import org.apache.hadoop.classification.InterfaceAudience;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.ByteBuffer;

import org.apache.commons.lang3.SystemUtils;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.CloseableReferenceCount;

import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The implementation of UNIX domain sockets in Java.
 * 
 * See {@link DomainSocket} for more information about UNIX domain sockets.
 */
@InterfaceAudience.LimitedPrivate("HDFS")
public class DomainSocket implements Closeable {
  static {
    if (SystemUtils.IS_OS_WINDOWS) {
      loadingFailureReason = "UNIX Domain sockets are not available on Windows.";
    } else if (!NativeCodeLoader.isNativeCodeLoaded()) {
      loadingFailureReason = "libhadoop cannot be loaded.";
    } else {
      String problem;
      try {
        anchorNative();
        problem = null;
      } catch (Throwable t) {
        problem = "DomainSocket#anchorNative got error: " + t.getMessage();
      }
      loadingFailureReason = problem;
    }
  }

  static final Logger LOG = LoggerFactory.getLogger(DomainSocket.class);

  /**
   * True only if we should validate the paths used in
   * {@link DomainSocket#bindAndListen(String)}
   */
  private static boolean validateBindPaths = true;

  /**
   * The reason why DomainSocket is not available, or null if it is available.
   */
  private final static String loadingFailureReason;

  /**
   * Initialize the native library code.
   */
  private static native void anchorNative();

  /**
   * This function is designed to validate that the path chosen for a UNIX
   * domain socket is secure.  A socket path is secure if it doesn't allow
   * unprivileged users to perform a man-in-the-middle attack against it.
   * For example, one way to perform a man-in-the-middle attack would be for
   * a malicious user to move the server socket out of the way and create his
   * own socket in the same place.  Not good.
   * 
   * Note that we only check the path once.  It's possible that the
   * permissions on the path could change, perhaps to something more relaxed,
   * immediately after the path passes our validation test-- hence creating a
   * security hole.  However, the purpose of this check is to spot common
   * misconfigurations.  System administrators do not commonly change
   * permissions on these paths while the server is running.
   *
   * For more information on Security exceptions see this wiki page:
   * https://wiki.apache.org/hadoop/SocketPathSecurity
   *
   * @param path             the path to validate
   * @param skipComponents   the number of starting path components to skip 
   *                         validation for (used only for testing)
   */
  @VisibleForTesting
  native static void validateSocketPathSecurity0(String path,
      int skipComponents) throws IOException;

  /**
   * Return true only if UNIX domain sockets are available.
   *
   * @return loadingFailureReason.
   */
  public static String getLoadingFailureReason() {
    return loadingFailureReason;
  }

  /**
   * Disable validation of the server bind paths.
   */
  @VisibleForTesting
  public static void disableBindPathValidation() {
    validateBindPaths = false;
  }

  /**
   * Given a path and a port, compute the effective path by replacing
   * occurrences of _PORT with the port.  This is mainly to make it 
   * possible to run multiple DataNodes locally for testing purposes.
   *
   * @param path            The source path
   * @param port            Port number to use
   *
   * @return                The effective path
   */
  public static String getEffectivePath(String path, int port) {
    return path.replace("_PORT", String.valueOf(port));
  }

  /**
   * The socket reference count and closed bit.
   */
  final CloseableReferenceCount refCount;

  /**
   * The file descriptor associated with this UNIX domain socket.
   */
  final int fd;

  /**
   * The path associated with this UNIX domain socket.
   */
  private final String path;

  /**
   * The InputStream associated with this socket.
   */
  private final DomainInputStream inputStream = new DomainInputStream();

  /**
   * The OutputStream associated with this socket.
   */
  private final DomainOutputStream outputStream = new DomainOutputStream();

  /**
   * The Channel associated with this socket.
   */
  private final DomainChannel channel = new DomainChannel();

  private DomainSocket(String path, int fd) {
    this.refCount = new CloseableReferenceCount();
    this.fd = fd;
    this.path = path;
  }

  private static native int bind0(String path) throws IOException;

  private void unreference(boolean checkClosed) throws ClosedChannelException {
    if (checkClosed) {
      refCount.unreferenceCheckClosed();
    } else {
      refCount.unreference();
    }
  }

  /**
   * Create a new DomainSocket listening on the given path.
   *
   * @param path         The path to bind and listen on.
   * @return             The new DomainSocket.
   * @throws IOException raised on errors performing I/O.
   */
  public static DomainSocket bindAndListen(String path) throws IOException {
    if (loadingFailureReason != null) {
      throw new UnsupportedOperationException(loadingFailureReason);
    }
    if (validateBindPaths) {
      validateSocketPathSecurity0(path, 0);
    }
    int fd = bind0(path);
    return new DomainSocket(path, fd);
  }

  /**
   * Create a pair of UNIX domain sockets which are connected to each other
   * by calling socketpair(2).
   *
   * @return                An array of two UNIX domain sockets connected to
   *                        each other.
   * @throws IOException    on error.
   */
  public static DomainSocket[] socketpair() throws IOException {
    int fds[] = socketpair0();
    return new DomainSocket[] {
      new DomainSocket("(anonymous0)", fds[0]),
      new DomainSocket("(anonymous1)", fds[1])
    };
  }

  private static native int[] socketpair0() throws IOException;

  private static native int accept0(int fd) throws IOException;

  /**
   * Accept a new UNIX domain connection.
   *
   * This method can only be used on sockets that were bound with bind().
   *
   * @return                The new connection.
   * @throws IOException    If there was an I/O error performing the accept--
   *                        such as the socket being closed from under us.
   *                        Particularly when the accept is timed out, it throws
   *                        SocketTimeoutException.
   */
  public DomainSocket accept() throws IOException {
    refCount.reference();
    boolean exc = true;
    try {
      DomainSocket ret = new DomainSocket(path, accept0(fd));
      exc = false;
      return ret;
    } finally {
      unreference(exc);
    }
  }

  private static native int connect0(String path) throws IOException;

  /**
   * Create a new DomainSocket connected to the given path.
   *
   * @param path              The path to connect to.
   * @throws IOException      If there was an I/O error performing the connect.
   *
   * @return                  The new DomainSocket.
   */
  public static DomainSocket connect(String path) throws IOException {
    if (loadingFailureReason != null) {
      throw new UnsupportedOperationException(loadingFailureReason);
    }
    int fd = connect0(path);
    return new DomainSocket(path, fd);
  }

  /**
   * Return true if the file descriptor is currently open.
   *
   * @return                 True if the file descriptor is currently open.
   */
  public boolean isOpen() {
    return refCount.isOpen();
  }

  /**
   * @return                 The socket path.
   */
  public String getPath() {
    return path;
  }

  /**
   * @return                 The socket InputStream
   */
  public DomainInputStream getInputStream() {
    return inputStream;
  }

  /**
   * @return                 The socket OutputStream
   */
  public DomainOutputStream getOutputStream() {
    return outputStream;
  }

  /**
   * @return                 The socket Channel
   */
  public DomainChannel getChannel() {
    return channel;
  }

  public static final int SEND_BUFFER_SIZE = 1;
  public static final int RECEIVE_BUFFER_SIZE = 2;
  public static final int SEND_TIMEOUT = 3;
  public static final int RECEIVE_TIMEOUT = 4;

  private static native void setAttribute0(int fd, int type, int val)
      throws IOException;

  public void setAttribute(int type, int size) throws IOException {
    refCount.reference();
    boolean exc = true;
    try {
      setAttribute0(fd, type, size);
      exc = false;
    } finally {
      unreference(exc);
    }
  }

  private native int getAttribute0(int fd, int type) throws IOException;

  public int getAttribute(int type) throws IOException {
    refCount.reference();
    int attribute;
    boolean exc = true;
    try {
      attribute = getAttribute0(fd, type);
      exc = false;
      return attribute;
    } finally {
      unreference(exc);
    }
  }

  private static native void close0(int fd) throws IOException;

  private static native void closeFileDescriptor0(FileDescriptor fd)
      throws IOException;

  private static native void shutdown0(int fd) throws IOException;

  /**
   * Close the Socket.
   */
  @Override
  public void close() throws IOException {
    // Set the closed bit on this DomainSocket
    int count;
    try {
      count = refCount.setClosed();
    } catch (ClosedChannelException e) {
      // Someone else already closed the DomainSocket.
      return;
    }
    // Wait for all references to go away
    boolean didShutdown = false;
    boolean interrupted = false;
    while (count > 0) {
      if (!didShutdown) {
        try {
          // Calling shutdown on the socket will interrupt blocking system
          // calls like accept, write, and read that are going on in a
          // different thread.
          shutdown0(fd);
        } catch (IOException e) {
          LOG.error("shutdown error: ", e);
        }
        didShutdown = true;
      }
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        interrupted = true;
      }
      count = refCount.getReferenceCount();
    }

    // At this point, nobody has a reference to the file descriptor, 
    // and nobody will be able to get one in the future either.
    // We now call close(2) on the file descriptor.
    // After this point, the file descriptor number will be reused by 
    // something else.  Although this DomainSocket object continues to hold 
    // the old file descriptor number (it's a final field), we never use it 
    // again because this DomainSocket is closed.
    close0(fd);
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
  
  /**
   * Call shutdown(SHUT_RDWR) on the UNIX domain socket.
   *
   * @throws IOException raised on errors performing I/O.
   */
  public void shutdown() throws IOException {
    refCount.reference();
    boolean exc = true;
    try {
      shutdown0(fd);
      exc = false;
    } finally {
      unreference(exc);
    }
  }

  private native static void sendFileDescriptors0(int fd,
      FileDescriptor descriptors[],
      byte jbuf[], int offset, int length) throws IOException;

  /**
   * Send some FileDescriptor objects to the process on the other side of this
   * socket.
   * 
   * @param descriptors       The file descriptors to send.
   * @param jbuf              Some bytes to send.  You must send at least
   *                          one byte.
   * @param offset            The offset in the jbuf array to start at.
   * @param length            Length of the jbuf array to use.
   * @throws IOException raised on errors performing I/O.
   */
  public void sendFileDescriptors(FileDescriptor descriptors[],
      byte jbuf[], int offset, int length) throws IOException {
    refCount.reference();
    boolean exc = true;
    try {
      sendFileDescriptors0(fd, descriptors, jbuf, offset, length);
      exc = false;
    } finally {
      unreference(exc);
    }
  }

  private static native int receiveFileDescriptors0(int fd,
      FileDescriptor[] descriptors,
      byte[] buf, int offset, int length) throws IOException;

  /**
   * Receive some FileDescriptor objects from the process on the other side of
   * this socket, and wrap them in FileInputStream objects.
   *
   * @param streams input stream.
   * @param buf input buf.
   * @param offset input offset.
   * @param length input length.
   * @return wrap them in FileInputStream objects.
   * @throws IOException raised on errors performing I/O.
   */
  public int recvFileInputStreams(FileInputStream[] streams, byte buf[],
        int offset, int length) throws IOException {
    FileDescriptor descriptors[] = new FileDescriptor[streams.length];
    boolean success = false;
    for (int i = 0; i < streams.length; i++) {
      streams[i] = null;
    }
    refCount.reference();
    try {
      int ret = receiveFileDescriptors0(fd, descriptors, buf, offset, length);
      for (int i = 0, j = 0; i < descriptors.length; i++) {
        if (descriptors[i] != null) {
          streams[j++] = new FileInputStream(descriptors[i]);
          descriptors[i] = null;
        }
      }
      success = true;
      return ret;
    } finally {
      if (!success) {
        for (int i = 0; i < descriptors.length; i++) {
          if (descriptors[i] != null) {
            try {
              closeFileDescriptor0(descriptors[i]);
            } catch (Throwable t) {
              LOG.warn(t.toString());
            }
          } else if (streams[i] != null) {
            try {
              streams[i].close();
            } catch (Throwable t) {
              LOG.warn(t.toString());
            } finally {
              streams[i] = null; }
          }
        }
      }
      unreference(!success);
    }
  }

  private native static int readArray0(int fd, byte b[], int off, int len)
      throws IOException;
  
  private native static int available0(int fd) throws IOException;

  private static native void write0(int fd, int b) throws IOException;

  private static native void writeArray0(int fd, byte b[], int offset, int length)
      throws IOException;

  private native static int readByteBufferDirect0(int fd, ByteBuffer dst,
      int position, int remaining) throws IOException;

  /**
   * Input stream for UNIX domain sockets.
   */
  @InterfaceAudience.LimitedPrivate("HDFS")
  public class DomainInputStream extends InputStream {
    @Override
    public int read() throws IOException {
      refCount.reference();
      boolean exc = true;
      try {
        byte b[] = new byte[1];
        int ret = DomainSocket.readArray0(DomainSocket.this.fd, b, 0, 1);
        exc = false;
        return (ret >= 0) ? b[0] : -1;
      } finally {
        unreference(exc);
      }
    }
    
    @Override
    public int read(byte b[], int off, int len) throws IOException {
      refCount.reference();
      boolean exc = true;
      try {
        int nRead = DomainSocket.readArray0(DomainSocket.this.fd, b, off, len);
        exc = false;
        return nRead;
      } finally {
        unreference(exc);
      }
    }

    @Override
    public int available() throws IOException {
      refCount.reference();
      boolean exc = true;
      try {
        int nAvailable = DomainSocket.available0(DomainSocket.this.fd);
        exc = false;
        return nAvailable;
      } finally {
        unreference(exc);
      }
    }

    @Override
    public void close() throws IOException {
      DomainSocket.this.close();
    }
  }

  /**
   * Output stream for UNIX domain sockets.
   */
  @InterfaceAudience.LimitedPrivate("HDFS")
  public class DomainOutputStream extends OutputStream {
    @Override
    public void close() throws IOException {
      DomainSocket.this.close();
    }

    @Override
    public void write(int val) throws IOException {
      refCount.reference();
      boolean exc = true;
      try {
        byte b[] = new byte[1];
        b[0] = (byte)val;
        DomainSocket.writeArray0(DomainSocket.this.fd, b, 0, 1);
        exc = false;
      } finally {
        unreference(exc);
      }
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      refCount.reference();
      boolean exc = true;
      try {
        DomainSocket.writeArray0(DomainSocket.this.fd, b, off, len);
        exc = false;
      } finally {
        unreference(exc);
      }
    }
  }

  @InterfaceAudience.LimitedPrivate("HDFS")
  public class DomainChannel implements ReadableByteChannel {
    @Override
    public boolean isOpen() {
      return DomainSocket.this.isOpen();
    }

    @Override
    public void close() throws IOException {
      DomainSocket.this.close();
    }

    @Override
    public int read(ByteBuffer dst) throws IOException {
      refCount.reference();
      boolean exc = true;
      try {
        int nread = 0;
        if (dst.isDirect()) {
          nread = DomainSocket.readByteBufferDirect0(DomainSocket.this.fd,
              dst, dst.position(), dst.remaining());
        } else if (dst.hasArray()) {
          nread = DomainSocket.readArray0(DomainSocket.this.fd,
              dst.array(), dst.position() + dst.arrayOffset(),
              dst.remaining());
        } else {
          throw new AssertionError("we don't support " +
              "using ByteBuffers that aren't either direct or backed by " +
              "arrays");
        }
        if (nread > 0) {
          dst.position(dst.position() + nread);
        }
        exc = false;
        return nread;
      } finally {
        unreference(exc);
      }
    }
  }

  @Override
  public String toString() {
    return String.format("DomainSocket(fd=%d,path=%s)", fd, path);
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop DomainSocketWatcher 源码

0  赞