kafka ApiKeys 源码
kafka ApiKeys 代码
文件路径:/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.protocol;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Type;
import org.apache.kafka.common.record.RecordBatch;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.kafka.common.protocol.types.Type.BYTES;
import static org.apache.kafka.common.protocol.types.Type.COMPACT_BYTES;
import static org.apache.kafka.common.protocol.types.Type.COMPACT_NULLABLE_BYTES;
import static org.apache.kafka.common.protocol.types.Type.NULLABLE_BYTES;
import static org.apache.kafka.common.protocol.types.Type.RECORDS;
/**
* Identifiers for all the Kafka APIs
*/
public enum ApiKeys {
PRODUCE(ApiMessageType.PRODUCE),
FETCH(ApiMessageType.FETCH),
LIST_OFFSETS(ApiMessageType.LIST_OFFSETS),
METADATA(ApiMessageType.METADATA),
LEADER_AND_ISR(ApiMessageType.LEADER_AND_ISR, true),
STOP_REPLICA(ApiMessageType.STOP_REPLICA, true),
UPDATE_METADATA(ApiMessageType.UPDATE_METADATA, true),
CONTROLLED_SHUTDOWN(ApiMessageType.CONTROLLED_SHUTDOWN, true),
OFFSET_COMMIT(ApiMessageType.OFFSET_COMMIT),
OFFSET_FETCH(ApiMessageType.OFFSET_FETCH),
FIND_COORDINATOR(ApiMessageType.FIND_COORDINATOR),
JOIN_GROUP(ApiMessageType.JOIN_GROUP),
HEARTBEAT(ApiMessageType.HEARTBEAT),
LEAVE_GROUP(ApiMessageType.LEAVE_GROUP),
SYNC_GROUP(ApiMessageType.SYNC_GROUP),
DESCRIBE_GROUPS(ApiMessageType.DESCRIBE_GROUPS),
LIST_GROUPS(ApiMessageType.LIST_GROUPS),
SASL_HANDSHAKE(ApiMessageType.SASL_HANDSHAKE),
API_VERSIONS(ApiMessageType.API_VERSIONS),
CREATE_TOPICS(ApiMessageType.CREATE_TOPICS, false, true),
DELETE_TOPICS(ApiMessageType.DELETE_TOPICS, false, true),
DELETE_RECORDS(ApiMessageType.DELETE_RECORDS),
INIT_PRODUCER_ID(ApiMessageType.INIT_PRODUCER_ID),
OFFSET_FOR_LEADER_EPOCH(ApiMessageType.OFFSET_FOR_LEADER_EPOCH),
ADD_PARTITIONS_TO_TXN(ApiMessageType.ADD_PARTITIONS_TO_TXN, false, RecordBatch.MAGIC_VALUE_V2, false),
ADD_OFFSETS_TO_TXN(ApiMessageType.ADD_OFFSETS_TO_TXN, false, RecordBatch.MAGIC_VALUE_V2, false),
END_TXN(ApiMessageType.END_TXN, false, RecordBatch.MAGIC_VALUE_V2, false),
WRITE_TXN_MARKERS(ApiMessageType.WRITE_TXN_MARKERS, true, RecordBatch.MAGIC_VALUE_V2, false),
TXN_OFFSET_COMMIT(ApiMessageType.TXN_OFFSET_COMMIT, false, RecordBatch.MAGIC_VALUE_V2, false),
DESCRIBE_ACLS(ApiMessageType.DESCRIBE_ACLS),
CREATE_ACLS(ApiMessageType.CREATE_ACLS, false, true),
DELETE_ACLS(ApiMessageType.DELETE_ACLS, false, true),
DESCRIBE_CONFIGS(ApiMessageType.DESCRIBE_CONFIGS),
ALTER_CONFIGS(ApiMessageType.ALTER_CONFIGS, false, true),
ALTER_REPLICA_LOG_DIRS(ApiMessageType.ALTER_REPLICA_LOG_DIRS),
DESCRIBE_LOG_DIRS(ApiMessageType.DESCRIBE_LOG_DIRS),
SASL_AUTHENTICATE(ApiMessageType.SASL_AUTHENTICATE),
CREATE_PARTITIONS(ApiMessageType.CREATE_PARTITIONS, false, true),
CREATE_DELEGATION_TOKEN(ApiMessageType.CREATE_DELEGATION_TOKEN, false, true),
RENEW_DELEGATION_TOKEN(ApiMessageType.RENEW_DELEGATION_TOKEN, false, true),
EXPIRE_DELEGATION_TOKEN(ApiMessageType.EXPIRE_DELEGATION_TOKEN, false, true),
DESCRIBE_DELEGATION_TOKEN(ApiMessageType.DESCRIBE_DELEGATION_TOKEN),
DELETE_GROUPS(ApiMessageType.DELETE_GROUPS),
ELECT_LEADERS(ApiMessageType.ELECT_LEADERS, false, true),
INCREMENTAL_ALTER_CONFIGS(ApiMessageType.INCREMENTAL_ALTER_CONFIGS, false, true),
ALTER_PARTITION_REASSIGNMENTS(ApiMessageType.ALTER_PARTITION_REASSIGNMENTS, false, true),
LIST_PARTITION_REASSIGNMENTS(ApiMessageType.LIST_PARTITION_REASSIGNMENTS, false, true),
OFFSET_DELETE(ApiMessageType.OFFSET_DELETE),
DESCRIBE_CLIENT_QUOTAS(ApiMessageType.DESCRIBE_CLIENT_QUOTAS),
ALTER_CLIENT_QUOTAS(ApiMessageType.ALTER_CLIENT_QUOTAS, false, true),
DESCRIBE_USER_SCRAM_CREDENTIALS(ApiMessageType.DESCRIBE_USER_SCRAM_CREDENTIALS),
ALTER_USER_SCRAM_CREDENTIALS(ApiMessageType.ALTER_USER_SCRAM_CREDENTIALS, false, true),
VOTE(ApiMessageType.VOTE, true, RecordBatch.MAGIC_VALUE_V0, false),
BEGIN_QUORUM_EPOCH(ApiMessageType.BEGIN_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false),
END_QUORUM_EPOCH(ApiMessageType.END_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false),
DESCRIBE_QUORUM(ApiMessageType.DESCRIBE_QUORUM, true, RecordBatch.MAGIC_VALUE_V0, true),
ALTER_PARTITION(ApiMessageType.ALTER_PARTITION, true),
UPDATE_FEATURES(ApiMessageType.UPDATE_FEATURES, true, true),
ENVELOPE(ApiMessageType.ENVELOPE, true, RecordBatch.MAGIC_VALUE_V0, false),
FETCH_SNAPSHOT(ApiMessageType.FETCH_SNAPSHOT, false, RecordBatch.MAGIC_VALUE_V0, false),
DESCRIBE_CLUSTER(ApiMessageType.DESCRIBE_CLUSTER),
DESCRIBE_PRODUCERS(ApiMessageType.DESCRIBE_PRODUCERS),
BROKER_REGISTRATION(ApiMessageType.BROKER_REGISTRATION, true, RecordBatch.MAGIC_VALUE_V0, false),
BROKER_HEARTBEAT(ApiMessageType.BROKER_HEARTBEAT, true, RecordBatch.MAGIC_VALUE_V0, false),
UNREGISTER_BROKER(ApiMessageType.UNREGISTER_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true),
DESCRIBE_TRANSACTIONS(ApiMessageType.DESCRIBE_TRANSACTIONS),
LIST_TRANSACTIONS(ApiMessageType.LIST_TRANSACTIONS),
ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, true);
private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
new EnumMap<>(ApiMessageType.ListenerType.class);
static {
for (ApiMessageType.ListenerType listenerType : ApiMessageType.ListenerType.values()) {
APIS_BY_LISTENER.put(listenerType, filterApisForListener(listenerType));
}
}
// The generator ensures every `ApiMessageType` has a unique id
private static final Map<Integer, ApiKeys> ID_TO_TYPE = Arrays.stream(ApiKeys.values())
.collect(Collectors.toMap(key -> (int) key.id, Function.identity()));
/** the permanent and immutable id of an API - this can't change ever */
public final short id;
/** An english description of the api - used for debugging and metric names, it can potentially be changed via a KIP */
public final String name;
/** indicates if this is a ClusterAction request used only by brokers */
public final boolean clusterAction;
/** indicates the minimum required inter broker magic required to support the API */
public final byte minRequiredInterBrokerMagic;
/** indicates whether the API is enabled for forwarding **/
public final boolean forwardable;
public final boolean requiresDelayedAllocation;
public final ApiMessageType messageType;
ApiKeys(ApiMessageType messageType) {
this(messageType, false);
}
ApiKeys(ApiMessageType messageType, boolean clusterAction) {
this(messageType, clusterAction, RecordBatch.MAGIC_VALUE_V0, false);
}
ApiKeys(ApiMessageType messageType, boolean clusterAction, boolean forwardable) {
this(messageType, clusterAction, RecordBatch.MAGIC_VALUE_V0, forwardable);
}
ApiKeys(
ApiMessageType messageType,
boolean clusterAction,
byte minRequiredInterBrokerMagic,
boolean forwardable
) {
this.messageType = messageType;
this.id = messageType.apiKey();
this.name = messageType.name;
this.clusterAction = clusterAction;
this.minRequiredInterBrokerMagic = minRequiredInterBrokerMagic;
this.requiresDelayedAllocation = forwardable || shouldRetainsBufferReference(messageType.requestSchemas());
this.forwardable = forwardable;
}
private static boolean shouldRetainsBufferReference(Schema[] requestSchemas) {
boolean requestRetainsBufferReference = false;
for (Schema requestVersionSchema : requestSchemas) {
if (retainsBufferReference(requestVersionSchema)) {
requestRetainsBufferReference = true;
break;
}
}
return requestRetainsBufferReference;
}
public static ApiKeys forId(int id) {
ApiKeys apiKey = ID_TO_TYPE.get(id);
if (apiKey == null) {
throw new IllegalArgumentException("Unexpected api key: " + id);
}
return apiKey;
}
public static boolean hasId(int id) {
return ID_TO_TYPE.containsKey(id);
}
public short latestVersion() {
return messageType.highestSupportedVersion();
}
public short oldestVersion() {
return messageType.lowestSupportedVersion();
}
public List<Short> allVersions() {
List<Short> versions = new ArrayList<>(latestVersion() - oldestVersion() + 1);
for (short version = oldestVersion(); version <= latestVersion(); version++) {
versions.add(version);
}
return versions;
}
public boolean isVersionSupported(short apiVersion) {
return apiVersion >= oldestVersion() && apiVersion <= latestVersion();
}
public short requestHeaderVersion(short apiVersion) {
return messageType.requestHeaderVersion(apiVersion);
}
public short responseHeaderVersion(short apiVersion) {
return messageType.responseHeaderVersion(apiVersion);
}
public boolean inScope(ApiMessageType.ListenerType listener) {
return messageType.listeners().contains(listener);
}
private static String toHtml() {
final StringBuilder b = new StringBuilder();
b.append("<table class=\"data-table\"><tbody>\n");
b.append("<tr>");
b.append("<th>Name</th>\n");
b.append("<th>Key</th>\n");
b.append("</tr>");
for (ApiKeys key : clientApis()) {
b.append("<tr>\n");
b.append("<td>");
b.append("<a href=\"#The_Messages_" + key.name + "\">" + key.name + "</a>");
b.append("</td>");
b.append("<td>");
b.append(key.id);
b.append("</td>");
b.append("</tr>\n");
}
b.append("</tbody></table>\n");
return b.toString();
}
public static void main(String[] args) {
System.out.println(toHtml());
}
private static boolean retainsBufferReference(Schema schema) {
final AtomicBoolean hasBuffer = new AtomicBoolean(false);
Schema.Visitor detector = new Schema.Visitor() {
@Override
public void visit(Type field) {
if (field == BYTES || field == NULLABLE_BYTES || field == RECORDS ||
field == COMPACT_BYTES || field == COMPACT_NULLABLE_BYTES)
hasBuffer.set(true);
}
};
schema.walk(detector);
return hasBuffer.get();
}
public static EnumSet<ApiKeys> zkBrokerApis() {
return apisForListener(ApiMessageType.ListenerType.ZK_BROKER);
}
public static EnumSet<ApiKeys> controllerApis() {
return apisForListener(ApiMessageType.ListenerType.CONTROLLER);
}
public static EnumSet<ApiKeys> clientApis() {
List<ApiKeys> apis = Arrays.stream(ApiKeys.values())
.filter(apiKey -> apiKey.inScope(ApiMessageType.ListenerType.ZK_BROKER) || apiKey.inScope(ApiMessageType.ListenerType.BROKER))
.collect(Collectors.toList());
return EnumSet.copyOf(apis);
}
public static EnumSet<ApiKeys> apisForListener(ApiMessageType.ListenerType listener) {
return APIS_BY_LISTENER.get(listener);
}
private static EnumSet<ApiKeys> filterApisForListener(ApiMessageType.ListenerType listener) {
List<ApiKeys> apis = Arrays.stream(ApiKeys.values())
.filter(apiKey -> apiKey.inScope(listener))
.collect(Collectors.toList());
return EnumSet.copyOf(apis);
}
}
相关信息
相关文章
kafka DataOutputStreamWritable 源码
kafka MessageSizeAccumulator 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