RPC框架的選擇
常見的RPC框架主要分為輕重兩種。較輕的框架一般只負(fù)責(zé)通信,如rmi、webservice、restful、Thrift、gRPC等。較重的框架一般包括完整的服務(wù)發(fā)現(xiàn)、負(fù)載均衡策略等等如BAT三家的Dubbo、brpc、Tars之類。
框架選擇時個人認(rèn)為首先要考慮的是框架的歷史和項目的活躍程度。一個歷史悠久的活躍項目(大概至少可以保證每兩到三個月有一次小版本的更新)可以保證各種bug早已暴露并修復(fù),讓我們可以更專注于我們自己的項目本身,而不是要擔(dān)心究竟是我們自己的代碼有問題還是框架本身就有問題。
重量級RPC框架有一個主要問題就是結(jié)構(gòu)復(fù)雜,另外主語言之外的代碼質(zhì)量也不太容易保證。個人認(rèn)為活躍的社區(qū)以及一個活躍的開源管理團隊是這些重型RPC框架項目成功的必要前提條件。比如我們項目組試用過騰訊的Tars,C++同學(xué)表示沒有任何問題,然后JAVA同學(xué)表示java版本有許多bug,修復(fù)bug的pull request需要兩個多月才能得到merge,而官方j(luò)ar包也將近兩年沒有更新過了。
輕量級rpc框架中,restful可以被視作標(biāo)桿。由于restful基于http協(xié)議,天然被各種框架支持,而且非常靈活。restful的缺點有兩方面,一是過于靈活,缺少根據(jù)協(xié)議生成服務(wù)端和客戶端代碼的工具,聯(lián)調(diào)往往要花更多的時間;二是大部分序列化基于json或者xml,相對來講效率不理想。和restful相比,其它很多輕量級框架都有這樣或者那樣的缺點,有的缺少跨語言支持(rmi),有的既繁瑣又缺乏效率優(yōu)勢(webservice)。個人認(rèn)為其中相對理想的是gRPC和Thrift。
gRPC簡介
Protobuf是一種google推出的非常流行的跨語言序列化/反序列化框架。在Protobuf2中就已經(jīng)出現(xiàn)了用rpc定義服務(wù)的概念,但是一直缺少一種流行的rpc框架支持。當(dāng)Http2推出之后,google將Http2和protobuf3結(jié)合,推出了gRPC。gRPC繼承了Protobuf和Http2的優(yōu)點,包括:
- 序列化反序列化性能好
- 強類型支持
- 向前/向后兼容
- 有代碼生成機制,而且可以支持多語言
- 長連接、多路復(fù)用
同時gRPC還提供了簡單地服務(wù)發(fā)現(xiàn)和負(fù)載均衡功能。雖然這并不是gRPC框架的重點,但是開發(fā)者可以非常容易的自己擴展gRPC這些功能,實現(xiàn)自己的策略或應(yīng)用最新的相關(guān)方面技術(shù),而不用像重型RPC框架一樣受制于框架本身是否支持。
gRPC與Thrift對比
Thrift是Facebook推出的一種RPC框架,從性能上來講遠(yuǎn)優(yōu)于gRPC。但是在實際調(diào)研時發(fā)現(xiàn)有一個很麻煩的問題:Thrift的客戶端是線程不安全的——這意味著在Spring中無法以單例形式注入到Bean中。解決方案有三種:
- 每次調(diào)用創(chuàng)建一個Thrift客戶端。這不僅意味著額外的對象創(chuàng)建和垃圾回收開銷,而且實際上相當(dāng)于只使用了短鏈接,這是一個開發(fā)復(fù)雜度最低但是從性能上來講最差的解決方案。
- 利用Pool,稍微復(fù)雜一點的解決方案,但是也非常成熟。但是問題在于一來缺少服務(wù)發(fā)現(xiàn)和負(fù)載均衡恐實現(xiàn),需要很多額外開發(fā);二來需要創(chuàng)建Pool數(shù)量*服務(wù)端數(shù)量個客戶端,內(nèi)存開銷會比較大。
- 使用異步框架如Netty,可以成功避免創(chuàng)建過多的客戶端,但是仍要自己實現(xiàn)服務(wù)發(fā)現(xiàn)和負(fù)載均衡,相對復(fù)雜。實際上Facebook有一個基于Netty的Thrift客戶端,叫Nifty,但是快四年沒更新了。。。
相比較而言gRPC就友好多了,本身有簡單而且可擴展的服務(wù)發(fā)現(xiàn)和負(fù)載均衡功能,底層基于Netty所以線程安全,在不需要極限壓榨性能的情況下是非常好的選擇。當(dāng)然如果需要極限壓榨性能Thrift也未必夠看。
gRPC入門
gRPC服務(wù)定義
gRPC中有一個特殊的關(guān)鍵字stream,表示可以以流式輸入或輸出多個protobuf對象。注意只有異步非阻塞的客戶端支持以stream形式輸入,同步阻塞客戶端不支持以stream形式輸入。
syntax = "proto3"; //gRPC必須使用proto3
option java_multiple_files = true;
option java_package = "cn.lmh.examples.grpc.proto";
service RouteGuide {
// 輸入一個坐標(biāo),返回坐標(biāo)和時間(1:1)
rpc getPoint(Point) returns (LocationNote) {}
// 輸入一個矩形,以stream形式返回一系列點(1:n)
rpc listPoints(Rectangle) returns (stream Point) {}
// 以stream形式輸入一系列點,返回點的數(shù)量和總共花費的時間(m:1)
rpc recordRoute(stream Point) returns (RouteSummary) {}
// 以stream形式輸入一系列點,以stream形式返回已輸入點的數(shù)量和總共花費的時間(m:n)
rpc getPointStream(stream Point) returns (stream RouteSummary) {}
}
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
message Rectangle {
Point lo = 1;
Point hi = 2;
}
message LocationNote {
Point location = 1;
int64 timestamp = 2;
}
message RouteSummary {
int32 point_count = 1;
int64 elapsed_time = 2;
}
依賴和代碼生成
由于protoc的gRPC插件需要自己編譯,而且存在環(huán)境問題。推薦使用gradle或者maven的protobuf插件。入門示例項目使用了gradle,根目錄build.gradle配置如下:
plugins {
id 'java'
id 'idea'
id 'wrapper'
}
ext {
groupId = 'cn.lmh.leviathan'
proto = [
version : "3.9.0",
"grpc" :[
version : "1.23.0"
]
]
}
allprojects{
apply plugin: 'java'
apply plugin: 'idea'
sourceCompatibility=JavaVersion.VERSION_1_8
targetCompatibility=JavaVersion.VERSION_1_8
project.group = 'cn.lmh.examples'
compileJava.options.encoding = 'UTF-8'
}
subprojects{
repositories {
mavenCentral()
mavenLocal();
};
configurations {
compile
}
dependencies {
compile "io.grpc:grpc-netty-shaded:${proto.grpc.version}"
compile "io.grpc:grpc-protobuf:${proto.grpc.version}"
compile "io.grpc:grpc-stub:${proto.grpc.version}"
testCompile group: 'junit', name: 'junit', version: '4.12'
}
}
子項目build.gradle如下:
plugins{
id 'com.google.protobuf' version '0.8.10' //引入protobuf插件
}
sourceSets{
main{
proto {
srcDir 'src/main/proto' //指定.proto文件所在的位置
}
}
}
protobuf {
generatedFilesBaseDir = "$projectDir/src" //生成文件的根目錄
protoc {
artifact = "com.google.protobuf:protoc:${proto.version}" //protoc的版本
}
plugins {
grpc {
artifact = "io.grpc:protoc-gen-grpc-java:${proto.grpc.version}" //gRPC的版本
}
}
generateProtoTasks {
all()*.plugins {
grpc {
outputSubDir = "java" //grpc生成文件的子目錄
}
}
}
}
我們的入門子項目名稱叫做starter,配置好build.gradle之后,執(zhí)行g(shù)radlew :starter:generateProto就可以在src/main/java下生成對應(yīng)的文件:

