spring-kafka KafkaListenerAnnotationBeanPostProcessor Source Code

  • 2022-08-16

spring-kafka KafkaListenerAnnotationBeanPostProcessor code

File path: /spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

/*
 * Copyright 2014-2022 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.kafka.annotation;

import java.io.IOException;
import java.io.StringReader;
import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.commons.logging.LogFactory;

import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.ListableBeanFactory;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.ObjectFactory;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.config.BeanExpressionContext;
import org.springframework.beans.factory.config.BeanExpressionResolver;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.config.Scope;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.expression.StandardBeanExpressionResolver;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.OrderComparator;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.convert.TypeDescriptor;
import org.springframework.core.convert.converter.ConditionalGenericConverter;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.convert.converter.ConverterFactory;
import org.springframework.core.convert.converter.GenericConverter;
import org.springframework.core.log.LogAccessor;
import org.springframework.format.Formatter;
import org.springframework.format.FormatterRegistry;
import org.springframework.format.support.DefaultFormattingConversionService;
import org.springframework.kafka.config.KafkaListenerConfigUtils;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
import org.springframework.kafka.listener.ContainerGroupSequencer;
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.retrytopic.DestinationTopicResolver;
import org.springframework.kafka.retrytopic.RetryTopicBeanNames;
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
import org.springframework.kafka.retrytopic.RetryTopicConfigurationSupport;
import org.springframework.kafka.retrytopic.RetryTopicConfigurer;
import org.springframework.kafka.retrytopic.RetryTopicSchedulerWrapper;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.lang.Nullable;
import org.springframework.messaging.converter.GenericMessageConverter;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.validation.Validator;

/**
 * Bean post-processor that registers methods annotated with {@link KafkaListener}
 * to be invoked by a Kafka message listener container created under the covers
 * by a {@link org.springframework.kafka.config.KafkaListenerContainerFactory}
 * according to the parameters of the annotation.
 *
 * <p>Annotated methods can use flexible arguments as defined by {@link KafkaListener}.
 *
 * <p>This post-processor is automatically registered by Spring's {@link EnableKafka}
 * annotation.
 *
 * <p>Auto-detect any {@link KafkaListenerConfigurer} instances in the container,
 * allowing for customization of the registry to be used, the default container
 * factory or for fine-grained control over endpoints registration. See
 * {@link EnableKafka} Javadoc for complete usage details.
 *
 * @param <K> the key type.
 * @param <V> the value type.
 *
 * @author Stephane Nicoll
 * @author Juergen Hoeller
 * @author Gary Russell
 * @author Artem Bilan
 * @author Dariusz Szablinski
 * @author Venil Noronha
 * @author Dimitri Penner
 * @author Filip Halemba
 * @author Tomaz Fernandes
 *
 * @see KafkaListener
 * @see KafkaListenerErrorHandler
 * @see EnableKafka
 * @see KafkaListenerConfigurer
 * @see KafkaListenerEndpointRegistrar
 * @see KafkaListenerEndpointRegistry
 * @see org.springframework.kafka.config.KafkaListenerEndpoint
 * @see MethodKafkaListenerEndpoint
 */
public class KafkaListenerAnnotationBeanPostProcessor<K, V>
		implements BeanPostProcessor, Ordered, ApplicationContextAware, InitializingBean, SmartInitializingSingleton {

	private static final String UNCHECKED = "unchecked";

	private static final String THE_LEFT = "The [";

	private static final String RESOLVED_TO_LEFT = "Resolved to [";

	private static final String RIGHT_FOR_LEFT = "] for [";

	private static final String GENERATED_ID_PREFIX = "org.springframework.kafka.KafkaListenerEndpointContainer#";

	/**
	 * The bean name of the default {@link org.springframework.kafka.config.KafkaListenerContainerFactory}.
	 */
	public static final String DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "kafkaListenerContainerFactory";

	private final Set<Class<?>> nonAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap<>(64));

	private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass()));

	private final ListenerScope listenerScope = new ListenerScope();

	private final KafkaHandlerMethodFactoryAdapter messageHandlerMethodFactory =
			new KafkaHandlerMethodFactoryAdapter();

	private final KafkaListenerEndpointRegistrar registrar = new KafkaListenerEndpointRegistrar();

	private final AtomicInteger counter = new AtomicInteger();

	private KafkaListenerEndpointRegistry endpointRegistry;

	private String defaultContainerFactoryBeanName = DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME;

	private ApplicationContext applicationContext;

	private BeanFactory beanFactory;

	private BeanExpressionResolver resolver = new StandardBeanExpressionResolver();

	private BeanExpressionContext expressionContext;

	private Charset charset = StandardCharsets.UTF_8;

	private AnnotationEnhancer enhancer;

	private RetryTopicConfigurer retryTopicConfigurer;

	@Override
	public int getOrder() {
		return LOWEST_PRECEDENCE;
	}

	/**
	 * Set the {@link KafkaListenerEndpointRegistry} that will hold the created
	 * endpoint and manage the lifecycle of the related listener container.
	 * @param endpointRegistry the {@link KafkaListenerEndpointRegistry} to set.
	 */
	public void setEndpointRegistry(KafkaListenerEndpointRegistry endpointRegistry) {
		this.endpointRegistry = endpointRegistry;
	}

	/**
	 * Set the name of the {@link KafkaListenerContainerFactory} to use by default.
	 * <p>If none is specified, "kafkaListenerContainerFactory" is assumed to be defined.
	 * @param containerFactoryBeanName the {@link KafkaListenerContainerFactory} bean name.
	 */
	public void setDefaultContainerFactoryBeanName(String containerFactoryBeanName) {
		this.defaultContainerFactoryBeanName = containerFactoryBeanName;
	}

	/**
	 * Set the {@link MessageHandlerMethodFactory} to use to configure the message
	 * listener responsible to serve an endpoint detected by this processor.
	 * <p>By default, {@link DefaultMessageHandlerMethodFactory} is used and it
	 * can be configured further to support additional method arguments
	 * or to customize conversion and validation support. See
	 * {@link DefaultMessageHandlerMethodFactory} Javadoc for more details.
	 * @param messageHandlerMethodFactory the {@link MessageHandlerMethodFactory} instance.
	 */
	public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory messageHandlerMethodFactory) {
		this.messageHandlerMethodFactory.setHandlerMethodFactory(messageHandlerMethodFactory);
	}

	/**
	 * Return the configured handler factory.
	 * @return the factory.
	 * @since 2.5.7
	 */
	public MessageHandlerMethodFactory getMessageHandlerMethodFactory() {
		return this.messageHandlerMethodFactory;
	}

	@Override
	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
		this.applicationContext = applicationContext;
		if (applicationContext instanceof ConfigurableApplicationContext) {
			setBeanFactory(((ConfigurableApplicationContext) applicationContext).getBeanFactory());
		}
		else {
			setBeanFactory(applicationContext);
		}
	}

	/**
	 * Making a {@link BeanFactory} available is optional; if not set,
	 * {@link KafkaListenerConfigurer} beans won't get autodetected and an
	 * {@link #setEndpointRegistry endpoint registry} has to be explicitly configured.
	 * @param beanFactory the {@link BeanFactory} to be used.
	 */
	public void setBeanFactory(BeanFactory beanFactory) {
		this.beanFactory = beanFactory;
		if (beanFactory instanceof ConfigurableListableBeanFactory) {
			this.resolver = ((ConfigurableListableBeanFactory) beanFactory).getBeanExpressionResolver();
			this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory,
					this.listenerScope);
		}
	}

	/**
	 * Set a charset to use when converting byte[] to String in method arguments and other
	 * String/byte[] conversions. Default UTF-8.
	 * @param charset the charset.
	 * @since 2.2
	 */
	public void setCharset(Charset charset) {
		Assert.notNull(charset, "'charset' cannot be null");
		this.charset = charset;
	}

	@Override
	public void afterPropertiesSet() throws Exception {
		buildEnhancer();
	}

	@Override
	public void afterSingletonsInstantiated() {
		this.registrar.setBeanFactory(this.beanFactory);

		if (this.beanFactory instanceof ListableBeanFactory) {
			Map<String, KafkaListenerConfigurer> instances =
					((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);
			for (KafkaListenerConfigurer configurer : instances.values()) {
				configurer.configureKafkaListeners(this.registrar);
			}
		}

		if (this.registrar.getEndpointRegistry() == null) {
			if (this.endpointRegistry == null) {
				Assert.state(this.beanFactory != null,
						"BeanFactory must be set to find endpoint registry by bean name");
				this.endpointRegistry = this.beanFactory.getBean(
						KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
						KafkaListenerEndpointRegistry.class);
			}
			this.registrar.setEndpointRegistry(this.endpointRegistry);
		}

		if (this.defaultContainerFactoryBeanName != null) {
			this.registrar.setContainerFactoryBeanName(this.defaultContainerFactoryBeanName);
		}

		// Set the custom handler method factory once resolved by the configurer
		MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
		if (handlerMethodFactory != null) {
			this.messageHandlerMethodFactory.setHandlerMethodFactory(handlerMethodFactory);
		}
		else {
			addFormatters(this.messageHandlerMethodFactory.defaultFormattingConversionService);
		}

		// Actually register all listeners
		this.registrar.afterPropertiesSet();
		Map<String, ContainerGroupSequencer> sequencers =
				this.applicationContext.getBeansOfType(ContainerGroupSequencer.class, false, false);
		sequencers.values().forEach(seq -> seq.initialize());
	}
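	// A minimal sketch (bean and validator names assumed) of the KafkaListenerConfigurer
	// hook invoked above, which lets user code customize the registrar before any
	// endpoint is registered:
	//
	//   @Configuration
	//   public class ListenerConfiguration implements KafkaListenerConfigurer {
	//
	//       @Override
	//       public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
	//           registrar.setValidator(new MyPayloadValidator()); // hypothetical Validator
	//       }
	//
	//   }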

	private void buildEnhancer() {
		if (this.applicationContext != null) {
			Map<String, AnnotationEnhancer> enhancersMap =
					this.applicationContext.getBeansOfType(AnnotationEnhancer.class, false, false);
			if (enhancersMap.size() > 0) {
				List<AnnotationEnhancer> enhancers = enhancersMap.values()
						.stream()
						.sorted(new OrderComparator())
						.collect(Collectors.toList());
				this.enhancer = (attrs, element) -> {
					Map<String, Object> newAttrs = attrs;
					for (AnnotationEnhancer enh : enhancers) {
						newAttrs = enh.apply(newAttrs, element);
					}
					return newAttrs;
				};
			}
		}
	}
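	// AnnotationEnhancer beans (the interface is declared near the bottom of this file)
	// are collected above, sorted with OrderComparator, and chained. A minimal sketch of
	// one that rewrites the 'id' attribute of every @KafkaListener (names assumed):
	//
	//   @Bean
	//   public AnnotationEnhancer idPrefixingEnhancer() {
	//       return (attrs, element) -> {
	//           Map<String, Object> newAttrs = new HashMap<>(attrs);
	//           newAttrs.put("id", "prefixed-" + attrs.get("id"));
	//           return newAttrs;
	//       };
	//   }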

	@Override
	public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
		return bean;
	}

	@Override
	public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
		if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
			Class<?> targetClass = AopUtils.getTargetClass(bean);
			Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
			final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
			final List<Method> multiMethods = new ArrayList<>();
			Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
					(MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
						Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
						return (!listenerMethods.isEmpty() ? listenerMethods : null);
					});
			if (hasClassLevelListeners) {
				Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
						(ReflectionUtils.MethodFilter) method ->
								AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
				multiMethods.addAll(methodsWithHandler);
			}
			if (annotatedMethods.isEmpty()) {
				this.nonAnnotatedClasses.add(bean.getClass());
				this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
			}
			else {
				// Non-empty set of methods
				for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
					Method method = entry.getKey();
					for (KafkaListener listener : entry.getValue()) {
						processKafkaListener(listener, method, bean, beanName);
					}
				}
				this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
							+ beanName + "': " + annotatedMethods);
			}
			if (hasClassLevelListeners) {
				processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
			}
		}
		return bean;
	}

	/*
	 * AnnotationUtils.getRepeatableAnnotations does not look at interfaces
	 */
	private Collection<KafkaListener> findListenerAnnotations(Class<?> clazz) {
		Set<KafkaListener> listeners = new HashSet<>();
		KafkaListener ann = AnnotatedElementUtils.findMergedAnnotation(clazz, KafkaListener.class);
		if (ann != null) {
			ann = enhance(clazz, ann);
			listeners.add(ann);
		}
		KafkaListeners anns = AnnotationUtils.findAnnotation(clazz, KafkaListeners.class);
		if (anns != null) {
			listeners.addAll(Arrays.stream(anns.value())
					.map(anno -> enhance(clazz, anno))
					.collect(Collectors.toList()));
		}
		return listeners;
	}

	/*
	 * AnnotationUtils.getRepeatableAnnotations does not look at interfaces
	 */
	private Set<KafkaListener> findListenerAnnotations(Method method) {
		Set<KafkaListener> listeners = new HashSet<>();
		KafkaListener ann = AnnotatedElementUtils.findMergedAnnotation(method, KafkaListener.class);
		if (ann != null) {
			ann = enhance(method, ann);
			listeners.add(ann);
		}
		KafkaListeners anns = AnnotationUtils.findAnnotation(method, KafkaListeners.class);
		if (anns != null) {
			listeners.addAll(Arrays.stream(anns.value())
					.map(anno -> enhance(method, anno))
					.collect(Collectors.toList()));
		}
		return listeners;
	}

	private KafkaListener enhance(AnnotatedElement element, KafkaListener ann) {
		if (this.enhancer == null) {
			return ann;
		}
		else {
			return AnnotationUtils.synthesizeAnnotation(
				this.enhancer.apply(AnnotationUtils.getAnnotationAttributes(ann), element), KafkaListener.class, null);
		}
	}

	private void processMultiMethodListeners(Collection<KafkaListener> classLevelListeners, List<Method> multiMethods,
			Object bean, String beanName) {

		List<Method> checkedMethods = new ArrayList<>();
		Method defaultMethod = null;
		for (Method method : multiMethods) {
			Method checked = checkProxy(method, bean);
			KafkaHandler annotation = AnnotationUtils.findAnnotation(method, KafkaHandler.class);
			if (annotation != null && annotation.isDefault()) {
				final Method toAssert = defaultMethod;
				Assert.state(toAssert == null, () -> "Only one @KafkaHandler can be marked 'isDefault', found: "
						+ toAssert.toString() + " and " + method.toString());
				defaultMethod = checked;
			}
			checkedMethods.add(checked);
		}
		for (KafkaListener classLevelListener : classLevelListeners) {
			MultiMethodKafkaListenerEndpoint<K, V> endpoint =
					new MultiMethodKafkaListenerEndpoint<>(checkedMethods, defaultMethod, bean);
			String beanRef = classLevelListener.beanRef();
			this.listenerScope.addListener(beanRef, bean);
			processListener(endpoint, classLevelListener, bean, beanName, resolveTopics(classLevelListener),
					resolveTopicPartitions(classLevelListener));
			this.listenerScope.removeListener(beanRef);
		}
	}
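	// This path serves class-level listeners; a sketch of the shape it expects
	// (topic name assumed):
	//
	//   @KafkaListener(id = "multi", topics = "myTopic")
	//   public class MultiMethodListener {
	//
	//       @KafkaHandler
	//       public void listen(String in) { ... }
	//
	//       @KafkaHandler(isDefault = true)
	//       public void listenDefault(Object in) { ... }
	//
	//   }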

	protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
		Method methodToUse = checkProxy(method, bean);
		MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
		endpoint.setMethod(methodToUse);

		String beanRef = kafkaListener.beanRef();
		this.listenerScope.addListener(beanRef, bean);
		String[] topics = resolveTopics(kafkaListener);
		TopicPartitionOffset[] tps = resolveTopicPartitions(kafkaListener);
		if (!processMainAndRetryListeners(kafkaListener, bean, beanName, methodToUse, endpoint, topics, tps)) {
			processListener(endpoint, kafkaListener, bean, beanName, topics, tps);
		}
		this.listenerScope.removeListener(beanRef);
	}

	private boolean processMainAndRetryListeners(KafkaListener kafkaListener, Object bean, String beanName,
			Method methodToUse, MethodKafkaListenerEndpoint<K, V> endpoint, String[] topics,
			TopicPartitionOffset[] tps) {

		String[] retryableCandidates = topics;
		if (retryableCandidates.length == 0 && tps.length > 0) {
			retryableCandidates = Arrays.stream(tps)
					.map(tp -> tp.getTopic())
					.distinct()
					.collect(Collectors.toList())
					.toArray(new String[0]);
		}

		RetryTopicConfiguration retryTopicConfiguration = new RetryTopicConfigurationProvider(this.beanFactory,
				this.resolver, this.expressionContext)
						.findRetryConfigurationFor(retryableCandidates, methodToUse, bean);

		if (retryTopicConfiguration == null) {
			String[] candidates = retryableCandidates;
			this.logger.debug(() ->
					"No retry topic configuration found for topics " + Arrays.toString(candidates));
			return false;
		}

		RetryTopicConfigurer.EndpointProcessor endpointProcessor = endpointToProcess ->
				this.processKafkaListenerAnnotation(endpointToProcess, kafkaListener, bean, topics, tps);

		KafkaListenerContainerFactory<?> factory =
				resolveContainerFactory(kafkaListener, resolve(kafkaListener.containerFactory()), beanName);

		getRetryTopicConfigurer()
				.processMainAndRetryListeners(endpointProcessor, endpoint, retryTopicConfiguration,
						this.registrar, factory, this.defaultContainerFactoryBeanName);
		return true;
	}

	private RetryTopicConfigurer getRetryTopicConfigurer() {
		if (this.retryTopicConfigurer == null) {
			try {
				this.retryTopicConfigurer = this.beanFactory
						.getBean(RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME, RetryTopicConfigurer.class);
			}
			catch (NoSuchBeanDefinitionException ex) {
				this.retryTopicConfigurer = createDefaultConfigurer();
			}
		}
		return this.retryTopicConfigurer;
	}

	private RetryTopicConfigurer createDefaultConfigurer() {
		if (this.applicationContext instanceof GenericApplicationContext) {
			GenericApplicationContext gac = (GenericApplicationContext) this.applicationContext;
			gac.registerBean(
					RetryTopicBeanNames.DEFAULT_RETRY_TOPIC_CONFIG_SUPPORT_BEAN_NAME,
					RetryTopicConfigurationSupport.class,
					() -> new RetryTopicConfigurationSupport());
			RetryTopicConfigurationSupport rtcs = this.applicationContext.getBean(
					RetryTopicBeanNames.DEFAULT_RETRY_TOPIC_CONFIG_SUPPORT_BEAN_NAME,
					RetryTopicConfigurationSupport.class);
			DestinationTopicResolver destResolver = rtcs.destinationTopicResolver();
			RetryTopicSchedulerWrapper schedW = gac.getBeanProvider(RetryTopicSchedulerWrapper.class).getIfUnique();
			TaskScheduler sched = gac.getBeanProvider(TaskScheduler.class).getIfUnique();
			if (schedW == null && sched == null) {
				RetryTopicSchedulerWrapper newSchedW = new RetryTopicSchedulerWrapper(new ThreadPoolTaskScheduler());
				gac.registerBean(RetryTopicBeanNames.DEFAULT_SCHEDULER_WRAPPER_BEAN_NAME,
						RetryTopicSchedulerWrapper.class, () -> newSchedW);
				schedW = gac.getBean(RetryTopicSchedulerWrapper.class);
			}
			KafkaConsumerBackoffManager bom =
					rtcs.kafkaConsumerBackoffManager(this.applicationContext, this.registrar.getEndpointRegistry(), // NOSONAR
							schedW, sched);
			RetryTopicConfigurer rtc = rtcs.retryTopicConfigurer(bom, destResolver, this.beanFactory);

			gac.registerBean(RetryTopicBeanNames.DESTINATION_TOPIC_RESOLVER_BEAN_NAME, DestinationTopicResolver.class,
					() -> destResolver);
			gac.registerBean(KafkaListenerConfigUtils.KAFKA_CONSUMER_BACK_OFF_MANAGER_BEAN_NAME,
					KafkaConsumerBackoffManager.class, () -> bom);
			gac.registerBean(RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME, RetryTopicConfigurer.class,
					() -> rtc);

			return this.beanFactory
				.getBean(RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME, RetryTopicConfigurer.class);
		}
		throw new IllegalStateException("When there is no RetryTopicConfigurationSupport bean, the application context "
				+ "must be a GenericApplicationContext");
	}

	private Method checkProxy(Method methodArg, Object bean) {
		Method method = methodArg;
		if (AopUtils.isJdkDynamicProxy(bean)) {
			try {
				// Found a @KafkaListener method on the target class for this JDK proxy ->
				// is it also present on the proxy itself?
				method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());
				Class<?>[] proxiedInterfaces = ((Advised) bean).getProxiedInterfaces();
				for (Class<?> iface : proxiedInterfaces) {
					try {
						method = iface.getMethod(method.getName(), method.getParameterTypes());
						break;
					}
					catch (@SuppressWarnings("unused") NoSuchMethodException noMethod) {
						// NOSONAR
					}
				}
			}
			catch (SecurityException ex) {
				ReflectionUtils.handleReflectionException(ex);
			}
			catch (NoSuchMethodException ex) {
				throw new IllegalStateException(String.format(
						"@KafkaListener method '%s' found on bean target class '%s', " +
								"but not found in any interface(s) for bean JDK proxy. Either " +
								"pull the method up to an interface or switch to subclass (CGLIB) " +
								"proxies by setting proxy-target-class/proxyTargetClass " +
								"attribute to 'true'", method.getName(),
						method.getDeclaringClass().getSimpleName()), ex);
			}
		}
		return method;
	}

	protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
								Object bean, String beanName, String[] topics, TopicPartitionOffset[] tps) {

		processKafkaListenerAnnotation(endpoint, kafkaListener, bean, topics, tps);

		String containerFactory = resolve(kafkaListener.containerFactory());
		KafkaListenerContainerFactory<?> listenerContainerFactory = resolveContainerFactory(kafkaListener,
				containerFactory, beanName);

		this.registrar.registerEndpoint(endpoint, listenerContainerFactory);
	}

	private void processKafkaListenerAnnotation(MethodKafkaListenerEndpoint<?, ?> endpoint,
			KafkaListener kafkaListener, Object bean, String[] topics, TopicPartitionOffset[] tps) {

		endpoint.setBean(bean);
		endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
		endpoint.setId(getEndpointId(kafkaListener));
		endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
		endpoint.setTopicPartitions(tps);
		endpoint.setTopics(topics);
		endpoint.setTopicPattern(resolvePattern(kafkaListener));
		endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
		endpoint.setListenerInfo(resolveExpressionAsBytes(kafkaListener.info(), "info"));
		String group = kafkaListener.containerGroup();
		if (StringUtils.hasText(group)) {
			Object resolvedGroup = resolveExpression(group);
			if (resolvedGroup instanceof String) {
				endpoint.setGroup((String) resolvedGroup);
			}
		}
		String concurrency = kafkaListener.concurrency();
		if (StringUtils.hasText(concurrency)) {
			endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency"));
		}
		String autoStartup = kafkaListener.autoStartup();
		if (StringUtils.hasText(autoStartup)) {
			endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
		}
		resolveKafkaProperties(endpoint, kafkaListener.properties());
		endpoint.setSplitIterables(kafkaListener.splitIterables());
		if (StringUtils.hasText(kafkaListener.batch())) {
			endpoint.setBatchListener(Boolean.parseBoolean(kafkaListener.batch()));
		}
		endpoint.setBeanFactory(this.beanFactory);
		resolveErrorHandler(endpoint, kafkaListener);
		resolveContentTypeConverter(endpoint, kafkaListener);
		resolveFilter(endpoint, kafkaListener);
	}
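	// The attribute resolution above goes through resolve() (property placeholders) and
	// resolveExpression() (SpEL), so externalized configuration works; a sketch with
	// assumed property names:
	//
	//   @KafkaListener(id = "orders", topics = "${order.topic}",
	//           concurrency = "${order.concurrency:3}", autoStartup = "${order.enabled:true}")
	//   public void listen(String data) { ... }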

	private void resolveErrorHandler(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener) {
		Object errorHandler = resolveExpression(kafkaListener.errorHandler());
		if (errorHandler instanceof KafkaListenerErrorHandler) {
			endpoint.setErrorHandler((KafkaListenerErrorHandler) errorHandler);
		}
		else {
			String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
			if (StringUtils.hasText(errorHandlerBeanName)) {
				endpoint.setErrorHandler(
						this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
			}
		}
	}

	private void resolveContentTypeConverter(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener) {
		Object converter = resolveExpression(kafkaListener.contentTypeConverter());
		if (converter instanceof SmartMessageConverter) {
			endpoint.setMessagingConverter((SmartMessageConverter) converter);
		}
		else {
			String converterBeanName = resolveExpressionAsString(kafkaListener.contentTypeConverter(),
					"contentTypeConverter");
			if (StringUtils.hasText(converterBeanName)) {
				endpoint.setMessagingConverter(
						this.beanFactory.getBean(converterBeanName, SmartMessageConverter.class));
			}
		}
	}

	@SuppressWarnings({ "rawtypes", UNCHECKED })
	private void resolveFilter(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener) {
		Object filter = resolveExpression(kafkaListener.filter());
		if (filter instanceof RecordFilterStrategy) {
			endpoint.setRecordFilterStrategy((RecordFilterStrategy) filter);
		}
		else {
			String filterBeanName = resolveExpressionAsString(kafkaListener.filter(), "filter");
			if (StringUtils.hasText(filterBeanName)) {
				endpoint.setRecordFilterStrategy(
						this.beanFactory.getBean(filterBeanName, RecordFilterStrategy.class));
			}
		}
	}
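	// errorHandler, contentTypeConverter and filter each accept either a SpEL expression
	// that evaluates to an instance of the expected type or, as resolved above, the name
	// of a bean of that type; a sketch with assumed bean names:
	//
	//   @KafkaListener(id = "filtered", topics = "myTopic",
	//           errorHandler = "myErrorHandler", filter = "myRecordFilterStrategy")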

	@Nullable
	private KafkaListenerContainerFactory<?> resolveContainerFactory(KafkaListener kafkaListener,
			Object factoryTarget, String beanName) {

		String containerFactory = kafkaListener.containerFactory();
		if (!StringUtils.hasText(containerFactory)) {
			return null;
		}

		KafkaListenerContainerFactory<?> factory = null;

		Object resolved = resolveExpression(containerFactory);
		if (resolved instanceof KafkaListenerContainerFactory) {
			return (KafkaListenerContainerFactory<?>) resolved;
		}
		String containerFactoryBeanName = resolveExpressionAsString(containerFactory,
				"containerFactory");
		if (StringUtils.hasText(containerFactoryBeanName)) {
			assertBeanFactory();
			try {
				factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);
			}
			catch (NoSuchBeanDefinitionException ex) {
				throw new BeanInitializationException(
						noBeanFoundMessage(factoryTarget, beanName, containerFactoryBeanName,
								KafkaListenerContainerFactory.class), ex);
			}
		}
		return factory;
	}

	protected void assertBeanFactory() {
		Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
	}

	protected String noBeanFoundMessage(Object target, String listenerBeanName, String requestedBeanName,
			Class<?> expectedClass) {

		return "Could not register Kafka listener endpoint on ["
				+ target + "] for bean " + listenerBeanName + ", no '" + expectedClass.getSimpleName() + "' with id '"
				+ requestedBeanName + "' was found in the application context";
	}

	@SuppressWarnings(UNCHECKED)
	private void resolveKafkaProperties(MethodKafkaListenerEndpoint<?, ?> endpoint, String[] propertyStrings) {
		if (propertyStrings.length > 0) {
			Properties properties = new Properties();
			for (String property : propertyStrings) {
				Object value = resolveExpression(property);
				if (value instanceof String) {
					loadProperty(properties, property, value);
				}
				else if (value instanceof String[]) {
					for (String prop : (String[]) value) {
						loadProperty(properties, prop, prop);
					}
				}
				else if (value instanceof Collection) {
					Collection<?> values = (Collection<?>) value;
					if (values.size() > 0 && values.iterator().next() instanceof String) {
						for (String prop : (Collection<String>) value) {
							loadProperty(properties, prop, prop);
						}
					}
				}
				else {
					throw new IllegalStateException("'properties' must resolve to a String, a String[] or "
							+ "Collection<String>");
				}
			}
			endpoint.setConsumerProperties(properties);
		}
	}
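	// Each entry that resolves to a String is fed to Properties.load() (see loadProperty()
	// below), so the usual "key=value" / "key:value" syntax applies; for example:
	//
	//   @KafkaListener(id = "tuned", topics = "myTopic",
	//           properties = { "max.poll.interval.ms:120000", "fetch.min.bytes=1024" })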

	private void loadProperty(Properties properties, String property, Object value) {
		try {
			properties.load(new StringReader((String) value));
		}
		catch (IOException e) {
			this.logger.error(e, () -> "Failed to load property " + property + ", continuing...");
		}
	}

	private String getEndpointId(KafkaListener kafkaListener) {
		if (StringUtils.hasText(kafkaListener.id())) {
			return resolveExpressionAsString(kafkaListener.id(), "id");
		}
		else {
			return GENERATED_ID_PREFIX + this.counter.getAndIncrement();
		}
	}

	private String getEndpointGroupId(KafkaListener kafkaListener, String id) {
		String groupId = null;
		if (StringUtils.hasText(kafkaListener.groupId())) {
			groupId = resolveExpressionAsString(kafkaListener.groupId(), "groupId");
		}
		if (groupId == null && kafkaListener.idIsGroup() && StringUtils.hasText(kafkaListener.id())) {
			groupId = id;
		}
		return groupId;
	}

	private TopicPartitionOffset[] resolveTopicPartitions(KafkaListener kafkaListener) {
		TopicPartition[] topicPartitions = kafkaListener.topicPartitions();
		List<TopicPartitionOffset> result = new ArrayList<>();
		if (topicPartitions.length > 0) {
			for (TopicPartition topicPartition : topicPartitions) {
				result.addAll(resolveTopicPartitionsList(topicPartition));
			}
		}
		return result.toArray(new TopicPartitionOffset[0]);
	}

	private String[] resolveTopics(KafkaListener kafkaListener) {
		String[] topics = kafkaListener.topics();
		List<String> result = new ArrayList<>();
		if (topics.length > 0) {
			for (String topic1 : topics) {
				Object topic = resolveExpression(topic1);
				resolveAsString(topic, result);
			}
		}
		return result.toArray(new String[0]);
	}

	private Pattern resolvePattern(KafkaListener kafkaListener) {
		Pattern pattern = null;
		String text = kafkaListener.topicPattern();
		if (StringUtils.hasText(text)) {
			Object resolved = resolveExpression(text);
			if (resolved instanceof Pattern) {
				pattern = (Pattern) resolved;
			}
			else if (resolved instanceof String) {
				pattern = Pattern.compile((String) resolved);
			}
			else if (resolved != null) {
				throw new IllegalStateException(
						"topicPattern must resolve to a Pattern or String, not " + resolved.getClass());
			}
		}
		return pattern;
	}

	private List<TopicPartitionOffset> resolveTopicPartitionsList(TopicPartition topicPartition) {
		Object topic = resolveExpression(topicPartition.topic());
		Assert.state(topic instanceof String,
				() -> "topic in @TopicPartition must resolve to a String, not " + topic.getClass());
		Assert.state(StringUtils.hasText((String) topic), "topic in @TopicPartition must not be empty");
		String[] partitions = topicPartition.partitions();
		PartitionOffset[] partitionOffsets = topicPartition.partitionOffsets();
		Assert.state(partitions.length > 0 || partitionOffsets.length > 0,
				() -> "At least one 'partition' or 'partitionOffset' required in @TopicPartition for topic '" + topic + "'");
		List<TopicPartitionOffset> result = new ArrayList<>();
		for (String partition : partitions) {
			resolvePartitionAsInteger((String) topic, resolveExpression(partition), result, null, false, false);
		}
		if (partitionOffsets.length == 1 && partitionOffsets[0].partition().equals("*")) {
			result.forEach(tpo -> {
				tpo.setOffset(resolveInitialOffset(tpo.getTopic(), partitionOffsets[0]));
				tpo.setRelativeToCurrent(isRelative(tpo.getTopic(), partitionOffsets[0]));
			});
		}
		else {
			for (PartitionOffset partitionOffset : partitionOffsets) {
				Assert.isTrue(!partitionOffset.partition().equals("*"), () ->
						"Partition wildcard '*' is only allowed in a single @PartitionOffset in " + result);
				resolvePartitionAsInteger((String) topic, resolveExpression(partitionOffset.partition()), result,
						resolveInitialOffset(topic, partitionOffset), isRelative(topic, partitionOffset), true);
			}
		}
		Assert.isTrue(result.size() > 0, () -> "At least one partition required for " + topic);
		return result;
	}
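	// An example of the wildcard handled above: a single @PartitionOffset with
	// partition "*" applies its initial offset to every resolved partition
	// (topic name assumed):
	//
	//   @KafkaListener(id = "fromStart", topicPartitions = @TopicPartition(topic = "myTopic",
	//           partitions = "0-2,4",
	//           partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")))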

	private Long resolveInitialOffset(Object topic, PartitionOffset partitionOffset) {
		Object initialOffsetValue = resolveExpression(partitionOffset.initialOffset());
		Long initialOffset;
		if (initialOffsetValue instanceof String) {
			Assert.state(StringUtils.hasText((String) initialOffsetValue),
					() -> "'initialOffset' in @PartitionOffset for topic '" + topic + "' cannot be empty");
			initialOffset = Long.valueOf((String) initialOffsetValue);
		}
		else if (initialOffsetValue instanceof Long) {
			initialOffset = (Long) initialOffsetValue;
		}
		else {
			throw new IllegalArgumentException(String.format(
					"@PartitionOffset for topic '%s' can't resolve '%s' as a Long or String, resolved to '%s'",
					topic, partitionOffset.initialOffset(), initialOffsetValue.getClass()));
		}
		return initialOffset;
	}

	private boolean isRelative(Object topic, PartitionOffset partitionOffset) {
		Object relativeToCurrentValue = resolveExpression(partitionOffset.relativeToCurrent());
		Boolean relativeToCurrent;
		if (relativeToCurrentValue instanceof String) {
			relativeToCurrent = Boolean.valueOf((String) relativeToCurrentValue);
		}
		else if (relativeToCurrentValue instanceof Boolean) {
			relativeToCurrent = (Boolean) relativeToCurrentValue;
		}
		else {
			throw new IllegalArgumentException(String.format(
					"@PartitionOffset for topic '%s' can't resolve '%s' as a Boolean or String, resolved to '%s'",
					topic, partitionOffset.relativeToCurrent(), relativeToCurrentValue.getClass()));
		}
		return relativeToCurrent;
	}

	@SuppressWarnings(UNCHECKED)
	private void resolveAsString(Object resolvedValue, List<String> result) {
		if (resolvedValue instanceof String[]) {
			for (Object object : (String[]) resolvedValue) {
				resolveAsString(object, result);
			}
		}
		else if (resolvedValue instanceof String) {
			result.add((String) resolvedValue);
		}
		else if (resolvedValue instanceof Iterable) {
			for (Object object : (Iterable<Object>) resolvedValue) {
				resolveAsString(object, result);
			}
		}
		else {
			throw new IllegalArgumentException(String.format(
					"@KafKaListener can't resolve '%s' as a String", resolvedValue));
		}
	}

	@SuppressWarnings(UNCHECKED)
	private void resolvePartitionAsInteger(String topic, Object resolvedValue,
			List<TopicPartitionOffset> result, @Nullable Long offset, boolean isRelative, boolean checkDups) {

		if (resolvedValue instanceof String[]) {
			for (Object object : (String[]) resolvedValue) {
				resolvePartitionAsInteger(topic, object, result, offset, isRelative, checkDups);
			}
		}
		else if (resolvedValue instanceof String) {
			Assert.state(StringUtils.hasText((String) resolvedValue),
					() -> "partition in @TopicPartition for topic '" + topic + "' cannot be empty");
			List<TopicPartitionOffset> collected = parsePartitions((String) resolvedValue)
					.map(part -> new TopicPartitionOffset(topic, part, offset, isRelative))
					.collect(Collectors.toList());
			if (checkDups) {
				collected.forEach(tpo -> {
					Assert.state(!result.contains(tpo), () ->
							String.format("@TopicPartition can't have the same partition configuration twice: [%s]",
									tpo));
				});
			}
			result.addAll(collected);
		}
		else if (resolvedValue instanceof Integer[]) {
			for (Integer partition : (Integer[]) resolvedValue) {
				result.add(new TopicPartitionOffset(topic, partition));
			}
		}
		else if (resolvedValue instanceof Integer) {
			result.add(new TopicPartitionOffset(topic, (Integer) resolvedValue));
		}
		else if (resolvedValue instanceof Iterable) {
			for (Object object : (Iterable<Object>) resolvedValue) {
				resolvePartitionAsInteger(topic, object, result, offset, isRelative, checkDups);
			}
		}
		else {
			throw new IllegalArgumentException(String.format(
					"@KafKaListener for topic '%s' can't resolve '%s' as an Integer or String", topic, resolvedValue));
		}
	}

	private String resolveExpressionAsString(String value, String attribute) {
		Object resolved = resolveExpression(value);
		if (resolved instanceof String) {
			return (String) resolved;
		}
		else if (resolved != null) {
			throw new IllegalStateException(THE_LEFT + attribute + "] must resolve to a String. "
					+ RESOLVED_TO_LEFT + resolved.getClass() + RIGHT_FOR_LEFT + value + "]");
		}
		return null;
	}

	@Nullable
	private byte[] resolveExpressionAsBytes(String value, String attribute) {
		Object resolved = resolveExpression(value);
		if (resolved instanceof String) {
			if (StringUtils.hasText((CharSequence) resolved)) {
				return ((String) resolved).getBytes(this.charset);
			}
		}
		else if (resolved instanceof byte[]) {
			return (byte[]) resolved;
		}
		else if (resolved != null) {
			throw new IllegalStateException(THE_LEFT + attribute + "] must resolve to a String or byte[]. "
					+ RESOLVED_TO_LEFT + resolved.getClass() + RIGHT_FOR_LEFT + value + "]");
		}
		return null;
	}

	private Integer resolveExpressionAsInteger(String value, String attribute) {
		Object resolved = resolveExpression(value);
		Integer result = null;
		if (resolved instanceof String) {
			result = Integer.parseInt((String) resolved);
		}
		else if (resolved instanceof Number) {
			result = ((Number) resolved).intValue();
		}
		else if (resolved != null) {
			throw new IllegalStateException(
					THE_LEFT + attribute + "] must resolve to a Number or a String that can be parsed as an Integer. "
							+ RESOLVED_TO_LEFT + resolved.getClass() + RIGHT_FOR_LEFT + value + "]");
		}
		return result;
	}

	private Boolean resolveExpressionAsBoolean(String value, String attribute) {
		Object resolved = resolveExpression(value);
		Boolean result = null;
		if (resolved instanceof Boolean) {
			result = (Boolean) resolved;
		}
		else if (resolved instanceof String) {
			result = Boolean.parseBoolean((String) resolved);
		}
		else if (resolved != null) {
			throw new IllegalStateException(
					THE_LEFT + attribute + "] must resolve to a Boolean or a String that can be parsed as a Boolean. "
							+ RESOLVED_TO_LEFT + resolved.getClass() + RIGHT_FOR_LEFT + value + "]");
		}
		return result;
	}

	private Object resolveExpression(String value) {
		return this.resolver.evaluate(resolve(value), this.expressionContext);
	}

	/**
	 * Resolve the specified value if possible.
	 * @param value the value to resolve
	 * @return the resolved value
	 * @see ConfigurableBeanFactory#resolveEmbeddedValue
	 */
	private String resolve(String value) {
		if (this.beanFactory != null && this.beanFactory instanceof ConfigurableBeanFactory) {
			return ((ConfigurableBeanFactory) this.beanFactory).resolveEmbeddedValue(value);
		}
		return value;
	}

	private void addFormatters(FormatterRegistry registry) {
		for (Converter<?, ?> converter : getBeansOfType(Converter.class)) {
			registry.addConverter(converter);
		}
		for (ConverterFactory<?, ?> converter : getBeansOfType(ConverterFactory.class)) {
			registry.addConverterFactory(converter);
		}
		for (GenericConverter converter : getBeansOfType(GenericConverter.class)) {
			registry.addConverter(converter);
		}
		for (Formatter<?> formatter : getBeansOfType(Formatter.class)) {
			registry.addFormatter(formatter);
		}
	}

	private <T> Collection<T> getBeansOfType(Class<T> type) {
		if (KafkaListenerAnnotationBeanPostProcessor.this.beanFactory instanceof ListableBeanFactory) {
			return ((ListableBeanFactory) KafkaListenerAnnotationBeanPostProcessor.this.beanFactory)
					.getBeansOfType(type)
					.values();
		}
		else {
			return Collections.emptySet();
		}
	}

	/**
	 * Parse a list of partitions into a {@link List}. Example: "0-5,10-15".
	 * @param partsString the comma-delimited list of partitions/ranges.
	 * @return the stream of partition numbers, sorted and de-duplicated.
	 * @since 2.6.4
	 */
	private Stream<Integer> parsePartitions(String partsString) {
		String[] partsStrings = partsString.split(",");
		if (partsStrings.length == 1 && !partsStrings[0].contains("-")) {
			return Stream.of(Integer.parseInt(partsStrings[0].trim()));
		}
		List<Integer> parts = new ArrayList<>();
		for (String part : partsStrings) {
			if (part.contains("-")) {
				String[] startEnd = part.split("-");
				Assert.state(startEnd.length == 2, "Only one hyphen allowed for a range of partitions: " + part);
				int start = Integer.parseInt(startEnd[0].trim());
				int end = Integer.parseInt(startEnd[1].trim());
				Assert.state(end >= start, "Invalid range: " + part);
				for (int i = start; i <= end; i++) {
					parts.add(i);
				}
			}
			else {
				parsePartitions(part).forEach(p -> parts.add(p));
			}
		}
		return parts.stream()
				.sorted()
				.distinct();
	}
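	// For example, parsePartitions("0-2, 5, 7-8") yields 0, 1, 2, 5, 7, 8
	// (sorted and de-duplicated).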

	/**
	 * An {@link MessageHandlerMethodFactory} adapter that offers a configurable underlying
	 * instance to use. Useful if the factory to use is determined once the endpoints
	 * have been registered but not created yet.
	 * @see KafkaListenerEndpointRegistrar#setMessageHandlerMethodFactory
	 */
	private class KafkaHandlerMethodFactoryAdapter implements MessageHandlerMethodFactory {

		private final DefaultFormattingConversionService defaultFormattingConversionService =
				new DefaultFormattingConversionService();

		private MessageHandlerMethodFactory handlerMethodFactory;

		public void setHandlerMethodFactory(MessageHandlerMethodFactory kafkaHandlerMethodFactory1) {
			this.handlerMethodFactory = kafkaHandlerMethodFactory1;
		}

		@Override
		public InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) {
			return getHandlerMethodFactory().createInvocableHandlerMethod(bean, method);
		}

		private MessageHandlerMethodFactory getHandlerMethodFactory() {
			if (this.handlerMethodFactory == null) {
				this.handlerMethodFactory = createDefaultMessageHandlerMethodFactory();
			}
			return this.handlerMethodFactory;
		}

		private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
			DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
			Validator validator = KafkaListenerAnnotationBeanPostProcessor.this.registrar.getValidator();
			if (validator != null) {
				defaultFactory.setValidator(validator);
			}
			defaultFactory.setBeanFactory(KafkaListenerAnnotationBeanPostProcessor.this.beanFactory);
			this.defaultFormattingConversionService.addConverter(
					new BytesToStringConverter(KafkaListenerAnnotationBeanPostProcessor.this.charset));
			this.defaultFormattingConversionService.addConverter(new BytesToNumberConverter());
			defaultFactory.setConversionService(this.defaultFormattingConversionService);
			GenericMessageConverter messageConverter = new GenericMessageConverter(this.defaultFormattingConversionService);
			defaultFactory.setMessageConverter(messageConverter);

			List<HandlerMethodArgumentResolver> customArgumentsResolver =
					new ArrayList<>(KafkaListenerAnnotationBeanPostProcessor.this.registrar.getCustomMethodArgumentResolvers());
			// Has to be at the end - look at PayloadMethodArgumentResolver documentation
			customArgumentsResolver.add(new KafkaNullAwarePayloadArgumentResolver(messageConverter, validator));
			defaultFactory.setCustomArgumentResolvers(customArgumentsResolver);

			defaultFactory.afterPropertiesSet();

			return defaultFactory;
		}

	}

	private static class BytesToStringConverter implements Converter<byte[], String> {


		private final Charset charset;

		BytesToStringConverter(Charset charset) {
			this.charset = charset;
		}

		@Override
		public String convert(byte[] source) {
			return new String(source, this.charset);
		}

	}

	static class ListenerScope implements Scope {

		private final Map<String, Object> listeners = new HashMap<>();

		ListenerScope() {
		}

		public void addListener(String key, Object bean) {
			this.listeners.put(key, bean);
		}

		public void removeListener(String key) {
			this.listeners.remove(key);
		}

		@Override
		public Object get(String name, ObjectFactory<?> objectFactory) {
			return this.listeners.get(name);
		}

		@Override
		public Object remove(String name) {
			return null;
		}

		@Override
		public void registerDestructionCallback(String name, Runnable callback) {
		}

		@Override
		public Object resolveContextualObject(String key) {
			return this.listeners.get(key);
		}

		@Override
		public String getConversationId() {
			return null;
		}

	}
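	// ListenerScope is what makes "#{__listener}" SpEL references work: processKafkaListener()
	// registers the bean under its beanRef ("__listener" by default) before resolving the
	// annotation attributes, so a listener can reference its own properties; a sketch
	// (assumes the listener bean exposes a 'topic' getter):
	//
	//   @KafkaListener(topics = "#{__listener.topic}")
	//   public void listen(String data) { ... }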

	/**
	 * Post processes each set of annotation attributes.
	 *
	 * @since 2.7.2
	 *
	 */
	public interface AnnotationEnhancer extends BiFunction<Map<String, Object>, AnnotatedElement, Map<String, Object>> {

	}

	private final class BytesToNumberConverter implements ConditionalGenericConverter {

		BytesToNumberConverter() {
		}

		@Override
		@Nullable
		public Set<ConvertiblePair> getConvertibleTypes() {
			HashSet<ConvertiblePair> pairs = new HashSet<>();
			pairs.add(new ConvertiblePair(byte[].class, long.class));
			pairs.add(new ConvertiblePair(byte[].class, int.class));
			pairs.add(new ConvertiblePair(byte[].class, short.class));
			pairs.add(new ConvertiblePair(byte[].class, byte.class));
			pairs.add(new ConvertiblePair(byte[].class, Long.class));
			pairs.add(new ConvertiblePair(byte[].class, Integer.class));
			pairs.add(new ConvertiblePair(byte[].class, Short.class));
			pairs.add(new ConvertiblePair(byte[].class, Byte.class));
			return pairs;
		}

		@Override
		@Nullable
		public Object convert(@Nullable Object source, TypeDescriptor sourceType, TypeDescriptor targetType) {
			byte[] bytes = (byte[]) source;
			if (targetType.getType().equals(long.class) || targetType.getType().equals(Long.class)) {
				Assert.state(bytes.length >= 8, "At least 8 bytes needed to convert a byte[] to a long"); // NOSONAR
				return ByteBuffer.wrap(bytes).getLong();
			}
			else if (targetType.getType().equals(int.class) || targetType.getType().equals(Integer.class)) {
				Assert.state(bytes.length >= 4, "At least 4 bytes needed to convert a byte[] to an integer"); // NOSONAR
				return ByteBuffer.wrap(bytes).getInt();
			}
			else if (targetType.getType().equals(short.class) || targetType.getType().equals(Short.class)) {
				Assert.state(bytes.length >= 2, "At least 2 bytes needed to convert a byte[] to a short");
				return ByteBuffer.wrap(bytes).getShort();
			}
			else if (targetType.getType().equals(byte.class) || targetType.getType().equals(Byte.class)) {
				Assert.state(bytes.length >= 1, "At least 1 byte needed to convert a byte[] to a byte");
				return ByteBuffer.wrap(bytes).get();
			}
			return null;
		}

		@Override
		public boolean matches(TypeDescriptor sourceType, TypeDescriptor targetType) {
			if (sourceType.getType().equals(byte[].class)) {
				Class<?> target = targetType.getType();
				return target.equals(long.class) || target.equals(int.class) || target.equals(short.class) // NOSONAR
						|| target.equals(byte.class) || target.equals(Long.class) || target.equals(Integer.class)
						|| target.equals(Short.class) || target.equals(Byte.class);
			}
			else {
				return false;
			}
		}

	}

}
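
For context, a minimal sketch of a listener this post-processor detects and registers (topic and bean names assumed). With @EnableKafka on a @Configuration class and a "kafkaListenerContainerFactory" bean present, postProcessAfterInitialization() above finds the annotated method and registers the resulting endpoint with the KafkaListenerEndpointRegistrar:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class OrderListener {

	@KafkaListener(id = "orders", topics = "orders")
	public void onMessage(String data) {
		System.out.println("received: " + data);
	}

}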

Related information

spring-kafka source code directory

Related articles

spring-kafka DltHandler Source Code

spring-kafka EnableKafka Source Code

spring-kafka EnableKafkaRetryTopic Source Code

spring-kafka EnableKafkaStreams Source Code

spring-kafka KafkaBootstrapConfiguration Source Code

spring-kafka KafkaHandler Source Code

spring-kafka KafkaListener Source Code

spring-kafka KafkaListenerConfigurationSelector Source Code

spring-kafka KafkaListenerConfigurer Source Code

spring-kafka KafkaListeners Source Code
