SpringCloud 進(jìn)階:消息驅(qū)動(dòng)之Spring Cloud Stream 消費(fèi)者分組

我的博客:程序員笑笑生,歡迎瀏覽博客!

? ?上一章 SpringCloud進(jìn)階:Spring Cloud Stream 核心組件當(dāng)中,我們了解了Spring Cloud Stream的核心組件和Spring Integration的簡介,本章我們將聊一聊消費(fèi)者分組相關(guān)的知識。

# 前言

?在實(shí)際的企業(yè)應(yīng)用場景下,一條消息只能被一個(gè)消費(fèi)者消費(fèi),但是在我們部署的應(yīng)用中,通常會(huì)一個(gè)消費(fèi)者應(yīng)用部署了多臺(tái)實(shí)例。Spring Cloud Stream利用消費(fèi)者分組就解決這個(gè)問題,確保當(dāng)生產(chǎn)者發(fā)送一條消息后,多個(gè)實(shí)例當(dāng)中只有一個(gè)能夠消費(fèi)到這樣的消息。

一 、多實(shí)例未分組消費(fèi)者測試

?在我們之前的章節(jié)中, SpringCloud進(jìn)階-消息驅(qū)動(dòng)pring Cloud Stream中,我們創(chuàng)建了消費(fèi)者服務(wù):server-receiver和生產(chǎn)者:server-sender,接下來我們結(jié)合之前的注冊中心Eureka搭建多實(shí)例的消費(fèi)者,首先在server-receiver引入Eureka客戶端的依賴:

  <dependency>
    <groupId>org.springframework.cloud</groupId>
     <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

?在resource目錄下新建applicaiton-s1.yml 、applicaiton-s2.yml 通過applicaiton.yml中配置spring.profiles.active屬性激活不同的配置文件:

applicaiton-s1.yml

spring:
  cloud:
    stream:
      bindings:
         input:
            destination: mytopic
            binder: defaultRabbit
      binders:
         defaultRabbit:
             type: rabbit
             environment:
                spring:
                 rabbitmq:
                     host: localhost
                     port: 5672
server:
  port: 8081


eureka:
    instance:
      hostname: eureka7001.com  #eureka服務(wù)端的實(shí)例名稱
      instance-id: receiver1
    client:
      service-url:
         # 與注冊中心交互的url
        defaultZone: http://eureka7001.com:7001/eureka/
        enabled: true

applicaiton-s2.yml

spring:
  cloud:
    stream:
      bindings:
         input:
            destination: mytopic
            binder: defaultRabbit
      binders:
         defaultRabbit:
             type: rabbit
             environment:
                spring:
                 rabbitmq:
                     host: localhost
                     port: 5672
server:
  port: 8082

eureka:
    instance:
      hostname: eureka7001.com  #eureka服務(wù)端的實(shí)例名稱
      instance-id: receiver2
    client:
      service-url:
         # 與注冊中心交互的url
        defaultZone: http://eureka7001.com:7001/eureka/
        enabled: true

?生產(chǎn)者:server-sender的代碼不變化,也不需要注冊到Eureak中,

@RestController
public class SenderController {

    @Autowired
    SenderSource source;