服務(wù)端
無論客戶端以異步非阻塞還是同步阻塞形式調(diào)用,gRPC服務(wù)端的Response都是異步形式。對于異步的Request或者Response,都需要實現(xiàn)gRPC的io.grpc.stub.StreamObserver接口。io.grpc.stub.StreamObserver接口有三個方法:
-
onNext:表示接收/發(fā)送一個對象 -
onError:處理異常 -
onCompleted:表示Request或Response結(jié)束
當(dāng)Request發(fā)送到服務(wù)端端時,會異步調(diào)用requestObserver的onNext方法,直到結(jié)束時調(diào)用requestObserver的onCompleted方法;服務(wù)端調(diào)用responseObserver的onNext把Response返回給客戶端,直到調(diào)用responseObserver的onCompleted方法通知客戶端Response結(jié)束。服務(wù)端代碼如下:
public class RouteGuideServer {
private final int port;
private final Server server;
public RouteGuideServer(int port) throws IOException {
this.port = port;
server = ServerBuilder.forPort(port).addService(new RouteGuideService())
.build();
}
/**
* Start server.
*/
public void start() throws IOException {
server.start();
System.out.println("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
RouteGuideServer.this.stop();
}
});
}
/**
* Stop server
*/
public void stop() {
if (server != null) {
server.shutdown();
}
}
/**
* Await termination on the main thread since the grpc library uses daemon threads.
*/
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
public static void main(String[] args) throws Exception {
RouteGuideServer server = new RouteGuideServer(8980);
server.start();
server.blockUntilShutdown();
}
private static class RouteGuideService extends RouteGuideGrpc.RouteGuideImplBase {
@Override
public void getPoint(Point request, StreamObserver<LocationNote> responseObserver) {
LocationNote value = LocationNote
.newBuilder()
.setLocation(request)
.setTimestamp(System.nanoTime())
.build();
responseObserver.onNext(value);
responseObserver.onCompleted();
}
@Override
public void listPoints(Rectangle request, StreamObserver<Point> responseObserver) {
int left = Math.min(request.getLo().getLongitude(), request.getHi().getLongitude());
int right = Math.max(request.getLo().getLongitude(), request.getHi().getLongitude());
int top = Math.max(request.getLo().getLatitude(), request.getHi().getLatitude());
int bottom = Math.max(request.getLo().getLatitude(), request.getHi().getLatitude());
for (int x = left; x <= right; x++) {
for (int y = top; y >= bottom; y--) {
Point point = Point.newBuilder().setLongitude(x).setLatitude(y).build();
responseObserver.onNext(point);
}
}
responseObserver.onCompleted();
}
@Override
public StreamObserver<Point> recordRoute(StreamObserver<RouteSummary> responseObserver) {
return new StreamObserver<Point>() { //返回的是requestObserver
AtomicInteger pointCount = new AtomicInteger(0);
final long startTime = System.nanoTime();
@Override
public void onNext(Point value) {
int count = pointCount.incrementAndGet();
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
RouteSummary result = RouteSummary.newBuilder()
.setElapsedTime(System.nanoTime() - startTime).setPointCount(pointCount.get()).build();
responseObserver.onNext(result);
responseObserver.onCompleted();
}
};
}
@Override
public StreamObserver<Point> getPointStream(StreamObserver<RouteSummary> responseObserver) {
return new StreamObserver<Point>() { //返回的是requestObserver
AtomicInteger pointCount = new AtomicInteger(0);
final long startTime = System.nanoTime();
@Override
public void onNext(Point value) {
int count = pointCount.incrementAndGet();
RouteSummary result = RouteSummary.newBuilder()
.setElapsedTime(System.nanoTime() - startTime).setPointCount(count).build();
responseObserver.onNext(result);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
}
}
客戶端
gRPC的客戶端有同步阻塞客戶端(blockingStub)和異步非阻塞客戶端(Stub)兩種。同步客戶端使用比較方便,但是性能較低,而且不支持stream形式的Request;異步客戶端性能較高,支持stream形式的Request,但是如果想要以同步方式調(diào)用需要額外封裝。本文將主要以異步為例。
異步轉(zhuǎn)同步
由于gRPC的異步客戶端性能較高且功能更完整,所以一般都會采用異步客戶端。異步客戶端接收到的Response也是以io.grpc.stub.StreamObserver形式。由于客戶端的調(diào)用可能是在異步進程中但更可能是在同步進程中,所以就存在一個如何把gRPC異步Response轉(zhuǎn)為同步Response的問題。
一個比較常見的思路是寫一個io.grpc.stub.StreamObserver實現(xiàn),里面有一個內(nèi)置變量保存異步Response的結(jié)果,再添加一個阻塞式的get()方法,直到Response結(jié)束才把所有結(jié)果返回。要知道Response是否結(jié)束,需要添加一個Boolean或者AtomicBoolean變量,初始化為false,調(diào)用responseObserver.onCompleted()方法時設(shè)置為true,這樣就可以通過這個變量判斷Response是否結(jié)束。
阻塞get()方法最常見的思路是get()寫一個while循環(huán),直到變量值改為true才退出循環(huán)并返回結(jié)果。這種方式的優(yōu)點是簡單直接,任何語言都可以簡單實現(xiàn),缺點是由于使用循環(huán)可能CPU占用較高。而對于java這種多線程比較完善的語言,另一個比較好思路是Response結(jié)束前將線程掛起,當(dāng)調(diào)用responseObserver.onCompleted()方法再喚醒線程。代碼如下:
public class CallableStreamObserver<T> implements StreamObserver<T> {
List<T> values = new ArrayList<T>();
boolean isCompleted = false;
Throwable t = null;
@Override
public void onNext(T value) {
this.values.add(value);
}
@Override
public void onError(Throwable t) {
this.isCompleted = true;
notifyAll();
}
@Override
public synchronized void onCompleted() {
this.isCompleted = true;
notifyAll();
}
public List<T> get() throws Throwable {
if (!this.isCompleted) {
synchronized (this) {
this.wait(60 * 1000);
}
}
if (null != t) {
throw this.t;
} else {
return this.values;
}
}
}
客戶端代碼
public class RouteGuideClient {
private final ManagedChannel channel;
private final RouteGuideGrpc.RouteGuideBlockingStub blockingStub;
private final RouteGuideGrpc.RouteGuideStub asyncStub;
public RouteGuideClient(String host, int port) {
String target = "dns:///" + host + ":" + port;
ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder
.forTarget(target)
.usePlaintext();
channel = channelBuilder.build();
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);
}
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
public LocationNote getPoint(int lo, int lt, boolean blocking) throws Throwable {
Point point = Point.newBuilder().setLongitude(lo).setLatitude(lt).build();
if(blocking) {
return blockingStub.getPoint(point);
}else{
CallableStreamObserver<LocationNote> responseObserver = new CallableStreamObserver<LocationNote>();
asyncStub.getPoint(point, responseObserver);
return responseObserver.get().get(0);
}
}
public Iterator<Point> listPoints(int left, int top, int right, int bottom, boolean blocking) throws Throwable {
Point hi = Point.newBuilder().setLongitude(left).setLatitude(top).build();
Point lo = Point.newBuilder().setLongitude(right).setLatitude(bottom).build();
Rectangle rec = Rectangle.newBuilder().setHi(hi).setLo(lo).build();
if(blocking){
return blockingStub.listPoints(rec);
}else{
CallableStreamObserver<Point> responseObserver = new CallableStreamObserver<Point>();
asyncStub.listPoints(rec, responseObserver);
return responseObserver.get().iterator();
}
}
public RouteSummary recordRoute(Collection<Point> points) throws Throwable {
CallableStreamObserver<RouteSummary> responseObserver = new CallableStreamObserver<RouteSummary>();
StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
points.stream().parallel().forEach(p -> requestObserver.onNext(p));
requestObserver.onCompleted();
return responseObserver.get().get(0);
}
public List<RouteSummary> getPointStream(Collection<Point> points) throws Throwable {
CallableStreamObserver<RouteSummary> responseObserver = new CallableStreamObserver<RouteSummary>();
StreamObserver<Point> requestObserver = asyncStub.getPointStream(responseObserver);
points.stream().parallel().forEach(p -> requestObserver.onNext(p));
requestObserver.onCompleted();
return responseObserver.get();
}
}
gRPC客戶端代碼詳解
gRPC官方將自己分為三層組件:Stub、Channel和Transport。
- Stub層是最上層的代碼,gRPC附帶的插件可以從.proto文件直接生成Stub層代碼,開發(fā)人員通過直接調(diào)用Stub層的代碼調(diào)用RPC服務(wù)
- Channel層是對Transport層功能的抽象,同時提供了很多有用的功能,比如服務(wù)發(fā)現(xiàn)和負(fù)載均衡。。
- Transport層承擔(dān)了將字節(jié)從網(wǎng)絡(luò)中取出和放入數(shù)據(jù)的工作,有三種實現(xiàn)Netty、okHttp、inProgress。Transport層是最底層的代碼。
整個grpc-java項目的代碼比較多。從風(fēng)格上來講,封裝比較多,相對于interface更喜歡使用abstract class,相對于反射更喜歡使用硬編碼,而且大量使用了單線程異步調(diào)用造成調(diào)用棧斷裂,與常見的java項目的編碼風(fēng)格有很大差別,閱讀起來可能容易不習(xí)慣。
在源碼層面本文將關(guān)注下面這些方面:
- Channel的初始化過程;
- gRPC中的服務(wù)發(fā)現(xiàn);
- gRPC中的負(fù)載均衡
- Client與Server之間的數(shù)據(jù)傳輸
Channel的初始化過程
通過入門示例可以看到,Channel的初始化過程分三步:
- 調(diào)用forTarget方法創(chuàng)建
io.grpc.ManagedChannelBuilder; - 配置各種選項,不論如何配置,返回的總是
io.grpc.ManagedChannelBuilder對象; - 調(diào)用build方法創(chuàng)建
io.grpc.ManagedChannel。
forTarget方法
gRPC這里設(shè)計比較繁瑣,過程比較繞。forTarget方法的實際功能就是把參數(shù)target賦值給io.grpc.ManagedChannelBuilder的內(nèi)部變量target。
public static ManagedChannelBuilder<?> forTarget(String target) {
return ManagedChannelProvider.provider().builderForTarget(target);
}
io.grpc.ManagedChannelProvider.provider()會返回一個io.grpc.ManagedChannelProvider實現(xiàn)。有哪些io.grpc.ManagedChannelProvider實現(xiàn)是在io.grpc.ManagedChannelProvider中以硬編碼形式確定的,這里其實存在利用反射改進的空間。
private static final class HardcodedClasses implements Iterable<Class<?>> {
@Override
public Iterator<Class<?>> iterator() {
List<Class<?>> list = new ArrayList<>();
try {
list.add(Class.forName("io.grpc.okhttp.OkHttpChannelProvider"));
} catch (ClassNotFoundException ex) {
// ignore
}
try {
list.add(Class.forName("io.grpc.netty.NettyChannelProvider"));
} catch (ClassNotFoundException ex) {
// ignore
}
return list.iterator();
}
}
實際上就根據(jù)依賴的jar包不同就只有兩個實現(xiàn),一個netty的,一個okhttp的。如果像入門示例項目一樣只配置了netty實現(xiàn),那就只有netty的。io.grpc.netty.NettyChannelProvider的buildForTarget方法調(diào)用的是io.grpc.netty.NettyChannelBuilder的forTarget方法。
public NettyChannelBuilder builderForTarget(String target) {
return NettyChannelBuilder.forTarget(target);
}
而io.grpc.netty.NettyChannelBuilder繼承自io.grpc.internal.AbstractManagedChannelImplBuilder,forTarget方法實際上調(diào)用了父類的構(gòu)造函數(shù)。
NettyChannelBuilder(String target) {
super(target);
}
public static NettyChannelBuilder forTarget(String target) {
return new NettyChannelBuilder(target);
}
io.grpc.internal.AbstractManagedChannelImplBuilder的構(gòu)造函數(shù)最終會是把參數(shù)賦值給target變量。
protected AbstractManagedChannelImplBuilder(String target) {
this.target = Preconditions.checkNotNull(target, "target");
this.directServerAddress = null;
}
build方法
從前文可以看到,實際初始化的io.grpc.ManagedChannelBuilder實際上是io.grpc.netty.NettyChannelBuilder,其build方法實現(xiàn)在其父類io.grpc.internal.AbstractManagedChannelImplBuilder中。
public ManagedChannel build() {
return new ManagedChannelOrphanWrapper(new ManagedChannelImpl(
this,
buildTransportFactory(),
// TODO(carl-mastrangelo): Allow clients to pass this in
new ExponentialBackoffPolicy.Provider(),
SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR),
GrpcUtil.STOPWATCH_SUPPLIER,
getEffectiveInterceptors(),
TimeProvider.SYSTEM_TIME_PROVIDER));
}
這里的io.grpc.internal.ManagedChannelOrphanWrapper和io.grpc.internal.ManagedChannelImpl其實都是io.grpc.ManagedChannel的實現(xiàn)。io.grpc.internal.ManagedChannelOrphanWrapper從功能上分析沒有任何作用,io.grpc.internal.ManagedChannelOrphanWrapper會為io.grpc.ManagedChannel創(chuàng)建弱引用,并被放置到ReferenceQueue中。如果Channel是單例的,那么意義不大;如果客戶端被重復(fù)創(chuàng)建卻沒有被關(guān)閉,那么ReferenceQueue中會留下相應(yīng)的引用記錄,可能有助于排查問題。
io.grpc.internal.ManagedChannelImpl構(gòu)造方法的幾個參數(shù)中,除了第一個參數(shù)是builder本身,第二個參數(shù)是用來創(chuàng)建Transport的Factory,第三個參數(shù)是后臺連接重試策略,第四個參數(shù)是gRPC的全局線程池,第五個和第七個都是和時間相關(guān)的對象,主要用于日志中,第六個是客戶端調(diào)用時的interceptor。在io.grpc.netty.NettyChannelBuilder中,buildTransportFactory方法會創(chuàng)建一個io.grpc.netty.NettyChannelBuilder.NettyTransportFactory。
服務(wù)發(fā)現(xiàn)
前文的入門示例中直接寫了target,只能連接單個Server。如果有多個可以提供服務(wù)的Server,那么就需要有一種方式通過單個target發(fā)現(xiàn)這些Server。在io.grpc.ManagedChannelBuilder中有一個nameResolverFactory方法,可以用來指定如何解析target地址,發(fā)現(xiàn)多個服務(wù)端。
nameResolverFactory方法
這個方法的實現(xiàn)也在io.grpc.internal.AbstractManagedChannelImplBuilder中,如果用戶有自己的io.grpc.NameResolver.Factory實現(xiàn)的話可以通過nameResolverFactory方法指定,gRPC就會使用用戶自己的io.grpc.NameResolver.Factroy實現(xiàn)代替gRPC自己的默認(rèn)實現(xiàn),否則會使用io.grpc.NameResolverRegistry中的默認(rèn)實現(xiàn)。
io.grpc.NameResolverRegistry會通過硬編碼加載io.grpc.NameResolverProvider實現(xiàn),并創(chuàng)建一個與之有關(guān)的io.grpc.NameResolver.Factory的實現(xiàn)。目前硬編碼加載的io.grpc.NameResolverProvider實現(xiàn)只有io.grpc.internal.DnsNameResolverProvider一種。
private final NameResolver.Factory factory = new NameResolverFactory();
@GuardedBy("this")
private final LinkedHashSet<NameResolverProvider> allProviders = new LinkedHashSet<>();
private synchronized void addProvider(NameResolverProvider provider) {
checkArgument(provider.isAvailable(), "isAvailable() returned false");
allProviders.add(provider);
}
public static synchronized NameResolverRegistry getDefaultRegistry() {
if (instance == null) {
List<NameResolverProvider> providerList = ServiceProviders.loadAll(
NameResolverProvider.class,
getHardCodedClasses(),
NameResolverProvider.class.getClassLoader(),
new NameResolverPriorityAccessor());
if (providerList.isEmpty()) {
logger.warning("No NameResolverProviders found via ServiceLoader, including for DNS. This "
+ "is probably due to a broken build. If using ProGuard, check your configuration");
}
instance = new NameResolverRegistry();
for (NameResolverProvider provider : providerList) {
logger.fine("Service loader found " + provider);
if (provider.isAvailable()) {
instance.addProvider(provider);
}
}
instance.refreshProviders();
}
return instance;
}
public NameResolver.Factory asFactory() {
return factory;
}
@VisibleForTesting
static List<Class<?>> getHardCodedClasses() {
ArrayList<Class<?>> list = new ArrayList<>();
try {
list.add(Class.forName("io.grpc.internal.DnsNameResolverProvider"));
} catch (ClassNotFoundException e) {
logger.log(Level.FINE, "Unable to find DNS NameResolver", e);
}
return Collections.unmodifiableList(list);
}
private final class NameResolverFactory extends NameResolver.Factory {
@Override
@Nullable
public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
List<NameResolverProvider> providers = providers();
for (NameResolverProvider provider : providers) {
NameResolver resolver = provider.newNameResolver(targetUri, args);
if (resolver != null) {
return resolver;
}
}
return null;
}
@Override
public String getDefaultScheme() {
List<NameResolverProvider> providers = providers();
if (providers.isEmpty()) {
return "unknown";
}
return providers.get(0).getDefaultScheme();
}
}
getDefaultSchema會匹配target中的schema(如dns),如果匹配的上,就使用相應(yīng)的NameResolver.Factory,返回NameResolver決定真正的服務(wù)訪問地址。
io.grpc.NameResolver
我們來看io.grpc.NameResolver
public abstract class NameResolver {
public abstract String getServiceAuthority();
public void start(final Listener listener) {
if (listener instanceof Listener2) {
start((Listener2) listener);
} else {
start(new Listener2() {
@Override
public void onError(Status error) {
listener.onError(error);
}
@Override
public void onResult(ResolutionResult resolutionResult) {
listener.onAddresses(resolutionResult.getAddresses(), resolutionResult.getAttributes());
}
});
}
}
public void start(Listener2 listener) {
start((Listener) listener);
}
public abstract void shutdown();
public void refresh() {}
@ThreadSafe
public interface Listener {
void onAddresses(List<EquivalentAddressGroup> servers, @ResolutionResultAttr Attributes attributes);
void onError(Status error);
}
public abstract static class Listener2 implements Listener {
@Override
public final void onAddresses(
List<EquivalentAddressGroup> servers, @ResolutionResultAttr Attributes attributes) {
onResult(
ResolutionResult.newBuilder().setAddresses(servers).setAttributes(attributes).build());
}
public abstract void onResult(ResolutionResult resolutionResult);
@Override
public abstract void onError(Status error);
}
public static final class ResolutionResult {
private final List<EquivalentAddressGroup> addresses;
@ResolutionResultAttr
private final Attributes attributes;
@Nullable
private final ConfigOrError serviceConfig;
ResolutionResult(
List<EquivalentAddressGroup> addresses,
@ResolutionResultAttr Attributes attributes,
ConfigOrError serviceConfig) {
this.addresses = Collections.unmodifiableList(new ArrayList<>(addresses));
this.attributes = checkNotNull(attributes, "attributes");
this.serviceConfig = serviceConfig;
}
public static Builder newBuilder() {
return new Builder();
}
public Builder toBuilder() {
return newBuilder()
.setAddresses(addresses)
.setAttributes(attributes)
.setServiceConfig(serviceConfig);
}
public List<EquivalentAddressGroup> getAddresses() {
return addresses;
}
@ResolutionResultAttr
public Attributes getAttributes() {
return attributes;
}
@Nullable
public ConfigOrError getServiceConfig() {
return serviceConfig;
}
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1770")
public static final class Builder {
private List<EquivalentAddressGroup> addresses = Collections.emptyList();
private Attributes attributes = Attributes.EMPTY;
@Nullable
private ConfigOrError serviceConfig;
Builder() {}
public Builder setAddresses(List<EquivalentAddressGroup> addresses) {
this.addresses = addresses;
return this;
}
public Builder setAttributes(Attributes attributes) {
this.attributes = attributes;
return this;
}
public Builder setServiceConfig(@Nullable ConfigOrError serviceConfig) {
this.serviceConfig = serviceConfig;
return this;
}
public ResolutionResult build() {
return new ResolutionResult(addresses, attributes, serviceConfig);
}
}
}
}
在客戶端首次連接服務(wù)端的時候會調(diào)用Listener2的start方法,需要更新的時候會調(diào)用refresh方法。當(dāng)Listener2接收到服務(wù)端地址時,會調(diào)用onResult方法。
io.grpc.internal.DnsNameResolver
由于gRPC支持長連接,所以如果直連的話只會訪問一個域名下的一臺服務(wù)器,即首次連接時通過DNS返回IP地址。io.grpc.internal.DnsNameResolverProvider是對io.grpc.internal.DnsNameResolver的簡單封裝,只支持以dns:///開頭的地址。io.grpc.internal.DnsNameResolver會根據(jù)target獲取該host下所有關(guān)聯(lián)的IP,即通過DNS解析出所有的服務(wù)端IP地址。
public final class DnsNameResolverProvider extends NameResolverProvider {
private static final String SCHEME = "dns";
@Override
public DnsNameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
if (SCHEME.equals(targetUri.getScheme())) {
String targetPath = Preconditions.checkNotNull(targetUri.getPath(), "targetPath");
Preconditions.checkArgument(targetPath.startsWith("/"),
"the path component (%s) of the target (%s) must start with '/'", targetPath, targetUri);
String name = targetPath.substring(1);
return new DnsNameResolver(
targetUri.getAuthority(),
name,
args,
GrpcUtil.SHARED_CHANNEL_EXECUTOR,
Stopwatch.createUnstarted(),
InternalServiceProviders.isAndroid(getClass().getClassLoader()));
} else {
return null;
}
}
@Override
public String getDefaultScheme() {
return SCHEME;
}
@Override
protected boolean isAvailable() {
return true;
}
@Override
protected int priority() {
return 5;
}
}
可以看到io.grpc.internal.DnsNameResolver中的start和refresh方法都調(diào)用的是resolve方法,而resolve方法是執(zhí)行了一個繼承自Runnable的Resolve接口。

