通過之前的《消息驅(qū)動的微服務(入門)》一文,相信很多朋友已經(jīng)對Spring Cloud Stream有了一個初步的認識。但是,對于《消息驅(qū)動的微服務(核心概念)》一文中提到的一些核心概念可能還有些迷糊,下面我們將詳細的來學習一下這些概念。本文我們就來學習和使用一下“消費組”這一概念。
使用消費組實現(xiàn)消息消費的負載均衡
通常在生產(chǎn)環(huán)境,我們的每個服務都不會以單節(jié)點的方式運行在生產(chǎn)環(huán)境,當同一個服務啟動多個實例的時候,這些實例都會綁定到同一個消息通道的目標主題(Topic)上。
默認情況下,當生產(chǎn)者發(fā)出一條消息到綁定通道上,這條消息會產(chǎn)生多個副本被每個消費者實例接收和處理,但是有些業(yè)務場景之下,我們希望生產(chǎn)者產(chǎn)生的消息只被其中一個實例消費,這個時候我們需要為這些消費者設(shè)置消費組來實現(xiàn)這樣的功能,實現(xiàn)的方式非常簡單,我們只需要在服務消費者端設(shè)置spring.cloud.stream.bindings.input.group屬性即可,比如我們可以這樣實現(xiàn):
- 先創(chuàng)建一個消費者應用
SinkReceiver,實現(xiàn)了greetings主題上的輸入通道綁定,它的實現(xiàn)如下:
@EnableBinding(value = {Sink.class})
public class SinkReceiver {
private static Logger logger = LoggerFactory.getLogger(SinkReceiver.class);
@StreamListener(Sink.INPUT)
public void receive(User user) {
logger.info("Received: " + user);
}
}
- 為了將
SinkReceiver的輸入通道目標設(shè)置為greetings主題,以及將該服務的實例設(shè)置為同一個消費組,做如下設(shè)置:
spring.cloud.stream.bindings.input.group=Service-A
spring.cloud.stream.bindings.input.destination=greetings
通過spring.cloud.stream.bindings.input.group屬性指定了該應用實例都屬于Service-A消費組,而spring.cloud.stream.bindings.input.destination屬性則指定了輸入通道對應的主題名。
- 完成了消息消費者之后,我們再來實現(xiàn)一個消息生產(chǎn)者應用
SinkSender,具體如下:
@EnableBinding(value = {Source.class})
public class SinkSender {
private static Logger logger = LoggerFactory.getLogger(SinkSender.class);
@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "2000"))
public MessageSource<String> timerMessageSource() {
return () -> new GenericMessage<>("{\"name\":\"didi\", \"age\":30}");
}
}
- 為消息生產(chǎn)者
SinkSender做一些設(shè)置,讓它的輸出通道綁定目標也指向greetings主題,具體如下:
spring.cloud.stream.bindings.output.destination=greetings
到這里,對于消費分組的示例就已經(jīng)完成了。分別運行上面實現(xiàn)的生產(chǎn)者與消費者,其中消費者我們啟動多個實例。通過控制臺,我們可以發(fā)現(xiàn)每個生產(chǎn)者發(fā)出的消息,會被啟動的消費者以輪詢的方式進行接收和輸出。
以下專題教程也許您會有興趣
本文內(nèi)容部分節(jié)選自我的《Spring Cloud微服務實戰(zhàn)》,稍作改變和內(nèi)容升級