tags: springbatch
1.引言
前面《數(shù)據(jù)批處理神器-Spring Batch(1)簡介及使用場景》已經(jīng)介紹了Spring Batch是一個輕量級,完善的批處理框架,它使用起來簡單,方便,比較適合有點編程基礎(特別是使用Spring及SpringBoot框架)的開發(fā)人員,針對業(yè)務編程,只需要關心具體的業(yè)務實現(xiàn)即可,把流程以及流程的控制交給Spring Batch就好。常言道"talk is cheap, show me the code",下面我們就通過一個簡單的hello world,進入Spring Batch的世界,通過這個示例,可以快速了解開發(fā)批處理的流程和Spring Batch開發(fā)用到的組件,為后續(xù)的操作打下基礎。
2.開發(fā)環(huán)境
- JDK: jdk1.8
- Spring Boot: 2.1.4.RELEASE
- Spring Batch:4.1.2.RELEASE
- 開發(fā)IDE: IDEA
- 構建工具Maven: 3.3.9
- 日志組件logback:1.2.3
- lombok:1.18.6
3.helloworld開發(fā)
3.1 helloworld說明
本helloworld實現(xiàn)一個非常簡單的功能,就是從數(shù)據(jù)組中讀取字符串,把字符串轉為大寫,然后輸出到控制臺。如圖:

整個過程就是一個批重任務(Job),它只有一個步驟(Job Step),步驟里分為三個階段,讀數(shù)據(jù)(ItemReader)、處理數(shù)據(jù)(ItemProcessor)、寫數(shù)據(jù)(ItemWriter)。
3.2 開發(fā)流程
開發(fā)的主要代碼如下:

