spring-kafka ContainerProperties 源码
spring-kafka ContainerProperties 代码
文件路径:/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java
/*
* Copyright 2016-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.kafka.listener;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.aopalliance.aop.Advice;
import org.springframework.aop.framework.Advised;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.aop.support.AopUtils;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
/**
* Contains runtime properties for a listener container.
*
* @author Gary Russell
* @author Artem Bilan
* @author Artem Yakshin
* @author Johnny Lim
* @author Lukasz Kaminski
* @author Kyuhyeok Park
*/
public class ContainerProperties extends ConsumerProperties {
/**
* The offset commit behavior enumeration.
*/
public enum AckMode {
/**
* Commit the offset after each record is processed by the listener.
*/
RECORD,
/**
* Commit the offsets of all records returned by the previous poll after they all
* have been processed by the listener.
*/
BATCH,
/**
* Commit pending offsets after
* {@link ContainerProperties#setAckTime(long) ackTime} has elapsed.
*/
TIME,
/**
* Commit pending offsets after
* {@link ContainerProperties#setAckCount(int) ackCount} has been
* exceeded.
*/
COUNT,
/**
* Commit pending offsets after
* {@link ContainerProperties#setAckCount(int) ackCount} has been
* exceeded or after {@link ContainerProperties#setAckTime(long)
* ackTime} has elapsed.
*/
COUNT_TIME,
/**
* Listener is responsible for acking - use a
* {@link org.springframework.kafka.listener.AcknowledgingMessageListener}; acks
* will be queued and offsets will be committed when all the records returned by
* the previous poll have been processed by the listener.
*/
MANUAL,
/**
* Listener is responsible for acking - use a
* {@link org.springframework.kafka.listener.AcknowledgingMessageListener}; the
* commit will be performed immediately if the {@code Acknowledgment} is
* acknowledged on the calling consumer thread; otherwise, the acks will be queued
* and offsets will be committed when all the records returned by the previous
* poll have been processed by the listener; results will be indeterminate if you
* sometimes acknowledge on the calling thread and sometimes not.
*/
MANUAL_IMMEDIATE,
}
/**
* Offset commit behavior during assignment.
* @since 2.3.6
*/
public enum AssignmentCommitOption {
/**
* Always commit the current offset during partition assignment.
*/
ALWAYS,
/**
* Never commit the current offset during partition assignment.
*/
NEVER,
/**
* Commit the current offset during partition assignment when auto.offset.reset is
* 'latest'; transactional if so configured.
*/
LATEST_ONLY,
/**
* Commit the current offset during partition assignment when auto.offset.reset is
* 'latest'; use consumer commit even when transactions are being used.
*/
LATEST_ONLY_NO_TX
}
/**
* Mode for exactly once semantics.
*
* @since 2.5
*/
public enum EOSMode {
/**
* fetch-offset-request fencing (2.5+ brokers).
*/
V2;
}
/**
* The default {@link #setShutdownTimeout(long) shutDownTimeout} (ms).
*/
public static final long DEFAULT_SHUTDOWN_TIMEOUT = 10_000L;
/**
* The default {@link #setMonitorInterval(int) monitorInterval} (s).
*/
public static final int DEFAULT_MONITOR_INTERVAL = 30;
/**
* The default {@link #setNoPollThreshold(float) noPollThreshold}.
*/
public static final float DEFAULT_NO_POLL_THRESHOLD = 3f;
private static final Duration DEFAULT_CONSUMER_START_TIMEOUT = Duration.ofSeconds(30);
private static final int DEFAULT_ACK_TIME = 5000;
private static final double DEFAULT_IDLE_BEFORE_DATA_MULTIPLIER = 5.0;
private final Map<String, String> micrometerTags = new HashMap<>();
private final List<Advice> adviceChain = new ArrayList<>();
/**
* The ack mode to use when auto ack (in the configuration properties) is false.
* <ul>
* <li>RECORD: Commit the offset after each record has been processed by the
* listener.</li>
* <li>BATCH: Commit the offsets for each batch of records received from the consumer
* when they all have been processed by the listener</li>
* <li>TIME: Commit pending offsets after {@link #setAckTime(long) ackTime} number of
* milliseconds; (should be greater than
* {@code #setPollTimeout(long) pollTimeout}.</li>
* <li>COUNT: Commit pending offsets after at least {@link #setAckCount(int) ackCount}
* number of records have been processed</li>
* <li>COUNT_TIME: Commit pending offsets after {@link #setAckTime(long) ackTime}
* number of milliseconds or at least {@link #setAckCount(int) ackCount} number of
* records have been processed</li>
* <li>MANUAL: Listener is responsible for acking - use a
* {@link org.springframework.kafka.listener.AcknowledgingMessageListener}. Acks will
* be queued and offsets will be committed when all the records returned by the
* previous poll have been processed by the listener.</li>
* <li>MANUAL_IMMDEDIATE: Listener is responsible for acking - use a
* {@link org.springframework.kafka.listener.AcknowledgingMessageListener}. The commit
* will be performed immediately if the {@code Acknowledgment} is acknowledged on the
* calling consumer thread. Otherwise, the acks will be queued and offsets will be
* committed when all the records returned by the previous poll have been processed by
* the listener. Results will be indeterminate if you sometimes acknowledge on the
* calling thread and sometimes not.</li>
* </ul>
*/
private AckMode ackMode = AckMode.BATCH;
/**
* The number of outstanding record count after which offsets should be
* committed when {@link AckMode#COUNT} or {@link AckMode#COUNT_TIME} is being
* used.
*/
private int ackCount = 1;
/**
* The time (ms) after which outstanding offsets should be committed when
* {@link AckMode#TIME} or {@link AckMode#COUNT_TIME} is being used. Should be
* larger than
*/
private long ackTime = DEFAULT_ACK_TIME;
/**
* The message listener; must be a {@link org.springframework.kafka.listener.MessageListener}
* or {@link org.springframework.kafka.listener.AcknowledgingMessageListener}.
*/
private Object messageListener;
/**
* The executor for threads that poll the consumer.
*/
private AsyncTaskExecutor listenerTaskExecutor;
/**
* The timeout for shutting down the container. This is the maximum amount of
* time that the invocation to {@code #stop(Runnable)} will block for, before
* returning.
*/
private long shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;
private Long idleEventInterval;
private Long idlePartitionEventInterval;
private double idleBeforeDataMultiplier = DEFAULT_IDLE_BEFORE_DATA_MULTIPLIER;
private PlatformTransactionManager transactionManager;
private int monitorInterval = DEFAULT_MONITOR_INTERVAL;
private TaskScheduler scheduler;
private float noPollThreshold = DEFAULT_NO_POLL_THRESHOLD;
private boolean logContainerConfig;
private boolean missingTopicsFatal = false;
private long idleBetweenPolls;
private boolean micrometerEnabled = true;
private Duration consumerStartTimeout = DEFAULT_CONSUMER_START_TIMEOUT;
private Boolean subBatchPerPartition;
private AssignmentCommitOption assignmentCommitOption = AssignmentCommitOption.LATEST_ONLY_NO_TX;
private boolean deliveryAttemptHeader;
private EOSMode eosMode = EOSMode.V2;
private TransactionDefinition transactionDefinition;
private boolean stopContainerWhenFenced;
private boolean stopImmediate;
private boolean asyncAcks;
private boolean pauseImmediate;
/**
* Create properties for a container that will subscribe to the specified topics.
* @param topics the topics.
*/
public ContainerProperties(String... topics) {
super(topics);
}
/**
* Create properties for a container that will subscribe to topics matching the
* specified pattern. The framework will create a container that subscribes to all
* topics matching the specified pattern to get dynamically assigned partitions. The
* pattern matching will be performed periodically against topics existing at the time
* of check.
* @param topicPattern the pattern.
* @see org.apache.kafka.clients.CommonClientConfigs#METADATA_MAX_AGE_CONFIG
*/
public ContainerProperties(Pattern topicPattern) {
super(topicPattern);
}
/**
* Create properties for a container that will assign itself the provided topic
* partitions.
* @param topicPartitions the topic partitions.
*/
public ContainerProperties(TopicPartitionOffset... topicPartitions) {
super(topicPartitions);
}
/**
* Set the message listener; must be a {@link org.springframework.kafka.listener.MessageListener}
* or {@link org.springframework.kafka.listener.AcknowledgingMessageListener}.
* @param messageListener the listener.
*/
public void setMessageListener(Object messageListener) {
this.messageListener = messageListener;
adviseListenerIfNeeded();
}
/**
* Set the ack mode to use when auto ack (in the configuration properties) is false.
* <ul>
* <li>RECORD: Commit the offset after each record has been processed by the
* listener.</li>
* <li>BATCH: Commit the offsets for each batch of records received from the consumer
* when they all have been processed by the listener</li>
* <li>TIME: Commit pending offsets after {@link #setAckTime(long) ackTime} number of
* milliseconds; (should be greater than
* {@code #setPollTimeout(long) pollTimeout}.</li>
* <li>COUNT: Commit pending offsets after at least {@link #setAckCount(int) ackCount}
* number of records have been processed</li>
* <li>COUNT_TIME: Commit pending offsets after {@link #setAckTime(long) ackTime}
* number of milliseconds or at least {@link #setAckCount(int) ackCount} number of
* records have been processed</li>
* <li>MANUAL: Listener is responsible for acking - use a
* {@link org.springframework.kafka.listener.AcknowledgingMessageListener}. Acks will
* be queued and offsets will be committed when all the records returned by the
* previous poll have been processed by the listener.</li>
* <li>MANUAL_IMMDEDIATE: Listener is responsible for acking - use a
* {@link org.springframework.kafka.listener.AcknowledgingMessageListener}. The commit
* will be performed immediately if the {@code Acknowledgment} is acknowledged on the
* calling consumer thread. Otherwise, the acks will be queued and offsets will be
* committed when all the records returned by the previous poll have been processed by
* the listener. Results will be indeterminate if you sometimes acknowledge on the
* calling thread and sometimes not.</li>
* </ul>
* @param ackMode the {@link AckMode}; default BATCH.
* @see #setTransactionManager(PlatformTransactionManager)
*/
public void setAckMode(AckMode ackMode) {
Assert.notNull(ackMode, "'ackMode' cannot be null");
this.ackMode = ackMode;
}
/**
* Set the number of outstanding record count after which offsets should be
* committed when {@link AckMode#COUNT} or {@link AckMode#COUNT_TIME} is being used.
* @param count the count
*/
public void setAckCount(int count) {
Assert.state(count > 0, "'ackCount' must be > 0");
this.ackCount = count;
}
/**
* Set the time (ms) after which outstanding offsets should be committed when
* {@link AckMode#TIME} or {@link AckMode#COUNT_TIME} is being used. Should be
* larger than
* @param ackTime the time
*/
public void setAckTime(long ackTime) {
Assert.state(ackTime > 0, "'ackTime' must be > 0");
this.ackTime = ackTime;
}
/**
* Set the executor for threads that poll the consumer.
* @param listenerTaskExecutor the executor
* @since 2.8.9
*/
public void setListenerTaskExecutor(@Nullable AsyncTaskExecutor listenerTaskExecutor) {
this.listenerTaskExecutor = listenerTaskExecutor;
}
/**
* Set the timeout for shutting down the container. This is the maximum amount of
* time that the invocation to {@code #stop(Runnable)} will block for, before
* returning; default {@value #DEFAULT_SHUTDOWN_TIMEOUT}.
* @param shutdownTimeout the shutdown timeout.
*/
public void setShutdownTimeout(long shutdownTimeout) {
this.shutdownTimeout = shutdownTimeout;
}
/**
* Set the timeout for commitSync operations (if {@link #isSyncCommits()}. Overrides
* the default api timeout property. In order of precedence:
* <ul>
* <li>this property</li>
* <li>{@code ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG} in
* {@link #setKafkaConsumerProperties(java.util.Properties)}</li>
* <li>{@code ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG} in the consumer factory
* properties</li>
* <li>60 seconds</li>
* </ul>
* @param syncCommitTimeout the timeout.
* @see #setSyncCommits(boolean)
*/
@Override
public void setSyncCommitTimeout(@Nullable Duration syncCommitTimeout) { // NOSONAR - not useless; enhanced javadoc
super.setSyncCommitTimeout(syncCommitTimeout);
}
/**
* Set the idle event interval; when set, an event is emitted if a poll returns
* no records and this interval has elapsed since a record was returned.
* @param idleEventInterval the interval.
* @see #setIdleBeforeDataMultiplier(double)
*/
public void setIdleEventInterval(@Nullable Long idleEventInterval) {
this.idleEventInterval = idleEventInterval;
}
/**
* Multiply the {@link #setIdleEventInterval(Long)} by this value until at least
* one record is received. Default 5.0.
* @param idleBeforeDataMultiplier false to allow publishing.
* @since 2.8
* @see #setIdleEventInterval(Long)
*/
public void setIdleBeforeDataMultiplier(double idleBeforeDataMultiplier) {
this.idleBeforeDataMultiplier = idleBeforeDataMultiplier;
}
/**
* Set the idle partition event interval; when set, an event is emitted if a poll returns
* no records for a partition and this interval has elapsed since a record was returned.
* @param idlePartitionEventInterval the interval.
*/
public void setIdlePartitionEventInterval(@Nullable Long idlePartitionEventInterval) {
this.idlePartitionEventInterval = idlePartitionEventInterval;
}
public AckMode getAckMode() {
return this.ackMode;
}
public int getAckCount() {
return this.ackCount;
}
public long getAckTime() {
return this.ackTime;
}
public Object getMessageListener() {
return this.messageListener;
}
/**
* Return the consumer task executor.
* @return the executor.
*/
@Nullable
public AsyncTaskExecutor getListenerTaskExecutor() {
return this.listenerTaskExecutor;
}
public long getShutdownTimeout() {
return this.shutdownTimeout;
}
/**
* Return the idle event interval.
* @return the interval.
*/
@Nullable
public Long getIdleEventInterval() {
return this.idleEventInterval;
}
/**
* Multiply the {@link #setIdleEventInterval(Long)} by this value until at least
* one record is received. Default 5.0.
* @return the noIdleBeforeData.
* @since 2.8
* @see #getIdleEventInterval()
*/
public double getIdleBeforeDataMultiplier() {
return this.idleBeforeDataMultiplier;
}
/**
* Return the idle partition event interval.
* @return the interval.
*/
@Nullable
public Long getIdlePartitionEventInterval() {
return this.idlePartitionEventInterval;
}
@Nullable
public PlatformTransactionManager getTransactionManager() {
return this.transactionManager;
}
/**
* Set the transaction manager to start a transaction; if it is a
* {@link org.springframework.kafka.transaction.KafkaAwareTransactionManager}, offsets
* are committed with semantics equivalent to {@link AckMode#RECORD} and
* {@link AckMode#BATCH} depending on the listener type (record or batch). For other
* transaction managers, adding the transaction manager to the container facilitates,
* for example, a record or batch interceptor participating in the same transaction
* (you must set the container's {@code interceptBeforeTx} property to false).
* @param transactionManager the transaction manager.
* @since 1.3
* @see #setAckMode(AckMode)
*/
public void setTransactionManager(@Nullable PlatformTransactionManager transactionManager) {
this.transactionManager = transactionManager;
}
public int getMonitorInterval() {
return this.monitorInterval;
}
/**
* The interval between checks for a non-responsive consumer in
* seconds; default {@value #DEFAULT_MONITOR_INTERVAL}.
* @param monitorInterval the interval.
* @since 1.3.1
*/
public void setMonitorInterval(int monitorInterval) {
this.monitorInterval = monitorInterval;
}
/**
* Return the task scheduler, if present.
* @return the scheduler.
*/
@Nullable
public TaskScheduler getScheduler() {
return this.scheduler;
}
/**
* A scheduler used with the monitor interval.
* @param scheduler the scheduler.
* @since 1.3.1
* @see #setMonitorInterval(int)
*/
public void setScheduler(@Nullable TaskScheduler scheduler) {
this.scheduler = scheduler;
}
public float getNoPollThreshold() {
return this.noPollThreshold;
}
/**
* If the time since the last poll / {@link #getPollTimeout() poll timeout}
* exceeds this value, a NonResponsiveConsumerEvent is published.
* This value should be more than 1.0 to avoid a race condition that can cause
* spurious events to be published.
* Default {@value #DEFAULT_NO_POLL_THRESHOLD}.
* @param noPollThreshold the threshold
* @since 1.3.1
*/
public void setNoPollThreshold(float noPollThreshold) {
this.noPollThreshold = noPollThreshold;
}
/**
* Log the container configuration if true (INFO).
* @return true to log.
* @since 2.1.1
*/
public boolean isLogContainerConfig() {
return this.logContainerConfig;
}
/**
* Set to true to instruct each container to log this configuration.
* @param logContainerConfig true to log.
* @since 2.1.1
*/
public void setLogContainerConfig(boolean logContainerConfig) {
this.logContainerConfig = logContainerConfig;
}
/**
* If true, the container won't start if any of the configured topics are not present
* on the broker. Does not apply when topic patterns are configured. Default false.
* @return the missingTopicsFatal.
* @since 2.2
*/
public boolean isMissingTopicsFatal() {
return this.missingTopicsFatal;
}
/**
* Set to true to prevent the container from starting if any of the configured topics
* are not present on the broker. Does not apply when topic patterns are configured.
* Default false;
* @param missingTopicsFatal the missingTopicsFatal.
* @since 2.2
*/
public void setMissingTopicsFatal(boolean missingTopicsFatal) {
this.missingTopicsFatal = missingTopicsFatal;
}
/**
* The sleep interval in milliseconds used in the main loop between
* {@link org.apache.kafka.clients.consumer.Consumer#poll(Duration)} calls.
* Defaults to {@code 0} - no idling.
* @param idleBetweenPolls the interval to sleep between polling cycles.
* @since 2.3
*/
public void setIdleBetweenPolls(long idleBetweenPolls) {
this.idleBetweenPolls = idleBetweenPolls;
}
public long getIdleBetweenPolls() {
return this.idleBetweenPolls;
}
public boolean isMicrometerEnabled() {
return this.micrometerEnabled;
}
/**
* Set to false to disable the Micrometer listener timers. Default true.
* @param micrometerEnabled false to disable.
* @since 2.3
*/
public void setMicrometerEnabled(boolean micrometerEnabled) {
this.micrometerEnabled = micrometerEnabled;
}
/**
* Set additional tags for the Micrometer listener timers.
* @param tags the tags.
* @since 2.3
*/
public void setMicrometerTags(Map<String, String> tags) {
if (tags != null) {
this.micrometerTags.putAll(tags);
}
}
public Map<String, String> getMicrometerTags() {
return Collections.unmodifiableMap(this.micrometerTags);
}
public Duration getConsumerStartTimeout() {
return this.consumerStartTimeout;
}
/**
* Set the timeout to wait for a consumer thread to start before logging
* an error. Default 30 seconds.
* @param consumerStartTimeout the consumer start timeout.
*/
public void setConsumerStartTimeout(Duration consumerStartTimeout) {
Assert.notNull(consumerStartTimeout, "'consumerStartTimout' cannot be null");
this.consumerStartTimeout = consumerStartTimeout;
}
/**
* Return whether to split batches by partition.
* @return subBatchPerPartition.
* @since 2.3.2
*/
public boolean isSubBatchPerPartition() {
return this.subBatchPerPartition == null ? false : this.subBatchPerPartition;
}
/**
* Return whether to split batches by partition; null if not set.
* @return subBatchPerPartition.
* @since 2.5
*/
@Nullable
public Boolean getSubBatchPerPartition() {
return this.subBatchPerPartition;
}
/**
* When using a batch message listener whether to dispatch records by partition (with
* a transaction for each sub batch if transactions are in use) or the complete batch
* received by the {@code poll()}. Useful when using transactions to enable zombie
* fencing, by using a {@code transactional.id} that is unique for each
* group/topic/partition. Defaults to true when using transactions with
* {@link #setEosMode(EOSMode) EOSMode.ALPHA} and false when not using transactions or
* with {@link #setEosMode(EOSMode) EOSMode.BETA}.
* @param subBatchPerPartition true for a separate transaction for each partition.
* @since 2.3.2
*/
public void setSubBatchPerPartition(@Nullable Boolean subBatchPerPartition) {
this.subBatchPerPartition = subBatchPerPartition;
}
public AssignmentCommitOption getAssignmentCommitOption() {
return this.assignmentCommitOption;
}
/**
* Set the assignment commit option. Default
* {@link AssignmentCommitOption#LATEST_ONLY_NO_TX}.
* @param assignmentCommitOption the option.
* @since 2.3.6
*/
public void setAssignmentCommitOption(AssignmentCommitOption assignmentCommitOption) {
Assert.notNull(assignmentCommitOption, "'assignmentCommitOption' cannot be null");
this.assignmentCommitOption = assignmentCommitOption;
}
public boolean isDeliveryAttemptHeader() {
return this.deliveryAttemptHeader;
}
/**
* Set to true to populate the
* {@link org.springframework.kafka.support.KafkaHeaders#DELIVERY_ATTEMPT} header when
* the error handler or after rollback processor implements
* {@code DeliveryAttemptAware}. There is a small overhead so this is false by
* default.
* @param deliveryAttemptHeader true to populate
* @since 2.5
*/
public void setDeliveryAttemptHeader(boolean deliveryAttemptHeader) {
this.deliveryAttemptHeader = deliveryAttemptHeader;
}
/**
* Get the exactly once semantics mode.
* @return the mode.
* @since 2.5
* @see #setEosMode(EOSMode)
*/
public EOSMode getEosMode() {
return this.eosMode;
}
/**
* Set the exactly once semantics mode. Only {@link EOSMode#V2} is supported
* since version 3.0.
* @param eosMode the mode; default V2.
* @since 2.5
*/
public void setEosMode(EOSMode eosMode) {
Assert.notNull(eosMode, "'eosMode' cannot be null");
this.eosMode = eosMode;
}
/**
* Get the transaction definition.
* @return the definition.
* @since 2.5.4
*/
@Nullable
public TransactionDefinition getTransactionDefinition() {
return this.transactionDefinition;
}
/**
* Set a transaction definition with properties (e.g. timeout) that will be copied to
* the container's transaction template. Note that this is only generally useful when
* used with a {@link #setTransactionManager(PlatformTransactionManager)
* PlatformTransactionManager} that supports a custom definition; this does NOT
* include the {@link org.springframework.kafka.transaction.KafkaTransactionManager}
* which has no concept of transaction timeout. It can be useful to start, for example
* a database transaction, in the container, rather than using {@code @Transactional}
* on the listener, because then a record interceptor, or filter in a listener adapter
* can participate in the transaction.
* @param transactionDefinition the definition.
* @since 2.5.4
* @see #setTransactionManager(PlatformTransactionManager)
*/
public void setTransactionDefinition(@Nullable TransactionDefinition transactionDefinition) {
this.transactionDefinition = transactionDefinition;
}
/**
* A chain of listener {@link Advice}s.
* @return the adviceChain.
* @since 2.5.6
*/
public Advice[] getAdviceChain() {
return this.adviceChain.toArray(new Advice[0]);
}
/**
* Set a chain of listener {@link Advice}s; must not be null or have null elements.
* @param adviceChain the adviceChain to set.
* @since 2.5.6
*/
public void setAdviceChain(Advice... adviceChain) {
Assert.notNull(adviceChain, "'adviceChain' cannot be null");
Assert.noNullElements(adviceChain, "'adviceChain' cannot have null elements");
this.adviceChain.clear();
this.adviceChain.addAll(Arrays.asList(adviceChain));
if (this.messageListener != null) {
adviseListenerIfNeeded();
}
}
/**
* When true, the container will stop after a
* {@link org.apache.kafka.common.errors.ProducerFencedException}.
* @return the stopContainerWhenFenced
* @since 2.5.8
*/
public boolean isStopContainerWhenFenced() {
return this.stopContainerWhenFenced;
}
/**
* Set to true to stop the container when a
* {@link org.apache.kafka.common.errors.ProducerFencedException} is thrown.
* Currently, there is no way to determine if such an exception is thrown due to a
* rebalance Vs. a timeout. We therefore cannot call the after rollback processor. The
* best solution is to ensure that the {@code transaction.timeout.ms} is large enough
* so that transactions don't time out.
* @param stopContainerWhenFenced true to stop the container.
* @since 2.5.8
*/
public void setStopContainerWhenFenced(boolean stopContainerWhenFenced) {
this.stopContainerWhenFenced = stopContainerWhenFenced;
}
/**
* When true, the container will be stopped immediately after processing the current record.
* @return true to stop immediately.
* @since 2.5.11
*/
public boolean isStopImmediate() {
return this.stopImmediate;
}
/**
* Set to true to stop the container after processing the current record (when stop()
* is called). When false (default), the container will stop after all the results of
* the previous poll are processed.
* @param stopImmediate true to stop after the current record.
* @since 2.5.11
*/
public void setStopImmediate(boolean stopImmediate) {
this.stopImmediate = stopImmediate;
}
/**
* When true, async manual acknowledgments are supported.
* @return true for async ack support.
* @since 2.8
*/
public boolean isAsyncAcks() {
return this.asyncAcks;
}
/**
* Set to true to support asynchronous record acknowledgments. Only applies with
* {@link AckMode#MANUAL} or {@link AckMode#MANUAL_IMMEDIATE}. Out of order offset
* commits are deferred until all previous offsets in the partition have been
* committed. The consumer is paused, if necessary, until all acks have been
* completed.
* @param asyncAcks true to use async acks.
* @since 2.8
*/
public void setAsyncAcks(boolean asyncAcks) {
this.asyncAcks = asyncAcks;
}
/**
* When pausing the container with a record listener, whether the pause takes effect
* immediately, when the current record has been processed, or after all records from
* the previous poll have been processed. Default false.
* @return whether to pause immediately.
* @since 2.9
*/
public boolean isPauseImmediate() {
return this.pauseImmediate;
}
/**
* Set to true to pause the container after the current record has been processed, rather
* than after all the records from the previous poll have been processed.
* @param pauseImmediate true to pause immediately.
* @since 2.9
*/
public void setPauseImmediate(boolean pauseImmediate) {
this.pauseImmediate = pauseImmediate;
}
private void adviseListenerIfNeeded() {
if (!CollectionUtils.isEmpty(this.adviceChain)) {
if (AopUtils.isAopProxy(this.messageListener)) {
Advised advised = (Advised) this.messageListener;
this.adviceChain.forEach(advised::removeAdvice);
this.adviceChain.forEach(advised::addAdvice);
}
else {
ProxyFactory pf = new ProxyFactory(this.messageListener);
this.adviceChain.forEach(pf::addAdvice);
this.messageListener = pf.getProxy();
}
}
}
@Override
public String toString() {
return "ContainerProperties ["
+ renderProperties()
+ "\n ackMode=" + this.ackMode
+ "\n ackCount=" + this.ackCount
+ "\n ackTime=" + this.ackTime
+ "\n messageListener=" + this.messageListener
+ (this.listenerTaskExecutor != null
? "\n listenerTaskExecutor=" + this.listenerTaskExecutor
: "")
+ "\n shutdownTimeout=" + this.shutdownTimeout
+ "\n idleEventInterval="
+ (this.idleEventInterval == null ? "not enabled" : this.idleEventInterval)
+ "\n idlePartitionEventInterval="
+ (this.idlePartitionEventInterval == null ? "not enabled" : this.idlePartitionEventInterval)
+ (this.transactionManager != null
? "\n transactionManager=" + this.transactionManager
: "")
+ "\n monitorInterval=" + this.monitorInterval
+ (this.scheduler != null ? "\n scheduler=" + this.scheduler : "")
+ "\n noPollThreshold=" + this.noPollThreshold
+ "\n subBatchPerPartition=" + this.subBatchPerPartition
+ "\n assignmentCommitOption=" + this.assignmentCommitOption
+ "\n deliveryAttemptHeader=" + this.deliveryAttemptHeader
+ "\n eosMode=" + this.eosMode
+ "\n transactionDefinition=" + this.transactionDefinition
+ "\n stopContainerWhenFenced=" + this.stopContainerWhenFenced
+ "\n stopImmediate=" + this.stopImmediate
+ "\n asyncAcks=" + this.asyncAcks
+ "\n idleBeforeDataMultiplier" + this.idleBeforeDataMultiplier
+ "\n]";
}
}
相关信息
相关文章
spring-kafka AbstractConsumerSeekAware 源码
spring-kafka AbstractKafkaBackOffManagerFactory 源码
spring-kafka AbstractMessageListenerContainer 源码
spring-kafka AcknowledgingConsumerAwareMessageListener 源码
spring-kafka AcknowledgingMessageListener 源码
spring-kafka AfterRollbackProcessor 源码
spring-kafka BackOffHandler 源码
spring-kafka BatchAcknowledgingConsumerAwareMessageListener 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