Kafka的非阻塞重試是通過為主題配置重試主題來實現(xiàn)的。如果需要,還可以配置額外的死信主題。如果所有重試都耗盡,事件將被轉(zhuǎn)發(fā)到DLT。在公共領域中有很多資源可用于了解技術(shù)細節(jié)。對于代碼中的重試機制編寫集成測試確實是一項具有挑戰(zhàn)性的工作。以下是一些測試方法,可以用來驗證重試機制的正確性:
- 驗證事件已經(jīng)按照所需的次數(shù)進行了重試:
在測試中,模擬一個會觸發(fā)重試的事件,并設置重試次數(shù)為所需的次數(shù)。
使用斷言來驗證事件是否被重試了指定的次數(shù)。
- 驗證只有在特定的異常發(fā)生時才進行重試,而不是其他異常:
在測試中,模擬不同的異常情況,包括需要重試的異常和不需要重試的異常。
使用斷言來驗證只有特定的異常觸發(fā)了重試,而其他異常沒有觸發(fā)重試。
- 驗證如果前一次重試已經(jīng)解決了異常,不會進行另一次重試:
在測試中,模擬一個會觸發(fā)重試的事件,并在每次重試之間解決異常。
使用斷言來驗證只有在異常沒有被解決的情況下才進行重試。
- 驗證在前面的 (n-1) 次重試失敗后,第 n 次重試成功:
在測試中,模擬一個會觸發(fā)重試的事件,并設置重試次數(shù)為 n。
使用斷言來驗證在前面的 (n-1) 次重試失敗后,第 n 次重試成功。
- 驗證如果所有的重試嘗試都失敗,事件是否已經(jīng)發(fā)送到了死信隊列:
- 在測試中,模擬一個會觸發(fā)重試的事件,并設置重試次數(shù)為一個較小的值。
- 使用斷言來驗證當所有的重試嘗試都失敗后,事件是否已經(jīng)發(fā)送到了死信隊列。
設置可重試的消費者
@Slf4j
@Component
@RequiredArgsConstructor
public class CustomEventConsumer {
private final CustomEventHandler handler;
@RetryableTopic(attempts = "${retry.attempts}",
backoff = @Backoff(
delayExpression = "${retry.delay}",
multiplierExpression = "${retry.delay.multiplier}"
),
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
dltStrategy = FAIL_ON_ERROR,
autoStartDltHandler = "true",
autoCreateTopics = "false",
include = {CustomRetryableException.class})
@KafkaListener(topics = "${topic}", id = "${default-consumer-group:default}")
public void consume(CustomEvent event, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
try {
log.info("Received event on topic {}", topic);
handler.handleEvent(event);
} catch (Exception e) {
log.error("Error occurred while processing event", e);
throw e;
}
}
@DltHandler
public void listenOnDlt(@Payload CustomEvent event) {
log.error("Received event on dlt.");
handler.handleEventFromDlt(event);
}
}
如果您注意上面的代碼片段,參數(shù)@RetryableTopic中包含includes。這告訴消費者只在方法拋出CustomRetryableException時進行重試。您可以添加任意數(shù)量的異常類型。還有一個exclude參數(shù),但一次只能使用其中一個。在將事件發(fā)布到死信隊列之前,事件處理最多應重試指定的次數(shù)。
設置測試基礎設施
為了編寫集成測試,您需要確保擁有一個正常運行的Kafka代理(最好是嵌入式的)和一個完全運行的發(fā)布者。讓我們設置我們的基礎設施:
@EnableKafka
@SpringBootTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
@EmbeddedKafka(partitions = 1,
brokerProperties = {"listeners=" + "${kafka.broker.listeners}",
"port=" + "${kafka.broker.port}"},
controlledShutdown = true,
topics = {"test", "test-retry-0", "test-retry-1", "test-dlt"}
)
@ActiveProfiles("test")
class DocumentEventConsumerIntegrationTest {
@Autowired
private KafkaTemplate<String, CustomEvent> testKafkaTemplate;
// tests
}
配置從application-test.yml文件中導入。當使用嵌入式Kafka代理時,重要的是要提及要創(chuàng)建的主題。它們不會自動創(chuàng)建。在這種情況下,我們將創(chuàng)建四個主題,分別是:
"test", "test-retry-0", "test-retry-1", "test-dlt"
我們將最大重試次數(shù)設置為三次。每個主題對應于每次重試嘗試。因此,如果三次重試都耗盡,事件應該被轉(zhuǎn)發(fā)到DLT(死信隊列)。
測試用例
如果在第一次嘗試中成功消費,就不應該進行重試。可以通過方法只被調(diào)用一次來測試這一點。還可以添加對日志語句的進一步測試。
@Test
void test_should_not_retry_if_consumption_is_successful() throws ExecutionException, InterruptedException {
CustomEvent event = new CustomEvent("Hello");
// GIVEN
doNothing().when(customEventHandler).handleEvent(any(CustomEvent.class));
// WHEN
testKafkaTemplate.send("test", event).get();
// THEN
verify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class));
verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
}
如果引發(fā)了不可重試的異常,就不應該進行重試。在這種情況下,方法CustomEventHandler#handleEvent應該只被調(diào)用一次。
@Test void test_should_not_retry_if_non_retryable_exception_raised() throws ExecutionException, InterruptedException { CustomEvent event = new CustomEvent("Hello"); // GIVEN doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class)); // WHEN testKafkaTemplate.send("test", event).get(); // THEN verify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class)); verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class)); }
如果拋出了RetryableException,則應該按照配置的最大重試次數(shù)進行重試,當重試次數(shù)耗盡時,事件應該被發(fā)布到死信主題。在這種情況下,方法CustomEventHandler#handleEvent應該被調(diào)用三次(maxRetries次),而方法CustomEventHandler#handleEventFromDlt應該只被調(diào)用一次。
@Test
void test_should_not_retry_if_non_retryable_exception_raised() throws ExecutionException, InterruptedException {
CustomEvent event = new CustomEvent("Hello");
// GIVEN
doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));
// WHEN
testKafkaTemplate.send("test", event).get();
// THEN
verify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class));
verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
}
在驗證階段添加了相當長的超時時間,以便在測試完成之前考慮指數(shù)退避延遲。這是很重要的,如果沒有正確設置,可能會導致斷言失敗。應該重試直到RetryableException被解決,并且如果引發(fā)了不可重試的異?;蛘咦罱K成功消費,就不應該繼續(xù)重試。測試已經(jīng)設置為首先拋出RetryableException,然后再拋出NonRetryableException,以便進行一次重試。
@Test
void test_should_retry_until_retryable_exception_is_resolved_by_non_retryable_exception() throws ExecutionException,
InterruptedException {
CustomEvent event = new CustomEvent("Hello");
// GIVEN
doThrow(CustomRetryableException.class).doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));
// WHEN
testKafkaTemplate.send("test", event).get();
// THEN
verify(customEventHandler, timeout(10000).times(2)).handleEvent(any(CustomEvent.class));
verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
}ndleEventFromDlt(any(CustomEvent.class)); }
@Test
void test_should_retry_until_retryable_exception_is_resolved_by_successful_consumption() throws ExecutionException,
InterruptedException {
CustomEvent event = new CustomEvent("Hello");
// GIVEN
doThrow(CustomRetryableException.class).doNothing().when(customEventHandler).handleEvent(any(CustomEvent.class));
// WHEN
testKafkaTemplate.send("test", event).get();
// THEN
verify(customEventHandler, timeout(10000).times(2)).handleEvent(any(CustomEvent.class));
verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
}
結(jié)論
因此,您可以看到集成測試是一種混合和匹配的策略,超時時間,延遲和驗證,以確保您的Kafka事件驅(qū)動架構(gòu)的重試機制是可靠的。
作者: Mukut Bhattacharjee
更多技術(shù)干貨盡在wx“云原生數(shù)據(jù)庫”