歡迎訪問我的GitHub
https://github.com/zq2599/blog_demos
內(nèi)容:所有原創(chuàng)文章分類匯總及配套源碼,涉及Java、Docker、Kubernetes、DevOPS等;
本篇概覽
- 本文是《java版gRPC實(shí)戰(zhàn)》系列的第五篇,目標(biāo)是掌握雙向流類型的服務(wù),即請(qǐng)求參數(shù)是流的形式,響應(yīng)的內(nèi)容也是流的形式;
- 先來看看官方資料對(duì)雙向流式RPC的介紹:是雙方使用讀寫流去發(fā)送一個(gè)消息序列。兩個(gè)流獨(dú)立操作,因此客戶端和服務(wù)器 可以以任意喜歡的順序讀寫:比如, 服務(wù)器可以在寫入響應(yīng)前等待接收所有的客戶端消息,或者可以交替 的讀取和寫入消息,或者其他讀寫的組合。 每個(gè)流中的消息順序被預(yù)留;
- 掌握了客戶端流和服務(wù)端流兩種類型的開發(fā)后,雙向流類型就很好理解了,就是之前兩種類型的結(jié)合體,請(qǐng)求和響應(yīng)都按照流的方式處理即可;
- 今天的實(shí)戰(zhàn),咱們來設(shè)計(jì)一個(gè)在線商城的功能:批量減扣庫存,即客戶端提交多個(gè)商品和數(shù)量,服務(wù)端返回每個(gè)商品減扣庫存成功和失敗的情況;
- 咱們盡快進(jìn)入編碼環(huán)節(jié)吧,具體內(nèi)容如下:
- 在proto文件中定義雙向流類型的gRPC接口,再通過proto生成java代碼
- 開發(fā)服務(wù)端應(yīng)用
- 開發(fā)客戶端應(yīng)用
- 驗(yàn)證
源碼下載
- 本篇實(shí)戰(zhàn)中的完整源碼可在GitHub下載到,地址和鏈接信息如下表所示(https://github.com/zq2599/blog_demos):
| 名稱 | 鏈接 | 備注 |
|---|---|---|
| 項(xiàng)目主頁 | https://github.com/zq2599/blog_demos | 該項(xiàng)目在GitHub上的主頁 |
| git倉(cāng)庫地址(https) | https://github.com/zq2599/blog_demos.git | 該項(xiàng)目源碼的倉(cāng)庫地址,https協(xié)議 |
| git倉(cāng)庫地址(ssh) | git@github.com:zq2599/blog_demos.git | 該項(xiàng)目源碼的倉(cāng)庫地址,ssh協(xié)議 |
- 這個(gè)git項(xiàng)目中有多個(gè)文件夾,《java版gRPC實(shí)戰(zhàn)》系列的源碼在<font color="blue">grpc-tutorials</font>文件夾下,如下圖紅框所示:

在這里插入圖片描述
- <font color="blue">grpc-tutorials</font>文件夾下有多個(gè)目錄,本篇文章對(duì)應(yīng)的服務(wù)端代碼在<font color="blue">double-stream-server-side</font>目錄下,客戶端代碼在<font color="blue">double-stream-client-side</font>目錄下,如下圖:

在這里插入圖片描述
在proto文件中定義雙向流類型的gRPC接口
- 首先要做的就是定義gRPC接口,打開<font color="blue">mall.proto</font>,在里面新增方法和相關(guān)的數(shù)據(jù)結(jié)構(gòu),需要重點(diǎn)關(guān)注的是BatchDeduct方法的入?yún)roductOrder和返回值DeductReply都添加了<font color="red">stream</font>修飾(ProductOrder是上一章定義的),代表該方法是雙向流類型:
// gRPC服務(wù),這是個(gè)在線商城的庫存服務(wù)
service StockService {
// 雙向流式:批量扣減庫存
rpc BatchDeduct (stream ProductOrder) returns (stream DeductReply) {}
}
// 扣減庫存返回結(jié)果的數(shù)據(jù)結(jié)構(gòu)
message DeductReply {
// 返回碼
int32 code = 1;
// 描述信息
string message = 2;
}
- 雙擊下圖紅框中的task即可生成java代碼:

