kafka LazyDownConversionRecords 源码
kafka LazyDownConversionRecords 代码
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.common.record;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.Time;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
* Encapsulation for holding records that require down-conversion in a lazy, chunked manner (KIP-283). See
* {@link LazyDownConversionRecordsSend} for the actual chunked send implementation.
public class LazyDownConversionRecords implements BaseRecords {
private final TopicPartition topicPartition;
private final Records records;
private final byte toMagic;
private final long firstOffset;
private ConvertedRecords firstConvertedBatch;
private final int sizeInBytes;
private final Time time;
* @param topicPartition The topic-partition to which records belong
* @param records Records to lazily down-convert
* @param toMagic Magic version to down-convert to
* @param firstOffset The starting offset for down-converted records. This only impacts some cases. See
* {@link RecordsUtil#downConvert(Iterable, byte, long, Time)} for an explanation.
* @param time The time instance to use
* @throws org.apache.kafka.common.errors.UnsupportedCompressionTypeException If the first batch to down-convert
* has a compression type which we do not support down-conversion for.
public LazyDownConversionRecords(TopicPartition topicPartition, Records records, byte toMagic, long firstOffset, Time time) {
this.topicPartition = Objects.requireNonNull(topicPartition);
this.records = Objects.requireNonNull(records);
this.toMagic = toMagic;
this.firstOffset = firstOffset;
this.time = Objects.requireNonNull(time);
// To make progress, kafka consumers require at least one full record batch per partition, i.e. we need to
// ensure we can accommodate one full batch of down-converted messages. We achieve this by having `sizeInBytes`
// factor in the size of the first down-converted batch and we return at least that many bytes.
java.util.Iterator<ConvertedRecords<?>> it = iterator(0);
if (it.hasNext()) {
firstConvertedBatch = it.next();
sizeInBytes = Math.max(records.sizeInBytes(), firstConvertedBatch.records().sizeInBytes());
} else {
// If there are messages before down-conversion and no messages after down-conversion,
// make sure we are able to send at least an overflow message to the consumer so that it can throw
// a RecordTooLargeException. Typically, the consumer would need to increase the fetch size in such cases.
// If there are no messages before down-conversion, we return an empty record batch.
firstConvertedBatch = null;
sizeInBytes = records.batches().iterator().hasNext() ? LazyDownConversionRecordsSend.MIN_OVERFLOW_MESSAGE_LENGTH : 0;
public int sizeInBytes() {
return sizeInBytes;
public LazyDownConversionRecordsSend toSend() {
return new LazyDownConversionRecordsSend(this);
public TopicPartition topicPartition() {
return topicPartition;
public boolean equals(Object o) {
if (o instanceof LazyDownConversionRecords) {
LazyDownConversionRecords that = (LazyDownConversionRecords) o;
return toMagic == that.toMagic &&
firstOffset == that.firstOffset &&
topicPartition.equals(that.topicPartition) &&
return false;
public int hashCode() {
int result = toMagic;
result = 31 * result + Long.hashCode(firstOffset);
result = 31 * result + topicPartition.hashCode();
result = 31 * result + records.hashCode();
return result;
public String toString() {
return "LazyDownConversionRecords(size=" + sizeInBytes +
", underlying=" + records +
", toMagic=" + toMagic +
", firstOffset=" + firstOffset +
public java.util.Iterator<ConvertedRecords<?>> iterator(long maximumReadSize) {
// We typically expect only one iterator instance to be created, so null out the first converted batch after
// first use to make it available for GC.
ConvertedRecords firstBatch = firstConvertedBatch;
firstConvertedBatch = null;
return new Iterator(records, maximumReadSize, firstBatch);
* Implementation for being able to iterate over down-converted records. Goal of this implementation is to keep
* it as memory-efficient as possible by not having to maintain all down-converted records in-memory. Maintains
* a view into batches of down-converted records.
private class Iterator extends AbstractIterator<ConvertedRecords<?>> {
private final AbstractIterator<? extends RecordBatch> batchIterator;
private final long maximumReadSize;
private ConvertedRecords firstConvertedBatch;
* @param recordsToDownConvert Records that require down-conversion
* @param maximumReadSize Maximum possible size of underlying records that will be down-converted in each call to
* {@link #makeNext()}. This is a soft limit as {@link #makeNext()} will always convert
* and return at least one full message batch.
private Iterator(Records recordsToDownConvert, long maximumReadSize, ConvertedRecords<?> firstConvertedBatch) {
this.batchIterator = recordsToDownConvert.batchIterator();
this.maximumReadSize = maximumReadSize;
this.firstConvertedBatch = firstConvertedBatch;
// If we already have the first down-converted batch, advance the underlying records iterator to next batch
if (firstConvertedBatch != null)
* Make next set of down-converted records
* @return Down-converted records
protected ConvertedRecords makeNext() {
// If we have cached the first down-converted batch, return that now
if (firstConvertedBatch != null) {
ConvertedRecords convertedBatch = firstConvertedBatch;
firstConvertedBatch = null;
return convertedBatch;
while (batchIterator.hasNext()) {
final List<RecordBatch> batches = new ArrayList<>();
boolean isFirstBatch = true;
long sizeSoFar = 0;
// Figure out batches we should down-convert based on the size constraints
while (batchIterator.hasNext() &&
(isFirstBatch || (batchIterator.peek().sizeInBytes() + sizeSoFar) <= maximumReadSize)) {
RecordBatch currentBatch = batchIterator.next();
sizeSoFar += currentBatch.sizeInBytes();
isFirstBatch = false;
ConvertedRecords convertedRecords = RecordsUtil.downConvert(batches, toMagic, firstOffset, time);
// During conversion, it is possible that we drop certain batches because they do not have an equivalent
// representation in the message format we want to convert to. For example, V0 and V1 message formats
// have no notion of transaction markers which were introduced in V2 so they get dropped during conversion.
// We return converted records only when we have at least one valid batch of messages after conversion.
if (convertedRecords.records().sizeInBytes() > 0)
return convertedRecords;
return allDone();
kafka AbstractLegacyRecordBatch 源码
kafka ByteBufferLogInputStream 源码
2、 - 优质文章
3、 gate.io
8、 golang
9、 openharmony
10、 Vue中input框自动聚焦