hadoop BufferPool 源码

  • 2022-10-20
  • 浏览 (477)

haddop BufferPool 代码

文件路径:/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/BufferPool.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.hadoop.fs.impl.prefetch;

import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNegative;
import static org.apache.hadoop.fs.impl.prefetch.Validate.checkState;
import static org.apache.hadoop.util.Preconditions.checkArgument;
import static org.apache.hadoop.util.Preconditions.checkNotNull;

/**
 * Manages a fixed pool of {@code ByteBuffer} instances.
 * <p>
 * Avoids creating a new buffer if a previously created buffer is already available.
 */
public class BufferPool implements Closeable {

  private static final Logger LOG = LoggerFactory.getLogger(BufferPool.class);

  /**
   * Max number of buffers in this pool.
   */
  private final int size;

  /**
   * Size in bytes of each buffer.
   */
  private final int bufferSize;

  /*
   Invariants for internal state.
   -- a buffer is either in this.pool or in this.allocated
   -- transition between this.pool <==> this.allocated must be atomic
   -- only one buffer allocated for a given blockNumber
  */


  /**
   * Underlying bounded resource pool.
   */
  private BoundedResourcePool<ByteBuffer> pool;

  /**
   * Allows associating metadata to each buffer in the pool.
   */
  private Map<BufferData, ByteBuffer> allocated;

  /**
   * Prefetching stats.
   */
  private PrefetchingStatistics prefetchingStatistics;

  /**
   * Initializes a new instance of the {@code BufferPool} class.
   * @param size number of buffer in this pool.
   * @param bufferSize size in bytes of each buffer.
   * @param prefetchingStatistics statistics for this stream.
   * @throws IllegalArgumentException if size is zero or negative.
   * @throws IllegalArgumentException if bufferSize is zero or negative.
   */
  public BufferPool(int size,
      int bufferSize,
      PrefetchingStatistics prefetchingStatistics) {
    Validate.checkPositiveInteger(size, "size");
    Validate.checkPositiveInteger(bufferSize, "bufferSize");

    this.size = size;
    this.bufferSize = bufferSize;
    this.allocated = new IdentityHashMap<BufferData, ByteBuffer>();
    this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
    this.pool = new BoundedResourcePool<ByteBuffer>(size) {
      @Override
      public ByteBuffer createNew() {
        ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
        prefetchingStatistics.memoryAllocated(bufferSize);
        return buffer;
      }
    };
  }

  /**
   * Gets a list of all blocks in this pool.
   * @return a list of all blocks in this pool.
   */
  public List<BufferData> getAll() {
    synchronized (allocated) {
      return Collections.unmodifiableList(new ArrayList<>(allocated.keySet()));
    }
  }