在有代理的情況下,Resolve的resolveInternal會根據(jù)代理返回的ProxiedSocketAddress創(chuàng)建EquivalentAddressGroup作為服務(wù)端列表返回,并設(shè)置空config;否則會調(diào)用resolveAll方法獲取服務(wù)端列表,并調(diào)用parseServiceConfig方法設(shè)置config。resolveAll方法返回的ResolutionResults有三個變量addresses、txtRecords和balancerAddresses。
@VisibleForTesting
static ResolutionResults resolveAll(
AddressResolver addressResolver,
@Nullable ResourceResolver resourceResolver,
boolean requestSrvRecords,
boolean requestTxtRecords,
String name) {
List<? extends InetAddress> addresses = Collections.emptyList();
Exception addressesException = null;
List<EquivalentAddressGroup> balancerAddresses = Collections.emptyList();
Exception balancerAddressesException = null;
List<String> txtRecords = Collections.emptyList();
Exception txtRecordsException = null;
try {
addresses = addressResolver.resolveAddress(name);
} catch (Exception e) {
addressesException = e;
}
if (resourceResolver != null) {
if (requestSrvRecords) {
try {
balancerAddresses =
resourceResolver.resolveSrv(addressResolver, GRPCLB_NAME_PREFIX + name);
} catch (Exception e) {
balancerAddressesException = e;
}
}
if (requestTxtRecords) {
boolean balancerLookupFailedOrNotAttempted =
!requestSrvRecords || balancerAddressesException != null;
boolean dontResolveTxt =
(addressesException != null) && balancerLookupFailedOrNotAttempted;
if (!dontResolveTxt) {
try {
txtRecords = resourceResolver.resolveTxt(SERVICE_CONFIG_NAME_PREFIX + name);
} catch (Exception e) {
txtRecordsException = e;
}
}
}
}
try {
if (addressesException != null
&& (balancerAddressesException != null || balancerAddresses.isEmpty())) {
Throwables.throwIfUnchecked(addressesException);
throw new RuntimeException(addressesException);
}
} finally {
if (addressesException != null) {
logger.log(Level.FINE, "Address resolution failure", addressesException);
}
if (balancerAddressesException != null) {
logger.log(Level.FINE, "Balancer resolution failure", balancerAddressesException);
}
if (txtRecordsException != null) {
logger.log(Level.FINE, "ServiceConfig resolution failure", txtRecordsException);
}
}
return new ResolutionResults(addresses, txtRecords, balancerAddresses);
}
addressResolver的resolveAddress方法實際是調(diào)用JDK的java.net.InetAddress的getAllByName方法,即根據(jù)host通過DNS返回一系列服務(wù)端列表。resourceResolver根據(jù)LDAP協(xié)議獲取指定命名空間下的服務(wù)端列表地址。txtRecords和balancerAddresses是和LDAP相關(guān)的參數(shù),方法入?yún)?code>requestSrvRecords和requestTxtRecords的默認(rèn)值都是false。由于LDAP不是特別常用,這里就不深入展開了。
NameResolverListener的onResult
當(dāng)NameResolverListener獲取解析結(jié)果后會調(diào)用onResult方法,進而會調(diào)用io.grpc.LoadBalancer的handleResolvedAddresses方法。

