gRPC 是在 HTTP/2 之上實(shí)現(xiàn)的 RPC 框架,HTTP/2 是第 7 層(應(yīng)用層)協(xié)議,它運(yùn)行在 TCP(第 4 層 - 傳輸層)協(xié)議之上,相比于傳統(tǒng)的 REST/JSON 機(jī)制grpc有諸多的優(yōu)點(diǎn):
- 基于 HTTP/2 之上的二進(jìn)制協(xié)議(Protobuf 序列化機(jī)制);
- 一個(gè)連接上可以多路復(fù)用(stream),并發(fā)處理多個(gè)請求和響應(yīng);
- 多種語言的類庫實(shí)現(xiàn);
- 服務(wù)定義文件和自動(dòng)代碼生成(.proto 文件和 Protobuf 編譯工具)。
一個(gè)完整的 RPC 調(diào)用流程示例如下:

gRPC 的 RPC 調(diào)用與上述流程相似,下面我們一起學(xué)習(xí)下 gRPC 的客戶端創(chuàng)建和服務(wù)調(diào)用流程。
demo
以 gRPC 入門級的 helloworld Demo 為例,客戶端發(fā)起 RPC 調(diào)用的代碼主要包括如下幾部分:
- 根據(jù) hostname 和 port 創(chuàng)建 ManagedChannelImpl;
- 根據(jù) helloworld.proto 文件生成的 GreeterGrpc 創(chuàng)建客戶端 Stub,用來發(fā)起 RPC 調(diào)用;
- 使用客戶端 Stub(GreeterBlockingStub)發(fā)起 RPC 調(diào)用,獲取響應(yīng)。
相關(guān)示例代碼如下所示(HelloWorldClient 類):
HelloWorldClient(ManagedChannelBuilder<?> channelBuilder) {
channel = channelBuilder.build();
blockingStub = GreeterGrpc.newBlockingStub(channel);
futureStub = GreeterGrpc.newFutureStub(channel);
stub = GreeterGrpc.newStub(channel);
}
public void blockingGreet(String name) {
logger.info("Will try to greet " + name + " ...");
HelloRequest request = HelloRequest.newBuilder().setName(name).build();
try {
HelloReply response = blockingStub
.sayHello(request);
...
gRPC 的客戶端調(diào)用主要包括基于 Netty 的 HTTP/2 客戶端創(chuàng)建、客戶端負(fù)載均衡、請求消息的發(fā)送和響應(yīng)接收處理四個(gè)流程。
gRPC 的客戶端調(diào)用總體流程如下圖所示:

gRPC 的客戶端調(diào)用流程如下:
- 客戶端 Stub(GreeterBlockingStub) 調(diào)用 sayHello(request),發(fā)起 RPC 調(diào)用;
- 通過 DnsNameResolver 進(jìn)行域名解析,獲取服務(wù)端的地址信息(列表),隨后使用默認(rèn)的 LoadBalancer 策略,選擇一個(gè)具體的 gRPC 服務(wù)端實(shí)例;
- 如果與路由選中的服務(wù)端之間沒有可用的連接,則創(chuàng)建 NettyClientTransport 和 NettyClientHandler,發(fā)起 HTTP/2 連接;
- 對請求消息使用 PB(Protobuf)做序列化,通過 HTTP/2 Stream 發(fā)送給 gRPC 服務(wù)端;
- 接收到服務(wù)端響應(yīng)之后,使用 PB(Protobuf)做反序列化;
- 回調(diào) GrpcFuture 的 set(Response) 方法,喚醒阻塞的客戶端調(diào)用線程,獲取 RPC 響應(yīng)。
需要指出的是,客戶端同步阻塞 RPC 調(diào)用阻塞的是調(diào)用方線程(通常是業(yè)務(wù)線程),底層 Transport 的 I/O 線程(Netty 的 NioEventLoop)仍然是非阻塞的。
ManagedChannel 創(chuàng)建流程
ManagedChannel 是對 Transport 層 SocketChannel 的抽象,Transport 層負(fù)責(zé)協(xié)議消息的序列化和反序列化,以及協(xié)議消息的發(fā)送和讀取。
ManagedChannel 將處理后的請求和響應(yīng)傳遞給與之相關(guān)聯(lián)的 ClientCall 進(jìn)行上層處理,同時(shí),ManagedChannel 提供了對 Channel 的生命周期管理(鏈路創(chuàng)建、空閑、關(guān)閉等)。
ManagedChannel 提供了接口式的切面 ClientInterceptor,它可以攔截 RPC 客戶端調(diào)用,注入擴(kuò)展點(diǎn),以及功能定制,方便框架的使用者對 gRPC 進(jìn)行功能擴(kuò)展。
ManagedChannel 的主要實(shí)現(xiàn)類 ManagedChannelImpl 創(chuàng)建流程如下:

- 使用 builder 模式創(chuàng)建 ManagedChannelBuilder 實(shí)現(xiàn)類 NettyChannelBuilder,NettyChannelBuilder 提供了 buildTransportFactory 工廠方法創(chuàng)建 NettyTransportFactory,最終用于創(chuàng)建 NettyClientTransport;
- 初始化 HTTP/2 連接方式:采用 plaintext 協(xié)商模式還是默認(rèn)的 TLS 模式,HTTP/2 的連接有兩種模式,h2(基于 TLS 之上構(gòu)建的 HTTP/2)和 h2c(直接在 TCP 之上構(gòu)建的 HTTP/2);
- 創(chuàng)建 NameResolver.Factory 工廠類,用于服務(wù)端 URI 的解析,gRPC 默認(rèn)采用 DNS 域名解析方式。
ManagedChannel 實(shí)例構(gòu)造完成之后,即可創(chuàng)建 ClientCall,發(fā)起 RPC 調(diào)用。
ClientCall 創(chuàng)建流程
完成 ManagedChannelImpl 創(chuàng)建之后,由 ManagedChannelImpl 發(fā)起創(chuàng)建一個(gè)新的 ClientCall 實(shí)例。ClientCall 的用途是業(yè)務(wù)應(yīng)用層的消息調(diào)度和處理,它的典型用法如下:
call = channel.newCall(unaryMethod, callOptions);
call.start(listener, headers);
call.sendMessage(message);
call.halfClose();
call.request(1);
ClientCall 實(shí)例的創(chuàng)建流程如下所示:

ClientCallImpl 的主要構(gòu)造參數(shù)是 MethodDescriptor 和 CallOptions,其中 MethodDescriptor 存放了需要調(diào)用 RPC 服務(wù)的接口名、方法名、服務(wù)調(diào)用的方式(例如 UNARY 類型)以及請求和響應(yīng)的序列化和反序列化實(shí)現(xiàn)類。
CallOptions 則存放了 RPC 調(diào)用的其它附加信息,例如超時(shí)時(shí)間、鑒權(quán)信息、消息長度限制和執(zhí)行客戶端調(diào)用的線程池等。設(shè)置壓縮和解壓縮的注冊類(CompressorRegistry 和 DecompressorRegistry),以便可以按照指定的壓縮算法對 HTTP/2 消息做壓縮和解壓縮。
ClientCallImpl 實(shí)例創(chuàng)建完成之后,就可以調(diào)用 ClientTransport,創(chuàng)建 HTTP/2 Client,向 gRPC 服務(wù)端發(fā)起遠(yuǎn)程服務(wù)調(diào)用。
基于 Netty 的 HTTP/2 Client 創(chuàng)建流程
gRPC 客戶端底層基于 Netty4.1 的 HTTP/2 協(xié)議??蚣軜?gòu)建,以便可以使用 HTTP/2 協(xié)議來承載 RPC 消息,在滿足標(biāo)準(zhǔn)化規(guī)范的前提下,提升通信性能。
gRPC HTTP/2 協(xié)議棧(客戶端)的關(guān)鍵實(shí)現(xiàn)是 NettyClientTransport 和 NettyClientHandler,客戶端初始化流程如下所示:

- NettyClientHandler 的創(chuàng)建:級聯(lián)創(chuàng)建 Netty 的 Http2FrameReader、Http2FrameWriter 和 Http2Connection,用于構(gòu)建基于 Netty 的 gRPC HTTP/2 客戶端協(xié)議棧。
- HTTP/2 Client 啟動(dòng):仍然基于 Netty 的 Bootstrap 來初始化并啟動(dòng)客戶端,但是有兩個(gè)細(xì)節(jié)需要注意:
NettyClientHandler(實(shí)際被包裝成 ProtocolNegotiator.Handler,用于 HTTP/2 的握手協(xié)商)創(chuàng)建之后,不是由傳統(tǒng)的 ChannelInitializer 在初始化 Channel 時(shí)將 NettyClientHandler 加入到 pipeline 中,而是直接通過 Bootstrap 的 handler 方法直接加入到 pipeline 中,以便可以立即接收發(fā)送任務(wù)。
客戶端使用的 work 線程組并非通常意義的 EventLoopGroup,而是一個(gè) EventLoop:即 HTTP/2 客戶端使用的 work 線程并非一組線程(默認(rèn)線程數(shù)為 CPU 內(nèi)核 * 2),而是一個(gè) EventLoop 線程。這個(gè)其實(shí)也很容易理解,一個(gè) NioEventLoop 線程可以同時(shí)處理多個(gè) HTTP/2 客戶端連接,它是多路復(fù)用的,對于單個(gè) HTTP/2 客戶端,如果默認(rèn)獨(dú)占一個(gè) work 線程組,將造成極大的資源浪費(fèi),同時(shí)也可能會(huì)導(dǎo)致句柄溢出(并發(fā)啟動(dòng)大量 HTTP/2 客戶端)。 - WriteQueue 創(chuàng)建:Netty 的 NioSocketChannel 初始化并向 Selector 注冊之后(發(fā)起 HTTP 連接之前),立即由 NettyClientHandler 創(chuàng)建 WriteQueue,用于接收并處理 gRPC 內(nèi)部的各種 Command,例如鏈路關(guān)閉指令、發(fā)送 Frame 指令、發(fā)送 Ping 指令等。
HTTP/2 Client 創(chuàng)建完成之后,即可由客戶端根據(jù)協(xié)商策略發(fā)起 HTTP/2 連接。如果連接創(chuàng)建成功,后續(xù)即可復(fù)用該 HTTP/2 連接,進(jìn)行 RPC 調(diào)用。
HTTP/2 連接創(chuàng)建流程
HTTP/2 在 TCP 連接之初通過協(xié)商的方式進(jìn)行通信,只有協(xié)商成功,才能進(jìn)行后續(xù)的業(yè)務(wù)層數(shù)據(jù)發(fā)送和接收。
HTTP/2 的版本標(biāo)識分為兩類:
- 基于 TLS 之上構(gòu)架的 HTTP/2, 即 HTTPS,使用 h2 表示(ALPN):0x68 與 0x32;
- 直接在 TCP 之上構(gòu)建的 HTTP/2, 即 HTTP,使用 h2c 表示。
HTTP/2 連接創(chuàng)建,分為兩種:通過協(xié)商升級協(xié)議方式和直接連接方式。
假如不知道服務(wù)端是否支持 HTTP/2,可以先使用 HTTP/1.1 進(jìn)行協(xié)商,客戶端發(fā)送協(xié)商請求消息(只含消息頭),報(bào)文示例如下:
GET / HTTP/1.1
Host: 127.0.0.1
Connection: Upgrade, HTTP2-Settings
Upgrade: h2c
HTTP2-Settings: <base64url encoding of HTTP/2 SETTINGS payload>
服務(wù)端接收到協(xié)商請求之后,如果不支持 HTTP/2,則直接按照 HTTP/1.1 響應(yīng)返回,雙方通過 HTTP/1.1 進(jìn)行通信,報(bào)文示例如下:
HTTP/1.1 200 OK
Content-Length: 28
Content-Type: text/css
body...
如果服務(wù)端支持 HTTP/2, 則協(xié)商成功,返回 101 結(jié)果碼,通知客戶端一起升級到 HTTP/2 進(jìn)行通信,示例報(bào)文如下:
HTTP/1.1 101 Switching Protocols
Connection: Upgrade
Upgrade: h2c
[ HTTP/2 connection...
101 響應(yīng)之后,服務(wù)需要發(fā)送 SETTINGS 幀作為連接序言,客戶端接收到 101 響應(yīng)之后,也必須發(fā)送一個(gè)序言作為回應(yīng),示例如下:
PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n
SETTINGS 幀
客戶端序言發(fā)送完成之后,可以不需要等待服務(wù)端的 SETTINGS 幀,而直接發(fā)送業(yè)務(wù)請求 Frame。
假如客戶端和服務(wù)端已經(jīng)約定使用 HTTP/2, 則可以免去 101 協(xié)商和切換流程,直接發(fā)起 HTTP/2 連接,具體流程如下所示:

幾個(gè)關(guān)鍵點(diǎn):
- 如果已經(jīng)明確知道服務(wù)端支持 HTTP/2,則可免去通過 HTTP/1.1 101 協(xié)議切換方式進(jìn)行升級,TCP 連接建立之后即可發(fā)送序言,否則只能在接收到服務(wù)端 101 響應(yīng)之后發(fā)送序言;
- 針對一個(gè)連接,服務(wù)端第一個(gè)要發(fā)送的幀必須是 SETTINGS 幀,連接序言所包含的 SETTINGS 幀可以為空;
- 客戶端可以在發(fā)送完序言之后發(fā)送應(yīng)用幀數(shù)據(jù),不用等待來自服務(wù)器端的序言 SETTINGS 幀。
gRPC 支持三種 Protocol Negotiator 策略:
- PlaintextNegotiator:明確服務(wù)端支持 HTTP/2,采用 HTTP 直接連接的方式與服務(wù)端建立 HTTP/2 連接,省去 101 協(xié)議切換過程;
- PlaintextUpgradeNegotiator:不清楚服務(wù)端是否支持 HTTP/2,采用 HTTP/1.1 協(xié)商模式切換升級到 HTTP/2
- TlsNegotiator:在 TLS 之上構(gòu)建 HTTP/2,協(xié)商采用 ALPN 擴(kuò)展協(xié)議,以 “h2” 作為協(xié)議標(biāo)識符。
下面我們以 PlaintextNegotiator 為例,了解下基于 Netty 的 HTTP/2 連接創(chuàng)建流程:

負(fù)載均衡策略
總體上看,RPC 的負(fù)載均衡策略有兩大類:
- 服務(wù)端負(fù)載均衡(例如代理模式、外部負(fù)載均衡服務(wù))
- 客戶端負(fù)載均衡(內(nèi)置負(fù)載均衡策略和算法,客戶端實(shí)現(xiàn))
外部負(fù)載均衡模式如下所示:

以代理 LB 模式為例:RPC 客戶端向負(fù)載均衡代理發(fā)送請求,負(fù)載均衡代理按照指定的路由策略,將請求消息轉(zhuǎn)發(fā)到后端可用的服務(wù)實(shí)例上。負(fù)載均衡代理負(fù)責(zé)維護(hù)后端可用的服務(wù)列表,如果發(fā)現(xiàn)某個(gè)服務(wù)不可用,則將其剔除出路由表。
代理 LB 模式的優(yōu)點(diǎn)是客戶端不需要實(shí)現(xiàn)負(fù)載均衡策略算法,也不需要維護(hù)后端的服務(wù)列表信息,不直接跟后端的服務(wù)進(jìn)行通信,在做網(wǎng)絡(luò)安全邊界隔離時(shí),非常實(shí)用。例如通過 Nginx 做 L7 層負(fù)載均衡,將互聯(lián)網(wǎng)前端的流量安全的接入到后端服務(wù)中。
代理 LB 模式通常支持 L4(Transport)和 L7(Application) 層負(fù)載均衡,兩者各有優(yōu)缺點(diǎn),可以根據(jù) RPC 的協(xié)議特點(diǎn)靈活選擇。L4/L7 層負(fù)載均衡對應(yīng)場景如下:
- L4 層:對時(shí)延要求苛刻、資源損耗少、RPC 本身采用私有 TCP 協(xié)議;
- L7 層:有會(huì)話狀態(tài)的連接、HTTP 協(xié)議簇(例如 Restful)。
客戶端負(fù)載均衡策略由客戶端內(nèi)置負(fù)載均衡能力,通過靜態(tài)配置、域名解析服務(wù)(例如 DNS 服務(wù))、訂閱發(fā)布(例如 Zookeeper 服務(wù)注冊中心)等方式獲取 RPC 服務(wù)端地址列表,并將地址列表緩存到客戶端內(nèi)存中。
每次 RPC 調(diào)用時(shí),根據(jù)客戶端配置的負(fù)載均衡策略由負(fù)載均衡算法從緩存的服務(wù)地址列表中選擇一個(gè)服務(wù)實(shí)例,發(fā)起 RPC 調(diào)用。
客戶端負(fù)載均衡策略工作原理示例如下:

gRPC 默認(rèn)采用客戶端負(fù)載均衡策略,同時(shí)提供了擴(kuò)展機(jī)制,使用者通過自定義實(shí)現(xiàn) NameResolver 和 LoadBalancer,即可覆蓋 gRPC 默認(rèn)的負(fù)載均衡策略,實(shí)現(xiàn)自定義路由策略的擴(kuò)展。
gRPC 提供的負(fù)載均衡策略實(shí)現(xiàn)類如下所示:
- PickFirstBalancer:無負(fù)載均衡能力,即使有多個(gè)服務(wù)端地址可用,也只選擇第一個(gè)地址;
- RoundRobinLoadBalancer:“RoundRobin” 負(fù)載均衡策略。
gRPC 負(fù)載均衡流程如下所示:

流程關(guān)鍵技術(shù)點(diǎn)解讀:
- 負(fù)載均衡功能模塊的輸入是客戶端指定的 hostName、需要調(diào)用的接口名和方法名等參數(shù),輸出是執(zhí)行負(fù)載均衡算法后獲得的 NettyClientTransport,通過 NettyClientTransport 可以創(chuàng)建基于 Netty HTTP/2 的 gRPC 客戶端,發(fā)起 RPC 調(diào)用;
- gRPC 系統(tǒng)默認(rèn)提供的是 DnsNameResolver,它通過 InetAddress.getAllByName(host) 獲取指定 host 的 IP 地址列表(本地 DNS 服務(wù)),對于擴(kuò)展者而言,可以繼承 NameResolver 實(shí)現(xiàn)自定義的地址解析服務(wù),例如使用 Zookeeper 替換 DnsNameResolver,把 Zookeeper 作為動(dòng)態(tài)的服務(wù)地址配置中心,它的偽代碼示例如下:
第一步:繼承 NameResolver,實(shí)現(xiàn) start(Listener listener) 方法:
void start(Listener listener)
{
// 獲取 ZooKeeper 地址,并連接
// 創(chuàng)建 Watcher,并實(shí)現(xiàn) process(WatchedEvent event),監(jiān)聽地址變更
// 根據(jù)接口名和方法名,調(diào)用 getChildren 方法,獲取發(fā)布該服務(wù)的地址列表
// 將地址列表加到 List 中
// 調(diào)用 NameResolver.Listener.onAddresses(), 通知地址解析完成
第二步:創(chuàng)建 ManagedChannelBuilder 時(shí),指定 Target 的地址為 Zookeeper 服務(wù)端地址,同時(shí)設(shè)置 nameResolver 為 Zookeeper NameResolver, 示例代碼如下所示:
this(ManagedChannelBuilder.forTarget(zookeeperAddr)
.loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance())
.nameResolverFactory(new ZookeeperNameResolverProvider())
.usePlaintext(false));
- LoadBalancer 負(fù)責(zé)從 nameResolver 中解析獲得的服務(wù)端 URL 中按照指定路由策略,選擇一個(gè)目標(biāo)服務(wù)端地址,并創(chuàng)建 ClientTransport。同樣,可以通過覆蓋 handleResolvedAddressGroups 實(shí)現(xiàn)自定義負(fù)載均衡策略。
通過 LoadBalancer + NameResolver,可以實(shí)現(xiàn)靈活的負(fù)載均衡策略擴(kuò)展。例如基于 Zookeeper、etcd 的分布式配置服務(wù)中心方案。
RPC 請求消息發(fā)送流程
gRPC 默認(rèn)基于 Netty HTTP/2 + PB 進(jìn)行 RPC 調(diào)用,請求消息發(fā)送流程如下所示:

- ClientCallImpl 的 sendMessage 調(diào)用,主要完成了請求對象的序列化(基于 PB)、HTTP/2 Frame 的初始化;
- ClientCallImpl 的 halfClose 調(diào)用將客戶端準(zhǔn)備就緒的請求 Frame 封裝成自定義的 SendGrpcFrameCommand,寫入到 WriteQueue 中;
- WriteQueue 執(zhí)行 flush() 將 SendGrpcFrameCommand 寫入到 Netty 的 Channel 中,調(diào)用 Channel 的 write 方法,被 NettyClientHandler 攔截到,由 NettyClientHandler 負(fù)責(zé)具體的發(fā)送操作;
- NettyClientHandler 調(diào)用 Http2ConnectionEncoder 的 writeData 方法,將 Frame 寫入到 HTTP/2 Stream 中,完成請求消息的發(fā)送。
RPC 響應(yīng)接收和處理流程
gRPC 客戶端響應(yīng)消息的接收入口是 NettyClientHandler,它的處理流程如下所示:

流程關(guān)鍵技術(shù)點(diǎn)解讀:
- NettyClientHandler 的 onHeadersRead(int streamId, Http2Headers headers, boolean endStream) 方法會(huì)被調(diào)用兩次,根據(jù) endStream 判斷是否是 Stream 結(jié)尾;
- 請求和響應(yīng)的關(guān)聯(lián):根據(jù) streamId 可以關(guān)聯(lián)同一個(gè) HTTP/2 Stream,將 NettyClientStream 緩存到 Stream 中,客戶端就可以在接收到響應(yīng)消息頭或消息體時(shí)還原出 NettyClientStream,進(jìn)行后續(xù)處理;
- RPC 客戶端調(diào)用線程的阻塞和喚醒使用到了 GrpcFuture 的 wait 和 notify 機(jī)制,來實(shí)現(xiàn)客戶端調(diào)用線程的同步阻塞和喚醒;
- 客戶端和服務(wù)端的 HTTP/2 Header 和 Data Frame 解析共用同一個(gè)方法,即 MessageDeframer 的 deliver()。
客戶端源碼分析
gRPC 客戶端調(diào)用原理并不復(fù)雜,但是代碼卻相對比較繁雜。下面圍繞關(guān)鍵的類庫,對主要功能點(diǎn)進(jìn)行源碼分析。
NettyClientTransport 功能和源碼分析
NettyClientTransport 的主要功能如下:
- 通過 start(Listener transportListener) 創(chuàng)建 HTTP/2 Client,并連接 gRPC 服務(wù)端;
- 通過 newStream(MethodDescriptor method, Metadata headers, CallOptions callOptions) 創(chuàng)建 ClientStream;
- 通過 shutdown() 關(guān)閉底層的 HTTP/2 連接。
以啟動(dòng) HTTP/2 客戶端為例進(jìn)行講解(NettyClientTransport 類):
EventLoop eventLoop = group.next();
if (keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED) {
keepAliveManager = new KeepAliveManager(
new ClientKeepAlivePinger(this), eventLoop, keepAliveTimeNanos, keepAliveTimeoutNanos,
keepAliveWithoutCalls);
}
handler = NettyClientHandler.newHandler(lifecycleManager, keepAliveManager, flowControlWindow,
maxHeaderListSize, Ticker.systemTicker(), tooManyPingsRunnable);
HandlerSettings.setAutoWindow(handler);
negotiationHandler = negotiator.newHandler(handler);
根據(jù)啟動(dòng)時(shí)配置的 HTTP/2 協(xié)商策略,以 NettyClientHandler 為參數(shù)創(chuàng)建 ProtocolNegotiator.Handler。
創(chuàng)建 Bootstrap,并設(shè)置 EventLoopGroup,需要指出的是,此處并沒有使用 EventLoopGroup,而是它的一種實(shí)現(xiàn)類 EventLoop,原因在前文中已經(jīng)說明,相關(guān)代碼示例如下(NettyClientTransport 類):
Bootstrap b = new Bootstrap();
b.group(eventLoop);
b.channel(channelType);
if (NioSocketChannel.class.isAssignableFrom(channelType)) {
b.option(SO_KEEPALIVE, true);
}
創(chuàng)建 WriteQueue 并設(shè)置到 NettyClientHandler 中,用于接收內(nèi)部的各種 QueuedCommand,初始化完成之后,發(fā)起 HTTP/2 連接,代碼如下(NettyClientTransport 類):
handler.startWriteQueue(channel);
channel.connect(address).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
ChannelHandlerContext ctx = future.channel().pipeline().context(handler);
if (ctx != null) {
ctx.fireExceptionCaught(future.cause());
}
future.channel().pipeline().fireExceptionCaught(future.cause());
}
NettyClientHandler 功能和源碼分析
NettyClientHandler 繼承自 Netty 的 Http2ConnectionHandler,是 gRPC 接收和發(fā)送 HTTP/2 消息的關(guān)鍵實(shí)現(xiàn)類,也是 gRPC 和 Netty 的交互橋梁,它的主要功能如下所示:
- 發(fā)送各種協(xié)議消息給 gRPC 服務(wù)端;
- 接收 gRPC 服務(wù)端返回的應(yīng)答消息頭、消息體和其它協(xié)議消息;
- 處理 HTTP/2 協(xié)議相關(guān)的指令,例如 StreamError、ConnectionError 等。
協(xié)議消息的發(fā)送:無論是業(yè)務(wù)請求消息,還是協(xié)議指令消息,都統(tǒng)一封裝成 QueuedCommand,由 NettyClientHandler 攔截并處理,相關(guān)代碼如下所示(NettyClientHandler 類):
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
if (msg instanceof CreateStreamCommand) {
createStream((CreateStreamCommand) msg, promise);
} else if (msg instanceof SendGrpcFrameCommand) {
sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
} else if (msg instanceof CancelClientStreamCommand) {
cancelStream(ctx, (CancelClientStreamCommand) msg, promise);
} else if (msg instanceof SendPingCommand) {
sendPingFrame(ctx, (SendPingCommand) msg, promise);
} else if (msg instanceof GracefulCloseCommand) {
gracefulClose(ctx, (GracefulCloseCommand) msg, promise);
} else if (msg instanceof ForcefulCloseCommand) {
forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
} else if (msg == NOOP_MESSAGE) {
ctx.write(Unpooled.EMPTY_BUFFER, promise);
} else {
throw new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
}
協(xié)議消息的接收:NettyClientHandler 通過向 Http2ConnectionDecoder 注冊 FrameListener 來監(jiān)聽 RPC 響應(yīng)消息和協(xié)議指令消息,相關(guān)接口如下:

FrameListener 回調(diào) NettyClientHandler 的相關(guān)方法,實(shí)現(xiàn)協(xié)議消息的接收和處理:

需要指出的是,NettyClientHandler 并沒有實(shí)現(xiàn)所有的回調(diào)接口,對于需要特殊處理的幾個(gè)方法進(jìn)行了重載,例如 onDataRead 和 onHeadersRead。
ProtocolNegotiator 功能和源碼分析
ProtocolNegotiator 用于 HTTP/2 連接創(chuàng)建的協(xié)商,gRPC 支持三種策略并有三個(gè)實(shí)現(xiàn)子類:

gRPC 的 ProtocolNegotiator 實(shí)現(xiàn)類完全遵循 HTTP/2 相關(guān)規(guī)范,以 PlaintextUpgradeNegotiator 為例,通過設(shè)置 Http2ClientUpgradeCodec,用于 101 協(xié)商和協(xié)議升級,相關(guān)代碼如下所示(PlaintextUpgradeNegotiator 類):
public Handler newHandler(GrpcHttp2ConnectionHandler handler) {
Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(handler);
HttpClientCodec httpClientCodec = new HttpClientCodec();
final HttpClientUpgradeHandler upgrader =
new HttpClientUpgradeHandler(httpClientCodec, upgradeCodec, 1000);
return new BufferingHttp2UpgradeHandler(upgrader);
}
LoadBalancer 功能和源碼分析
LoadBalancer 負(fù)責(zé)客戶端負(fù)載均衡,它是個(gè)抽象類,gRPC 框架的使用者可以通過繼承的方式進(jìn)行擴(kuò)展。
gRPC 當(dāng)前已經(jīng)支持 PickFirstBalancer 和 RoundRobinLoadBalancer 兩種負(fù)載均衡策略,未來不排除會(huì)提供更多的策略。
以 RoundRobinLoadBalancer 為例,它的工作原理如下:根據(jù) PickSubchannelArgs 來選擇一個(gè) Subchannel(RoundRobinLoadBalancerFactory 類):
public PickResult pickSubchannel(PickSubchannelArgs args) {
if (size > 0) {
return PickResult.withSubchannel(nextSubchannel());
}
if (status != null) {
return PickResult.withError(status);
}
return PickResult.withNoResult();
}
再看下 Subchannel 的選擇算法(Picker 類):
private Subchannel nextSubchannel() {
if (size == 0) {
throw new NoSuchElementException();
}
synchronized (this) {
Subchannel val = list.get(index);
index++;
if (index >= size) {
index = 0;
}
return val;
}
}
即通過順序的方式從服務(wù)端列表中獲取一個(gè) Subchannel。
如果用戶需要定制負(fù)載均衡策略,則可以在 RPC 調(diào)用時(shí),使用如下代碼(HelloWorldClient 類):
this(ManagedChannelBuilder.forAddress(host, port).loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance()).nameResolverFactory(new ZkNameResolverProvider()) .usePlaintext(true));
ClientCalls 功能和源碼分析
ClientCalls 提供了各種 RPC 調(diào)用方式,包括同步、異步、Streaming 和 Unary 方式等,相關(guān)方法如下所示:

