前言
之前有幸跟公司大神聊Spring Boot,大神跟我聊了很多關于Spring Boot相關的知識,其中有一個就是Spring Boot框架下批處理的解決方案,考慮到批處理在實際應用場景中使用率還是有的,好奇的我,決定拿下它!
項目代碼已上傳Git Hub,歡迎取閱:
批處理框架
Spring Batch是一款基于 Spring 的企業(yè)批處理應用框架,可以幫助我們構建出健壯的批處理應用。
實現(xiàn)批處理的整體步驟
- 添加依賴;
- Spring Boot基本概念介紹;
- 編寫批處理過程代碼;
- 批處理任務調度;
1. 添加依賴;
在項目pom.xml文件的dependencies節(jié)點下添加以下依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
2. Spring Boot基本概念介紹;
在實現(xiàn)批處理之前我們需要了解一些Spring Batch的基本概念:
1). Item Reader;
表示對資源的讀處理,如從數(shù)據(jù)庫查詢、從文件讀取、從變量讀取等;
2). Item Processor;
表示對讀取的數(shù)據(jù)進行處理,開發(fā)者可以實現(xiàn)自己的業(yè)務邏輯操作來對數(shù)據(jù)處理,如對數(shù)據(jù)進行計算、邏輯處理、格式轉換等;
3). Item Writer;
表示對資源的寫處理,如寫入數(shù)據(jù)庫、寫入文件、打印log等;
4). Step;
代表一個完整的批處理步驟,一個Step由Item Reader、 Item Processor、Item Writer三部分組成;
-
Step與Item Reader、 Item Processor、Item Writer的關系:

5). Job;
代表一個完整的批處理過程,一個Job由一個或多個Step組成:
-
Job與Step的關系:

-
批處理過程整體結構:

6). Listener;
監(jiān)聽。Spring Batch中還有個監(jiān)聽的功能,與其他地方的監(jiān)聽類似,用于對Step、Job狀態(tài)進行監(jiān)聽,我們可以實現(xiàn)監(jiān)聽方法,對其進行一些邏輯處理,如打印log等;
7). JobLauncher;
JobLauncher負責啟動job;
3. 編寫批處理過程代碼;
假設我們要解決的問題是,批量讀取數(shù)組中的數(shù)據(jù),并對數(shù)據(jù)做一些后續(xù)的處理。我會寫2個Job,一個是單個Step的Job,一個是2個Step的Job,并且2個Step的Job,第1個Step的處理后的數(shù)據(jù)要給第2個Step使用。

