kafka AdminApiHandler 源码
kafka AdminApiHandler 代码
文件路径:/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiHandler.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public interface AdminApiHandler<K, V> {
/**
 * Get a user-friendly name for the API this handler is implementing.
 *
 * @return the user-friendly name of the API
 */
String apiName();
/**
 * Build the requests necessary for the given keys. The set of keys is derived by
 * {@link AdminApiDriver} during the lookup stage as the set of keys which all map
 * to the same destination broker. Handlers can choose to issue a single request for
 * all of the provided keys (see {@link Batched}), issue one request per key (see
 * {@link Unbatched}), or implement their own custom grouping logic if necessary.
 *
 * @param brokerId the target brokerId for the request
 * @param keys the set of keys that should be handled by this request
 *
 * @return a collection of {@link RequestAndKeys} for the requests containing the given keys
 */
Collection<RequestAndKeys<K>> buildRequest(int brokerId, Set<K> keys);
/**
 * Callback that is invoked when a request returns successfully.
 * The handler should parse the response, check for errors, and return a
 * result which indicates which keys (if any) have either been completed or
 * failed with an unrecoverable error.
 *
 * <p>It is also possible that the response indicates an incorrect target brokerId
 * (e.g. in the case of a NotLeader error when the request is bound for a partition
 * leader). In this case the key will be "unmapped" from the target brokerId
 * and lookup will be retried.
 *
 * <p>Note that keys which received a retriable error should be left out of the
 * result. They will be retried automatically.
 *
 * @param broker the broker that the associated request was sent to
 * @param keys the set of keys from the associated request
 * @param response the response received from the broker
 *
 * @return result indicating key completion, failure, and unmapping
 */
ApiResult<K, V> handleResponse(Node broker, Set<K> keys, AbstractResponse response);
/**
 * Get the lookup strategy that is responsible for finding the brokerId
 * which will handle each respective key.
 *
 * @return the lookup strategy; never {@code null}
 */
AdminApiLookupStrategy<K> lookupStrategy();
/**
 * Outcome of handling one response, split into three groups of keys: those that
 * completed with a value, those that failed with an error, and those that were unmapped.
 *
 * <p>The collections passed to the constructor are wrapped in unmodifiable views; they
 * are not copied, so later changes to the underlying collections remain visible here.
 *
 * @param <K> the key type
 * @param <V> the type of the value produced for a completed key
 */
class ApiResult<K, V> {
    /** Completed keys and the value each one completed with (unmodifiable view). */
    public final Map<K, V> completedKeys;
    /** Failed keys and the error each one failed with (unmodifiable view). */
    public final Map<K, Throwable> failedKeys;
    /** Keys reported as unmapped (unmodifiable view). */
    public final List<K> unmappedKeys;

    /**
     * Create a result from the three groups of keys.
     *
     * @param completedKeys keys that completed, with their values
     * @param failedKeys keys that failed, with their errors
     * @param unmappedKeys keys that were unmapped
     */
    public ApiResult(
        Map<K, V> completedKeys,
        Map<K, Throwable> failedKeys,
        List<K> unmappedKeys
    ) {
        Map<K, V> completedView = Collections.unmodifiableMap(completedKeys);
        Map<K, Throwable> failedView = Collections.unmodifiableMap(failedKeys);
        List<K> unmappedView = Collections.unmodifiableList(unmappedKeys);
        this.completedKeys = completedView;
        this.failedKeys = failedView;
        this.unmappedKeys = unmappedView;
    }

    /**
     * Result in which exactly one key completed with the given value.
     *
     * @param key the completed key
     * @param value the value the key completed with
     * @return a result holding only that completion
     */
    public static <K, V> ApiResult<K, V> completed(K key, V value) {
        Map<K, V> onlyCompletion = Collections.singletonMap(key, value);
        return new ApiResult<>(onlyCompletion, Collections.emptyMap(), Collections.emptyList());
    }

    /**
     * Result in which exactly one key failed with the given error.
     *
     * @param key the failed key
     * @param t the error the key failed with
     * @return a result holding only that failure
     */
    public static <K, V> ApiResult<K, V> failed(K key, Throwable t) {
        Map<K, Throwable> onlyFailure = Collections.singletonMap(key, t);
        return new ApiResult<>(Collections.emptyMap(), onlyFailure, Collections.emptyList());
    }

    /**
     * Result in which the given keys were unmapped and nothing completed or failed.
     *
     * @param keys the unmapped keys
     * @return a result holding only the unmapped keys
     */
    public static <K, V> ApiResult<K, V> unmapped(List<K> keys) {
        return new ApiResult<>(Collections.emptyMap(), Collections.emptyMap(), keys);
    }

    /**
     * Result with no completed, failed, or unmapped keys.
     *
     * @return an empty result
     */
    public static <K, V> ApiResult<K, V> empty() {
        // An empty result is an "unmapped" result whose key list is empty.
        return ApiResult.<K, V>unmapped(Collections.<K>emptyList());
    }
}
/**
 * Pairs a request builder with the set of keys that the request covers.
 * Both values are stored as given, without copying.
 *
 * @param <K> the key type
 */
class RequestAndKeys<K> {
    /** Builder for the request. */
    public final AbstractRequest.Builder<?> request;
    /** Keys covered by {@link #request}. */
    public final Set<K> keys;

    /**
     * @param request builder for the request
     * @param keys the keys the request covers
     */
    public RequestAndKeys(AbstractRequest.Builder<?> request, Set<K> keys) {
        this.keys = keys;
        this.request = request;
    }
}
/**
 * An {@link AdminApiHandler} that groups multiple keys into a single request when possible.
 * Keys are grouped together whenever they target the same broker. Use this type of handler
 * for broker APIs that can act on multiple keys at once, such as describing or listing
 * transactions.
 */
abstract class Batched<K, V> implements AdminApiHandler<K, V> {
    /**
     * Build the single request that covers all of the given keys.
     *
     * @param brokerId the target brokerId for the request
     * @param keys the keys the request should handle
     * @return the builder for the batched request
     */
    abstract AbstractRequest.Builder<?> buildBatchedRequest(int brokerId, Set<K> keys);

    /**
     * Wraps the batched request and its full key set in a one-element collection.
     */
    @Override
    public final Collection<RequestAndKeys<K>> buildRequest(int brokerId, Set<K> keys) {
        AbstractRequest.Builder<?> batchedRequest = buildBatchedRequest(brokerId, keys);
        RequestAndKeys<K> requestAndKeys = new RequestAndKeys<>(batchedRequest, keys);
        return Collections.singleton(requestAndKeys);
    }
}
/**
 * An {@link AdminApiHandler} that creates one request per key and performs no grouping based
 * on the targeted broker. Use this type of handler only for broker APIs that do not accept
 * multiple keys at once, such as initializing a transactional producer.
 */
abstract class Unbatched<K, V> implements AdminApiHandler<K, V> {
    /**
     * Build the request for a single key.
     *
     * @param brokerId the target brokerId for the request
     * @param key the key the request should handle
     * @return the builder for the request
     */
    abstract AbstractRequest.Builder<?> buildSingleRequest(int brokerId, K key);

    /**
     * Handle the response to a request that was built for a single key.
     *
     * @param broker the broker that the associated request was sent to
     * @param key the key from the associated request
     * @param response the response received from the broker
     * @return result indicating key completion, failure, and unmapping
     */
    abstract ApiResult<K, V> handleSingleResponse(Node broker, K key, AbstractResponse response);

    /**
     * Produces one {@link RequestAndKeys} per key, each carrying a singleton key set.
     */
    @Override
    public final Collection<RequestAndKeys<K>> buildRequest(int brokerId, Set<K> keys) {
        return keys.stream()
            .map(key -> {
                AbstractRequest.Builder<?> singleRequest = buildSingleRequest(brokerId, key);
                return new RequestAndKeys<K>(singleRequest, Collections.singleton(key));
            })
            .collect(Collectors.toSet());
    }

    /**
     * Delegates to {@link #handleSingleResponse(Node, Object, AbstractResponse)} with the only key.
     *
     * @throws IllegalArgumentException if {@code keys} does not contain exactly one key
     */
    @Override
    public final ApiResult<K, V> handleResponse(Node broker, Set<K> keys, AbstractResponse response) {
        if (keys.size() == 1) {
            K onlyKey = keys.iterator().next();
            return handleSingleResponse(broker, onlyKey, response);
        }
        throw new IllegalArgumentException("Unbatched admin handler should only be required to handle responses for a single key at a time");
    }
}
}
相关信息
相关文章
kafka AbortTransactionHandler 源码
kafka AdminApiLookupStrategy 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