負(fù)載均衡
io.grpc.ManagedChannel初始化的時候可以通過defaultLoadBalancingPolicy方法指定負(fù)載均衡策略,實際是根據(jù)defaultLoadBalancingPolicy創(chuàng)建了一個io.grpc.internal.AutoConfiguredLoadBalancerFactory對象。io.grpc.internal.AutoConfiguredLoadBalancerFactory則通過io.grpc.LoadBalancerRegistry獲取對應(yīng)名稱的負(fù)載均衡策略。io.grpc.LoadBalancerProvider的getPolicyName方法指定負(fù)載均衡策略名稱,newLoadBalancer返回負(fù)載均衡io.grpc.LoadBalancer的具體實現(xiàn)。如果想要添加自定義負(fù)載均衡策略,需要調(diào)用io.grpc.LoadBalancerRegistry的registry方法,并自己實現(xiàn)io.grpc.LoadBalancerProvider和io.grpc.LoadBalancer,并指定負(fù)載均衡策略名稱即可。

io.grpc.LoadBalancer.SubchannelPicker
io.grpc.LoadBalancer的核心邏輯實際在SubchannelPicker中。pickSubchannel方法會返回的PickResult中包含真正可用的subchannel,用來進行后續(xù)的數(shù)據(jù)傳輸。
public abstract static class SubchannelPicker {
/**
* Make a balancing decision for a new RPC.
*
* @param args the pick arguments
* @since 1.3.0
*/
public abstract PickResult pickSubchannel(PickSubchannelArgs args);
}
gRPC默認(rèn)提供了兩種負(fù)載均衡實現(xiàn)策略:prick_first和round_robin。前者總會使用第一個可用的服務(wù)端,后者則是簡單輪詢。
handleResolvedAddresses
當(dāng)服務(wù)端列表更新時,會調(diào)用io.grpc.LoadBalancer的handleResolvedAddresses方法更新可用的subchannel。
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();
if (subchannel == null) {
final Subchannel subchannel = helper.createSubchannel(
CreateSubchannelArgs.newBuilder()
.setAddresses(servers)
.build());
subchannel.start(new SubchannelStateListener() {
@Override
public void onSubchannelState(ConnectivityStateInfo stateInfo) {
processSubchannelState(subchannel, stateInfo);
}
});
this.subchannel = subchannel;
helper.updateBalancingState(CONNECTING, new Picker(PickResult.withSubchannel(subchannel)));
subchannel.requestConnection();
} else {
subchannel.updateAddresses(servers);
}
}
如果是首次調(diào)用(subchannel == null) 會創(chuàng)建subchannel,其實現(xiàn)是io.grpc.internal.ManagedChannelImpl.SubchannelImpl,創(chuàng)建的過程中會創(chuàng)建io.grpc.internal.InternalSubchannel。然后調(diào)用io.grpc.internal.ManagedChannelImpl的updateBalancingState方法,把subchannelPicker更新為實現(xiàn)Picker,然后開啟subchannel的連接。

