SpringCloud Stream

相關(guān)資料以及注意事項(xiàng):

簡介

SpringCloud Stream是SpringCloud對(duì)于消息中間件的進(jìn)一步封裝,它簡化了開發(fā)人員對(duì)于消息隊(duì)列的操作。目前僅支持RabbitMQ與Kafka。


示例代碼

1.引入Maven

根據(jù)你的消息隊(duì)列類型引入spring-cloud-starter-stream-rabbit或者spring-cloud-starter-stream-kafka

2.相關(guān)注釋

  • @input : 設(shè)置輸入信道名稱。不設(shè)置參數(shù),通道名稱就默認(rèn)為方法名。
  • @output:設(shè)置輸出信道名稱。不設(shè)置參數(shù),通道名稱就默認(rèn)為方法名。
  • @StreamListener:設(shè)置監(jiān)聽信道,用于接受來自消息隊(duì)列的消息
  • @SendTo:配合@StreamListener使用,在收到信息后發(fā)送反饋信息
  • @EnableBinding:注解用于綁定一個(gè)或者多個(gè)接口作為參數(shù)

3.預(yù)設(shè)類

  • Sink:stream中接受消息的接口
  • Source:stream中輸出消息的接口
  • Processor:stream中綁定輸入輸出的接口,主要用來講發(fā)布者和訂閱者綁定到一起

4.配置文件參數(shù)

對(duì)于RabbitMQ,destination 對(duì)應(yīng)的是exchange,group對(duì)應(yīng)的是queue(帶有前綴)。對(duì)于kafka,destination 對(duì)應(yīng)的是Topic,group就是對(duì)應(yīng)的消費(fèi)group。對(duì)于一個(gè)應(yīng)用集群,如果不需要重復(fù)消費(fèi)消息,必須定義group,否則不必定義group(比如刷新配置消息)。接收消息通道和發(fā)送消息通道名不可以重復(fù)。需要在后臺(tái)監(jiān)控頁面看到直觀的對(duì)象數(shù)據(jù),需要設(shè)置 content-type: application/json。

  cloud:
    stream:
      bindings:
        input: #通道名稱
        group: group1 
        content-type: application/json
        destination: exchange

5.程序代碼

一個(gè)controller,分別是傳遞字符串與傳對(duì)象。
一個(gè)receiver,一個(gè)是接受并發(fā)出反饋信息,另一個(gè)為接收反饋。

@RestController
public class SendMessageController {
    @Autowired
    private Processor processor;


    @GetMapping("/sendMessage")
    public void process()
    {
        String message = new StringBuilder().append("now ").append(new Date()).toString();
        processor.output().send(MessageBuilder.withPayload(message).build());
    }

    @GetMapping("/sendObject")
    public void sendObject()
    {
        Person person = new Person();
        person.setName("張三");
        person.setAge(123);
        //process中已設(shè)置input
        processor.output().send(MessageBuilder.withPayload(person).build());
    }
}

@Component
@EnableBinding(Processor.class)
@Slf4j
public class StreamReceiver {

    @StreamListener(Processor.OUTPUT)
    @SendTo(Processor.INPUT)
    public String process(String message)
    {
        System.out.println(message);
        log.info("process: StreamReceiver:{}",message);
        return "我是回執(zhí)";
    }
    //@StreamListener(Processor.OUTPUT)
    //public void process1(Person person)
    //{
    //    System.out.println(person);
    //    log.info("process1: StreamReceiver:{}",person);
    //}

    @StreamListener(Processor.INPUT)
    public void process2(String message)
    {
        System.out.println(message);
        log.info("process2: StreamReceiver:{}",message);
    }
}

好了,那么有關(guān)于SpringCloud Stream就介紹到這里。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容