Java Spring Batch Example
Batch processing—typified by bulk-oriented, non-interactive, and frequently long running, background execution—is widely used across virtually every industry and is applied to a diverse array of tasks. Batch processing may be data or computationally intensive, execute sequentially or in parallel, and may be initiated through various invocation models, including ad hoc, scheduled, and on-demand.
This Spring Batch tutorial explains the programming model and the domain language of batch applications in general and, in particular, shows some useful approaches to the design and development of batch applications using the current Spring Batch 3.0.7 version.
What is Spring Batch?
Spring Batch is a lightweight, comprehensive framework designed to facilitate development of robust batch applications. It also provides more advanced technical services and features that support extremely high volume and high performance batch jobs through its optimization and partitioning techniques. Spring Batch builds upon the POJO-based development approach of the Spring Framework, familiar to all experienced Spring developers.
By way of example, this article considers source code from a sample project that loads an XML-formatted customer file, filters customers by various attributes, and outputs the filtered entries to a text file. The source code for our Spring Batch example (which makes use of Lombok annotations) is available here on GitHub and requires Java SE 8 and Maven.
What is Batch Processing? Key Concepts and Terminology
It is important for any batch developer to be familiar and comfortable with the main concepts of batch processing. The diagram below is a simplified version of the batch reference architecture that has been proven through decades of implementations on many different platforms. It introduces the key concepts and terms relevant to batch processing, as used by Spring Batch.
As shown in our batch processing example, a batch process is typically encapsulated by a Job consisting of multiple Steps. Each Step typically has a single ItemReader, ItemProcessor, and ItemWriter. A Job is executed by a JobLauncher, and metadata about configured and executed jobs is stored in a JobRepository.
Each Job may be associated with multiple JobInstances, each of which is defined uniquely by its particular JobParameters that are used to start a batch job. Each run of a JobInstance is referred to as a JobExecution. Each JobExecution typically tracks what happened during a run, such as current and exit statuses, start and end times, etc.
A Step is an independent, specific phase of a batch Job, such that every Job is composed of one or more Steps. Similar to a Job, a Step has an individual StepExecution that represents a single attempt to execute a Step. A StepExecution stores the information about current and exit statuses, start and end times, and so on, as well as references to its corresponding Step and JobExecution instances.
An ExecutionContext is a set of key-value pairs containing information that is scoped to either a StepExecution or a JobExecution. Spring Batch persists the ExecutionContext, which helps in cases where you want to restart a batch run (e.g., when a fatal error has occurred). All that is needed is to put any object to be shared between steps into the context, and the framework will take care of the rest. After a restart, the values from the prior ExecutionContext are restored from the database and applied.
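By way of illustration, a tasklet might stash a value in the job-scoped ExecutionContext for a later step (or a restarted run) to pick up. The following is a hypothetical sketch only; the key name and value are made up:

```java
@Bean
public Tasklet countingTasklet() {
    return (contribution, chunkContext) -> {
        // Put a value into the *job* ExecutionContext so that subsequent
        // steps (and restarted runs) can read it back.
        chunkContext.getStepContext()
            .getStepExecution()
            .getJobExecution()
            .getExecutionContext()
            .putInt("customersSeen", 42); // hypothetical key and value
        return RepeatStatus.FINISHED;
    };
}
```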
The JobRepository is the mechanism in Spring Batch that makes all this persistence possible: it provides CRUD operations for JobLauncher, Job, and Step instantiations. Once a Job is launched, a JobExecution is obtained from the repository and, during the course of execution, StepExecution and JobExecution instances are persisted to the repository. (With the default JDBC-based repository, this metadata lives in database tables such as BATCH_JOB_INSTANCE, BATCH_JOB_EXECUTION, and BATCH_STEP_EXECUTION.)
Getting Started with Spring Batch Framework
One of the advantages of Spring Batch is that project dependencies are minimal, which makes it easier to get up and running quickly. The few dependencies that do exist are clearly specified and explained in the project's pom.xml, which can be accessed here.
The actual startup of the application happens in a class looking something like the following:
```java
@EnableBatchProcessing
@SpringBootApplication
public class BatchApplication {

    public static void main(String[] args) {
        prepareTestData(1000);
        SpringApplication.run(BatchApplication.class, args);
    }
}
```
The @EnableBatchProcessing annotation enables Spring Batch features and provides a base configuration for setting up batch jobs.
The @SpringBootApplication annotation comes from the Spring Boot project that provides standalone, production-ready, Spring-based applications. It specifies a configuration class that declares one or more Spring beans and also triggers auto-configuration and Spring's component scanning.
Our sample project has only one job, which is configured by CustomerReportJobConfig with an injected JobBuilderFactory and StepBuilderFactory. The minimal job configuration can be defined in CustomerReportJobConfig as follows:
```java
@Configuration
public class CustomerReportJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilders;

    @Autowired
    private StepBuilderFactory stepBuilders;

    @Bean
    public Job customerReportJob() {
        return jobBuilders.get("customerReportJob")
            .start(taskletStep())
            .next(chunkStep())
            .build();
    }

    @Bean
    public Step taskletStep() {
        return stepBuilders.get("taskletStep")
            .tasklet(tasklet())
            .build();
    }

    @Bean
    public Tasklet tasklet() {
        return (contribution, chunkContext) -> {
            return RepeatStatus.FINISHED;
        };
    }
}
```
There are two main approaches to building a step.
One approach, as shown in the above example, is tasklet-based. A Tasklet supports a simple interface that has only one method, execute(), which is called repeatedly until it either returns RepeatStatus.FINISHED or throws an exception to signal a failure. Each call to a Tasklet is wrapped in a transaction.
Another approach, chunk-oriented processing, refers to reading the data sequentially and creating "chunks" that will be written out within a transaction boundary. Each individual item is read in from an ItemReader, handed to an ItemProcessor, and aggregated. Once the number of items read equals the commit interval, the entire chunk is written out via the ItemWriter, and then the transaction is committed. A chunk-oriented step can be configured as follows:
```java
@Bean
public Job customerReportJob() {
    return jobBuilders.get("customerReportJob")
        .start(taskletStep())
        .next(chunkStep())
        .build();
}

@Bean
public Step chunkStep() {
    return stepBuilders.get("chunkStep")
        .<Customer, Customer>chunk(20)
        .reader(reader())
        .processor(processor())
        .writer(writer())
        .build();
}
```
The chunk() method builds a step that processes items in chunks of the size provided, with each chunk then being passed to the specified reader, processor, and writer. These methods are discussed in more detail in the next sections of this article.
Custom Reader
For our Spring Batch sample application, in order to read a list of customers from an XML file, we need to provide an implementation of the interface org.springframework.batch.item.ItemReader:
```java
public interface ItemReader<T> {

    T read() throws Exception, UnexpectedInputException,
        ParseException, NonTransientResourceException;
}
```
An ItemReader provides the data and is expected to be stateful. It is typically called multiple times for each batch, with each call to read() returning the next value and finally returning null when all input data has been exhausted.
Spring Batch provides some out-of-the-box implementations of ItemReader, which can be used for a variety of purposes, such as reading collections or files, or integrating with JMS, JDBC, and multiple other sources.
In our sample application, the CustomerItemReader class delegates actual read() calls to a lazily initialized instance of the IteratorItemReader class:
```java
public class CustomerItemReader implements ItemReader<Customer> {

    private final String filename;

    private ItemReader<Customer> delegate;

    public CustomerItemReader(final String filename) {
        this.filename = filename;
    }

    @Override
    public Customer read() throws Exception {
        if (delegate == null) {
            delegate = new IteratorItemReader<>(customers());
        }
        return delegate.read();
    }

    private List<Customer> customers() throws FileNotFoundException {
        try (XMLDecoder decoder = new XMLDecoder(new FileInputStream(filename))) {
            return (List<Customer>) decoder.readObject();
        }
    }
}
```
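For reference, java.beans.XMLDecoder reads the serialization format produced by XMLEncoder, so a customer file along these lines could be consumed by this reader. This is a hypothetical fragment — the package name and the actual property names depend on how the Customer bean is defined:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<java version="1.8.0" class="java.beans.XMLDecoder">
  <object class="java.util.ArrayList">
    <void method="add">
      <object class="com.example.Customer">
        <void property="id"><int>1</int></void>
        <void property="name"><string>John Doe</string></void>
        <void property="transactions"><int>2</int></void>
      </object>
    </void>
  </object>
</java>
```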
A Spring bean for this implementation is declared with the @Bean and @StepScope annotations, letting Spring know that this bean is step-scoped and will be created once per step execution, as follows:
```java
@StepScope
@Bean
public ItemReader<Customer> reader() {
    return new CustomerItemReader(XML_FILE);
}
```
Custom Processors
ItemProcessors transform input items and introduce business logic in an item-oriented processing scenario. They must provide an implementation of the interface org.springframework.batch.item.ItemProcessor:
```java
public interface ItemProcessor<I, O> {

    O process(I item) throws Exception;
}
```
The method process() accepts an instance of the I class and returns an instance of the O class, which may or may not be of the same type. Returning null indicates that the item should not continue to be processed. As usual, Spring provides a few standard processors, such as CompositeItemProcessor, which passes the item through a sequence of injected ItemProcessors, and ValidatingItemProcessor, which validates input.
In the case of our sample application, processors are used to filter customers by the following requirements:
- A customer must be born in the current month (e.g., to flag for birthday specials, etc.)
- A customer must have fewer than five completed transactions (e.g., to identify newer customers)
The "current month" requirement is implemented via a custom ItemProcessor:
```java
public class BirthdayFilterProcessor implements ItemProcessor<Customer, Customer> {

    @Override
    public Customer process(final Customer item) throws Exception {
        if (new GregorianCalendar().get(Calendar.MONTH)
                == item.getBirthday().get(Calendar.MONTH)) {
            return item;
        }
        return null;
    }
}
```
The "limited number of transactions" requirement is implemented as a ValidatingItemProcessor:
```java
public class TransactionValidatingProcessor extends ValidatingItemProcessor<Customer> {

    public TransactionValidatingProcessor(final int limit) {
        super(
            item -> {
                if (item.getTransactions() >= limit) {
                    throw new ValidationException(
                        "Customer has " + limit + " or more transactions");
                }
            }
        );
        setFilter(true);
    }
}
```
Note that setFilter(true) instructs the ValidatingItemProcessor to filter out invalid items (returning null) rather than let the ValidationException propagate. This pair of processors is then encapsulated within a CompositeItemProcessor that implements the delegate pattern:
```java
@StepScope
@Bean
public ItemProcessor<Customer, Customer> processor() {
    final CompositeItemProcessor<Customer, Customer> processor = new CompositeItemProcessor<>();
    processor.setDelegates(Arrays.asList(
        new BirthdayFilterProcessor(),
        new TransactionValidatingProcessor(5)));
    return processor;
}
```
Custom Writers
For outputting the data, Spring Batch provides the interface org.springframework.batch.item.ItemWriter for serializing objects as necessary:
```java
public interface ItemWriter<T> {

    void write(List<? extends T> items) throws Exception;
}
```
The write() method is responsible for making sure that any internal buffers are flushed. If a transaction is active, it will also usually be necessary to discard the output on a subsequent rollback. The resource to which the writer is sending data should normally be able to handle this itself. There are standard implementations such as CompositeItemWriter, JdbcBatchItemWriter, JmsItemWriter, JpaItemWriter, SimpleMailMessageItemWriter, and others.
In our sample application, the list of filtered customers is written out as follows:
```java
public class CustomerItemWriter implements ItemWriter<Customer>, Closeable {

    private final PrintWriter writer;

    public CustomerItemWriter() {
        OutputStream out;
        try {
            out = new FileOutputStream("output.txt");
        } catch (FileNotFoundException e) {
            out = System.out;
        }
        this.writer = new PrintWriter(out);
    }

    @Override
    public void write(final List<? extends Customer> items) throws Exception {
        for (Customer item : items) {
            writer.println(item.toString());
        }
    }

    @PreDestroy
    @Override
    public void close() throws IOException {
        writer.close();
    }
}
```
Scheduling Spring Batch Jobs
By default, Spring Batch executes all jobs it can find (i.e., that are configured as in CustomerReportJobConfig) at startup. To change this behavior, disable job execution at startup by adding the following property to application.properties:
spring.batch.job.enabled=false
The actual scheduling is then achieved by adding the @EnableScheduling annotation to a configuration class and the @Scheduled annotation to the method that executes the job itself. Scheduling can be configured with delay, rates, or cron expressions:
```java
// run every 5000 msec (i.e., every 5 secs)
@Scheduled(fixedRate = 5000)
public void run() throws Exception {
    JobExecution execution = jobLauncher.run(
        customerReportJob(),
        new JobParametersBuilder().toJobParameters()
    );
}
```
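A cron expression can drive the schedule instead of a fixed rate. Spring's six-field cron format covers second, minute, hour, day of month, month, and day of week; the particular schedule and method name below are just an illustrative choice:

```java
// run at 06:00 on every weekday
@Scheduled(cron = "0 0 6 * * MON-FRI")
public void runOnSchedule() throws Exception {
    jobLauncher.run(
        customerReportJob(),
        new JobParametersBuilder().toJobParameters()
    );
}
```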
There is a problem with scheduling the job this way, though. At run time, the job will succeed the first time only. When it launches the second time (i.e., after five seconds), it will generate the following messages in the logs (note that in previous versions of Spring Batch, a JobInstanceAlreadyCompleteException would have been thrown):
```
INFO 36988 --- [pool-2-thread-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=customerReportJob]] launched with the following parameters: [{}]
INFO 36988 --- [pool-2-thread-1] o.s.batch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=1, version=3, name=taskletStep, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=
INFO 36988 --- [pool-2-thread-1] o.s.batch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=2, version=53, name=chunkStep, status=COMPLETED, exitStatus=COMPLETED, readCount=1000, filterCount=982, writeCount=18 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=51, rollbackCount=0, exitDescription=
```
This happens because only unique JobInstances may be created and executed, and Spring Batch has no way of distinguishing between the first and second JobInstance.
There are two ways of avoiding this problem when you schedule a batch job.
One is to be sure to introduce one or more unique parameters (e.g., actual start time in nanoseconds) to each job:
```java
@Scheduled(fixedRate = 5000)
public void run() throws Exception {
    jobLauncher.run(
        customerReportJob(),
        new JobParametersBuilder().addLong("uniqueness", System.nanoTime()).toJobParameters()
    );
}
```
Alternatively, you can launch the next job in a sequence of JobInstances determined by the JobParametersIncrementer attached to the specified job, using SimpleJobOperator.startNextInstance():
```java
@Autowired
private JobOperator operator;

@Autowired
private JobExplorer jobs;

@Scheduled(fixedRate = 5000)
public void run() throws Exception {
    List<JobInstance> lastInstances = jobs.getJobInstances(JOB_NAME, 0, 1);
    if (lastInstances.isEmpty()) {
        jobLauncher.run(customerReportJob(), new JobParameters());
    } else {
        operator.startNextInstance(JOB_NAME);
    }
}
```
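Note that startNextInstance() only works if a JobParametersIncrementer is attached to the job. A minimal sketch using Spring Batch's built-in RunIdIncrementer might look like the following:

```java
@Bean
public Job customerReportJob() {
    return jobBuilders.get("customerReportJob")
        .incrementer(new RunIdIncrementer()) // bumps a "run.id" parameter for each new instance
        .start(taskletStep())
        .next(chunkStep())
        .build();
}
```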
Spring Batch Unit Testing
Usually, to run unit tests in a Spring Boot application, the framework must load a corresponding ApplicationContext. Two annotations are used for this purpose:
```java
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = {...})
```
There is a utility class, org.springframework.batch.test.JobLauncherTestUtils, for testing batch jobs. It provides methods for launching an entire job, as well as allowing for end-to-end testing of individual steps without having to run every step in the job. It must be declared as a Spring bean:
```java
@Configuration
public class BatchTestConfiguration {

    @Bean
    public JobLauncherTestUtils jobLauncherTestUtils() {
        return new JobLauncherTestUtils();
    }
}
```
A typical test for a job and a step looks as follows (and can use any mocking framework as well):
```java
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class})
public class CustomerReportJobConfigTest {

    @Autowired
    private JobLauncherTestUtils testUtils;

    @Autowired
    private CustomerReportJobConfig config;

    @Test
    public void testEntireJob() throws Exception {
        final JobExecution result = testUtils.getJobLauncher().run(
            config.customerReportJob(), testUtils.getUniqueJobParameters());
        Assert.assertNotNull(result);
        Assert.assertEquals(BatchStatus.COMPLETED, result.getStatus());
    }

    @Test
    public void testSpecificStep() {
        Assert.assertEquals(BatchStatus.COMPLETED,
            testUtils.launchStep("taskletStep").getStatus());
    }
}
```
Spring Batch introduces additional scopes for step and job contexts. Objects in these scopes use the Spring container as an object factory, so there is only one instance of each such bean per execution step or job. In addition, support is provided for late binding of references accessible from the StepContext or JobContext. Components that are configured at runtime to be step- or job-scoped are tricky to test as standalone components unless you have a way to set the context as if they were in a step or job execution. That is the goal of the org.springframework.batch.test.StepScopeTestExecutionListener and org.springframework.batch.test.StepScopeTestUtils components in Spring Batch, as well as JobScopeTestExecutionListener and JobScopeTestUtils.
The TestExecutionListeners are declared at the class level; their job is to create a step execution context for each test method. For example:
```java
@RunWith(SpringRunner.class)
@TestExecutionListeners({DependencyInjectionTestExecutionListener.class, StepScopeTestExecutionListener.class})
@ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class})
public class BirthdayFilterProcessorTest {

    @Autowired
    private BirthdayFilterProcessor processor;

    public StepExecution getStepExecution() {
        return MetaDataInstanceFactory.createStepExecution();
    }

    @Test
    public void filter() throws Exception {
        final Customer customer = new Customer();
        customer.setId(1);
        customer.setName("name");
        customer.setBirthday(new GregorianCalendar());
        Assert.assertNotNull(processor.process(customer));
    }
}
```
There are two TestExecutionListeners. One is from the regular Spring Test framework and handles dependency injection from the configured application context. The other is the Spring Batch StepScopeTestExecutionListener, which sets up a step-scope context for dependency injection into unit tests. A StepContext is created for the duration of a test method and made available to any dependencies that are injected. The default behavior is just to create a StepExecution with fixed properties. Alternatively, the StepContext can be provided by the test case via a factory method returning the correct type.
Another approach is based on the StepScopeTestUtils utility class, which is used to create and manipulate a StepScope in unit tests in a more flexible way, without using dependency injection. For example, reading the ID of the customer filtered by the processor above could be done as follows:
```java
@Test
public void filterId() throws Exception {
    final Customer customer = new Customer();
    customer.setId(1);
    customer.setName("name");
    customer.setBirthday(new GregorianCalendar());
    final int id = StepScopeTestUtils.doInStepScope(
        getStepExecution(),
        () -> processor.process(customer).getId()
    );
    Assert.assertEquals(1, id);
}
```
Ready for Advanced Spring Batch?
This article introduces some of the basics of design and development of Spring Batch applications. However, there are many more advanced topics and capabilities—such as scaling, parallel processing, listeners, and more—that are not addressed in this article. Hopefully, this article provides a useful foundation for getting started.
Information on these more advanced topics can be found in the official documentation for Spring Batch.
Source: https://www.toptal.com/spring/spring-batch-tutorial