Spring Boot之整合Spring Batch:批處理與任務調度

前言

之前有幸跟公司大神聊Spring Boot,大神跟我聊了很多關于Spring Boot相關的知識,其中有一個就是Spring Boot框架下批處理的解決方案,考慮到批處理在實際應用場景中使用率還是有的,好奇的我,決定拿下它!

項目代碼已上傳Git Hub,歡迎取閱:

批處理框架

Spring Batch是一款基于 Spring 的企業(yè)批處理應用框架,可以幫助我們構建出健壯的批處理應用。


實現(xiàn)批處理的整體步驟

  1. 添加依賴;
  2. Spring Boot基本概念介紹;
  3. 編寫批處理過程代碼;
  4. 批處理任務調度;

1. 添加依賴;

在項目pom.xml文件的dependencies節(jié)點下添加以下依賴:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>

2. Spring Boot基本概念介紹;

在實現(xiàn)批處理之前我們需要了解一些Spring Batch的基本概念:

1). Item Reader;

表示對資源的讀處理,如從數(shù)據(jù)庫查詢、從文件讀取、從變量讀取等;

2). Item Processor;

表示對讀取的數(shù)據(jù)進行處理,開發(fā)者可以實現(xiàn)自己的業(yè)務邏輯操作來對數(shù)據(jù)處理,如對數(shù)據(jù)進行計算、邏輯處理、格式轉換等;

3). Item Writer;

表示對資源的寫處理,如寫入數(shù)據(jù)庫、寫入文件、打印log等;

4). Step;

代表一個完整的批處理步驟,一個Step由Item Reader、 Item Processor、Item Writer三部分組成;

  • Step與Item Reader、 Item Processor、Item Writer的關系:
Step

5). Job;

代表一個完整的批處理過程,一個Job由一個或多個Step組成:

  • Job與Step的關系:
Job
  • 批處理過程整體結構:
Job與Step

6). Listener;

監(jiān)聽。Spring Batch中還有個監(jiān)聽的功能,與其他地方的監(jiān)聽類似,用于對Step、Job狀態(tài)進行監(jiān)聽,我們可以實現(xiàn)監(jiān)聽方法,對其進行一些邏輯處理,如打印log等;

7). JobLauncher;

JobLauncher負責啟動job;


3. 編寫批處理過程代碼;

假設我們要解決的問題是,批量讀取數(shù)組中的數(shù)據(jù),并對數(shù)據(jù)做一些后續(xù)的處理。我會寫2個Job,一個是單個Step的Job,一個是2個Step的Job,并且2個Step的Job,第1個Step的處理后的數(shù)據(jù)要給第2個Step使用。

項目結構

過程代碼的整體步驟:

1). 編寫ItemReader;
2). 編寫ItemProcessor;
3). 編寫ItemWriter;
4). 編寫JobExecutionListener;
5). 裝配Job;
6). 使用數(shù)據(jù)庫源;
7). 修改Spring Boot入口類;

1). 編寫ItemReader;

  • ItemReaderService
package com.github.dylanz666.service;

import org.springframework.batch.item.ItemReader;
import org.springframework.stereotype.Service;

/**
 * @author : dylanz
 * @since : 08/25/2020
 */
@Service
public class ItemReaderService implements ItemReader {
    //在此處進行數(shù)據(jù)讀取操作,如從數(shù)據(jù)庫查詢、從文件中讀取、從變量中讀取等,本例從變量中讀取;
    private String[] message = {"message 1", "message 2", "message 3", "message 4", "message 5"};
    private int count = 0;

    public String read() throws Exception {
        if (count < message.length) {
            return message[count++];
        }
        count = 0;
        return null;
    }
}
  • ItemReaderService2
package com.github.dylanz666.service;

import org.springframework.batch.item.ItemReader;
import org.springframework.stereotype.Service;

/**
 * @author : dylanz
 * @since : 08/26/2020
 */
@Service
public class ItemReaderService2 implements ItemReader {
    private int count = 0;

    public String read() throws Exception {
        if (ItemProcessorService.message != null && count < ItemProcessorService.message.length) {
            return ItemProcessorService.message[count++];
        }
        count = 0;
        return null;
    }
}

2). 編寫ItemProcessor;

  • ItemProcessorService
package com.github.dylanz666.service;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;

/**
 * @author : dylanz
 * @since : 08/25/2020
 */
@Service
public class ItemProcessorService implements ItemProcessor<String, String> {
    public static String[] message;
    //在此處進行數(shù)據(jù)處理操作,如進行計算、邏輯處理、格式轉換等,本例將數(shù)據(jù)變成全大寫數(shù)據(jù);
    public String process(String data) throws Exception {
        //存儲處理過的數(shù)據(jù),可供下一個step使用
        List<String> list = new ArrayList<>();
        if (message != null) {
            for (int i = 0; i < message.length; i++) {
                list.add(message[i]);
            }
        }
        list.add(data.toUpperCase());
        message = list.toArray(new String[list.size()]);
        return data.toUpperCase();
    }
}
  • ItemProcessorService2
