hadoop Invoker 源码

  • 2022-10-20
  • 浏览 (138)

hadoop Invoker 代码

文件路径:/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Optional;
import java.util.concurrent.Future;
import javax.annotation.Nullable;

import com.amazonaws.AmazonClientException;
import com.amazonaws.SdkBaseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.functional.CallableRaisingIOE;
import org.apache.hadoop.util.functional.FutureIO;
import org.apache.hadoop.util.functional.InvocationRaisingIOE;
import org.apache.hadoop.util.Preconditions;

import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;

/**
 * Class to provide lambda expression invocation of AWS operations.
 *
 * The core retry logic is in
 * {@link #retryUntranslated(String, boolean, Retried, CallableRaisingIOE)};
 * the other {@code retry()} and {@code retryUntranslated()} calls
 * are wrappers.
 *
 * The static {@link #once(String, String, CallableRaisingIOE)} and
 * {@link #once(String, String, InvocationRaisingIOE)} calls take an
 * operation and
 * return it with AWS exceptions translated to IOEs of some form.
 *
 * The retry logic on a failure is defined by the retry policy passed in
 * the constructor; the standard retry policy is {@link S3ARetryPolicy},
 * though others may be used.
 *
 * The constructor also takes a {@link Retried} callback.
 * This {@code retryCallback} is the default callback; it is invoked
 * after a retry is scheduled but before the sleep.
 * Callbacks can be used for reporting and incrementing statistics.
 *
 * The static {@link #quietly(String, String, InvocationRaisingIOE)} and
 * {@link #quietlyEval(String, String, CallableRaisingIOE)} calls exist to
 * take any operation and quietly catch and log at debug.
 * The return value of {@link #quietlyEval(String, String, CallableRaisingIOE)}
 * is a java 8 optional,
 * which can then be used in java8-expressions.
 */
@InterfaceAudience.Private
public class Invoker {
  private static final Logger LOG = LoggerFactory.getLogger(Invoker.class);

  /**
   * Retry policy to use.
   * Consulted by
   * {@link #retryUntranslated(String, boolean, Retried, CallableRaisingIOE)}
   * after every failed invocation.
   */
  private final RetryPolicy retryPolicy;

  /**
   * Default retry callback: used by those overloads which are not
   * given an explicit {@link Retried} argument.
   */
  private final Retried retryCallback;

  /**
   * Instantiate.
   * @param retryPolicy retry policy for all operations.
   * @param retryCallback default callback to notify on retries; used by
   * the methods which do not take a callback of their own.
   */
  public Invoker(
      RetryPolicy retryPolicy,
      Retried retryCallback) {
    this.retryPolicy = retryPolicy;
    this.retryCallback = retryCallback;
  }

  /**
   * Get the retry policy passed in to the constructor.
   * @return the retry policy of this invoker.
   */
  public RetryPolicy getRetryPolicy() {
    return retryPolicy;
  }

  /**
   * Get the default retry callback passed in to the constructor.
   * @return the default retry callback of this invoker.
   */
  public Retried getRetryCallback() {
    return retryCallback;
  }

  /**
   * Invoke a callable exactly once, converting any
   * {@link AmazonClientException} it raises into an IOException.
   * The invocation takes place inside a {@link DurationInfo} built from
   * the action text.
   * @param action action to execute (used in error messages)
   * @param path path of work (used in error messages)
   * @param operation operation to execute
   * @param <T> type of return value
   * @return whatever the operation returned
   * @throws IOException any IOE raised, or translated exception
   */
  @Retries.OnceTranslated
  public static <T> T once(String action, String path,
      CallableRaisingIOE<T> operation)
      throws IOException {
    try (DurationInfo ignored = new DurationInfo(LOG, false, "%s", action)) {
      final T result = operation.apply();
      return result;
    } catch (AmazonClientException awsFailure) {
      // AWS client failures leave this method as IOExceptions
      throw S3AUtils.translateException(action, path, awsFailure);
    }
  }

  /**
   * Invoke a callable exactly once through
   * {@code invokeTrackingDuration()}, handing it the supplied duration
   * tracker; any {@link AmazonClientException} raised is converted
   * into an IOException.
   * @param action action to execute (used in error messages)
   * @param path path of work (used in error messages)
   * @param tracker tracker passed to the duration-tracking invocation
   * @param operation operation to execute
   * @param <T> type of return value
   * @return whatever the operation returned
   * @throws IOException any IOE raised, or translated exception
   */
  @Retries.OnceTranslated
  public static <T> T onceTrackingDuration(
      final String action,
      final String path,
      final DurationTracker tracker,
      final CallableRaisingIOE<T> operation)
      throws IOException {
    final T result;
    try {
      result = invokeTrackingDuration(tracker, operation);
    } catch (AmazonClientException awsFailure) {
      throw S3AUtils.translateException(action, path, awsFailure);
    }
    return result;
  }