  /**
   * Acquires a {@code ByteBuffer}; blocking if necessary until one becomes available.
   * @param blockNumber the id of the block to acquire.
   * @return the acquired block's {@code BufferData}.
   */
  public synchronized BufferData acquire(int blockNumber) {
    BufferData data;
    final int maxRetryDelayMs = 600 * 1000;
    final int statusUpdateDelayMs = 120 * 1000;
    Retryer retryer = new Retryer(10, maxRetryDelayMs, statusUpdateDelayMs);

    do {
      if (retryer.updateStatus()) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("waiting to acquire block: {}", blockNumber);
          LOG.debug("state = {}", this);
        }
        releaseReadyBlock(blockNumber);
      }
      data = tryAcquire(blockNumber);
    }
    while ((data == null) && retryer.continueRetry());

    if (data != null) {
      return data;
    } else {
      String message =
          String.format("Wait failed for acquire(%d)", blockNumber);
      throw new IllegalStateException(message);
    }
  }

  /**
   * Acquires a buffer if one is immediately available. Otherwise returns null.
   * @param blockNumber the id of the block to try acquire.
   * @return the acquired block's {@code BufferData} or null.
   */
  public synchronized BufferData tryAcquire(int blockNumber) {
    return acquireHelper(blockNumber, false);
  }

  private synchronized BufferData acquireHelper(int blockNumber,
      boolean canBlock) {
    checkNotNegative(blockNumber, "blockNumber");

    releaseDoneBlocks();

    BufferData data = find(blockNumber);
    if (data != null) {
      return data;
    }

    ByteBuffer buffer = canBlock ? pool.acquire() : pool.tryAcquire();
    if (buffer == null) {
      return null;
    }

    buffer.clear();
    data = new BufferData(blockNumber, buffer.duplicate());

    synchronized (allocated) {
      checkState(find(blockNumber) == null, "buffer data already exists");

      allocated.put(data, buffer);
    }

    return data;
  }

  /**
   * Releases resources for any blocks marked as 'done'.
   */
  private synchronized void releaseDoneBlocks() {
    for (BufferData data : getAll()) {
      if (data.stateEqualsOneOf(BufferData.State.DONE)) {
        release(data);
      }
    }
  }

  /**
   * If no blocks were released after calling releaseDoneBlocks() a few times,
   * we may end up waiting forever. To avoid that situation, we try releasing
   * a 'ready' block farthest away from the given block.
   */
  private synchronized void releaseReadyBlock(int blockNumber) {
    BufferData releaseTarget = null;
    for (BufferData data : getAll()) {
      if (data.stateEqualsOneOf(BufferData.State.READY)) {
        if (releaseTarget == null) {
          releaseTarget = data;
        } else {
          if (distance(data, blockNumber) > distance(releaseTarget,
              blockNumber)) {
            releaseTarget = data;
          }
        }
      }
    }

    if (releaseTarget != null) {
      LOG.warn("releasing 'ready' block: {}", releaseTarget);
      releaseTarget.setDone();
    }
  }

  private int distance(BufferData data, int blockNumber) {
    return Math.abs(data.getBlockNumber() - blockNumber);
  }

  /**
   * Releases a previously acquired resource.
   * @param data the {@code BufferData} instance to release.
   * @throws IllegalArgumentException if data is null.
   * @throws IllegalArgumentException if data cannot be released due to its state.
   */
  public synchronized void release(BufferData data) {
    checkNotNull(data, "data");

    synchronized (data) {
      checkArgument(
          canRelease(data),
          String.format("Unable to release buffer: %s", data));

      ByteBuffer buffer = allocated.get(data);
      if (buffer == null) {
        // Likely released earlier.
        return;
      }
      buffer.clear();
      pool.release(buffer);
      allocated.remove(data);
    }

    releaseDoneBlocks();
  }

  @Override
  public synchronized void close() {
    for (BufferData data : getAll()) {
      Future<Void> actionFuture = data.getActionFuture();
      if (actionFuture != null) {
        actionFuture.cancel(true);
      }
    }

    int currentPoolSize = pool.numCreated();

    pool.close();
    pool = null;

    allocated.clear();
    allocated = null;

    prefetchingStatistics.memoryFreed(currentPoolSize * bufferSize);
  }

  // For debugging purposes.
  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder();
    sb.append(pool.toString());
    sb.append("\n");
    List<BufferData> allData = new ArrayList<>(getAll());
    Collections.sort(allData,
        (d1, d2) -> d1.getBlockNumber() - d2.getBlockNumber());
    for (BufferData data : allData) {
      sb.append(data.toString());
      sb.append("\n");
    }

    return sb.toString();
  }

  // Number of ByteBuffers created so far.
  public synchronized int numCreated() {
    return pool.numCreated();
  }

  // Number of ByteBuffers available to be acquired.
  public synchronized int numAvailable() {
    releaseDoneBlocks();
    return pool.numAvailable();
  }

  private BufferData find(int blockNumber) {
    synchronized (allocated) {
      for (BufferData data : allocated.keySet()) {
        if ((data.getBlockNumber() == blockNumber)
            && !data.stateEqualsOneOf(BufferData.State.DONE)) {
          return data;
        }
      }
    }

    return null;
  }

  private boolean canRelease(BufferData data) {
    return data.stateEqualsOneOf(
        BufferData.State.DONE,
        BufferData.State.READY);
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop BlockCache 源码

hadoop BlockData 源码

hadoop BlockManager 源码

hadoop BlockOperations 源码

hadoop BoundedResourcePool 源码

hadoop BufferData 源码

hadoop CachingBlockManager 源码

hadoop EmptyPrefetchingStatistics 源码

hadoop ExecutorServiceFuturePool 源码

hadoop FilePosition 源码

0  赞