spring-batch CommandLineJobRunner 源码
spring-batch CommandLineJobRunner 代码
文件路径:/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/CommandLineJobRunner.java
/*
* Copyright 2006-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.core.launch.support;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersIncrementer;
import org.springframework.batch.core.configuration.JobLocator;
import org.springframework.batch.core.converter.DefaultJobParametersConverter;
import org.springframework.batch.core.converter.JobParametersConverter;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobExecutionNotFailedException;
import org.springframework.batch.core.launch.JobExecutionNotRunningException;
import org.springframework.batch.core.launch.JobExecutionNotStoppedException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.NoSuchJobException;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.beans.factory.BeanDefinitionStoreException;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* <p>
* Basic launcher for starting jobs from the command line. In general, it is assumed that
* this launcher will primarily be used to start a job via a script from an Enterprise
* Scheduler. Therefore, exit codes are mapped to integers so that schedulers can use the
* returned values to determine the next course of action. The returned values can also be
* useful to operations teams in determining what should happen upon failure. For example,
* a returned code of 5 might mean that some resource wasn't available and the job should
* be restarted. However, a code of 10 might mean that something critical has happened and
* the issue should be escalated.
* </p>
*
* <p>
* With any launch of a batch job within Spring Batch, a Spring context containing the
* {@link Job} and some execution context has to be created. This command line launcher
* can be used to load the job and its context from a single location. All dependencies of
* the launcher will then be satisfied by autowiring by type from the combined application
* context. Default values are provided for all fields except the {@link JobLauncher} and
* {@link JobLocator}. Therefore, if autowiring fails to set it (it should be noted that
* dependency checking is disabled because most of the fields have default values and thus
* don't require dependencies to be fulfilled via autowiring) then an exception will be
* thrown. It should also be noted that even if an exception is thrown by this class, it
* will be mapped to an integer and returned.
* </p>
*
* <p>
* Notice a property is available to set the {@link SystemExiter}. This class is used to
* exit from the main method, rather than calling System.exit() directly. This is because
* unit testing a class that calls System.exit() is impossible without kicking off the test
* within a new JVM, which it is possible to do, however it is a complex solution, much
* more so than strategizing the exiter.
* </p>
*
* <p>
* The arguments to this class can be provided on the command line (separated by spaces),
* or through stdin (separated by new line). They are as follows:
* </p>
*
* <code>
* jobPath <options> jobIdentifier (jobParameters)*
* </code>
*
* <p>
* The command line options are as follows
* </p>
* <ul>
* <li>jobPath: the xml application context containing a {@link Job}
* <li>-restart: (optional) to restart the last failed execution</li>
* <li>-stop: (optional) to stop a running execution</li>
* <li>-abandon: (optional) to abandon a stopped execution</li>
* <li>-next: (optional) to start the next in a sequence according to the
* {@link JobParametersIncrementer} in the {@link Job}</li>
* <li>jobIdentifier: the name of the job or the id of a job execution (for -stop,
* -abandon or -restart).
* <li>jobParameters: 0 to many parameters that will be used to launch a job specified in
* the form of <code>key=value</code> pairs.
* </ul>
*
* <p>
* If the <code>-next</code> option is used the parameters on the command line (if any)
* are appended to those retrieved from the incrementer, overriding any with the same key.
* </p>
*
* <p>
* The combined application context must contain only one instance of {@link JobLauncher}.
* The job parameters passed in to the command line will be converted to
* {@link Properties} by assuming that each individual element is one parameter that is
* separated by an equals sign. For example, "vendor.id=290232". The resulting properties
* instance is converted to {@link JobParameters} using a {@link JobParametersConverter}
* from the application context (if there is one, or a
* {@link DefaultJobParametersConverter} otherwise). Below is an example arguments list:
* </p>
*
* <p>
* <code>
* java org.springframework.batch.core.launch.support.CommandLineJobRunner testJob.xml
* testJob schedule.date=2008/01/24 vendor.id=3902483920
* </code>
* </p>
*
* <p>
* By default, the `CommandLineJobRunner` uses a {@link DefaultJobParametersConverter}
* which implicitly converts key/value pairs to identifying job parameters. However, it is
* possible to explicitly specify which job parameters are identifying and which are not
* by prefixing them with `+` or `-` respectively. In the following example,
* `schedule.date` is an identifying job parameter while `vendor.id` is not:
* </p>
*
* <p>
* <code>
* java org.springframework.batch.core.launch.support.CommandLineJobRunner testJob.xml
* testJob +schedule.date=2008/01/24 -vendor.id=3902483920
* </code>
* </p>
*
* <p>
* This behaviour can be overridden by using a custom `JobParametersConverter`.
* </p>
*
* <p>
* Once arguments have been successfully parsed, autowiring will be used to set various
* dependencies. The {@link JobLauncher} for example, will be loaded this way. If none is
* contained in the bean factory (it searches by type) then a
* {@link BeanDefinitionStoreException} will be thrown. The same exception will also be
* thrown if there is more than one present. Assuming the JobLauncher has been set
* correctly, the jobIdentifier argument will be used to obtain an actual {@link Job}. If
* a {@link JobLocator} has been set, then it will be used, if not the beanFactory will be
* asked, using the jobIdentifier as the bean id.
* </p>
*
* @author Dave Syer
* @author Lucas Ward
* @author Mahmoud Ben Hassine
* @since 1.0
*/
public class CommandLineJobRunner {
/** Logger for this class; protected, so it is also visible to subclasses. */
protected static final Log logger = LogFactory.getLog(CommandLineJobRunner.class);
// Maps exit-status code strings to the integer returned from start().
private ExitCodeMapper exitCodeMapper = new SimpleJvmExitCodeMapper();
// No default value: start() asserts that it is non-null after autowiring.
private JobLauncher launcher;
// Optional: start() falls back to a bean lookup in the context when this is null
// or does not know the requested job.
private JobLocator jobLocator;
// Static (shared by all instances) so it can be replaced before an instance exists;
// the field is private and is changed through presetSystemExiter/setSystemExiter.
private static SystemExiter systemExiter = new JvmSystemExiter();
// Error message recorded by main()/start(); read through getErrorMessage().
private static String message = "";
// Converts the key=value command-line arguments into JobParameters.
private JobParametersConverter jobParametersConverter = new DefaultJobParametersConverter();
// Used to look up job instances and executions; start() asserts it for -restart and -next.
private JobExplorer jobExplorer;
// Used to persist the status change made by -stop and -abandon.
private JobRepository jobRepository;
// Option flags recognised by main(); every other argument is treated as positional.
private final static List<String> VALID_OPTS = Arrays
.asList(new String[] { "-restart", "-next", "-stop", "-abandon" });
/**
* Injection setter for the {@link JobLauncher} that runs the job. This dependency has
* no default value.
* @param launcher the launcher to set
*/
public void setLauncher(JobLauncher launcher) {
this.launcher = launcher;
}
/**
* Injection setter for the {@link JobRepository}, which is used to persist the status
* change of executions that are stopped or abandoned.
* @param jobRepository the jobRepository to set
*/
public void setJobRepository(JobRepository jobRepository) {
this.jobRepository = jobRepository;
}
/**
* Injection setter for {@link JobExplorer}, which is used to look up job instances and
* job executions by job name or execution id.
* @param jobExplorer the {@link JobExplorer} to set
*/
public void setJobExplorer(JobExplorer jobExplorer) {
this.jobExplorer = jobExplorer;
}
/**
* Injection setter for the {@link ExitCodeMapper} that turns exit-status codes into the
* integer result of a run. Defaults to a {@link SimpleJvmExitCodeMapper}.
* @param exitCodeMapper the exitCodeMapper to set
*/
public void setExitCodeMapper(ExitCodeMapper exitCodeMapper) {
this.exitCodeMapper = exitCodeMapper;
}
/**
* Static setter for the {@link SystemExiter} so it can be adjusted before dependency
* injection. Typically overridden by {@link #setSystemExiter(SystemExiter)}, which
* writes to the same static field.
* @param systemExiter {@link SystemExiter} instance to be used by
* CommandLineJobRunner instance.
*/
public static void presetSystemExiter(SystemExiter systemExiter) {
CommandLineJobRunner.systemExiter = systemExiter;
}
/**
* Retrieve the error message recorded by {@link CommandLineJobRunner} when a run ends
* in error. The value is held in a static field and starts out as the empty string.
* @return the error message
*/
public static String getErrorMessage() {
return message;
}
/**
* Injection setter for the {@link SystemExiter}. Although this is an instance method,
* it assigns the static field, so the new exiter applies to every
* CommandLineJobRunner instance.
* @param systemExiter {@link SystemExiter} instance to be used by
* CommandLineJobRunner instance.
*/
public void setSystemExiter(SystemExiter systemExiter) {
CommandLineJobRunner.systemExiter = systemExiter;
}
/**
* Injection setter for {@link JobParametersConverter}, which converts the key=value
* command-line arguments into job parameters. Defaults to a
* {@link DefaultJobParametersConverter}.
* @param jobParametersConverter instance of {@link JobParametersConverter} to be used
* by the CommandLineJobRunner instance.
*/
public void setJobParametersConverter(JobParametersConverter jobParametersConverter) {
this.jobParametersConverter = jobParametersConverter;
}
/**
* Delegate to the configured {@link SystemExiter} to (possibly) exit the VM gracefully.
* Whether the VM actually terminates depends on the exiter in use.
* @param status int exit code that should be reported.
*/
public void exit(int status) {
systemExiter.exit(status);
}
/**
* Injection setter for the {@link JobLocator} used to find a job to run by name. When
* none is set, the job is looked up as a bean in the application context instead.
* @param jobLocator a {@link JobLocator}
*/
public void setJobLocator(JobLocator jobLocator) {
this.jobLocator = jobLocator;
}
/**
 * Perform one command-line operation against the application context found at
 * {@code jobPath} and translate the outcome into an integer exit code.
 * <p>
 * {@code jobPath} is first tried as a configuration class name and, if no such class
 * exists, as a classpath XML resource. This runner is then autowired by type from that
 * context. Depending on {@code opts} the method stops running executions
 * ({@code -stop}), abandons stopped ones ({@code -abandon}), relaunches a failed or
 * stopped execution with its original parameters ({@code -restart}), launches the next
 * instance in a sequence ({@code -next}), or simply launches the job. The job is
 * obtained from the {@link JobLocator} if one is set and knows the job, otherwise from
 * the context by bean name.
 * <p>
 * No exception escapes: any {@link Throwable} is logged, recorded as the error message
 * and mapped to the exit code for {@link ExitStatus#FAILED}. The context is always
 * closed before returning.
 * @param jobPath configuration class name or XML application context location
 * @param jobIdentifier job name, or job execution id for -stop, -abandon and -restart
 * @param parameters job parameters as {@code key=value} strings; may be null or empty
 * @param opts the option flags given on the command line
 * @return the exit code produced by the {@link ExitCodeMapper}
 */
@SuppressWarnings("resource")
int start(String jobPath, String jobIdentifier, String[] parameters, Set<String> opts) {
    // Clear whatever an earlier failed run left behind so that callers of
    // getErrorMessage() do not see a stale error after a successful run.
    CommandLineJobRunner.message = "";
    ConfigurableApplicationContext context = null;
    try {
        try {
            context = new AnnotationConfigApplicationContext(Class.forName(jobPath));
        }
        catch (ClassNotFoundException cnfe) {
            // Not a loadable class name, so treat jobPath as an XML context location.
            context = new ClassPathXmlApplicationContext(jobPath);
        }
        // Dependency checking is off (last argument) because most fields have defaults.
        context.getAutowireCapableBeanFactory().autowireBeanProperties(this,
                AutowireCapableBeanFactory.AUTOWIRE_BY_TYPE, false);
        Assert.state(launcher != null, "A JobLauncher must be provided. Please add one to the configuration.");
        if (opts.contains("-restart") || opts.contains("-next")) {
            Assert.state(jobExplorer != null,
                    "A JobExplorer must be provided for a restart or start next operation. Please add one to the configuration.");
        }
        if (opts.contains("-stop") || opts.contains("-abandon")) {
            // Both collaborators are dereferenced below; fail with a configuration
            // message rather than an anonymous NullPointerException.
            Assert.state(jobExplorer != null,
                    "A JobExplorer must be provided for a stop or abandon operation. Please add one to the configuration.");
            Assert.state(jobRepository != null,
                    "A JobRepository must be provided for a stop or abandon operation. Please add one to the configuration.");
        }
        String jobName = jobIdentifier;
        JobParameters jobParameters = jobParametersConverter
                .getJobParameters(StringUtils.splitArrayElementsIntoProperties(parameters, "="));
        // Arrays.toString tolerates a null array; the message is built eagerly, so a
        // null-intolerant call here would fail even when the condition itself passes.
        Assert.isTrue(parameters == null || parameters.length == 0 || !jobParameters.isEmpty(),
                "Invalid JobParameters " + Arrays.toString(parameters)
                        + ". If parameters are provided they should be in the form name=value (no whitespace).");
        if (opts.contains("-stop")) {
            List<JobExecution> jobExecutions = getRunningJobExecutions(jobIdentifier);
            if (jobExecutions == null) {
                throw new JobExecutionNotRunningException("No running execution found for job=" + jobIdentifier);
            }
            for (JobExecution jobExecution : jobExecutions) {
                jobExecution.setStatus(BatchStatus.STOPPING);
                jobRepository.update(jobExecution);
            }
            return exitCodeMapper.intValue(ExitStatus.COMPLETED.getExitCode());
        }
        if (opts.contains("-abandon")) {
            List<JobExecution> jobExecutions = getStoppedJobExecutions(jobIdentifier);
            if (jobExecutions == null) {
                throw new JobExecutionNotStoppedException("No stopped execution found for job=" + jobIdentifier);
            }
            for (JobExecution jobExecution : jobExecutions) {
                jobExecution.setStatus(BatchStatus.ABANDONED);
                jobRepository.update(jobExecution);
            }
            return exitCodeMapper.intValue(ExitStatus.COMPLETED.getExitCode());
        }
        if (opts.contains("-restart")) {
            JobExecution jobExecution = getLastFailedJobExecution(jobIdentifier);
            if (jobExecution == null) {
                throw new JobExecutionNotFailedException(
                        "No failed or stopped execution found for job=" + jobIdentifier);
            }
            // A restart reuses the parameters and job name of the execution found.
            jobParameters = jobExecution.getJobParameters();
            jobName = jobExecution.getJobInstance().getJobName();
        }
        Job job = null;
        if (jobLocator != null) {
            try {
                job = jobLocator.getJob(jobName);
            }
            catch (NoSuchJobException e) {
                // Deliberately not fatal: the context is asked for the job below.
                if (logger.isDebugEnabled()) {
                    logger.debug("JobLocator does not know job '" + jobName
                            + "', falling back to a bean lookup in the application context", e);
                }
            }
        }
        if (job == null) {
            job = (Job) context.getBean(jobName);
        }
        if (opts.contains("-next")) {
            jobParameters = new JobParametersBuilder(jobParameters, jobExplorer).getNextJobParameters(job)
                    .toJobParameters();
        }
        JobExecution jobExecution = launcher.run(job, jobParameters);
        return exitCodeMapper.intValue(jobExecution.getExitStatus().getExitCode());
    }
    catch (Throwable e) {
        String message = "Job Terminated in error: " + e.getMessage();
        logger.error(message, e);
        CommandLineJobRunner.message = message;
        return exitCodeMapper.intValue(ExitStatus.FAILED.getExitCode());
    }
    finally {
        if (context != null) {
            context.close();
        }
    }
}
/**
 * Collect the job executions whose status is greater than {@code minStatus}.
 * <p>
 * If {@code jobIdentifier} parses as a long it is treated as a job execution id and at
 * most that one execution is returned. Otherwise it is treated as a job name: the job
 * instances of that name are fetched in pages of 100 until an empty page is returned,
 * and every matching execution of every instance is collected.
 * @param jobIdentifier a job execution id or job name
 * @param minStatus the highest status to exclude from the result
 * @return the matching executions; possibly empty, never {@code null}
 */
private List<JobExecution> getJobExecutionsWithStatusGreaterThan(String jobIdentifier, BatchStatus minStatus) {
    Long executionId = getLongIdentifier(jobIdentifier);
    if (executionId != null) {
        JobExecution jobExecution = jobExplorer.getJobExecution(executionId);
        // An unknown id yields no execution; treat that as "no match" instead of
        // dereferencing null (getRunningJobExecutions guards the same call).
        if (jobExecution != null && jobExecution.getStatus().isGreaterThan(minStatus)) {
            return Arrays.asList(jobExecution);
        }
        return Collections.emptyList();
    }
    int start = 0;
    int count = 100;
    List<JobExecution> executions = new ArrayList<>();
    List<JobInstance> lastInstances = jobExplorer.getJobInstances(jobIdentifier, start, count);
    while (!lastInstances.isEmpty()) {
        for (JobInstance jobInstance : lastInstances) {
            List<JobExecution> jobExecutions = jobExplorer.getJobExecutions(jobInstance);
            if (jobExecutions == null || jobExecutions.isEmpty()) {
                continue;
            }
            for (JobExecution jobExecution : jobExecutions) {
                if (jobExecution.getStatus().isGreaterThan(minStatus)) {
                    executions.add(jobExecution);
                }
            }
        }
        // Advance to the next page of instances.
        start += count;
        lastInstances = jobExplorer.getJobInstances(jobIdentifier, start, count);
    }
    return executions;
}
/**
 * Find an execution that can be restarted, i.e. one whose status is greater than
 * {@link BatchStatus#STOPPING}.
 * @param jobIdentifier a job execution id or job name
 * @return the first such execution found, or {@code null} if there is none
 */
private JobExecution getLastFailedJobExecution(String jobIdentifier) {
    List<JobExecution> candidates = getJobExecutionsWithStatusGreaterThan(jobIdentifier, BatchStatus.STOPPING);
    return candidates.isEmpty() ? null : candidates.get(0);
}
/**
 * Find the executions that can be abandoned: those whose status is greater than
 * {@link BatchStatus#STARTED} and that are not already {@link BatchStatus#ABANDONED}.
 * @param jobIdentifier a job execution id or job name
 * @return the matching executions, or {@code null} if there are none
 */
private List<JobExecution> getStoppedJobExecutions(String jobIdentifier) {
    List<JobExecution> stopped = new ArrayList<>();
    for (JobExecution candidate : getJobExecutionsWithStatusGreaterThan(jobIdentifier, BatchStatus.STARTED)) {
        if (candidate.getStatus() != BatchStatus.ABANDONED) {
            stopped.add(candidate);
        }
    }
    // Callers test for null rather than for an empty list.
    if (stopped.isEmpty()) {
        return null;
    }
    return stopped;
}
/**
 * Find the running executions for the given identifier. A numeric identifier is looked
 * up as a single job execution id and kept only if that execution exists and is
 * running; any other identifier is treated as a job name.
 * @param jobIdentifier a job execution id or job name
 * @return the running executions, or {@code null} if there are none
 */
private List<JobExecution> getRunningJobExecutions(String jobIdentifier) {
    Long executionId = getLongIdentifier(jobIdentifier);
    if (executionId == null) {
        // Not an id, so ask for everything currently running under that job name.
        List<JobExecution> runningByName = new ArrayList<>(jobExplorer.findRunningJobExecutions(jobIdentifier));
        return runningByName.isEmpty() ? null : runningByName;
    }
    JobExecution execution = jobExplorer.getJobExecution(executionId);
    if (execution == null || !execution.isRunning()) {
        return null;
    }
    List<JobExecution> runningById = new ArrayList<>();
    runningById.add(execution);
    return runningById;
}
/**
 * Interpret the identifier as a numeric job execution id.
 * @param jobIdentifier a job execution id or job name
 * @return the parsed id, or {@code null} if the identifier is not a valid long and
 * must therefore be a job name
 */
private Long getLongIdentifier(String jobIdentifier) {
    Long parsedId;
    try {
        parsedId = Long.valueOf(jobIdentifier);
    }
    catch (NumberFormatException notAnId) {
        // Not an ID - must be a name
        parsedId = null;
    }
    return parsedId;
}
/**
 * Launch a batch job using a {@link CommandLineJobRunner}. Arguments come from the
 * command line and, if data is available on stdin, from stdin as well (one argument
 * per line; blank lines and lines starting with {@code #} are skipped). The outcome is
 * reported as an integer through the {@link SystemExiter}: 1 when fewer than two
 * positional arguments are given, otherwise the exit code of the run.<br>
 * Parameters can be provided in the form key=value, and will be converted using the
 * injected {@link JobParametersConverter}.
 * @param args
 * <ul>
 * <li>-restart, -next, -stop, -abandon: (optional) option flags; they can occur
 * anywhere in the argument list.</li>
 * <li>jobPath: first positional argument, the application context (configuration class
 * or xml file) containing a {@link Job}.</li>
 * <li>jobIdentifier: second positional argument, the name of the job or the id of a
 * job execution.</li>
 * <li>jobParameters: all remaining positional arguments, 0 to many parameters that
 * will be used to launch a job.</li>
 * </ul>
 * @throws Exception is thrown if error occurs.
 */
public static void main(String[] args) throws Exception {
    CommandLineJobRunner command = new CommandLineJobRunner();
    List<String> allArgs = new ArrayList<>(Arrays.asList(args));
    try {
        if (System.in.available() > 0) {
            BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in));
            for (String line = stdin.readLine(); line != null; line = stdin.readLine()) {
                if (line.startsWith("#") || !StringUtils.hasText(line)) {
                    continue;
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("Stdin arg: " + line);
                }
                allArgs.add(line);
            }
        }
    }
    catch (IOException e) {
        // Stdin is optional input, so carry on with whatever was collected so far.
        logger.warn("Could not access stdin (maybe a platform limitation)");
        if (logger.isDebugEnabled()) {
            logger.debug("Exception details", e);
        }
    }

    // Separate the option flags from the positional arguments, keeping their order.
    Set<String> opts = new LinkedHashSet<>();
    List<String> positional = new ArrayList<>();
    for (String arg : allArgs) {
        if (VALID_OPTS.contains(arg)) {
            opts.add(arg);
        }
        else {
            positional.add(arg);
        }
    }
    String jobPath = positional.size() > 0 ? positional.get(0) : null;
    String jobIdentifier = positional.size() > 1 ? positional.get(1) : null;

    if (jobPath == null || jobIdentifier == null) {
        String usage = "At least 2 arguments are required: JobPath/JobClass and jobIdentifier.";
        logger.error(usage);
        CommandLineJobRunner.message = usage;
        command.exit(1);
        return;
    }

    // Everything after the two leading positional arguments is a job parameter.
    String[] parameters = positional.subList(2, positional.size()).toArray(new String[0]);
    command.exit(command.start(jobPath, jobIdentifier, parameters, opts));
}
}
相关信息
相关文章
spring-batch DataFieldMaxValueJobParametersIncrementer 源码
spring-batch ExitCodeMapper 源码
spring-batch JobRegistryBackgroundJobRunner 源码
spring-batch JvmSystemExiter 源码
spring-batch RunIdIncrementer 源码
spring-batch RuntimeExceptionTranslator 源码
spring-batch SimpleJobLauncher 源码
spring-batch SimpleJobOperator 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