kafka RecordsUtil source code

  • 2022-10-20

kafka RecordsUtil code

File path: /clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.common.record;

import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class RecordsUtil {
    /**
     * Down convert batches to the provided message format version. The first offset parameter is only relevant in the
     * conversion from uncompressed v2 or higher to v1 or lower. The reason is that uncompressed records in v0 and v1
     * are not batched (put another way, each batch always has 1 record).
     *
     * If a client requests records in v1 format starting from the middle of an uncompressed batch in v2 format, we
     * need to drop records from the batch during the conversion. Some versions of librdkafka rely on this for
     * correctness.
     *
     * The temporaryMemoryBytes computation assumes that the batches are not loaded into the heap
     * (via classes like FileChannelRecordBatch) before this method is called. This is the case in the broker (we
     * only load records into the heap when down converting), but it's not for the producer. However, down converting
     * in the producer is very uncommon and the extra complexity to handle that case is not worth it.
     */
    protected static ConvertedRecords<MemoryRecords> downConvert(Iterable<? extends RecordBatch> batches, byte toMagic,
                                                                 long firstOffset, Time time) {
        // maintain the batch along with the decompressed records to avoid the need to decompress again
        List<RecordBatchAndRecords> recordBatchAndRecordsList = new ArrayList<>();
        int totalSizeEstimate = 0;
        long startNanos = time.nanoseconds();

        for (RecordBatch batch : batches) {
            if (toMagic < RecordBatch.MAGIC_VALUE_V2) {
                if (batch.isControlBatch())
                    continue;

                if (batch.compressionType() == CompressionType.ZSTD)
                    throw new UnsupportedCompressionTypeException("Down-conversion of zstandard-compressed batches " +
                        "is not supported");
            }

            if (batch.magic() <= toMagic) {
                totalSizeEstimate += batch.sizeInBytes();
                recordBatchAndRecordsList.add(new RecordBatchAndRecords(batch, null, null));
            } else {
                List<Record> records = new ArrayList<>();
                for (Record record : batch) {
                    // See the method javadoc for an explanation
                    if (toMagic > RecordBatch.MAGIC_VALUE_V1 || batch.isCompressed() || record.offset() >= firstOffset)
                        records.add(record);
                }
                if (records.isEmpty())
                    continue;
                final long baseOffset;
                if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 && toMagic >= RecordBatch.MAGIC_VALUE_V2)
                    baseOffset = batch.baseOffset();
                else
                    baseOffset = records.get(0).offset();
                totalSizeEstimate += AbstractRecords.estimateSizeInBytes(toMagic, baseOffset, batch.compressionType(), records);
                recordBatchAndRecordsList.add(new RecordBatchAndRecords(batch, records, baseOffset));
            }
        }

        ByteBuffer buffer = ByteBuffer.allocate(totalSizeEstimate);
        long temporaryMemoryBytes = 0;
        int numRecordsConverted = 0;

        for (RecordBatchAndRecords recordBatchAndRecords : recordBatchAndRecordsList) {
            temporaryMemoryBytes += recordBatchAndRecords.batch.sizeInBytes();
            if (recordBatchAndRecords.batch.magic() <= toMagic) {
                buffer = Utils.ensureCapacity(buffer, buffer.position() + recordBatchAndRecords.batch.sizeInBytes());
                recordBatchAndRecords.batch.writeTo(buffer);
            } else {
                MemoryRecordsBuilder builder = convertRecordBatch(toMagic, buffer, recordBatchAndRecords);
                buffer = builder.buffer();
                temporaryMemoryBytes += builder.uncompressedBytesWritten();
                numRecordsConverted += builder.numRecords();
            }
        }

        buffer.flip();
        RecordConversionStats stats = new RecordConversionStats(temporaryMemoryBytes, numRecordsConverted,
                time.nanoseconds() - startNanos);
        return new ConvertedRecords<>(MemoryRecords.readableRecords(buffer), stats);
    }

    /**
     * Return a buffer containing the converted record batches. The returned buffer may not be the same as the received
     * one (e.g. it may require expansion).
     */
    private static MemoryRecordsBuilder convertRecordBatch(byte magic, ByteBuffer buffer, RecordBatchAndRecords recordBatchAndRecords) {
        RecordBatch batch = recordBatchAndRecords.batch;
        final TimestampType timestampType = batch.timestampType();
        long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ? batch.maxTimestamp() : RecordBatch.NO_TIMESTAMP;

        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, batch.compressionType(),
                timestampType, recordBatchAndRecords.baseOffset, logAppendTime);
        for (Record record : recordBatchAndRecords.records) {
            // Down-convert this record. Ignore headers when down-converting to V0 and V1 since they are not supported
            if (magic > RecordBatch.MAGIC_VALUE_V1)
                builder.append(record);
            else
                builder.appendWithOffset(record.offset(), record.timestamp(), record.key(), record.value());
        }

        builder.close();
        return builder;
    }


    private static class RecordBatchAndRecords {
        private final RecordBatch batch;
        private final List<Record> records;
        private final Long baseOffset;

        private RecordBatchAndRecords(RecordBatch batch, List<Record> records, Long baseOffset) {
            this.batch = batch;
            this.records = records;
            this.baseOffset = baseOffset;
        }
    }

}
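
The listing above is the complete class. As a quick orientation, the sketch below is not part of the Kafka source: the class name and the sample keys/values are illustrative assumptions, and it is placed in the org.apache.kafka.common.record package only because downConvert is a protected static method. Assuming the standard MemoryRecords.withRecords and SimpleRecord helpers from the same module, it builds an uncompressed v2 batch and down-converts it to v1 starting from offset 1, so the record at offset 0 is dropped during conversion, as described in the downConvert javadoc.

package org.apache.kafka.common.record;

import org.apache.kafka.common.utils.Time;

// Illustrative sketch only; RecordsUtilDownConvertExample is not a real Kafka class.
public class RecordsUtilDownConvertExample {
    public static void main(String[] args) {
        // Build an uncompressed v2 MemoryRecords with three records at offsets 0, 1 and 2.
        MemoryRecords v2Records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2,
                CompressionType.NONE,
                new SimpleRecord(10L, "k0".getBytes(), "v0".getBytes()),
                new SimpleRecord(11L, "k1".getBytes(), "v1".getBytes()),
                new SimpleRecord(12L, "k2".getBytes(), "v2".getBytes()));

        // Down-convert to v1 starting from offset 1. Because the source batch is uncompressed,
        // the record at offset 0 is dropped during the conversion (see the downConvert javadoc).
        ConvertedRecords<MemoryRecords> converted =
                RecordsUtil.downConvert(v2Records.batches(), RecordBatch.MAGIC_VALUE_V1, 1L, Time.SYSTEM);

        // Print the resulting v1 batches; only offsets 1 and 2 should remain.
        for (RecordBatch batch : converted.records().batches())
            System.out.println("magic=" + batch.magic() + ", baseOffset=" + batch.baseOffset()
                    + ", lastOffset=" + batch.lastOffset());
    }
}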

Related information

kafka source code directory

Related articles

kafka AbstractLegacyRecordBatch source code

kafka AbstractRecordBatch source code

kafka AbstractRecords source code

kafka BaseRecords source code

kafka ByteBufferLogInputStream source code

kafka CompressionRatioEstimator source code

kafka CompressionType source code

kafka ControlRecordType source code

kafka ControlRecordUtils source code

kafka ConvertedRecords source code
