Spring Cloud Stream如何處理消息重復消費?

最近收到好幾個類似的問題:使用Spring Cloud Stream操作RabbitMQ或Kafka的時候,出現(xiàn)消息重復消費的問題。通過溝通與排查下來主要還是用戶對消費組的認識不夠。其實,在之前的博文以及《Spring Cloud微服務(wù)實戰(zhàn)》一書中都有提到關(guān)于消費組的概念以及作用。

那么什么是消費組呢?為什么要用消費組?它解決什么問題呢?摘錄一段之前博文的內(nèi)容,來解答這些疑問:

通常在生產(chǎn)環(huán)境,我們的每個服務(wù)都不會以單節(jié)點的方式運行在生產(chǎn)環(huán)境,當同一個服務(wù)啟動多個實例的時候,這些實例都會綁定到同一個消息通道的目標主題(Topic)上。默認情況下,當生產(chǎn)者發(fā)出一條消息到綁定通道上,這條消息會產(chǎn)生多個副本被每個消費者實例接收和處理(出現(xiàn)上述重復消費問題)。但是有些業(yè)務(wù)場景之下,我們希望生產(chǎn)者產(chǎn)生的消息只被其中一個實例消費,這個時候我們需要為這些消費者設(shè)置消費組來實現(xiàn)這樣的功能。

詳細也可查看原文:消息驅(qū)動的微服務(wù)(消費組)。

下面,通過一個例子來看看如何使用消費組:

問題重現(xiàn)

構(gòu)建消息消費端

第一步:創(chuàng)建綁定接口,綁定example-topic輸入通道(默認情況下,會綁定到RabbitMQ的同名Exchange或Kafaka的同名Topic)。

interface ExampleBinder {

    String NAME = "example-topic";

    @Input(NAME)
    SubscribableChannel input();

}

第二步:對上述輸入通道創(chuàng)建監(jiān)聽與處理邏輯。

@EnableBinding(ExampleBinder.class)
public class ExampleReceiver {

    private static Logger logger = LoggerFactory.getLogger(ExampleReceiver.class);

    @StreamListener(ExampleBinder.NAME)
    public void receive(String payload) {
        logger.info("Received: " + payload);
    }

}

第三步;創(chuàng)建應(yīng)用主類和配置文件

@SpringBootApplication
public class ExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(ExampleApplication.class, args);
    }

}
spring.application.name=stream-consumer-group
server.port=0

這里設(shè)置server.port=0,以方便在本地啟動多實例來重現(xiàn)問題。

完成上述操作之后,啟動兩個該應(yīng)用的實例,以備后續(xù)調(diào)用。

構(gòu)建消息生產(chǎn)端

比較簡單,需要注意的是,使用@Output創(chuàng)建一個同名的輸出綁定,這樣發(fā)出的消息才能被上述啟動的實例接收到。具體實現(xiàn)如下:

@RunWith(SpringRunner.class)
@EnableBinding(value = {ExampleApplicationTests.ExampleBinder.class})
public class ExampleApplicationTests {

    @Autowired
    private ExampleBinder exampleBinder;

    @Test
    public void exampleBinderTester() {
        exampleBinder.output().send(MessageBuilder.withPayload("Produce a message from : http://blog.didispace.com").build());
    }

    public interface ExampleBinder {

        String NAME = "example-topic";

        @Output(NAME)
        MessageChannel output();

    }

}

啟動上述測試用例之后,可以發(fā)現(xiàn)之前啟動的兩個實例都收到的消息,并在日志中打印了:Received: Produce a message from : http://blog.didispace.com。消息重復消費的問題成功重現(xiàn)!

使用消費組解決問題

如何解決上述消息重復消費的問題呢?我們只需要在配置文件中增加如下配置即可:

spring.cloud.stream.bindings.example-topic.group=aaa

當我們指定了某個綁定所指向的消費組之后,往當前主題發(fā)送的消息在每個訂閱消費組中,只會有一個訂閱者接收和消費,從而實現(xiàn)了對消息的負載均衡。只所以之前會出現(xiàn)重復消費的問題,是由于默認情況下,任何訂閱都會產(chǎn)生一個匿名消費組,所以每個訂閱實例都會有自己的消費組,從而當有消息發(fā)送的時候,就形成了廣播的模式。

另外,需要注意上述配置中example-topic是在代碼中@Output@Input中傳入的名字。

代碼示例

本文示例讀者可以通過查看下面?zhèn)}庫的中的stream-consumer-group項目:

如果您對這些感興趣,歡迎star、follow、收藏、轉(zhuǎn)發(fā)給予支持!

以下專題教程也許您會有興趣

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,604評論 19 139
  • Spring Boot 參考指南 介紹 轉(zhuǎn)載自:https://www.gitbook.com/book/qbgb...
    毛宇鵬閱讀 47,275評論 6 342
  • Spring Web MVC Spring Web MVC 是包含在 Spring 框架中的 Web 框架,建立于...
    Hsinwong閱讀 22,943評論 1 92
  • 前言 在微服務(wù)架構(gòu)的系統(tǒng)中,我們通常會使用輕量級的消息代理來構(gòu)建一個共用的消息主題讓系統(tǒng)中所有微服務(wù)實例都連接上來...
    Chandler_玨瑜閱讀 6,787評論 2 39
  • 2018.8.29 星期三 天氣多云 吃過早飯,老媽老爸又去掰玉米了,我在家看孩子,大兒子在聽國學機。小兒子...
    五年六班陳樂奇閱讀 200評論 0 1

友情鏈接更多精彩內(nèi)容