spring-kafka DestinationTopicPropertiesFactory 源码
spring-kafka DestinationTopicPropertiesFactory 代码
文件路径:/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java
/*
* Copyright 2018-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.kafka.retrytopic;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.springframework.classify.BinaryExceptionClassifier;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.util.StringUtils;
/**
*
* Creates a list of {@link DestinationTopic.Properties} based on the
* provided configurations.
*
* @author Tomaz Fernandes
* @author Gary Russell
* @since 2.7
*
*/
public class DestinationTopicPropertiesFactory {
private static final String MAIN_TOPIC_SUFFIX = "";
private final DestinationTopicSuffixes destinationTopicSuffixes;
private final List<Long> backOffValues;
private final BinaryExceptionClassifier exceptionClassifier;
private final int numPartitions;
private final int maxAttempts;
private final KafkaOperations<?, ?> kafkaOperations;
private final FixedDelayStrategy fixedDelayStrategy;
private final DltStrategy dltStrategy;
private final TopicSuffixingStrategy topicSuffixingStrategy;
private final long timeout;
private Boolean autoStartDltHandler;
public DestinationTopicPropertiesFactory(String retryTopicSuffix, String dltSuffix, List<Long> backOffValues,
BinaryExceptionClassifier exceptionClassifier,
int numPartitions, KafkaOperations<?, ?> kafkaOperations,
FixedDelayStrategy fixedDelayStrategy,
DltStrategy dltStrategy,
TopicSuffixingStrategy topicSuffixingStrategy,
long timeout) {
this.dltStrategy = dltStrategy;
this.kafkaOperations = kafkaOperations;
this.exceptionClassifier = exceptionClassifier;
this.numPartitions = numPartitions;
this.fixedDelayStrategy = fixedDelayStrategy;
this.topicSuffixingStrategy = topicSuffixingStrategy;
this.timeout = timeout;
this.destinationTopicSuffixes = new DestinationTopicSuffixes(retryTopicSuffix, dltSuffix);
this.backOffValues = backOffValues;
// Max Attempts include the initial try.
this.maxAttempts = this.backOffValues.size() + 1;
}
/**
* Set to false to not start the DLT handler.
* @param autoStart false to not start.
* @return this factory.
* @since 2.8
*/
public DestinationTopicPropertiesFactory autoStartDltHandler(Boolean autoStart) {
this.autoStartDltHandler = autoStart;
return this;
}
public List<DestinationTopic.Properties> createProperties() {
return isSingleTopicFixedDelay()
? createPropertiesForFixedDelaySingleTopic()
: createPropertiesForDefaultTopicStrategy();
}
private List<DestinationTopic.Properties> createPropertiesForFixedDelaySingleTopic() {
return isNoDltStrategy()
? Arrays.asList(createMainTopicProperties(),
createRetryProperties(1, DestinationTopic.Type.SINGLE_TOPIC_RETRY, getShouldRetryOn()))
: Arrays.asList(createMainTopicProperties(),
createRetryProperties(1, DestinationTopic.Type.SINGLE_TOPIC_RETRY, getShouldRetryOn()),
createDltProperties());
}
private boolean isSingleTopicFixedDelay() {
return isFixedDelay() && isSingleTopicStrategy();
}
private boolean isSingleTopicStrategy() {
return FixedDelayStrategy.SINGLE_TOPIC.equals(this.fixedDelayStrategy);
}
private List<DestinationTopic.Properties> createPropertiesForDefaultTopicStrategy() {
return IntStream.rangeClosed(0, isNoDltStrategy()
? this.maxAttempts - 1
: this.maxAttempts)
.mapToObj(this::createRetryOrDltTopicSuffixes)
.collect(Collectors.toList());
}
private boolean isNoDltStrategy() {
return DltStrategy.NO_DLT.equals(this.dltStrategy);
}
private DestinationTopic.Properties createRetryOrDltTopicSuffixes(int index) {
BiPredicate<Integer, Throwable> shouldRetryOn = getShouldRetryOn();
return index == 0
? createMainTopicProperties()
: index < this.maxAttempts
? createRetryProperties(index, DestinationTopic.Type.RETRY, shouldRetryOn)
: createDltProperties();
}
private DestinationTopic.Properties createMainTopicProperties() {
return new DestinationTopic.Properties(0, MAIN_TOPIC_SUFFIX, DestinationTopic.Type.MAIN, this.maxAttempts,
this.numPartitions, this.dltStrategy, this.kafkaOperations, getShouldRetryOn(), this.timeout);
}
private DestinationTopic.Properties createDltProperties() {
return new DestinationTopic.Properties(0, this.destinationTopicSuffixes.getDltSuffix(),
DestinationTopic.Type.DLT, this.maxAttempts, this.numPartitions, this.dltStrategy,
this.kafkaOperations, (a, e) -> false, this.timeout, this.autoStartDltHandler);
}
private BiPredicate<Integer, Throwable> getShouldRetryOn() {
return (attempt, throwable) -> attempt < this.maxAttempts && this.exceptionClassifier.classify(throwable);
}
private DestinationTopic.Properties createRetryProperties(int index,
DestinationTopic.Type topicType,
BiPredicate<Integer, Throwable> shouldRetryOn) {
int indexInBackoffValues = index - 1;
Long thisBackOffValue = this.backOffValues.get(indexInBackoffValues);
return createProperties(topicType, shouldRetryOn, indexInBackoffValues,
getTopicSuffix(indexInBackoffValues, thisBackOffValue));
}
private String getTopicSuffix(int indexInBackoffValues, Long thisBackOffValue) {
return isSingleTopicFixedDelay()
? this.destinationTopicSuffixes.getRetrySuffix()
: isSuffixWithIndexStrategy() || isFixedDelay()
? joinWithRetrySuffix(indexInBackoffValues)
: hasDuplicates(thisBackOffValue)
? joinWithRetrySuffix(thisBackOffValue)
.concat("-" + getIndexInBackoffValues(indexInBackoffValues, thisBackOffValue))
: joinWithRetrySuffix(thisBackOffValue);
}
private int getIndexInBackoffValues(int indexInBackoffValues, Long thisBackOffValue) {
return indexInBackoffValues - this.backOffValues.indexOf(thisBackOffValue);
}
private boolean isSuffixWithIndexStrategy() {
return TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE.equals(this.topicSuffixingStrategy);
}
private boolean hasDuplicates(Long thisBackOffValue) {
return this
.backOffValues
.stream()
.filter(value -> value.equals(thisBackOffValue))
.count() > 1;
}
private DestinationTopic.Properties createProperties(DestinationTopic.Type topicType,
BiPredicate<Integer, Throwable> shouldRetryOn,
int indexInBackoffValues,
String suffix) {
return new DestinationTopic.Properties(this.backOffValues.get(indexInBackoffValues), suffix,
topicType, this.maxAttempts, this.numPartitions, this.dltStrategy,
this.kafkaOperations, shouldRetryOn, this.timeout);
}
private boolean isFixedDelay() {
// If all values are the same, such as in NoBackOffPolicy and FixedBackoffPolicy
return this.backOffValues.size() > 1 && this.backOffValues.stream().distinct().count() == 1;
}
private String joinWithRetrySuffix(long parameter) {
return String.join("-", this.destinationTopicSuffixes.getRetrySuffix(), String.valueOf(parameter));
}
public static class DestinationTopicSuffixes {
private final String retryTopicSuffix;
private final String dltSuffix;
public DestinationTopicSuffixes(String retryTopicSuffix, String dltSuffix) {
this.retryTopicSuffix = StringUtils.hasText(retryTopicSuffix)
? retryTopicSuffix
: RetryTopicConstants.DEFAULT_RETRY_SUFFIX;
this.dltSuffix = StringUtils.hasText(dltSuffix) ? dltSuffix : RetryTopicConstants.DEFAULT_DLT_SUFFIX;
}
public String getRetrySuffix() {
return this.retryTopicSuffix;
}
public String getDltSuffix() {
return this.dltSuffix;
}
}
}
相关信息
相关文章
spring-kafka BackOffValuesGenerator 源码
spring-kafka DeadLetterPublishingRecovererFactory 源码
spring-kafka DefaultDestinationTopicProcessor 源码
spring-kafka DefaultDestinationTopicResolver 源码
spring-kafka DestinationTopic 源码
spring-kafka DestinationTopicContainer 源码
spring-kafka DestinationTopicProcessor 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