spring batch 純注解學(xué)習(xí)筆記(六)--數(shù)據(jù)庫(kù)批量讀寫

前序文章陸續(xù)介紹了批處理的基本概念,Job使用Step、Item的結(jié)構(gòu)以及文件的讀寫。本文將接著前面的內(nèi)容說(shuō)明數(shù)據(jù)庫(kù)如何進(jìn)行批處理讀寫,這也是日常使用最多的場(chǎng)景,利用數(shù)據(jù)庫(kù)批量讀寫實(shí)現(xiàn)數(shù)據(jù)庫(kù)同步,業(yè)務(wù)結(jié)算類的工作。

1.數(shù)據(jù)讀取

數(shù)據(jù)庫(kù)是絕大部分系統(tǒng)要用到的數(shù)據(jù)存儲(chǔ)工具,因此針對(duì)數(shù)據(jù)庫(kù)執(zhí)行批量數(shù)據(jù)處理任務(wù)也是很常見(jiàn)的需求。數(shù)據(jù)的批量處理與常規(guī)業(yè)務(wù)開(kāi)發(fā)不同,如果一次性讀取百萬(wàn)條,對(duì)于任何系統(tǒng)而言肯定都是不可取的。為了解決這個(gè)問(wèn)題Spring Batch提供了2套數(shù)據(jù)讀取方案:

基于游標(biāo)讀取數(shù)據(jù)
基于分頁(yè)讀取數(shù)據(jù)

1.1.自定義數(shù)據(jù)源

往往實(shí)際工作場(chǎng)景中,使用的都是動(dòng)態(tài)的數(shù)據(jù)源或者是多庫(kù)讀取,對(duì)于Reader而言是可以自定義數(shù)據(jù)源的存在,無(wú)論是基于游標(biāo)還是分頁(yè)的讀取器都是可以手動(dòng)注入數(shù)據(jù)源

2.游標(biāo)讀取數(shù)據(jù)

對(duì)于有經(jīng)驗(yàn)大數(shù)據(jù)工程師而言數(shù)據(jù)庫(kù)游標(biāo)的操作應(yīng)該是非常熟悉的,因?yàn)檫@是從數(shù)據(jù)庫(kù)讀取數(shù)據(jù)流標(biāo)準(zhǔn)方法,而且在Java中也封裝了ResultSet這種面向游標(biāo)操作的數(shù)據(jù)結(jié)構(gòu)。

ResultSet一直都會(huì)指向結(jié)果集中的某一行數(shù)據(jù),使用next方法可以讓游標(biāo)跳轉(zhuǎn)到下一行數(shù)據(jù)。Spring Batch同樣使用這個(gè)特性來(lái)控制數(shù)據(jù)的讀?。?/p>

1.在初始化時(shí)打開(kāi)游標(biāo)。
2.每一次調(diào)用ItemReader::read方法就從ResultSet獲取一行數(shù)據(jù)并執(zhí)行next。
3.返回可用于數(shù)據(jù)處理的映射結(jié)構(gòu)(map、dict)。
在一切都執(zhí)行完畢之后,框架會(huì)使用回調(diào)過(guò)程調(diào)用ResultSet::close來(lái)關(guān)閉游標(biāo)。由于所有的業(yè)務(wù)過(guò)程都綁定在一個(gè)事物之上,所以知道到Step執(zhí)行完畢或異常退出調(diào)用執(zhí)行close。下圖展示了數(shù)據(jù)讀取的過(guò)程:


分頁(yè)

SQL語(yǔ)句的查詢結(jié)果稱為數(shù)據(jù)集(對(duì)于大部分?jǐn)?shù)據(jù)庫(kù)而言,其SQL執(zhí)行結(jié)果會(huì)產(chǎn)生臨時(shí)的表空間索引來(lái)存放數(shù)據(jù)集)。游標(biāo)開(kāi)始會(huì)停滯在ID=2的位置,一次ItemReader執(zhí)行完畢后會(huì)產(chǎn)生對(duì)應(yīng)的實(shí)體FOO2,然后游標(biāo)下移直到最后的ID=6。最后關(guān)閉游標(biāo)。

2.1.JdbcCursorItemReader

JdbcCursorItemReader是使用游標(biāo)讀取數(shù)據(jù)集的ItemReader實(shí)現(xiàn)類之一。它使用JdbcTemplate中的DataSource控制ResultSet,其過(guò)程是將ResultSet的每行數(shù)據(jù)轉(zhuǎn)換為所需要的實(shí)體類。