package com.github.dylanz666.service;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Service;

/**
 * @author : dylanz
 * @since : 08/26/2020
 */
@Service
public class ItemProcessorService2 implements ItemProcessor<String, String> {
    public String process(String data) throws Exception {
        return data + " dylanz";
    }
}

3). 編寫ItemWriter;

package com.github.dylanz666.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Service;

import java.util.List;

/**
 * @author : dylanz
 * @since : 08/25/2020
 */
@Service
public class ItemWriterService implements ItemWriter<String> {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    //在此處進行數(shù)據(jù)輸出操作,如寫入數(shù)據(jù)庫、寫入文件、打印log等,本例為打印log;
    public void write(List<? extends String> messages) throws Exception {
        for (String message : messages) {
            logger.info("Writing data: " + message);
        }
    }
}

4). 編寫JobExecutionListener;

我們對Job運行前后進行監(jiān)聽,并打印相應log:

package com.github.dylanz666.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.stereotype.Service;

/**
 * @author : dylanz
 * @since : 08/25/2020
 */
@Service
public class JobListener implements JobExecutionListener {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    public void beforeJob(JobExecution jobExecution) {
        logger.info("JOB IS STARTED.");
    }

    public void afterJob(JobExecution jobExecution) {
        if (jobExecution.getStatus() == BatchStatus.FAILED) {
            logger.info("JOB IS EXECUTED FAILED.");
            return;
        }
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            logger.info("JOB IS EXECUTED SUCCESSFULLY.");
        }
    }
}

5). 裝配Job;

在config包底下創(chuàng)建BathConfig.java類(名字隨意),我們裝配2個Job,一個為單Step Job,一個為2個Step Job,同時在每個job上設置監(jiān)聽:

package com.github.dylanz666.config;

import com.github.dylanz666.service.*;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author : dylanz
 * @since : 08/25/2020
 */
@Configuration
public class BatchConfig {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private ItemReaderService itemReaderService;
    @Autowired
    private ItemReaderService2 itemReaderService2;
    @Autowired
    private ItemProcessorService itemProcessorService;
    @Autowired
    private ItemProcessorService2 itemProcessorService2;
    @Autowired
    private ItemWriterService itemWriterService;
    @Autowired
    private JobListener jobListener;

    @Bean
    public Job singleStepJob() {
        return jobBuilderFactory.get("singleStepJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener())
                .start(uppercaseStep())
                .build();
    }

    @Bean
    public Job multiBoundStepsJob() {
        return jobBuilderFactory.get("multiBoundStepsJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener())
                .start(uppercaseStep())
                .next(addMessageStep())
                .build();
    }

    @Bean
    public Step uppercaseStep() {
        return stepBuilderFactory.get("uppercaseStep")
                .<String, String>chunk(1)
                .reader(itemReaderService)
                .processor(itemProcessorService)
                .writer(itemWriterService).build();
    }

    @Bean
    public Step addMessageStep() {
        return stepBuilderFactory.get("addMessageStep")
                .<String, String>chunk(1)
                .reader(itemReaderService2)
                .processor(itemProcessorService2)
                .writer(itemWriterService).build();
    }

    @Bean
    public JobExecutionListener listener() {
        return jobListener;
    }
}

6). 使用數(shù)據(jù)庫源(非必需);

  • 不使用數(shù)據(jù)庫存儲批處理job的元數(shù)據(jù)及執(zhí)行信息;
    默認是不需要任何改動的,此時不保存元數(shù)據(jù)及執(zhí)行信息;

如果遇到提示數(shù)據(jù)源缺失問題,也可嘗試在Spring Boot啟動類的@SpringBootApplication注解添加屬性:exclude = {DataSourceAutoConfiguration.class},即:

@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
  • 使用數(shù)據(jù)存儲批處理job的元數(shù)據(jù)及執(zhí)行信息;
    該方式可以隨時跟蹤執(zhí)行進度,重新執(zhí)行失敗記錄等,我們可以使用mysql等數(shù)據(jù)庫,另外一種更常用、簡單的方式是使用嵌入式數(shù)據(jù)庫H2 Database。

使用H2 Database只需在src/main/resources/application.properties添加以下配置即可:(前提是要移除啟動類@SpringBootApplication注解的屬性:exclude = {DataSourceAutoConfiguration.class})

server.port=8080
spring.datasource.url=jdbc:h2:~/test
spring.datasource.driverClassName=org.h2.Driver
spring.datasource.username=sa
#初次密碼可隨意
spring.datasource.password=123456
spring.h2.console.path=/h2-console
spring.h2.console.enabled=true

7). 修改Spring Boot入口類;

在Spring Boot項目入口類上添加注解@EnableBatchProcessing即可,如:

package com.github.dylanz666;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author : dylanz
 * @since : 08/25/2020
 */
@SpringBootApplication
@EnableBatchProcessing
public class App {
    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }
}
H2 Database
  • 輸入密碼并點擊Connect按鈕鏈接H2數(shù)據(jù)庫后:
