spring-kafka DeadLetterPublishingRecovererFactory 源码

  • 2022-08-16
  • 浏览 (528)

spring-kafka DeadLetterPublishingRecovererFactory 代码

文件路径:/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactory.java

/*
 * Copyright 2018-2022 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.kafka.retrytopic;

import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Consumer;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;

import org.springframework.core.NestedRuntimeException;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekUtils;
import org.springframework.kafka.listener.TimestampedException;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.util.Assert;

/**
 *
 * Creates and configures the {@link DeadLetterPublishingRecoverer} that will be used to
 * forward the messages using the {@link DestinationTopicResolver}.
 *
 * @author Tomaz Fernandes
 * @author Gary Russell
 * @since 2.7
 *
 */
public class DeadLetterPublishingRecovererFactory {

	/** Class-level logger used for destination resolution and listener exception messages. */
	private static final LogAccessor LOGGER =
			new LogAccessor(LogFactory.getLog(DeadLetterPublishingRecovererFactory.class));

	/** Resolves the next destination topic, its delay and the template used to publish to it. */
	private final DestinationTopicResolver destinationTopicResolver;

	/** Exception types registered as not retryable; applied to each recoverer in {@link #create()}. */
	private final Set<Class<? extends Exception>> fatalExceptions = new LinkedHashSet<>();

	/** Exception types whose classification is removed from each recoverer in {@link #create()}. */
	private final Set<Class<? extends Exception>> nonFatalExceptions = new HashSet<>();

	/** Callback applied to each new recoverer in {@link #create()}; a no-op by default. */
	private Consumer<DeadLetterPublishingRecoverer> recovererCustomizer = recoverer -> { };

	/** Optional user function contributing extra headers; {@code null} when not configured. */
	private BiFunction<ConsumerRecord<?, ?>, Exception, Headers> headersFunction;

	/** Controls at which level, and when, listener exceptions are logged. */
	private ListenerExceptionLoggingStrategy loggingStrategy = ListenerExceptionLoggingStrategy.AFTER_RETRIES_EXHAUSTED;

	/**
	 * Create a factory whose recoverers resolve their destinations through the given resolver.
	 * @param destinationTopicResolver the resolver consulted for the next destination topic,
	 * its delay and the {@link KafkaOperations} used to publish to it.
	 */
	public DeadLetterPublishingRecovererFactory(DestinationTopicResolver destinationTopicResolver) {
		this.destinationTopicResolver = destinationTopicResolver;
	}

	/**
	 * Set a function that creates additional headers for the output record, in addition to the standard
	 * retry headers added by this factory. When left unset ({@code null}), {@link #create()} registers
	 * no additional headers function on the recoverer.
	 * @param headersFunction the function.
	 * @since 2.8.4
	 */
	public void setHeadersFunction(BiFunction<ConsumerRecord<?, ?>, Exception, Headers> headersFunction) {
		this.headersFunction = headersFunction;
	}

	/**
	 * Add an exception type to the default list of not retryable exceptions. The type is
	 * recorded by this factory and applied to each recoverer when {@link #create()} is
	 * called. By default, the following exceptions will not be retried:
	 * <ul>
	 * <li>{@link org.springframework.kafka.support.serializer.DeserializationException}</li>
	 * <li>{@link org.springframework.messaging.converter.MessageConversionException}</li>
	 * <li>{@link org.springframework.kafka.support.converter.ConversionException}</li>
	 * <li>{@link org.springframework.messaging.handler.invocation.MethodArgumentResolutionException}</li>
	 * <li>{@link NoSuchMethodException}</li>
	 * <li>{@link ClassCastException}</li>
	 * </ul>
	 * All others will be retried.
	 * @param exceptionType the exception type; must not be {@code null}.
	 * @since 2.8
	 * @see #removeNotRetryableException(Class)
	 */
	public final void addNotRetryableException(Class<? extends Exception> exceptionType) {
		Assert.notNull(exceptionType, "'exceptionType' cannot be null");
		this.fatalExceptions.add(exceptionType);
	}

