java版gRPC實(shí)戰(zhàn)之五:雙向流

歡迎訪問我的GitHub

https://github.com/zq2599/blog_demos

內(nèi)容:所有原創(chuàng)文章分類匯總及配套源碼,涉及Java、Docker、Kubernetes、DevOPS等;

本篇概覽

  • 本文是《java版gRPC實(shí)戰(zhàn)》系列的第五篇,目標(biāo)是掌握雙向流類型的服務(wù),即請(qǐng)求參數(shù)是流的形式,響應(yīng)的內(nèi)容也是流的形式;
  • 先來看看官方資料對(duì)雙向流式RPC的介紹:是雙方使用讀寫流去發(fā)送一個(gè)消息序列。兩個(gè)流獨(dú)立操作,因此客戶端和服務(wù)器 可以以任意喜歡的順序讀寫:比如, 服務(wù)器可以在寫入響應(yīng)前等待接收所有的客戶端消息,或者可以交替 的讀取和寫入消息,或者其他讀寫的組合。 每個(gè)流中的消息順序被預(yù)留;
  • 掌握了客戶端流和服務(wù)端流兩種類型的開發(fā)后,雙向流類型就很好理解了,就是之前兩種類型的結(jié)合體,請(qǐng)求和響應(yīng)都按照流的方式處理即可;
  • 今天的實(shí)戰(zhàn),咱們來設(shè)計(jì)一個(gè)在線商城的功能:批量減扣庫存,即客戶端提交多個(gè)商品和數(shù)量,服務(wù)端返回每個(gè)商品減扣庫存成功和失敗的情況;
  • 咱們盡快進(jìn)入編碼環(huán)節(jié)吧,具體內(nèi)容如下:
  1. 在proto文件中定義雙向流類型的gRPC接口,再通過proto生成java代碼
  2. 開發(fā)服務(wù)端應(yīng)用
  3. 開發(fā)客戶端應(yīng)用
  4. 驗(yàn)證

源碼下載

名稱 鏈接 備注
項(xiàng)目主頁 https://github.com/zq2599/blog_demos 該項(xiàng)目在GitHub上的主頁
git倉(cāng)庫地址(https) https://github.com/zq2599/blog_demos.git 該項(xiàng)目源碼的倉(cāng)庫地址,https協(xié)議
git倉(cāng)庫地址(ssh) git@github.com:zq2599/blog_demos.git 該項(xiàng)目源碼的倉(cāng)庫地址,ssh協(xié)議
  • 這個(gè)git項(xiàng)目中有多個(gè)文件夾,《java版gRPC實(shí)戰(zhàn)》系列的源碼在<font color="blue">grpc-tutorials</font>文件夾下,如下圖紅框所示:
在這里插入圖片描述
  • <font color="blue">grpc-tutorials</font>文件夾下有多個(gè)目錄,本篇文章對(duì)應(yīng)的服務(wù)端代碼在<font color="blue">double-stream-server-side</font>目錄下,客戶端代碼在<font color="blue">double-stream-client-side</font>目錄下,如下圖:
在這里插入圖片描述

在proto文件中定義雙向流類型的gRPC接口

  • 首先要做的就是定義gRPC接口,打開<font color="blue">mall.proto</font>,在里面新增方法和相關(guān)的數(shù)據(jù)結(jié)構(gòu),需要重點(diǎn)關(guān)注的是BatchDeduct方法的入?yún)roductOrder和返回值DeductReply都添加了<font color="red">stream</font>修飾(ProductOrder是上一章定義的),代表該方法是雙向流類型:
// gRPC服務(wù),這是個(gè)在線商城的庫存服務(wù)
service StockService {
    // 雙向流式:批量扣減庫存
    rpc BatchDeduct (stream ProductOrder) returns (stream DeductReply) {}
}

// 扣減庫存返回結(jié)果的數(shù)據(jù)結(jié)構(gòu)
message DeductReply {
    // 返回碼
    int32 code = 1;
    // 描述信息
    string message = 2;
}
  • 雙擊下圖紅框中的task即可生成java代碼:
