hadoop FairCallQueue 源码

  • 2022-10-20
  • 浏览 (362)

haddop FairCallQueue 代码

文件路径:/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.ipc;

import java.lang.ref.WeakReference;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.AbstractQueue;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.CallQueueManager.CallQueueOverflowException;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.Interns;
import org.apache.hadoop.metrics2.util.MBeans;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A queue with multiple levels for each priority.
 */
public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
    implements BlockingQueue<E> {
  @Deprecated
  public static final int    IPC_CALLQUEUE_PRIORITY_LEVELS_DEFAULT = 4;
  @Deprecated
  public static final String IPC_CALLQUEUE_PRIORITY_LEVELS_KEY =
    "faircallqueue.priority-levels";

  public static final Logger LOG = LoggerFactory.getLogger(FairCallQueue.class);

  /**
   * Save the queue data of multiple priority strategies.
   * Usually the number of queue data and priority strategies saved
   * is the same.
   */
  private final List<BlockingQueue<E>> queues;

  /* Track available permits for scheduled objects.  All methods that will
   * mutate a subqueue must acquire or release a permit on the semaphore.
   * A semaphore is much faster than an exclusive lock because producers do
   * not contend with consumers and consumers do not block other consumers
   * while polling.
   */
  private final Semaphore semaphore = new Semaphore(0);
  private void signalNotEmpty() {
    semaphore.release();
  }

  /* Multiplexer picks which queue to draw from */
  private RpcMultiplexer multiplexer;

  /* Statistic tracking */
  private final ArrayList<AtomicLong> overflowedCalls;

  /* Failover if queue is filled up */
  private boolean serverFailOverEnabled;

  @VisibleForTesting
  public FairCallQueue(int priorityLevels, int capacity, String ns,
      Configuration conf) {
    this(priorityLevels, capacity, ns,
        CallQueueManager.getDefaultQueueCapacityWeights(priorityLevels), conf);
  }

  /**
   * Create a FairCallQueue.
   * @param priorityLevels the total size of all multi-level queue
   *                       priority policies
   * @param capacity the total size of all sub-queues
   * @param ns the prefix to use for configuration
   * @param capacityWeights the weights array for capacity allocation
   *                        among subqueues
   * @param conf the configuration to read from
   * Notes: Each sub-queue has a capacity of `capacity / numSubqueues`.
   * The first or the highest priority sub-queue has an excess capacity
   * of `capacity % numSubqueues`
   */
  public FairCallQueue(int priorityLevels, int capacity, String ns,
      int[] capacityWeights, Configuration conf) {
    if(priorityLevels < 1) {
      throw new IllegalArgumentException("Number of Priority Levels must be " +
          "at least 1");
    }
    int numQueues = priorityLevels;
    LOG.info("FairCallQueue is in use with " + numQueues +
        " queues with total capacity of " + capacity);

    this.queues = new ArrayList<BlockingQueue<E>>(numQueues);
    this.overflowedCalls = new ArrayList<AtomicLong>(numQueues);
    int totalWeights = 0;
    for (int i = 0; i < capacityWeights.length; i++) {
      totalWeights += capacityWeights[i];
    }
    int residueCapacity = capacity % totalWeights;
    int unitCapacity = capacity / totalWeights;
    int queueCapacity;
    for(int i=0; i < numQueues; i++) {
      queueCapacity = unitCapacity * capacityWeights[i];
      if (i == 0) {
        this.queues.add(new LinkedBlockingQueue<E>(
            queueCapacity + residueCapacity));
      } else {
        this.queues.add(new LinkedBlockingQueue<E>(queueCapacity));
      }
      this.overflowedCalls.add(new AtomicLong(0));
    }
    this.serverFailOverEnabled = conf.getBoolean(
        ns + "." +
        CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE,
        CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT);

    this.multiplexer = new WeightedRoundRobinMultiplexer(numQueues, ns, conf);
    // Make this the active source of metrics
    MetricsProxy mp = MetricsProxy.getInstance(ns);
    mp.setDelegate(this);
  }

  /**
   * Returns an element first non-empty queue equal to the priority returned
   * by the multiplexer or scans from highest to lowest priority queue.
   *
   * Caller must always acquire a semaphore permit before invoking.
   *
   * @return the first non-empty queue with less priority, or null if
   * everything was empty
   */
  private E removeNextElement() {
    int priority = multiplexer.getAndAdvanceCurrentIndex();
    E e = queues.get(priority).poll();
    // a semaphore permit has been acquired, so an element MUST be extracted
    // or the semaphore and queued elements will go out of sync.  loop to
    // avoid race condition if elements are added behind the current position,
    // awakening other threads that poll the elements ahead of our position.
    while (e == null) {
      for (int idx = 0; e == null && idx < queues.size(); idx++) {
        e = queues.get(idx).poll();
      }
    }
    return e;
  }

  /* AbstractQueue and BlockingQueue methods */

  /**
   * Add, put, and offer follow the same pattern:
   * 1. Get the assigned priorityLevel from the call by scheduler
   * 2. Get the nth sub-queue matching this priorityLevel
   * 3. delegate the call to this sub-queue.
   *
   * But differ in how they handle overflow:
   * - Add will move on to the next queue, throw on last queue overflow
   * - Put will move on to the next queue, block on last queue overflow
   * - Offer does not attempt other queues on overflow
   */

  @Override
  public boolean add(E e) {
    final int priorityLevel = e.getPriorityLevel();
    // try offering to all queues.
    if (!offerQueues(priorityLevel, e, true)) {

      CallQueueOverflowException ex;
      if (serverFailOverEnabled) {
        // Signal clients to failover and try a separate server.
        ex = CallQueueOverflowException.FAILOVER;
      } else if (priorityLevel == queues.size() - 1){
        // only disconnect the lowest priority users that overflow the queue.
        ex = CallQueueOverflowException.DISCONNECT;
      } else {
        ex = CallQueueOverflowException.KEEPALIVE;
      }
      throw ex;
    }
    return true;
  }

  @Override
  public void put(E e) throws InterruptedException {
    final int priorityLevel = e.getPriorityLevel();
    // try offering to all but last queue, put on last.
    if (!offerQueues(priorityLevel, e, false)) {
      putQueue(queues.size() - 1, e);
    }
  }

  /**
   * Put the element in a queue of a specific priority.
   * @param priority - queue priority
   * @param e - element to add
   */
  @VisibleForTesting
  void putQueue(int priority, E e) throws InterruptedException {
    queues.get(priority).put(e);
    signalNotEmpty();
  }

  /**
   * Offer the element to queue of a specific priority.
   * @param priority - queue priority
   * @param e - element to add
   * @return boolean if added to the given queue
   */
  @VisibleForTesting
  boolean offerQueue(int priority, E e) {
    boolean ret = queues.get(priority).offer(e);
    if (ret) {
      signalNotEmpty();
    }
    return ret;
  }

  /**
   * Offer the element to queue of the given or lower priority.
   * @param priority - starting queue priority
   * @param e - element to add
   * @param includeLast - whether to attempt last queue
   * @return boolean if added to a queue
   */
  private boolean offerQueues(int priority, E e, boolean includeLast) {
    int lastPriority = queues.size() - (includeLast ? 1 : 2);
    for (int i=priority; i <= lastPriority; i++) {
      if (offerQueue(i, e)) {
        return true;
      }
      // Update stats
      overflowedCalls.get(i).getAndIncrement();
    }
    return false;
  }

  @Override
  public boolean offer(E e, long timeout, TimeUnit unit)
      throws InterruptedException {
    int priorityLevel = e.getPriorityLevel();
    BlockingQueue<E> q = this.queues.get(priorityLevel);
    boolean ret = q.offer(e, timeout, unit);
    if (ret) {
      signalNotEmpty();
    }
    return ret;
  }

  @Override
  public boolean offer(E e) {
    int priorityLevel = e.getPriorityLevel();
    BlockingQueue<E> q = this.queues.get(priorityLevel);
    boolean ret = q.offer(e);
    if (ret) {
      signalNotEmpty();
    }
    return ret;
  }

  @Override
  public E take() throws InterruptedException {
    semaphore.acquire();
    return removeNextElement();
  }

  @Override
  public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    return semaphore.tryAcquire(timeout, unit) ? removeNextElement() : null;
  }

  /**
   * poll() provides no strict consistency: it is possible for poll to return
   * null even though an element is in the queue.
   */
  @Override
  public E poll() {
    return semaphore.tryAcquire() ? removeNextElement() : null;
  }

  /**
   * Peek, like poll, provides no strict consistency.
   */
  @Override
  public E peek() {
    E e = null;
    for (int i=0; e == null && i < queues.size(); i++) {
      e = queues.get(i).peek();
    }
    return e;
  }

  /**
   * Size returns the sum of all sub-queue sizes, so it may be greater than
   * capacity.
   * Note: size provides no strict consistency, and should not be used to
   * control queue IO.
   */
  @Override
  public int size() {
    return semaphore.availablePermits();
  }

  /**
   * Iterator is not implemented, as it is not needed.
   */
  @Override
  public Iterator<E> iterator() {
    throw new NotImplementedException("Code is not implemented");
  }

  /**
   * drainTo defers to each sub-queue. Note that draining from a FairCallQueue
   * to another FairCallQueue will likely fail, since the incoming calls
   * may be scheduled differently in the new FairCallQueue. Nonetheless this
   * method is provided for completeness.
   */
  @Override
  public int drainTo(Collection<? super E> c, int maxElements) {
    // initially take all permits to stop consumers from modifying queues
    // while draining.  will restore any excess when done draining.
    final int permits = semaphore.drainPermits();
    final int numElements = Math.min(maxElements, permits);
    int numRemaining = numElements;
    for (int i=0; numRemaining > 0 && i < queues.size(); i++) {
      numRemaining -= queues.get(i).drainTo(c, numRemaining);
    }
    int drained = numElements - numRemaining;
    if (permits > drained) { // restore unused permits.
      semaphore.release(permits - drained);
    }
    return drained;
  }

  @Override
  public int drainTo(Collection<? super E> c) {
    return drainTo(c, Integer.MAX_VALUE);
  }

  /**
   * Returns maximum remaining capacity. This does not reflect how much you can
   * ideally fit in this FairCallQueue, as that would depend on the scheduler's
   * decisions.
   */
  @Override
  public int remainingCapacity() {
    int sum = 0;
    for (BlockingQueue<E> q : this.queues) {
      sum += q.remainingCapacity();
    }
    return sum;
  }

  /**
   * MetricsProxy is a singleton because we may init multiple
   * FairCallQueues, but the metrics system cannot unregister beans cleanly.
   */
  private static final class MetricsProxy implements FairCallQueueMXBean,
      MetricsSource {
    // One singleton per namespace
    private static final HashMap<String, MetricsProxy> INSTANCES =
      new HashMap<String, MetricsProxy>();

    // Weakref for delegate, so we don't retain it forever if it can be GC'd
    private WeakReference<FairCallQueue<? extends Schedulable>> delegate;

    // Keep track of how many objects we registered
    private int revisionNumber = 0;

    private String namespace;

    private MetricsProxy(String namespace) {
      this.namespace = namespace;
      MBeans.register(namespace, "FairCallQueue", this);
      final String name = namespace + ".FairCallQueue";
      DefaultMetricsSystem.instance().register(name, name, this);
    }

    public static synchronized MetricsProxy getInstance(String namespace) {
      MetricsProxy mp = INSTANCES.get(namespace);
      if (mp == null) {
        // We must create one
        mp = new MetricsProxy(namespace);
        INSTANCES.put(namespace, mp);
      }
      return mp;
    }

    public void setDelegate(FairCallQueue<? extends Schedulable> obj) {
      this.delegate
          = new WeakReference<FairCallQueue<? extends Schedulable>>(obj);
      this.revisionNumber++;
    }

    /**
     * Fetch the current call queue from the weak reference delegate. If there
     * is no delegate, or the delegate is empty, this will return null.
     */
    private FairCallQueue<? extends Schedulable> getCallQueue() {
      WeakReference<FairCallQueue<? extends Schedulable>> ref = this.delegate;
      if (ref == null) {
        return null;
      }
      return ref.get();
    }

    @Override
    public int[] getQueueSizes() {
      FairCallQueue<? extends Schedulable> obj = getCallQueue();
      if (obj == null) {
        return new int[]{};
      }

      return obj.getQueueSizes();
    }

    @Override
    public long[] getOverflowedCalls() {
      FairCallQueue<? extends Schedulable> obj = getCallQueue();
      if (obj == null) {
        return new long[]{};
      }

      return obj.getOverflowedCalls();
    }

    @Override public int getRevision() {
      return revisionNumber;
    }

    @Override
    public void getMetrics(MetricsCollector collector, boolean all) {
      MetricsRecordBuilder rb = collector.addRecord("FairCallQueue")
          .setContext("rpc")
          .tag(Interns.info("namespace", "Namespace"), namespace);

      final int[] currentQueueSizes = getQueueSizes();
      final long[] currentOverflowedCalls = getOverflowedCalls();

      for (int i = 0; i < currentQueueSizes.length; i++) {
        rb.addGauge(Interns.info("FairCallQueueSize_p" + i, "FCQ Queue Size"),
            currentQueueSizes[i]);
        rb.addCounter(Interns.info("FairCallQueueOverflowedCalls_p" + i,
            "FCQ Overflowed Calls"), currentOverflowedCalls[i]);
      }
    }
  }

  // FairCallQueueMXBean
  public int[] getQueueSizes() {
    int numQueues = queues.size();
    int[] sizes = new int[numQueues];
    for (int i=0; i < numQueues; i++) {
      sizes[i] = queues.get(i).size();
    }
    return sizes;
  }

  public long[] getOverflowedCalls() {
    int numQueues = queues.size();
    long[] calls = new long[numQueues];
    for (int i=0; i < numQueues; i++) {
      calls[i] = overflowedCalls.get(i).get();
    }
    return calls;
  }

  @VisibleForTesting
  public void setMultiplexer(RpcMultiplexer newMux) {
    this.multiplexer = newMux;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop AlignmentContext 源码

hadoop AsyncCallLimitExceededException 源码

hadoop CallQueueManager 源码

hadoop CallerContext 源码

hadoop Client 源码

hadoop ClientCache 源码

hadoop ClientId 源码

hadoop CostProvider 源码

hadoop DecayRpcScheduler 源码

hadoop DecayRpcSchedulerMXBean 源码

0  赞