快速了解組件-spring batch(2)之helloworld

tags: springbatch


1.引言

前面《數(shù)據(jù)批處理神器-Spring Batch(1)簡介及使用場景》已經(jīng)介紹了Spring Batch是一個輕量級,完善的批處理框架,它使用起來簡單,方便,比較適合有點編程基礎(特別是使用Spring及SpringBoot框架)的開發(fā)人員,針對業(yè)務編程,只需要關心具體的業(yè)務實現(xiàn)即可,把流程以及流程的控制交給Spring Batch就好。常言道"talk is cheap, show me the code",下面我們就通過一個簡單的hello world,進入Spring Batch的世界,通過這個示例,可以快速了解開發(fā)批處理的流程和Spring Batch開發(fā)用到的組件,為后續(xù)的操作打下基礎。

2.開發(fā)環(huán)境

  • JDK: jdk1.8
  • Spring Boot: 2.1.4.RELEASE
  • Spring Batch:4.1.2.RELEASE
  • 開發(fā)IDE: IDEA
  • 構建工具Maven: 3.3.9
  • 日志組件logback:1.2.3
  • lombok:1.18.6

3.helloworld開發(fā)

3.1 helloworld說明

本helloworld實現(xiàn)一個非常簡單的功能,就是從數(shù)據(jù)組中讀取字符串,把字符串轉為大寫,然后輸出到控制臺。如圖:

字符串讀寫

整個過程就是一個批重任務(Job),它只有一個步驟(Job Step),步驟里分為三個階段,讀數(shù)據(jù)(ItemReader)、處理數(shù)據(jù)(ItemProcessor)、寫數(shù)據(jù)(ItemWriter)。

3.2 開發(fā)流程

開發(fā)的主要代碼如下:

主要代碼

總體來說就是,通過ReaderProcessor、Writer完成任務,結束后通過Listener進行監(jiān)聽,整個任務通過配置(BatchConfig)進行配置。

3.2.1 創(chuàng)建Spring Boot工程

直接使用Idea生成或在使用Spring Initializr生成即可,此處不詳細說明。也可以直接使用我的代碼示例。當前使用的Spring Boot版本是2.1.4.RELEASE

3.2.2 添加相關依賴

  • Spring Batch依賴
    在使用spring-boot-starter-parent的情況下,直接添加以下依賴即可:
<!-- 批處理框架-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>

引用后,會引用兩個jar包,一個是spring-batch-infrastructure,一個是spring-batch-core,版本是4.1.2.RELEASE。分別對應的是基礎框架層和核心層。

  • 添加內(nèi)存數(shù)據(jù)庫H2
    Spring Batch是需要數(shù)據(jù)庫來存儲任務的基本信息以及運行狀態(tài)的,本例中不需要操作數(shù)據(jù)庫邏輯,直接使用內(nèi)存數(shù)據(jù)庫H2即可。添加以下依賴:
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
</dependency>
  • 添加測試及工具類依賴
    為了簡化開發(fā),使用lombok進行處理。使用Spring Boot進行單元測試,添加依賴如下:
<!-- 工具包:lombok -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.6</version>
</dependency>
<!-- 測試框架 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

3.2.3 開發(fā)讀數(shù)據(jù)組件ItemReader

添加完依賴后,就可以進入業(yè)務邏輯編程了。按Spring Batch的批處理流程,讀數(shù)據(jù)ItemReader是第一步,當前示例中,我們的任務是從數(shù)組中讀取數(shù)據(jù)。ItemReader是一個接口,開發(fā)人員直接實現(xiàn)此接口即可。此接口定義了核心方法read(),負責從給定的資源中讀取可用的數(shù)據(jù)。具體實現(xiàn)如下:

@Slf4j
public class StringReader implements ItemReader<String> {
    private String[] messages = {"aaa1","aaa2","aaa3","aaa4"};
    private int count = 0;
    @Override
    public String read() throws UnexpectedInputException, ParseException, NonTransientResourceException {
        if(count < messages.length){
            String message = messages[count++];
            log.debug(LogConstants.LOG_TAG + "read data:"+message);
            return message;
        }else{
            log.debug(LogConstants.LOG_TAG + "read data end.");
            count = 0;
        }
        return null;
    }
}