    @RequestMapping("/send")
    public String sender(String msg) {
        source.output().send(MessageBuilder.withPayload(msg).build());
        return "ok";
    }

?先后啟動(dòng)Eureak和服務(wù)消費(fèi)者server-receiver 兩 個(gè)實(shí)例,最后在啟動(dòng)生產(chǎn)者:server-sender,我們看看Eureka中:

file

顯示了2個(gè)消費(fèi)者,我們通過HTTP調(diào)用生產(chǎn)者的發(fā)送接口: http://localhost:8081/send?msg=test

我們看到receiver1:日志

file

我們看到receiver2:日志

file

我們看到每個(gè)實(shí)例都會(huì)受到消息。這不是我們想要的,我們需要不管消費(fèi)者服務(wù)有多少實(shí)例,確保只有一個(gè)實(shí)例消費(fèi)信息。

二、添加分組配置

? 在Spring Cloud Stream中,如果不給消費(fèi)者指定一個(gè)組Group,那么Spring Cloud Stream將會(huì)給當(dāng)前的實(shí)例分配一個(gè)匿名的、獨(dú)立的只有一個(gè)成員的消費(fèi)組,這就導(dǎo)致了一個(gè)服務(wù)n個(gè)實(shí)例,就會(huì)有n個(gè)消費(fèi)者分組;

?怎么樣去配置是的所有的實(shí)例都是一個(gè)組呢?我們可以通過配置 spring.cloud.stream.bindings.input.group=group1就可以實(shí)現(xiàn),我們在applicaiton-s1.yml 和 applicaiton-s2.yml 添加:

spring:
  cloud:
    stream:
      bindings:
         input:
           group: group1

?重啟項(xiàng)目后,再次通過生產(chǎn)者發(fā)送消息后,就能確保只有一個(gè)消費(fèi)者收到消息了。

我們可以看到未分組之前是這樣的:

我們看到每個(gè)實(shí)例都會(huì)受到消息。這不是我們想要的,我們需要不管消費(fèi)者服務(wù)有多少實(shí)例,確保只有一個(gè)實(shí)例消費(fèi)信息。

二、添加分組配置

? 在Spring Cloud Stream中,如果不給消費(fèi)者指定一個(gè)組Group,那么Spring Cloud Stream將會(huì)給當(dāng)前的實(shí)例分配一個(gè)匿名的、獨(dú)立的只有一個(gè)成員的消費(fèi)組,這就導(dǎo)致了一個(gè)服務(wù)n個(gè)實(shí)例,就會(huì)有n個(gè)消費(fèi)者分組;

?怎么樣去配置是的所有的實(shí)例都是一個(gè)組呢?我們可以通過配置 spring.cloud.stream.bindings.input.group=group1就可以實(shí)現(xiàn),我們在applicaiton-s1.yml 和 applicaiton-s2.yml 添加:

spring:
  cloud:
    stream:
      bindings:
         input:
           group: group1

?重啟項(xiàng)目后,再次通過生產(chǎn)者發(fā)送消息后,就能確保只有一個(gè)消費(fèi)者收到消息了。

我們可以看到未分組之前是這樣的:

file

分組之后:

file

總結(jié)

?本章主要是通過示例的方式,使用Spring Cloud Stream如何實(shí)現(xiàn)消費(fèi)者分組,這也是在實(shí)際的開發(fā)中需要考慮的問題。

----END----

? .以就是本期的分享,你還可以關(guān)注公眾號: 程序員笑笑生,關(guān)注更多精彩內(nèi)容!

file
file

SpringCloud基礎(chǔ)教程(一)-微服務(wù)與SpringCloud

SpringCloud基礎(chǔ)教程(二)-服務(wù)發(fā)現(xiàn) Eureka

SpringCloud基礎(chǔ)教程(三)-Eureka進(jìn)階

SpringCloud 基礎(chǔ)教程(四)-配置中心入門

SpringCloud基礎(chǔ)教程(五)-配置中心熱生效和高可用

SpringCloud 基礎(chǔ)教程(六)-負(fù)載均衡Ribbon

SpringCloud 基礎(chǔ)教程(七)-Feign聲明式服務(wù)調(diào)用

SpringCloud 基礎(chǔ)教程(八)-Hystrix熔斷器(上)

SpringCloud 基礎(chǔ)教程(九)-Hystrix服務(wù)監(jiān)控(下)

SpringCloud 基礎(chǔ)教程(十)-Zull服務(wù)網(wǎng)關(guān)

SpringCloud 基礎(chǔ)教程(十一)- Sleuth 調(diào)用鏈追蹤簡介

SpringCloud 基礎(chǔ)教程(十二)-Zipkin 分布式鏈路追蹤系統(tǒng)搭建

SpringCloud 進(jìn)階: 消息驅(qū)動(dòng)(入門) Spring Cloud Stream【Greenwich.SR3】

SpringCloud 進(jìn)階: Spring Cloud Stream 核心組件

SpringCloud 進(jìn)階: 消息驅(qū)動(dòng)之Spring Cloud Stream 消費(fèi)者分組

更多精彩內(nèi)容,請期待...

本文由博客一文多發(fā)平臺(tái) OpenWrite 發(fā)布!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容