在這里插入圖片描述
  • 生成下圖紅框中的文件,即服務(wù)端定義和返回值數(shù)據(jù)結(jié)構(gòu):
在這里插入圖片描述
  • 接下來開發(fā)服務(wù)端;

開發(fā)服務(wù)端應(yīng)用

  • 在父工程grpc-turtorials下面新建名為<font color="blue">double-stream-server-side</font>的模塊,其build.gradle內(nèi)容如下:
// 使用springboot插件
plugins {
    id 'org.springframework.boot'
}

dependencies {
    implementation 'org.projectlombok:lombok'
    implementation 'org.springframework.boot:spring-boot-starter'
    // 作為gRPC服務(wù)提供方,需要用到此庫
    implementation 'net.devh:grpc-server-spring-boot-starter'
    // 依賴自動(dòng)生成源碼的工程
    implementation project(':grpc-lib')
    // annotationProcessor不會(huì)傳遞,使用了lombok生成代碼的模塊,需要自己聲明annotationProcessor
    annotationProcessor 'org.projectlombok:lombok'
}
  • 配置文件application.yml:
spring:
  application:
    name: double-stream-server-side
# gRPC有關(guān)的配置,這里只需要配置服務(wù)端口號(hào)
grpc:
  server:
    port: 9901
  • 啟動(dòng)類DoubleStreamServerSideApplication.java的代碼就不貼了,普通的springboot啟動(dòng)類而已;
  • 重點(diǎn)是提供grpc服務(wù)的GrpcServerService.java,咱們要做的就是給上層框架返回一個(gè)匿名類,至于里面的onNext、onCompleted方法何時(shí)被調(diào)用是上層框架決定的,另外還準(zhǔn)備了成員變量totalCount,這樣就可以記錄總數(shù)了,由于請(qǐng)求參數(shù)是流,因此匿名類的onNext會(huì)被多次調(diào)用,并且由于返回值是流,因此onNext中調(diào)用了<font color="blue">responseObserver.onNext</font>方法來響應(yīng)流中的每個(gè)請(qǐng)求,這樣客戶端就不斷收到服務(wù)端的響應(yīng)數(shù)據(jù)(即客戶端的onNext方法會(huì)被多次調(diào)用):
package grpctutorials;

import com.bolingcavalry.grpctutorials.lib.DeductReply;
import com.bolingcavalry.grpctutorials.lib.ProductOrder;
import com.bolingcavalry.grpctutorials.lib.StockServiceGrpc;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.server.service.GrpcService;

@GrpcService
@Slf4j
public class GrpcServerService extends StockServiceGrpc.StockServiceImplBase {

    @Override
    public StreamObserver<ProductOrder> batchDeduct(StreamObserver<DeductReply> responseObserver) {
        // 返回匿名類,給上層框架使用
        return new StreamObserver<ProductOrder>() {

            private int totalCount = 0;

            @Override
            public void onNext(ProductOrder value) {
                log.info("正在處理商品[{}],數(shù)量為[{}]",
                        value.getProductId(),
                        value.getNumber());

                // 增加總量
                totalCount += value.getNumber();

                int code;
                String message;

                // 假設(shè)單數(shù)的都有庫存不足的問題
                if (0 == value.getNumber() % 2) {
                    code = 10000;
                    message = String.format("商品[%d]扣減庫存數(shù)[%d]成功", value.getProductId(), value.getNumber());
                } else {
                    code = 10001;
                    message = String.format("商品[%d]扣減庫存數(shù)[%d]失敗", value.getProductId(), value.getNumber());
                }

                responseObserver.onNext(DeductReply.newBuilder()
                        .setCode(code)
                        .setMessage(message)
                        .build());
            }

            @Override
            public void onError(Throwable t) {
                log.error("批量減扣庫存異常", t);
            }

            @Override
            public void onCompleted() {
                log.info("批量減扣庫存完成,共計(jì)[{}]件商品", totalCount);
                responseObserver.onCompleted();
            }
        };
    }
}