過程代碼的整體步驟:
1). 編寫ItemReader;
2). 編寫ItemProcessor;
3). 編寫ItemWriter;
4). 編寫JobExecutionListener;
5). 裝配Job;
6). 使用數(shù)據(jù)庫源;
7). 修改Spring Boot入口類;
1). 編寫ItemReader;
- ItemReaderService
package com.github.dylanz666.service;
import org.springframework.batch.item.ItemReader;
import org.springframework.stereotype.Service;
/**
* @author : dylanz
* @since : 08/25/2020
*/
@Service
public class ItemReaderService implements ItemReader {
//在此處進行數(shù)據(jù)讀取操作,如從數(shù)據(jù)庫查詢、從文件中讀取、從變量中讀取等,本例從變量中讀取;
private String[] message = {"message 1", "message 2", "message 3", "message 4", "message 5"};
private int count = 0;
public String read() throws Exception {
if (count < message.length) {
return message[count++];
}
count = 0;
return null;
}
}
- ItemReaderService2
package com.github.dylanz666.service;
import org.springframework.batch.item.ItemReader;
import org.springframework.stereotype.Service;
/**
* @author : dylanz
* @since : 08/26/2020
*/
@Service
public class ItemReaderService2 implements ItemReader {
private int count = 0;
public String read() throws Exception {
if (ItemProcessorService.message != null && count < ItemProcessorService.message.length) {
return ItemProcessorService.message[count++];
}
count = 0;
return null;
}
}
2). 編寫ItemProcessor;
- ItemProcessorService
package com.github.dylanz666.service;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
/**
* @author : dylanz
* @since : 08/25/2020
*/
@Service
public class ItemProcessorService implements ItemProcessor<String, String> {
public static String[] message;
//在此處進行數(shù)據(jù)處理操作,如進行計算、邏輯處理、格式轉換等,本例將數(shù)據(jù)變成全大寫數(shù)據(jù);
public String process(String data) throws Exception {
//存儲處理過的數(shù)據(jù),可供下一個step使用
List<String> list = new ArrayList<>();
if (message != null) {
for (int i = 0; i < message.length; i++) {
list.add(message[i]);
}
}
list.add(data.toUpperCase());
message = list.toArray(new String[list.size()]);
return data.toUpperCase();
}
}
- ItemProcessorService2
package com.github.dylanz666.service;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Service;
/**
* @author : dylanz
* @since : 08/26/2020
*/
@Service
public class ItemProcessorService2 implements ItemProcessor<String, String> {
public String process(String data) throws Exception {
return data + " dylanz";
}
}
3). 編寫ItemWriter;
package com.github.dylanz666.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* @author : dylanz
* @since : 08/25/2020
*/
@Service
public class ItemWriterService implements ItemWriter<String> {
private Logger logger = LoggerFactory.getLogger(this.getClass());
//在此處進行數(shù)據(jù)輸出操作,如寫入數(shù)據(jù)庫、寫入文件、打印log等,本例為打印log;
public void write(List<? extends String> messages) throws Exception {
for (String message : messages) {
logger.info("Writing data: " + message);
}
}
}
4). 編寫JobExecutionListener;
我們對Job運行前后進行監(jiān)聽,并打印相應log:
package com.github.dylanz666.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.stereotype.Service;
/**
* @author : dylanz
* @since : 08/25/2020
*/
@Service
public class JobListener implements JobExecutionListener {
private Logger logger = LoggerFactory.getLogger(this.getClass());
public void beforeJob(JobExecution jobExecution) {
logger.info("JOB IS STARTED.");
}
public void afterJob(JobExecution jobExecution) {
if (jobExecution.getStatus() == BatchStatus.FAILED) {
logger.info("JOB IS EXECUTED FAILED.");
return;
}
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
logger.info("JOB IS EXECUTED SUCCESSFULLY.");
}
}
}
5). 裝配Job;
在config包底下創(chuàng)建BathConfig.java類(名字隨意),我們裝配2個Job,一個為單Step Job,一個為2個Step Job,同時在每個job上設置監(jiān)聽:
package com.github.dylanz666.config;
import com.github.dylanz666.service.*;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author : dylanz
* @since : 08/25/2020
*/
@Configuration
public class BatchConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private ItemReaderService itemReaderService;
@Autowired
private ItemReaderService2 itemReaderService2;
@Autowired
private ItemProcessorService itemProcessorService;
@Autowired
private ItemProcessorService2 itemProcessorService2;
@Autowired
private ItemWriterService itemWriterService;
@Autowired
private JobListener jobListener;
@Bean
public Job singleStepJob() {
return jobBuilderFactory.get("singleStepJob")
.incrementer(new RunIdIncrementer())
.listener(listener())
.start(uppercaseStep())
.build();
}
@Bean
public Job multiBoundStepsJob() {
return jobBuilderFactory.get("multiBoundStepsJob")
.incrementer(new RunIdIncrementer())
.listener(listener())
.start(uppercaseStep())
.next(addMessageStep())
.build();
}
@Bean
public Step uppercaseStep() {
return stepBuilderFactory.get("uppercaseStep")
.<String, String>chunk(1)
.reader(itemReaderService)
.processor(itemProcessorService)
.writer(itemWriterService).build();
}
@Bean
public Step addMessageStep() {
return stepBuilderFactory.get("addMessageStep")
.<String, String>chunk(1)
.reader(itemReaderService2)
.processor(itemProcessorService2)
.writer(itemWriterService).build();
}
@Bean
public JobExecutionListener listener() {
return jobListener;
}
}
6). 使用數(shù)據(jù)庫源(非必需);
-
不使用數(shù)據(jù)庫存儲批處理job的元數(shù)據(jù)及執(zhí)行信息;
默認是不需要任何改動的,此時不保存元數(shù)據(jù)及執(zhí)行信息;
如果遇到提示數(shù)據(jù)源缺失問題,也可嘗試在Spring Boot啟動類的@SpringBootApplication注解添加屬性:exclude = {DataSourceAutoConfiguration.class},即:
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
-
使用數(shù)據(jù)存儲批處理job的元數(shù)據(jù)及執(zhí)行信息;
該方式可以隨時跟蹤執(zhí)行進度,重新執(zhí)行失敗記錄等,我們可以使用mysql等數(shù)據(jù)庫,另外一種更常用、簡單的方式是使用嵌入式數(shù)據(jù)庫H2 Database。
使用H2 Database只需在src/main/resources/application.properties添加以下配置即可:(前提是要移除啟動類@SpringBootApplication注解的屬性:exclude = {DataSourceAutoConfiguration.class})
server.port=8080
spring.datasource.url=jdbc:h2:~/test
spring.datasource.driverClassName=org.h2.Driver
spring.datasource.username=sa
#初次密碼可隨意
spring.datasource.password=123456
spring.h2.console.path=/h2-console
spring.h2.console.enabled=true
7). 修改Spring Boot入口類;
在Spring Boot項目入口類上添加注解@EnableBatchProcessing即可,如:
package com.github.dylanz666;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author : dylanz
* @since : 08/25/2020
*/
@SpringBootApplication
@EnableBatchProcessing
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}
- 啟動項目后,我們可訪問http://127.0.0.1:8080/h2-console 查看元數(shù)據(jù)和執(zhí)行信息:

- 輸入密碼并點擊Connect按鈕鏈接H2數(shù)據(jù)庫后:

