異常處理
代碼異常十之八九,十段代碼九個(gè)bug,哈哈哈哈。平常程序異常我們使用try catch捕獲異常,在catch方法中根據(jù)異常類型進(jìn)行相關(guān)處理,既然我們可以使用try catch處理異常,那為什么還要使用ConsumerAwareErrorHandler異常處理器去處理異常呢?
首先,KafkaListener要做的事只是監(jiān)聽Topic中的數(shù)據(jù)并消費(fèi),如果在KafkaListener中還需要對(duì)異常進(jìn)行處理則會(huì)顯得代碼塊非常臃腫不利于維護(hù),我們可以把異常處理的這些代碼抽象出來,構(gòu)造成一個(gè)異常處理器,KafkaListener中所拋出的異常都會(huì)經(jīng)過ConsumerAwareErrorHandler異常處理器進(jìn)行處理,這樣就非常方便我們進(jìn)行后期維護(hù),比如后期更改異常處理業(yè)務(wù)的時(shí)候,只需要修改ConsumerAwareErrorHandler處理器就行了,而不需要KafkaListener的一堆代碼中去修改代碼。這也是一種思想的體現(xiàn)。
單消息消費(fèi)異常處理器
這里主要就是注冊(cè)一個(gè)ConsumerAwareListenerErrorHandler 類型的異常處理器,bean的注冊(cè)默認(rèn)使用的是方法名,所以我們將這個(gè)異常處理的BeanName放到@KafkaListener注解的errorHandler屬性里面。當(dāng)KafkaListener拋出異常的時(shí)候,則會(huì)自動(dòng)調(diào)用異常處理器。
@Component
public class ErrorListener {
private static final Logger log= LoggerFactory.getLogger(ErrorListener.class);
@KafkaListener(id = "err", topics = "topic.quick.error", errorHandler = "consumerAwareErrorHandler")
public void errorListener(String data) {
log.info("topic.quick.error receive : " + data);
throw new RuntimeException("fail");
}
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
return new ConsumerAwareListenerErrorHandler() {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) {
log.info("consumerAwareErrorHandler receive : "+message.getPayload().toString());
return null;
}
};
}
}
編寫測(cè)試方法,發(fā)送一條消息到topic.quick.error中,運(yùn)行測(cè)試方法后我們可以看到異常處理器已經(jīng)能正常使用了。
@Autowired
private KafkaTemplate kafkaTemplate;
@Test
public void testErrorHandler() {
kafkaTemplate.send("topic.quick.error", "test error handle");
}
2018-09-14 11:42:05.099 INFO 8912 --- [ err-0-C-1] com.viu.kafka.listen.ErrorListener : topic.quick.error receive : test error handle
2018-09-14 11:42:05.101 INFO 8912 --- [ err-0-C-1] com.viu.kafka.listen.ErrorListener : consumerAwareErrorHandler receive : test error handle
批量消費(fèi)異常處理器
批量消費(fèi)代碼也是差不多的,只不過傳遞過來的數(shù)據(jù)都是List集合方式,這里就不做其他代碼的展示了。
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
return new ConsumerAwareListenerErrorHandler() {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) {
log.info("consumerAwareErrorHandler receive : "+message.getPayload().toString());
MessageHeaders headers = message.getHeaders();
List<String> topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class);
List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class);
List<Long> offsets = headers.get(KafkaHeaders.OFFSET, List.class);
Map<TopicPartition, Long> offsetsToReset = new HashMap<>();
return null;
}
};
}