總體來說就是,通過Reader,Processor、Writer完成任務,結束后通過Listener進行監(jiān)聽,整個任務通過配置(BatchConfig)進行配置。
3.2.1 創(chuàng)建Spring Boot工程
直接使用Idea生成或在使用Spring Initializr生成即可,此處不詳細說明。也可以直接使用我的代碼示例。當前使用的Spring Boot版本是2.1.4.RELEASE
3.2.2 添加相關依賴
-
Spring Batch依賴
在使用spring-boot-starter-parent的情況下,直接添加以下依賴即可:
<!-- 批處理框架-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
引用后,會引用兩個jar包,一個是spring-batch-infrastructure,一個是spring-batch-core,版本是4.1.2.RELEASE。分別對應的是基礎框架層和核心層。
- 添加內(nèi)存數(shù)據(jù)庫H2
Spring Batch是需要數(shù)據(jù)庫來存儲任務的基本信息以及運行狀態(tài)的,本例中不需要操作數(shù)據(jù)庫邏輯,直接使用內(nèi)存數(shù)據(jù)庫H2即可。添加以下依賴:
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
</dependency>
- 添加測試及工具類依賴
為了簡化開發(fā),使用lombok進行處理。使用Spring Boot進行單元測試,添加依賴如下:
<!-- 工具包:lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.6</version>
</dependency>
<!-- 測試框架 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
3.2.3 開發(fā)讀數(shù)據(jù)組件ItemReader
添加完依賴后,就可以進入業(yè)務邏輯編程了。按Spring Batch的批處理流程,讀數(shù)據(jù)ItemReader是第一步,當前示例中,我們的任務是從數(shù)組中讀取數(shù)據(jù)。ItemReader是一個接口,開發(fā)人員直接實現(xiàn)此接口即可。此接口定義了核心方法read(),負責從給定的資源中讀取可用的數(shù)據(jù)。具體實現(xiàn)如下:
@Slf4j
public class StringReader implements ItemReader<String> {
private String[] messages = {"aaa1","aaa2","aaa3","aaa4"};
private int count = 0;
@Override
public String read() throws UnexpectedInputException, ParseException, NonTransientResourceException {
if(count < messages.length){
String message = messages[count++];
log.debug(LogConstants.LOG_TAG + "read data:"+message);
return message;
}else{
log.debug(LogConstants.LOG_TAG + "read data end.");
count = 0;
}
return null;
}
}
說明:
- (1)StringReader實現(xiàn)ItemReader接口;
- (2)messages是數(shù)據(jù)源;
- (3)count表示讀取數(shù)據(jù)的下標,每讀一次,下標自增,讀取完后返回null表示結束。同時把count置為0,以方便下次讀取。
- (4)日志輸出使用的是logback,結合lombok的
@Slf4j注解,直接可使用log進行輸出,簡化操作。
3.2.4 開發(fā)處理數(shù)據(jù)組件ItemProcessor
讀取數(shù)據(jù)后,返回的數(shù)據(jù)會流到ItemProcessor進行處理。同樣,ItemProcessor是一個接口,要實現(xiàn)自己的處理邏輯,實現(xiàn)此接口即可。當然,如果沒有ItemProcessor,讀到的數(shù)據(jù)直接就到ItemWriter流程也是可以的。此處,Spring Batch有一個Chunk的概念,用于多次讀,直到chunk指定的數(shù)量后,再統(tǒng)一給到processor和writer,以提高效率。本示例對于ItemProcessor的實現(xiàn)很簡單,即把字符串轉為大寫。如下:
@Slf4j
public class ConvertProcessor implements ItemProcessor<String,String> {
@Autowired
private ConsoleService consoleService;
@Override
public String process(String data) {
String dataProcessed = consoleService.convert2UpperCase(data);
log.debug(LogConstants.LOG_TAG + data +" process data --> " + dataProcessed);
return dataProcessed;
}
}
說明:
- 實現(xiàn)ItemProcessor接口,它有兩個泛型,分別是I和O,I是讀階段獲取的數(shù)據(jù),O是提交給寫階段的數(shù)據(jù)。
- 使用ConsoleService服務,對數(shù)據(jù)進行大寫轉換,里面的實現(xiàn)直接使用字符串的
toUpperCase()方法
3.2.5 開發(fā)寫數(shù)據(jù)組件ItemWriter
數(shù)據(jù)處理完后,會統(tǒng)一交給寫組件(ItemWriter)進行寫入。ItemWriter也是一個接口,核心方法是write方法,參數(shù)是數(shù)組。要實現(xiàn)自己的邏輯,實現(xiàn)此接口即可。本示例中,直接把數(shù)據(jù)輸出到日志中即可。如下:
@Slf4j
public class ConsoleWriter implements ItemWriter<String> {
@Override
public void write(List<? extends String> list) {
for (String msg :list) {
log.debug(LogConstants.LOG_TAG + "write data: "+msg);
}
}
}
3.2.6 開發(fā)任務完成后的監(jiān)聽器JobExecutionListener
數(shù)據(jù)寫入到目標后,任務即結束,但有時候我們還需要在任務結束時去做一些其它工作,如清理數(shù)據(jù),更新時間等,則需要在任務完成后進行邏輯處理。Spring Batch對于任務或步驟開始和結束都會提供監(jiān)聽,以便于開發(fā)人員實現(xiàn)監(jiān)聽邏輯。如通過繼承JobExecutionListenerSupport,可以實現(xiàn)beforeJob和afterJob的監(jiān)聽,以實現(xiàn)開始任務前和結束任務后的處理。當前示例中,僅輸出任務完成的日志。如下:
@Slf4j
public class ConsoleJobEndListener extends JobExecutionListenerSupport {
@Override
public void afterJob(JobExecution jobExecution) {
if(jobExecution.getStatus() == BatchStatus.COMPLETED){
log.debug("console batch job complete!");
}
}
}
3.2.7 配置完整任務
經(jīng)過上面的讀、處理、寫、任務完成后監(jiān)聽的操作,現(xiàn)在需要把它們組裝在一起,形成一個完成的任務,使用Spring Boot,簡單的使用幾個配置即可完成任務的組裝。任務及其相關組件的關系如下:

