使用 Spring Kafka 進行非阻塞重試的集成測試

Kafka的非阻塞重試是通過為主題配置重試主題來實現(xiàn)的。如果需要,還可以配置額外的死信主題。如果所有重試都耗盡,事件將被轉(zhuǎn)發(fā)到DLT。在公共領域中有很多資源可用于了解技術(shù)細節(jié)。對于代碼中的重試機制編寫集成測試確實是一項具有挑戰(zhàn)性的工作。以下是一些測試方法,可以用來驗證重試機制的正確性:

  1. 驗證事件已經(jīng)按照所需的次數(shù)進行了重試:
  • 在測試中,模擬一個會觸發(fā)重試的事件,并設置重試次數(shù)為所需的次數(shù)。

  • 使用斷言來驗證事件是否被重試了指定的次數(shù)。

  1. 驗證只有在特定的異常發(fā)生時才進行重試,而不是其他異常:
  • 在測試中,模擬不同的異常情況,包括需要重試的異常和不需要重試的異常。

  • 使用斷言來驗證只有特定的異常觸發(fā)了重試,而其他異常沒有觸發(fā)重試。

  1. 驗證如果前一次重試已經(jīng)解決了異常,不會進行另一次重試:
  • 在測試中,模擬一個會觸發(fā)重試的事件,并在每次重試之間解決異常。

  • 使用斷言來驗證只有在異常沒有被解決的情況下才進行重試。

  1. 驗證在前面的 (n-1) 次重試失敗后,第 n 次重試成功:
  • 在測試中,模擬一個會觸發(fā)重試的事件,并設置重試次數(shù)為 n。

  • 使用斷言來驗證在前面的 (n-1) 次重試失敗后,第 n 次重試成功。

  1. 驗證如果所有的重試嘗試都失敗,事件是否已經(jīng)發(fā)送到了死信隊列:
  • 在測試中,模擬一個會觸發(fā)重試的事件,并設置重試次數(shù)為一個較小的值。
  • 使用斷言來驗證當所有的重試嘗試都失敗后,事件是否已經(jīng)發(fā)送到了死信隊列。

設置可重試的消費者

@Slf4j
@Component
@RequiredArgsConstructor
public class CustomEventConsumer {

    private final CustomEventHandler handler;

    @RetryableTopic(attempts = "${retry.attempts}",
            backoff = @Backoff(
                    delayExpression = "${retry.delay}",
                    multiplierExpression = "${retry.delay.multiplier}"
            ),
            topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
            dltStrategy = FAIL_ON_ERROR,
            autoStartDltHandler = "true",
            autoCreateTopics = "false",
            include = {CustomRetryableException.class})
    @KafkaListener(topics = "${topic}", id = "${default-consumer-group:default}")
    public void consume(CustomEvent event, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        try {
            log.info("Received event on topic {}", topic);
            handler.handleEvent(event);
        } catch (Exception e) {
            log.error("Error occurred while processing event", e);
            throw e;
        }
    }

    @DltHandler
    public void listenOnDlt(@Payload CustomEvent event) {
        log.error("Received event on dlt.");
        handler.handleEventFromDlt(event);
    }

}

如果您注意上面的代碼片段,參數(shù)@RetryableTopic中包含includes。這告訴消費者只在方法拋出CustomRetryableException時進行重試。您可以添加任意數(shù)量的異常類型。還有一個exclude參數(shù),但一次只能使用其中一個。在將事件發(fā)布到死信隊列之前,事件處理最多應重試指定的次數(shù)。

設置測試基礎設施

為了編寫集成測試,您需要確保擁有一個正常運行的Kafka代理(最好是嵌入式的)和一個完全運行的發(fā)布者。讓我們設置我們的基礎設施:

@EnableKafka
@SpringBootTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
@EmbeddedKafka(partitions = 1,
        brokerProperties = {"listeners=" + "${kafka.broker.listeners}", 
                            "port=" + "${kafka.broker.port}"},
        controlledShutdown = true,
        topics = {"test", "test-retry-0", "test-retry-1", "test-dlt"}
)
@ActiveProfiles("test")
class DocumentEventConsumerIntegrationTest {

