最近研究離線批處理任務的加工,發(fā)現(xiàn)Spring Batch是一個非常不錯的選擇。其應用場景介于數(shù)據(jù)庫存儲過程與Hadoop大數(shù)據(jù)加工之間??梢蕴幚砣粘5呐幚砣蝿?,監(jiān)控任務的執(zhí)行情況。從現(xiàn)在開始,我們就慢慢深入研究下Spring Batch如何使用。
Spring Batch簡單介紹
- Spring Batch的任務包含多個步驟Step,每個Step包含三步:Reader、Processor、Writer,其實我們主要關注的就是這三步。具體圖如下:

Spring Batch Job
- 名稱解釋
JobRepository : 管理所有Job操作
Job:某一個任務
JobInstance:某一個任務對應的實例(類似類的實例)
Step:步驟
環(huán)境說明
- 主要版本說明
Spring Batch 3.0.7.RELEASE (當前最新版本)
Spring Framework 4.0.5.RELEASE(當前Spring batch 所能支持的最高版本,再高就會報錯哦)
官方的例子都是使用spring boot做DEMO,由于對Spring Boot不太了解,這里使用的都是直接Spring framework。
- 開發(fā)環(huán)境
STS
MAVEN
第一個Spring Batch例子
定義 Reader、Processor、Writer,他們分別實現(xiàn)ItemReader、ItemProcessor、ItemWriter 這三個接口。具體看代碼如下:
- Reader 返回一個簡單的字符串
package com.me.springbatch.hello;
import org.springframework.batch.item.ItemReader;
/**
* {@link ItemReader} with hard-coded input data.
*/
public class HelloItemReader implements ItemReader<String> {
private int index = 0;
/**
* 模擬讀操作
*/
public String read() throws Exception {
if (index < 1000) {
index++;
return "Hello world!" + index;
} else {
return null;
}
}
}
- Processor 為字符串做一個簡單修改
package com.me.springbatch.hello;
import org.springframework.batch.item.ItemProcessor;
public class HelloItemProcessor implements ItemProcessor<String, String> {
/**
* 模擬處理過程
*/
public String process(String item) throws Exception {
// TODO Auto-generated method stub
return item + " processed";
}
}
- Writer 通過Log輸出處理后的字符串
package com.me.springbatch.hello;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.item.ItemWriter;
/**
* Dummy {@link ItemWriter} which only logs data it receives.
*/
public class HelloItemWriter implements ItemWriter<Object> {
private static final Log log = LogFactory.getLog(HelloItemWriter.class);
/**
* 模擬寫操作
*
* @see ItemWriter#write(java.util.List)
*/
public void write(List<? extends Object> data) throws Exception {
log.info(data);
}
}
三個接口實現(xiàn)都非常簡單,只是簡單模擬了下。這三個類如何組織成一個Job,通過XML簡單配置即可。
<description>Hello Job</description>
<batch:job id="helloJob">
<batch:step id="helloStep">
<batch:tasklet>
<!-- commit-interval:事務批量提交數(shù) -->
<batch:chunk reader="helloItemReader" processor="helloItemProcessor" writer="helloItemWriter"
commit-interval="1" />
</batch:tasklet>
</batch:step>
</batch:job>
<bean id="helloItemReader" class="com.me.springbatch.hello.HelloItemReader"></bean>
<bean id="helloItemWriter" class="com.me.springbatch.hello.HelloItemWriter"></bean>
<bean id="helloItemProcessor" class="com.me.springbatch.hello.HelloItemProcessor"></bean>
這樣,一個簡單JOB就完成了。
執(zhí)行Job如下(jobId即xml中為job起的id名稱):
public static void run (String jobId) {
//通過應用程序上下文獲得bean
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"spring-context.xml");
JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
//任務
Job job = (Job) context.getBean(jobId);
log.debug("任務【"+jobId+"】開始執(zhí)行");
long start = System.currentTimeMillis() ;
try {
//執(zhí)行任務
JobExecution execution = jobLauncher.run(job, new JobParameters());
log.debug("任務【"+jobId+"】執(zhí)行結果:"+execution.getStatus());
} catch (Exception e) {
e.printStackTrace();
}finally{
if(context != null){
context.close();
}
}
long end = System.currentTimeMillis() ;
log.debug("任務【"+jobId+"】執(zhí)行完畢!共花費:"+(end - start) + "毫秒");
}