kafka AbstractLegacyRecordBatch 源码

  • 2022-10-20
  • 浏览 (559)

kafka AbstractLegacyRecordBatch 代码


 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *    http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * See the License for the specific language governing permissions and
 * limitations under the License.
package org.apache.kafka.common.record;

import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.Utils;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.OptionalLong;

import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
import static org.apache.kafka.common.record.Records.OFFSET_OFFSET;

 * This {@link RecordBatch} implementation is for magic versions 0 and 1. In addition to implementing
 * {@link RecordBatch}, it also implements {@link Record}, which exposes the duality of the old message
 * format in its handling of compressed messages. The wrapper record is considered the record batch in this
 * interface, while the inner records are considered the log records (though they both share the same schema).
 * In general, this class should not be used directly. Instances of {@link Records} provides access to this
 * class indirectly through the {@link RecordBatch} interface.
public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch implements Record {

    public abstract LegacyRecord outerRecord();

    public long lastOffset() {
        return offset();

    public boolean isValid() {
        return outerRecord().isValid();

    public void ensureValid() {

    public int keySize() {
        return outerRecord().keySize();

    public boolean hasKey() {
        return outerRecord().hasKey();

    public ByteBuffer key() {
        return outerRecord().key();

    public int valueSize() {
        return outerRecord().valueSize();

    public boolean hasValue() {
        return !outerRecord().hasNullValue();

    public ByteBuffer value() {
        return outerRecord().value();

    public Header[] headers() {
        return Record.EMPTY_HEADERS;

    public boolean hasMagic(byte magic) {
        return magic == outerRecord().magic();

    public boolean hasTimestampType(TimestampType timestampType) {
        return outerRecord().timestampType() == timestampType;

    public long checksum() {
        return outerRecord().checksum();

    public long maxTimestamp() {
        return timestamp();

    public long timestamp() {
        return outerRecord().timestamp();

    public TimestampType timestampType() {
        return outerRecord().timestampType();

    public long baseOffset() {
        return iterator().next().offset();

    public byte magic() {
        return outerRecord().magic();

    public CompressionType compressionType() {
        return outerRecord().compressionType();

    public int sizeInBytes() {
        return outerRecord().sizeInBytes() + LOG_OVERHEAD;

    public Integer countOrNull() {
        return null;

    public String toString() {
        return "LegacyRecordBatch(offset=" + offset() + ", " + outerRecord() + ")";

    public void writeTo(ByteBuffer buffer) {
        writeHeader(buffer, offset(), outerRecord().sizeInBytes());

    public long producerId() {
        return RecordBatch.NO_PRODUCER_ID;

    public short producerEpoch() {
        return RecordBatch.NO_PRODUCER_EPOCH;

    public boolean hasProducerId() {
        return false;

    public int sequence() {
        return RecordBatch.NO_SEQUENCE;

    public int baseSequence() {
        return RecordBatch.NO_SEQUENCE;

    public int lastSequence() {
        return RecordBatch.NO_SEQUENCE;

    public boolean isTransactional() {
        return false;

    public int partitionLeaderEpoch() {
        return RecordBatch.NO_PARTITION_LEADER_EPOCH;

    public boolean isControlBatch() {
        return false;

    public OptionalLong deleteHorizonMs() {
        return OptionalLong.empty();

     * Get an iterator for the nested entries contained within this batch. Note that
     * if the batch is not compressed, then this method will return an iterator over the
     * shallow record only (i.e. this object).
     * @return An iterator over the records contained within this batch
    public Iterator<Record> iterator() {
        return iterator(BufferSupplier.NO_CACHING);

    CloseableIterator<Record> iterator(BufferSupplier bufferSupplier) {
        if (isCompressed())
            return new DeepRecordsIterator(this, false, Integer.MAX_VALUE, bufferSupplier);

        return new CloseableIterator<Record>() {
            private boolean hasNext = true;

            public void close() {}

            public boolean hasNext() {
                return hasNext;

            public Record next() {
                if (!hasNext)
                    throw new NoSuchElementException();
                hasNext = false;
                return AbstractLegacyRecordBatch.this;

            public void remove() {
                throw new UnsupportedOperationException();

    public CloseableIterator<Record> streamingIterator(BufferSupplier bufferSupplier) {
        // the older message format versions do not support streaming, so we return the normal iterator
        return iterator(bufferSupplier);

    static void writeHeader(ByteBuffer buffer, long offset, int size) {

    static void writeHeader(DataOutputStream out, long offset, int size) throws IOException {

    private static final class DataLogInputStream implements LogInputStream<AbstractLegacyRecordBatch> {
        private final InputStream stream;
        protected final int maxMessageSize;
        private final ByteBuffer offsetAndSizeBuffer;

        DataLogInputStream(InputStream stream, int maxMessageSize) {
            this.stream = stream;
            this.maxMessageSize = maxMessageSize;
            this.offsetAndSizeBuffer = ByteBuffer.allocate(Records.LOG_OVERHEAD);

        public AbstractLegacyRecordBatch nextBatch() throws IOException {
            Utils.readFully(stream, offsetAndSizeBuffer);
            if (offsetAndSizeBuffer.hasRemaining())
                return null;

            long offset = offsetAndSizeBuffer.getLong(Records.OFFSET_OFFSET);
            int size = offsetAndSizeBuffer.getInt(Records.SIZE_OFFSET);
            if (size < LegacyRecord.RECORD_OVERHEAD_V0)
                throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)", LegacyRecord.RECORD_OVERHEAD_V0));
            if (size > maxMessageSize)
                throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxMessageSize));

            ByteBuffer batchBuffer = ByteBuffer.allocate(size);
            Utils.readFully(stream, batchBuffer);
            if (batchBuffer.hasRemaining())
                return null;

            return new BasicLegacyRecordBatch(offset, new LegacyRecord(batchBuffer));

    private static class DeepRecordsIterator extends AbstractIterator<Record> implements CloseableIterator<Record> {
        private final ArrayDeque<AbstractLegacyRecordBatch> innerEntries;
        private final long absoluteBaseOffset;
        private final byte wrapperMagic;

        private DeepRecordsIterator(AbstractLegacyRecordBatch wrapperEntry,
                                    boolean ensureMatchingMagic,
                                    int maxMessageSize,
                                    BufferSupplier bufferSupplier) {
            LegacyRecord wrapperRecord = wrapperEntry.outerRecord();
            this.wrapperMagic = wrapperRecord.magic();
            if (wrapperMagic != RecordBatch.MAGIC_VALUE_V0 && wrapperMagic != RecordBatch.MAGIC_VALUE_V1)
                throw new InvalidRecordException("Invalid wrapper magic found in legacy deep record iterator " + wrapperMagic);

            CompressionType compressionType = wrapperRecord.compressionType();
            if (compressionType == CompressionType.ZSTD)
                throw new InvalidRecordException("Invalid wrapper compressionType found in legacy deep record iterator " + wrapperMagic);
            ByteBuffer wrapperValue = wrapperRecord.value();
            if (wrapperValue == null)
                throw new InvalidRecordException("Found invalid compressed record set with null value (magic = " +
                        wrapperMagic + ")");

            InputStream stream = compressionType.wrapForInput(wrapperValue, wrapperRecord.magic(), bufferSupplier);
            LogInputStream<AbstractLegacyRecordBatch> logStream = new DataLogInputStream(stream, maxMessageSize);

            long lastOffsetFromWrapper = wrapperEntry.lastOffset();
            long timestampFromWrapper = wrapperRecord.timestamp();
            this.innerEntries = new ArrayDeque<>();

            // If relative offset is used, we need to decompress the entire message first to compute
            // the absolute offset. For simplicity and because it's a format that is on its way out, we
            // do the same for message format version 0
            try {
                while (true) {
                    AbstractLegacyRecordBatch innerEntry = logStream.nextBatch();
                    if (innerEntry == null)

                    LegacyRecord record = innerEntry.outerRecord();
                    byte magic = record.magic();

                    if (ensureMatchingMagic && magic != wrapperMagic)
                        throw new InvalidRecordException("Compressed message magic " + magic +
                                " does not match wrapper magic " + wrapperMagic);

                    if (magic == RecordBatch.MAGIC_VALUE_V1) {
                        LegacyRecord recordWithTimestamp = new LegacyRecord(
                        innerEntry = new BasicLegacyRecordBatch(innerEntry.lastOffset(), recordWithTimestamp);


                if (innerEntries.isEmpty())
                    throw new InvalidRecordException("Found invalid compressed record set with no inner records");

                if (wrapperMagic == RecordBatch.MAGIC_VALUE_V1) {
                    if (lastOffsetFromWrapper == 0) {
                        // The outer offset may be 0 if this is produce data from certain versions of librdkafka.
                        this.absoluteBaseOffset = 0;
                    } else {
                        long lastInnerOffset = innerEntries.getLast().offset();
                        if (lastOffsetFromWrapper < lastInnerOffset)
                            throw new InvalidRecordException("Found invalid wrapper offset in compressed v1 message set, " +
                                    "wrapper offset '" + lastOffsetFromWrapper + "' is less than the last inner message " +
                                    "offset '" + lastInnerOffset + "' and it is not zero.");
                        this.absoluteBaseOffset = lastOffsetFromWrapper - lastInnerOffset;
                } else {
                    this.absoluteBaseOffset = -1;
            } catch (IOException e) {
                throw new KafkaException(e);
            } finally {
                Utils.closeQuietly(stream, "records iterator stream");

        protected Record makeNext() {
            if (innerEntries.isEmpty())
                return allDone();

            AbstractLegacyRecordBatch entry = innerEntries.remove();

            // Convert offset to absolute offset if needed.
            if (wrapperMagic == RecordBatch.MAGIC_VALUE_V1) {
                long absoluteOffset = absoluteBaseOffset + entry.offset();
                entry = new BasicLegacyRecordBatch(absoluteOffset, entry.outerRecord());

            if (entry.isCompressed())
                throw new InvalidRecordException("Inner messages must not be compressed");

            return entry;

        public void close() {}

    private static class BasicLegacyRecordBatch extends AbstractLegacyRecordBatch {
        private final LegacyRecord record;
        private final long offset;

        private BasicLegacyRecordBatch(long offset, LegacyRecord record) {
            this.offset = offset;
            this.record = record;

        public long offset() {
            return offset;

        public LegacyRecord outerRecord() {
            return record;

        public boolean equals(Object o) {
            if (this == o)
                return true;
            if (o == null || getClass() != o.getClass())
                return false;

            BasicLegacyRecordBatch that = (BasicLegacyRecordBatch) o;

            return offset == that.offset &&
                Objects.equals(record, that.record);

        public int hashCode() {
            int result = record != null ? record.hashCode() : 0;
            result = 31 * result + Long.hashCode(offset);
            return result;

    static class ByteBufferLegacyRecordBatch extends AbstractLegacyRecordBatch implements MutableRecordBatch {
        private final ByteBuffer buffer;
        private final LegacyRecord record;

        ByteBufferLegacyRecordBatch(ByteBuffer buffer) {
            this.buffer = buffer;
            this.record = new LegacyRecord(buffer.slice());

        public long offset() {
            return buffer.getLong(OFFSET_OFFSET);

        public OptionalLong deleteHorizonMs() {
            return OptionalLong.empty();

        public LegacyRecord outerRecord() {
            return record;

        public void setLastOffset(long offset) {
            buffer.putLong(OFFSET_OFFSET, offset);

        public void setMaxTimestamp(TimestampType timestampType, long timestamp) {
            if (record.magic() == RecordBatch.MAGIC_VALUE_V0)
                throw new UnsupportedOperationException("Cannot set timestamp for a record with magic = 0");

            long currentTimestamp = record.timestamp();
            // We don't need to recompute crc if the timestamp is not updated.
            if (record.timestampType() == timestampType && currentTimestamp == timestamp)

            setTimestampAndUpdateCrc(timestampType, timestamp);

        public void setPartitionLeaderEpoch(int epoch) {
            throw new UnsupportedOperationException("Magic versions prior to 2 do not support partition leader epoch");

        private void setTimestampAndUpdateCrc(TimestampType timestampType, long timestamp) {
            byte attributes = LegacyRecord.computeAttributes(magic(), compressionType(), timestampType);
            buffer.put(LOG_OVERHEAD + LegacyRecord.ATTRIBUTES_OFFSET, attributes);
            buffer.putLong(LOG_OVERHEAD + LegacyRecord.TIMESTAMP_OFFSET, timestamp);
            long crc = record.computeChecksum();
            ByteUtils.writeUnsignedInt(buffer, LOG_OVERHEAD + LegacyRecord.CRC_OFFSET, crc);

         * LegacyRecordBatch does not implement this iterator and would hence fallback to the normal iterator.
         * @return An iterator over the records contained within this batch
        public CloseableIterator<Record> skipKeyValueIterator(BufferSupplier bufferSupplier) {
            return CloseableIterator.wrap(iterator(bufferSupplier));

        public void writeTo(ByteBufferOutputStream outputStream) {

        public boolean equals(Object o) {
            if (this == o)
                return true;
            if (o == null || getClass() != o.getClass())
                return false;

            ByteBufferLegacyRecordBatch that = (ByteBufferLegacyRecordBatch) o;

            return Objects.equals(buffer, that.buffer);

        public int hashCode() {
            return buffer != null ? buffer.hashCode() : 0;

    static class LegacyFileChannelRecordBatch extends FileLogInputStream.FileChannelRecordBatch {

        LegacyFileChannelRecordBatch(long offset,
                                     byte magic,
                                     FileRecords fileRecords,
                                     int position,
                                     int batchSize) {
            super(offset, magic, fileRecords, position, batchSize);

        protected RecordBatch toMemoryRecordBatch(ByteBuffer buffer) {
            return new ByteBufferLegacyRecordBatch(buffer);

        public long baseOffset() {
            return loadFullBatch().baseOffset();

        public OptionalLong deleteHorizonMs() {
            return OptionalLong.empty();

        public long lastOffset() {
            return offset;

        public long producerId() {
            return RecordBatch.NO_PRODUCER_ID;

        public short producerEpoch() {
            return RecordBatch.NO_PRODUCER_EPOCH;

        public int baseSequence() {
            return RecordBatch.NO_SEQUENCE;

        public int lastSequence() {
            return RecordBatch.NO_SEQUENCE;

        public Integer countOrNull() {
            return null;

        public boolean isTransactional() {
            return false;

        public boolean isControlBatch() {
            return false;

        public int partitionLeaderEpoch() {
            return RecordBatch.NO_PARTITION_LEADER_EPOCH;

        protected int headerSize() {
            return LOG_OVERHEAD + LegacyRecord.headerSize(magic);




kafka 源码目录


kafka AbstractRecordBatch 源码

kafka AbstractRecords 源码

kafka BaseRecords 源码

kafka ByteBufferLogInputStream 源码

kafka CompressionRatioEstimator 源码

kafka CompressionType 源码

kafka ControlRecordType 源码

kafka ControlRecordUtils 源码

kafka ConvertedRecords 源码

kafka DefaultRecord 源码

0  赞