image.png
  • 可在H2 數(shù)據(jù)庫中執(zhí)行sql進行元數(shù)據(jù)和執(zhí)行信息的查詢等操作;

4. 批處理任務調度;

批處理任務調度常見的幾種方式:

1). 項目啟動時自啟動(一次性執(zhí)行所有批處理任務);

默認啟動項目時會一次性執(zhí)行所有批處理任務。
如果我們不想在項目啟動時執(zhí)行所有批處理任務,那么需要在application.properties添加配置項:

spring.batch.job.enabled=false

2). 通過接口調用方式把任務調度交給客戶端;

  • 在controller包下編寫2個批處理任務調度接口:
package com.github.dylanz666.controller;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author : dylanz
 * @since : 08/25/2020
 */
@RestController
public class BatchController {
    @Autowired
    private Job singleStepJob;
    @Autowired
    private Job multiBoundStepsJob;
    @Autowired
    private JobLauncher jobLauncher;

    @GetMapping("/job/step")
    public String invokeStep() throws Exception {
        JobParameters jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(singleStepJob, jobParameters);
        return "The job is proceed.";
    }

    @GetMapping("/job/steps")
    public String invokeSteps() throws Exception {
        JobParameters jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(multiBoundStepsJob, jobParameters);
        return "The multi bound steps job is proceed.";
    }
}
批處理log 1

(2). http://127.0.0.1:8080/job/steps

任務調度接口2
批處理log 2

通過log我們會發(fā)現(xiàn),http://127.0.0.1:8080/job/steps這個API,使用的Job multiBoundStepsJob,先執(zhí)行了uppercaseStep()方法,把字符串轉成大寫,然后在這基礎上,執(zhí)行了addMessageStep()方法,在字符串尾部添加" dylanz"字符串,其中關鍵點是:
[1]. 我們在ItemProcessorService方法中用static成員變量String[] message保存ItemProcessorService處理后的數(shù)據(jù);
[2]. 在itemReaderService2中使用static成員變量String[] message作為數(shù)據(jù)源;
[3]. 在multiBoundStepsJob中使用了uppercaseStep和addMessageStep這2個Step;

這個是2個關聯(lián)Step間數(shù)據(jù)傳遞的一種方法;
我們也可以設置一個Job,包含多個互不關聯(lián)的Step,只需要在編寫Step時使用鏈式寫法:

.start(xxx)
.next(xxx)
.next(xxx)
....
.build()

3). 定期調度批處理任務;

由于Spring Batch只是一個批處理應用框架,而不是調度框架,它只關注批處理相關的問題,并不提供調度功能,因此,我們需要借助其他調度框架實現(xiàn)定期調度。
我了解到的Spring Boot框架內常用、成熟的調度方式、調度框架有:

(1). Spring Boot自帶的@Scheduled;
(2). Quartz;

(1). Spring Boot自帶的@Scheduled

@Scheduled有3種執(zhí)行方式:

//1. 按照指定的cron表達式,一旦符合cron表示的時間,則執(zhí)行任務,如,//每5秒中執(zhí)行一次任務:
@Scheduled(cron = "0/5 * * * * ?")
//2. 以固定頻率執(zhí)行任務,如每1分鐘執(zhí)行一次任務;
@Scheduled(fixedRate = 60000)
//3. 任務執(zhí)行完成后再延遲固定時間后再執(zhí)行下一次,如延遲1分鐘再執(zhí)行任務;
@Scheduled(fixedDelay = 60000)

cron表達式可以參考網(wǎng)上的介紹:http://www.itdecent.cn/p/e9ce1a7e1ed1
cron表達式也可以使用在線生成工具:https://cron.qqe2.com/

cron表達式在線生成工具

[1]. 編寫Schedule類,如:

package com.github.dylanz666.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * @author : dylanz
 * @since : 08/25/2020
 */
@Component
public class SpringScheduledConfig {
    @Autowired
    private Job singleStepJob;
    @Autowired
    private JobLauncher jobLauncher;

    @Scheduled(cron = "0/5 * * * * ?")
    public void demoScheduled() throws Exception {
        JobParameters jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(singleStepJob, jobParameters);
    }
}

[2]. 項目啟動類增加注解:

  • @EnableScheduling

[3]. 實際運行效果:

定時調度
這整個過程還是非常簡單的,但cron疑似在支持年份時有問題。

(2). 批處理調度框架Quartz

Quartz是OpenSymphony開源組織在Job scheduling領域又一個開源項目,完全由Java開發(fā),可以用來執(zhí)行定時任務,類似于java.util.Timer。但是相較于Timer, Quartz增加了很多功能:

  • 持久性作業(yè) - 就是保持調度定時的狀態(tài);
  • 作業(yè)管理 - 對調度作業(yè)進行有效的管理;

Quartz依賴:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

我的理解是Quartz不僅是批處理調度框架,同時也是批處理應用框架。由于Quartz相對靈活,換句話說就是使用起來相對復雜些,我們就參考其他人的文章,改日再敘:


如果本文對您有幫助,麻煩動動手指點點贊?

謝謝!

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容