spring-kafka ListenerContainerFactoryResolver 源码

  • 2022-08-16
  • 浏览 (454)

spring-kafka ListenerContainerFactoryResolver 代码

文件路径:/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryResolver.java

/*
 * Copyright 2018-2022 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.kafka.retrytopic;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.StringUtils;

/**
 *
 * Resolves a {@link ConcurrentKafkaListenerContainerFactory} to be used by the
 * {@link RetryTopicConfiguration}.
 *
 * @author Tomaz Fernandes
 * @since 2.7
 *
 * @see ListenerContainerFactoryConfigurer
 *
 */
public class ListenerContainerFactoryResolver {

	private final BeanFactory beanFactory;

	private final List<FactoryResolver> mainEndpointResolvers;

	private final List<FactoryResolver> retryEndpointResolvers;

	private final Cache mainEndpointCache;

	private final Cache retryEndpointCache;

	public ListenerContainerFactoryResolver(BeanFactory beanFactory) {
		this.beanFactory = beanFactory;
		this.mainEndpointCache = new Cache();
		this.retryEndpointCache = new Cache();

		this.mainEndpointResolvers = Arrays.asList(
				this.mainEndpointCache::fromCache,
				(fromKafkaListenerAnnotation, configuration) -> fromKafkaListenerAnnotation,
				(fromKLAnnotation, configuration) -> configuration.factoryFromRetryTopicConfiguration,
				(fromKLAnnotation, configuration) -> fromBeanName(configuration.listenerContainerFactoryName),
				(fromKLAnnotation, configuration) ->
						fromBeanName("internalRetryTopicListenerContainerFactory"),
				(fromKLAnnotation, configuration) ->
						fromBeanName(RetryTopicBeanNames.DEFAULT_LISTENER_CONTAINER_FACTORY_BEAN_NAME));

		this.retryEndpointResolvers = Arrays.asList(
				this.retryEndpointCache::fromCache,
				(fromKLAnnotation, configuration) -> configuration.factoryFromRetryTopicConfiguration,
				(fromKLAnnotation, configuration) -> fromBeanName(configuration.listenerContainerFactoryName),
				(fromKLAnnotation, configuration) -> fromKLAnnotation,
				(fromKLAnnotation, configuration) ->
						fromBeanName("internalRetryTopicListenerContainerFactory"),
				(fromKLAnnotation, configuration) ->
						fromBeanName(RetryTopicBeanNames.DEFAULT_LISTENER_CONTAINER_FACTORY_BEAN_NAME));
	}

	ConcurrentKafkaListenerContainerFactory<?, ?> resolveFactoryForMainEndpoint(
			@Nullable KafkaListenerContainerFactory<?> factoryFromKafkaListenerAnnotationInstance,
			String defaultContainerFactoryBeanName,
			Configuration config) {
		KafkaListenerContainerFactory<?> factoryFromKafkaListenerAnnotation =
				getFactoryFromKLA(factoryFromKafkaListenerAnnotationInstance, defaultContainerFactoryBeanName);
		ConcurrentKafkaListenerContainerFactory<?, ?> resolvedFactory = resolveFactory(this.mainEndpointResolvers,
				factoryFromKafkaListenerAnnotation, config);
		return this.mainEndpointCache.addIfAbsent(factoryFromKafkaListenerAnnotation, config, resolvedFactory);
	}

	ConcurrentKafkaListenerContainerFactory<?, ?> resolveFactoryForRetryEndpoint(
			@Nullable KafkaListenerContainerFactory<?> factoryFromKafkaListenerAnnotationInstance,
			String defaultContainerFactoryBeanName,
			Configuration config) {
		KafkaListenerContainerFactory<?> factoryFromKafkaListenerAnnotation =
				getFactoryFromKLA(factoryFromKafkaListenerAnnotationInstance, defaultContainerFactoryBeanName);
		ConcurrentKafkaListenerContainerFactory<?, ?> resolvedFactory = resolveFactory(this.retryEndpointResolvers,
				factoryFromKafkaListenerAnnotation, config);
		return this.retryEndpointCache.addIfAbsent(factoryFromKafkaListenerAnnotation, config, resolvedFactory);
	}

	@Nullable
	private KafkaListenerContainerFactory<?> getFactoryFromKLA(KafkaListenerContainerFactory<?> factoryFromKafkaListenerAnnotationInstance,
															String defaultContainerFactoryBeanName) {
		KafkaListenerContainerFactory<?> factoryFromKafkaListenerAnnotation =
				factoryFromKafkaListenerAnnotationInstance;
		if (factoryFromKafkaListenerAnnotation == null) {
			factoryFromKafkaListenerAnnotation = fromBeanName(defaultContainerFactoryBeanName);
		}
		return factoryFromKafkaListenerAnnotation;
	}

	private ConcurrentKafkaListenerContainerFactory<?, ?> resolveFactory(List<FactoryResolver> factoryResolvers,
			KafkaListenerContainerFactory<?> factoryFromKafkaListenerAnnotation,
			Configuration config) {

		ConcurrentKafkaListenerContainerFactory<?, ?> verifiedFactoryFromKafkaListenerAnnotation = verifyClass(
				factoryFromKafkaListenerAnnotation);
		return factoryResolvers
				.stream()
				.map(resolver -> Optional.ofNullable(
						resolver.resolveFactory(verifiedFactoryFromKafkaListenerAnnotation, config)))
				.filter(Optional::isPresent)
				.map(Optional::get)
				.findFirst()
				.orElseThrow(() -> new IllegalArgumentException("Could not resolve a viable " +
						"ConcurrentKafkaListenerContainerFactory to configure the retry topic. " +
						"Try creating a bean with name " +
						RetryTopicBeanNames.DEFAULT_LISTENER_CONTAINER_FACTORY_BEAN_NAME));
	}