在開啟subchannel的連接過程中,會調(diào)用io.grpc.internal.InternalSubchannel的obtainActiveTransport方法。
這里的transportFactory就是上面提到io.grpc.ManagedChannelBuilder調(diào)用build初始化時調(diào)用buildTransportFactory方法返回的,依賴于Transport層的具體實現(xiàn)。在netty實現(xiàn)中,返回的是io.grpc.netty.NettyClientTransport。
傳輸
gRPC客戶端發(fā)起Request時,stub會調(diào)用ClientCalls的startCall方法,最終會調(diào)用io.grpc.internal.ManagedChannelImpl.ChannelTransportProvider的get方法獲取io.grc.internal.ClientTransport。

public ClientTransport get(PickSubchannelArgs args) {
SubchannelPicker pickerCopy = subchannelPicker;
if (shutdown.get()) {
return delayedTransport;
}
if (pickerCopy == null) {
final class ExitIdleModeForTransport implements Runnable {
@Override
public void run() {
exitIdleMode();
}
}
syncContext.execute(new ExitIdleModeForTransport());
return delayedTransport;
}
PickResult pickResult = pickerCopy.pickSubchannel(args);
ClientTransport transport = GrpcUtil.getTransportFromPickResult(
pickResult, args.getCallOptions().isWaitForReady());
if (transport != null) {
return transport;
}
return delayedTransport;
}
如果subchannelPicker存在,會使用subchannelPicker進行選擇;如果是首次訪問服務(wù)端時subchannel肯定不存在,會使用syncContext異步執(zhí)行exitIdleMode方法初始化。syncContext是一個單線程執(zhí)行隊列,可以保證先提交的任務(wù)先執(zhí)行。delayedTransport的執(zhí)行也依賴于syncContext,這就保證了delayedTransport中的方法執(zhí)行一定會在exitIdleMode方法之后。
首次訪問服務(wù)端時執(zhí)行exidIdleMode方法
exitIdleMode方法會初始化NameResolver和LoadBalancer,并會啟動NameResolverListener。當(dāng)解析完成后會調(diào)用NameResolverListener的onResult方法,進而調(diào)用LoadBalancer的handleResolvedAddresses方法創(chuàng)建subchannelPicker、創(chuàng)建并連接subchannel。
@VisibleForTesting
void exitIdleMode() {
syncContext.throwIfNotInThisSynchronizationContext();
if (shutdown.get() || panicMode) {
return;
}
if (inUseStateAggregator.isInUse()) {
cancelIdleTimer(false);
} else {
rescheduleIdleTimer();
}
if (lbHelper != null) {
return;
}
channelLogger.log(ChannelLogLevel.INFO, "Exiting idle mode");
LbHelperImpl lbHelper = new LbHelperImpl();
lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
this.lbHelper = lbHelper;
NameResolverListener listener = new NameResolverListener(lbHelper, nameResolver);
nameResolver.start(listener);
nameResolverStarted = true;
}
Request
發(fā)送Request時會調(diào)用ConnectionClientTransport的newStream方法返回一個io.grpc.internal.ClientStream對象,而首次調(diào)用會通過delayedTransport延遲調(diào)用newStream方法。

