Gradle工程的環(huán)境搭建依然和前文一致,參考:gRPC入門-Hello World
-
在
src\main\proto下新建.proto契約,如下:syntax = "proto3"; package com.mattie.netty.grpc; option java_package = "com.mattie.netty.grpc"; option java_outer_classname = "HelloWorldProtos"; service HelloService { //雙向流 rpc biStream (stream StreamReq) returns (stream StreamResp) {}; } //雙向流 message StreamReq { string req_info = 1; } message StreamResp { string resp_info = 1; } 執(zhí)行
gradle clean build,自動(dòng)生成gRPC相關(guān)代碼:HelloServiceGrpc, 我們需要使用就是它和它的幾個(gè)內(nèi)部類。-
自定義服務(wù)類
MyServiceImpl繼承HelloServiceGrpc的內(nèi)部類HelloServiceImplBase并重寫(xiě)契約中定義的服務(wù)biStream。注意到重寫(xiě)的方法接收的參數(shù)類型是StreamObserver(針對(duì)response),返回值的類型也是StreamObserver(針對(duì)request)。public class MyServiceImpl extends HelloServiceImplBase { @Override public StreamObserver<HelloWorldProtos.StreamReq> biStream(StreamObserver<HelloWorldProtos.StreamResp> responseObserver) { return new StreamObserver<HelloWorldProtos.StreamReq>() { //接收請(qǐng)求 @Override public void onNext(HelloWorldProtos.StreamReq streamReq) { System.out.println(streamReq.getReqInfo()); //接收請(qǐng)求后就返回一個(gè)響應(yīng) responseObserver.onNext(HelloWorldProtos.StreamResp.newBuilder().setRespInfo("from server").build()); } @Override public void onError(Throwable throwable) { } //客戶端發(fā)送數(shù)據(jù)完畢 @Override public void onCompleted() { //服務(wù)端也完畢 responseObserver.onCompleted(); } }; }
RequestObserver在服務(wù)端實(shí)現(xiàn),responseObserver在客戶端實(shí)現(xiàn)。
因此,在服務(wù)代碼MyServiceImpl中需要返回一個(gè)requestObserver,實(shí)現(xiàn)其中的回調(diào)方法:
1. onNext表示接收到一個(gè)request,這里通過(guò)responseObserver的onNext()立刻返回了一條數(shù)據(jù)。
2. onCompleted表示客戶端已經(jīng)發(fā)送數(shù)據(jù)完畢,這里調(diào)用responseObserver的onCompleted()也告訴客戶端連接關(guān)閉。
-
客戶端不僅需要發(fā)送數(shù)據(jù),而且需要實(shí)現(xiàn)一個(gè)
responseObserver:public void biCommute() { StreamObserver<HelloWorldProtos.StreamReq> reqStreamObserver = this.stub.biStream(new StreamObserver<HelloWorldProtos.StreamResp>() { //接收到服務(wù)返回 @Override public void onNext(HelloWorldProtos.StreamResp streamResp) { System.out.println(streamResp.getRespInfo()); } @Override public void onError(Throwable throwable) { } //服務(wù)發(fā)送完畢 @Override public void onCompleted() { } }); reqStreamObserver.onNext(HelloWorldProtos.StreamReq.newBuilder().setReqInfo("hello server1").build()); reqStreamObserver.onNext(HelloWorldProtos.StreamReq.newBuilder().setReqInfo("hello server2").build()); reqStreamObserver.onNext(HelloWorldProtos.StreamReq.newBuilder().setReqInfo("hello server3").build()); reqStreamObserver.onNext(HelloWorldProtos.StreamReq.newBuilder().setReqInfo("hello server4").build()); reqStreamObserver.onCompleted(); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } }
- 這里調(diào)用服務(wù)方法
biStream使用的是stub(異步),而非之前的blockingStub(同步)。 - 調(diào)用
biStream需要傳入responseObserver作為參數(shù),同時(shí)返回值是requestObserver。 - 通過(guò)
requestObserver的onNext()不斷發(fā)送數(shù)據(jù)。
- 執(zhí)行程序,客戶端發(fā)送四條數(shù)據(jù),服務(wù)端每接收到一條數(shù)據(jù)就響應(yīng)一條
from server給客戶端。
為了能更好的看到執(zhí)行效果,可以在程序最后增加
sleep(10000),觀察服務(wù)的異步響應(yīng)過(guò)程。
