spring batch 純注解學(xué)習(xí)筆記(七)--異常處理與容錯機(jī)制

1.異常處理與重啟機(jī)制

1.對于chunk類型的Step,spring batch為我們提供了用于管理它的狀態(tài)

2.狀態(tài)的管理是通過ItemStream接口來實現(xiàn)的

3.ItemStream接口:

(1)open():每一次step執(zhí)行會調(diào)用

(2)Update():每一個chunk去執(zhí)行都會調(diào)用

(3)Close():所有的chunk執(zhí)行完畢會調(diào)用

流程

構(gòu)造例子

準(zhǔn)備個cvs文件,在第33條數(shù)據(jù),添加一條錯誤名字信息 ;當(dāng)讀取到這條數(shù)據(jù)時,拋出異常終止程序。
ItemReader測試代碼

@Component("restartDemoReader")
public class RestartDemoReader implements ItemStreamReader<Customer> {
 
    // 記錄當(dāng)前讀取的行數(shù)
    private Long curLine = 0L;
    // 重啟狀態(tài)初始值
    private boolean restart = false;
 
    private FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
 
    // 持久化信息到數(shù)據(jù)庫
    private ExecutionContext executionContext;
    RestartDemoReader
    public () {
        
        reader.setResource(new ClassPathResource("restartDemo.csv"));
 
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthdate"});
 
        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper((fieldSet -> {
            return Customer.builder().id(fieldSet.readLong("id"))
                    .firstName(fieldSet.readString("firstName"))
                    .lastName(fieldSet.readString("lastName"))
                    .birthdate(fieldSet.readString("birthdate"))
                    .build();
        }));
        lineMapper.afterPropertiesSet();
 
        reader.setLineMapper(lineMapper);
    }
 
    @Override
    public Customer read() throws Exception, UnexpectedInputException, ParseException,
            NonTransientResourceException {
 
        Customer customer = null;
 
        this.curLine++;
        //如果是重啟,則從上一步讀取的行數(shù)繼續(xù)往下執(zhí)行
        if (restart) {
            reader.setLinesToSkip(this.curLine.intValue()-1);
            restart = false;
            System.out.println("Start reading from line: " + this.curLine);
        }
 
        reader.open(this.executionContext);
 
        customer = reader.read();
        //當(dāng)匹配到wrongName時,顯示拋出異常,終止程序
        if (customer != null) {
            if (customer.getFirstName().equals("wrongName"))
                throw new RuntimeException("Something wrong. Customer id: " + customer.getId());
        } else {
            curLine--;
        }
        return customer;
    }
 
    /**
     * 判斷是否是重啟job
     * @param executionContext
     * @throws ItemStreamException
     */
    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        this.executionContext = executionContext;
                // 如果是重啟job,從數(shù)據(jù)庫讀取重啟的行數(shù),從重啟行數(shù)開始重新執(zhí)行
        if (executionContext.containsKey("curLine")) {
            this.curLine = executionContext.getLong("curLine");
            this.restart = true;
        }
                // 如果不是重啟job,初始化行數(shù),從第一行開始執(zhí)行
                else {
            this.curLine = 0L;
            executionContext.put("curLine", this.curLine.intValue());
        }
 
    }
 
    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
            // 每執(zhí)行完一個批次chunk,打印當(dāng)前行數(shù)
        System.out.println("update curLine: " + this.curLine);
        executionContext.put("curLine", this.curLine);
 
    }
 
    @Override
    public void close() throws ItemStreamException {
 
    }
}

Job配置

以10條記錄為一個批次,進(jìn)行讀取

@Configuration
public class RestartDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
 
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
 
    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter;
 
    @Autowired
    @Qualifier("restartDemoReader")
    private ItemReader<Customer> restartDemoReader;
 
    @Bean
    public Job restartDemoJob(){
        return jobBuilderFactory.get("restartDemoJob")
                .start(restartDemoStep())
                .build();
 
    }
 
    @Bean
    public Step restartDemoStep() {
        return stepBuilderFactory.get("restartDemoStep")
                .<Customer,Customer>chunk(10)
                .reader(restartDemoReader)
                .writer(flatFileDemoWriter)
                .build();
    }
}