	/**
	 * Register an exception type to be removed from the configured list. The type is
	 * only recorded here; its classification is removed from each recoverer when
	 * {@link #create()} is called. By default, the following exceptions will not be
	 * retried:
	 * <ul>
	 * <li>{@link org.springframework.kafka.support.serializer.DeserializationException}</li>
	 * <li>{@link org.springframework.messaging.converter.MessageConversionException}</li>
	 * <li>{@link org.springframework.kafka.support.converter.ConversionException}</li>
	 * <li>{@link org.springframework.messaging.handler.invocation.MethodArgumentResolutionException}</li>
	 * <li>{@link NoSuchMethodException}</li>
	 * <li>{@link ClassCastException}</li>
	 * </ul>
	 * All others will be retried.
	 * @param exceptionType the exception type.
	 * @return true if the type was not already registered for removal with this factory.
	 * @see #addNotRetryableException(Class)
	 */
	public boolean removeNotRetryableException(Class<? extends Exception> exceptionType) {
		return this.nonFatalExceptions.add(exceptionType);
	}

	/**
	 * Never logs the listener exception at error level; with this setting the exception
	 * is only reported through the debug level message.
	 * The default is logging only after retries are exhausted.
	 * @since 2.7.13
	 */
	public void neverLogListenerException() {
		this.loggingStrategy = ListenerExceptionLoggingStrategy.NEVER;
	}

	/**
	 * Logs the listener exception at error level at each attempt, including the attempts
	 * that are forwarded to a retry topic.
	 * The default is logging only after retries are exhausted.
	 * @since 2.7.13
	 */
	public void alwaysLogListenerException() {
		this.loggingStrategy = ListenerExceptionLoggingStrategy.EACH_ATTEMPT;
	}

	/**
	 * Create a new {@link DeadLetterPublishingRecoverer} that resolves its template and
	 * destination through this factory's {@link DestinationTopicResolver}.
	 * <p>The recoverer is configured in this order: the retry headers function, the
	 * optional user headers function, the fixed recoverer flags, the recoverer customizer
	 * and finally the exception types registered with this factory.
	 * @return the configured recoverer; a new instance on every call.
	 */
	@SuppressWarnings("unchecked")
	public DeadLetterPublishingRecoverer create() {
		DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(// NOSONAR anon. class size
				this::resolveTemplate,
				false, (this::resolveDestination)) {

			// Header names are taken from the KafkaHeaders constants for both the
			// original record information and the exception information.
			@Override
			protected DeadLetterPublishingRecoverer.HeaderNames getHeaderNames() {
				return DeadLetterPublishingRecoverer.HeaderNames.Builder
						.original()
							.offsetHeader(KafkaHeaders.ORIGINAL_OFFSET)
							.timestampHeader(KafkaHeaders.ORIGINAL_TIMESTAMP)
							.timestampTypeHeader(KafkaHeaders.ORIGINAL_TIMESTAMP_TYPE)
							.topicHeader(KafkaHeaders.ORIGINAL_TOPIC)
							.partitionHeader(KafkaHeaders.ORIGINAL_PARTITION)
							.consumerGroupHeader(KafkaHeaders.ORIGINAL_CONSUMER_GROUP)
						.exception()
							.keyExceptionFqcn(KafkaHeaders.KEY_EXCEPTION_FQCN)
							.exceptionFqcn(KafkaHeaders.EXCEPTION_FQCN)
							.exceptionCauseFqcn(KafkaHeaders.EXCEPTION_CAUSE_FQCN)
							.keyExceptionMessage(KafkaHeaders.KEY_EXCEPTION_MESSAGE)
							.exceptionMessage(KafkaHeaders.EXCEPTION_MESSAGE)
							.keyExceptionStacktrace(KafkaHeaders.KEY_EXCEPTION_STACKTRACE)
							.exceptionStacktrace(KafkaHeaders.EXCEPTION_STACKTRACE)
						.build();
			}
		};

		// Retry bookkeeping headers (original timestamp, attempts, back off timestamp) built by addHeaders.
		recoverer.setHeadersFunction((consumerRecord, e) -> addHeaders(consumerRecord, e, getAttempts(consumerRecord)));
		// The user supplied function, when configured, is registered in addition to the retry headers.
		if (this.headersFunction != null) {
			recoverer.addHeadersFunction(this.headersFunction);
		}
		recoverer.setFailIfSendResultIsError(true);
		recoverer.setAppendOriginalHeaders(false);
		// resolveDestination returns null when the next destination is a no-ops topic.
		recoverer.setThrowIfNoDestinationReturned(false);
		recoverer.setSkipSameTopicFatalExceptions(false);
		// The customizer runs after the settings above and before the exception classifications below.
		this.recovererCustomizer.accept(recoverer);
		this.fatalExceptions.forEach(recoverer::addNotRetryableExceptions);
		this.nonFatalExceptions.forEach(recoverer::removeClassification);
		return recoverer;
	}