  /**
   * Invoke a void operation exactly once.
   * The operation is wrapped as a callable returning null and passed
   * to {@link #once(String, String, CallableRaisingIOE)}.
   * @param action action to execute (used in error messages)
   * @param path path of work (used in error messages)
   * @param operation operation to execute
   * @throws IOException any IOE raised, or translated exception
   */
  @Retries.OnceTranslated
  public static void once(String action, String path,
      InvocationRaisingIOE operation) throws IOException {
    final CallableRaisingIOE<Void> wrapped = () -> {
      operation.apply();
      return null;
    };
    once(action, path, wrapped);
  }


  /**
   * Await the completion of a future via
   * {@link FutureIO#awaitFuture(Future)}, converting any
   * {@link AmazonClientException} raised into an IOException.
   * The wait takes place inside a {@link DurationInfo} built from
   * the action text.
   * @param action action to execute (used in error messages)
   * @param path path of work (used in error messages)
   * @param future future to await
   * @param <T> type of return value
   * @return the value the future evaluated to
   * @throws IOException any IOE raised, or translated exception
   * @throws RuntimeException any other runtime exception
   */
  @Retries.OnceTranslated
  public static <T> T onceInTheFuture(String action,
      String path,
      final Future<T> future)
      throws IOException {
    try (DurationInfo ignored = new DurationInfo(LOG, false, "%s", action)) {
      final T outcome = FutureIO.awaitFuture(future);
      return outcome;
    } catch (AmazonClientException awsFailure) {
      throw S3AUtils.translateException(action, path, awsFailure);
    }
  }

  /**
   * Run an operation once and swallow any IOException it raises,
   * including translated AWS exceptions.
   * The failure is summarized at INFO; the stack trace is only
   * logged at DEBUG. The result of the operation is discarded.
   * @param log log to use.
   * @param action action to include in log
   * @param path optional path to include in log
   * @param operation operation to execute
   * @param <T> type of operation
   */
  public static <T> void ignoreIOExceptions(
      Logger log,
      String action,
      String path,
      CallableRaisingIOE<T> operation) {
    try {
      once(action, path, operation);
    } catch (IOException ioe) {
      final String where = toDescription(action, path);
      // one line summary for operators
      log.info("{}: {}", where, ioe.toString());
      // and the full stack for anyone debugging
      log.debug("{}", where, ioe);
    }
  }

  /**
   * Run a void operation once and swallow any IOException it raises;
   * log at INFO with the full stack only at DEBUG.
   * The operation is wrapped as a callable returning null and handed
   * to the callable-based overload.
   * @param log log to use.
   * @param action action to include in log
   * @param path optional path to include in log
   * @param operation operation to execute
   */
  public static void ignoreIOExceptions(
      Logger log,
      String action,
      String path,
      InvocationRaisingIOE operation) {
    final CallableRaisingIOE<Void> wrapped = () -> {
      operation.apply();
      return null;
    };
    ignoreIOExceptions(log, action, path, wrapped);
  }

  /**
   * Run a void operation with retry processing, notifying the
   * supplied callback on retries.
   * The operation is wrapped as a callable returning null and handed
   * to the callable-based overload.
   * @param action action to execute (used in error messages)
   * @param path path of work (used in error messages)
   * @param idempotent does the operation have semantics
   * which mean that it can be retried even if was already executed?
   * @param retrying callback on retries
   * @param operation operation to execute
   * @throws IOException any IOE raised, or translated exception
   */
  @Retries.RetryTranslated
  public void retry(String action,
      String path,
      boolean idempotent,
      Retried retrying,
      InvocationRaisingIOE operation)
      throws IOException {
    final CallableRaisingIOE<Void> wrapped = () -> {
      operation.apply();
      return null;
    };
    retry(action, path, idempotent, retrying, wrapped);
  }