在這里插入圖片描述
- 生成下圖紅框中的文件,即服務(wù)端定義和返回值數(shù)據(jù)結(jié)構(gòu):

在這里插入圖片描述
- 接下來開發(fā)服務(wù)端;
開發(fā)服務(wù)端應(yīng)用
- 在父工程grpc-turtorials下面新建名為<font color="blue">double-stream-server-side</font>的模塊,其build.gradle內(nèi)容如下:
// 使用springboot插件
plugins {
id 'org.springframework.boot'
}
dependencies {
implementation 'org.projectlombok:lombok'
implementation 'org.springframework.boot:spring-boot-starter'
// 作為gRPC服務(wù)提供方,需要用到此庫
implementation 'net.devh:grpc-server-spring-boot-starter'
// 依賴自動(dòng)生成源碼的工程
implementation project(':grpc-lib')
// annotationProcessor不會(huì)傳遞,使用了lombok生成代碼的模塊,需要自己聲明annotationProcessor
annotationProcessor 'org.projectlombok:lombok'
}
- 配置文件application.yml:
spring:
application:
name: double-stream-server-side
# gRPC有關(guān)的配置,這里只需要配置服務(wù)端口號(hào)
grpc:
server:
port: 9901
- 啟動(dòng)類DoubleStreamServerSideApplication.java的代碼就不貼了,普通的springboot啟動(dòng)類而已;
- 重點(diǎn)是提供grpc服務(wù)的GrpcServerService.java,咱們要做的就是給上層框架返回一個(gè)匿名類,至于里面的onNext、onCompleted方法何時(shí)被調(diào)用是上層框架決定的,另外還準(zhǔn)備了成員變量totalCount,這樣就可以記錄總數(shù)了,由于請(qǐng)求參數(shù)是流,因此匿名類的onNext會(huì)被多次調(diào)用,并且由于返回值是流,因此onNext中調(diào)用了<font color="blue">responseObserver.onNext</font>方法來響應(yīng)流中的每個(gè)請(qǐng)求,這樣客戶端就不斷收到服務(wù)端的響應(yīng)數(shù)據(jù)(即客戶端的onNext方法會(huì)被多次調(diào)用):
package grpctutorials;
import com.bolingcavalry.grpctutorials.lib.DeductReply;
import com.bolingcavalry.grpctutorials.lib.ProductOrder;
import com.bolingcavalry.grpctutorials.lib.StockServiceGrpc;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.server.service.GrpcService;
@GrpcService
@Slf4j
public class GrpcServerService extends StockServiceGrpc.StockServiceImplBase {
@Override
public StreamObserver<ProductOrder> batchDeduct(StreamObserver<DeductReply> responseObserver) {
// 返回匿名類,給上層框架使用
return new StreamObserver<ProductOrder>() {
private int totalCount = 0;
@Override
public void onNext(ProductOrder value) {
log.info("正在處理商品[{}],數(shù)量為[{}]",
value.getProductId(),
value.getNumber());
// 增加總量
totalCount += value.getNumber();
int code;
String message;
// 假設(shè)單數(shù)的都有庫存不足的問題
if (0 == value.getNumber() % 2) {
code = 10000;
message = String.format("商品[%d]扣減庫存數(shù)[%d]成功", value.getProductId(), value.getNumber());
} else {
code = 10001;
message = String.format("商品[%d]扣減庫存數(shù)[%d]失敗", value.getProductId(), value.getNumber());
}
responseObserver.onNext(DeductReply.newBuilder()
.setCode(code)
.setMessage(message)
.build());
}
@Override
public void onError(Throwable t) {
log.error("批量減扣庫存異常", t);
}
@Override
public void onCompleted() {
log.info("批量減扣庫存完成,共計(jì)[{}]件商品", totalCount);
responseObserver.onCompleted();
}
};
}
}
開發(fā)客戶端應(yīng)用
- 在父工程grpc-turtorials下面新建名為<font color="blue">double-stream-server-side</font>的模塊,其build.gradle內(nèi)容如下:
plugins {
id 'org.springframework.boot'
}
dependencies {
implementation 'org.projectlombok:lombok'
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'net.devh:grpc-client-spring-boot-starter'
implementation project(':grpc-lib')
}
- 配置文件application.yml,設(shè)置自己的web端口號(hào)和服務(wù)端地址:
server:
port: 8082
spring:
application:
name: double-stream-client-side
grpc:
client:
# gRPC配置的名字,GrpcClient注解會(huì)用到
double-stream-server-side:
# gRPC服務(wù)端地址
address: 'static://127.0.0.1:9901'
enableKeepAlive: true
keepAliveWithoutCalls: true
negotiationType: plaintext
啟動(dòng)類DoubleStreamClientSideApplication.java的代碼就不貼了,普通的springboot啟動(dòng)類而已;
正常情況下我們都是用StreamObserver處理服務(wù)端響應(yīng),這里由于是異步響應(yīng),需要額外的方法從StreamObserver中取出業(yè)務(wù)數(shù)據(jù),于是定一個(gè)新接口,繼承自StreamObserver,新增<font color="blue">getExtra</font>方法可以返回String對(duì)象,詳細(xì)的用法稍后會(huì)看到:
package com.bolingcavalry.grpctutorials;
import io.grpc.stub.StreamObserver;
public interface ExtendResponseObserver<T> extends StreamObserver<T> {
String getExtra();
}
- 重頭戲來了,看看如何遠(yuǎn)程調(diào)用雙向流類型的gRPC接口,代碼中已經(jīng)添加詳細(xì)注釋:
package grpctutorials;
import com.bolingcavalry.grpctutorials.lib.DeductReply;
import com.bolingcavalry.grpctutorials.lib.ProductOrder;
import com.bolingcavalry.grpctutorials.lib.StockServiceGrpc;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@Service
@Slf4j
public class GrpcClientService {
@GrpcClient("double-stream-server-side")
private StockServiceGrpc.StockServiceStub stockServiceStub;
/**
* 批量減庫存
* @param count
* @return
*/
public String batchDeduct(int count) {
CountDownLatch countDownLatch = new CountDownLatch(1);
// responseObserver的onNext和onCompleted會(huì)在另一個(gè)線程中被執(zhí)行,
// ExtendResponseObserver繼承自StreamObserver
ExtendResponseObserver<DeductReply> responseObserver = new ExtendResponseObserver<DeductReply>() {
// 用stringBuilder保存所有來自服務(wù)端的響應(yīng)
private StringBuilder stringBuilder = new StringBuilder();
@Override
public String getExtra() {
return stringBuilder.toString();
}
/**
* 客戶端的流式請(qǐng)求期間,每一筆請(qǐng)求都會(huì)收到服務(wù)端的一個(gè)響應(yīng),
* 對(duì)應(yīng)每個(gè)響應(yīng),這里的onNext方法都會(huì)被執(zhí)行一次,入?yún)⑹琼憫?yīng)內(nèi)容
* @param value
*/
@Override
public void onNext(DeductReply value) {
log.info("batch deduct on next");
// 放入匿名類的成員變量中
stringBuilder.append(String.format("返回碼[%d],返回信息:%s<br>" , value.getCode(), value.getMessage()));
}
@Override
public void onError(Throwable t) {
log.error("batch deduct gRPC request error", t);
stringBuilder.append("batch deduct gRPC error, " + t.getMessage());
countDownLatch.countDown();
}
/**
* 服務(wù)端確認(rèn)響應(yīng)完成后,這里的onCompleted方法會(huì)被調(diào)用
*/
@Override
public void onCompleted() {
log.info("batch deduct on complete");
// 執(zhí)行了countDown方法后,前面執(zhí)行countDownLatch.await方法的線程就不再wait了,
// 會(huì)繼續(xù)往下執(zhí)行
countDownLatch.countDown();
}
};
// 遠(yuǎn)程調(diào)用,此時(shí)數(shù)據(jù)還沒有給到服務(wù)端
StreamObserver<ProductOrder> requestObserver = stockServiceStub.batchDeduct(responseObserver);
for(int i=0; i<count; i++) {
// 每次執(zhí)行onNext都會(huì)發(fā)送一筆數(shù)據(jù)到服務(wù)端,
// 服務(wù)端的onNext方法都會(huì)被執(zhí)行一次
requestObserver.onNext(build(101 + i, 1 + i));
}
// 客戶端告訴服務(wù)端:數(shù)據(jù)已經(jīng)發(fā)完了
requestObserver.onCompleted();
try {
// 開始等待,如果服務(wù)端處理完成,那么responseObserver的onCompleted方法會(huì)在另一個(gè)線程被執(zhí)行,
// 那里會(huì)執(zhí)行countDownLatch的countDown方法,一但countDown被執(zhí)行,下面的await就執(zhí)行完畢了,
// await的超時(shí)時(shí)間設(shè)置為2秒
countDownLatch.await(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error("countDownLatch await error", e);
}
log.info("service finish");
// 服務(wù)端返回的內(nèi)容被放置在requestObserver中,從getExtra方法可以取得
return responseObserver.getExtra();
}
/**
* 創(chuàng)建ProductOrder對(duì)象
* @param productId
* @param num
* @return
*/
private static ProductOrder build(int productId, int num) {
return ProductOrder.newBuilder().setProductId(productId).setNumber(num).build();
}
}
- 最后做個(gè)web接口,可以通過web請(qǐng)求驗(yàn)證遠(yuǎn)程調(diào)用:
package grpctutorials;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class GrpcClientController {
@Autowired
private GrpcClientService grpcClientService;
@RequestMapping("/")
public String printMessage(@RequestParam(defaultValue = "1") int count) {
return grpcClientService.batchDeduct(count);
}
}
- 編碼完成,開始驗(yàn)證;
驗(yàn)證
- 啟動(dòng)服務(wù)端DoubleStreamServerSideApplication:

在這里插入圖片描述
- 啟動(dòng)客戶端DoubleStreamClientSideApplication:

在這里插入圖片描述
- 這里要改:瀏覽器輸入<font color="blue">http://localhost:8083/?count=10</font>,響應(yīng)如下,可見遠(yuǎn)程調(diào)用gRPC服務(wù)成功,流式響應(yīng)的每一筆返回都被客戶端收到:

在這里插入圖片描述
- 下面是服務(wù)端日志,可見逐一處理了客戶端的每一筆數(shù)據(jù):

在這里插入圖片描述
- 下面是客戶端日志,可見由于CountDownLatch的作用,發(fā)起gRPC請(qǐng)求的線程一直等待responseObserver.onCompleted在另一個(gè)線程被執(zhí)行完后,才會(huì)繼續(xù)執(zhí)行:

在這里插入圖片描述
- 至此,四種類型的gRPC服務(wù)及其客戶端開發(fā)就完成了,一般的業(yè)務(wù)場(chǎng)景咱們都能應(yīng)付自如,接下來的文章咱們會(huì)繼續(xù)深入學(xué)習(xí),了解復(fù)雜場(chǎng)景下的gRPC操作;
你不孤單,欣宸原創(chuàng)一路相伴
歡迎關(guān)注公眾號(hào):程序員欣宸
微信搜索「程序員欣宸」,我是欣宸,期待與您一同暢游Java世界...
https://github.com/zq2599/blog_demos