kafka OAuthBearerSaslClient 源码

  • 2022-10-20
  • 浏览 (409)

kafka OAuthBearerSaslClient 代码

文件路径:/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClient.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.common.security.oauthbearer.internals;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;

import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslClientFactory;
import javax.security.sasl.SaslException;

import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.security.auth.SaslExtensions;
import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@code SaslClient} implementation for SASL/OAUTHBEARER in Kafka. This
 * implementation requires an instance of {@code AuthenticateCallbackHandler}
 * that can handle an instance of {@link OAuthBearerTokenCallback} and return
 * the {@link OAuthBearerToken} generated by the {@code login()} event on the
 * {@code LoginContext}. Said handler can also optionally handle an instance of {@link SaslExtensionsCallback}
 * to return any extensions generated by the {@code login()} event on the {@code LoginContext}.
 *
 * @see <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750,
 *      Section 2.1</a>
 *
 */
public class OAuthBearerSaslClient implements SaslClient {
    static final byte BYTE_CONTROL_A = (byte) 0x01;
    private static final Logger log = LoggerFactory.getLogger(OAuthBearerSaslClient.class);
    private final CallbackHandler callbackHandler;

    enum State {
        SEND_CLIENT_FIRST_MESSAGE, RECEIVE_SERVER_FIRST_MESSAGE, RECEIVE_SERVER_MESSAGE_AFTER_FAILURE, COMPLETE, FAILED
    }

    private State state;

    public OAuthBearerSaslClient(AuthenticateCallbackHandler callbackHandler) {
        this.callbackHandler = Objects.requireNonNull(callbackHandler);
        setState(State.SEND_CLIENT_FIRST_MESSAGE);
    }

    public CallbackHandler callbackHandler() {
        return callbackHandler;
    }

    @Override
    public String getMechanismName() {
        return OAuthBearerLoginModule.OAUTHBEARER_MECHANISM;
    }

    @Override
    public boolean hasInitialResponse() {
        return true;
    }

    @Override
    public byte[] evaluateChallenge(byte[] challenge) throws SaslException {
        try {
            OAuthBearerTokenCallback callback = new OAuthBearerTokenCallback();
            switch (state) {
                case SEND_CLIENT_FIRST_MESSAGE:
                    if (challenge != null && challenge.length != 0)
                        throw new SaslException("Expected empty challenge");
                    callbackHandler().handle(new Callback[] {callback});
                    SaslExtensions extensions = retrieveCustomExtensions();

                    setState(State.RECEIVE_SERVER_FIRST_MESSAGE);

                    return new OAuthBearerClientInitialResponse(callback.token().value(), extensions).toBytes();
                case RECEIVE_SERVER_FIRST_MESSAGE:
                    if (challenge != null && challenge.length != 0) {
                        String jsonErrorResponse = new String(challenge, StandardCharsets.UTF_8);
                        if (log.isDebugEnabled())
                            log.debug("Sending %%x01 response to server after receiving an error: {}",
                                    jsonErrorResponse);
                        setState(State.RECEIVE_SERVER_MESSAGE_AFTER_FAILURE);
                        return new byte[] {BYTE_CONTROL_A};
                    }
                    callbackHandler().handle(new Callback[] {callback});
                    if (log.isDebugEnabled())
                        log.debug("Successfully authenticated as {}", callback.token().principalName());
                    setState(State.COMPLETE);
                    return null;
                default:
                    throw new IllegalSaslStateException("Unexpected challenge in Sasl client state " + state);
            }
        } catch (SaslException e) {
            setState(State.FAILED);
            throw e;
        } catch (IOException | UnsupportedCallbackException e) {
            setState(State.FAILED);
            throw new SaslException(e.getMessage(), e);
        }
    }

    @Override
    public boolean isComplete() {
        return state == State.COMPLETE;
    }

    @Override
    public byte[] unwrap(byte[] incoming, int offset, int len) {
        if (!isComplete())
            throw new IllegalStateException("Authentication exchange has not completed");
        return Arrays.copyOfRange(incoming, offset, offset + len);
    }

    @Override
    public byte[] wrap(byte[] outgoing, int offset, int len) {
        if (!isComplete())
            throw new IllegalStateException("Authentication exchange has not completed");
        return Arrays.copyOfRange(outgoing, offset, offset + len);
    }

    @Override
    public Object getNegotiatedProperty(String propName) {
        if (!isComplete())
            throw new IllegalStateException("Authentication exchange has not completed");
        return null;
    }

    @Override
    public void dispose() {
    }

    private void setState(State state) {
        log.debug("Setting SASL/{} client state to {}", OAuthBearerLoginModule.OAUTHBEARER_MECHANISM, state);
        this.state = state;
    }

    private SaslExtensions retrieveCustomExtensions() throws SaslException {
        SaslExtensionsCallback extensionsCallback = new SaslExtensionsCallback();
        try {
            callbackHandler().handle(new Callback[] {extensionsCallback});
        } catch (UnsupportedCallbackException e) {
            log.debug("Extensions callback is not supported by client callback handler {}, no extensions will be added",
                    callbackHandler());
        } catch (Exception e) {
            throw new SaslException("SASL extensions could not be obtained", e);
        }

        return extensionsCallback.extensions();
    }

    public static class OAuthBearerSaslClientFactory implements SaslClientFactory {
        @Override
        public SaslClient createSaslClient(String[] mechanisms, String authorizationId, String protocol,
                String serverName, Map<String, ?> props, CallbackHandler callbackHandler) {
            String[] mechanismNamesCompatibleWithPolicy = getMechanismNames(props);
            for (String mechanism : mechanisms) {
                for (int i = 0; i < mechanismNamesCompatibleWithPolicy.length; i++) {
                    if (mechanismNamesCompatibleWithPolicy[i].equals(mechanism)) {
                        if (!(Objects.requireNonNull(callbackHandler) instanceof AuthenticateCallbackHandler))
                            throw new IllegalArgumentException(String.format(
                                    "Callback handler must be castable to %s: %s",
                                    AuthenticateCallbackHandler.class.getName(), callbackHandler.getClass().getName()));
                        return new OAuthBearerSaslClient((AuthenticateCallbackHandler) callbackHandler);
                    }
                }
            }
            return null;
        }

        @Override
        public String[] getMechanismNames(Map<String, ?> props) {
            return OAuthBearerSaslServer.mechanismNamesCompatibleWithPolicy(props);
        }
    }
}

相关信息

kafka 源码目录

相关文章

kafka OAuthBearerClientInitialResponse 源码

kafka OAuthBearerRefreshingLogin 源码

kafka OAuthBearerSaslClientCallbackHandler 源码

kafka OAuthBearerSaslClientProvider 源码

kafka OAuthBearerSaslServer 源码

kafka OAuthBearerSaslServerProvider 源码

0  赞