下面一起看下 RPC 請求消息的發(fā)送和應(yīng)答接收相關(guān)代碼。
RPC 請求調(diào)用源碼分析
請求調(diào)用主要有兩步:請求 Frame 構(gòu)造和 Frame 發(fā)送,請求 Frame 構(gòu)造代碼如下所示(ClientCallImpl 類):
public void sendMessage(ReqT message) {
Preconditions.checkState(stream != null, "Not started");
Preconditions.checkState(!cancelCalled, "call was cancelled");
Preconditions.checkState(!halfCloseCalled, "call was half-closed");
try {
InputStream messageIs = method.streamRequest(message);
stream.writeMessage(messageIs);
...
使用 PB 對請求消息做序列化,生成 InputStream,構(gòu)造請求 Frame:
private int writeUncompressed(InputStream message, int messageLength) throws IOException {
if (messageLength != -1) {
statsTraceCtx.outboundWireSize(messageLength);
return writeKnownLengthUncompressed(message, messageLength);
}
BufferChainOutputStream bufferChain = new BufferChainOutputStream();
int written = writeToOutputStream(message, bufferChain);
if (maxOutboundMessageSize >= 0 && written > maxOutboundMessageSize) {
throw Status.INTERNAL
.withDescription(
String.format("message too large %d > %d", written , maxOutboundMessageSize))
.asRuntimeException();
}
writeBufferChain(bufferChain, false);
return written;
}
Frame 發(fā)送代碼如下所示:
public void writeFrame(WritableBuffer frame, boolean endOfStream, boolean flush) {
ByteBuf bytebuf = frame == null ? EMPTY_BUFFER : ((NettyWritableBuffer) frame).bytebuf();
final int numBytes = bytebuf.readableBytes();
if (numBytes > 0) {
onSendingBytes(numBytes);
writeQueue.enqueue(
new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream),
channel.newPromise().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
transportState().onSentBytes(numBytes);
}
}
}), flush);
NettyClientHandler 接收到發(fā)送事件之后,調(diào)用 Http2ConnectionEncoder 將 Frame 寫入 Netty HTTP/2 協(xié)議棧(NettyClientHandler 類):
private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
ChannelPromise promise) {
encoder().writeData(ctx, cmd.streamId(), cmd.content(), 0, cmd.endStream(), promise);
}
RPC 響應(yīng)接收和處理源碼分析
響應(yīng)消息的接收入口是 NettyClientHandler,包括 HTTP/2 Header 和 HTTP/2 DATA Frame 兩部分,代碼如下(NettyClientHandler 類):
private void onHeadersRead(int streamId, Http2Headers headers, boolean endStream) {
NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId));
stream.transportHeadersReceived(headers, endStream);
if (keepAliveManager != null) {
keepAliveManager.onDataReceived();
}
}
如果參數(shù) endStream 為 True,說明 Stream 已經(jīng)結(jié)束,調(diào)用 transportTrailersReceived,通知 Listener close,代碼如下所示(AbstractClientStream2 類):
if (stopDelivery || isDeframerStalled()) {
deliveryStalledTask = null;
closeListener(status, trailers);
} else {
deliveryStalledTask = new Runnable() {
@Override
public void run() {
closeListener(status, trailers);
}
};
}
讀取到 HTTP/2 DATA Frame 之后,調(diào)用 MessageDeframer 的 deliver 對 Frame 進(jìn)行解析,代碼如下(MessageDeframer 類):
private void deliver() {
if (inDelivery) {
return;
}
inDelivery = true;
try {
while (pendingDeliveries > 0 && readRequiredBytes()) {
switch (state) {
case HEADER:
processHeader();
break;
case BODY:
processBody();
...
將 Frame 轉(zhuǎn)換成 InputStream 之后,通知 ClientStreamListenerImpl,調(diào)用 messageRead(final InputStream message),將 InputStream 反序列化為響應(yīng)對象,相關(guān)代碼如下所示(ClientStreamListenerImpl 類):
public void messageRead(final InputStream message) {
class MessageRead extends ContextRunnable {
MessageRead() {
super(context);
}
@Override
public final void runInContext() {
try {
if (closed) {
return;
}
try {
observer.onMessage(method.parseResponse(message));
} finally {
message.close();
}
當(dāng)接收到 endOfStream 之后,通知 ClientStreamListenerImpl,調(diào)用它的 close 方法,如下所示(ClientStreamListenerImpl 類):
private void close(Status status, Metadata trailers) {
closed = true;
cancelListenersShouldBeRemoved = true;
try {
closeObserver(observer, status, trailers);
} finally {
removeContextListenerAndCancelDeadlineFuture();
}
}
最終調(diào)用 UnaryStreamToFuture 的 onClose 方法,set 響應(yīng)對象,喚醒阻塞的調(diào)用方線程,完成 RPC 調(diào)用,代碼如下(UnaryStreamToFuture 類):
public void onClose(Status status, Metadata trailers) {
if (status.isOk()) {
if (value == null) {
responseFuture.setException(
Status.INTERNAL.withDescription("No value received for unary call")
.asRuntimeException(trailers));
}
responseFuture.set(value);
} else {
responseFuture.setException(status.asRuntimeException(trailers));
}