gRPC入門-雙向流式通信

Gradle工程的環(huán)境搭建依然和前文一致,參考:gRPC入門-Hello World

示例代碼

  • src\main\proto下新建.proto契約,如下:

    syntax = "proto3";
    
    package com.mattie.netty.grpc;
    
    option java_package = "com.mattie.netty.grpc";
    option java_outer_classname = "HelloWorldProtos";
    
    service HelloService {
        //雙向流
        rpc biStream (stream StreamReq) returns (stream StreamResp) {};
    }
    
    //雙向流
     message StreamReq {
       string req_info = 1;
    }
    
    message StreamResp {
        string resp_info = 1;
    }
    
  • 執(zhí)行gradle clean build,自動(dòng)生成gRPC相關(guān)代碼: HelloServiceGrpc, 我們需要使用就是它和它的幾個(gè)內(nèi)部類。

  • 自定義服務(wù)類MyServiceImpl繼承HelloServiceGrpc的內(nèi)部類HelloServiceImplBase并重寫(xiě)契約中定義的服務(wù)biStream。注意到重寫(xiě)的方法接收的參數(shù)類型是StreamObserver(針對(duì)response),返回值的類型也是StreamObserver(針對(duì)request)。

    public class MyServiceImpl extends HelloServiceImplBase {
        @Override
        public StreamObserver<HelloWorldProtos.StreamReq> biStream(StreamObserver<HelloWorldProtos.StreamResp> responseObserver) {
      return new StreamObserver<HelloWorldProtos.StreamReq>() {
    
          //接收請(qǐng)求
          @Override
          public void onNext(HelloWorldProtos.StreamReq streamReq) {
              System.out.println(streamReq.getReqInfo());
              //接收請(qǐng)求后就返回一個(gè)響應(yīng)
              responseObserver.onNext(HelloWorldProtos.StreamResp.newBuilder().setRespInfo("from server").build());
          }
    
          @Override
          public void onError(Throwable throwable) {
    
          }
    
          //客戶端發(fā)送數(shù)據(jù)完畢
          @Override
          public void onCompleted() {
              //服務(wù)端也完畢
              responseObserver.onCompleted();
          }
        };
      }
    

RequestObserver在服務(wù)端實(shí)現(xiàn),responseObserver在客戶端實(shí)現(xiàn)。

因此,在服務(wù)代碼MyServiceImpl中需要返回一個(gè)requestObserver,實(shí)現(xiàn)其中的回調(diào)方法:
1. onNext表示接收到一個(gè)request,這里通過(guò)responseObserveronNext()立刻返回了一條數(shù)據(jù)。
2. onCompleted表示客戶端已經(jīng)發(fā)送數(shù)據(jù)完畢,這里調(diào)用responseObserveronCompleted()也告訴客戶端連接關(guān)閉。

  • 客戶端不僅需要發(fā)送數(shù)據(jù),而且需要實(shí)現(xiàn)一個(gè)responseObserver:

      public void biCommute() {
          StreamObserver<HelloWorldProtos.StreamReq> reqStreamObserver = this.stub.biStream(new StreamObserver<HelloWorldProtos.StreamResp>() {
              //接收到服務(wù)返回
              @Override
              public void onNext(HelloWorldProtos.StreamResp streamResp) {
                  System.out.println(streamResp.getRespInfo());
              }
    
              @Override
              public void onError(Throwable throwable) {
    
              }
    
              //服務(wù)發(fā)送完畢
              @Override
              public void onCompleted() {
    
              }
          });
    
          reqStreamObserver.onNext(HelloWorldProtos.StreamReq.newBuilder().setReqInfo("hello server1").build());
          reqStreamObserver.onNext(HelloWorldProtos.StreamReq.newBuilder().setReqInfo("hello server2").build());
          reqStreamObserver.onNext(HelloWorldProtos.StreamReq.newBuilder().setReqInfo("hello server3").build());
          reqStreamObserver.onNext(HelloWorldProtos.StreamReq.newBuilder().setReqInfo("hello server4").build());
          reqStreamObserver.onCompleted();
    
         try {
            Thread.sleep(10000);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
       }
    
  1. 這里調(diào)用服務(wù)方法biStream使用的是stub(異步),而非之前的blockingStub(同步)。
  2. 調(diào)用biStream需要傳入responseObserver作為參數(shù),同時(shí)返回值是requestObserver。
  3. 通過(guò)requestObserveronNext()不斷發(fā)送數(shù)據(jù)。
  • 執(zhí)行程序,客戶端發(fā)送四條數(shù)據(jù),服務(wù)端每接收到一條數(shù)據(jù)就響應(yīng)一條from server給客戶端。

為了能更好的看到執(zhí)行效果,可以在程序最后增加sleep(10000),觀察服務(wù)的異步響應(yīng)過(guò)程。

雙向流
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 本文是gRPC的一個(gè)簡(jiǎn)單例子,以protocol buffers 3作為契約類型,使用gRPC自動(dòng)生成服務(wù)端和客戶...
    ted005閱讀 5,374評(píng)論 0 50
  • 版權(quán)聲明:本文為小斑馬偉原創(chuàng)文章,轉(zhuǎn)載請(qǐng)注明出處! 上篇簡(jiǎn)單的闡述了響應(yīng)式編程的基本理論。這篇主要對(duì)響應(yīng)編程進(jìn)行詳...
    ZebraWei閱讀 3,238評(píng)論 0 2
  • 上篇文章講到 Protocol Buffers. 一些 API 也可以用 Protobuf 來(lái)表示。 簡(jiǎn)單介紹 R...
    loono閱讀 20,312評(píng)論 0 4
  • 一.Grpc簡(jiǎn)介 一個(gè)2016年才由google正式發(fā)布的的RPC框架,基于http2,protobuf協(xié)議 官網(wǎng)...
    我也是玄沖閱讀 8,761評(píng)論 0 2
  • 如果你愛(ài)我 請(qǐng)你愛(ài)我之前先愛(ài)你自己 愛(ài)我的同時(shí)也愛(ài)著你自己 你若不愛(ài)你自己 你便無(wú)法來(lái)愛(ài)我 這是愛(ài)的法則 因?yàn)?你...
    清且安閱讀 291評(píng)論 0 1

友情鏈接更多精彩內(nèi)容