	@Nullable
	private ConcurrentKafkaListenerContainerFactory<?, ?> verifyClass(KafkaListenerContainerFactory<?> fromKafkaListenerAnnotationFactory) {
		return fromKafkaListenerAnnotationFactory != null
				&& ConcurrentKafkaListenerContainerFactory.class.isAssignableFrom(fromKafkaListenerAnnotationFactory.getClass())
				? (ConcurrentKafkaListenerContainerFactory<?, ?>) fromKafkaListenerAnnotationFactory
				: null;
	}

	@Nullable
	private ConcurrentKafkaListenerContainerFactory<?, ?> fromBeanName(String factoryBeanName) {
		try {
			return StringUtils.hasText(factoryBeanName)
					? this.beanFactory.getBean(factoryBeanName, ConcurrentKafkaListenerContainerFactory.class)
					: null;
		}
		catch (NoSuchBeanDefinitionException ex) {
			return null;
		}
	}

	private interface FactoryResolver {
		ConcurrentKafkaListenerContainerFactory<?, ?> resolveFactory(ConcurrentKafkaListenerContainerFactory<?, ?> candidate,
																	Configuration configuration);
	}

	static class Configuration {

		private final ConcurrentKafkaListenerContainerFactory<?, ?> factoryFromRetryTopicConfiguration;

		private final String listenerContainerFactoryName;

		Configuration(ConcurrentKafkaListenerContainerFactory<?, ?> factoryFromRetryTopicConfiguration,
					String listenerContainerFactoryName) {
			this.factoryFromRetryTopicConfiguration = factoryFromRetryTopicConfiguration;
			this.listenerContainerFactoryName = listenerContainerFactoryName;
		}

		@Override
		public boolean equals(Object o) {
			if (this == o) {
				return true;
			}
			if (o == null || getClass() != o.getClass()) {
				return false;
			}
			Configuration that = (Configuration) o;
			return Objects.equals(this.factoryFromRetryTopicConfiguration, that.factoryFromRetryTopicConfiguration)
					&& Objects.equals(this.listenerContainerFactoryName, that.listenerContainerFactoryName);
		}

		@Override
		public int hashCode() {
			return Objects.hash(this.factoryFromRetryTopicConfiguration, this.listenerContainerFactoryName);
		}

	}

	static class Cache {

		private final Map<Key, ConcurrentKafkaListenerContainerFactory<?, ?>> cacheMap;

		Cache() {
			this.cacheMap = new HashMap<>();
		}

		ConcurrentKafkaListenerContainerFactory<?, ?> addIfAbsent(KafkaListenerContainerFactory<?> factoryFromKafkaListenerAnnotation,
																Configuration config,
																ConcurrentKafkaListenerContainerFactory<?, ?> resolvedFactory) {
			synchronized (this.cacheMap) {
				Key key = cacheKey(factoryFromKafkaListenerAnnotation, config);
				if (!this.cacheMap.containsKey(key)) {
					this.cacheMap.put(key, resolvedFactory);
				}
				return resolvedFactory;
			}
		}

		ConcurrentKafkaListenerContainerFactory<?, ?> fromCache(KafkaListenerContainerFactory<?> factoryFromKafkaListenerAnnotation,
																Configuration config) {
			synchronized (this.cacheMap) {
				return this.cacheMap.get(cacheKey(factoryFromKafkaListenerAnnotation, config));
			}
		}

		private Key cacheKey(KafkaListenerContainerFactory<?> factoryFromKafkaListenerAnnotation, Configuration config) {
			return new Key(factoryFromKafkaListenerAnnotation, config);
		}

		static class Key {

			private final KafkaListenerContainerFactory<?> factoryFromKafkaListenerAnnotation;

			private final Configuration config;

			Key(KafkaListenerContainerFactory<?> factoryFromKafkaListenerAnnotation, Configuration config) {
				this.factoryFromKafkaListenerAnnotation = factoryFromKafkaListenerAnnotation;
				this.config = config;
			}

			@Override
			public boolean equals(Object o) {
				if (this == o) {
					return true;
				}
				if (o == null || getClass() != o.getClass()) {
					return false;
				}
				Key key = (Key) o;
				return Objects.equals(this.factoryFromKafkaListenerAnnotation, key.factoryFromKafkaListenerAnnotation)
						&& Objects.equals(this.config, key.config);
			}

			@Override
			public int hashCode() {
				return Objects.hash(this.factoryFromKafkaListenerAnnotation, this.config);
			}
		}

	}

}

相关信息

spring-kafka 源码目录

相关文章

spring-kafka BackOffValuesGenerator 源码

spring-kafka DeadLetterPublishingRecovererFactory 源码

spring-kafka DefaultDestinationTopicProcessor 源码

spring-kafka DefaultDestinationTopicResolver 源码

spring-kafka DestinationTopic 源码

spring-kafka DestinationTopicContainer 源码

spring-kafka DestinationTopicProcessor 源码

spring-kafka DestinationTopicPropertiesFactory 源码

spring-kafka DestinationTopicResolver 源码

spring-kafka DltStrategy 源码

0  赞