JdbcCursorItemReader的執(zhí)行過(guò)程有三步:

1.通過(guò)DataSource創(chuàng)建JdbcTemplate。
2.設(shè)定數(shù)據(jù)集的SQL語(yǔ)句。
3.創(chuàng)建ResultSet到實(shí)體類的映射。 大致如下:

//庸才周久一
JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());

除了上面的代碼,JdbcCursorItemReader還有其他屬性:


JdbcCursorItemReader.png

在運(yùn)行代碼之前請(qǐng)先在數(shù)據(jù)庫(kù)中執(zhí)行以下DDL語(yǔ)句,并添加部分測(cè)試數(shù)據(jù)。

CREATE TABLE `tmp_test_weather` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主鍵',
  `siteid` varchar(64) NOT NULL COMMENT '業(yè)務(wù)主鍵',
  `month` varchar(64) NOT NULL COMMENT '日期',
  `type` varchar(64) NOT NULL COMMENT '氣象類型',
  `value` int(11) NOT NULL COMMENT '值',
  `ext` varchar(255) DEFAULT NULL COMMENT '擴(kuò)展數(shù)據(jù)',
  PRIMARY KEY (`id`)
) ;

運(yùn)行代碼:

//庸才周久一
public class JdbcReader {

    @Bean
    public RowMapper<WeatherEntity> weatherEntityRowMapper() {

        return new RowMapper<WeatherEntity>() {
            public static final String SITEID_COLUMN = "siteId"; // 設(shè)置映射字段
            public static final String MONTH_COLUMN = "month";
            public static final String TYPE_COLUMN = "type";
            public static final String VALUE_COLUMN = "value";
            public static final String EXT_COLUMN = "ext";

            @Override
            // 數(shù)據(jù)轉(zhuǎn)換
            public WeatherEntity mapRow(ResultSet resultSet, int rowNum) throws SQLException {
                WeatherEntity weatherEntity = new WeatherEntity();
                weatherEntity.setSiteId(resultSet.getString(SITEID_COLUMN));
                weatherEntity.setMonth(resultSet.getString(MONTH_COLUMN));
                weatherEntity.setType(WeatherEntity.Type.valueOf(resultSet.getString(TYPE_COLUMN)));
                weatherEntity.setValue(resultSet.getInt(VALUE_COLUMN));
                weatherEntity.setExt(resultSet.getString(EXT_COLUMN));
                return weatherEntity;
            }
        };
    }

    @Bean
    public ItemReader<WeatherEntity> jdbcCursorItemReader(
        @Qualifier("weatherEntityRowMapper") RowMapper<WeatherEntity> rowMapper, DataSource datasource) {
        JdbcCursorItemReader<WeatherEntity> itemReader = new JdbcCursorItemReader<>();
        itemReader.setDataSource(datasource); //設(shè)置DataSource
        //設(shè)置讀取的SQL
        itemReader.setSql("SELECT siteId, month, type, value, ext from TMP_TEST_WEATHER"); 
        itemReader.setRowMapper(rowMapper); //設(shè)置轉(zhuǎn)換
        return itemReader;
    }
}

2.2.HibernateCursorItemReader

在Java體系中數(shù)據(jù)庫(kù)操作常見(jiàn)的規(guī)范有JPA或ORM,Spring Batch提供了HibernateCursorItemReader來(lái)實(shí)現(xiàn)HibernateTemplate,它可以通過(guò)Hibernate框架進(jìn)行游標(biāo)的控制。

需要注意的是:使用Hibernate框架來(lái)處理批量數(shù)據(jù)到目前為止一直都有爭(zhēng)議,核心原因是Hibernate最初是為在線聯(lián)機(jī)事物型系統(tǒng)開(kāi)發(fā)的。不過(guò)這并不意味著不能使用它來(lái)處理批數(shù)據(jù),解決此問(wèn)題就是讓Hibernate使用StatelessSession用來(lái)保持游標(biāo),而不是standard session一次讀寫,這將導(dǎo)致Hibernate的緩存機(jī)制和數(shù)據(jù)臟讀檢查失效,進(jìn)而影響批處理的過(guò)程。關(guān)于Hibernate的狀態(tài)控制機(jī)制請(qǐng)閱讀官方文檔。

