gRPC 客戶端調用服務接口
不依賴protobuf生成的代碼文件進行調用,可以通過反射接口進行調用。但需要Server端提供 io.grpc.reflection.v1alpha.ServerReflectionGrpc 服務,用于獲取服務的描述文件。
大致流程:
- 根據方法名稱,調用服務端反射服務的方法,獲取方法所在proto文件
- 根據proto描述文件,獲取文件描述、服務描述,用于重新構建被調用方法的方法描述 MethodDescriptor
- 根據方法描述,將請求內容序列化為對應的類型
- 使用重新構建的 MethodDescriptor 和其他參數對Server端相應的方法發起調用
- 解析響應并返回
實現
1. proto文件定義 hello.proto
// Schema for the demo HelloService that the reflection-based client calls.
syntax = "proto3";
// Java code-generation options for this file.
option java_multiple_files = true;
option java_package = "com.haust.hello";
option java_outer_classname = "HelloProto";
// Proto package; together with the service and method name it forms the
// symbol "com.haust.hello.HelloService.SayHello" used by the client.
package com.haust.hello;
// Greeting service with a single unary RPC.
service HelloService {
// Takes a HelloRequest and returns a HelloReply.
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// Request payload for SayHello.
message HelloRequest {
// Text sent by the caller.
string message = 1;
}
// Response payload for SayHello.
message HelloReply {
// Text returned by the server.
string message = 1;
}
2. 構建反射服務
package com.haust.grpc.myrpc;
import com.google.protobuf.*;
import com.google.protobuf.util.JsonFormat;
import io.grpc.CallOptions;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.MethodDescriptor;
import io.grpc.protobuf.ProtoUtils;
import io.grpc.reflection.v1alpha.ServerReflectionGrpc;
import io.grpc.reflection.v1alpha.ServerReflectionRequest;
import io.grpc.reflection.v1alpha.ServerReflectionResponse;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class MyGrpc {
public static void main(String[] args) throws Exception {
// 請求方法
String methodSymbol = "com.haust.hello.HelloService.SayHello";
// 請求內(nèi)容
String requestContext = "{\"message\":\"this is request\"}";
// 構(gòu)建channel
final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 9921).usePlaintext().build();
// 使用channel 構(gòu)建Stub 創(chuàng)建一個新的異步的stub
ServerReflectionGrpc.ServerReflectionStub reflectionStub = ServerReflectionGrpc.newStub(channel);
// 響應(yīng)觀察器
// 需要Server端 加入ProtoReflectionService.newInstance()
StreamObserver<ServerReflectionResponse> streamObserver = new StreamObserver<ServerReflectionResponse>() {
@Override
public void onNext(ServerReflectionResponse serverReflectionResponse) {
// 處理響應(yīng)
try {
if (serverReflectionResponse.getMessageResponseCase() == ServerReflectionResponse.MessageResponseCase.FILE_DESCRIPTOR_RESPONSE) {
List<ByteString> fileDescriptorProtoList = serverReflectionResponse.getFileDescriptorResponse().getFileDescriptorProtoList();
// requestContent 請求內(nèi)容
handleResponse(fileDescriptorProtoList, channel, methodSymbol, requestContext);
} else {
System.out.println("響應(yīng)處理失敗" + serverReflectionResponse.getMessageResponseCase());
}
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("completed!");
}
};
// 請求觀察器
StreamObserver<ServerReflectionRequest> requestStreamObserver = reflectionStub.serverReflectionInfo(streamObserver);
// 根據(jù)方法名稱獲取文件描述請求
ServerReflectionRequest getFileContainingSymbolRequest = ServerReflectionRequest.newBuilder().setFileContainingSymbol(methodSymbol).build();
requestStreamObserver.onNext(getFileContainingSymbolRequest);
channel.awaitTermination(10, TimeUnit.SECONDS);
}
// 在處理請求時,先解析了包名、服務(wù)名和方法名,然后根據(jù)包名和服務(wù)名,從返回的文件描述中獲取到了響應(yīng)方法所在文件的描述;然后從文件描述中獲取服務(wù)描述,最終獲取到方法描述,根據(jù)方法描述執(zhí)行調(diào)用
private static void handleResponse(List<ByteString> fileDescriptorProtoList, ManagedChannel channel, String methodFullName, String requestContent) {
try {
// 解析方法和服務(wù)名稱
String fullName = extraPrefix(methodFullName);
String methodName = extraSuffix(methodFullName);
String packageName = extraPrefix(fullName);
String serviceName = extraSuffix(fullName);
// 根據(jù)響應(yīng)解析 FileDescriptor
Descriptors.FileDescriptor fileDescriptor = getFileDescriptor(fileDescriptorProtoList, packageName, serviceName);
// 查找服務(wù)描述
Descriptors.ServiceDescriptor serviceDescriptor = fileDescriptor.getFile().findServiceByName(serviceName);
// 查找方法描述
Descriptors.MethodDescriptor methodDescriptor = serviceDescriptor.findMethodByName(methodName);
// 發(fā)起請求
executeCall(channel, fileDescriptor, methodDescriptor, requestContent);
} catch (Exception e) {
e.printStackTrace();
}
}
// 根據(jù)響應(yīng)找到方法對應(yīng)的文件的FileDescriptorProto ,然后構(gòu)建出對應(yīng)的 FileDescriptor
private static Descriptors.FileDescriptor getFileDescriptor(List<ByteString> fileDescriptorProtoList,
String packageName,
String serviceName) throws Exception {
Map<String, DescriptorProtos.FileDescriptorProto> fileDescriptorProtoMap =
fileDescriptorProtoList.stream()
.map(bs -> {
try {
return DescriptorProtos.FileDescriptorProto.parseFrom(bs);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
return null;
})
.filter(Objects::nonNull)
.collect(Collectors.toMap(DescriptorProtos.FileDescriptorProto::getName, f -> f));
if (fileDescriptorProtoMap.isEmpty()) {
System.out.println("服務(wù)不存在");
throw new IllegalArgumentException("方法的文件描述不存在");
}
// 查找服務(wù)對應(yīng)的 Proto 描述
DescriptorProtos.FileDescriptorProto fileDescriptorProto = findServiceFileDescriptorProto(packageName, serviceName, fileDescriptorProtoMap);
// 獲取這個 Proto 的依賴
Descriptors.FileDescriptor[] dependencies = getDependencies(fileDescriptorProto, fileDescriptorProtoMap);
// 生成 Proto 的 FileDescriptor
return Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, dependencies);
}
// 根據(jù)包名查找響應(yīng)的文件描述
private static DescriptorProtos.FileDescriptorProto findServiceFileDescriptorProto(String packageName, String serviceName, Map<String, DescriptorProtos.FileDescriptorProto> fileDescriptorProtoMap) {
for (DescriptorProtos.FileDescriptorProto proto : fileDescriptorProtoMap.values()) {
if (proto.getPackage().equals(packageName)) {
boolean exist = proto.getServiceList().stream().anyMatch(s -> serviceName.equals(s.getName()));
if (exist) {
return proto;
}
}
}
throw new IllegalArgumentException("服務(wù)不存在");
}
// 獲取依賴類型
private static Descriptors.FileDescriptor[] getDependencies(DescriptorProtos.FileDescriptorProto proto, Map<String, DescriptorProtos.FileDescriptorProto> fileDescriptorProtoMap) {
return proto.getDependencyList().stream().map(fileDescriptorProtoMap::get)
.map(f -> toFileDescriptor(f, getDependencies(f, fileDescriptorProtoMap))).toArray(Descriptors.FileDescriptor[]::new);
}
// 將FileDescriptorProto轉(zhuǎn)為Descriptor
private static Descriptors.FileDescriptor toFileDescriptor(DescriptorProtos.FileDescriptorProto fileDescriptorProto, Descriptors.FileDescriptor[] dependencies) {
try {
return Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, dependencies);
} catch (Descriptors.DescriptorValidationException e) {
e.printStackTrace();
}
return null;
}
// 獲取前綴
private static String extraPrefix(String context) {
int index = context.lastIndexOf(".");
return context.substring(0, index);
}
// 獲取后綴
private static String extraSuffix(String context) {
int index = context.lastIndexOf(".");
return context.substring(index + 1);
}
// 執(zhí)行方法調(diào)用
private static void executeCall(ManagedChannel channel, Descriptors.FileDescriptor fileDescriptor, Descriptors.MethodDescriptor originMethodDescriptor, String requestContext) throws Exception {
// 重新生成MethodDescriptor
MethodDescriptor<DynamicMessage, DynamicMessage> methodDescriptor = generateMethodDescriptor(originMethodDescriptor);
CallOptions callOptions = CallOptions.DEFAULT;
TypeRegistry typeRegistry = TypeRegistry.newBuilder().add(fileDescriptor.getMessageTypes()).build();
// 將請求內(nèi)容諸位響應(yīng)的類型
JsonFormat.Parser parser = JsonFormat.parser().usingTypeRegistry(typeRegistry);
DynamicMessage.Builder messageBuilder = DynamicMessage.newBuilder(originMethodDescriptor.getInputType());
parser.merge(requestContext, messageBuilder);
DynamicMessage requestMessage = messageBuilder.build();
// 調(diào)用, 調(diào)用方可以通過originMethodDescriptor.isClientStreaming()和originMethodDescriptor.isServerStreaming() 推斷
DynamicMessage response = ClientCalls.blockingUnaryCall(channel, methodDescriptor, callOptions, requestMessage);
// 將響應(yīng)解析為JSON字符串
JsonFormat.Printer printer = JsonFormat.printer().usingTypeRegistry(typeRegistry).includingDefaultValueFields();
String responseContent = printer.print(response);
System.out.println("響應(yīng) responseContent: " + responseContent);
}
//格式,而需要的是package.service/method格式,同時請求和響應(yīng)類型也需要重新設(shè)置為 DynamicMessage,所以需要重新生成 MethodDescriptor
private static MethodDescriptor<DynamicMessage, DynamicMessage> generateMethodDescriptor(Descriptors.MethodDescriptor originMethodDescriptor) {
// 生成方法全名
String fullMethodName = MethodDescriptor.generateFullMethodName(originMethodDescriptor.getService().getFullName(), originMethodDescriptor.getName());
// 請求和響應(yīng)類型
MethodDescriptor.Marshaller<DynamicMessage> inputTypeMarshaller = ProtoUtils.marshaller(DynamicMessage.newBuilder(originMethodDescriptor.getInputType())
.buildPartial());
MethodDescriptor.Marshaller<DynamicMessage> outputTypeMarshaller = ProtoUtils.marshaller(DynamicMessage.newBuilder(originMethodDescriptor.getOutputType())
.buildPartial());
// 生成方法描述, originMethodDescriptor 的 fullMethodName 不正確
return MethodDescriptor.<DynamicMessage, DynamicMessage>newBuilder()
.setFullMethodName(fullMethodName)
.setRequestMarshaller(inputTypeMarshaller)
.setResponseMarshaller(outputTypeMarshaller)
// 使用 UNKNOWN,自動修改
.setType(MethodDescriptor.MethodType.UNKNOWN)
.build();
}
}
Server端 服務實現
package com.haust.grpc.server;
import io.grpc.Server;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.protobuf.services.ProtoReflectionService;
import java.util.concurrent.TimeUnit;
/**
 * Demo gRPC server that exposes {@code HelloGrpcImpl} together with the reflection service,
 * so that clients can discover and call its methods without generated stubs.
 */
public class ReflectionServer {

    /**
     * Starts the server on port 9921 and blocks until it terminates.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Server server = NettyServerBuilder.forPort(9921)
                .addService(new HelloGrpcImpl())
                // Required by reflection-based clients to fetch the service descriptors.
                .addService(ProtoReflectionService.newInstance())
                .build();
        try {
            server.start();
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                try {
                    // awaitTermination alone never stops the server; shutdown must be requested first.
                    server.shutdown();
                    if (!server.awaitTermination(10, TimeUnit.SECONDS)) {
                        // Graceful shutdown did not finish in time: cancel the remaining calls.
                        server.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    server.shutdownNow();
                    // Restore the interrupt flag for whoever observes this thread.
                    Thread.currentThread().interrupt();
                }
            }));
            // Keep the main thread alive while the server is running.
            server.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}