netty實現(xiàn)會返回一個io.grpc.netty.shaded.io.grpc.netty.NettyClientStream對象。io.grpc.internal.ClientStream下有兩個子類,TransportState負(fù)責(zé)處理傳輸狀態(tài),Sink負(fù)責(zé)寫入數(shù)據(jù)。
在進行一系列http2相關(guān)設(shè)置后,會調(diào)用io.grpc.internal.ClientStream的start方法,為TransportState設(shè)置監(jiān)聽并通過Sink寫入Header。
@Override
public final void start(ClientStreamListener listener) {
transportState().setListener(listener);
if (!useGet) {
abstractClientStreamSink().writeHeaders(headers, null);
headers = null;
}
}
初始化結(jié)束后,調(diào)用requestObserver的onNext方法會調(diào)用io.grpc.internal.ClientCallImpl的sendMessage方法,將protobuf對象轉(zhuǎn)換成InputStream,并作為參數(shù)調(diào)用io.grpc.internal.ClientStream的writeMessage方法,進而調(diào)用io.grpc.internal.MessageFramer的writePayload方法,最終調(diào)用writeToOutputStream方法將內(nèi)容寫入Http的OutputStream。如果是參數(shù)是stream形式會繼續(xù)調(diào)用flush。

調(diào)用requestObserver的onCompleted方法會調(diào)用io.grpc.internal.ClientCallImpl的halfClose方法,進而會調(diào)用io.grpc.internal.MessageFramer的endOfMessages,flush并結(jié)束發(fā)送消息。