  /**
   * Run a void operation, with retry processing if {@code doRetry}
   * is true and exactly once otherwise.
   * The operation is wrapped as a callable returning null and handed
   * to the callable-based overload.
   * @param doRetry true if retries should be performed
   * @param action action to execute (used in error messages)
   * @param path path of work (used in error messages)
   * @param idempotent does the operation have semantics
   * which mean that it can be retried even if was already executed?
   * @param retrying callback on retries
   * @param operation operation to execute
   * @throws IOException any IOE raised, or translated exception
   */
  @Retries.RetryTranslated
  public void maybeRetry(boolean doRetry,
      String action,
      String path,
      boolean idempotent,
      Retried retrying,
      InvocationRaisingIOE operation)
      throws IOException {
    final CallableRaisingIOE<Void> wrapped = () -> {
      operation.apply();
      return null;
    };
    maybeRetry(doRetry, action, path, idempotent, retrying, wrapped);
  }

  /**
   * Run a void operation with retry processing, notifying the
   * invoker's default retry callback on retries.
   * @param action action to execute (used in error messages)
   * @param path path of work (used in error messages)
   * @param idempotent does the operation have semantics
   * which mean that it can be retried even if was already executed?
   * @param operation operation to execute
   * @throws IOException any IOE raised, or translated exception
   */
  @Retries.RetryTranslated
  public void retry(String action,
      String path,
      boolean idempotent,
      InvocationRaisingIOE operation)
      throws IOException {
    final Retried defaultCallback = this.retryCallback;
    retry(action, path, idempotent, defaultCallback, operation);
  }

  /**
   * Run a void operation, with retry processing and the invoker's
   * default retry callback if {@code doRetry} is true, and exactly
   * once otherwise.
   * @param doRetry true if retries should be performed
   * @param action action to execute (used in error messages)
   * @param path path of work (used in error messages)
   * @param idempotent does the operation have semantics
   * which mean that it can be retried even if was already executed?
   * @param operation operation to execute
   * @throws IOException any IOE raised, or translated exception
   */
  @Retries.RetryTranslated
  public void maybeRetry(
      boolean doRetry,
      String action,
      String path,
      boolean idempotent,
      InvocationRaisingIOE operation)
      throws IOException {
    final Retried defaultCallback = this.retryCallback;
    maybeRetry(doRetry, action, path, idempotent, defaultCallback,
        operation);
  }

  /**
   * Evaluate a function with retry processing, notifying the
   * invoker's default retry callback on retries.
   * @param action action to execute (used in error messages)
   * @param path path of work (used in error messages)
   * @param idempotent does the operation have semantics
   * which mean that it can be retried even if was already executed?
   * @param operation operation to execute
   * @param <T> type of return value
   * @return the result of the call
   * @throws IOException any IOE raised, or translated exception
   */
  @Retries.RetryTranslated
  public <T> T retry(String action,
      @Nullable String path,
      boolean idempotent,
      CallableRaisingIOE<T> operation)
      throws IOException {
    final Retried defaultCallback = this.retryCallback;
    return retry(action, path, idempotent, defaultCallback, operation);
  }

  /**
   * Evaluate a function with retry processing.
   * Every attempt goes through
   * {@link #once(String, String, CallableRaisingIOE)}, so AWS exceptions
   * are translated before the retry logic of
   * {@link #retryUntranslated(String, boolean, Retried, CallableRaisingIOE)}
   * examines them.
   * @param <T> type of return value
   * @param action action to execute (used in error messages)
   * @param path path of work (used in error messages)
   * @param idempotent does the operation have semantics
   * which mean that it can be retried even if was already executed?
   * @param retrying callback on retries
   * @param operation operation to execute
   * @return the result of the call
   * @throws IOException any IOE raised, or translated exception
   */
  @Retries.RetryTranslated
  public <T> T retry(
      String action,
      @Nullable String path,
      boolean idempotent,
      Retried retrying,
      CallableRaisingIOE<T> operation)
      throws IOException {
    final String description = toDescription(action, path);
    // each attempt is a translated, single invocation
    final CallableRaisingIOE<T> attempt =
        () -> once(action, path, operation);
    return retryUntranslated(description, idempotent, retrying, attempt);
  }

  /**
   * Evaluate a function, with retry processing if {@code doRetry} is
   * true and exactly once otherwise.
   * In both cases the invocation goes through
   * {@link #once(String, String, CallableRaisingIOE)}, so AWS exceptions
   * are translated.
   * @param <T> type of return value
   * @param doRetry true if retries should be performed
   * @param action action to execute (used in error messages)
   * @param path path of work (used in error messages)
   * @param idempotent does the operation have semantics
   * which mean that it can be retried even if was already executed?
   * @param retrying callback on retries
   * @param operation operation to execute
   * @return the result of the call
   * @throws IOException any IOE raised, or translated exception
   */
  @Retries.RetryTranslated
  public <T> T maybeRetry(
      boolean doRetry,
      String action,
      @Nullable String path,
      boolean idempotent,
      Retried retrying,
      CallableRaisingIOE<T> operation)
      throws IOException {
    if (!doRetry) {
      // single shot: no retry policy involved
      return once(action, path, operation);
    }
    final String description = toDescription(action, path);
    final CallableRaisingIOE<T> attempt =
        () -> once(action, path, operation);
    return retryUntranslated(description, idempotent, retrying, attempt);
  }

