spring-batch MessageChannelPartitionHandler 源码
spring-batch MessageChannelPartitionHandler 代码
文件路径:/spring-batch-integration/src/main/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandler.java
/*
* Copyright 2009-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.integration.partition;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.explore.support.JobExplorerFactoryBean;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.StepExecutionSplitter;
import org.springframework.batch.core.partition.support.AbstractPartitionHandler;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.poller.DirectPoller;
import org.springframework.batch.poller.Poller;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.integration.MessageTimeoutException;
import org.springframework.integration.annotation.Aggregator;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Payloads;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
/**
* A {@link PartitionHandler} that uses {@link MessageChannel} instances to send
* instructions to remote workers and receive their responses. The {@link MessageChannel}
* provides a nice abstraction so that the location of the workers and the transport used
* to communicate with them can be changed at run time. The communication with the remote
* workers does not need to be transactional or have guaranteed delivery, so a local
* thread pool based implementation works as well as a remote web service or JMS
* implementation. If a remote worker fails, the job will fail and can be restarted to
* pick up missing messages and processing. The remote workers need access to the Spring
* Batch {@link JobRepository} so that the shared state across those restarts can be
* managed centrally.
*
* While a {@link org.springframework.messaging.MessageChannel} is used for sending the
* requests to the workers, the worker's responses can be obtained in one of two ways:
* <ul>
* <li>A reply channel - Workers will respond with messages that will be aggregated via
* this component.</li>
* <li>Polling the job repository - Since the state of each worker is maintained
* independently within the job repository, we can poll the store to determine the state
* without the need of the workers to formally respond.</li>
* </ul>
*
* Note: The reply channel for this is instance based. Sharing this component across
* multiple step instances may result in the crossing of messages. It's recommended that
* this component be step or job scoped.
*
* @author Dave Syer
* @author Will Schipp
* @author Michael Minella
* @author Mahmoud Ben Hassine
*
*/
@MessageEndpoint
public class MessageChannelPartitionHandler extends AbstractPartitionHandler implements InitializingBean {
private static Log logger = LogFactory.getLog(MessageChannelPartitionHandler.class);
private MessagingTemplate messagingGateway;
private String stepName;
private long pollInterval = 10000;
private JobExplorer jobExplorer;
private boolean pollRepositoryForResults = false;
private long timeout = -1;
private DataSource dataSource;
/**
* pollable channel for the replies
*/
private PollableChannel replyChannel;
@Override
public void afterPropertiesSet() throws Exception {
Assert.notNull(stepName, "A step name must be provided for the remote workers.");
Assert.state(messagingGateway != null, "The MessagingOperations must be set");
pollRepositoryForResults = !(dataSource == null && jobExplorer == null);
if (pollRepositoryForResults) {
logger.debug("MessageChannelPartitionHandler is configured to poll the job repository for worker results");
}
if (dataSource != null && jobExplorer == null) {
JobExplorerFactoryBean jobExplorerFactoryBean = new JobExplorerFactoryBean();
jobExplorerFactoryBean.setDataSource(dataSource);
jobExplorerFactoryBean.afterPropertiesSet();
jobExplorer = jobExplorerFactoryBean.getObject();
}
if (!pollRepositoryForResults && replyChannel == null) {
replyChannel = new QueueChannel();
} // end if
}
/**
* When using job repository polling, the time limit to wait.
* @param timeout milliseconds to wait, defaults to -1 (no timeout).
*/
public void setTimeout(long timeout) {
this.timeout = timeout;
}
/**
* {@link org.springframework.batch.core.explore.JobExplorer} to use to query the job
* repository. Either this or a {@link javax.sql.DataSource} is required when using
* job repository polling.
* @param jobExplorer {@link org.springframework.batch.core.explore.JobExplorer} to
* use for lookups
*/
public void setJobExplorer(JobExplorer jobExplorer) {
this.jobExplorer = jobExplorer;
}
/**
* How often to poll the job repository for the status of the workers.
* @param pollInterval milliseconds between polls, defaults to 10000 (10 seconds).
*/
public void setPollInterval(long pollInterval) {
this.pollInterval = pollInterval;
}
/**
* {@link javax.sql.DataSource} pointing to the job repository
* @param dataSource {@link javax.sql.DataSource} that points to the job repository's
* store
*/
public void setDataSource(DataSource dataSource) {
this.dataSource = dataSource;
}
/**
* A pre-configured gateway for sending and receiving messages to the remote workers.
* Using this property allows a large degree of control over the timeouts and other
* properties of the send. It should have channels set up internally:
* <ul>
* <li>request channel capable of accepting {@link StepExecutionRequest} payloads</li>
* <li>reply channel that returns a list of {@link StepExecution} results</li>
* </ul>
* The timeout for the reply should be set sufficiently long that the remote steps
* have time to complete.
* @param messagingGateway the
* {@link org.springframework.integration.core.MessagingTemplate} to set
*/
public void setMessagingOperations(MessagingTemplate messagingGateway) {
this.messagingGateway = messagingGateway;
}
/**
* The name of the {@link Step} that will be used to execute the partitioned
* {@link StepExecution}. This is a regular Spring Batch step, with all the business
* logic required to complete an execution based on the input parameters in its
* {@link StepExecution} context. The name will be translated into a {@link Step}
* instance by the remote worker.
* @param stepName the name of the {@link Step} instance to execute business logic
*/
public void setStepName(String stepName) {
this.stepName = stepName;
}
/**
* @param messages the messages to be aggregated
* @return the list as it was passed in
*/
@Aggregator(sendPartialResultsOnExpiry = "true")
public List<?> aggregate(@Payloads List<?> messages) {
return messages;
}
public void setReplyChannel(PollableChannel replyChannel) {
this.replyChannel = replyChannel;
}
/**
* Sends {@link StepExecutionRequest} objects to the request channel of the
* {@link MessagingTemplate}, and then receives the result back as a list of
* {@link StepExecution} on a reply channel. Use the {@link #aggregate(List)} method
* as an aggregator of the individual remote replies. The receive timeout needs to be
* set realistically in the {@link MessagingTemplate} <b>and</b> the aggregator, so
* that there is a good chance of all work being done.
*
* @see PartitionHandler#handle(StepExecutionSplitter, StepExecution)
*/
@Override
protected Set<StepExecution> doHandle(StepExecution managerStepExecution,
Set<StepExecution> partitionStepExecutions) throws Exception {
if (CollectionUtils.isEmpty(partitionStepExecutions)) {
return partitionStepExecutions;
}
int count = 0;
for (StepExecution stepExecution : partitionStepExecutions) {
Message<StepExecutionRequest> request = createMessage(count++, partitionStepExecutions.size(),
new StepExecutionRequest(stepName, stepExecution.getJobExecutionId(), stepExecution.getId()),
replyChannel);
if (logger.isDebugEnabled()) {
logger.debug("Sending request: " + request);
}
messagingGateway.send(request);
}
if (!pollRepositoryForResults) {
return receiveReplies(replyChannel);
}
else {
return pollReplies(managerStepExecution, partitionStepExecutions);
}
}
private Set<StepExecution> pollReplies(final StepExecution managerStepExecution, final Set<StepExecution> split)
throws Exception {
final Set<StepExecution> result = new HashSet<>(split.size());
Callable<Set<StepExecution>> callback = new Callable<Set<StepExecution>>() {
@Override
public Set<StepExecution> call() throws Exception {
for (Iterator<StepExecution> stepExecutionIterator = split.iterator(); stepExecutionIterator
.hasNext();) {
StepExecution curStepExecution = stepExecutionIterator.next();
if (!result.contains(curStepExecution)) {
StepExecution partitionStepExecution = jobExplorer
.getStepExecution(managerStepExecution.getJobExecutionId(), curStepExecution.getId());
if (!partitionStepExecution.getStatus().isRunning()) {
result.add(partitionStepExecution);
}
}
}
if (logger.isDebugEnabled()) {
logger.debug(String.format("Currently waiting on %s partitions to finish", split.size()));
}
if (result.size() == split.size()) {
return result;
}
else {
return null;
}
}
};
Poller<Set<StepExecution>> poller = new DirectPoller<>(pollInterval);
Future<Set<StepExecution>> resultsFuture = poller.poll(callback);
if (timeout >= 0) {
return resultsFuture.get(timeout, TimeUnit.MILLISECONDS);
}
else {
return resultsFuture.get();
}
}
private Set<StepExecution> receiveReplies(PollableChannel currentReplyChannel) {
Message<Set<StepExecution>> message = (Message<Set<StepExecution>>) messagingGateway
.receive(currentReplyChannel);
if (message == null) {
throw new MessageTimeoutException("Timeout occurred before all partitions returned");
}
else if (logger.isDebugEnabled()) {
logger.debug("Received replies: " + message);
}
return new HashSet<>(message.getPayload());
}
private Message<StepExecutionRequest> createMessage(int sequenceNumber, int sequenceSize,
StepExecutionRequest stepExecutionRequest, PollableChannel replyChannel) {
return MessageBuilder.withPayload(stepExecutionRequest).setSequenceNumber(sequenceNumber)
.setSequenceSize(sequenceSize)
.setCorrelationId(stepExecutionRequest.getJobExecutionId() + ":" + stepExecutionRequest.getStepName())
.setReplyChannel(replyChannel).build();
}
}
相关信息
相关文章
spring-batch BeanFactoryStepLocator 源码
spring-batch RemotePartitioningManagerStepBuilder 源码
spring-batch RemotePartitioningManagerStepBuilderFactory 源码
spring-batch RemotePartitioningWorkerStepBuilder 源码
spring-batch RemotePartitioningWorkerStepBuilderFactory 源码
spring-batch StepExecutionRequest 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