	/**
	 * Look up the {@link KafkaOperations} used to publish the outgoing record, based on
	 * the destination topic registered under the record's topic name.
	 * @param outRecord the record about to be published.
	 * @return the operations configured for that destination topic.
	 */
	private KafkaOperations<?, ?> resolveTemplate(ProducerRecord<?, ?> outRecord) {
		String topicName = outRecord.topic();
		return this.destinationTopicResolver.getDestinationTopicByName(topicName).getKafkaOperations();
	}

	/**
	 * Set a customizer that is invoked with each recoverer built by {@link #create()},
	 * after the factory has applied its own settings and before the registered exception
	 * types are applied. The default customizer does nothing.
	 * @param customizer the customizer.
	 */
	public void setDeadLetterPublishingRecovererCustomizer(Consumer<DeadLetterPublishingRecoverer> customizer) {
		this.recovererCustomizer = customizer;
	}

	/**
	 * Resolve the topic and partition the failed record should be forwarded to.
	 * @param cr the failed record.
	 * @param e the exception thrown while processing the record.
	 * @return the destination, or {@code null} when the resolved destination is a
	 * no-ops topic and the record must not be forwarded.
	 */
	private TopicPartition resolveDestination(ConsumerRecord<?, ?> cr, Exception e) {
		if (SeekUtils.isBackoffException(e)) {
			// Necessary to not commit the offset and seek to current again
			throw (NestedRuntimeException) e;
		}

		String sourceTopic = cr.topic();
		int attempts = getAttempts(cr);
		long originalTimestamp = getOriginalTimestampHeaderLong(cr);
		DestinationTopic target =
				this.destinationTopicResolver.resolveDestinationTopic(sourceTopic, attempts, e, originalTimestamp);

		LOGGER.debug(() -> {
			String resolvedName = target.isNoOpsTopic() ? "none" : target.getDestinationName();
			return "Resolved topic: " + resolvedName;
		});

		maybeLogListenerException(e, cr, target);

		if (target.isNoOpsTopic()) {
			return null;
		}
		return resolveTopicPartition(cr, target);
	}

	/**
	 * Log the listener exception according to the configured logging strategy.
	 * Records going to a DLT or to a no-ops topic are logged at error level unless the
	 * strategy is {@code NEVER}; everything else is logged at error level only with
	 * {@code EACH_ATTEMPT} and at debug level otherwise.
	 * @param e the listener exception.
	 * @param cr the failed record.
	 * @param nextDestination the destination resolved for the record.
	 */
	private void maybeLogListenerException(Exception e, ConsumerRecord<?, ?> cr, DestinationTopic nextDestination) {
		final boolean errorLoggingAllowed = this.loggingStrategy != ListenerExceptionLoggingStrategy.NEVER;

		if (nextDestination.isDltTopic() && errorLoggingAllowed) {
			LOGGER.error(e, () -> getErrorMessage(cr) + " and won't be retried. "
					+ "Sending to DLT with name " + nextDestination.getDestinationName() + ".");
			return;
		}

		if (nextDestination.isNoOpsTopic() && errorLoggingAllowed) {
			LOGGER.error(e, () -> getErrorMessage(cr) + " and won't be retried. "
					+ "No further action will be taken with this record.");
			return;
		}

		if (this.loggingStrategy == ListenerExceptionLoggingStrategy.EACH_ATTEMPT) {
			LOGGER.error(e, () -> getErrorMessage(cr) + ". "
					+ "Sending to retry topic " + nextDestination.getDestinationName() + ".");
			return;
		}

		LOGGER.debug(e, () -> getErrorMessage(cr) + ". "
				+ "Sending to retry topic " + nextDestination.getDestinationName() + ".");
	}

	/**
	 * Build the common prefix of the listener exception log messages.
	 * @param cr the failed record.
	 * @return the record description followed by the topic the error was thrown at.
	 */
	private static String getErrorMessage(ConsumerRecord<?, ?> cr) {
		StringBuilder message = new StringBuilder("Record: ");
		message.append(getRecordInfo(cr));
		message.append(" threw an error at topic ");
		message.append(cr.topic());
		return message.toString();
	}

	/**
	 * Describe the record for log messages: its topic, partition and offset, plus the
	 * main topic taken from the {@link KafkaHeaders#ORIGINAL_TOPIC} header.
	 * @param cr the failed record.
	 * @return the formatted description; the record's own topic is reported as the main
	 * topic when the header is absent or carries no value.
	 */
	private static String getRecordInfo(ConsumerRecord<?, ?> cr) {
		Header originalTopicHeader = cr.headers().lastHeader(KafkaHeaders.ORIGINAL_TOPIC);
		byte[] originalTopic = originalTopicHeader != null ? originalTopicHeader.value() : null;
		// Decode explicitly as UTF-8 rather than with the platform default charset, and
		// tolerate a header without a value so that building a log message never throws.
		String mainTopic = originalTopic != null
				? new String(originalTopic, StandardCharsets.UTF_8)
				: cr.topic();
		return String.format("topic = %s, partition = %s, offset = %s, main topic = %s",
				cr.topic(), cr.partition(), cr.offset(), mainTopic);
	}