Response

客戶端接受到Response會調(diào)用ClientStreamListener的messagesAvailable方法,并通過同步線程池最終調(diào)用StreamObserver的onNext方法接收數(shù)據(jù)。

當(dāng)返回結(jié)束時會調(diào)用TransportState的transportReportStatus方法關(guān)閉請求,進而調(diào)用ClientStreamListener的closed方法關(guān)閉監(jiān)聽,進而調(diào)用StreamObserver的onClose方法。
gRPC通信格式
gRPC發(fā)送的請求發(fā)送方法是POST,路徑是/{methodName},content-type為content-type = application/grpc+proto。
Request
HEADERS (flags = END_HEADERS)
:method = POST
:scheme = http
:path = /RouteGuide/getPoint
grpc-timeout = 1S
content-type = application/grpc+proto
grpc-encoding = gzip
DATA (flags = END_STREAM)
<Length-Prefixed Message>
Response
HEADERS (flags = END_HEADERS)
:status = 200
grpc-encoding = gzip
content-type = application/grpc+proto
DATA
<Length-Prefixed Message>
HEADERS (flags = END_STREAM, END_HEADERS)
grpc-status = 0 # OK
trace-proto-bin = jher831yy13JHy3hc
擴展gRPC
自定義基于zookeeper的NameResolver.Factory實現(xiàn)
public class CuratorNameResolver extends NameResolver {
CuratorFramework curatorFramework;
String basePath;
String serviceAuthority;
private Listener2 listener;
public CuratorNameResolver(CuratorFramework curatorFramework, String basePath, String serviceAuthority) {
this.curatorFramework = curatorFramework;
this.basePath = basePath;
this.serviceAuthority = serviceAuthority;
}
@Override
public void start(Listener2 listener) {
this.curatorFramework.start();
this.listener = listener;
refresh();
}
@Override
public void refresh() {
List<EquivalentAddressGroup> servers = new ArrayList<>();
try {
List<EquivalentAddressGroup> addresses = curatorFramework.getChildren()
.forPath(basePath)
.stream().map(address ->{
try {
URI uri = new URI("http://" + address);
return new EquivalentAddressGroup(
new InetSocketAddress(uri.getHost(), uri.getPort()));
}catch (Exception e){
listener.onError(Status.INTERNAL);
return null;
}
}).collect(Collectors.toList());
listener.onResult(ResolutionResult.newBuilder().setAddresses(addresses).build());
} catch (Exception e) {
listener.onError(Status.INTERNAL);
}
}
@Override
public String getServiceAuthority() {
return this.serviceAuthority;
}
@Override
public void shutdown() {
this.curatorFramework.close();
}
public static class Factory extends NameResolver.Factory{
@Override
public NameResolver newNameResolver(URI targetUri, Args args) {
String address = targetUri.getHost() + ":" + targetUri.getPort();
String authority = null == targetUri.getAuthority() ? address : targetUri.getAuthority();
CuratorFramework curator = CuratorFrameworkFactory.builder()
.connectString(address)
.retryPolicy(new ExponentialBackoffRetry(1000, 5))
.connectionTimeoutMs(1000)
.sessionTimeoutMs(60000)
.build();
return new CuratorNameResolver(curator, targetUri.getPath(), authority);
}
@Override
public String getDefaultScheme() {
return "zookeeper";
}
}
}
自定義隨機負(fù)載均衡實現(xiàn)
public class RandomLoadBalancer extends LoadBalancer{
LoadBalancer.Helper helper;
private final Map<EquivalentAddressGroup, Subchannel> subchannels =
new HashMap<>();
static final Attributes.Key<Ref<ConnectivityStateInfo>> STATE_INFO =
Attributes.Key.create("state-info");
public RandomLoadBalancer(LoadBalancer.Helper helper) {
this.helper = helper;
}
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();
for(EquivalentAddressGroup server : servers){
List<EquivalentAddressGroup> serverSingletonListt = Collections.singletonList(server);
Subchannel exists = subchannels.getOrDefault(server, null);
if(null != exists){
exists.updateAddresses(serverSingletonListt);
continue;
}
Attributes.Builder subchannelAttrs = Attributes.newBuilder()
.set(STATE_INFO,
new Ref<>(ConnectivityStateInfo.forNonError(IDLE)));
final Subchannel subchannel = helper.createSubchannel(CreateSubchannelArgs.newBuilder()
.setAddresses(serverSingletonListt)
.setAttributes(subchannelAttrs.build())
.build());
subchannels.put(server, subchannel);
subchannel.start(new SubchannelStateListener() {
@Override
public void onSubchannelState(ConnectivityStateInfo state) {
for(Map.Entry<EquivalentAddressGroup, Subchannel> entry : subchannels.entrySet()){
if(subchannel == entry.getValue()){
if (state.getState() == SHUTDOWN) {
subchannels.remove(entry.getKey());
}
if (state.getState() == IDLE) {
subchannel.requestConnection();
}
subchannel.getAttributes().get(STATE_INFO).value = state;
updateBalancingState();
return;
}
}
}
});
subchannel.requestConnection();
}
updateBalancingState();
}
@Override
public void handleNameResolutionError(Status error) {
shutdown();
helper.updateBalancingState(TRANSIENT_FAILURE, new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withError(error);
}
});
}
private void updateBalancingState(){
boolean ready = true;
for(Subchannel subchannel : this.subchannels.values()){
if(subchannel.getAttributes().get(STATE_INFO).value.getState() != READY){
helper.updateBalancingState(CONNECTING, new RandomSubchannelPick(subchannels.values()));
return;
}
}
helper.updateBalancingState(ConnectivityState.READY, new RandomSubchannelPick(subchannels.values()));
}
@Override
public void shutdown() {
for(Iterator<Map.Entry<EquivalentAddressGroup, Subchannel>> itr = subchannels.entrySet().iterator(); itr.hasNext();){
Map.Entry<EquivalentAddressGroup, Subchannel> e = itr.next();
e.getValue().shutdown();
itr.remove();
}
}
class RandomSubchannelPick extends SubchannelPicker{
Subchannel[] subchannels;
Random random = new Random(System.currentTimeMillis());
public RandomSubchannelPick(Collection<Subchannel> subchannels) {
this.subchannels = subchannels.stream().toArray(Subchannel[]::new);
}
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
int idx = random.nextInt(subchannels.length);
return PickResult.withSubchannel(subchannels[idx]);
}
}
public static class Provider extends LoadBalancerProvider{
@Override
public boolean isAvailable() {
return true;
}
@Override
public int getPriority() {
return 100;
}
@Override
public String getPolicyName() {
return "random";
}
@Override
public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
return new RandomLoadBalancer(helper);
}
}
static final class Ref<T> {
T value;
Ref(T value) {
this.value = value;
}
}
}
服務(wù)端初始化
服務(wù)端需要把自己的服務(wù)地址注冊到zookeeper。
private final int port;
private final Server server;
private String registryPath;
private String address;
CuratorFramework curator = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 5))
.connectionTimeoutMs(1000)
.sessionTimeoutMs(60000)
.build();;
public GreetingServer(int port, String registryPath) throws IOException {
this.port = port;
server = ServerBuilder.forPort(port).addService(new GreetingService())
.build();
this.registryPath = registryPath;
this.address = "localhost:" + port; //本機網(wǎng)卡不能正確顯示地址,直接寫死localhost
}
/**
* Start server.
*/
public void start() throws Exception {
this.curator.start();
server.start();;
this.curator.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(registryPath + "/" + address, ("http://" + address).getBytes());
System.out.println("Server started, listening on " + address);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
GreetingServer.this.stop();
}
});
}
客戶端初始化
客戶端需要注冊自定義的NameResolverFactory和LoadBalancer。
public GreetingClient(String host, int port, String path) {
String target = "zookeeper://" + host + ":" + port + path;
CuratorNameResolver.Factory factory = new CuratorNameResolver.Factory();
LoadBalancerRegistry.getDefaultRegistry().register(new RandomLoadBalancer.Provider());
ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder
.forTarget(target)
.nameResolverFactory(factory)
.defaultLoadBalancingPolicy("random")
.usePlaintext();
channel = channelBuilder.build();
blockingStub = GreetingGrpc.newBlockingStub(channel);
}