- 可在H2 數(shù)據(jù)庫中執(zhí)行sql進行元數(shù)據(jù)和執(zhí)行信息的查詢等操作;
4. 批處理任務調度;
批處理任務調度常見的幾種方式:
1). 項目啟動時自啟動(一次性執(zhí)行所有批處理任務);
默認啟動項目時會一次性執(zhí)行所有批處理任務。
如果我們不想在項目啟動時執(zhí)行所有批處理任務,那么需要在application.properties添加配置項:
spring.batch.job.enabled=false
2). 通過接口調用方式把任務調度交給客戶端;
- 在controller包下編寫2個批處理任務調度接口:
package com.github.dylanz666.controller;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author : dylanz
* @since : 08/25/2020
*/
@RestController
public class BatchController {
@Autowired
private Job singleStepJob;
@Autowired
private Job multiBoundStepsJob;
@Autowired
private JobLauncher jobLauncher;
@GetMapping("/job/step")
public String invokeStep() throws Exception {
JobParameters jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(singleStepJob, jobParameters);
return "The job is proceed.";
}
@GetMapping("/job/steps")
public String invokeSteps() throws Exception {
JobParameters jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(multiBoundStepsJob, jobParameters);
return "The multi bound steps job is proceed.";
}
}
- 項目啟動后,調用任務調度接口,如本例使用的2個接口:
(1). http://127.0.0.1:8080/job/step
任務調度接口1

(2). http://127.0.0.1:8080/job/steps


通過log我們會發(fā)現(xiàn),http://127.0.0.1:8080/job/steps這個API,使用的Job multiBoundStepsJob,先執(zhí)行了uppercaseStep()方法,把字符串轉成大寫,然后在這基礎上,執(zhí)行了addMessageStep()方法,在字符串尾部添加" dylanz"字符串,其中關鍵點是:
[1]. 我們在ItemProcessorService方法中用static成員變量String[] message保存ItemProcessorService處理后的數(shù)據(jù);
[2]. 在itemReaderService2中使用static成員變量String[] message作為數(shù)據(jù)源;
[3]. 在multiBoundStepsJob中使用了uppercaseStep和addMessageStep這2個Step;
這個是2個關聯(lián)Step間數(shù)據(jù)傳遞的一種方法;
我們也可以設置一個Job,包含多個互不關聯(lián)的Step,只需要在編寫Step時使用鏈式寫法:
.start(xxx)
.next(xxx)
.next(xxx)
....
.build()
3). 定期調度批處理任務;
由于Spring Batch只是一個批處理應用框架,而不是調度框架,它只關注批處理相關的問題,并不提供調度功能,因此,我們需要借助其他調度框架實現(xiàn)定期調度。
我了解到的Spring Boot框架內常用、成熟的調度方式、調度框架有:
(1). Spring Boot自帶的@Scheduled;
(2). Quartz;
(1). Spring Boot自帶的@Scheduled
@Scheduled有3種執(zhí)行方式:
//1. 按照指定的cron表達式,一旦符合cron表示的時間,則執(zhí)行任務,如,//每5秒中執(zhí)行一次任務:
@Scheduled(cron = "0/5 * * * * ?")
//2. 以固定頻率執(zhí)行任務,如每1分鐘執(zhí)行一次任務;
@Scheduled(fixedRate = 60000)
//3. 任務執(zhí)行完成后再延遲固定時間后再執(zhí)行下一次,如延遲1分鐘再執(zhí)行任務;
@Scheduled(fixedDelay = 60000)
cron表達式可以參考網(wǎng)上的介紹:http://www.itdecent.cn/p/e9ce1a7e1ed1
cron表達式也可以使用在線生成工具:https://cron.qqe2.com/

[1]. 編寫Schedule類,如:
package com.github.dylanz666.config;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
* @author : dylanz
* @since : 08/25/2020
*/
@Component
public class SpringScheduledConfig {
@Autowired
private Job singleStepJob;
@Autowired
private JobLauncher jobLauncher;
@Scheduled(cron = "0/5 * * * * ?")
public void demoScheduled() throws Exception {
JobParameters jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(singleStepJob, jobParameters);
}
}
[2]. 項目啟動類增加注解:
- @EnableScheduling
[3]. 實際運行效果:

這整個過程還是非常簡單的,但cron疑似在支持年份時有問題。
(2). 批處理調度框架Quartz
Quartz是OpenSymphony開源組織在Job scheduling領域又一個開源項目,完全由Java開發(fā),可以用來執(zhí)行定時任務,類似于java.util.Timer。但是相較于Timer, Quartz增加了很多功能:
- 持久性作業(yè) - 就是保持調度定時的狀態(tài);
- 作業(yè)管理 - 對調度作業(yè)進行有效的管理;
Quartz依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
我的理解是Quartz不僅是批處理調度框架,同時也是批處理應用框架。由于Quartz相對靈活,換句話說就是使用起來相對復雜些,我們就參考其他人的文章,改日再敘:
- https://www.cnblogs.com/imyanger/p/11828301.html
- https://blog.csdn.net/noaman_wgs/article/details/80984873
如果本文對您有幫助,麻煩動動手指點點贊?
謝謝!