  /**
   * Evaluate a function with retry processing, no exception
   * translation, and the invoker's default retry callback.
   * @param text description for the catching callback
   * @param idempotent does the operation have semantics
   * which mean that it can be retried even if was already executed?
   * @param operation operation to execute
   * @param <T> type of return value
   * @return the result of the call
   * @throws IOException any IOE raised
   * @throws RuntimeException any Runtime exception raised
   */
  @Retries.RetryRaw
  public <T> T retryUntranslated(
      String text,
      boolean idempotent,
      CallableRaisingIOE<T> operation) throws IOException {
    final Retried defaultCallback = this.retryCallback;
    return retryUntranslated(text, idempotent, defaultCallback, operation);
  }

  /**
   * Execute a function with retry processing: AWS SDK Exceptions
   * are <i>not</i> translated.
   * This is the method which the others eventually invoke.
   *
   * The operation is invoked in a loop. After each failure the caught
   * exception is converted to an IOException (for the retry policy and
   * the callback only) and the retry policy is asked what to do.
   * If the policy says retry, the callback is notified and the thread
   * sleeps for the delay chosen by the policy before the next attempt.
   * Otherwise the exception is rethrown in the form it was caught in.
   * If the sleep is interrupted, an {@link InterruptedIOException} is
   * thrown and the thread's interrupt flag is set again.
   * @param <T> type of return value
   * @param text text to include in messages
   * @param idempotent does the operation have semantics
   * which mean that it can be retried even if was already executed?
   * @param retrying callback on retries; must not be null
   * @param operation operation to execute
   * @return the result of the call
   * @throws IOException any IOE raised
   * @throws SdkBaseException any AWS exception raised
   * @throws RuntimeException any other runtime exception raised by the
   * operation: these are never caught and retried.
   * @throws IllegalArgumentException if the retrying callback is null
   */
  @Retries.RetryRaw
  public <T> T retryUntranslated(
      String text,
      boolean idempotent,
      Retried retrying,
      CallableRaisingIOE<T> operation) throws IOException {

    Preconditions.checkArgument(retrying != null, "null retrying argument");
    // number of failed attempts so far; 0 on the first invocation
    int retryCount = 0;
    // the exception to rethrow once the loop exits
    Exception caught;
    RetryPolicy.RetryAction retryAction;
    boolean shouldRetry;
    do {
      try {
        if (retryCount > 0) {
          LOG.debug("retry #{}", retryCount);
        }
        // execute the operation, returning if successful
        return operation.apply();
      } catch (IOException | SdkBaseException e) {
        caught = e;
      }
      // you only get here if the operation didn't complete
      // normally, hence caught != null

      // translate the exception into an IOE for the retry logic;
      // "caught" itself is left untouched so that the raw exception
      // is what gets rethrown after the loop
      IOException translated;
      if (caught instanceof IOException) {
        translated = (IOException) caught;
      } else {
        translated = S3AUtils.translateException(text, "",
            (SdkBaseException)caught);
      }

      try {
        // decide action based on operation, invocation count, etc
        retryAction = retryPolicy.shouldRetry(translated, retryCount, 0,
            idempotent);
        // is it a retry operation?
        shouldRetry = retryAction.action.equals(
            RetryPolicy.RetryAction.RETRY.action);
        if (shouldRetry) {
          // notify the callback, passing the count as it was
          // before this failure was added to it
          retrying.onFailure(text, translated, retryCount, idempotent);
          // then sleep for the policy delay
          Thread.sleep(retryAction.delayMillis);
        }
        // increment the retry count
        retryCount++;
      } catch (InterruptedException e) {
        // sleep was interrupted
        // change the exception
        caught = new InterruptedIOException("Interrupted");
        caught.initCause(e);
        // no retry
        shouldRetry = false;
        // and re-interrupt the thread
        Thread.currentThread().interrupt();
      } catch (Exception e) {
        // The retry policy (or the callback) raised an exception
        // log that something happened
        LOG.warn("{}: exception in retry processing", text, e);
        // and fail the execution with the last execution exception.
        shouldRetry = false;
      }
    } while (shouldRetry);

    // rethrow in the form the failure was caught (or, after an
    // interrupt, as the InterruptedIOException created above)
    if (caught instanceof IOException) {
      throw (IOException) caught;
    } else {
      throw (SdkBaseException) caught;
    }
  }


