hadoop LoadBalancingKMSClientProvider 源码

  • 2022-10-20
  • 浏览 (471)

hadoop LoadBalancingKMSClientProvider 代码


/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.crypto.key.kms;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.SocketException;
import java.net.URI;
import java.security.GeneralSecurityException;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import javax.net.ssl.SSLException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.CryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.KMSUtil;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;

/**
 * A simple LoadBalancing KMSClientProvider that round-robins requests
 * across a provided array of KMSClientProviders. It also retries failed
 * requests on the next available provider in the load balancer group. It
 * only retries failed requests that result in an IOException, sending back
 * all other Exceptions to the caller without retry.
 */
public class LoadBalancingKMSClientProvider extends KeyProvider implements
    KeyProviderDelegationTokenExtension.DelegationTokenExtension {

  public static Logger LOG =

  static interface ProviderCallable<T> {
    public T call(KMSClientProvider provider) throws IOException, Exception;

  static class WrapperException extends RuntimeException {
    public WrapperException(Throwable cause) {

  // Underlying KMS clients that requests are spread across.
  private final KMSClientProvider[] providers;
  // Position of the provider the next request starts with; see nextIdx().
  private final AtomicInteger currentIdx;
  private final Text dtService; // service in token.
  private final Text canonicalService; // credentials alias for token.

  // Consulted by doOp() to decide whether a failed call is retried; assigned
  // in the private constructor.
  private RetryPolicy retryPolicy = null;

  /**
   * Creates a load-balancing provider over the given KMS clients, seeding the
   * provider order from the current monotonic clock.
   *
   * @param providerUri URI from which the delegation-token service is derived
   * @param providers the KMS clients to balance across
   * @param conf configuration supplying the failover/retry settings
   */
  public LoadBalancingKMSClientProvider(URI providerUri,
      KMSClientProvider[] providers, Configuration conf) {
    this(providerUri, providers, Time.monotonicNow(), conf);
  }

  /**
   * Package-private constructor for tests: uses the fixed URI
   * {@code kms://testing} and a caller-chosen seed.
   *
   * @param providers the KMS clients to balance across
   * @param seed 0 keeps the given provider order; any other value shuffles it
   * @param conf configuration supplying the failover/retry settings
   */
  @VisibleForTesting
  LoadBalancingKMSClientProvider(KMSClientProvider[] providers, long seed,
      Configuration conf) {
    this(URI.create("kms://testing"), providers, seed, conf);
  }

  // Primary constructor: derives the token service names, fixes the provider
  // order, picks the starting index and builds the failover retry policy.
  //
  // uri       - source of the delegation-token service name
  // providers - KMS clients to balance across
  // seed      - 0 keeps the given order; any other value shuffles. It is also
  //             reduced modulo providers.length to pick the starting index.
  // conf      - source of the failover retry/sleep settings
  //
  // NOTE(review): this listing is missing several lines of the constructor
  // (closing braces, the body of the provider loop, and the tails of some
  // statements). Restore them from the upstream source before compiling.
  private LoadBalancingKMSClientProvider(URI uri,
      KMSClientProvider[] providers, long seed, Configuration conf) {
    // uri is the token service so it can be instantiated for renew/cancel.
    dtService = KMSClientProvider.getDtService(uri);
    // if provider not in conf, new client will alias on uri else addr.
    if (KMSUtil.getKeyProviderUri(conf) == null) {
      canonicalService = dtService;
    } else {
      // canonical service (credentials alias) will be the first underlying
      // provider's service.  must be deterministic before shuffle so multiple
      // calls for a token do not obtain another unnecessary token.
      // NOTE(review): providers[0] throws if the array is empty.
      canonicalService = new Text(providers[0].getCanonicalServiceName());

    // shuffle unless seed is 0 which is used by tests for determinism.
    this.providers = (seed != 0) ? shuffle(providers) : providers;
    // NOTE(review): the body of this loop is not present in this listing.
    for (KMSClientProvider provider : providers) {
    // NOTE(review): a negative seed yields a negative starting index, and an
    // empty array makes the modulo throw ArithmeticException.
    this.currentIdx = new AtomicInteger((int)(seed % providers.length));
    // The retry count defaults to the number of providers.
    int maxNumRetries = conf.getInt(CommonConfigurationKeysPublic.
        KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, providers.length);
    // NOTE(review): the key/default arguments of the next two getInt calls
    // are cut off in this listing.
    int sleepBaseMillis = conf.getInt(CommonConfigurationKeysPublic.
    int sleepMaxMillis = conf.getInt(CommonConfigurationKeysPublic.
    // Reject negative settings up front.
    Preconditions.checkState(maxNumRetries >= 0);
    Preconditions.checkState(sleepBaseMillis >= 0);
    Preconditions.checkState(sleepMaxMillis >= 0);
    // NOTE(review): the final argument(s) of this call are cut off here;
    // presumably sleepMaxMillis - confirm against upstream.
    this.retryPolicy = RetryPolicies.failoverOnNetworkException(
        RetryPolicies.TRY_ONCE_THEN_FAIL, maxNumRetries, 0, sleepBaseMillis,
    LOG.debug("Created LoadBalancingKMSClientProvider for KMS url: {} with {} "
            + "providers. delegation token service: {}, canonical service: {}",
        uri, providers.length, dtService, canonicalService);

  public KMSClientProvider[] getProviders() {
    return providers;

  public org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
      selectDelegationToken(Credentials creds) {
    Token<? extends TokenIdentifier> token =
        KMSClientProvider.selectDelegationToken(creds, canonicalService);
    if (token == null) {
      token = KMSClientProvider.selectDelegationToken(creds, dtService);
    // fallback to querying each sub-provider.
    if (token == null) {
      for (KMSClientProvider provider : getProviders()) {
        token = provider.selectDelegationToken(creds);
        if (token != null) {
    return token;

  /**
   * Runs {@code op} against the providers in round-robin order, starting at
   * {@code currPos}, until one call succeeds or the retry policy gives up.
   *
   * <p>NOTE(review): several lines of this method (closing braces, part of
   * the LOG.error argument list, and the body of the sleep {@code try}) are
   * not present in this listing; restore them from the upstream source before
   * compiling.
   *
   * @param op the operation to invoke on one provider
   * @param currPos index of the provider to try first
   * @param isIdempotent passed through to the retry policy
   * @return whatever {@code op} returns on the first successful call
   * @throws IOException if there are no providers, if the call raises an
   *     AccessControlException, or if the retry policy decides to fail
   */
  private <T> T doOp(ProviderCallable<T> op, int currPos,
      boolean isIdempotent) throws IOException {
    if (providers.length == 0) {
      throw new IOException("No providers configured !");
    int numFailovers = 0;
    // i selects the provider; numFailovers counts failed attempts so far.
    for (int i = 0;; i++, numFailovers++) {
      KMSClientProvider provider = providers[(currPos + i) % providers.length];
      try {
        return op.call(provider);
      } catch (AccessControlException ace) {
        // No need to retry on AccessControlException
        // and AuthorizationException.
        // This assumes all the servers are configured with identical
        // permissions and identical key acls.
        throw ace;
      } catch (IOException ioe) {
        LOG.warn("KMS provider at [{}] threw an IOException: ",
            provider.getKMSUrl(), ioe);
        // SSLException can occur here because of lost connection
        // with the KMS server, creating a ConnectException from it,
        // so that the FailoverOnNetworkExceptionRetry policy will retry
        // NOTE(review): as shown, the replacement exception keeps only the
        // message of the original, and the "SSLHandshakeException" prefix is
        // applied to SocketException as well.
        if (ioe instanceof SSLException || ioe instanceof SocketException) {
          Exception cause = ioe;
          ioe = new ConnectException("SSLHandshakeException: "
              + cause.getMessage());
        RetryAction action = null;
        try {
          action = retryPolicy.shouldRetry(ioe, 0, numFailovers, isIdempotent);
        } catch (Exception e) {
          // Surface policy failures as IOException, wrapping if necessary.
          if (e instanceof IOException) {
            throw (IOException)e;
          throw new IOException(e);
        // make sure each provider is tried at least once, to keep behavior
        // compatible with earlier versions of LBKMSCP
        if (action.action == RetryAction.RetryDecision.FAIL
            && numFailovers >= providers.length - 1) {
          // NOTE(review): the argument list of this LOG.error call is
          // incomplete in this listing.
          LOG.error("Aborting since the Request has failed with all KMS"
              + " providers(depending on {}={} setting and numProviders={})"
              + " in the group OR the exception is not recoverable",
                  KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, providers.length),
          throw ioe;
        if (((numFailovers + 1) % providers.length) == 0) {
          // Sleep only after we try all the providers for every cycle.
          // NOTE(review): the sleep call itself is missing from this listing.
          // The handler below does not restore the thread's interrupt flag.
          try {
          } catch (InterruptedException e) {
            throw new InterruptedIOException("Thread Interrupted");
      } catch (Exception e) {
        // Other failures are not retried: runtime exceptions propagate as-is,
        // checked ones are tunnelled in a WrapperException.
        if (e instanceof RuntimeException) {
          throw (RuntimeException)e;
        } else {
          throw new WrapperException(e);

  private int nextIdx() {
    while (true) {
      int current = currentIdx.get();
      int next = (current + 1) % providers.length;
      if (currentIdx.compareAndSet(current, next)) {
        return current;

  public String getCanonicalServiceName() {
    return canonicalService.toString();

  public Token<?> getDelegationToken(String renewer) throws IOException {
    return doOp(new ProviderCallable<Token<?>>() {
      public Token<?> call(KMSClientProvider provider) throws IOException {
        Token<?> token = provider.getDelegationToken(renewer);
        // override sub-providers service with our own so it can be used
        // across all providers.
        LOG.debug("New token service set. Token: ({})", token);
        return token;
    }, nextIdx(), false);

  public long renewDelegationToken(final Token<?> token) throws IOException {
    return doOp(new ProviderCallable<Long>() {
      public Long call(KMSClientProvider provider) throws IOException {
        return provider.renewDelegationToken(token);
    }, nextIdx(), false);

  public Void cancelDelegationToken(final Token<?> token) throws IOException {
    return doOp(new ProviderCallable<Void>() {
      public Void call(KMSClientProvider provider) throws IOException {
        return null;
    }, nextIdx(), false);

  // This request is sent to all providers in the load-balancing group
  public void warmUpEncryptedKeys(String... keyNames) throws IOException {
    Preconditions.checkArgument(providers.length > 0,
        "No providers are configured");
    boolean success = false;
    IOException e = null;
    for (KMSClientProvider provider : providers) {
      try {
        success = true;
      } catch (IOException ioe) {
        e = ioe;
            "Error warming up keys for provider with url"
            + "[" + provider.getKMSUrl() + "]", ioe);
    if (!success && e != null) {
      throw e;

  // This request is sent to all providers in the load-balancing group
  public void drain(String keyName) {
    for (KMSClientProvider provider : providers) {

  // This request is sent to all providers in the load-balancing group
  public void invalidateCache(String keyName) throws IOException {
    for (KMSClientProvider provider : providers) {

  public EncryptedKeyVersion
      generateEncryptedKey(final String encryptionKeyName)
          throws IOException, GeneralSecurityException {
    try {
      return doOp(new ProviderCallable<EncryptedKeyVersion>() {
        public EncryptedKeyVersion call(KMSClientProvider provider)
            throws IOException, GeneralSecurityException {
          return provider.generateEncryptedKey(encryptionKeyName);
      }, nextIdx(), true);
    } catch (WrapperException we) {
      if (we.getCause() instanceof GeneralSecurityException) {
        throw (GeneralSecurityException) we.getCause();
      throw new IOException(we.getCause());

  public KeyVersion
      decryptEncryptedKey(final EncryptedKeyVersion encryptedKeyVersion)
          throws IOException, GeneralSecurityException {
    try {
      return doOp(new ProviderCallable<KeyVersion>() {
        public KeyVersion call(KMSClientProvider provider)
            throws IOException, GeneralSecurityException {
          return provider.decryptEncryptedKey(encryptedKeyVersion);
      }, nextIdx(), true);
    } catch (WrapperException we) {
      if (we.getCause() instanceof GeneralSecurityException) {
        throw (GeneralSecurityException) we.getCause();
      throw new IOException(we.getCause());

  public EncryptedKeyVersion reencryptEncryptedKey(
      final EncryptedKeyVersion ekv)
      throws IOException, GeneralSecurityException {
    try {
      return doOp(new ProviderCallable<EncryptedKeyVersion>() {
        public EncryptedKeyVersion call(KMSClientProvider provider)
            throws IOException, GeneralSecurityException {
          return provider.reencryptEncryptedKey(ekv);
      }, nextIdx(), true);
    } catch (WrapperException we) {
      if (we.getCause() instanceof GeneralSecurityException) {
        throw (GeneralSecurityException) we.getCause();
      throw new IOException(we.getCause());

  public void reencryptEncryptedKeys(final List<EncryptedKeyVersion> ekvs)
      throws IOException, GeneralSecurityException {
    try {
      doOp(new ProviderCallable<Void>() {
        public Void call(KMSClientProvider provider)
            throws IOException, GeneralSecurityException {
          return null;
      }, nextIdx(), true);
    } catch (WrapperException we) {
      if (we.getCause() instanceof GeneralSecurityException) {
        throw (GeneralSecurityException) we.getCause();
      throw new IOException(we.getCause());

  public KeyVersion getKeyVersion(final String versionName) throws IOException {
    return doOp(new ProviderCallable<KeyVersion>() {
      public KeyVersion call(KMSClientProvider provider) throws IOException {
        return provider.getKeyVersion(versionName);
    }, nextIdx(), true);

  public List<String> getKeys() throws IOException {
    return doOp(new ProviderCallable<List<String>>() {
      public List<String> call(KMSClientProvider provider) throws IOException {
        return provider.getKeys();
    }, nextIdx(), true);

  public Metadata[] getKeysMetadata(final String... names) throws IOException {
    return doOp(new ProviderCallable<Metadata[]>() {
      public Metadata[] call(KMSClientProvider provider) throws IOException {
        return provider.getKeysMetadata(names);
    }, nextIdx(), true);

  public List<KeyVersion> getKeyVersions(final String name) throws IOException {
    return doOp(new ProviderCallable<List<KeyVersion>>() {
      public List<KeyVersion> call(KMSClientProvider provider)
          throws IOException {
        return provider.getKeyVersions(name);
    }, nextIdx(), true);

  public KeyVersion getCurrentKey(final String name) throws IOException {
    return doOp(new ProviderCallable<KeyVersion>() {
      public KeyVersion call(KMSClientProvider provider) throws IOException {
        return provider.getCurrentKey(name);
    }, nextIdx(), true);

  public Metadata getMetadata(final String name) throws IOException {
    return doOp(new ProviderCallable<Metadata>() {
      public Metadata call(KMSClientProvider provider) throws IOException {
        return provider.getMetadata(name);
    }, nextIdx(), true);

  public KeyVersion createKey(final String name, final byte[] material,
      final Options options) throws IOException {
    return doOp(new ProviderCallable<KeyVersion>() {
      public KeyVersion call(KMSClientProvider provider) throws IOException {
        return provider.createKey(name, material, options);
    }, nextIdx(), false);

  public KeyVersion createKey(final String name, final Options options)
      throws NoSuchAlgorithmException, IOException {
    try {
      return doOp(new ProviderCallable<KeyVersion>() {
        public KeyVersion call(KMSClientProvider provider) throws IOException,
            NoSuchAlgorithmException {
          return provider.createKey(name, options);
      }, nextIdx(), false);
    } catch (WrapperException e) {
      if (e.getCause() instanceof GeneralSecurityException) {
        throw (NoSuchAlgorithmException) e.getCause();
      throw new IOException(e.getCause());

  public void deleteKey(final String name) throws IOException {
    doOp(new ProviderCallable<Void>() {
      public Void call(KMSClientProvider provider) throws IOException {
        return null;
    }, nextIdx(), false);

  public KeyVersion rollNewVersion(final String name, final byte[] material)
      throws IOException {
    final KeyVersion newVersion = doOp(new ProviderCallable<KeyVersion>() {
      public KeyVersion call(KMSClientProvider provider) throws IOException {
        return provider.rollNewVersion(name, material);
    }, nextIdx(), false);
    return newVersion;

  public KeyVersion rollNewVersion(final String name)
      throws NoSuchAlgorithmException, IOException {
    try {
      final KeyVersion newVersion = doOp(new ProviderCallable<KeyVersion>() {
        public KeyVersion call(KMSClientProvider provider) throws IOException,
            NoSuchAlgorithmException {
          return provider.rollNewVersion(name);
      }, nextIdx(), false);
      return newVersion;
    } catch (WrapperException e) {
      if (e.getCause() instanceof GeneralSecurityException) {
        throw (NoSuchAlgorithmException) e.getCause();
      throw new IOException(e.getCause());

  // Close all providers in the LB group
  public void close() throws IOException {
    for (KMSClientProvider provider : providers) {
      try {
      } catch (IOException ioe) {
        LOG.error("Error closing provider with url"
            + "[" + provider.getKMSUrl() + "]");

  public void flush() throws IOException {
    for (KMSClientProvider provider : providers) {
      try {
      } catch (IOException ioe) {
        LOG.error("Error flushing provider with url"
            + "[" + provider.getKMSUrl() + "]");

  private static KMSClientProvider[] shuffle(KMSClientProvider[] providers) {
    List<KMSClientProvider> list = Arrays.asList(providers);
    return list.toArray(providers);


hadoop 源码目录


hadoop KMSClientProvider 源码

hadoop KMSDelegationToken 源码

hadoop KMSRESTConstants 源码

hadoop ValueQueue 源码

0  赞