程序執(zhí)行時會經(jīng)歷一下三個步驟演示異常處理機(jī)制
1.當(dāng)?shù)谝淮螆?zhí)行時,程序在33行拋出異常異常,curline值是30;
這時,可以查詢數(shù)據(jù)庫 batch_step_excution表,發(fā)現(xiàn)curline值已經(jīng)以 鍵值對形式,持久化進(jìn)數(shù)據(jù)庫(上文以10條數(shù)據(jù)為一個批次;故33條數(shù)據(jù)異常時,curline值為30)

2.接下來,更新wrongName,再次執(zhí)行程序;

3.程序會執(zhí)行open方法,判斷數(shù)據(jù)庫step中map是否存在curline,如果存在,則是重跑,即讀取curline,從該批次開始往下繼續(xù)執(zhí)行;

2.容錯機(jī)制

Spring batch的容錯機(jī)制是一種與事務(wù)機(jī)制相結(jié)合的機(jī)制,它主要包括有3種操作:

  • restart
  • restart是針對job來使用,是重啟job的一個操作。默認(rèn)情況下,當(dāng)任務(wù)出現(xiàn)異常時,SpringBatch會結(jié)束任務(wù),當(dāng)使用相同參數(shù)重啟任務(wù)時,SpringBatch會去執(zhí)行未執(zhí)行的剩余任務(wù)
  • retry
  • retry是對job的某一step而言,處理一條數(shù)據(jù)item的時候發(fā)現(xiàn)有異常,則重試一次該數(shù)據(jù)item的step的操作。
  • skip
  • skip是對job的某一個step而言,處理一條數(shù)據(jù)item的時候發(fā)現(xiàn)有異常,則跳過該數(shù)據(jù)item的step的操作。
    restart示例代碼如下,當(dāng)?shù)谝淮螆?zhí)行的時候,上下文中沒有該字段,拋出異常,第二次執(zhí)行,已存在該字段,執(zhí)行成功
    retry、skip示例如下,更改一下之前step的配置,參考代碼如下:
@Bean
public Step stepForTranscation(StepBuilderFactory stepBuilderFactory, @Qualifier("stepForTranscationReader")ListItemReader<String> reader,
        @Qualifier("stepForTranscationProcessor")ItemProcessor<String, String> processor, @Qualifier("stepForTranscationWriter")ItemWriter<String> writer) {
   return stepBuilderFactory.get("stepForTranscation")
           .<String, String> chunk(3)
           .reader(reader)
           .processor(processor)
           .writer(writer).
           faultTolerant().
           retryLimit(3).
           retry(DataIntegrityViolationException.class).
           skipLimit(1).
           skip(DataIntegrityViolationException.class).
           startLimit(3)
           .build();
}

這里設(shè)置了允許重試的次數(shù)為3次,允許跳過的數(shù)據(jù)最多為1條,如果job失敗了,運(yùn)行重跑次數(shù)最多為3次。
在skip后面配置跳過錯誤的監(jiān)聽器SkipListener

public class Mr implements SkipListener<String, String>{ // 發(fā)生讀操作跳過錯誤時,需要執(zhí)行的監(jiān)聽 public void onSkipInRead(Throwable t){ }


    // 發(fā)生寫操作跳過錯誤時,需要執(zhí)行的監(jiān)聽
    public void onSkipInWrite(String item, Throwable t){
    }

    // 處理數(shù)據(jù)時跳過錯誤時,需要執(zhí)行的監(jiān)聽
    public void onSkipInProcess (String item, Throwable t){
        System.out.println(item + "occur exception" + t);
    }

}

重新運(yùn)行程序,可以得到新的結(jié)果:


結(jié)果

這次可以看到,12條數(shù)據(jù)中總共有11條數(shù)據(jù)進(jìn)入到數(shù)據(jù)庫,而過長的008008008008數(shù)據(jù),則因為設(shè)置了skip,所以容錯機(jī)制允許它不進(jìn)入數(shù)據(jù)庫,這次的Spring batch最終沒有因為回滾而中斷。

查閱一下Spring batch的持久化數(shù)據(jù)表:
可以看出,的確是有一條數(shù)據(jù)被跳過了,但因為是我們允許它跳過的,所以整個job順利完成,即COMPLETED。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容