kafka StickyAssignor 源码
kafka StickyAssignor 代码
文件路径:/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.protocol.types.Type;
import org.apache.kafka.common.utils.CollectionUtils;
/**
* <p>The sticky assignor serves two purposes. First, it guarantees an assignment that is as balanced as possible, meaning either:
* <ul>
* <li>the numbers of topic partitions assigned to consumers differ by at most one; or</li>
* <li>each consumer that has 2+ fewer topic partitions than some other consumer cannot get any of those topic partitions transferred to it.</li>
* </ul>
* Second, it preserved as many existing assignment as possible when a reassignment occurs. This helps in saving some of the
* overhead processing when topic partitions move from one consumer to another.</p>
*
* <p>Starting fresh it would work by distributing the partitions over consumers as evenly as possible. Even though this may sound similar to
* how round robin assignor works, the second example below shows that it is not.
* During a reassignment it would perform the reassignment in such a way that in the new assignment
* <ol>
* <li>topic partitions are still distributed as evenly as possible, and</li>
* <li>topic partitions stay with their previously assigned consumers as much as possible.</li>
* </ol>
* Of course, the first goal above takes precedence over the second one.</p>
*
* <p><b>Example 1.</b> Suppose there are three consumers <code>C0</code>, <code>C1</code>, <code>C2</code>,
* four topics <code>t0,</code> <code>t1</code>, <code>t2</code>, <code>t3</code>, and each topic has 2 partitions,
* resulting in partitions <code>t0p0</code>, <code>t0p1</code>, <code>t1p0</code>, <code>t1p1</code>, <code>t2p0</code>,
* <code>t2p1</code>, <code>t3p0</code>, <code>t3p1</code>. Each consumer is subscribed to all three topics.
*
* The assignment with both sticky and round robin assignors will be:
* <ul>
* <li><code>C0: [t0p0, t1p1, t3p0]</code></li>
* <li><code>C1: [t0p1, t2p0, t3p1]</code></li>
* <li><code>C2: [t1p0, t2p1]</code></li>
* </ul>
*
* Now, let's assume <code>C1</code> is removed and a reassignment is about to happen. The round robin assignor would produce:
* <ul>
* <li><code>C0: [t0p0, t1p0, t2p0, t3p0]</code></li>
* <li><code>C2: [t0p1, t1p1, t2p1, t3p1]</code></li>
* </ul>
*
* while the sticky assignor would result in:
* <ul>
* <li><code>C0 [t0p0, t1p1, t3p0, t2p0]</code></li>
* <li><code>C2 [t1p0, t2p1, t0p1, t3p1]</code></li>
* </ul>
* preserving all the previous assignments (unlike the round robin assignor).
*</p>
* <p><b>Example 2.</b> There are three consumers <code>C0</code>, <code>C1</code>, <code>C2</code>,
* and three topics <code>t0</code>, <code>t1</code>, <code>t2</code>, with 1, 2, and 3 partitions respectively.
* Therefore, the partitions are <code>t0p0</code>, <code>t1p0</code>, <code>t1p1</code>, <code>t2p0</code>,
* <code>t2p1</code>, <code>t2p2</code>. <code>C0</code> is subscribed to <code>t0</code>; <code>C1</code> is subscribed to
* <code>t0</code>, <code>t1</code>; and <code>C2</code> is subscribed to <code>t0</code>, <code>t1</code>, <code>t2</code>.
*
* The round robin assignor would come up with the following assignment:
* <ul>
* <li><code>C0 [t0p0]</code></li>
* <li><code>C1 [t1p0]</code></li>
* <li><code>C2 [t1p1, t2p0, t2p1, t2p2]</code></li>
* </ul>
*
* which is not as balanced as the assignment suggested by sticky assignor:
* <ul>
* <li><code>C0 [t0p0]</code></li>
* <li><code>C1 [t1p0, t1p1]</code></li>
* <li><code>C2 [t2p0, t2p1, t2p2]</code></li>
* </ul>
*
* Now, if consumer <code>C0</code> is removed, these two assignors would produce the following assignments.
* Round Robin (preserves 3 partition assignments):
* <ul>
* <li><code>C1 [t0p0, t1p1]</code></li>
* <li><code>C2 [t1p0, t2p0, t2p1, t2p2]</code></li>
* </ul>
*
* Sticky (preserves 5 partition assignments):
* <ul>
* <li><code>C1 [t1p0, t1p1, t0p0]</code></li>
* <li><code>C2 [t2p0, t2p1, t2p2]</code></li>
* </ul>
*</p>
* <h3>Impact on <code>ConsumerRebalanceListener</code></h3>
* The sticky assignment strategy can provide some optimization to those consumers that have some partition cleanup code
* in their <code>onPartitionsRevoked()</code> callback listeners. The cleanup code is placed in that callback listener
* because the consumer has no assumption or hope of preserving any of its assigned partitions after a rebalance when it
* is using range or round robin assignor. The listener code would look like this:
* <pre>
* {@code
* class TheOldRebalanceListener implements ConsumerRebalanceListener {
*
* void onPartitionsRevoked(Collection<TopicPartition> partitions) {
* for (TopicPartition partition: partitions) {
* commitOffsets(partition);
* cleanupState(partition);
* }
* }
*
* void onPartitionsAssigned(Collection<TopicPartition> partitions) {
* for (TopicPartition partition: partitions) {
* initializeState(partition);
* initializeOffset(partition);
* }
* }
* }
* }
* </pre>
*
* As mentioned above, one advantage of the sticky assignor is that, in general, it reduces the number of partitions that
* actually move from one consumer to another during a reassignment. Therefore, it allows consumers to do their cleanup
* more efficiently. Of course, they still can perform the partition cleanup in the <code>onPartitionsRevoked()</code>
* listener, but they can be more efficient and make a note of their partitions before and after the rebalance, and do the
* cleanup after the rebalance only on the partitions they have lost (which is normally not a lot). The code snippet below
* clarifies this point:
* <pre>
* {@code
* class TheNewRebalanceListener implements ConsumerRebalanceListener {
* Collection<TopicPartition> lastAssignment = Collections.emptyList();
*
* void onPartitionsRevoked(Collection<TopicPartition> partitions) {
* for (TopicPartition partition: partitions)
* commitOffsets(partition);
* }
*
* void onPartitionsAssigned(Collection<TopicPartition> assignment) {
* for (TopicPartition partition: difference(lastAssignment, assignment))
* cleanupState(partition);
*
* for (TopicPartition partition: difference(assignment, lastAssignment))
* initializeState(partition);
*
* for (TopicPartition partition: assignment)
* initializeOffset(partition);
*
* this.lastAssignment = assignment;
* }
* }
* }
* </pre>
*
* Any consumer that uses sticky assignment can leverage this listener like this:
* <code>consumer.subscribe(topics, new TheNewRebalanceListener());</code>
*
* Note that you can leverage the {@link CooperativeStickyAssignor} so that only partitions which are being
* reassigned to another consumer will be revoked. That is the preferred assignor for newer cluster. See
* {@link ConsumerPartitionAssignor.RebalanceProtocol} for a detailed explanation of cooperative rebalancing.
*/
public class StickyAssignor extends AbstractStickyAssignor {
public static final String STICKY_ASSIGNOR_NAME = "sticky";
// these schemas are used for preserving consumer's previously assigned partitions
// list and sending it as user data to the leader during a rebalance
static final String TOPIC_PARTITIONS_KEY_NAME = "previous_assignment";
static final String TOPIC_KEY_NAME = "topic";
static final String PARTITIONS_KEY_NAME = "partitions";
private static final String GENERATION_KEY_NAME = "generation";
static final Schema TOPIC_ASSIGNMENT = new Schema(
new Field(TOPIC_KEY_NAME, Type.STRING),
new Field(PARTITIONS_KEY_NAME, new ArrayOf(Type.INT32)));
static final Schema STICKY_ASSIGNOR_USER_DATA_V0 = new Schema(
new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT)));
private static final Schema STICKY_ASSIGNOR_USER_DATA_V1 = new Schema(
new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT)),
new Field(GENERATION_KEY_NAME, Type.INT32));
private List<TopicPartition> memberAssignment = null;
private int generation = DEFAULT_GENERATION; // consumer group generation
@Override
public String name() {
return STICKY_ASSIGNOR_NAME;
}
@Override
public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
memberAssignment = assignment.partitions();
this.generation = metadata.generationId();
}
@Override
public ByteBuffer subscriptionUserData(Set<String> topics) {
if (memberAssignment == null)
return null;
return serializeTopicPartitionAssignment(new MemberData(memberAssignment, Optional.of(generation)));
}
@Override
protected MemberData memberData(Subscription subscription) {
ByteBuffer userData = subscription.userData();
if (userData == null || !userData.hasRemaining()) {
return new MemberData(Collections.emptyList(), Optional.empty());
}
return deserializeTopicPartitionAssignment(userData);
}
// visible for testing
static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData) {
Struct struct = new Struct(STICKY_ASSIGNOR_USER_DATA_V1);
List<Struct> topicAssignments = new ArrayList<>();
for (Map.Entry<String, List<Integer>> topicEntry : CollectionUtils.groupPartitionsByTopic(memberData.partitions).entrySet()) {
Struct topicAssignment = new Struct(TOPIC_ASSIGNMENT);
topicAssignment.set(TOPIC_KEY_NAME, topicEntry.getKey());
topicAssignment.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray());
topicAssignments.add(topicAssignment);
}
struct.set(TOPIC_PARTITIONS_KEY_NAME, topicAssignments.toArray());
if (memberData.generation.isPresent())
struct.set(GENERATION_KEY_NAME, memberData.generation.get());
ByteBuffer buffer = ByteBuffer.allocate(STICKY_ASSIGNOR_USER_DATA_V1.sizeOf(struct));
STICKY_ASSIGNOR_USER_DATA_V1.write(buffer, struct);
buffer.flip();
return buffer;
}
private static MemberData deserializeTopicPartitionAssignment(ByteBuffer buffer) {
Struct struct;
ByteBuffer copy = buffer.duplicate();
try {
struct = STICKY_ASSIGNOR_USER_DATA_V1.read(buffer);
} catch (Exception e1) {
try {
// fall back to older schema
struct = STICKY_ASSIGNOR_USER_DATA_V0.read(copy);
} catch (Exception e2) {
// ignore the consumer's previous assignment if it cannot be parsed
return new MemberData(Collections.emptyList(), Optional.of(DEFAULT_GENERATION));
}
}
List<TopicPartition> partitions = new ArrayList<>();
for (Object structObj : struct.getArray(TOPIC_PARTITIONS_KEY_NAME)) {
Struct assignment = (Struct) structObj;
String topic = assignment.getString(TOPIC_KEY_NAME);
for (Object partitionObj : assignment.getArray(PARTITIONS_KEY_NAME)) {
Integer partition = (Integer) partitionObj;
partitions.add(new TopicPartition(topic, partition));
}
}
// make sure this is backward compatible
Optional<Integer> generation = struct.hasField(GENERATION_KEY_NAME) ? Optional.of(struct.getInt(GENERATION_KEY_NAME)) : Optional.empty();
return new MemberData(partitions, generation);
}
}
相关信息
相关文章
kafka CommitFailedException 源码
kafka ConsumerGroupMetadata 源码
kafka ConsumerPartitionAssignor 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