kafka AbstractRecords 源码
kafka AbstractRecords 代码
文件路径:/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.record;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.Utils;
import java.nio.ByteBuffer;
import java.util.Iterator;
public abstract class AbstractRecords implements Records {
private final Iterable<Record> records = this::recordsIterator;
@Override
public boolean hasMatchingMagic(byte magic) {
for (RecordBatch batch : batches())
if (batch.magic() != magic)
return false;
return true;
}
public RecordBatch firstBatch() {
Iterator<? extends RecordBatch> iterator = batches().iterator();
if (!iterator.hasNext())
return null;
return iterator.next();
}
/**
* Get an iterator over the deep records.
* @return An iterator over the records
*/
@Override
public Iterable<Record> records() {
return records;
}
@Override
public DefaultRecordsSend<Records> toSend() {
return new DefaultRecordsSend<>(this);
}
private Iterator<Record> recordsIterator() {
return new AbstractIterator<Record>() {
private final Iterator<? extends RecordBatch> batches = batches().iterator();
private Iterator<Record> records;
@Override
protected Record makeNext() {
if (records != null && records.hasNext())
return records.next();
if (batches.hasNext()) {
records = batches.next().iterator();
return makeNext();
}
return allDone();
}
};
}
public static int estimateSizeInBytes(byte magic,
long baseOffset,
CompressionType compressionType,
Iterable<Record> records) {
int size = 0;
if (magic <= RecordBatch.MAGIC_VALUE_V1) {
for (Record record : records)
size += Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, record.key(), record.value());
} else {
size = DefaultRecordBatch.sizeInBytes(baseOffset, records);
}
return estimateCompressedSizeInBytes(size, compressionType);
}
public static int estimateSizeInBytes(byte magic,
CompressionType compressionType,
Iterable<SimpleRecord> records) {
int size = 0;
if (magic <= RecordBatch.MAGIC_VALUE_V1) {
for (SimpleRecord record : records)
size += Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, record.key(), record.value());
} else {
size = DefaultRecordBatch.sizeInBytes(records);
}
return estimateCompressedSizeInBytes(size, compressionType);
}
private static int estimateCompressedSizeInBytes(int size, CompressionType compressionType) {
return compressionType == CompressionType.NONE ? size : Math.min(Math.max(size / 2, 1024), 1 << 16);
}
/**
* Get an upper bound estimate on the batch size needed to hold a record with the given fields. This is only
* an estimate because it does not take into account overhead from the compression algorithm.
*/
public static int estimateSizeInBytesUpperBound(byte magic, CompressionType compressionType, byte[] key, byte[] value, Header[] headers) {
return estimateSizeInBytesUpperBound(magic, compressionType, Utils.wrapNullable(key), Utils.wrapNullable(value), headers);
}
/**
* Get an upper bound estimate on the batch size needed to hold a record with the given fields. This is only
* an estimate because it does not take into account overhead from the compression algorithm.
*/
public static int estimateSizeInBytesUpperBound(byte magic, CompressionType compressionType, ByteBuffer key,
ByteBuffer value, Header[] headers) {
if (magic >= RecordBatch.MAGIC_VALUE_V2)
return DefaultRecordBatch.estimateBatchSizeUpperBound(key, value, headers);
else if (compressionType != CompressionType.NONE)
return Records.LOG_OVERHEAD + LegacyRecord.recordOverhead(magic) + LegacyRecord.recordSize(magic, key, value);
else
return Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value);
}
/**
* Return the size of the record batch header.
*
* For V0 and V1 with no compression, it's unclear if Records.LOG_OVERHEAD or 0 should be chosen. There is no header
* per batch, but a sequence of batches is preceded by the offset and size. This method returns `0` as it's what
* `MemoryRecordsBuilder` requires.
*/
public static int recordBatchHeaderSizeInBytes(byte magic, CompressionType compressionType) {
if (magic > RecordBatch.MAGIC_VALUE_V1) {
return DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
} else if (compressionType != CompressionType.NONE) {
return Records.LOG_OVERHEAD + LegacyRecord.recordOverhead(magic);
} else {
return 0;
}
}
}
相关信息
相关文章
kafka AbstractLegacyRecordBatch 源码
kafka ByteBufferLogInputStream 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