開發(fā)客戶端應(yīng)用

  • 在父工程grpc-turtorials下面新建名為<font color="blue">double-stream-server-side</font>的模塊,其build.gradle內(nèi)容如下:
plugins {
    id 'org.springframework.boot'
}

dependencies {
    implementation 'org.projectlombok:lombok'
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'net.devh:grpc-client-spring-boot-starter'
    implementation project(':grpc-lib')
}
  • 配置文件application.yml,設(shè)置自己的web端口號(hào)和服務(wù)端地址:
server:
  port: 8082
spring:
  application:
    name: double-stream-client-side

grpc:
  client:
    # gRPC配置的名字,GrpcClient注解會(huì)用到
    double-stream-server-side:
      # gRPC服務(wù)端地址
      address: 'static://127.0.0.1:9901'
      enableKeepAlive: true
      keepAliveWithoutCalls: true
      negotiationType: plaintext
  • 啟動(dòng)類DoubleStreamClientSideApplication.java的代碼就不貼了,普通的springboot啟動(dòng)類而已;

  • 正常情況下我們都是用StreamObserver處理服務(wù)端響應(yīng),這里由于是異步響應(yīng),需要額外的方法從StreamObserver中取出業(yè)務(wù)數(shù)據(jù),于是定一個(gè)新接口,繼承自StreamObserver,新增<font color="blue">getExtra</font>方法可以返回String對(duì)象,詳細(xì)的用法稍后會(huì)看到:

package com.bolingcavalry.grpctutorials;

import io.grpc.stub.StreamObserver;

public interface ExtendResponseObserver<T> extends StreamObserver<T> {
    String getExtra();
}
  • 重頭戲來了,看看如何遠(yuǎn)程調(diào)用雙向流類型的gRPC接口,代碼中已經(jīng)添加詳細(xì)注釋:
package grpctutorials;

import com.bolingcavalry.grpctutorials.lib.DeductReply;
import com.bolingcavalry.grpctutorials.lib.ProductOrder;
import com.bolingcavalry.grpctutorials.lib.StockServiceGrpc;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@Service
@Slf4j
public class GrpcClientService {

    @GrpcClient("double-stream-server-side")
    private StockServiceGrpc.StockServiceStub stockServiceStub;

    /**
     * 批量減庫存
     * @param count
     * @return
     */
    public String batchDeduct(int count) {

        CountDownLatch countDownLatch = new CountDownLatch(1);

        // responseObserver的onNext和onCompleted會(huì)在另一個(gè)線程中被執(zhí)行,
        // ExtendResponseObserver繼承自StreamObserver
        ExtendResponseObserver<DeductReply> responseObserver = new ExtendResponseObserver<DeductReply>() {

            // 用stringBuilder保存所有來自服務(wù)端的響應(yīng)
            private StringBuilder stringBuilder = new StringBuilder();

            @Override
            public String getExtra() {
                return stringBuilder.toString();
            }

            /**
             * 客戶端的流式請(qǐng)求期間,每一筆請(qǐng)求都會(huì)收到服務(wù)端的一個(gè)響應(yīng),
             * 對(duì)應(yīng)每個(gè)響應(yīng),這里的onNext方法都會(huì)被執(zhí)行一次,入?yún)⑹琼憫?yīng)內(nèi)容
             * @param value
             */
            @Override
            public void onNext(DeductReply value) {
                log.info("batch deduct on next");
                // 放入匿名類的成員變量中
                stringBuilder.append(String.format("返回碼[%d],返回信息:%s<br>" , value.getCode(), value.getMessage()));
            }

            @Override
            public void onError(Throwable t) {
                log.error("batch deduct gRPC request error", t);
                stringBuilder.append("batch deduct gRPC error, " + t.getMessage());
                countDownLatch.countDown();
            }

            /**
             * 服務(wù)端確認(rèn)響應(yīng)完成后,這里的onCompleted方法會(huì)被調(diào)用
             */
            @Override
            public void onCompleted() {
                log.info("batch deduct on complete");
                // 執(zhí)行了countDown方法后,前面執(zhí)行countDownLatch.await方法的線程就不再wait了,
                // 會(huì)繼續(xù)往下執(zhí)行
                countDownLatch.countDown();
            }
        };

        // 遠(yuǎn)程調(diào)用,此時(shí)數(shù)據(jù)還沒有給到服務(wù)端
        StreamObserver<ProductOrder> requestObserver = stockServiceStub.batchDeduct(responseObserver);

        for(int i=0; i<count; i++) {
            // 每次執(zhí)行onNext都會(huì)發(fā)送一筆數(shù)據(jù)到服務(wù)端,
            // 服務(wù)端的onNext方法都會(huì)被執(zhí)行一次
            requestObserver.onNext(build(101 + i, 1 + i));
        }

        // 客戶端告訴服務(wù)端:數(shù)據(jù)已經(jīng)發(fā)完了
        requestObserver.onCompleted();

        try {
            // 開始等待,如果服務(wù)端處理完成,那么responseObserver的onCompleted方法會(huì)在另一個(gè)線程被執(zhí)行,
            // 那里會(huì)執(zhí)行countDownLatch的countDown方法,一但countDown被執(zhí)行,下面的await就執(zhí)行完畢了,
            // await的超時(shí)時(shí)間設(shè)置為2秒
            countDownLatch.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.error("countDownLatch await error", e);
        }

        log.info("service finish");
        // 服務(wù)端返回的內(nèi)容被放置在requestObserver中,從getExtra方法可以取得
        return responseObserver.getExtra();
    }