  @Autowired
  private KafkaTemplate<String, CustomEvent> testKafkaTemplate;


    // tests

}

配置從application-test.yml文件中導入。當使用嵌入式Kafka代理時,重要的是要提及要創(chuàng)建的主題。它們不會自動創(chuàng)建。在這種情況下,我們將創(chuàng)建四個主題,分別是:

"test", "test-retry-0", "test-retry-1", "test-dlt"

我們將最大重試次數(shù)設置為三次。每個主題對應于每次重試嘗試。因此,如果三次重試都耗盡,事件應該被轉(zhuǎn)發(fā)到DLT(死信隊列)。

測試用例

如果在第一次嘗試中成功消費,就不應該進行重試。可以通過方法只被調(diào)用一次來測試這一點。還可以添加對日志語句的進一步測試。

 @Test
    void test_should_not_retry_if_consumption_is_successful() throws ExecutionException, InterruptedException {
        CustomEvent event = new CustomEvent("Hello");
        // GIVEN
        doNothing().when(customEventHandler).handleEvent(any(CustomEvent.class));

        // WHEN
        testKafkaTemplate.send("test", event).get();

        // THEN
        verify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class));
        verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
    }

如果引發(fā)了不可重試的異常,就不應該進行重試。在這種情況下,方法CustomEventHandler#handleEvent應該只被調(diào)用一次。

 @Test    void test_should_not_retry_if_non_retryable_exception_raised() throws ExecutionException, InterruptedException {        CustomEvent event = new CustomEvent("Hello");        // GIVEN        doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));        // WHEN        testKafkaTemplate.send("test", event).get();        // THEN        verify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class));        verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));    }

如果拋出了RetryableException,則應該按照配置的最大重試次數(shù)進行重試,當重試次數(shù)耗盡時,事件應該被發(fā)布到死信主題。在這種情況下,方法CustomEventHandler#handleEvent應該被調(diào)用三次(maxRetries次),而方法CustomEventHandler#handleEventFromDlt應該只被調(diào)用一次。

 @Test
    void test_should_not_retry_if_non_retryable_exception_raised() throws ExecutionException, InterruptedException {
        CustomEvent event = new CustomEvent("Hello");
        // GIVEN
        doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));

        // WHEN
        testKafkaTemplate.send("test", event).get();

        // THEN
        verify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class));
        verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
    }

在驗證階段添加了相當長的超時時間,以便在測試完成之前考慮指數(shù)退避延遲。這是很重要的,如果沒有正確設置,可能會導致斷言失敗。應該重試直到RetryableException被解決,并且如果引發(fā)了不可重試的異?;蛘咦罱K成功消費,就不應該繼續(xù)重試。測試已經(jīng)設置為首先拋出RetryableException,然后再拋出NonRetryableException,以便進行一次重試。

@Test
    void test_should_retry_until_retryable_exception_is_resolved_by_non_retryable_exception() throws ExecutionException,
            InterruptedException {
        CustomEvent event = new CustomEvent("Hello");
        // GIVEN
        doThrow(CustomRetryableException.class).doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));

        // WHEN
        testKafkaTemplate.send("test", event).get();

        // THEN
        verify(customEventHandler, timeout(10000).times(2)).handleEvent(any(CustomEvent.class));
        verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
    }ndleEventFromDlt(any(CustomEvent.class));    }
 @Test
    void test_should_retry_until_retryable_exception_is_resolved_by_successful_consumption() throws ExecutionException,
            InterruptedException {
        CustomEvent event = new CustomEvent("Hello");
        // GIVEN
        doThrow(CustomRetryableException.class).doNothing().when(customEventHandler).handleEvent(any(CustomEvent.class));

        // WHEN
        testKafkaTemplate.send("test", event).get();

        // THEN
        verify(customEventHandler, timeout(10000).times(2)).handleEvent(any(CustomEvent.class));
        verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
        }

結(jié)論

因此,您可以看到集成測試是一種混合和匹配的策略,超時時間,延遲和驗證,以確保您的Kafka事件驅(qū)動架構(gòu)的重試機制是可靠的。

作者: Mukut Bhattacharjee

更多技術(shù)干貨盡在wx“云原生數(shù)據(jù)庫”

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容