Options for scan.startup.mode in the Flink Kafka connector
Example of creating a Flink Kafka table:
CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);
scan.startup.mode specifies the position from which the Kafka topic is read. It accepts the following options:
group-offsets: start from the committed offsets in ZK / Kafka brokers of a specific consumer group.
earliest-offset: start from the earliest offset possible.
latest-offset: start from the latest offset.
timestamp: start from a user-supplied timestamp for each partition.
specific-offsets: start from user-supplied specific offsets for each partition.
The default value is group-offsets, which consumes from the last committed offsets in ZK / Kafka brokers.
If timestamp is specified, the additional option scan.startup.timestamp-millis is required. It gives the startup timestamp in milliseconds since January 1, 1970 00:00:00.000 GMT.
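As a minimal sketch of the timestamp mode, the table above could be declared as follows. The table name KafkaTableFromTimestamp and the timestamp value are assumptions chosen for illustration only; 1700000000000 corresponds to 2023-11-14 22:13:20 UTC.

```sql
-- Hypothetical variant of the sample table: start reading from a fixed point in time.
CREATE TABLE KafkaTableFromTimestamp (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'timestamp',
  -- Required with 'timestamp' mode: epoch milliseconds (illustrative value).
  'scan.startup.timestamp-millis' = '1700000000000',
  'format' = 'csv'
);
```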
If specific-offsets is specified, the additional option scan.startup.specific-offsets is required. It gives the startup offset for each partition. For example, the value partition:0,offset:42;partition:1,offset:300 means offset 42 for partition 0 and offset 300 for partition 1.
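A sketch of the specific-offsets mode, reusing the example value from the description above. The table name KafkaTableFromOffsets is hypothetical, and the sketch assumes the topic has partitions 0 and 1.

```sql
-- Hypothetical variant of the sample table: start reading from explicit per-partition offsets.
CREATE TABLE KafkaTableFromOffsets (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'specific-offsets',
  -- Required with 'specific-offsets' mode: offset 42 for partition 0, offset 300 for partition 1.
  'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',
  'format' = 'csv'
);
```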