hadoop BoundedResourcePool 源码

  • 2022-10-20
  • 浏览 (477)

haddop BoundedResourcePool 代码

文件路径:/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/BoundedResourcePool.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.hadoop.fs.impl.prefetch;

import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;

import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNull;

/**
 * Manages a fixed pool of resources.
 *
 * Avoids creating a new resource if a previously created instance is already available.
 */
public abstract class BoundedResourcePool<T> extends ResourcePool<T> {
  /**
   * The size of this pool. Fixed at creation time.
   */
  private final int size;

  /**
   * Items currently available in the pool.
   */
  private ArrayBlockingQueue<T> items;

  /**
   * Items that have been created so far (regardless of whether they are currently available).
   */
  private Set<T> createdItems;

  /**
   * Constructs a resource pool of the given size.
   *
   * @param size the size of this pool. Cannot be changed post creation.
   *
   * @throws IllegalArgumentException if size is zero or negative.
   */
  public BoundedResourcePool(int size) {
    Validate.checkPositiveInteger(size, "size");

    this.size = size;
    this.items = new ArrayBlockingQueue<>(size);

    // The created items are identified based on their object reference.
    this.createdItems = Collections.newSetFromMap(new IdentityHashMap<T, Boolean>());
  }

  /**
   * Acquires a resource blocking if necessary until one becomes available.
   */
  @Override
  public T acquire() {
    return this.acquireHelper(true);
  }

  /**
   * Acquires a resource blocking if one is immediately available. Otherwise returns null.
   */
  @Override
  public T tryAcquire() {
    return this.acquireHelper(false);
  }

  /**
   * Releases a previously acquired resource.
   *
   * @throws IllegalArgumentException if item is null.
   */
  @Override
  public void release(T item) {
    checkNotNull(item, "item");

    synchronized (createdItems) {
      if (!createdItems.contains(item)) {
        throw new IllegalArgumentException("This item is not a part of this pool");
      }
    }

    // Return if this item was released earlier.
    // We cannot use items.contains() because that check is not based on reference equality.
    for (T entry : items) {
      if (entry == item) {
        return;
      }
    }

    try {
      items.put(item);
    } catch (InterruptedException e) {
      throw new IllegalStateException("release() should never block", e);
    }
  }

  @Override
  public synchronized void close() {
    for (T item : createdItems) {
      close(item);
    }

    items.clear();
    items = null;

    createdItems.clear();
    createdItems = null;
  }

  /**
   * Derived classes may implement a way to cleanup each item.
   */
  @Override
  protected synchronized void close(T item) {
    // Do nothing in this class. Allow overriding classes to take any cleanup action.
  }

  /**
   * Number of items created so far. Mostly for testing purposes.
   * @return the count.
   */
  public int numCreated() {
    synchronized (createdItems) {
      return createdItems.size();
    }
  }

  /**
   * Number of items available to be acquired. Mostly for testing purposes.
   * @return the number available.
   */
  public synchronized int numAvailable() {
    return (size - numCreated()) + items.size();
  }

  // For debugging purposes.
  @Override
  public synchronized String toString() {
    return String.format(
        "size = %d, #created = %d, #in-queue = %d, #available = %d",
        size, numCreated(), items.size(), numAvailable());
  }

  /**
   * Derived classes must implement a way to create an instance of a resource.
   */
  protected abstract T createNew();

  private T acquireHelper(boolean canBlock) {

    // Prefer reusing an item if one is available.
    // That avoids unnecessarily creating new instances.
    T result = items.poll();
    if (result != null) {
      return result;
    }

    synchronized (createdItems) {
      // Create a new instance if allowed by the capacity of this pool.
      if (createdItems.size() < size) {
        T item = createNew();
        createdItems.add(item);
        return item;
      }
    }

    if (canBlock) {
      try {
        // Block for an instance to be available.
        return items.take();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return null;
      }
    } else {
      return null;
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop BlockCache 源码

hadoop BlockData 源码

hadoop BlockManager 源码

hadoop BlockOperations 源码

hadoop BufferData 源码

hadoop BufferPool 源码

hadoop CachingBlockManager 源码

hadoop EmptyPrefetchingStatistics 源码

hadoop ExecutorServiceFuturePool 源码

hadoop FilePosition 源码

0  赞