kafka ConsumerRecord 源码
kafka ConsumerRecord 代码
文件路径:/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import java.util.Optional;
/**
* A key/value pair to be received from Kafka. This also consists of a topic name and
* a partition number from which the record is being received, an offset that points
* to the record in a Kafka partition, and a timestamp as marked by the corresponding ProducerRecord.
*/
public class ConsumerRecord<K, V> {
public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;
public static final int NULL_SIZE = -1;
/**
* @deprecated checksums are no longer exposed by this class, this constant will be removed in Apache Kafka 4.0
* (deprecated since 3.0).
*/
@Deprecated
public static final int NULL_CHECKSUM = -1;
private final String topic;
private final int partition;
private final long offset;
private final long timestamp;
private final TimestampType timestampType;
private final int serializedKeySize;
private final int serializedValueSize;
private final Headers headers;
private final K key;
private final V value;
private final Optional<Integer> leaderEpoch;
/**
* Creates a record to be received from a specified topic and partition (provided for
* compatibility with Kafka 0.9 before the message format supported timestamps and before
* serialized metadata were exposed).
*
* @param topic The topic this record is received from
* @param partition The partition of the topic this record is received from
* @param offset The offset of this record in the corresponding Kafka partition
* @param key The key of the record, if one exists (null is allowed)
* @param value The record contents
*/
public ConsumerRecord(String topic,
int partition,
long offset,
K key,
V value) {
this(topic, partition, offset, NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, NULL_SIZE, NULL_SIZE, key, value,
new RecordHeaders(), Optional.empty());
}
/**
* Creates a record to be received from a specified topic and partition
*
* @param topic The topic this record is received from
* @param partition The partition of the topic this record is received from
* @param offset The offset of this record in the corresponding Kafka partition
* @param timestamp The timestamp of the record.
* @param timestampType The timestamp type
* @param serializedKeySize The length of the serialized key
* @param serializedValueSize The length of the serialized value
* @param key The key of the record, if one exists (null is allowed)
* @param value The record contents
* @param headers The headers of the record
* @param leaderEpoch Optional leader epoch of the record (may be empty for legacy record formats)
*/
public ConsumerRecord(String topic,
int partition,
long offset,
long timestamp,
TimestampType timestampType,
int serializedKeySize,
int serializedValueSize,
K key,
V value,
Headers headers,
Optional<Integer> leaderEpoch) {
if (topic == null)
throw new IllegalArgumentException("Topic cannot be null");
if (headers == null)
throw new IllegalArgumentException("Headers cannot be null");
this.topic = topic;
this.partition = partition;
this.offset = offset;
this.timestamp = timestamp;
this.timestampType = timestampType;
this.serializedKeySize = serializedKeySize;
this.serializedValueSize = serializedValueSize;
this.key = key;
this.value = value;
this.headers = headers;
this.leaderEpoch = leaderEpoch;
}
/**
* Creates a record to be received from a specified topic and partition (provided for
* compatibility with Kafka 0.10 before the message format supported headers).
*
* @param topic The topic this record is received from
* @param partition The partition of the topic this record is received from
* @param offset The offset of this record in the corresponding Kafka partition
* @param timestamp The timestamp of the record.
* @param timestampType The timestamp type
* @param serializedKeySize The length of the serialized key
* @param serializedValueSize The length of the serialized value
* @param key The key of the record, if one exists (null is allowed)
* @param value The record contents
*
* @deprecated use one of the constructors without a `checksum` parameter. This constructor will be removed in
* Apache Kafka 4.0 (deprecated since 3.0).
*/
@Deprecated
public ConsumerRecord(String topic,
int partition,
long offset,
long timestamp,
TimestampType timestampType,
long checksum,
int serializedKeySize,
int serializedValueSize,
K key,
V value) {
this(topic, partition, offset, timestamp, timestampType, serializedKeySize, serializedValueSize,
key, value, new RecordHeaders(), Optional.empty());
}
/**
* Creates a record to be received from a specified topic and partition
*
* @param topic The topic this record is received from
* @param partition The partition of the topic this record is received from
* @param offset The offset of this record in the corresponding Kafka partition
* @param timestamp The timestamp of the record.
* @param timestampType The timestamp type
* @param serializedKeySize The length of the serialized key
* @param serializedValueSize The length of the serialized value
* @param key The key of the record, if one exists (null is allowed)
* @param value The record contents
* @param headers The headers of the record.
*
* @deprecated use one of the constructors without a `checksum` parameter. This constructor will be removed in
* Apache Kafka 4.0 (deprecated since 3.0).
*/
@Deprecated
public ConsumerRecord(String topic,
int partition,
long offset,
long timestamp,
TimestampType timestampType,
Long checksum,
int serializedKeySize,
int serializedValueSize,
K key,
V value,
Headers headers) {
this(topic, partition, offset, timestamp, timestampType, serializedKeySize, serializedValueSize,
key, value, headers, Optional.empty());
}
/**
* Creates a record to be received from a specified topic and partition
*
* @param topic The topic this record is received from
* @param partition The partition of the topic this record is received from
* @param offset The offset of this record in the corresponding Kafka partition
* @param timestamp The timestamp of the record.
* @param timestampType The timestamp type
* @param serializedKeySize The length of the serialized key
* @param serializedValueSize The length of the serialized value
* @param key The key of the record, if one exists (null is allowed)
* @param value The record contents
* @param headers The headers of the record
* @param leaderEpoch Optional leader epoch of the record (may be empty for legacy record formats)
*
* @deprecated use one of the constructors without a `checksum` parameter. This constructor will be removed in
* Apache Kafka 4.0 (deprecated since 3.0).
*/
@Deprecated
public ConsumerRecord(String topic,
int partition,
long offset,
long timestamp,
TimestampType timestampType,
Long checksum,
int serializedKeySize,
int serializedValueSize,
K key,
V value,
Headers headers,
Optional<Integer> leaderEpoch) {
this(topic, partition, offset, timestamp, timestampType, serializedKeySize, serializedValueSize, key, value, headers,
leaderEpoch);
}
/**
* The topic this record is received from (never null)
*/
public String topic() {
return this.topic;
}
/**
* The partition from which this record is received
*/
public int partition() {
return this.partition;
}
/**
* The headers (never null)
*/
public Headers headers() {
return headers;
}
/**
* The key (or null if no key is specified)
*/
public K key() {
return key;
}
/**
* The value
*/
public V value() {
return value;
}
/**
* The position of this record in the corresponding Kafka partition.
*/
public long offset() {
return offset;
}
/**
* The timestamp of this record
*/
public long timestamp() {
return timestamp;
}
/**
* The timestamp type of this record
*/
public TimestampType timestampType() {
return timestampType;
}
/**
* The size of the serialized, uncompressed key in bytes. If key is null, the returned size
* is -1.
*/
public int serializedKeySize() {
return this.serializedKeySize;
}
/**
* The size of the serialized, uncompressed value in bytes. If value is null, the
* returned size is -1.
*/
public int serializedValueSize() {
return this.serializedValueSize;
}
/**
* Get the leader epoch for the record if available
*
* @return the leader epoch or empty for legacy record formats
*/
public Optional<Integer> leaderEpoch() {
return leaderEpoch;
}
@Override
public String toString() {
return "ConsumerRecord(topic = " + topic
+ ", partition = " + partition
+ ", leaderEpoch = " + leaderEpoch.orElse(null)
+ ", offset = " + offset
+ ", " + timestampType + " = " + timestamp
+ ", serialized key size = " + serializedKeySize
+ ", serialized value size = " + serializedValueSize
+ ", headers = " + headers
+ ", key = " + key
+ ", value = " + value + ")";
}
}
相关信息
相关文章
kafka CommitFailedException 源码
kafka ConsumerGroupMetadata 源码
kafka ConsumerPartitionAssignor 源码
kafka ConsumerRebalanceListener 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