spring-kafka StreamsBuilderFactoryBean Source Code

  • 2022-08-16

File path: /spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java

/*
 * Copyright 2017-2022 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.kafka.config;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;

import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.config.AbstractFactoryBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

/**
 * An {@link AbstractFactoryBean} for the {@link StreamsBuilder} instance
 * and lifecycle control for the internal {@link KafkaStreams} instance.
 * <p>
 * A fine grained control on {@link KafkaStreams} can be achieved by
 * {@link KafkaStreamsCustomizer}s.
 *
 * @author Artem Bilan
 * @author Ivan Ursul
 * @author Soby Chacko
 * @author Zach Olauson
 * @author Nurettin Yilmaz
 * @author Denis Washington
 * @author Gary Russell
 *
 * @since 1.1.4
 */
public class StreamsBuilderFactoryBean extends AbstractFactoryBean<StreamsBuilder>
		implements SmartLifecycle, BeanNameAware {

	/**
	 * The default {@link Duration} of {@code 10 seconds} for close timeout.
	 * @see KafkaStreams#close(Duration)
	 */
	public static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.ofSeconds(10);

	private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(StreamsBuilderFactoryBean.class));

	private static final String STREAMS_CONFIG_MUST_NOT_BE_NULL = "'streamsConfig' must not be null";

	private static final String CLEANUP_CONFIG_MUST_NOT_BE_NULL = "'cleanupConfig' must not be null";

	private KafkaClientSupplier clientSupplier = new DefaultKafkaClientSupplier();

	private Properties properties;

	private CleanupConfig cleanupConfig;

	private final List<Listener> listeners = new ArrayList<>();

	private KafkaStreamsInfrastructureCustomizer infrastructureCustomizer = new KafkaStreamsInfrastructureCustomizer() {
	};

	private KafkaStreamsCustomizer kafkaStreamsCustomizer;

	private KafkaStreams.StateListener stateListener;

	private StateRestoreListener stateRestoreListener;

	private StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler;

	private boolean autoStartup = true;

	private int phase = Integer.MAX_VALUE - 1000; // NOSONAR magic #

	private Duration closeTimeout = DEFAULT_CLOSE_TIMEOUT;

	private KafkaStreams kafkaStreams;

	private volatile boolean running;

	private Topology topology;

	private String beanName;

	/**
	 * Default constructor that creates the factory without configuration
	 * {@link Properties}. It is the factory user's responsibility to properly set
	 * {@link Properties} using
	 * {@link StreamsBuilderFactoryBean#setStreamsConfiguration(Properties)}.
	 * @since 2.1.3.
	 */
	public StreamsBuilderFactoryBean() {
		this.cleanupConfig = new CleanupConfig();
	}

	/**
	 * Construct an instance with the supplied streams configuration and
	 * clean up configuration.
	 * @param streamsConfig the streams configuration.
	 * @param cleanupConfig the cleanup configuration.
	 * @since 2.2
	 */
	public StreamsBuilderFactoryBean(KafkaStreamsConfiguration streamsConfig, CleanupConfig cleanupConfig) {
		Assert.notNull(streamsConfig, STREAMS_CONFIG_MUST_NOT_BE_NULL);
		Assert.notNull(cleanupConfig, CLEANUP_CONFIG_MUST_NOT_BE_NULL);
		this.properties = streamsConfig.asProperties();
		this.cleanupConfig = cleanupConfig;
	}

	/**
	 * Construct an instance with the supplied streams configuration.
	 * @param streamsConfig the streams configuration.
	 * @since 2.2
	 */
	public StreamsBuilderFactoryBean(KafkaStreamsConfiguration streamsConfig) {
		this(streamsConfig, new CleanupConfig());
	}

	@Override
	public synchronized void setBeanName(String name) {
		this.beanName = name;
	}

	/**
	 * Set the streams configuration {@link Properties} on this factory.
	 * @param streamsConfig the streams configuration.
	 * @since 2.2
	 */
	public void setStreamsConfiguration(Properties streamsConfig) {
		Assert.notNull(streamsConfig, STREAMS_CONFIG_MUST_NOT_BE_NULL);
		this.properties = streamsConfig;
	}

	@Nullable
	public Properties getStreamsConfiguration() {
		return this.properties; // NOSONAR - inconsistent synchronization
	}

	public void setClientSupplier(KafkaClientSupplier clientSupplier) {
		Assert.notNull(clientSupplier, "'clientSupplier' must not be null");
		this.clientSupplier = clientSupplier; // NOSONAR (sync)
	}

	/**
	 * Set a customizer to configure the builder and/or topology before creating the stream.
	 * @param infrastructureCustomizer the customizer
	 * @since 2.4.1
	 */
	public void setInfrastructureCustomizer(KafkaStreamsInfrastructureCustomizer infrastructureCustomizer) {
		Assert.notNull(infrastructureCustomizer, "'infrastructureCustomizer' must not be null");
		this.infrastructureCustomizer = infrastructureCustomizer; // NOSONAR (sync)
	}

	/**
	 * Specify a {@link KafkaStreamsCustomizer} to customize a {@link KafkaStreams}
	 * instance during {@link #start()}.
	 * @param kafkaStreamsCustomizer the {@link KafkaStreamsCustomizer} to use.
	 * @since 2.1.5
	 */
	public void setKafkaStreamsCustomizer(KafkaStreamsCustomizer kafkaStreamsCustomizer) {
		Assert.notNull(kafkaStreamsCustomizer, "'kafkaStreamsCustomizer' must not be null");
		this.kafkaStreamsCustomizer = kafkaStreamsCustomizer; // NOSONAR (sync)
	}

	public void setStateListener(KafkaStreams.StateListener stateListener) {
		this.stateListener = stateListener; // NOSONAR (sync)
	}

	/**
	 * Set a {@link StreamsUncaughtExceptionHandler}. Supercedes
	 * {@link #setUncaughtExceptionHandler(java.lang.Thread.UncaughtExceptionHandler)}.
	 * @param streamsUncaughtExceptionHandler the handler.
	 * @since 2.8
	 */
	public void setStreamsUncaughtExceptionHandler(StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
		this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler; // NOSONAR (sync)
	}

	/**
	 * Retrieves the current {@link StreamsUncaughtExceptionHandler} set on this factory bean.
	 * @return {@link StreamsUncaughtExceptionHandler}
	 * @since 2.8.4
	 */
	@Nullable
	public StreamsUncaughtExceptionHandler getStreamsUncaughtExceptionHandler() {
		return this.streamsUncaughtExceptionHandler;
	}

	public void setStateRestoreListener(StateRestoreListener stateRestoreListener) {
		this.stateRestoreListener = stateRestoreListener; // NOSONAR (sync)
	}

	/**
	 * Specify the timeout in seconds for the {@link KafkaStreams#close(Duration)}
	 * operation. Defaults to {@link #DEFAULT_CLOSE_TIMEOUT} seconds.
	 * @param closeTimeout the timeout for close in seconds.
	 * @see KafkaStreams#close(Duration)
	 */
	public void setCloseTimeout(int closeTimeout) {
		this.closeTimeout = Duration.ofSeconds(closeTimeout); // NOSONAR (sync)
	}

	/**
	 * Providing access to the associated {@link Topology} of this
	 * {@link StreamsBuilderFactoryBean}.
	 * @return {@link Topology} object
	 * @since 2.4.4
	 */
	@Nullable
	public Topology getTopology() {
		return this.topology;
	}

	@Override
	public Class<?> getObjectType() {
		return StreamsBuilder.class;
	}

	public void setAutoStartup(boolean autoStartup) {
		this.autoStartup = autoStartup;
	}

	public void setPhase(int phase) {
		this.phase = phase;
	}

	@Override
	public int getPhase() {
		return this.phase;
	}

	public void setCleanupConfig(CleanupConfig cleanupConfig) {
		Assert.notNull(cleanupConfig, CLEANUP_CONFIG_MUST_NOT_BE_NULL);
		this.cleanupConfig = cleanupConfig; // NOSONAR (sync)
	}

	/**
	 * Get a managed by this {@link StreamsBuilderFactoryBean} {@link KafkaStreams} instance.
	 * @return KafkaStreams managed instance;
	 * may be null if this {@link StreamsBuilderFactoryBean} hasn't been started.
	 * @since 1.1.4
	 */
	@Nullable
	public synchronized KafkaStreams getKafkaStreams() {
		return this.kafkaStreams;
	}

	/**
	 * Get the current list of listeners.
	 * @return the listeners.
	 * @since 2.5.3
	 */
	public List<Listener> getListeners() {
		return Collections.unmodifiableList(this.listeners);
	}

	/**
	 * Add a {@link Listener} which will be called after starting and stopping the
	 * streams.
	 * @param listener the listener.
	 * @since 2.5.3
	 */
	public void addListener(Listener listener) {
		Assert.notNull(listener, "'listener' cannot be null");
		this.listeners.add(listener);
	}

	/**
	 * Remove a listener.
	 * @param listener the listener.
	 * @return true if removed.
	 * @since 2.5.3
	 */
	public boolean removeListener(Listener listener) {
		return this.listeners.remove(listener);
	}

	@Override
	protected synchronized StreamsBuilder createInstance() {
		if (this.autoStartup) {
			Assert.state(this.properties != null,
					"streams configuration properties must not be null");
		}
		StreamsBuilder builder = new StreamsBuilder();
		this.infrastructureCustomizer.configureBuilder(builder);
		return builder;
	}

	@Override
	public boolean isAutoStartup() {
		return this.autoStartup;
	}

	@Override
	public void stop(Runnable callback) {
		stop();
		if (callback != null) {
			callback.run();
		}
	}

	@Override
	public synchronized void start() {
		if (!this.running) {
			try {
				Assert.state(this.properties != null,
						"streams configuration properties must not be null");
				Topology topol = getObject().build(this.properties); // NOSONAR: getObject() cannot return null
				this.infrastructureCustomizer.configureTopology(topol);
				this.topology = topol;
				LOGGER.debug(() -> topol.describe().toString());
				this.kafkaStreams = new KafkaStreams(topol, this.properties, this.clientSupplier);
				this.kafkaStreams.setStateListener(this.stateListener);
				this.kafkaStreams.setGlobalStateRestoreListener(this.stateRestoreListener);
				if (this.streamsUncaughtExceptionHandler != null) {
					this.kafkaStreams.setUncaughtExceptionHandler(this.streamsUncaughtExceptionHandler);
				}
				if (this.kafkaStreamsCustomizer != null) {
					this.kafkaStreamsCustomizer.customize(this.kafkaStreams);
				}
				if (this.cleanupConfig.cleanupOnStart()) {
					this.kafkaStreams.cleanUp();
				}
				this.kafkaStreams.start();
				for (Listener listener : this.listeners) {
					listener.streamsAdded(this.beanName, this.kafkaStreams);
				}
				this.running = true;
			}
			catch (Exception e) {
				throw new KafkaException("Could not start stream: ", e);
			}
		}
	}

	@Override
	public synchronized void stop() {
		if (this.running) {
			try {
				if (this.kafkaStreams != null) {
					this.kafkaStreams.close(this.closeTimeout);
					if (this.cleanupConfig.cleanupOnStop()) {
						this.kafkaStreams.cleanUp();
					}
					for (Listener listener : this.listeners) {
						listener.streamsRemoved(this.beanName, this.kafkaStreams);
					}
					this.kafkaStreams = null;
				}
			}
			catch (Exception e) {
				LOGGER.error(e, "Failed to stop streams");
			}
			finally {
				this.running = false;
			}
		}
	}

	@Override
	public synchronized boolean isRunning() {
		return this.running;
	}

	/**
	 * Called whenever a {@link KafkaStreams} is added or removed.
	 *
	 * @since 2.5.3
	 *
	 */
	public interface Listener {

		/**
		 * A new {@link KafkaStreams} was created.
		 * @param id the streams id (factory bean name).
		 * @param streams the streams;
		 */
		default void streamsAdded(String id, KafkaStreams streams) {
		}

		/**
		 * An existing {@link KafkaStreams} was removed.
		 * @param id the streams id (factory bean name).
		 * @param streams the streams;
		 */
		default void streamsRemoved(String id, KafkaStreams streams) {
		}

	}

}
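
The class above is the complete source. As a usage reference, the following is a minimal sketch (not taken from spring-kafka itself) showing how the public API seen in the listing — the KafkaStreamsConfiguration constructor, addListener(Listener), and setKafkaStreamsCustomizer(KafkaStreamsCustomizer) — could be wired up in a plain @Configuration class. The class name, bean method name, application id, and bootstrap servers are placeholders; in a typical application, @EnableKafkaStreams declares an equivalent factory bean automatically and only the streams configuration needs to be supplied.

// Minimal usage sketch; class/bean names and property values are placeholders.
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;

@Configuration
public class StreamsFactoryBeanSketch {

	@Bean
	public StreamsBuilderFactoryBean myStreamsBuilderFactoryBean() {
		Map<String, Object> props = new HashMap<>();
		props.put(StreamsConfig.APPLICATION_ID_CONFIG, "demo-app");          // placeholder
		props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

		StreamsBuilderFactoryBean factoryBean =
				new StreamsBuilderFactoryBean(new KafkaStreamsConfiguration(props));

		// Listener (the nested interface defined above) is notified after start()
		// creates the KafkaStreams instance and after stop() closes it.
		factoryBean.addListener(new StreamsBuilderFactoryBean.Listener() {

			@Override
			public void streamsAdded(String id, KafkaStreams streams) {
				System.out.println("KafkaStreams started for factory bean: " + id);
			}

			@Override
			public void streamsRemoved(String id, KafkaStreams streams) {
				System.out.println("KafkaStreams stopped for factory bean: " + id);
			}
		});

		// The customizer runs inside start(), after the KafkaStreams instance is
		// created and before kafkaStreams.start() is called.
		factoryBean.setKafkaStreamsCustomizer(kafkaStreams ->
				kafkaStreams.setStateListener((newState, oldState) ->
						System.out.println("State change: " + oldState + " -> " + newState)));

		return factoryBean;
	}

}

With such a factory bean in the context, the StreamsBuilder it produces can be injected elsewhere to define the topology; start() and stop() are then driven by Spring through the SmartLifecycle contract, honoring the autoStartup, phase, and closeTimeout settings shown in the source.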
