spring-kafka RetryTopicConfigurer 源码
spring-kafka RetryTopicConfigurer 代码
文件路径:/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java
/*
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.kafka.retrytopic;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.function.Consumer;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
import org.springframework.kafka.support.EndpointHandlerMethod;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.lang.Nullable;
/**
*
* <p>Configures main, retry and DLT topics based on a main endpoint and provided
* configurations to acomplish a distributed retry / DLT pattern in a non-blocking
* fashion, at the expense of ordering guarantees.
*
* <p>To illustrate, if you have a "main-topic" topic, and want an exponential backoff
* of 1000ms with a multiplier of 2 and 3 retry attempts, it will create the
* main-topic-retry-1000, main-topic-retry-2000, main-topic-retry-4000 and main-topic-dlt
* topics. The configuration can be achieved using a {@link RetryTopicConfigurationBuilder}
* to create one or more {@link RetryTopicConfigurer} beans, or by using the
* {@link org.springframework.kafka.annotation.RetryableTopic} annotation.
* More details on usage below.
*
*
* <p>How it works:
*
* <p>If a message processing throws an exception, the configured
* {@link org.springframework.kafka.listener.DefaultErrorHandler}
* and {@link org.springframework.kafka.listener.DeadLetterPublishingRecoverer} forwards the message to the next topic, using a
* {@link org.springframework.kafka.retrytopic.DestinationTopicResolver}
* to know the next topic and the delay for it.
*
* <p>Each forwareded record has a back off timestamp header and, if consumption is
* attempted by the {@link org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter}
* before that time, the partition consumption is paused by a
* {@link org.springframework.kafka.listener.KafkaConsumerBackoffManager} and a
* {@link org.springframework.kafka.listener.KafkaBackoffException} is thrown.
*
* <p>When the partition has been idle for the amount of time specified in the
* ContainerProperties' idlePartitionEventInterval property.
* property, a {@link org.springframework.kafka.event.ListenerContainerPartitionIdleEvent}
* is published, which the {@link org.springframework.kafka.listener.KafkaConsumerBackoffManager}
* listens to in order to check whether or not it should unpause the partition.
*
* <p>If, when consumption is resumed, processing fails again, the message is forwarded to
* the next topic and so on, until it gets to the dlt.
*
* <p>Considering Kafka's partition ordering guarantees, and each topic having a fixed
* delay time, we know that the first message consumed in a given retry topic partition will
* be the one with the earliest backoff timestamp for that partition, so by pausing the
* partition we know we're not delaying message processing in other partitions longer than
* necessary.
*
*
* <p>Usages:
*
* <p>There are two main ways for configuring the endpoints. The first is by providing one or more
* {@link org.springframework.context.annotation.Bean}s in a {@link org.springframework.context.annotation.Configuration}
* annotated class, such as:
*
* <pre>
* <code>@Bean</code>
* <code>public RetryTopicConfiguration myRetryableTopic(KafkaTemplate<String, Object> template) {
* return RetryTopicConfigurationBuilder
* .newInstance()
* .create(template);
* }</code>
* </pre>
* <p>This will create retry and dlt topics for all topics in methods annotated with
* {@link org.springframework.kafka.annotation.KafkaListener}, as well as its consumers,
* using the default configurations. If message processing fails it will forward the message
* to the next topic until it gets to the DLT topic.
*
* A {@link org.springframework.kafka.core.KafkaOperations} instance is required for message forwarding.
*
* <p>For more fine-grained control over how to handle retrials for each topic, more then one bean can be provided, such as:
*
* <pre>
* <code>@Bean
* public RetryTopicConfiguration myRetryableTopic(KafkaTemplate<String, MyPojo> template) {
* return RetryTopicConfigurationBuilder
* .newInstance()
* .fixedBackoff(3000)
* .maxAttempts(5)
* .includeTopics("my-topic", "my-other-topic")
* .create(template);
* }</code>
* </pre>
* <pre>
* <code>@Bean
* public RetryTopicConfiguration myOtherRetryableTopic(KafkaTemplate<String, MyPojo> template) {
* return RetryTopicConfigurationBuilder
* .newInstance()
* .exponentialBackoff(1000, 2, 5000)
* .maxAttempts(4)
* .excludeTopics("my-topic", "my-other-topic")
* .retryOn(MyException.class)
* .create(template);
* }</code>
* </pre>
* <p>Some other options include: auto-creation of topics, backoff,
* retryOn / notRetryOn / transversing as in {@link org.springframework.retry.support.RetryTemplate},
* single-topic fixed backoff processing, custom dlt listener beans, custom topic
* suffixes and providing specific listenerContainerFactories.
*
* <p>The other, non-exclusive way to configure the endpoints is through the convenient
* {@link org.springframework.kafka.annotation.RetryableTopic} annotation, that can be placed on any
* {@link org.springframework.kafka.annotation.KafkaListener} annotated methods, such as:
*
* <pre>
* <code>@RetryableTopic(attempts = 3,
* backoff = @Backoff(delay = 700, maxDelay = 12000, multiplier = 3))</code>
* <code>@KafkaListener(topics = "my-annotated-topic")
* public void processMessage(MyPojo message) {
* // ... message processing
* }</code>
*</pre>
* <p> The same configurations are available in the annotation and the builder approaches, and both can be
* used concurrently. In case the same method / topic can be handled by both, the annotation takes precedence.
*
* <p>DLT Handling:
*
* <p>The DLT handler method can be provided through the
* {@link RetryTopicConfigurationBuilder#dltHandlerMethod(String, String)} method,
* providing the class and method name that should handle the DLT topic. If a bean
* instance of this type is found in the {@link BeanFactory} it is the instance used.
* If not an instance is created. The class can use dependency injection as a normal bean.
*
* <pre>
* <code>@Bean
* public RetryTopicConfiguration otherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
* return RetryTopicConfigurationBuilder
* .newInstance()
* .dltProcessor(MyCustomDltProcessor.class, "processDltMessage")
* .create(template);
* }</code>
*
* <code>@Component
* public class MyCustomDltProcessor {
*
* public void processDltMessage(MyPojo message) {
* // ... message processing, persistence, etc
* }
* }</code>
* </pre>
*
* The other way to provide the DLT handler method is through the
* {@link org.springframework.kafka.annotation.DltHandler} annotation,
* that should be used within the same class as the correspondent
* {@link org.springframework.kafka.annotation.KafkaListener}.
*
* <pre>
* <code>@DltHandler
* public void processMessage(MyPojo message) {
* // ... message processing, persistence, etc
* }</code>
*</pre>
*
* If no DLT handler is provided, the default {@link LoggingDltListenerHandlerMethod} is used.
*
* @author Tomaz Fernandes
* @since 2.7
*
* @see RetryTopicConfigurationBuilder
* @see org.springframework.kafka.annotation.RetryableTopic
* @see org.springframework.kafka.annotation.KafkaListener
* @see org.springframework.retry.annotation.Backoff
* @see org.springframework.kafka.listener.DefaultErrorHandler
* @see org.springframework.kafka.listener.DeadLetterPublishingRecoverer
*
*/
public class RetryTopicConfigurer implements BeanFactoryAware {
private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(RetryTopicConfigurer.class));
/**
* The default method to handle messages in the DLT.
*/
public static final EndpointHandlerMethod DEFAULT_DLT_HANDLER = createHandlerMethodWith(LoggingDltListenerHandlerMethod.class,
LoggingDltListenerHandlerMethod.DEFAULT_DLT_METHOD_NAME);
private final DestinationTopicProcessor destinationTopicProcessor;
private final ListenerContainerFactoryResolver containerFactoryResolver;
private final ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer;
private BeanFactory beanFactory;
private final RetryTopicNamesProviderFactory retryTopicNamesProviderFactory;
/**
* Create an instance with the provided properties.
* @param destinationTopicProcessor the destination topic processor.
* @param containerFactoryResolver the container factory resolver.
* @param listenerContainerFactoryConfigurer the container factory configurer.
* @param beanFactory the bean factory.
* @param retryTopicNamesProviderFactory the retry topic names factory.
*/
@Deprecated
public RetryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor,
ListenerContainerFactoryResolver containerFactoryResolver,
ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer,
BeanFactory beanFactory,
RetryTopicNamesProviderFactory retryTopicNamesProviderFactory) {
this(destinationTopicProcessor, containerFactoryResolver,
listenerContainerFactoryConfigurer, retryTopicNamesProviderFactory);
this.beanFactory = beanFactory;
}
/**
* Create an instance with the provided properties.
* @param destinationTopicProcessor the destination topic processor.
* @param containerFactoryResolver the container factory resolver.
* @param listenerContainerFactoryConfigurer the container factory configurer.
* @param retryTopicNamesProviderFactory the retry topic names factory.
*/
@Autowired
public RetryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor,
ListenerContainerFactoryResolver containerFactoryResolver,
ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer,
RetryTopicNamesProviderFactory retryTopicNamesProviderFactory) {
this.destinationTopicProcessor = destinationTopicProcessor;
this.containerFactoryResolver = containerFactoryResolver;
this.listenerContainerFactoryConfigurer = listenerContainerFactoryConfigurer;
this.retryTopicNamesProviderFactory = retryTopicNamesProviderFactory;
}
/**
* Entrypoint for creating and configuring the retry and dlt endpoints, as well as the
* container factory that will create the corresponding listenerContainer.
* @param endpointProcessor function that will process the endpoints
* processListener method.
* @param mainEndpoint the endpoint based on which retry and dlt endpoints are also
* created and processed.
* @param configuration the configuration for the topic.
* @param registrar The {@link KafkaListenerEndpointRegistrar} that will register the endpoints.
* @param factory The factory provided in the {@link org.springframework.kafka.annotation.KafkaListener}
* @param defaultContainerFactoryBeanName The default factory bean name for the
* {@link org.springframework.kafka.annotation.KafkaListener}
*
*/
public void processMainAndRetryListeners(EndpointProcessor endpointProcessor,
MethodKafkaListenerEndpoint<?, ?> mainEndpoint,
RetryTopicConfiguration configuration,
KafkaListenerEndpointRegistrar registrar,
@Nullable KafkaListenerContainerFactory<?> factory,
String defaultContainerFactoryBeanName) {
throwIfMultiMethodEndpoint(mainEndpoint);
DestinationTopicProcessor.Context context =
new DestinationTopicProcessor.Context(configuration.getDestinationTopicProperties());
configureEndpoints(mainEndpoint, endpointProcessor, factory, registrar, configuration, context,
defaultContainerFactoryBeanName);
this.destinationTopicProcessor.processRegisteredDestinations(getTopicCreationFunction(configuration), context);
}
private void configureEndpoints(MethodKafkaListenerEndpoint<?, ?> mainEndpoint,
EndpointProcessor endpointProcessor,
KafkaListenerContainerFactory<?> factory,
KafkaListenerEndpointRegistrar registrar,
RetryTopicConfiguration configuration,
DestinationTopicProcessor.Context context,
String defaultContainerFactoryBeanName) {
this.destinationTopicProcessor
.processDestinationTopicProperties(destinationTopicProperties ->
processAndRegisterEndpoint(mainEndpoint,
endpointProcessor,
factory,
defaultContainerFactoryBeanName,
registrar,
configuration,
context,
destinationTopicProperties),
context);
}
private void processAndRegisterEndpoint(MethodKafkaListenerEndpoint<?, ?> mainEndpoint, EndpointProcessor endpointProcessor,
KafkaListenerContainerFactory<?> factory,
String defaultFactoryBeanName,
KafkaListenerEndpointRegistrar registrar,
RetryTopicConfiguration configuration, DestinationTopicProcessor.Context context,
DestinationTopic.Properties destinationTopicProperties) {
KafkaListenerContainerFactory<?> resolvedFactory =
destinationTopicProperties.isMainEndpoint()
? resolveAndConfigureFactoryForMainEndpoint(factory, defaultFactoryBeanName, configuration)
: resolveAndConfigureFactoryForRetryEndpoint(factory, defaultFactoryBeanName, configuration);
MethodKafkaListenerEndpoint<?, ?> endpoint = destinationTopicProperties.isMainEndpoint()
? mainEndpoint
: new MethodKafkaListenerEndpoint<>();
endpointProcessor.accept(endpoint);
EndpointHandlerMethod endpointBeanMethod =
getEndpointHandlerMethod(mainEndpoint, configuration, destinationTopicProperties);
createEndpointCustomizer(endpointBeanMethod, destinationTopicProperties)
.customizeEndpointAndCollectTopics(endpoint)
.forEach(topicNamesHolder ->
this.destinationTopicProcessor
.registerDestinationTopic(topicNamesHolder.getMainTopic(),
topicNamesHolder.getCustomizedTopic(),
destinationTopicProperties, context));
registrar.registerEndpoint(endpoint, resolvedFactory);
endpoint.setBeanFactory(this.beanFactory);
}
protected EndpointHandlerMethod getEndpointHandlerMethod(MethodKafkaListenerEndpoint<?, ?> mainEndpoint,
RetryTopicConfiguration configuration,
DestinationTopic.Properties props) {
EndpointHandlerMethod dltHandlerMethod = configuration.getDltHandlerMethod();
EndpointHandlerMethod retryBeanMethod = new EndpointHandlerMethod(mainEndpoint.getBean(), mainEndpoint.getMethod());
return props.isDltTopic() ? getDltEndpointHandlerMethodOrDefault(dltHandlerMethod) : retryBeanMethod;
}
private Consumer<Collection<String>> getTopicCreationFunction(RetryTopicConfiguration config) {
RetryTopicConfiguration.TopicCreation topicCreationConfig = config.forKafkaTopicAutoCreation();
return topicCreationConfig.shouldCreateTopics()
? topics -> createNewTopicBeans(topics, topicCreationConfig)
: topics -> { };
}
protected void createNewTopicBeans(Collection<String> topics, RetryTopicConfiguration.TopicCreation config) {
topics.forEach(topic -> {
DefaultListableBeanFactory bf = ((DefaultListableBeanFactory) this.beanFactory);
String beanName = topic + "-topicRegistrationBean";
if (!bf.containsBean(beanName)) {
bf.registerSingleton(beanName,
new NewTopic(topic, config.getNumPartitions(), config.getReplicationFactor()));
}
}
);
}
protected EndpointCustomizer createEndpointCustomizer(
EndpointHandlerMethod endpointBeanMethod, DestinationTopic.Properties destinationTopicProperties) {
return new EndpointCustomizerFactory(destinationTopicProperties,
endpointBeanMethod,
this.beanFactory,
this.retryTopicNamesProviderFactory)
.createEndpointCustomizer();
}
private EndpointHandlerMethod getDltEndpointHandlerMethodOrDefault(EndpointHandlerMethod dltEndpointHandlerMethod) {
return dltEndpointHandlerMethod != null ? dltEndpointHandlerMethod : DEFAULT_DLT_HANDLER;
}
private KafkaListenerContainerFactory<?> resolveAndConfigureFactoryForMainEndpoint(
KafkaListenerContainerFactory<?> providedFactory,
String defaultFactoryBeanName, RetryTopicConfiguration configuration) {
ConcurrentKafkaListenerContainerFactory<?, ?> resolvedFactory = this.containerFactoryResolver
.resolveFactoryForMainEndpoint(providedFactory, defaultFactoryBeanName,
configuration.forContainerFactoryResolver());
return this.listenerContainerFactoryConfigurer
.decorateFactoryWithoutSettingContainerProperties(resolvedFactory,
configuration.forContainerFactoryConfigurer());
}
private KafkaListenerContainerFactory<?> resolveAndConfigureFactoryForRetryEndpoint(
KafkaListenerContainerFactory<?> providedFactory,
String defaultFactoryBeanName,
RetryTopicConfiguration configuration) {
ConcurrentKafkaListenerContainerFactory<?, ?> resolvedFactory =
this.containerFactoryResolver.resolveFactoryForRetryEndpoint(providedFactory, defaultFactoryBeanName,
configuration.forContainerFactoryResolver());
return this.listenerContainerFactoryConfigurer
.decorateFactory(resolvedFactory, configuration.forContainerFactoryConfigurer());
}
private void throwIfMultiMethodEndpoint(MethodKafkaListenerEndpoint<?, ?> mainEndpoint) {
if (mainEndpoint instanceof MultiMethodKafkaListenerEndpoint) {
throw new IllegalArgumentException("Retry Topic is not compatible with " + MultiMethodKafkaListenerEndpoint.class);
}
}
public static EndpointHandlerMethod createHandlerMethodWith(Object beanOrClass, String methodName) {
return new EndpointHandlerMethod(beanOrClass, methodName);
}
public static EndpointHandlerMethod createHandlerMethodWith(Object bean, Method method) {
return new EndpointHandlerMethod(bean, method);
}
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
}
public interface EndpointProcessor extends Consumer<MethodKafkaListenerEndpoint<?, ?>> {
default void process(MethodKafkaListenerEndpoint<?, ?> listenerEndpoint) {
accept(listenerEndpoint);
}
}
static class LoggingDltListenerHandlerMethod {
public static final String DEFAULT_DLT_METHOD_NAME = "logMessage";
public void logMessage(Object message) {
if (message instanceof ConsumerRecord) {
LOGGER.info(() -> "Received message in dlt listener: "
+ KafkaUtils.format((ConsumerRecord<?, ?>) message));
}
else {
LOGGER.info(() -> "Received message in dlt listener.");
}
}
}
}
相关信息
相关文章
spring-kafka BackOffValuesGenerator 源码
spring-kafka DeadLetterPublishingRecovererFactory 源码
spring-kafka DefaultDestinationTopicProcessor 源码
spring-kafka DefaultDestinationTopicResolver 源码
spring-kafka DestinationTopic 源码
spring-kafka DestinationTopicContainer 源码
spring-kafka DestinationTopicProcessor 源码
spring-kafka DestinationTopicPropertiesFactory 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