說明:

  • (1)StringReader實現(xiàn)ItemReader接口;
  • (2)messages是數(shù)據(jù)源;
  • (3)count表示讀取數(shù)據(jù)的下標,每讀一次,下標自增,讀取完后返回null表示結束。同時把count置為0,以方便下次讀取。
  • (4)日志輸出使用的是logback,結合lombok的@Slf4j注解,直接可使用log進行輸出,簡化操作。

3.2.4 開發(fā)處理數(shù)據(jù)組件ItemProcessor

讀取數(shù)據(jù)后,返回的數(shù)據(jù)會流到ItemProcessor進行處理。同樣,ItemProcessor是一個接口,要實現(xiàn)自己的處理邏輯,實現(xiàn)此接口即可。當然,如果沒有ItemProcessor,讀到的數(shù)據(jù)直接就到ItemWriter流程也是可以的。此處,Spring Batch有一個Chunk的概念,用于多次讀,直到chunk指定的數(shù)量后,再統(tǒng)一給到processor和writer,以提高效率。本示例對于ItemProcessor的實現(xiàn)很簡單,即把字符串轉為大寫。如下:

@Slf4j
public class ConvertProcessor implements ItemProcessor<String,String> {
    @Autowired
    private ConsoleService consoleService;
    @Override
    public String process(String data) {
        String dataProcessed = consoleService.convert2UpperCase(data);
        log.debug(LogConstants.LOG_TAG + data +" process data --> " + dataProcessed);
        return dataProcessed;
    }
}

說明:

  • 實現(xiàn)ItemProcessor接口,它有兩個泛型,分別是I和O,I是讀階段獲取的數(shù)據(jù),O是提交給寫階段的數(shù)據(jù)。
  • 使用ConsoleService服務,對數(shù)據(jù)進行大寫轉換,里面的實現(xiàn)直接使用字符串的toUpperCase()方法

3.2.5 開發(fā)寫數(shù)據(jù)組件ItemWriter

數(shù)據(jù)處理完后,會統(tǒng)一交給寫組件(ItemWriter)進行寫入。ItemWriter也是一個接口,核心方法是write方法,參數(shù)是數(shù)組。要實現(xiàn)自己的邏輯,實現(xiàn)此接口即可。本示例中,直接把數(shù)據(jù)輸出到日志中即可。如下:

@Slf4j
public class ConsoleWriter implements ItemWriter<String> {
    @Override
    public void write(List<? extends String> list) {
        for (String msg :list) {
            log.debug(LogConstants.LOG_TAG + "write data: "+msg);
        }
    }
}

3.2.6 開發(fā)任務完成后的監(jiān)聽器JobExecutionListener

數(shù)據(jù)寫入到目標后,任務即結束,但有時候我們還需要在任務結束時去做一些其它工作,如清理數(shù)據(jù),更新時間等,則需要在任務完成后進行邏輯處理。Spring Batch對于任務或步驟開始和結束都會提供監(jiān)聽,以便于開發(fā)人員實現(xiàn)監(jiān)聽邏輯。如通過繼承JobExecutionListenerSupport,可以實現(xiàn)beforeJobafterJob的監(jiān)聽,以實現(xiàn)開始任務前和結束任務后的處理。當前示例中,僅輸出任務完成的日志。如下:

@Slf4j
public class ConsoleJobEndListener extends JobExecutionListenerSupport {
    @Override
    public void afterJob(JobExecution jobExecution) {
        if(jobExecution.getStatus() == BatchStatus.COMPLETED){
            log.debug("console batch job complete!");
        }
    }
}

3.2.7 配置完整任務

經(jīng)過上面的讀、處理、寫、任務完成后監(jiān)聽的操作,現(xiàn)在需要把它們組裝在一起,形成一個完成的任務,使用Spring Boot,簡單的使用幾個配置即可完成任務的組裝。任務及其相關組件的關系如下:

組件關系

創(chuàng)建配置文件ConsoleBatchConfig.java,具體代碼如下:

@Configuration
@EnableBatchProcessing
public class ConsoleBatchConfig {
    @Autowired
    public JobBuilderFactory jobBuilderFactory;
    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job consoleJob(Step consoleStep,JobExecutionListener consoleListener){
        String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
        return jobBuilderFactory.get(funcName).listener(consoleListener).flow(consoleStep)
                .end().build();
    }

