kafka FetchResponse 源码

  • 2022-10-20
  • 浏览 (502)

kafka FetchResponse 代码

文件路径:/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.common.requests;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;

/**
 * This wrapper supports all versions of the Fetch API
 *
 * Possible error codes:
 *
 * - {@link Errors#OFFSET_OUT_OF_RANGE} If the fetch offset is out of range for a requested partition
 * - {@link Errors#TOPIC_AUTHORIZATION_FAILED} If the user does not have READ access to a requested topic
 * - {@link Errors#REPLICA_NOT_AVAILABLE} If the request is received by a broker with version < 2.6 which is not a replica
 * - {@link Errors#NOT_LEADER_OR_FOLLOWER} If the broker is not a leader or follower and either the provided leader epoch
 *     matches the known leader epoch on the broker or is empty
 * - {@link Errors#FENCED_LEADER_EPOCH} If the epoch is lower than the broker's epoch
 * - {@link Errors#UNKNOWN_LEADER_EPOCH} If the epoch is larger than the broker's epoch
 * - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} If the broker does not have metadata for a topic or partition
 * - {@link Errors#KAFKA_STORAGE_ERROR} If the log directory for one of the requested partitions is offline
 * - {@link Errors#UNSUPPORTED_COMPRESSION_TYPE} If a fetched topic is using a compression type which is
 *     not supported by the fetch request version
 * - {@link Errors#CORRUPT_MESSAGE} If corrupt message encountered, e.g. when the broker scans the log to find
 *     the fetch offset after the index lookup
 * - {@link Errors#UNKNOWN_TOPIC_ID} If the request contains a topic ID unknown to the broker
 * - {@link Errors#FETCH_SESSION_TOPIC_ID_ERROR} If the request version supports topic IDs but the session does not or vice versa,
 *     or a topic ID in the request is inconsistent with a topic ID in the session
 * - {@link Errors#INCONSISTENT_TOPIC_ID} If a topic ID in the session does not match the topic ID in the log
 * - {@link Errors#UNKNOWN_SERVER_ERROR} For any unexpected errors
 */
public class FetchResponse extends AbstractResponse {
    public static final long INVALID_HIGH_WATERMARK = -1L;
    public static final long INVALID_LAST_STABLE_OFFSET = -1L;
    public static final long INVALID_LOG_START_OFFSET = -1L;
    public static final int INVALID_PREFERRED_REPLICA_ID = -1;

    private final FetchResponseData data;
    // we build responseData when needed.
    private volatile LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseData = null;

    @Override
    public FetchResponseData data() {
        return data;
    }

    /**
     * From version 3 or later, the authorized and existing entries in `FetchRequest.fetchData` should be in the same order in `responseData`.
     * Version 13 introduces topic IDs which can lead to a few new errors. If there is any unknown topic ID in the request, the
     * response will contain a partition-level UNKNOWN_TOPIC_ID error for that partition.
     * If a request's topic ID usage is inconsistent with the session, we will return a top level FETCH_SESSION_TOPIC_ID_ERROR error.
     * We may also return INCONSISTENT_TOPIC_ID error as a partition-level error when a partition in the session has a topic ID
     * inconsistent with the log.
     */
    public FetchResponse(FetchResponseData fetchResponseData) {
        super(ApiKeys.FETCH);
        this.data = fetchResponseData;
    }

    public Errors error() {
        return Errors.forCode(data.errorCode());
    }

    public LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseData(Map<Uuid, String> topicNames, short version) {
        if (responseData == null) {
            synchronized (this) {
                if (responseData == null) {
                    // Assigning the lazy-initialized `responseData` in the last step
                    // to avoid other threads accessing a half-initialized object.
                    final LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseDataTmp =
                            new LinkedHashMap<>();
                    data.responses().forEach(topicResponse -> {
                        String name;
                        if (version < 13) {
                            name = topicResponse.topic();
                        } else {
                            name = topicNames.get(topicResponse.topicId());
                        }
                        if (name != null) {
                            topicResponse.partitions().forEach(partition ->
                                responseDataTmp.put(new TopicPartition(name, partition.partitionIndex()), partition));
                        }
                    });
                    responseData = responseDataTmp;
                }
            }
        }
        return responseData;
    }