創(chuàng)建配置文件ConsoleBatchConfig.java,具體代碼如下:
@Configuration
@EnableBatchProcessing
public class ConsoleBatchConfig {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Bean
public Job consoleJob(Step consoleStep,JobExecutionListener consoleListener){
String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
return jobBuilderFactory.get(funcName).listener(consoleListener).flow(consoleStep)
.end().build();
}
@Bean
public Step consoleStep(ItemReader stringReader,ItemProcessor convertProcessor
,ItemWriter consoleWriter, CommonStepListener commonStepListener){
String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
return stepBuilderFactory.get(funcName).listener(commonStepListener)
.<String,String>chunk(3).reader(stringReader).processor(convertProcessor)
.writer(consoleWriter).build();
}
@Bean
public ItemReader stringReader(){return new StringReader();}
@Bean
public ItemWriter consoleWriter(){return new ConsoleWriter();}
@Bean
public ItemProcessor convertProcessor(){return new ConvertProcessor();}
@Bean
public JobExecutionListener consoleListener(){return new ConsoleJobEndListener();}
}
說明:
- 添加注解
@Configuration及和@EnableBatchProcessing,標識為配置及啟用Spring Batch的配置(可以直接使用JobBuilderFactory及StepBuilderFactory分別用于創(chuàng)建Job和Step)。 - 創(chuàng)建
ItemReader、ItemWriter、ItemProcessor、Listener對應的Bean,以供Step及Job的注入。 - 使用
stepBuilderFactory創(chuàng)建作業(yè)Step,其中chunk進行面向塊的處理,即多次讀取后再寫入,提高效率。當前配置是3個為一個chunk。 - 使用
jobBuilderFactory添加step,創(chuàng)建任務。 - 注意step和Job都需要有對應的名稱(
get方法確定),此處直接使用方法名作為Job和Step的名稱。
3.2.8 測試批處理
經(jīng)過上面的步驟,已經(jīng)完成Job的開發(fā),測試則可使用兩種方式,一個是編寫Controller,以接口調(diào)用的方式運行job,一種編寫單元測試。
- Job的運行
通過JobLauncher的run方法來運行任務,run方法參數(shù)分別是Job和jobParameters,即已配置的Job及job運行的參數(shù)。每個任務的區(qū)分是通過任務名(jobName)和任務參數(shù)(jobParameters)作為區(qū)別的,即如果jobName和jobParameters相同,Spring Batch會認為是同一任務,若任務已運行成功,同一任務不會再運行。因此,一般來說,不同的任務,我們的jobParameters可以直接以時間作為參數(shù),以便于區(qū)別。生成jobParameters。代碼如下:
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time",System.currentTimeMillis())
.toJobParameters();
- 編寫單元測試
編寫ConsoleJobTest,加載job,運行測試,如下所示:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {MainBootApplication.class,ConsoleBatchConfig.class})
@Slf4j
public class ConsoleJobTest {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job consoleJob;
public void testConsoleJob2() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
//構建參數(shù)
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time",System.currentTimeMillis())
.toJobParameters();
//執(zhí)行任務
JobExecution run = jobLauncher.run(consoleJob, jobParameters);
ExitStatus exitStatus = run.getExitStatus();
log.debug(exitStatus.toString());
}
}
說明:引入SpringBootTest注解時,需要把Spring Batch任務也引入進來。
-
執(zhí)行結果輸出
執(zhí)行結果如下圖所示:執(zhí)行結果從輸出可知,由于設置的
chunk是3,讀取3個數(shù)據(jù)后,就統(tǒng)一給ItemProcessor進行大寫轉換處理,然后統(tǒng)一交給ItemWriter進行寫入。執(zhí)行完成后,Job的exitCode表示任務執(zhí)行的狀態(tài),如果正常則為COMPLETED,失敗則為FAILED。
4.總結
經(jīng)過以上的操作步驟,即可完成批處理操作。關于任務的狀態(tài),流程的步驟(讀、處理、寫)均交給Spring Batch來完成,開發(fā)人員所做的工作是根據(jù)自己的業(yè)務邏輯編寫具體的讀數(shù)據(jù)、處理數(shù)據(jù)和寫數(shù)據(jù)即可。希望通過本文,大家可以對Spring Batch的組件有清晰的了解。
