gRpc 客戶端調(diào)用服務(wù)接口

gRPC 客戶端調(diào)用服務(wù)接口

不依賴protobuf生成的代碼文件進(jìn)行調(diào)用,可以通過反射接口進(jìn)行調(diào)用。但需要Server端提供io.grpc.reflection.v1alpha.ServerReflectionGrpc 服務(wù),用于獲取服務(wù)的描述文件。

大致流程:

  1. 根據(jù)方法名稱,調(diào)用服務(wù)端反射服務(wù)的方法,獲取方法所在proto文件
  2. 根據(jù)proto描述文件,獲取文件描述、服務(wù)描述,用于重新構(gòu)建被調(diào)用方法的方法描述MethodDescriptor
  3. 根據(jù)方法描述,將請求內(nèi)容序列化為對應(yīng)的類型
  4. 使用重新構(gòu)建的MethodDescriptor和其他參數(shù)對Server端相應(yīng)的方法發(fā)起調(diào)用
  5. 解析響應(yīng)并返回

實現(xiàn)

1. proto文件定義 hello.proto

syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.haust.hello";
option java_outer_classname = "HelloProto";
package com.haust.hello;


service HelloService {
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

message HelloRequest {
  string message = 1;
}

message HelloReply {
  string message =  1;
}

2. 構(gòu)建反射服務(wù)

package com.haust.grpc.myrpc;

import com.google.protobuf.*;
import com.google.protobuf.util.JsonFormat;
import io.grpc.CallOptions;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.MethodDescriptor;
import io.grpc.protobuf.ProtoUtils;
import io.grpc.reflection.v1alpha.ServerReflectionGrpc;
import io.grpc.reflection.v1alpha.ServerReflectionRequest;
import io.grpc.reflection.v1alpha.ServerReflectionResponse;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.StreamObserver;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class MyGrpc {

    public static void main(String[] args) throws Exception {
        // 請求方法
        String methodSymbol = "com.haust.hello.HelloService.SayHello";
        // 請求內(nèi)容
        String requestContext = "{\"message\":\"this is request\"}";
        // 構(gòu)建channel
        final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 9921).usePlaintext().build();
        // 使用channel 構(gòu)建Stub 創(chuàng)建一個新的異步的stub
        ServerReflectionGrpc.ServerReflectionStub reflectionStub = ServerReflectionGrpc.newStub(channel);
        // 響應(yīng)觀察器
        // 需要Server端 加入ProtoReflectionService.newInstance()
        StreamObserver<ServerReflectionResponse> streamObserver = new StreamObserver<ServerReflectionResponse>() {
            @Override
            public void onNext(ServerReflectionResponse serverReflectionResponse) {
                // 處理響應(yīng)
                try {
                    if (serverReflectionResponse.getMessageResponseCase() == ServerReflectionResponse.MessageResponseCase.FILE_DESCRIPTOR_RESPONSE) {
                        List<ByteString> fileDescriptorProtoList = serverReflectionResponse.getFileDescriptorResponse().getFileDescriptorProtoList();
                        // requestContent 請求內(nèi)容
                        handleResponse(fileDescriptorProtoList, channel, methodSymbol, requestContext);
                    } else {
                        System.out.println("響應(yīng)處理失敗" + serverReflectionResponse.getMessageResponseCase());
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            @Override
            public void onError(Throwable throwable) {
                throwable.printStackTrace();
            }
            @Override
            public void onCompleted() {
                System.out.println("completed!");
            }
        };
        // 請求觀察器
        StreamObserver<ServerReflectionRequest> requestStreamObserver = reflectionStub.serverReflectionInfo(streamObserver);

        // 根據(jù)方法名稱獲取文件描述請求
        ServerReflectionRequest getFileContainingSymbolRequest = ServerReflectionRequest.newBuilder().setFileContainingSymbol(methodSymbol).build();

        requestStreamObserver.onNext(getFileContainingSymbolRequest);

        channel.awaitTermination(10, TimeUnit.SECONDS);
    }

    // 在處理請求時,先解析了包名、服務(wù)名和方法名,然后根據(jù)包名和服務(wù)名,從返回的文件描述中獲取到了響應(yīng)方法所在文件的描述;然后從文件描述中獲取服務(wù)描述,最終獲取到方法描述,根據(jù)方法描述執(zhí)行調(diào)用
    private static void handleResponse(List<ByteString> fileDescriptorProtoList, ManagedChannel channel, String methodFullName, String requestContent) {
        try {
            // 解析方法和服務(wù)名稱
            String fullName = extraPrefix(methodFullName);
            String methodName = extraSuffix(methodFullName);
            String packageName = extraPrefix(fullName);
            String serviceName = extraSuffix(fullName);

            // 根據(jù)響應(yīng)解析 FileDescriptor
            Descriptors.FileDescriptor fileDescriptor = getFileDescriptor(fileDescriptorProtoList, packageName, serviceName);

            // 查找服務(wù)描述
            Descriptors.ServiceDescriptor serviceDescriptor = fileDescriptor.getFile().findServiceByName(serviceName);
            // 查找方法描述
            Descriptors.MethodDescriptor methodDescriptor = serviceDescriptor.findMethodByName(methodName);

            // 發(fā)起請求
            executeCall(channel, fileDescriptor, methodDescriptor, requestContent);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 根據(jù)響應(yīng)找到方法對應(yīng)的文件的FileDescriptorProto ,然后構(gòu)建出對應(yīng)的 FileDescriptor
    private static Descriptors.FileDescriptor getFileDescriptor(List<ByteString> fileDescriptorProtoList,
                                                                String packageName,
                                                                String serviceName) throws Exception {

        Map<String, DescriptorProtos.FileDescriptorProto> fileDescriptorProtoMap =
                fileDescriptorProtoList.stream()
                        .map(bs -> {
                            try {
                                return DescriptorProtos.FileDescriptorProto.parseFrom(bs);
                            } catch (InvalidProtocolBufferException e) {
                                e.printStackTrace();
                            }
                            return null;
                        })
                        .filter(Objects::nonNull)
                        .collect(Collectors.toMap(DescriptorProtos.FileDescriptorProto::getName, f -> f));


        if (fileDescriptorProtoMap.isEmpty()) {
            System.out.println("服務(wù)不存在");
            throw new IllegalArgumentException("方法的文件描述不存在");
        }

        // 查找服務(wù)對應(yīng)的 Proto 描述
        DescriptorProtos.FileDescriptorProto fileDescriptorProto = findServiceFileDescriptorProto(packageName, serviceName, fileDescriptorProtoMap);

        // 獲取這個 Proto 的依賴
        Descriptors.FileDescriptor[] dependencies = getDependencies(fileDescriptorProto, fileDescriptorProtoMap);

        // 生成 Proto 的 FileDescriptor
        return Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, dependencies);
    }

    // 根據(jù)包名查找響應(yīng)的文件描述
    private static DescriptorProtos.FileDescriptorProto findServiceFileDescriptorProto(String packageName, String serviceName, Map<String, DescriptorProtos.FileDescriptorProto> fileDescriptorProtoMap) {
        for (DescriptorProtos.FileDescriptorProto proto : fileDescriptorProtoMap.values()) {
            if (proto.getPackage().equals(packageName)) {
                boolean exist = proto.getServiceList().stream().anyMatch(s -> serviceName.equals(s.getName()));
                if (exist) {
                    return proto;
                }
            }
        }
        throw new IllegalArgumentException("服務(wù)不存在");
    }

    // 獲取依賴類型
    private static Descriptors.FileDescriptor[] getDependencies(DescriptorProtos.FileDescriptorProto proto, Map<String, DescriptorProtos.FileDescriptorProto> fileDescriptorProtoMap) {
        return proto.getDependencyList().stream().map(fileDescriptorProtoMap::get)
                .map(f -> toFileDescriptor(f, getDependencies(f, fileDescriptorProtoMap))).toArray(Descriptors.FileDescriptor[]::new);
    }

    // 將FileDescriptorProto轉(zhuǎn)為Descriptor
    private static Descriptors.FileDescriptor toFileDescriptor(DescriptorProtos.FileDescriptorProto fileDescriptorProto, Descriptors.FileDescriptor[] dependencies) {
        try {
            return Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, dependencies);
        } catch (Descriptors.DescriptorValidationException e) {
            e.printStackTrace();
        }
        return null;
    }

    // 獲取前綴
    private static String extraPrefix(String context) {
        int index = context.lastIndexOf(".");
        return context.substring(0, index);
    }

    // 獲取后綴
    private static String extraSuffix(String context) {
        int index = context.lastIndexOf(".");
        return context.substring(index + 1);
    }

    // 執(zhí)行方法調(diào)用
    private static void executeCall(ManagedChannel channel, Descriptors.FileDescriptor fileDescriptor, Descriptors.MethodDescriptor originMethodDescriptor, String requestContext) throws Exception {
        // 重新生成MethodDescriptor
        MethodDescriptor<DynamicMessage, DynamicMessage> methodDescriptor = generateMethodDescriptor(originMethodDescriptor);

        CallOptions callOptions = CallOptions.DEFAULT;

        TypeRegistry typeRegistry = TypeRegistry.newBuilder().add(fileDescriptor.getMessageTypes()).build();
        // 將請求內(nèi)容諸位響應(yīng)的類型
        JsonFormat.Parser parser = JsonFormat.parser().usingTypeRegistry(typeRegistry);
        DynamicMessage.Builder messageBuilder = DynamicMessage.newBuilder(originMethodDescriptor.getInputType());
        parser.merge(requestContext, messageBuilder);
        DynamicMessage requestMessage = messageBuilder.build();

        // 調(diào)用, 調(diào)用方可以通過originMethodDescriptor.isClientStreaming()和originMethodDescriptor.isServerStreaming() 推斷
        DynamicMessage response = ClientCalls.blockingUnaryCall(channel, methodDescriptor, callOptions, requestMessage);

        // 將響應(yīng)解析為JSON字符串
        JsonFormat.Printer printer = JsonFormat.printer().usingTypeRegistry(typeRegistry).includingDefaultValueFields();
        String responseContent = printer.print(response);

        System.out.println("響應(yīng) responseContent: " + responseContent);
    }

    //格式,而需要的是package.service/method格式,同時請求和響應(yīng)類型也需要重新設(shè)置為 DynamicMessage,所以需要重新生成 MethodDescriptor
    private static MethodDescriptor<DynamicMessage, DynamicMessage> generateMethodDescriptor(Descriptors.MethodDescriptor originMethodDescriptor) {
        // 生成方法全名
        String fullMethodName = MethodDescriptor.generateFullMethodName(originMethodDescriptor.getService().getFullName(), originMethodDescriptor.getName());
        // 請求和響應(yīng)類型
        MethodDescriptor.Marshaller<DynamicMessage> inputTypeMarshaller = ProtoUtils.marshaller(DynamicMessage.newBuilder(originMethodDescriptor.getInputType())
                .buildPartial());
        MethodDescriptor.Marshaller<DynamicMessage> outputTypeMarshaller = ProtoUtils.marshaller(DynamicMessage.newBuilder(originMethodDescriptor.getOutputType())
                .buildPartial());

        // 生成方法描述, originMethodDescriptor 的 fullMethodName 不正確
        return MethodDescriptor.<DynamicMessage, DynamicMessage>newBuilder()
                .setFullMethodName(fullMethodName)
                .setRequestMarshaller(inputTypeMarshaller)
                .setResponseMarshaller(outputTypeMarshaller)
                // 使用 UNKNOWN,自動修改
                .setType(MethodDescriptor.MethodType.UNKNOWN)
                .build();
    }
}

Server端 服務(wù)實現(xiàn)

package com.haust.grpc.server;

import io.grpc.Server;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.protobuf.services.ProtoReflectionService;

import java.util.concurrent.TimeUnit;

public class ReflectionServer {

    public static void main(String[] args) {
        Server server = NettyServerBuilder.forPort(9921).addService(new HelloGrpcImpl())
                .addService(ProtoReflectionService.newInstance()).build();
        try {
            server.start();
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                try {
                    server.awaitTermination(10, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }));
            server.awaitTermination();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

當(dāng)然一般情況下,在公司的grpc服務(wù)中,是不可以開啟反射的,此情況僅適用于開發(fā)環(huán)境

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容