  /**
   * Run a void operation once; whatever exception it raises is
   * swallowed after being logged at debug.
   * @param action action to execute
   * @param path path (for exception construction)
   * @param operation operation
   */
  public static void quietly(String action,
      String path,
      InvocationRaisingIOE operation) {
    // same wrapping as the void once() overload performs
    final CallableRaisingIOE<Void> wrapped = () -> {
      operation.apply();
      return null;
    };
    try {
      once(action, path, wrapped);
    } catch (Exception failure) {
      LOG.debug("Action {} failed", action, failure);
    }
  }

  /**
   * Execute an operation; any exception raised is caught and
   * logged at debug.
   * The result is only non-empty if the operation succeeded
   * and returned a non-null value: a null result is returned as an
   * empty optional, without being logged as a failure.
   * @param <T> type to return
   * @param action action to execute
   * @param path path (for exception construction)
   * @param operation operation
   * @return the result of a successful operation; empty if the
   * operation failed or returned null
   */
  public static <T> Optional<T> quietlyEval(String action,
      String path,
      CallableRaisingIOE<T> operation) {
    try {
      // ofNullable, not of: Optional.of(null) would raise an NPE which
      // the catch below would then misreport as a failed action.
      return Optional.ofNullable(once(action, path, operation));
    } catch (Exception e) {
      LOG.debug("Action {} failed", action, e);
      return Optional.empty();
    }
  }

  /**
   * Build the text used in logs and messages from an action and
   * an optional path: the action alone if there is no path, otherwise
   * the action followed by " on " and the path.
   * @param action action
   * @param path path (may be null or empty)
   * @return string for logs
   */
  private static String toDescription(String action, @Nullable String path) {
    if (StringUtils.isNotEmpty(path)) {
      return action + " on " + path;
    }
    return action + "";
  }

  /**
   * Callback for retry and notification operations.
   * Even if the interface is throwing up "raw" exceptions, this handler
   * gets the translated one.
   * It is a functional interface, so lambda expressions may be used
   * as callbacks.
   */
  @FunctionalInterface
  public interface Retried {
    /**
     * Retry event in progress (before any sleep).
     * @param text text passed in to the retry() Call.
     * @param exception the caught (and possibly translated) exception.
     * @param retries number of retries so far
     * @param idempotent is the request idempotent.
     */
    void onFailure(
        String text,
        IOException exception,
        int retries,
        boolean idempotent);
  }

  /**
   * Retry callback which does nothing at all.
   */
  public static final Retried NO_OP =
      (text, exception, retries, idempotent) -> {
        // deliberately empty
      };

  /**
   * Log retries at debug.
   * Every event is logged as a one line summary; the stack trace is
   * added only when {@code retries == 1}, to keep noise down.
   * Nothing is formatted unless debug logging is enabled.
   */
  public static final Retried LOG_EVENT = new Retried() {
    @Override
    public void onFailure(String text,
        IOException exception,
        int retries,
        boolean idempotent) {
      if (!LOG.isDebugEnabled()) {
        // skip building the exception text when it would be discarded
        return;
      }
      // The exception text is passed as an argument, never spliced into
      // the format pattern: its content must not be parsed as a pattern.
      // String.valueOf keeps the "null" rendering of a null exception.
      final String summary = String.valueOf(exception);
      LOG.debug("{}: {}", text, summary);
      // NOTE(review): retryUntranslated() passes 0 for the first failure,
      // so this fires on the second failure - confirm that is intended.
      if (retries == 1) {
        // trailing throwable argument: logged with its stack trace
        LOG.debug("{}: {}", text, summary, exception);
      }
    }
  };
}

相关信息

hadoop 源码目录

相关文章

hadoop AWSBadRequestException 源码

hadoop AWSClientIOException 源码

hadoop AWSCredentialProviderList 源码

hadoop AWSNoResponseException 源码

hadoop AWSRedirectException 源码

hadoop AWSS3IOException 源码

hadoop AWSServiceIOException 源码

hadoop AWSServiceThrottledException 源码

hadoop AWSStatus500Exception 源码

hadoop AnonymousAWSCredentialsProvider 源码

0  赞