Hands-On Code (2): Scheduled Data Migration with Spring Boot and Spring Batch

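I. Fundamentals

1.1 What Is Spring Batch

Spring Batch is a member of the Spring family: a lightweight batch-processing framework. A typical practical use is data migration, for example moving the data in a CSV file into MySQL.

Its strengths are that it is easy to pick up, it standardizes how the code is structured, and it delivers a lot of functionality with little code. It is functionally similar to the ETL tool Kettle, but more customizable.

Its use cases center on historical data that already exists in databases, files, and similar stores. It does not appear to support real-time listening on a message queue (if you know how to do this, please tell me). For real-time data, a stream-processing framework such as Storm can be used instead.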

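1.2 Basic Concepts

  • ItemReader: reads data. Several ready-made implementations support different data sources such as CSV and JDBC, and you can also write a custom one.
  • ItemWriter: writes data out. There are ready-made implementations that pair with the readers, and you can likewise write a custom one, for example to publish to a message queue.
  • ItemProcessor: the data-processing stage. Its input is the data read by the Reader, and its output is the Writer's input.
  • Step: one stage of data handling, covering the whole ItemReader -> ItemProcessor -> ItemWriter data flow
  • Job: the task to execute; each job has one or more steps
  • JobRepository: stores the metadata of job and step executions (the container that jobs are registered in by name is the JobRegistry, see 2.3)
  • JobLauncher: launches a job
  • JobLocator: looks up a job by its jobName; together with the JobRegistry and JobLauncher it lets you launch a job manually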

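1.3 How to Develop and Launch a Batch Job

  • Decide on the input and output, and define InputEntity and OutputEntity accordingly
  • Write the Reader: its input is a data source (CSV, MySQL, and so on) and its output is InputEntity. For a database you can use the ready-made class JdbcCursorItemReader<T>
  • Write the Processor: its input is InputEntity and its output is OutputEntity. Implement ItemProcessor<InputEntity, OutputEntity> and its process method
  • Write the Writer: its input is OutputEntity and its output goes to the target data source (MySQL, and so on)
  • Configure the Step and the Job

Apart from the required configuration, implementing a migration job really is that simple.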

II. Hands-On Code

2.0 Creating the Test Tables

Source table

CREATE TABLE `article` (
  `title` varchar(64) DEFAULT NULL COMMENT 'title',
  `content` varchar(255) DEFAULT NULL COMMENT 'content',
  `event_occurred_time` varchar(32) DEFAULT NULL COMMENT 'time the event occurred'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='article';

Output table

CREATE TABLE `article_detail` (
  `title` varchar(64) DEFAULT NULL COMMENT 'title',
  `content` varchar(255) DEFAULT NULL COMMENT 'content',
  `event_occurred_time` varchar(32) DEFAULT NULL COMMENT 'time the event occurred',
  `source` varchar(255) DEFAULT NULL COMMENT 'article source',
  `description` varchar(255) DEFAULT NULL COMMENT 'description'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='article detail';

2.1 Adding the Dependency

<!-- This example is based on Spring Boot 2.x -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>

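2.2 Configuration File

spring:
  batch:
    job:
      # Defaults to true: jobs run automatically at application startup. With false, jobs must be launched manually (jobLauncher.run)
      enabled: false
    # By default Spring Batch needs metadata tables in the database.
    # always: check on every startup and create the tables if they do not exist;
    # embedded (the default): do so only for an embedded database;
    # never: never create them, so a missing table causes an error.
    # From Spring Boot 2.5 onward this property is named spring.batch.jdbc.initialize-schema
    initialize-schema: never

To create the metadata tables manually, see the appendix at the end.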

2.3 Configuring the JobRegistry

@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry){
    JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor();
    jobRegistryBeanPostProcessor.setJobRegistry(jobRegistry);
    return jobRegistryBeanPostProcessor;
}

Without this bean, launching a job manually fails with the error: No job configuration with the name [XJob] was registered

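2.4 Optional Configuration

2.4.1 In-Memory Mode

/**
 * NoPersistence: job metadata is not persisted
 */
@Component
public class NoPersistenceBatchConfigurer extends DefaultBatchConfigurer {
    @Override
    public void setDataSource(DataSource dataSource) {
        // Intentionally empty: ignoring the DataSource keeps job metadata in memory
    }
}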

With this configuration, no metadata tables are created in the database and all jobs are managed in memory. Job information is lost when the application restarts, so this is not recommended for complex job scenarios. It suits jobs that do not need strict job management.

2.4.2 Job Listener

@Component
@Slf4j
public class JobListener extends JobExecutionListenerSupport {

    @Override
    public void afterJob(JobExecution jobExecution) {
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("Job [{}] succeeded, parameter: [{}]", jobExecution.getJobInstance().getJobName(),
                    jobExecution.getJobParameters().getString("executedTime"));
        } else {
            log.error("Job [{}] failed", jobExecution.getJobInstance().getJobName());
            // TODO: handle the failure
        }
    }
}

If you do not need to do anything after a job succeeds or fails, you can leave the listener out: Spring Batch already logs each execution at INFO level, including the result, the parameters, and the elapsed time.

2.5 Defining the Input and Output Entities

Article: the input

@Data
public class Article {

    private String title;

    private String content;

    private String eventOccurredTime;
}

ArticleDetail: the data structure to be written out

@Data
public class ArticleDetail {

    private String title;

    private String content;

    private String eventOccurredTime;

    private String source;

    private String description;
}

2.6 Reader

2.6.1 JdbcCursorItemReader

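/**
 * Plain (cursor-based) read mode
 * - With MySQL, all matching records are loaded into memory (the driver buffers the full result set by default)
 * - Memory usage becomes very high when the data volume is large
 */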
public JdbcCursorItemReader<Article> getArticle(String executedTime) {
    String lastExecutedTime = "2020-01-01 00:00:00";
    String sql = StringUtils.join("SELECT * FROM article WHERE event_occurred_time >= '",
            lastExecutedTime, "' AND event_occurred_time < '", executedTime, "'");
    return new JdbcCursorItemReaderBuilder<Article>()
            .dataSource(dataSource)
            .sql(sql)
            .fetchSize(10)
            .name("getArticle")
            .beanRowMapper(Article.class)
            .build();
}

2.6.2 Paged Reading

/**
 * Paged read mode
 * - Memory usage stays under control as long as the page size is configured sensibly
 */
public JdbcPagingItemReader<Article> getArticlePaging(String executedTime) {
    // Lower bound of the time window; the empty string sorts before every timestamp
    String lastExecutedTime = "";
    Map<String, Object> parameterValues = new HashMap<>(2);
    parameterValues.put("startTime", lastExecutedTime);
    parameterValues.put("stopTime", executedTime);
    return new JdbcPagingItemReaderBuilder<Article>()
            .dataSource(dataSource)
            .name("getArticlePaging")
            .fetchSize(10)
            .parameterValues(parameterValues)
            .pageSize(10)
            // ArticleMapper: a RowMapper<Article> (not shown here)
            .rowMapper(new ArticleMapper())
            .queryProvider(articleProvider())
            .build();
}

private PagingQueryProvider articleProvider() {
    // Paging continues from the last sort-key value of the previous page, so rows that share
    // a key value can be skipped at page boundaries; a unique sort key is preferable
    // (this demo table has no unique column)
    Map<String, Order> sortKeys = new HashMap<>(1);
    sortKeys.put("event_occurred_time", Order.ASCENDING);
    MySqlPagingQueryProvider provider = new MySqlPagingQueryProvider();
    provider.setSelectClause("title, content, event_occurred_time");
    provider.setFromClause("article");
    provider.setWhereClause("event_occurred_time >= :startTime AND event_occurred_time < :stopTime");
    provider.setSortKeys(sortKeys);
    return provider;
}

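2.6.3 Notes

  • You can implement ItemReader<T> to write a Reader with custom behavior
  • Paging keeps resource usage under control, but it is considerably slower, so the number of rows per page needs to be chosen with care.
    • When many jobs run at the same time, what matters is the total data volume. For example, with five jobs that each collect 100,000 rows, size the pages with the memory footprint of 500,000 rows in mind
  • JdbcCursorItemReader can be used when memory is sufficient, and it is very efficient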
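
Both readers select rows with a half-open time window, event_occurred_time >= start AND event_occurred_time < stop, on a varchar column, so the comparison is a string comparison. The plain-Java sketch below shows how such a window behaves and why it assumes zero-padded, fixed-width timestamps like 2020-01-01 00:00:00. The class TimeWindowSketch and the method inWindow are illustrative names that are not part of the project, and String.compareTo only stands in for the database's own string comparison.

```java
class TimeWindowSketch {

    /** True when start <= ts < stop, comparing the strings character by character. */
    static boolean inWindow(String ts, String start, String stop) {
        return ts.compareTo(start) >= 0 && ts.compareTo(stop) < 0;
    }

    public static void main(String[] args) {
        String start = "2020-01-01 00:00:00";
        String stop = "2020-11-10 16:21:01";

        // Inside the window
        System.out.println(inWindow("2020-06-15 08:00:00", start, stop)); // true
        // The upper bound itself is excluded
        System.out.println(inWindow(stop, start, stop)); // false
        // Without zero padding, string order and time order disagree:
        // "2020-9-..." sorts after "2020-11-..." because '9' > '1'
        System.out.println(inWindow("2020-9-01 00:00:00", start, stop)); // false
    }
}
```

Because the upper bound is exclusive, two consecutive windows that share a boundary value do not select the same row twice.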

2.7 Processor

2.7.1 Example Code

@Component
public class ArticleProcessor implements ItemProcessor<Article, ArticleDetail> {

    @Override
    public ArticleDetail process(Article data) throws Exception {
        ArticleDetail articleDetail = new ArticleDetail();
        BeanUtils.copyProperties(data, articleDetail);
        articleDetail.setSource("weibo");
        articleDetail.setDescription("This is a news item from Weibo");
        return articleDetail;
    }
}

2.7.2 Notes

  • A processor only needs to implement ItemProcessor<T1, T2> and its process method.
    • T1 is the entity read by the Reader
    • T2 is the entity passed on to the Writer, that is, the Writer's input entity
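
Besides transforming items, a processor can also drop them: in Spring Batch, returning null from process filters the item out so that it never reaches the Writer. The sketch below illustrates the idea in plain Java. The Processor interface is only a stand-in with the same shape as ItemProcessor, the two entity classes are trimmed-down versions of the ones above, and the blank-title rule is a made-up example.

```java
class ProcessorFilterSketch {

    /** Stand-in with the same shape as ItemProcessor<I, O>; not the Spring interface. */
    interface Processor<I, O> {
        O process(I item);
    }

    /** Trimmed-down input entity. */
    static class Article {
        final String title;

        Article(String title) {
            this.title = title;
        }
    }

    /** Trimmed-down output entity. */
    static class ArticleDetail {
        final String title;
        final String source;

        ArticleDetail(String title, String source) {
            this.title = title;
            this.source = source;
        }
    }

    /** Enriches an article; returns null to drop it (hypothetical rule: blank title). */
    static final Processor<Article, ArticleDetail> PROCESSOR = article -> {
        if (article.title == null || article.title.trim().isEmpty()) {
            return null;
        }
        return new ArticleDetail(article.title, "weibo");
    };

    public static void main(String[] args) {
        System.out.println(PROCESSOR.process(new Article("hello")).source); // weibo
        System.out.println(PROCESSOR.process(new Article("  ")));           // null
    }
}
```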

2.8 Writer

2.8.1 JdbcBatchItemWriter

@Component
public class ArticleJdbcWriter {

    private final DataSource dataSource;

    public ArticleJdbcWriter(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    public JdbcBatchItemWriter<ArticleDetail> writer() {
        return new JdbcBatchItemWriterBuilder<ArticleDetail>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("INSERT INTO article_detail (title, content, event_occurred_time, source, description) VALUES (:title, :content, :eventOccurredTime, :source, :description)")
                .dataSource(dataSource)
                .build();
    }
}

2.8.2 Custom Writer

@Slf4j
public class ArticleWriter implements ItemWriter<ArticleDetail> {

    @Override
    public void write(List<? extends ArticleDetail> list) throws Exception {
        log.info("list holds one chunk, at most the chunkSize set on the step, size = {}", list.size());
        // TODO: write the data out here, for example to a message queue
        list.forEach(article -> log.info("Output test, title: {}", article.getTitle()));
    }
}

2.8.3 Notes

  • Implement ItemWriter<T> and its write method
  • T is the Processor's output type
  • list holds the items of one chunk: up to the chunkSize set on the Step, which is the amount of data handed to the writer per call
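
A plain-Java sketch of a chunk loop, with no Spring dependency, shows how many items each write call receives: every list holds up to chunkSize items, and the final one carries whatever is left. This is only an illustration of the idea, not the Spring Batch implementation; ChunkLoopSketch and run are hypothetical names, and the reader, processor, and writer are modeled with an Iterator, a Function, and a Consumer.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class ChunkLoopSketch {

    /**
     * Reads items one at a time, processes each one, and hands them to the writer
     * in lists of at most chunkSize items. Returns the size of every list written.
     */
    static <I, O> List<Integer> run(Iterator<I> reader,
                                    Function<I, O> processor,
                                    Consumer<List<O>> writer,
                                    int chunkSize) {
        List<Integer> writtenSizes = new ArrayList<>();
        List<O> chunk = new ArrayList<>(chunkSize);
        while (reader.hasNext()) {
            chunk.add(processor.apply(reader.next()));
            if (chunk.size() == chunkSize) {
                writer.accept(new ArrayList<>(chunk));
                writtenSizes.add(chunk.size());
                chunk.clear();
            }
        }
        if (!chunk.isEmpty()) {
            // Final chunk, possibly shorter than chunkSize
            writer.accept(new ArrayList<>(chunk));
            writtenSizes.add(chunk.size());
        }
        return writtenSizes;
    }

    public static void main(String[] args) {
        List<Integer> source = new ArrayList<>();
        for (int i = 1; i <= 25; i++) {
            source.add(i);
        }
        List<Integer> sizes = ChunkLoopSketch.<Integer, String>run(
                source.iterator(),
                n -> "title-" + n,
                chunk -> System.out.println("write " + chunk.size() + " items"),
                10);
        System.out.println(sizes); // [10, 10, 5]
    }
}
```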

2.9 Step and Job

2.9.1 Example Code

@Configuration
@EnableBatchProcessing
public class ArticleBatchJob {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;
    @Autowired
    public StepBuilderFactory stepBuilderFactory;
    @Autowired
    private ArticleReaderDemo articleReader;
    @Autowired
    private ArticleProcessor articleProcessor;
    @Autowired
    private ArticleJdbcWriter articleJdbcWriter;

    @Bean(name = "articleReader")
    @StepScope
    public JdbcPagingItemReader<Article> batchReader(@Value("#{jobParameters['executedTime']}") String executedTime) {
        return articleReader.getArticlePaging(executedTime);
    }

    @Bean(name = "articleWriter")
    public ItemWriter<ArticleDetail> batchWriter() {
//      return articleJdbcWriter.writer();
        return new ArticleWriter();
    }

    @Bean(name = "articleJob")
    public Job batchJob(JobListener listener, Step articleStep) {
        return jobBuilderFactory.get("articleJob")
                .listener(listener)
                .incrementer(new RunIdIncrementer())
                .flow(articleStep)
                .end()
                .build();
    }

    @Bean(name = "articleStep")
    public Step step(JdbcPagingItemReader<Article> articleReader, ItemWriter<ArticleDetail> articleWriter) {
        return stepBuilderFactory.get("articleStep")
                // Items accumulate up to the chunk size before being handed to the writer
                .<Article, ArticleDetail>chunk(10)
                .reader(articleReader)
                .processor(articleProcessor)
                .writer(articleWriter)
                // Defaults to false (the job is not run again if its parameters have not changed)
                .allowStartIfComplete(true)
                .build();
    }
}

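2.9.2 Notes

  • @EnableBatchProcessing is required
  • Within a Step, items are not handed to the Writer one at a time. A chunkSize must be configured, and a well-chosen chunkSize noticeably improves data-collection throughput
  • Once a Job has completed successfully, by default it is not executed again on the next launch if the parameters are unchanged, because identical parameters identify the same job instance. To run it again, pass a changing parameter such as a timestamp, or set allowStartIfComplete(true)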

2.10 Integrating Quartz for Scheduled Launching

For how to integrate Quartz with Spring Boot, see "Hands-On Code (1): Integrating Quartz with Spring Boot".

2.10.1 QuartzJob

@Component
@Slf4j
@DisallowConcurrentExecution
public class ArticleQuartzJob extends QuartzJobBean {

    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    private JobLocator jobLocator;

    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        try {
            Job job = jobLocator.getJob("articleJob");
            jobLauncher.run(job, new JobParametersBuilder()
                    .addString("executedTime", "2020-11-10 16:21:01")
                    .toJobParameters());
        } catch (Exception e) {
            // Pass the exception to the logger so the stack trace goes to the log, not stderr
            log.error("Failed to launch job [articleJob]", e);
        }
    }
}
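
The job above passes a fixed executedTime. In a scheduled setup the value would normally be computed when the trigger fires, which also gives every launch a different parameter. A minimal sketch using java.time, assuming the same yyyy-MM-dd HH:mm:ss layout as the hard-coded value (ExecutedTimeSketch and format are illustrative names, not part of the project):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

class ExecutedTimeSketch {

    // Same layout as the hard-coded value "2020-11-10 16:21:01"
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Formats a timestamp as a zero-padded, fixed-width string. */
    static String format(LocalDateTime time) {
        return time.format(FORMAT);
    }

    public static void main(String[] args) {
        // A fixed input reproduces the literal used in the Quartz job
        System.out.println(format(LocalDateTime.of(2020, 11, 10, 16, 21, 1))); // 2020-11-10 16:21:01
        // The current time, as it might be passed on each trigger
        System.out.println(format(LocalDateTime.now()));
    }
}
```

Under that assumption, the call in executeInternal could become .addString("executedTime", ExecutedTimeSketch.format(LocalDateTime.now())).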

2.10.2 Initializing the QuartzJob

@Component
public class QuartzJobInit implements CommandLineRunner {

    @Autowired
    private QuartzUtils quartzUtils;

    @Override
    public void run(String... args) throws Exception {
        quartzUtils.addSingleJob(ArticleQuartzJob.class, "articleJob", 60);
    }
}

Source Code

https://github.com/lysmile/spring-boot-demo/tree/master/spring-boot-batch-demo

Appendix: DDL for the Metadata Tables (MySQL)

The SQL files that create the metadata tables can be found in the org.springframework.batch.core package, with a separate file for each supported database.

-- Autogenerated: do not edit this file


CREATE TABLE BATCH_JOB_INSTANCE  (
   JOB_INSTANCE_ID BIGINT  NOT NULL PRIMARY KEY ,
   VERSION BIGINT ,
   JOB_NAME VARCHAR(100) NOT NULL,
   JOB_KEY VARCHAR(32) NOT NULL,
   constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ENGINE=InnoDB;


CREATE TABLE BATCH_JOB_EXECUTION  (
   JOB_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY ,
   VERSION BIGINT  ,
   JOB_INSTANCE_ID BIGINT NOT NULL,
   CREATE_TIME DATETIME NOT NULL,
   START_TIME DATETIME DEFAULT NULL ,
   END_TIME DATETIME DEFAULT NULL ,
   STATUS VARCHAR(10) ,
   EXIT_CODE VARCHAR(2500) ,
   EXIT_MESSAGE VARCHAR(2500) ,
   LAST_UPDATED DATETIME,
   JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
   constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
   references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ENGINE=InnoDB;


CREATE TABLE BATCH_JOB_EXECUTION_PARAMS  (
   JOB_EXECUTION_ID BIGINT NOT NULL ,
   TYPE_CD VARCHAR(6) NOT NULL ,
   KEY_NAME VARCHAR(100) NOT NULL ,
   STRING_VAL VARCHAR(250) ,
   DATE_VAL DATETIME DEFAULT NULL ,
   LONG_VAL BIGINT ,
   DOUBLE_VAL DOUBLE PRECISION ,
   IDENTIFYING CHAR(1) NOT NULL ,
   constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
   references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;


CREATE TABLE BATCH_STEP_EXECUTION  (
   STEP_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY ,
   VERSION BIGINT NOT NULL,
   STEP_NAME VARCHAR(100) NOT NULL,
   JOB_EXECUTION_ID BIGINT NOT NULL,
   START_TIME DATETIME NOT NULL ,
   END_TIME DATETIME DEFAULT NULL ,
   STATUS VARCHAR(10) ,
   COMMIT_COUNT BIGINT ,
   READ_COUNT BIGINT ,
   FILTER_COUNT BIGINT ,
   WRITE_COUNT BIGINT ,
   READ_SKIP_COUNT BIGINT ,
   WRITE_SKIP_COUNT BIGINT ,
   PROCESS_SKIP_COUNT BIGINT ,
   ROLLBACK_COUNT BIGINT ,
   EXIT_CODE VARCHAR(2500) ,
   EXIT_MESSAGE VARCHAR(2500) ,
   LAST_UPDATED DATETIME,
   constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
   references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;


CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT  (
   STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
   SHORT_CONTEXT VARCHAR(2500) NOT NULL,
   SERIALIZED_CONTEXT TEXT ,
   constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
   references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ENGINE=InnoDB;


CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT  (
   JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
   SHORT_CONTEXT VARCHAR(2500) NOT NULL,
   SERIALIZED_CONTEXT TEXT ,
   constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
   references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;


CREATE TABLE BATCH_STEP_EXECUTION_SEQ (
   ID BIGINT NOT NULL,
   UNIQUE_KEY CHAR(1) NOT NULL,
   constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;


INSERT INTO BATCH_STEP_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_STEP_EXECUTION_SEQ);


CREATE TABLE BATCH_JOB_EXECUTION_SEQ (
   ID BIGINT NOT NULL,
   UNIQUE_KEY CHAR(1) NOT NULL,
   constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;


INSERT INTO BATCH_JOB_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_EXECUTION_SEQ);


CREATE TABLE BATCH_JOB_SEQ (
   ID BIGINT NOT NULL,
   UNIQUE_KEY CHAR(1) NOT NULL,
   constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
) ENGINE=InnoDB;


INSERT INTO BATCH_JOB_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_SEQ);