	/**
	 * Builds the {@link TopicPartition} the original record should be forwarded to.
	 * The default implementation keeps the partition number of the original record and
	 * applies it to the next destination topic.
	 *
	 * <p>{@link DeadLetterPublishingRecoverer#checkPartition} has logic to check whether that partition exists,
	 * and if it doesn't it sets -1, to allow the Producer itself to assign a partition to the record.</p>
	 *
	 * <p>Subclasses may override this method to choose a different partition.</p>
	 *
	 * @param cr The original {@link ConsumerRecord}, which is to be forwarded
	 * @param nextDestination The next {@link DestinationTopic}, where the consumerRecord is to be forwarded
	 * @return An instance of {@link TopicPartition}, specifying the topic and partition, where the cr is to be sent
	 */
	protected TopicPartition resolveTopicPartition(final ConsumerRecord<?, ?> cr, final DestinationTopic nextDestination) {
		final String destinationName = nextDestination.getDestinationName();
		final int sourcePartition = cr.partition();
		return new TopicPartition(destinationName, sourcePartition);
	}

	/**
	 * Read the number of delivery attempts from the record's
	 * {@link RetryTopicHeaders#DEFAULT_HEADER_ATTEMPTS} header.
	 * @param consumerRecord the record to inspect.
	 * @return the attempts stored in the header, or 1 when the header is absent, has no
	 * value, or has a value of an unexpected size.
	 */
	private int getAttempts(ConsumerRecord<?, ?> consumerRecord) {
		Header header = consumerRecord.headers().lastHeader(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS);
		// A header without a value is treated like a missing header instead of failing on value.length.
		byte[] value = header != null ? header.value() : null;
		if (value == null) {
			return 1;
		}
		if (value.length == Byte.BYTES) { // backwards compatibility
			return value[0];
		}
		if (value.length == Integer.BYTES) {
			return ByteBuffer.wrap(value).getInt();
		}
		LOGGER.debug(() -> "Unexpected size for " + RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS + " header: "
				+ value.length);
		return 1;
	}

	/**
	 * Build the retry headers for the outgoing record: the original timestamp, the
	 * incremented attempts counter and the timestamp of the next execution.
	 * @param consumerRecord the failed record.
	 * @param e the exception thrown while processing the record.
	 * @param attempts the attempts made so far.
	 * @return a new {@link Headers} instance holding the three retry headers.
	 */
	private Headers addHeaders(ConsumerRecord<?, ?> consumerRecord, Exception e, int attempts) {
		byte[] originalTimestampBytes = getOriginalTimestampHeaderBytes(consumerRecord);
		Headers retryHeaders = new RecordHeaders();
		retryHeaders.add(RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP, originalTimestampBytes);

		// The attempts counter is written as a 4 byte integer.
		byte[] nextAttemptBytes = ByteBuffer.allocate(Integer.BYTES).putInt(attempts + 1).array();
		retryHeaders.add(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, nextAttemptBytes);

		long nextExecution = getNextExecutionTimestamp(consumerRecord, e, originalTimestampBytes);
		byte[] nextExecutionBytes = BigInteger.valueOf(nextExecution).toByteArray();
		retryHeaders.add(RetryTopicHeaders.DEFAULT_HEADER_BACKOFF_TIMESTAMP, nextExecutionBytes);
		return retryHeaders;
	}

	/**
	 * Compute when the record should next be processed: the failure timestamp plus the
	 * delay of the destination topic resolved for the record.
	 * @param consumerRecord the failed record.
	 * @param e the exception thrown while processing the record.
	 * @param originalTimestampHeader the original timestamp, encoded as {@link BigInteger} bytes.
	 * @return the next execution timestamp.
	 */
	private long getNextExecutionTimestamp(ConsumerRecord<?, ?> consumerRecord, Exception e,
			byte[] originalTimestampHeader) {

		final long originalTimestamp = new BigInteger(originalTimestampHeader).longValue();
		final long failureTimestamp = getFailureTimestamp(e);
		final DestinationTopic destination = this.destinationTopicResolver.resolveDestinationTopic(
				consumerRecord.topic(), getAttempts(consumerRecord), e, originalTimestamp);
		final long nextExecutionTimestamp = failureTimestamp + destination.getDestinationDelay();
		LOGGER.debug(() -> String.format("FailureTimestamp: %s, Original timestamp: %s, nextExecutionTimestamp: %s",
				failureTimestamp, originalTimestamp, nextExecutionTimestamp));
		return nextExecutionTimestamp;
	}

