前序文章陸續(xù)介紹了批處理的基本概念,Job使用、Step、Item的結(jié)構(gòu)以及文件的讀寫。本文將接著前面的內(nèi)容說(shuō)明數(shù)據(jù)庫(kù)如何進(jìn)行批處理讀寫,這也是日常使用最多的場(chǎng)景,利用數(shù)據(jù)庫(kù)批量讀寫實(shí)現(xiàn)數(shù)據(jù)庫(kù)同步,業(yè)務(wù)結(jié)算類的工作。
1.數(shù)據(jù)讀取
數(shù)據(jù)庫(kù)是絕大部分系統(tǒng)要用到的數(shù)據(jù)存儲(chǔ)工具,因此針對(duì)數(shù)據(jù)庫(kù)執(zhí)行批量數(shù)據(jù)處理任務(wù)也是很常見(jiàn)的需求。數(shù)據(jù)的批量處理與常規(guī)業(yè)務(wù)開(kāi)發(fā)不同,如果一次性讀取百萬(wàn)條,對(duì)于任何系統(tǒng)而言肯定都是不可取的。為了解決這個(gè)問(wèn)題Spring Batch提供了2套數(shù)據(jù)讀取方案:
基于游標(biāo)讀取數(shù)據(jù)
基于分頁(yè)讀取數(shù)據(jù)
1.1.自定義數(shù)據(jù)源
往往實(shí)際工作場(chǎng)景中,使用的都是動(dòng)態(tài)的數(shù)據(jù)源或者是多庫(kù)讀取,對(duì)于Reader而言是可以自定義數(shù)據(jù)源的存在,無(wú)論是基于游標(biāo)還是分頁(yè)的讀取器都是可以手動(dòng)注入數(shù)據(jù)源
2.游標(biāo)讀取數(shù)據(jù)
對(duì)于有經(jīng)驗(yàn)大數(shù)據(jù)工程師而言數(shù)據(jù)庫(kù)游標(biāo)的操作應(yīng)該是非常熟悉的,因?yàn)檫@是從數(shù)據(jù)庫(kù)讀取數(shù)據(jù)流標(biāo)準(zhǔn)方法,而且在Java中也封裝了ResultSet這種面向游標(biāo)操作的數(shù)據(jù)結(jié)構(gòu)。
ResultSet一直都會(huì)指向結(jié)果集中的某一行數(shù)據(jù),使用next方法可以讓游標(biāo)跳轉(zhuǎn)到下一行數(shù)據(jù)。Spring Batch同樣使用這個(gè)特性來(lái)控制數(shù)據(jù)的讀?。?/p>
1.在初始化時(shí)打開(kāi)游標(biāo)。
2.每一次調(diào)用ItemReader::read方法就從ResultSet獲取一行數(shù)據(jù)并執(zhí)行next。
3.返回可用于數(shù)據(jù)處理的映射結(jié)構(gòu)(map、dict)。
在一切都執(zhí)行完畢之后,框架會(huì)使用回調(diào)過(guò)程調(diào)用ResultSet::close來(lái)關(guān)閉游標(biāo)。由于所有的業(yè)務(wù)過(guò)程都綁定在一個(gè)事物之上,所以知道到Step執(zhí)行完畢或異常退出調(diào)用執(zhí)行close。下圖展示了數(shù)據(jù)讀取的過(guò)程:

SQL語(yǔ)句的查詢結(jié)果稱為數(shù)據(jù)集(對(duì)于大部分?jǐn)?shù)據(jù)庫(kù)而言,其SQL執(zhí)行結(jié)果會(huì)產(chǎn)生臨時(shí)的表空間索引來(lái)存放數(shù)據(jù)集)。游標(biāo)開(kāi)始會(huì)停滯在ID=2的位置,一次ItemReader執(zhí)行完畢后會(huì)產(chǎn)生對(duì)應(yīng)的實(shí)體FOO2,然后游標(biāo)下移直到最后的ID=6。最后關(guān)閉游標(biāo)。
2.1.JdbcCursorItemReader
JdbcCursorItemReader是使用游標(biāo)讀取數(shù)據(jù)集的ItemReader實(shí)現(xiàn)類之一。它使用JdbcTemplate中的DataSource控制ResultSet,其過(guò)程是將ResultSet的每行數(shù)據(jù)轉(zhuǎn)換為所需要的實(shí)體類。
JdbcCursorItemReader的執(zhí)行過(guò)程有三步:
1.通過(guò)DataSource創(chuàng)建JdbcTemplate。
2.設(shè)定數(shù)據(jù)集的SQL語(yǔ)句。
3.創(chuàng)建ResultSet到實(shí)體類的映射。 大致如下:
//庸才周久一
JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
除了上面的代碼,JdbcCursorItemReader還有其他屬性:

在運(yùn)行代碼之前請(qǐng)先在數(shù)據(jù)庫(kù)中執(zhí)行以下DDL語(yǔ)句,并添加部分測(cè)試數(shù)據(jù)。
CREATE TABLE `tmp_test_weather` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主鍵',
`siteid` varchar(64) NOT NULL COMMENT '業(yè)務(wù)主鍵',
`month` varchar(64) NOT NULL COMMENT '日期',
`type` varchar(64) NOT NULL COMMENT '氣象類型',
`value` int(11) NOT NULL COMMENT '值',
`ext` varchar(255) DEFAULT NULL COMMENT '擴(kuò)展數(shù)據(jù)',
PRIMARY KEY (`id`)
) ;
運(yùn)行代碼:
//庸才周久一
public class JdbcReader {
@Bean
public RowMapper<WeatherEntity> weatherEntityRowMapper() {
return new RowMapper<WeatherEntity>() {
public static final String SITEID_COLUMN = "siteId"; // 設(shè)置映射字段
public static final String MONTH_COLUMN = "month";
public static final String TYPE_COLUMN = "type";
public static final String VALUE_COLUMN = "value";
public static final String EXT_COLUMN = "ext";
@Override
// 數(shù)據(jù)轉(zhuǎn)換
public WeatherEntity mapRow(ResultSet resultSet, int rowNum) throws SQLException {
WeatherEntity weatherEntity = new WeatherEntity();
weatherEntity.setSiteId(resultSet.getString(SITEID_COLUMN));
weatherEntity.setMonth(resultSet.getString(MONTH_COLUMN));
weatherEntity.setType(WeatherEntity.Type.valueOf(resultSet.getString(TYPE_COLUMN)));
weatherEntity.setValue(resultSet.getInt(VALUE_COLUMN));
weatherEntity.setExt(resultSet.getString(EXT_COLUMN));
return weatherEntity;
}
};
}
@Bean
public ItemReader<WeatherEntity> jdbcCursorItemReader(
@Qualifier("weatherEntityRowMapper") RowMapper<WeatherEntity> rowMapper, DataSource datasource) {
JdbcCursorItemReader<WeatherEntity> itemReader = new JdbcCursorItemReader<>();
itemReader.setDataSource(datasource); //設(shè)置DataSource
//設(shè)置讀取的SQL
itemReader.setSql("SELECT siteId, month, type, value, ext from TMP_TEST_WEATHER");
itemReader.setRowMapper(rowMapper); //設(shè)置轉(zhuǎn)換
return itemReader;
}
}
2.2.HibernateCursorItemReader
在Java體系中數(shù)據(jù)庫(kù)操作常見(jiàn)的規(guī)范有JPA或ORM,Spring Batch提供了HibernateCursorItemReader來(lái)實(shí)現(xiàn)HibernateTemplate,它可以通過(guò)Hibernate框架進(jìn)行游標(biāo)的控制。
需要注意的是:使用Hibernate框架來(lái)處理批量數(shù)據(jù)到目前為止一直都有爭(zhēng)議,核心原因是Hibernate最初是為在線聯(lián)機(jī)事物型系統(tǒng)開(kāi)發(fā)的。不過(guò)這并不意味著不能使用它來(lái)處理批數(shù)據(jù),解決此問(wèn)題就是讓Hibernate使用StatelessSession用來(lái)保持游標(biāo),而不是standard session一次讀寫,這將導(dǎo)致Hibernate的緩存機(jī)制和數(shù)據(jù)臟讀檢查失效,進(jìn)而影響批處理的過(guò)程。關(guān)于Hibernate的狀態(tài)控制機(jī)制請(qǐng)閱讀官方文檔。
HibernateCursorItemReader使用過(guò)程與JdbcCursorItemReader沒(méi)多大差異都是逐條讀取數(shù)據(jù)然后控制狀態(tài)鏈接關(guān)閉。只不過(guò)他提供了Hibernate所使用的HSQL方案。
@Bean
public ItemReader<WeatherEntity> hibernateCursorItemReader(SessionFactory sessionFactory) {
HibernateCursorItemReader<WeatherEntity> itemReader = new HibernateCursorItemReader<>();
itemReader.setName("hibernateCursorItemReader");
itemReader.setQueryString("from WeatherEntity tmp_test_weather");
itemReader.setSessionFactory(sessionFactory);
return itemReader;
}
或
public ItemReader<WeatherEntity> hibernateCursorItemReader(SessionFactory sessionFactory) {
return new HibernateCursorItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.sessionFactory(sessionFactory)
.queryString("from CustomerCredit")
.build();
}
如果沒(méi)有特別的需要,不推薦使用Hibernate。
2.3.StoredProcedureItemReader
存儲(chǔ)過(guò)程是在同一個(gè)數(shù)據(jù)庫(kù)中處理大量數(shù)據(jù)的常用方法。StoredProcedureItemReader的執(zhí)行過(guò)程和JdbcCursorItemReader一致,但是底層邏輯是先執(zhí)行存儲(chǔ)過(guò)程,然后返回存儲(chǔ)過(guò)程執(zhí)行結(jié)果游標(biāo)。不同的數(shù)據(jù)庫(kù)存儲(chǔ)過(guò)程游標(biāo)返回會(huì)有一些差異:
- 作為一個(gè)
ResultSet返回。(SQL Server, Sybase, DB2, Derby以及MySQL) - 參數(shù)返回一個(gè)
ref-cursor實(shí)例。比如Oracle、PostgreSQL數(shù)據(jù)庫(kù),這類數(shù)據(jù)庫(kù)存儲(chǔ)過(guò)程是不會(huì)直接return任何內(nèi)容的,需要從傳參獲取。 - 返回存儲(chǔ)過(guò)程調(diào)用后的返回值。
針對(duì)以上3個(gè)類型,配置上有一些差異:
//庸才周久一
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_processor_weather");
reader.setRowMapper(new weatherEntityRowMapper());
reader.setRefCursorPosition(1);//第二種類型需要指定ref-cursor的參數(shù)位置
reader.setFunction(true);//第三種類型需要明確的告知reader通過(guò)返回獲取
return reader;
}
使用存儲(chǔ)過(guò)程處理數(shù)據(jù)的好處是可以實(shí)現(xiàn)針對(duì)庫(kù)內(nèi)的數(shù)據(jù)進(jìn)行合并、分割、排序等處理。如果數(shù)據(jù)在同一個(gè)數(shù)據(jù)庫(kù),性能也明顯好于通過(guò)Java處理。
3.1.JdbcPagingItemReader
分頁(yè)查詢的默認(rèn)實(shí)現(xiàn)類是JdbcPagingItemReader,它的核心功能是用分頁(yè)器PagingQueryProvider進(jìn)行分頁(yè)控制。由于不同的數(shù)據(jù)庫(kù)分頁(yè)方法差別很大,所以針對(duì)不同的數(shù)據(jù)庫(kù)有不同的實(shí)現(xiàn)類??蚣芴峁┝薙qlPagingQueryProviderFactoryBean用于檢查當(dāng)前數(shù)據(jù)庫(kù)并自動(dòng)注入對(duì)應(yīng)的PagingQueryProvider。
JdbcPagingItemReader會(huì)從數(shù)據(jù)庫(kù)中一次性讀取一整頁(yè)的數(shù)據(jù),但是調(diào)用Reader的時(shí)候還是會(huì)一行一行的返回?cái)?shù)據(jù)??蚣軙?huì)自行根據(jù)運(yùn)行情況確定什么時(shí)候需要執(zhí)行下一個(gè)分頁(yè)的查詢。
//庸才周久一
public class PageReader {
final private boolean wrapperBuilder = false;
@Bean
//設(shè)置 queryProvider
public SqlPagingQueryProviderFactoryBean queryProvider(DataSource dataSource) {
SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
provider.setDataSource(dataSource);
provider.setSelectClause("select id, siteid, month, type, value, ext");
provider.setFromClause("from tmp_test_weather");
provider.setWhereClause("where id>:start");
provider.setSortKey("id");
return provider;
}
@Bean
public ItemReader<WeatherEntity> jdbcPagingItemReader(DataSource dataSource,
PagingQueryProvider queryProvider,
RowMapper<WeatherEntity> rowMapper) {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("start", "1");
JdbcPagingItemReader<WeatherEntity> itemReader;
if (wrapperBuilder) {
itemReader = new JdbcPagingItemReaderBuilder<WeatherEntity>()
.name("creditReader")
.dataSource(dataSource)
.queryProvider(queryProvider)
.parameterValues(parameterValues)
.rowMapper(rowMapper)
.pageSize(1000)
.build();
} else {
itemReader = new JdbcPagingItemReader<>();
itemReader.setName("weatherEntityJdbcPagingItemReader");
itemReader.setDataSource(dataSource);
itemReader.setQueryProvider(queryProvider);
itemReader.setParameterValues(parameterValues);
itemReader.setRowMapper(rowMapper);
itemReader.setPageSize(1000);
}
return itemReader;
}
}
4.數(shù)據(jù)寫入
Spring Batch為不同類型的文件的寫入提供了多個(gè)實(shí)現(xiàn)類,但并沒(méi)有為數(shù)據(jù)庫(kù)的寫入提供任何實(shí)現(xiàn)類,而是交由開(kāi)發(fā)者自己去實(shí)現(xiàn)接口。理由是:
數(shù)據(jù)庫(kù)的寫入與文件寫入有巨大的差別。對(duì)于一個(gè)Step而言,在寫入一份文件時(shí)需要保持對(duì)文件的打開(kāi)狀態(tài)從而能夠高效的向隊(duì)尾添加數(shù)據(jù)。如果每次都重新打開(kāi)文件,從開(kāi)始位置移動(dòng)到隊(duì)尾會(huì)耗費(fèi)大量的時(shí)間(很多文件流無(wú)法在open時(shí)就知道長(zhǎng)度)。當(dāng)整個(gè)Step結(jié)束時(shí)才能關(guān)閉文件的打開(kāi)狀態(tài),框架提供的文件讀寫類都實(shí)現(xiàn)了這個(gè)控制過(guò)程。
另外無(wú)論使用何種方式將數(shù)據(jù)寫入文件都是"逐行進(jìn)行"的(流數(shù)據(jù)寫入、字符串逐行寫入)。因此當(dāng)數(shù)據(jù)寫入與整個(gè)Step綁定為事物時(shí)還需要實(shí)現(xiàn)一個(gè)控制過(guò)程是:在寫入數(shù)據(jù)的過(guò)程中出現(xiàn)異常時(shí)要擦除本次事物已經(jīng)寫入的數(shù)據(jù),這樣才能和整個(gè)Step的狀態(tài)保持一致??蚣苤械念愅瑯訉?shí)現(xiàn)了這個(gè)過(guò)程。
但是向數(shù)據(jù)庫(kù)寫入數(shù)據(jù)并不需要類似于文件的尾部寫入控制,因?yàn)閿?shù)據(jù)庫(kù)的各種鏈接池本身就保證了鏈接->寫入->釋放的高效執(zhí)行,也不存在向隊(duì)尾添加數(shù)據(jù)的問(wèn)題。而且?guī)缀跛械臄?shù)據(jù)庫(kù)驅(qū)動(dòng)都提供了事物能力,在任何時(shí)候出現(xiàn)異常都會(huì)自動(dòng)回退,不存在擦除數(shù)據(jù)的問(wèn)題。
因此,對(duì)于數(shù)據(jù)庫(kù)的寫入操作只要按照常規(guī)的批量數(shù)據(jù)寫入的方式即可,開(kāi)發(fā)者使用任何工具都可以完成這個(gè)過(guò)程。
如下圖自定義實(shí)現(xiàn)的寫入器
/**
* itemWrite抽象類,主要處理平臺(tái)是否有數(shù)據(jù)變化
* @author zhouhui
*
* @param <T>
*/
@Slf4j
public abstract class AbstractItemStreamWriter<T> implements ItemStreamWriter<T> {
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
// TODO Auto-generated method stub
log.info("打開(kāi)寫入器");
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
// TODO Auto-generated method stub
log.info("更新寫入器");
}
@Override
public void close() throws ItemStreamException {
// TODO Auto-generated method stub
log.info("關(guān)閉寫入器");
}
}
具體實(shí)現(xiàn)
@Configuration
@Slf4j
public class CreateTBCamWriterConfig{
@Bean("createTBCamWriter")
@StepScope
public ItemStreamWriter<CamData> writeDataToTbCam(@Autowired final CacheUtils cacheUtil) {
return new AbstractItemStreamWriter<CamData>() {
@Override
public void write(List<? extends CamData> items) throws Exception {
System.out.println("do something");
}
};
}
}
這里值得注意的是,如果讀取器異常,寫入器并不會(huì)執(zhí)行
5.組合使用案例
下面是一些組合使用過(guò)程,簡(jiǎn)單實(shí)現(xiàn)了文件到數(shù)據(jù)庫(kù)、數(shù)據(jù)庫(kù)到文件的過(guò)程。文件讀寫的過(guò)程已經(jīng)在文件讀寫中介紹過(guò),這里會(huì)重復(fù)使用之前介紹的文件讀寫的功能。
下面的案例是將data.csv中的數(shù)據(jù)寫入到數(shù)據(jù)庫(kù),然后再將數(shù)據(jù)寫入到out-data.csv。案例組合使用已有的item完成任務(wù):flatFileReader、jdbcBatchWriter、jdbcCursorItemReader、simpleProcessor、flatFileWriter。這種Reader、Processor、Writer組合的方式也是完成一個(gè)批處理工程的常見(jiàn)開(kāi)發(fā)方式。
案例的運(yùn)行代碼在org.chenkui.spring.batch.sample.database.complex包中,使用了2個(gè)Step來(lái)完成任務(wù),一個(gè)將數(shù)據(jù)讀取到數(shù)據(jù)庫(kù),一個(gè)將數(shù)據(jù)進(jìn)行過(guò)濾,然后再寫入到文件:
//庸才周久一
public class FileComplexProcessConfig {
@Bean
// 配置Step1
public Step file2DatabaseStep(StepBuilderFactory builder,
@Qualifier("flatFileReader") ItemReader<WeatherEntity> reader,
@Qualifier("jdbcBatchWriter") ItemWriter<WeatherEntity> writer) {
return builder.get("file2DatabaseStep") // 創(chuàng)建
.<WeatherEntity, WeatherEntity>chunk(50) // 分片
.reader(reader) // 讀取
.writer(writer) // 寫入
.faultTolerant() // 開(kāi)啟容錯(cuò)處理
.skipLimit(20) // 跳過(guò)設(shè)置
.skip(Exception.class) // 跳過(guò)異常
.build();
}
@Bean
// 配置Step2
public Step database2FileStep(StepBuilderFactory builder,
@Qualifier("jdbcCursorItemReader") ItemReader<WeatherEntity> reader,
@Qualifier("simpleProcessor") ItemProcessor<WeatherEntity, MaxTemperatureEntiry> processor,
@Qualifier("flatFileWriter") ItemWriter<MaxTemperatureEntiry> writer) {
return builder.get("database2FileStep") // 創(chuàng)建
.<WeatherEntity, MaxTemperatureEntiry>chunk(50) // 分片
.reader(reader) // 讀取
.processor(processor) //
.writer(writer) // 寫入
.faultTolerant() // 開(kāi)啟容錯(cuò)處理
.skipLimit(20) // 跳過(guò)設(shè)置
.skip(Exception.class) // 跳過(guò)異常
.build();
}
@Bean
public Job file2DatabaseJob(@Qualifier("file2DatabaseStep") Step step2Database,
@Qualifier("database2FileStep") Step step2File, JobBuilderFactory builder) {
return builder.get("File2Database").start(step2Database).next(step2File).build();
}
}