    @Bean
    public Step consoleStep(ItemReader stringReader,ItemProcessor convertProcessor
            ,ItemWriter consoleWriter, CommonStepListener commonStepListener){
        String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
        return stepBuilderFactory.get(funcName).listener(commonStepListener)
                .<String,String>chunk(3).reader(stringReader).processor(convertProcessor)
                .writer(consoleWriter).build();
    }

    @Bean
    public ItemReader stringReader(){return new StringReader();}

    @Bean
    public ItemWriter consoleWriter(){return new ConsoleWriter();}

    @Bean
    public ItemProcessor convertProcessor(){return new ConvertProcessor();}

    @Bean
    public JobExecutionListener consoleListener(){return new ConsoleJobEndListener();}
}

說明:

  • 添加注解@Configuration及@EnableBatchProcessing,標識為配置及啟用Spring Batch的配置(可以直接使用JobBuilderFactoryStepBuilderFactory分別用于創(chuàng)建Job和Step)。
  • 創(chuàng)建ItemReader、ItemWriterItemProcessor、Listener對應的Bean,以供Step及Job的注入。
  • 使用stepBuilderFactory創(chuàng)建作業(yè)Step,其中chunk進行面向塊的處理,即多次讀取后再寫入,提高效率。當前配置是3個為一個chunk。
  • 使用jobBuilderFactory添加step,創(chuàng)建任務。
  • 注意step和Job都需要有對應的名稱(get方法確定),此處直接使用方法名作為Job和Step的名稱。

3.2.8 測試批處理

經(jīng)過上面的步驟,已經(jīng)完成Job的開發(fā),測試則可使用兩種方式,一個是編寫Controller,以接口調(diào)用的方式運行job,一種編寫單元測試。

  • Job的運行
    通過JobLauncherrun方法來運行任務,run方法參數(shù)分別是JobjobParameters,即已配置的Job及job運行的參數(shù)。每個任務的區(qū)分是通過任務名(jobName)和任務參數(shù)(jobParameters)作為區(qū)別的,即如果jobNamejobParameters相同,Spring Batch會認為是同一任務,若任務已運行成功,同一任務不會再運行。因此,一般來說,不同的任務,我們的jobParameters可以直接以時間作為參數(shù),以便于區(qū)別。生成jobParameters。代碼如下:
JobParameters jobParameters = new JobParametersBuilder()
                .addLong("time",System.currentTimeMillis())
                .toJobParameters();
  • 編寫單元測試
    編寫ConsoleJobTest,加載job,運行測試,如下所示:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {MainBootApplication.class,ConsoleBatchConfig.class})
@Slf4j
public class ConsoleJobTest {
    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job consoleJob;

    public void testConsoleJob2() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        //構建參數(shù)
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("time",System.currentTimeMillis())
                .toJobParameters();
        //執(zhí)行任務
        JobExecution run = jobLauncher.run(consoleJob, jobParameters);
        ExitStatus exitStatus = run.getExitStatus();
        log.debug(exitStatus.toString());
    }
}

說明:引入SpringBootTest注解時,需要把Spring Batch任務也引入進來。

  • 執(zhí)行結果輸出
    執(zhí)行結果如下圖所示:

    執(zhí)行結果

    從輸出可知,由于設置的chunk是3,讀取3個數(shù)據(jù)后,就統(tǒng)一給ItemProcessor進行大寫轉換處理,然后統(tǒng)一交給ItemWriter進行寫入。執(zhí)行完成后,Job的exitCode表示任務執(zhí)行的狀態(tài),如果正常則為COMPLETED,失敗則為FAILED

4.總結

經(jīng)過以上的操作步驟,即可完成批處理操作。關于任務的狀態(tài),流程的步驟(讀、處理、寫)均交給Spring Batch來完成,開發(fā)人員所做的工作是根據(jù)自己的業(yè)務邏輯編寫具體的讀數(shù)據(jù)、處理數(shù)據(jù)和寫數(shù)據(jù)即可。希望通過本文,大家可以對Spring Batch的組件有清晰的了解。

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容