    @Override
    public int throttleTimeMs() {
        return data.throttleTimeMs();
    }

    @Override
    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
        data.setThrottleTimeMs(throttleTimeMs);
    }

    public int sessionId() {
        return data.sessionId();
    }

    @Override
    public Map<Errors, Integer> errorCounts() {
        Map<Errors, Integer> errorCounts = new HashMap<>();
        updateErrorCounts(errorCounts, error());
        data.responses().forEach(topicResponse ->
            topicResponse.partitions().forEach(partition ->
                updateErrorCounts(errorCounts, Errors.forCode(partition.errorCode())))
        );
        return errorCounts;
    }

    public static FetchResponse parse(ByteBuffer buffer, short version) {
        return new FetchResponse(new FetchResponseData(new ByteBufferAccessor(buffer), version));
    }

    // Fetch versions 13 and above should have topic IDs for all topics.
    // Fetch versions < 13 should return the empty set.
    public Set<Uuid> topicIds() {
        return data.responses().stream().map(FetchResponseData.FetchableTopicResponse::topicId).filter(id -> !id.equals(Uuid.ZERO_UUID)).collect(Collectors.toSet());
    }

    /**
     * Convenience method to find the size of a response.
     *
     * @param version       The version of the response to use.
     * @param partIterator  The partition iterator.
     * @return              The response size in bytes.
     */
    public static int sizeOf(short version,
                             Iterator<Map.Entry<TopicIdPartition,
                             FetchResponseData.PartitionData>> partIterator) {
        // Since the throttleTimeMs and metadata field sizes are constant and fixed, we can
        // use arbitrary values here without affecting the result.
        FetchResponseData data = toMessage(Errors.NONE, 0, INVALID_SESSION_ID, partIterator);
        ObjectSerializationCache cache = new ObjectSerializationCache();
        return 4 + data.size(cache, version);
    }

    @Override
    public boolean shouldClientThrottle(short version) {
        return version >= 8;
    }

    public static Optional<FetchResponseData.EpochEndOffset> divergingEpoch(FetchResponseData.PartitionData partitionResponse) {
        return partitionResponse.divergingEpoch().epoch() < 0 ? Optional.empty()
                : Optional.of(partitionResponse.divergingEpoch());
    }

    public static boolean isDivergingEpoch(FetchResponseData.PartitionData partitionResponse) {
        return partitionResponse.divergingEpoch().epoch() >= 0;
    }

    public static Optional<Integer> preferredReadReplica(FetchResponseData.PartitionData partitionResponse) {
        return partitionResponse.preferredReadReplica() == INVALID_PREFERRED_REPLICA_ID ? Optional.empty()
                : Optional.of(partitionResponse.preferredReadReplica());
    }

    public static boolean isPreferredReplica(FetchResponseData.PartitionData partitionResponse) {
        return partitionResponse.preferredReadReplica() != INVALID_PREFERRED_REPLICA_ID;
    }

    public static FetchResponseData.PartitionData partitionResponse(TopicIdPartition topicIdPartition, Errors error) {
        return partitionResponse(topicIdPartition.topicPartition().partition(), error);
    }

    public static FetchResponseData.PartitionData partitionResponse(int partition, Errors error) {
        return new FetchResponseData.PartitionData()
            .setPartitionIndex(partition)
            .setErrorCode(error.code())
            .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK);
    }

    /**
     * Returns `partition.records` as `Records` (instead of `BaseRecords`). If `records` is `null`, returns `MemoryRecords.EMPTY`.
     *
     * If this response was deserialized after a fetch, this method should never fail. An example where this would
     * fail is a down-converted response (e.g. LazyDownConversionRecords) on the broker (before it's serialized and
     * sent on the wire).
     *
     * @param partition partition data
     * @return Records or empty record if the records in PartitionData is null.
     */
    public static Records recordsOrFail(FetchResponseData.PartitionData partition) {
        if (partition.records() == null) return MemoryRecords.EMPTY;
        if (partition.records() instanceof Records) return (Records) partition.records();
        throw new ClassCastException("The record type is " + partition.records().getClass().getSimpleName() + ", which is not a subtype of " +
            Records.class.getSimpleName() + ". This method is only safe to call if the `FetchResponse` was deserialized from bytes.");
    }

    /**
     * @return The size in bytes of the records. 0 is returned if records of input partition is null.
     */
    public static int recordsSize(FetchResponseData.PartitionData partition) {
        return partition.records() == null ? 0 : partition.records().sizeInBytes();
    }

    // TODO: remove as a part of KAFKA-12410
    public static FetchResponse of(Errors error,
                                   int throttleTimeMs,
                                   int sessionId,
                                   LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> responseData) {
        return new FetchResponse(toMessage(error, throttleTimeMs, sessionId, responseData.entrySet().iterator()));
    }

    private static boolean matchingTopic(FetchResponseData.FetchableTopicResponse previousTopic, TopicIdPartition currentTopic) {
        if (previousTopic == null)
            return false;
        if (!previousTopic.topicId().equals(Uuid.ZERO_UUID))
            return previousTopic.topicId().equals(currentTopic.topicId());
        else
            return previousTopic.topic().equals(currentTopic.topicPartition().topic());

    }

    private static FetchResponseData toMessage(Errors error,
                                               int throttleTimeMs,
                                               int sessionId,
                                               Iterator<Map.Entry<TopicIdPartition, FetchResponseData.PartitionData>> partIterator) {
        List<FetchResponseData.FetchableTopicResponse> topicResponseList = new ArrayList<>();
        while (partIterator.hasNext()) {
            Map.Entry<TopicIdPartition, FetchResponseData.PartitionData> entry = partIterator.next();
            FetchResponseData.PartitionData partitionData = entry.getValue();
            // Since PartitionData alone doesn't know the partition ID, we set it here
            partitionData.setPartitionIndex(entry.getKey().topicPartition().partition());
            // We have to keep the order of input topic-partition. Hence, we batch the partitions only if the last
            // batch is in the same topic group.
            FetchResponseData.FetchableTopicResponse previousTopic = topicResponseList.isEmpty() ? null
                : topicResponseList.get(topicResponseList.size() - 1);
            if (matchingTopic(previousTopic, entry.getKey()))
                previousTopic.partitions().add(partitionData);
            else {
                List<FetchResponseData.PartitionData> partitionResponses = new ArrayList<>();
                partitionResponses.add(partitionData);
                topicResponseList.add(new FetchResponseData.FetchableTopicResponse()
                    .setTopic(entry.getKey().topicPartition().topic())
                    .setTopicId(entry.getKey().topicId())
                    .setPartitions(partitionResponses));
            }
        }

        return new FetchResponseData()
            .setThrottleTimeMs(throttleTimeMs)
            .setErrorCode(error.code())
            .setSessionId(sessionId)
            .setResponses(topicResponseList);
    }
}

相关信息

kafka 源码目录

相关文章

kafka AbstractControlRequest 源码

kafka AbstractRequest 源码

kafka AbstractRequestResponse 源码

kafka AbstractResponse 源码

kafka AddOffsetsToTxnRequest 源码

kafka AddOffsetsToTxnResponse 源码

kafka AddPartitionsToTxnRequest 源码

kafka AddPartitionsToTxnResponse 源码

kafka AllocateProducerIdsRequest 源码

kafka AllocateProducerIdsResponse 源码

0  赞