Spring-Kafka(十)—— ConsumerAwareErrorHandler異常處理器

異常處理

代碼異常十之八九,十段代碼九個(gè)bug,哈哈哈哈。平常程序異常我們使用try catch捕獲異常,在catch方法中根據(jù)異常類型進(jìn)行相關(guān)處理,既然我們可以使用try catch處理異常,那為什么還要使用ConsumerAwareErrorHandler異常處理器去處理異常呢?
首先,KafkaListener要做的事只是監(jiān)聽Topic中的數(shù)據(jù)并消費(fèi),如果在KafkaListener中還需要對(duì)異常進(jìn)行處理則會(huì)顯得代碼塊非常臃腫不利于維護(hù),我們可以把異常處理的這些代碼抽象出來,構(gòu)造成一個(gè)異常處理器,KafkaListener中所拋出的異常都會(huì)經(jīng)過ConsumerAwareErrorHandler異常處理器進(jìn)行處理,這樣就非常方便我們進(jìn)行后期維護(hù),比如后期更改異常處理業(yè)務(wù)的時(shí)候,只需要修改ConsumerAwareErrorHandler處理器就行了,而不需要KafkaListener的一堆代碼中去修改代碼。這也是一種思想的體現(xiàn)。

單消息消費(fèi)異常處理器

這里主要就是注冊(cè)一個(gè)ConsumerAwareListenerErrorHandler 類型的異常處理器,bean的注冊(cè)默認(rèn)使用的是方法名,所以我們將這個(gè)異常處理的BeanName放到@KafkaListener注解的errorHandler屬性里面。當(dāng)KafkaListener拋出異常的時(shí)候,則會(huì)自動(dòng)調(diào)用異常處理器。

@Component
public class ErrorListener {

    private static final Logger log= LoggerFactory.getLogger(ErrorListener.class);

    @KafkaListener(id = "err", topics = "topic.quick.error", errorHandler = "consumerAwareErrorHandler")
    public void errorListener(String data) {
        log.info("topic.quick.error  receive : " + data);
        throw new RuntimeException("fail");
    }

    @Bean
    public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
        return new ConsumerAwareListenerErrorHandler() {

            @Override
            public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) {
                log.info("consumerAwareErrorHandler receive : "+message.getPayload().toString());
                return null;
            }
        };
    }

}



編寫測(cè)試方法,發(fā)送一條消息到topic.quick.error中,運(yùn)行測(cè)試方法后我們可以看到異常處理器已經(jīng)能正常使用了。

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Test
    public void testErrorHandler() {
        kafkaTemplate.send("topic.quick.error", "test error handle");
    }
2018-09-14 11:42:05.099  INFO 8912 --- [      err-0-C-1] com.viu.kafka.listen.ErrorListener       : topic.quick.error  receive : test error handle
2018-09-14 11:42:05.101  INFO 8912 --- [      err-0-C-1] com.viu.kafka.listen.ErrorListener       : consumerAwareErrorHandler receive : test error handle


批量消費(fèi)異常處理器

批量消費(fèi)代碼也是差不多的,只不過傳遞過來的數(shù)據(jù)都是List集合方式,這里就不做其他代碼的展示了。

    @Bean
    public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
        return new ConsumerAwareListenerErrorHandler() {

            @Override
            public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) {
                log.info("consumerAwareErrorHandler receive : "+message.getPayload().toString());
                MessageHeaders headers = message.getHeaders();
                List<String> topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class);
                List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class);
                List<Long> offsets = headers.get(KafkaHeaders.OFFSET, List.class);
                Map<TopicPartition, Long> offsetsToReset = new HashMap<>();
          
                return null;
            }
        };
    }


更多文章請(qǐng)關(guān)注該 Spring-Kafka史上最強(qiáng)入門教程 專題

博主常駐地~ http://blog.seasedge.cn/archives/51.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容