    /**
     * 創(chuàng)建ProductOrder對(duì)象
     * @param productId
     * @param num
     * @return
     */
    private static ProductOrder build(int productId, int num) {
        return ProductOrder.newBuilder().setProductId(productId).setNumber(num).build();
    }
}
  • 最后做個(gè)web接口,可以通過web請(qǐng)求驗(yàn)證遠(yuǎn)程調(diào)用:
package grpctutorials;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class GrpcClientController {

    @Autowired
    private GrpcClientService grpcClientService;

    @RequestMapping("/")
    public String printMessage(@RequestParam(defaultValue = "1") int count) {
        return grpcClientService.batchDeduct(count);
    }
}
  • 編碼完成,開始驗(yàn)證;

驗(yàn)證

  • 啟動(dòng)服務(wù)端DoubleStreamServerSideApplication:
在這里插入圖片描述
  • 啟動(dòng)客戶端DoubleStreamClientSideApplication:
在這里插入圖片描述
  • 這里要改:瀏覽器輸入<font color="blue">http://localhost:8083/?count=10</font>,響應(yīng)如下,可見遠(yuǎn)程調(diào)用gRPC服務(wù)成功,流式響應(yīng)的每一筆返回都被客戶端收到:
在這里插入圖片描述
  • 下面是服務(wù)端日志,可見逐一處理了客戶端的每一筆數(shù)據(jù):
在這里插入圖片描述
  • 下面是客戶端日志,可見由于CountDownLatch的作用,發(fā)起gRPC請(qǐng)求的線程一直等待responseObserver.onCompleted在另一個(gè)線程被執(zhí)行完后,才會(huì)繼續(xù)執(zhí)行:
在這里插入圖片描述
  • 至此,四種類型的gRPC服務(wù)及其客戶端開發(fā)就完成了,一般的業(yè)務(wù)場(chǎng)景咱們都能應(yīng)付自如,接下來的文章咱們會(huì)繼續(xù)深入學(xué)習(xí),了解復(fù)雜場(chǎng)景下的gRPC操作;

你不孤單,欣宸原創(chuàng)一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 數(shù)據(jù)庫+中間件系列
  6. DevOps系列

歡迎關(guān)注公眾號(hào):程序員欣宸

微信搜索「程序員欣宸」,我是欣宸,期待與您一同暢游Java世界...
https://github.com/zq2599/blog_demos

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容