HibernateCursorItemReader使用過(guò)程與JdbcCursorItemReader沒(méi)多大差異都是逐條讀取數(shù)據(jù)然后控制狀態(tài)鏈接關(guān)閉。只不過(guò)他提供了Hibernate所使用的HSQL方案。

@Bean
public ItemReader<WeatherEntity> hibernateCursorItemReader(SessionFactory sessionFactory) {
    HibernateCursorItemReader<WeatherEntity> itemReader = new HibernateCursorItemReader<>();
    itemReader.setName("hibernateCursorItemReader");
    itemReader.setQueryString("from WeatherEntity tmp_test_weather");
    itemReader.setSessionFactory(sessionFactory);
    return itemReader;
}

public ItemReader<WeatherEntity> hibernateCursorItemReader(SessionFactory sessionFactory) {
    return new HibernateCursorItemReaderBuilder<CustomerCredit>()
            .name("creditReader")
            .sessionFactory(sessionFactory)
            .queryString("from CustomerCredit")
            .build();
}

如果沒(méi)有特別的需要,不推薦使用Hibernate。

2.3.StoredProcedureItemReader

存儲(chǔ)過(guò)程是在同一個(gè)數(shù)據(jù)庫(kù)中處理大量數(shù)據(jù)的常用方法。StoredProcedureItemReader的執(zhí)行過(guò)程和JdbcCursorItemReader一致,但是底層邏輯是先執(zhí)行存儲(chǔ)過(guò)程,然后返回存儲(chǔ)過(guò)程執(zhí)行結(jié)果游標(biāo)。不同的數(shù)據(jù)庫(kù)存儲(chǔ)過(guò)程游標(biāo)返回會(huì)有一些差異:

  1. 作為一個(gè)ResultSet返回。(SQL Server, Sybase, DB2, Derby以及MySQL
  2. 參數(shù)返回一個(gè) ref-cursor實(shí)例。比如Oracle、PostgreSQL數(shù)據(jù)庫(kù),這類數(shù)據(jù)庫(kù)存儲(chǔ)過(guò)程是不會(huì)直接return任何內(nèi)容的,需要從傳參獲取。
  3. 返回存儲(chǔ)過(guò)程調(diào)用后的返回值。

針對(duì)以上3個(gè)類型,配置上有一些差異:

//庸才周久一
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
    StoredProcedureItemReader reader = new StoredProcedureItemReader();

    reader.setDataSource(dataSource);
    reader.setProcedureName("sp_processor_weather");
    reader.setRowMapper(new weatherEntityRowMapper());
    
    reader.setRefCursorPosition(1);//第二種類型需要指定ref-cursor的參數(shù)位置

    reader.setFunction(true);//第三種類型需要明確的告知reader通過(guò)返回獲取

    return reader;
}

使用存儲(chǔ)過(guò)程處理數(shù)據(jù)的好處是可以實(shí)現(xiàn)針對(duì)庫(kù)內(nèi)的數(shù)據(jù)進(jìn)行合并、分割、排序等處理。如果數(shù)據(jù)在同一個(gè)數(shù)據(jù)庫(kù),性能也明顯好于通過(guò)Java處理。

3.1.JdbcPagingItemReader

分頁(yè)查詢的默認(rèn)實(shí)現(xiàn)類是JdbcPagingItemReader,它的核心功能是用分頁(yè)器PagingQueryProvider進(jìn)行分頁(yè)控制。由于不同的數(shù)據(jù)庫(kù)分頁(yè)方法差別很大,所以針對(duì)不同的數(shù)據(jù)庫(kù)有不同的實(shí)現(xiàn)類??蚣芴峁┝薙qlPagingQueryProviderFactoryBean用于檢查當(dāng)前數(shù)據(jù)庫(kù)并自動(dòng)注入對(duì)應(yīng)的PagingQueryProvider。

JdbcPagingItemReader會(huì)從數(shù)據(jù)庫(kù)中一次性讀取一整頁(yè)的數(shù)據(jù),但是調(diào)用Reader的時(shí)候還是會(huì)一行一行的返回?cái)?shù)據(jù)??蚣軙?huì)自行根據(jù)運(yùn)行情況確定什么時(shí)候需要執(zhí)行下一個(gè)分頁(yè)的查詢。