	/**
	 * Determine when the failure happened.
	 * @param e the exception thrown while processing the record.
	 * @return the timestamp carried by a {@link TimestampedException} found in the
	 * exception, or the current time when there is none.
	 */
	private long getFailureTimestamp(Exception e) {
		if (e instanceof NestedRuntimeException) {
			NestedRuntimeException nested = (NestedRuntimeException) e;
			if (nested.contains(TimestampedException.class)) {
				return getTimestampedException(e).getTimestamp();
			}
		}
		return Instant.now().toEpochMilli();
	}

	/**
	 * Find the first {@link TimestampedException} in the cause chain, starting with the
	 * given exception itself.
	 * @param e the exception to inspect.
	 * @return the first {@link TimestampedException} found.
	 * @throws IllegalArgumentException if neither the exception nor any of its causes is
	 * a {@link TimestampedException}.
	 */
	private TimestampedException getTimestampedException(Throwable e) {
		for (Throwable current = e; current != null; current = current.getCause()) {
			// instanceof is the correct direction for this test: it matches TimestampedException
			// and its subclasses, and never matches an instance of one of its superclasses,
			// which would make the cast below fail with a ClassCastException.
			if (current instanceof TimestampedException) {
				return (TimestampedException) current;
			}
		}
		throw new IllegalArgumentException("Provided exception does not contain a "
				+ TimestampedException.class.getSimpleName() + " cause.");
	}

	/**
	 * Get the original timestamp as header bytes.
	 * @param consumerRecord the record to inspect.
	 * @return the value of the original timestamp header when present, otherwise the
	 * record's own timestamp encoded as {@link BigInteger} bytes.
	 */
	private byte[] getOriginalTimestampHeaderBytes(ConsumerRecord<?, ?> consumerRecord) {
		Header existingHeader = getOriginaTimeStampHeader(consumerRecord);
		if (existingHeader != null) {
			return existingHeader.value();
		}
		return BigInteger.valueOf(consumerRecord.timestamp()).toByteArray();
	}

	/**
	 * Get the original timestamp as a {@code long}.
	 * @param consumerRecord the record to inspect.
	 * @return the decoded value of the original timestamp header when present, otherwise
	 * the record's own timestamp.
	 */
	private long getOriginalTimestampHeaderLong(ConsumerRecord<?, ?> consumerRecord) {
		Header existingHeader = getOriginaTimeStampHeader(consumerRecord);
		if (existingHeader == null) {
			return consumerRecord.timestamp();
		}
		return new BigInteger(existingHeader.value()).longValue();
	}

	/**
	 * Fetch the last {@link RetryTopicHeaders#DEFAULT_HEADER_ORIGINAL_TIMESTAMP} header
	 * of the record.
	 * @param consumerRecord the record to inspect.
	 * @return the result of the header lookup for that header name.
	 */
	private Header getOriginaTimeStampHeader(ConsumerRecord<?, ?> consumerRecord) {
		Headers recordHeaders = consumerRecord.headers();
		return recordHeaders.lastHeader(RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP);
	}

	/**
	 * Strategies that decide when the listener exception is logged at error level.
	 */
	private enum ListenerExceptionLoggingStrategy {

		/**
		 * Never log the listener exception at error level.
		 */
		NEVER,

		/**
		 * Log the listener exception after each attempt.
		 */
		EACH_ATTEMPT,

		/**
		 * Log the listener exception only after retries are exhausted.
		 */
		AFTER_RETRIES_EXHAUSTED

	}
}

相关信息

spring-kafka 源码目录

相关文章

spring-kafka BackOffValuesGenerator 源码

spring-kafka DefaultDestinationTopicProcessor 源码

spring-kafka DefaultDestinationTopicResolver 源码

spring-kafka DestinationTopic 源码

spring-kafka DestinationTopicContainer 源码

spring-kafka DestinationTopicProcessor 源码

spring-kafka DestinationTopicPropertiesFactory 源码

spring-kafka DestinationTopicResolver 源码

spring-kafka DltStrategy 源码

spring-kafka EndpointCustomizer 源码

0  赞