Kafka ConsumerRebalanceListener source code
File path: /clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer;
import java.time.Duration;
import java.util.Collection;
import org.apache.kafka.common.TopicPartition;
/**
* A callback interface that the user can implement to trigger custom actions when the set of partitions assigned to the
* consumer changes.
* <p>
* This is applicable when the consumer is having Kafka auto-manage group membership. If the consumer directly assigns partitions,
* those partitions will never be reassigned and this callback is not applicable.
* <p>
* When Kafka is managing the group membership, a partition re-assignment will be triggered any time the members of the group change or the subscription
* of the members changes. This can occur when processes die, new process instances are added or old instances come back to life after failure.
* Partition re-assignments can also be triggered by changes affecting the subscribed topics (e.g. when the number of partitions is
* administratively adjusted).
* <p>
* There are many uses for this functionality. One common use is saving offsets in a custom store. By saving offsets in
* the {@link #onPartitionsRevoked(Collection)} call we can ensure that any time partition assignment changes
* the offset gets saved.
* <p>
* Another use is flushing out any kind of cache of intermediate results the consumer may be keeping. For example,
* consider a case where the consumer is subscribed to a topic containing user page views, and the goal is to count the
* number of page views per user for each five minute window. Let's say the topic is partitioned by the user id so that
* all events for a particular user go to a single consumer instance. The consumer can keep in memory a running
* tally of actions per user and only flush these out to a remote data store when its cache gets too big. However if a
* partition is reassigned it may want to automatically trigger a flush of this cache, before the new owner takes over
* consumption.
* <p>
* This callback will only execute in the user thread as part of the {@link Consumer#poll(java.time.Duration) poll(Duration)} call
* whenever partition assignment changes.
* <p>
* Under normal conditions, if a partition is reassigned from one consumer to another, then the old consumer will
* always invoke {@link #onPartitionsRevoked(Collection) onPartitionsRevoked} for that partition prior to the new consumer
* invoking {@link #onPartitionsAssigned(Collection) onPartitionsAssigned} for the same partition. So if offsets or other state are saved in the
* {@link #onPartitionsRevoked(Collection) onPartitionsRevoked} call by one consumer member, they will always be accessible by the time the
* other consumer member takes over that partition and triggers its {@link #onPartitionsAssigned(Collection) onPartitionsAssigned} callback to load the state.
* <p>
* You can think of revocation as a graceful way to give up ownership of a partition. In some cases, the consumer may not have an opportunity to do so.
* For example, if the session times out, then the partitions may be reassigned before we have a chance to revoke them gracefully.
* For this case, we have a third callback {@link #onPartitionsLost(Collection)}. The difference between this function and
* {@link #onPartitionsRevoked(Collection)} is that upon invocation of {@link #onPartitionsLost(Collection)}, the partitions
* may already be owned by some other members in the group, so users would not, for example, be able to commit their consumed offsets.
* Users can implement these two functions differently (by default,
* {@link #onPartitionsLost(Collection)} calls {@link #onPartitionsRevoked(Collection)} directly); for example, in
* {@link #onPartitionsLost(Collection)} there is no need to store the offsets, since we know these partitions are no longer owned by the consumer
* at that time.
* <p>
* During a rebalance event, the {@link #onPartitionsAssigned(Collection) onPartitionsAssigned} function will always be triggered exactly once when
* the rebalance completes. That is, even if there are no newly assigned partitions for a consumer member, its {@link #onPartitionsAssigned(Collection) onPartitionsAssigned}
* will still be triggered with an empty collection of partitions. As a result, this function can also be used to notify when a rebalance event has happened.
* With eager rebalancing, {@link #onPartitionsRevoked(Collection)} will always be called at the start of a rebalance. On the other hand, {@link #onPartitionsLost(Collection)}
* will only be called when the set of lost partitions is non-empty.
* With cooperative rebalancing, {@link #onPartitionsRevoked(Collection)} and {@link #onPartitionsLost(Collection)}
* will only be triggered when the set of partitions revoked or lost from this consumer member during a rebalance event is non-empty.
* <p>
* It is possible
* for a {@link org.apache.kafka.common.errors.WakeupException} or {@link org.apache.kafka.common.errors.InterruptException}
* to be raised from a nested call to the consumer made inside one of these callbacks. In this case, the exception will be propagated to the current
* invocation of {@link KafkaConsumer#poll(java.time.Duration)} in which this callback is being executed. This means it is not
* necessary to catch these exceptions and re-attempt to wake up or interrupt the consumer thread.
* Also if the callback function implementation itself throws an exception, this exception will be propagated to the current
* invocation of {@link KafkaConsumer#poll(java.time.Duration)} as well.
* <p>
* Note that callbacks only serve as notification of an assignment change.
* They cannot be used to express acceptance of the change.
* Hence throwing an exception from a callback does not affect the assignment in any way,
* as it will be propagated all the way up to the {@link KafkaConsumer#poll(java.time.Duration)} call.
* If the user catches the exception in the caller, the callback is still assumed successful and no further retries will be attempted.
* <p>
*
* Here is pseudo-code for a callback implementation for saving offsets:
* <pre>
* {@code
* public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
*     private final Consumer<?, ?> consumer;
*
*     public SaveOffsetsOnRebalance(Consumer<?, ?> consumer) {
*         this.consumer = consumer;
*     }
*
*     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
*         // save the offsets in an external store using some custom code not described here
*         // (the partition is passed along so that each offset can be looked up again later)
*         for (TopicPartition partition : partitions)
*             saveOffsetInExternalStore(partition, consumer.position(partition));
*     }
*
*     public void onPartitionsLost(Collection<TopicPartition> partitions) {
*         // do not need to save the offsets since these partitions are probably owned by other consumers already
*     }
*
*     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
*         // read the offsets from an external store using some custom code not described here
*         for (TopicPartition partition : partitions)
*             consumer.seek(partition, readOffsetFromExternalStore(partition));
*     }
* }
* }
* </pre>
*/
public interface ConsumerRebalanceListener {
/**
* A callback method the user can implement to provide handling of offset commits to a customized store.
* This method will be called during a rebalance operation when the consumer has to give up some partitions.
* It can also be called when the consumer is being closed ({@link KafkaConsumer#close(Duration)})
* or is unsubscribing ({@link KafkaConsumer#unsubscribe()}).
* It is recommended that offsets be committed in this callback, either to Kafka or to a
* custom offset store, to prevent duplicate data.
* <p>
* In eager rebalancing, it will always be called at the start of a rebalance and after the consumer stops fetching data.
* In cooperative rebalancing, it will be called at the end of a rebalance on the set of partitions being revoked if and only if the set is non-empty.
* For examples of how to use this API, see the Usage Examples section of {@link KafkaConsumer KafkaConsumer}.
* <p>
* It is common for the revocation callback to use the consumer instance in order to commit offsets. It is possible
* for a {@link org.apache.kafka.common.errors.WakeupException} or {@link org.apache.kafka.common.errors.InterruptException}
* to be raised from one of these nested invocations. In this case, the exception will be propagated to the current
* invocation of {@link KafkaConsumer#poll(java.time.Duration)} in which this callback is being executed. This means it is not
* necessary to catch these exceptions and re-attempt to wake up or interrupt the consumer thread.
*
* @param partitions The list of partitions that were assigned to the consumer and now need to be revoked (may not
* include all currently assigned partitions, i.e. there may still be some partitions left)
* @throws org.apache.kafka.common.errors.WakeupException If raised from a nested call to {@link KafkaConsumer}
* @throws org.apache.kafka.common.errors.InterruptException If raised from a nested call to {@link KafkaConsumer}
*/
void onPartitionsRevoked(Collection<TopicPartition> partitions);
/**
* A callback method the user can implement to provide handling of customized offsets on completion of a successful
* partition re-assignment. This method will be called after the partition re-assignment completes and before the
* consumer starts fetching data, and only as the result of a {@link Consumer#poll(java.time.Duration) poll(Duration)} call.
* <p>
* It is guaranteed that under normal conditions all the processes in a consumer group will execute their
* {@link #onPartitionsRevoked(Collection)} callback before any instance executes its
* {@link #onPartitionsAssigned(Collection)} callback. During exceptional scenarios, partitions may be migrated
* without the old owner being notified (i.e. its {@link #onPartitionsRevoked(Collection)} callback is not triggered);
* when the old owner later realizes this, it will trigger the {@link #onPartitionsLost(Collection)} callback.
* <p>
* It is common for the assignment callback to use the consumer instance in order to query offsets. It is possible
* for a {@link org.apache.kafka.common.errors.WakeupException} or {@link org.apache.kafka.common.errors.InterruptException}
* to be raised from one of these nested invocations. In this case, the exception will be propagated to the current
* invocation of {@link KafkaConsumer#poll(java.time.Duration)} in which this callback is being executed. This means it is not
* necessary to catch these exceptions and re-attempt to wake up or interrupt the consumer thread.
*
* @param partitions The list of partitions that are now assigned to the consumer (previously owned partitions will
* NOT be included, i.e. this list will only include newly added partitions)
* @throws org.apache.kafka.common.errors.WakeupException If raised from a nested call to {@link KafkaConsumer}
* @throws org.apache.kafka.common.errors.InterruptException If raised from a nested call to {@link KafkaConsumer}
*/
void onPartitionsAssigned(Collection<TopicPartition> partitions);
/**
* A callback method you can implement to provide handling of cleaning up resources for partitions that have already
* been reassigned to other consumers. This method will not be called during normal execution as the owned partitions would
* first be revoked by calling the {@link ConsumerRebalanceListener#onPartitionsRevoked}, before being reassigned
* to other consumers during a rebalance event. However, during exceptional scenarios in which the consumer realizes that it
* no longer owns a partition that was not revoked via a normal rebalance event, this method is invoked.
* <p>
* For example, this function is called if a consumer's session timeout has expired, or if a fatal error has been
* received indicating the consumer is no longer part of the group.
* <p>
* By default it will just trigger {@link ConsumerRebalanceListener#onPartitionsRevoked}; users who want to distinguish
* the handling logic of revoked partitions vs. lost partitions can override the default implementation.
* <p>
* It is possible
* for a {@link org.apache.kafka.common.errors.WakeupException} or {@link org.apache.kafka.common.errors.InterruptException}
* to be raised from a nested call to the consumer made inside this callback. In this case, the exception will be propagated to the current
* invocation of {@link KafkaConsumer#poll(java.time.Duration)} in which this callback is being executed. This means it is not
* necessary to catch these exceptions and re-attempt to wake up or interrupt the consumer thread.
*
* @param partitions The list of partitions that were assigned to the consumer and now have been reassigned
* to other consumers. With the current protocol this will always include all of the consumer's
* previously assigned partitions, but this may change in future protocols (i.e. there would still
* be some partitions left)
* @throws org.apache.kafka.common.errors.WakeupException If raised from a nested call to {@link KafkaConsumer}
* @throws org.apache.kafka.common.errors.InterruptException If raised from a nested call to {@link KafkaConsumer}
*/
default void onPartitionsLost(Collection<TopicPartition> partitions) {
onPartitionsRevoked(partitions);
}
}
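
The difference between revoked and lost partitions, and the default delegation from onPartitionsLost to onPartitionsRevoked, can be tried out without a broker. The following is only a minimal sketch under stated assumptions: `TopicPartition` and `RebalanceListener` are local stand-ins that mirror the shape of the Kafka types so the code runs without kafka-clients on the classpath, the two listener classes and the in-memory maps are hypothetical, and the callbacks are invoked by hand rather than from `poll`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RebalanceListenerSketch {

    /** Stand-in for org.apache.kafka.common.TopicPartition (assumption, not the real class). */
    record TopicPartition(String topic, int partition) {}

    /** Stand-in with the same three callbacks as ConsumerRebalanceListener, including the default. */
    interface RebalanceListener {
        void onPartitionsRevoked(Collection<TopicPartition> partitions);

        void onPartitionsAssigned(Collection<TopicPartition> partitions);

        default void onPartitionsLost(Collection<TopicPartition> partitions) {
            onPartitionsRevoked(partitions);
        }
    }

    /** Hypothetical listener that saves positions on revocation but only discards them on loss. */
    static class SaveOffsetsListener implements RebalanceListener {
        final Map<TopicPartition, Long> externalStore = new HashMap<>(); // plays the custom offset store
        final Map<TopicPartition, Long> positions = new HashMap<>();     // plays consumer.position(...)
        final List<String> events = new ArrayList<>();

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            for (TopicPartition tp : partitions) {
                Long position = positions.remove(tp);
                if (position != null) {
                    externalStore.put(tp, position); // still the owner, so saving is safe
                }
            }
            events.add("revoked " + partitions.size());
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            for (TopicPartition tp : partitions) {
                positions.put(tp, externalStore.getOrDefault(tp, 0L)); // plays consumer.seek(...)
            }
            events.add("assigned " + partitions.size());
        }

        @Override
        public void onPartitionsLost(Collection<TopicPartition> partitions) {
            positions.keySet().removeAll(partitions); // another member may own these already: do not save
            events.add("lost " + partitions.size());
        }
    }

    /** Listener that keeps the default onPartitionsLost, which delegates to onPartitionsRevoked. */
    static class DefaultLostListener implements RebalanceListener {
        int revokedCalls = 0;

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            revokedCalls++;
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // nothing to restore in this sketch
        }
    }

    /** Drives the callbacks by hand in an order a consumer might invoke them. */
    static SaveOffsetsListener simulate() {
        TopicPartition p0 = new TopicPartition("page-views", 0);
        TopicPartition p1 = new TopicPartition("page-views", 1);
        SaveOffsetsListener listener = new SaveOffsetsListener();

        listener.onPartitionsAssigned(List.of(p0, p1)); // first assignment: both start at 0
        listener.positions.put(p0, 42L);                // pretend records were consumed
        listener.positions.put(p1, 7L);
        listener.onPartitionsRevoked(List.of(p0));      // graceful hand-off: position 42 is saved
        listener.onPartitionsLost(List.of(p1));         // e.g. session timeout: position 7 is dropped
        listener.onPartitionsAssigned(List.of(p0));     // p0 comes back and resumes from the saved 42
        return listener;
    }

    public static void main(String[] args) {
        SaveOffsetsListener listener = simulate();
        System.out.println(listener.events);
        System.out.println(listener.externalStore);

        DefaultLostListener fallback = new DefaultLostListener();
        fallback.onPartitionsLost(List.of(new TopicPartition("page-views", 2)));
        System.out.println("revokedCalls=" + fallback.revokedCalls);
    }
}
```

With a real consumer, a listener like this would be passed to `subscribe` and driven from within `poll`. Overriding onPartitionsLost matters mainly when the revocation path commits or saves offsets, because after a loss the partitions may already belong to another member.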