//庸才周久一
public class PageReader {
    final private boolean wrapperBuilder = false;
    @Bean
    //設(shè)置 queryProvider
    public SqlPagingQueryProviderFactoryBean queryProvider(DataSource dataSource) {
        SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();

        provider.setDataSource(dataSource);
        provider.setSelectClause("select id, siteid, month, type, value, ext");
        provider.setFromClause("from tmp_test_weather");
        provider.setWhereClause("where id>:start");
        provider.setSortKey("id");

        return provider;
    }

    @Bean
    public ItemReader<WeatherEntity> jdbcPagingItemReader(DataSource dataSource,
            PagingQueryProvider queryProvider,
            RowMapper<WeatherEntity> rowMapper) {

        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("start", "1");
        JdbcPagingItemReader<WeatherEntity> itemReader;
        if (wrapperBuilder) {
            itemReader = new JdbcPagingItemReaderBuilder<WeatherEntity>()
                    .name("creditReader")
                    .dataSource(dataSource)
                    .queryProvider(queryProvider)
                    .parameterValues(parameterValues)
                    .rowMapper(rowMapper)
                    .pageSize(1000)
                    .build();
        } else {
            itemReader = new JdbcPagingItemReader<>();
            itemReader.setName("weatherEntityJdbcPagingItemReader");
            itemReader.setDataSource(dataSource);
            itemReader.setQueryProvider(queryProvider);
            itemReader.setParameterValues(parameterValues);
            itemReader.setRowMapper(rowMapper);
            itemReader.setPageSize(1000);
        }
        return itemReader;
    }
}

4.數(shù)據(jù)寫入

Spring Batch為不同類型的文件的寫入提供了多個(gè)實(shí)現(xiàn)類,但并沒(méi)有為數(shù)據(jù)庫(kù)的寫入提供任何實(shí)現(xiàn)類,而是交由開(kāi)發(fā)者自己去實(shí)現(xiàn)接口。理由是:

數(shù)據(jù)庫(kù)的寫入與文件寫入有巨大的差別。對(duì)于一個(gè)Step而言,在寫入一份文件時(shí)需要保持對(duì)文件的打開(kāi)狀態(tài)從而能夠高效的向隊(duì)尾添加數(shù)據(jù)。如果每次都重新打開(kāi)文件,從開(kāi)始位置移動(dòng)到隊(duì)尾會(huì)耗費(fèi)大量的時(shí)間(很多文件流無(wú)法在open時(shí)就知道長(zhǎng)度)。當(dāng)整個(gè)Step結(jié)束時(shí)才能關(guān)閉文件的打開(kāi)狀態(tài),框架提供的文件讀寫類都實(shí)現(xiàn)了這個(gè)控制過(guò)程。
另外無(wú)論使用何種方式將數(shù)據(jù)寫入文件都是"逐行進(jìn)行"的(流數(shù)據(jù)寫入、字符串逐行寫入)。因此當(dāng)數(shù)據(jù)寫入與整個(gè)Step綁定為事物時(shí)還需要實(shí)現(xiàn)一個(gè)控制過(guò)程是:在寫入數(shù)據(jù)的過(guò)程中出現(xiàn)異常時(shí)要擦除本次事物已經(jīng)寫入的數(shù)據(jù),這樣才能和整個(gè)Step的狀態(tài)保持一致??蚣苤械念愅瑯訉?shí)現(xiàn)了這個(gè)過(guò)程。
但是向數(shù)據(jù)庫(kù)寫入數(shù)據(jù)并不需要類似于文件的尾部寫入控制,因?yàn)閿?shù)據(jù)庫(kù)的各種鏈接池本身就保證了鏈接->寫入->釋放的高效執(zhí)行,也不存在向隊(duì)尾添加數(shù)據(jù)的問(wèn)題。而且?guī)缀跛械臄?shù)據(jù)庫(kù)驅(qū)動(dòng)都提供了事物能力,在任何時(shí)候出現(xiàn)異常都會(huì)自動(dòng)回退,不存在擦除數(shù)據(jù)的問(wèn)題。
因此,對(duì)于數(shù)據(jù)庫(kù)的寫入操作只要按照常規(guī)的批量數(shù)據(jù)寫入的方式即可,開(kāi)發(fā)者使用任何工具都可以完成這個(gè)過(guò)程。
如下圖自定義實(shí)現(xiàn)的寫入器

