前言
本文將從0到1講解一個(gè)Spring Batch是如何搭建并運(yùn)行起來的。
本教程將講解從一個(gè)文本文件讀取數(shù)據(jù),然后寫入MySQL。
什么是 Spring Batch
Spring Batch 作為 Spring 的子項(xiàng)目,是一款基于 Spring 的企業(yè)批處理框架。通過它可以構(gòu)建出健壯的企業(yè)批處理應(yīng)用。Spring Batch 不僅提供了統(tǒng)一的讀寫接口、豐富的任務(wù)處理方式、靈活的事務(wù)管理及并發(fā)處理,同時(shí)還支持日志、監(jiān)控、任務(wù)重啟與跳過等特性,大大簡(jiǎn)化了批處理應(yīng)用開發(fā),將開發(fā)人員從復(fù)雜的任務(wù)配置管理過程中解放出來,使他們可以更多地去關(guān)注核心的業(yè)務(wù)處理過程。
更多的介紹可以參考官網(wǎng):https://spring.io/projects/sp...
環(huán)境搭建
我是用的Intellij Idea,用gradle構(gòu)建。
可以使用Spring Initializr 來創(chuàng)建Spring boot應(yīng)用。地址:https://start.spring.io/

首先選擇Gradle Project,然后選擇Java。填上你的Group和Artifact名字。
最后再搜索你需要用的包,比如Batch是一定要的。另外,由于我寫的Batch項(xiàng)目是使用JPA向MySQL插入數(shù)據(jù),所以也添加了JPA和MySQL。其他可以根據(jù)自己需要添加。
點(diǎn)擊Generate Project,一個(gè)項(xiàng)目就創(chuàng)建好了。
Build.gralde文件大概就長(zhǎng)這個(gè)樣子:
buildscript {
ext {
springBootVersion = '2.0.4.RELEASE'
}
repositories {
mavenCentral()
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
group = 'com.demo'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8
repositories {
mavenCentral()
}
dependencies {
compile('org.springframework.boot:spring-boot-starter-batch')
compile('org.springframework.boot:spring-boot-starter-jdbc')
compile("org.springframework.boot:spring-boot-starter-data-jpa")
compile group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-joda', version: '2.9.4'
compile group: 'org.jadira.usertype', name: 'usertype.core', version: '6.0.1.GA'
compile group: 'mysql', name: 'mysql-connector-java', version: '6.0.6',
testCompile('org.springframework.boot:spring-boot-starter-test')
testCompile('org.springframework.batch:spring-batch-test')
}
Spring Batch 結(jié)構(gòu)
網(wǎng)上有很多Spring Batch結(jié)構(gòu)和原理的講解,我就不詳細(xì)闡述了,我這里只講一下Spring Batch的一個(gè)基本層級(jí)結(jié)構(gòu)。
首先,Spring Batch運(yùn)行的基本單位是一個(gè)Job,一個(gè)Job就做一件批處理的事情。
一個(gè)Job包含很多Step,step就是每個(gè)job要執(zhí)行的單個(gè)步驟。
如下圖所示,Step里面,會(huì)有Tasklet,Tasklet是一個(gè)任務(wù)單元,它是屬于可以重復(fù)利用的東西。
然后是Chunk,chunk就是數(shù)據(jù)塊,你需要定義多大的數(shù)據(jù)量是一個(gè)chunk。
Chunk里面就是不斷循環(huán)的一個(gè)流程,讀數(shù)據(jù),處理數(shù)據(jù),然后寫數(shù)據(jù)。Spring Batch會(huì)不斷的循環(huán)這個(gè)流程,直到批處理數(shù)據(jù)完成。

構(gòu)建Spring Batch
首先,我們需要一個(gè)全局的Configuration來配置所有的Job和一些全局配置。
代碼如下:
@Configuration
@EnableAutoConfiguration
@EnableBatchProcessing(modular = true)
public class SpringBatchConfiguration {
@Bean
public ApplicationContextFactory firstJobContext() {
return new GenericApplicationContextFactory(FirstJobConfiguration.class);
}
@Bean
public ApplicationContextFactory secondJobContext() {
return new GenericApplicationContextFactory(SecondJobConfiguration.class);
}
}
@EnableBatchProcessing是打開Batch。如果要實(shí)現(xiàn)多Job的情況,需要把EnableBatchProcessing注解的modular設(shè)置為true,讓每個(gè)Job使用自己的ApplicationConext。
比如上面代碼的就創(chuàng)建了兩個(gè)Job。
例子背景
本博客的例子是遷移數(shù)據(jù),數(shù)據(jù)源是一個(gè)文本文件,數(shù)據(jù)量是上百萬條,一行就是一條數(shù)據(jù)。然后我們通過Spring Batch幫我們把文本文件的數(shù)據(jù)全部遷移到MySQL數(shù)據(jù)庫對(duì)應(yīng)的表里面。
假設(shè)我們遷移的數(shù)據(jù)是Message,那么我們就需要提前創(chuàng)建一個(gè)叫Message的和數(shù)據(jù)庫映射的數(shù)據(jù)類。
@Entity
@Table(name = "message")
public class Message {
@Id
@Column(name = "object_id", nullable = false)
private String objectId;
@Column(name = "content")
private String content;
@Column(name = "last_modified_time")
private LocalDateTime lastModifiedTime;
@Column(name = "created_time")
private LocalDateTime createdTime;
}
構(gòu)建Job
首先我們需要一個(gè)關(guān)于這個(gè)Job的Configuration,它將在SpringBatchConfigration里面被加載。
@Configuration
@EnableAutoConfiguration
@EnableBatchProcessing(modular = true)
public class SpringBatchConfiguration {
@Bean
public ApplicationContextFactory messageMigrationJobContext() {
return new GenericApplicationContextFactory(MessageMigrationJobConfiguration.class);
}
}
下面的關(guān)于構(gòu)建Job的代碼都將寫在這個(gè)MessageMigrationJobConfiguration里面。
public class MessageMigrationJobConfiguration {
}
我們先定義一個(gè)Job的Bean。
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Bean
public Job messageMigrationJob(@Qualifier("messageMigrationStep") Step messageMigrationStep) {
return jobBuilderFactory.get("messageMigrationJob")
.start(messageMigrationStep)
.build();
}
jobBuilderFactory是注入進(jìn)來的,get里面的就是job的名字。
這個(gè)job只有一個(gè)step。
Step
接下來就是創(chuàng)建Step。
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Step messageMigrationStep(@Qualifier("jsonMessageReader") FlatFileItemReader<Message> jsonMessageReader,
@Qualifier("messageItemWriter") JpaItemWriter<Message> messageItemWriter,
@Qualifier("errorWriter") Writer errorWriter) {
return stepBuilderFactory.get("messageMigrationStep")
.<Message, Message>chunk(CHUNK_SIZE)
.reader(jsonMessageReader).faultTolerant().skip(JsonParseException.class).skipLimit(SKIP_LIMIT)
.listener(new MessageItemReadListener(errorWriter))
.writer(messageItemWriter).faultTolerant().skip(Exception.class).skipLimit(SKIP_LIMIT)
.listener(new MessageWriteListener())
.build();
}
stepBuilderFactory是注入進(jìn)來的,然后get里面是Step的名字。
我們的Step中可以構(gòu)建很多東西,比如reader,processer,writer,listener等等。
下面我們就逐個(gè)來看看step里面的這些東西是如何使用的。
Chunk
Spring batch在配置Step時(shí)采用的是基于Chunk的機(jī)制,即每次讀取一條數(shù)據(jù),再處理一條數(shù)據(jù),累積到一定數(shù)量后再一次性交給writer進(jìn)行寫入操作。這樣可以最大化的優(yōu)化寫入效率,整個(gè)事務(wù)也是基于Chunk來進(jìn)行。
比如我們定義chunk size是50,那就意味著,spring batch處理了50條數(shù)據(jù)后,再統(tǒng)一向數(shù)據(jù)庫寫入。
這里有個(gè)很重要的點(diǎn),chunk前面需要定義數(shù)據(jù)輸入類型和輸出類型,由于我們輸入是Message,輸出也是Message,所以兩個(gè)都直接寫Message了。
如果不定義這個(gè)類型,會(huì)報(bào)錯(cuò)。
.<Message, Message>chunk(CHUNK_SIZE)
Reader
Reader顧名思義就是從數(shù)據(jù)源讀取數(shù)據(jù)。
Spring Batch給我們提供了很多好用實(shí)用的reader,基本能滿足我們所有需求。比如FlatFileItemReader,JdbcCursorItemReader,JpaPagingItemReader等。也可以自己實(shí)現(xiàn)Reader。
本例子里面,數(shù)據(jù)源是文本文件,所以我們就使用FlatFileItemReader。FlatFileItemReader是從文件里面一行一行的讀取數(shù)據(jù)。
首先需要設(shè)置文件路徑,也就是設(shè)置resource。
因?yàn)槲覀冃枰岩恍形谋居成錇镸essage類,所以我們需要自己設(shè)置并實(shí)現(xiàn)LineMapper。
@Bean
public FlatFileItemReader<Message> jsonMessageReader() {
FlatFileItemReader<Message> reader = new FlatFileItemReader<>();
reader.setResource(new FileSystemResource(new File(MESSAGE_FILE)));
reader.setLineMapper(new MessageLineMapper());
return reader;
}
CommonMultiResourceItemReader
批量讀取多個(gè)文件(最好是同類型),此reader的delegate只能有一個(gè)
@Bean("itemStockMultiItemReader")
@StepScope
public CommonMultiResourceItemReader<BiItemStock> getMultiResourceItemReader() throws FileNotFoundException {
logger.debug("create itemStockMultiItemReader.");
CommonMultiResourceItemReader itemStockItemReader = new CommonMultiResourceItemReader(filePath,"csv");
FlatFileItemReader reader = new FlatFileItemReader<BiItemStock>();
//通過裝飾類實(shí)現(xiàn)讀取多個(gè)同類型文件
reader.setLineMapper(itemStockLineMapper);
itemStockItemReader.setDelegate(reader);
return itemStockItemReader;
}
public CommonMultiResourceItemReader(String dirPath,String needFileExtension) throws FileNotFoundException {
/*存放文件的地址*/
File file = new File(dirPath);
final File[] files= file.listFiles();
List<Resource> resources = new ArrayList<>();
for(int i=0;i<files.length;i++){
final int finalI = i;
String fileName = files[finalI].getName();
String suffix = fileName.substring(fileName.lastIndexOf(".") + 1);
if(!suffix.equalsIgnoreCase(needFileExtension)){
continue;
}
/*這里如果使用InputStreamResource 需要重寫getFilename 因?yàn)镮nputStreamResource 沒有實(shí)現(xiàn) 默認(rèn)返回的是null*/
/*如果不重寫這個(gè)方法 自己實(shí)現(xiàn)setComparator 也行*/
InputStreamResource inputStreamResource = new InputStreamResource(new FileInputStream(files[finalI])){
@Override
public String getFilename() {
return files[finalI].getName();
}
};
resources.add(inputStreamResource);
}
Resource[] tempResources = new Resource[resources.size()];
setResources(resources.toArray(tempResources));
}
Line Mapper
LineMapper的輸入就是獲取一行文本,和行號(hào),然后轉(zhuǎn)換成Message。
在本例子里面,一行文本就是一個(gè)json對(duì)象,所以我們使用JsonParser來轉(zhuǎn)換成Message。
public class MessageLineMapper implements LineMapper<Message> {
private MappingJsonFactory factory = new MappingJsonFactory();
@Override
public Message mapLine(String line, int lineNumber) throws Exception {
JsonParser parser = factory.createParser(line);
Map<String, Object> map = (Map) parser.readValueAs(Map.class);
Message message = new Message();
... // 轉(zhuǎn)換邏輯
return message;
}
}
Processor
由于本例子里面,數(shù)據(jù)是一行文本,通過reader變成Message的類,然后writer直接把Message寫入MySQL。所以我們的例子里面就不需要Processor,關(guān)于如何寫Processor其實(shí)和reader/writer是一樣的道理。
從它的接口可以看出,需要定義輸入和輸出的類型,把輸入I通過某些邏輯處理之后,返回輸出O。
public interface ItemProcessor<I, O> {
O process(I item) throws Exception;
}
Writer
Writer顧名思義就是把數(shù)據(jù)寫入到目標(biāo)數(shù)據(jù)源里面。
Spring Batch同樣給我們提供很多好用實(shí)用的writer。比如JpaItemWriter,F(xiàn)latFileItemWriter,HibernateItemWriter,JdbcBatchItemWriter等。同樣也可以自定義。
本例子里面,使用的是JpaItemWriter,可以直接把Message對(duì)象寫到數(shù)據(jù)庫里面。但是需要設(shè)置一個(gè)EntityManagerFactory,可以注入進(jìn)來。
@Autowired
private EntityManagerFactory entityManager;
@Bean
public JpaItemWriter<Message> messageItemWriter() {
JpaItemWriter<Message> writer = new JpaItemWriter<>();
writer.setEntityManagerFactory(entityManager);
return writer;
}
CompositeItemWriter
@Bean("itemStockMultiItemWrite")
@StepScope
public CompositeItemWriter<BiItemStock> getItemStockCompositeWriter(){
CompositeItemWriter<BiItemStock> itemWriter = new CompositeItemWriter<>();
List<ItemWriter<? super BiItemStock>> delegates = new ArrayList<>();
//通過裝飾類實(shí)現(xiàn)一行寫入兩個(gè)數(shù)據(jù)源
delegates.add(itemStockWriter);
delegates.add(itemStocHistorykWriter);
itemWriter.setDelegates(delegates);
return itemWriter;
}
另外,你需要配置數(shù)據(jù)庫的連接等東西。由于我使用的spring,所以直接在Application.properties里面配置如下:
spring.datasource.url=jdbc:mysql://database
spring.datasource.username=username
spring.datasource.password=password
spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver
spring.jpa.database-platform=org.hibernate.dialect.MySQLDialect
spring.jpa.show-sql=true
spring.jpa.properties.jadira.usertype.autoRegisterUserTypes=true
spring.jackson.serialization.write-dates-as-timestamps=false
spring.batch.initialize-schema=ALWAYS
spring.jpa.hibernate.ddl-auto=update
spring.datasource相關(guān)的設(shè)置都是在配置數(shù)據(jù)庫的連接。
spring.batch.initialize-schema=always表示讓spring batch在數(shù)據(jù)庫里面創(chuàng)建默認(rèn)的數(shù)據(jù)表。
spring.jpa.show-sql=true表示在控制臺(tái)輸出hibernate讀寫數(shù)據(jù)庫時(shí)候的SQL。
spring.jpa.database-platform=org.hibernate.dialect.MySQLDialect是在指定MySQL的方言。
Listener
Spring Batch同樣實(shí)現(xiàn)了非常完善全面的listener,listener很好理解,就是用來監(jiān)聽每個(gè)步驟的結(jié)果。比如可以有監(jiān)聽step的,有監(jiān)聽job的,有監(jiān)聽reader的,有監(jiān)聽writer的。沒有你找不到的listener,只有你想不到的listener。
在本例子里面,我只關(guān)心,read的時(shí)候有沒有出錯(cuò),和write的時(shí)候有沒有出錯(cuò),所以,我只實(shí)現(xiàn)了ReadListener和WriteListener。
在read出錯(cuò)的時(shí)候,把錯(cuò)誤結(jié)果寫入一個(gè)單獨(dú)的error列表文件中。
public class MessageItemReadListener implements ItemReadListener<Message> {
private Writer errorWriter;
public MessageItemReadListener(Writer errorWriter) {
this.errorWriter = errorWriter;
}
@Override
public void beforeRead() {
}
@Override
public void afterRead(Message item) {
}
@Override
public void onReadError(Exception ex) {
errorWriter.write(format("%s%n", ex.getMessage()));
}
}
在write出錯(cuò)的時(shí)候,也做同樣的事情,把出錯(cuò)的原因?qū)懭雴为?dú)的日志中。
public class MessageWriteListener implements ItemWriteListener<Message> {
@Autowired
private Writer errorWriter;
@Override
public void beforeWrite(List<? extends Message> items) {
}
@Override
public void afterWrite(List<? extends Message> items) {
}
@Override
public void onWriteError(Exception exception, List<? extends Message> items) {
errorWriter.write(format("%s%n", exception.getMessage()));
for (Message message : items) {
errorWriter.write(format("Failed writing message id: %s", message.getObjectId()));
}
}
}
前面有說chuck機(jī)制,所以write的listener傳入?yún)?shù)是一個(gè)List,因?yàn)樗抢鄯e到一定的數(shù)量才一起寫入。
Skip
Spring Batch提供了skip的機(jī)制,也就是說,如果出錯(cuò)了,可以跳過。如果你不設(shè)置skip,那么一條數(shù)據(jù)出錯(cuò)了,整個(gè)job都會(huì)掛掉。
設(shè)置skip的時(shí)候一定要設(shè)置什么Exception才需要跳過,并且跳過多少條數(shù)據(jù)。如果失敗的數(shù)據(jù)超過你設(shè)置的skip limit,那么job就會(huì)失敗。
你可以分別給reader和writer等設(shè)置skip機(jī)制。
writer(messageItemWriter).faultTolerant().skip(Exception.class).skipLimit(SKIP_LIMIT)
Retry
這個(gè)和Skip是一樣的原理,就是失敗之后可以重試,你同樣需要設(shè)置重試的次數(shù)。
同樣可以分別給reader,writer等設(shè)置retry機(jī)制。
如果同時(shí)設(shè)置了retry和skip,會(huì)先重試所有次數(shù),然后再開始skip。比如retry是10次,skip是20,會(huì)先重試10次之后,再開始算第一次skip。
運(yùn)行Job
所有東西都準(zhǔn)備好以后,就是如何運(yùn)行了。
運(yùn)行就是在main方法里面用JobLauncher去運(yùn)行你制定的job。
下面是我寫的main方法,main方法的第一個(gè)參數(shù)是job的名字,這樣我們就可以通過不同的job名字跑不同的job了。
首先我們通過運(yùn)行起來的Spring application得到j(luò)obRegistry,然后通過job的名字找到對(duì)應(yīng)的job。
接著,我們就可以用jobLauncher去運(yùn)行這個(gè)job了,運(yùn)行的時(shí)候會(huì)傳一些參數(shù),比如你job里面需要的文件路徑或者文件日期等,就可以通過這個(gè)jobParameters傳進(jìn)去。如果沒有參數(shù),可以默認(rèn)傳當(dāng)前時(shí)間進(jìn)去。
public static void main(String[] args) {
String jobName = args[0];
try {
ConfigurableApplicationContext context = SpringApplication.run(ZuociBatchApplication.class, args);
JobRegistry jobRegistry = context.getBean(JobRegistry.class);
Job job = jobRegistry.getJob(jobName);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
JobExecution jobExecution = jobLauncher.run(job, createJobParams());
if (!jobExecution.getExitStatus().equals(ExitStatus.COMPLETED)) {
throw new RuntimeException(format("%s Job execution failed.", jobName));
}
} catch (Exception e) {
throw new RuntimeException(format("%s Job execution failed.", jobName));
}
}
private static JobParameters createJobParams() {
return new JobParametersBuilder().addDate("date", new Date()).toJobParameters();
}
最后,把jar包編譯出來,在命令行執(zhí)行下面的命令,就可以運(yùn)行你的Spring Batch了。
java -jar YOUR_BATCH_NAME.jar YOUR_JOB_NAME
調(diào)試
調(diào)試主要依靠控制臺(tái)輸出的log,可以在application.properties里面設(shè)置log輸出的級(jí)別,比如你希望輸出INFO信息還是DEBUG信息。
基本上,通過查看log都能定位到問題。
logging.path=build/logs
logging.file=${logging.path}/batch.log
logging.level.com.easystudio=INFO
logging.level.root=INFO
log4j.logger.org.springframework.jdbc=INFO
log4j.logger.org.springframework.batch=INFO
logging.level.org.hibernate.SQL=INFO
Spring Batch數(shù)據(jù)表
如果你的batch最終會(huì)寫入數(shù)據(jù)庫,那么Spring Batch會(huì)默認(rèn)在你的數(shù)據(jù)庫里面創(chuàng)建一些batch相關(guān)的表,來記錄所有job/step運(yùn)行的狀態(tài)和結(jié)果。
大部分表你都不需要關(guān)心,你只需要關(guān)心幾張表。

batch_job_instance:這張表能看到每次運(yùn)行的job名字。

batch_job_execution:這張表能看到每次運(yùn)行job的開始時(shí)間,結(jié)束時(shí)間,狀態(tài),以及失敗后的錯(cuò)誤消息是什么。

batch_step_execution:這張表你能看到更多關(guān)于step的詳細(xì)信息。比如step的開始時(shí)間,結(jié)束時(shí)間,提交次數(shù),讀寫次數(shù),狀態(tài),以及失敗后的錯(cuò)誤信息等。

總結(jié)
Spring Batch為我們提供了非常實(shí)用的功能,對(duì)批處理場(chǎng)景進(jìn)行了完善的抽象,它不僅能實(shí)現(xiàn)小數(shù)據(jù)的遷移,也能應(yīng)對(duì)大企業(yè)的大數(shù)據(jù)實(shí)踐應(yīng)用。它讓我們開發(fā)批處理應(yīng)用可以事半功倍。
最后一個(gè)tips,搭建Spring Batch的過程中,會(huì)遇到各種各樣的問題。只要善用Google,都能找到答案。