/**
 * itemWrite抽象類,主要處理平臺(tái)是否有數(shù)據(jù)變化
 * @author zhouhui
 *
 * @param <T>
 */
@Slf4j
public abstract class AbstractItemStreamWriter<T> implements ItemStreamWriter<T> {

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        // TODO Auto-generated method stub
        log.info("打開(kāi)寫入器");
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        // TODO Auto-generated method stub
        log.info("更新寫入器");
    }

    @Override
    public void close() throws ItemStreamException {
        // TODO Auto-generated method stub
        log.info("關(guān)閉寫入器");
    }
    
    
}

具體實(shí)現(xiàn)

@Configuration
@Slf4j
public class CreateTBCamWriterConfig{
    
    @Bean("createTBCamWriter")
    @StepScope
    public ItemStreamWriter<CamData> writeDataToTbCam(@Autowired final CacheUtils cacheUtil) {
        return new AbstractItemStreamWriter<CamData>() {

            @Override
            public void write(List<? extends CamData> items) throws Exception {
                System.out.println("do something");
            }

            
        };
    }

}

這里值得注意的是,如果讀取器異常,寫入器并不會(huì)執(zhí)行

5.組合使用案例

下面是一些組合使用過(guò)程,簡(jiǎn)單實(shí)現(xiàn)了文件到數(shù)據(jù)庫(kù)、數(shù)據(jù)庫(kù)到文件的過(guò)程。文件讀寫的過(guò)程已經(jīng)在文件讀寫中介紹過(guò),這里會(huì)重復(fù)使用之前介紹的文件讀寫的功能。

下面的案例是將data.csv中的數(shù)據(jù)寫入到數(shù)據(jù)庫(kù),然后再將數(shù)據(jù)寫入到out-data.csv。案例組合使用已有的item完成任務(wù):flatFileReader、jdbcBatchWriter、jdbcCursorItemReader、simpleProcessor、flatFileWriter。這種Reader、Processor、Writer組合的方式也是完成一個(gè)批處理工程的常見(jiàn)開(kāi)發(fā)方式。

案例的運(yùn)行代碼在org.chenkui.spring.batch.sample.database.complex包中,使用了2個(gè)Step來(lái)完成任務(wù),一個(gè)將數(shù)據(jù)讀取到數(shù)據(jù)庫(kù),一個(gè)將數(shù)據(jù)進(jìn)行過(guò)濾,然后再寫入到文件:

//庸才周久一
public class FileComplexProcessConfig {
    @Bean
    // 配置Step1
    public Step file2DatabaseStep(StepBuilderFactory builder,
            @Qualifier("flatFileReader") ItemReader<WeatherEntity> reader,
            @Qualifier("jdbcBatchWriter") ItemWriter<WeatherEntity> writer) {
        return builder.get("file2DatabaseStep") // 創(chuàng)建
                .<WeatherEntity, WeatherEntity>chunk(50) // 分片
                .reader(reader) // 讀取
                .writer(writer) // 寫入
                .faultTolerant() // 開(kāi)啟容錯(cuò)處理
                .skipLimit(20) // 跳過(guò)設(shè)置
                .skip(Exception.class) // 跳過(guò)異常
                .build();
    }

    @Bean
    // 配置Step2
    public Step database2FileStep(StepBuilderFactory builder, 
            @Qualifier("jdbcCursorItemReader") ItemReader<WeatherEntity> reader,
            @Qualifier("simpleProcessor") ItemProcessor<WeatherEntity, MaxTemperatureEntiry> processor,
            @Qualifier("flatFileWriter") ItemWriter<MaxTemperatureEntiry> writer) {
        return builder.get("database2FileStep") // 創(chuàng)建
                .<WeatherEntity, MaxTemperatureEntiry>chunk(50) // 分片
                .reader(reader) // 讀取
                .processor(processor) //
                .writer(writer) // 寫入
                .faultTolerant() // 開(kāi)啟容錯(cuò)處理
                .skipLimit(20) // 跳過(guò)設(shè)置
                .skip(Exception.class) // 跳過(guò)異常
                .build();
    }

    @Bean
    public Job file2DatabaseJob(@Qualifier("file2DatabaseStep") Step step2Database,
            @Qualifier("database2FileStep") Step step2File, JobBuilderFactory builder) {
        return builder.get("File2Database").start(step2Database).next(step2File).build();
    }
}


?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容