GRPC java 分布式調(diào)用鏈跟蹤實踐

Opentracing基本模型

image.png

如圖,在跟蹤鏈中有以下幾個比較重要的數(shù)據(jù)結(jié)構(gòu)和概念:

span:標識一次分布式調(diào)用,其自身包含了id,parentId(指向上級Span的id), traceIds,服務(wù)名稱等重要屬性,其應(yīng)盡量保持精簡;
trace:標識整個請求鏈,即一些列Span的組合。其自身的ID將貫穿整個調(diào)用鏈,其中的每個Span都必須攜帶這個traceId,因此traceId將在整個調(diào)用鏈中傳遞;
cs:客戶端發(fā)起請求,標志Span的開始;
sr:服務(wù)端接收到請求,并開始處理內(nèi)部事務(wù),其中sr - cs則為網(wǎng)絡(luò)延遲和時鐘抖動;
ss:服務(wù)端處理完請求,返回響應(yīng)內(nèi)容,其中ss - sr則為服務(wù)端處理請求耗時;
cr:客戶端接收到服務(wù)端響應(yīng)內(nèi)容,標志著Span的結(jié)束,其中cr - ss則為網(wǎng)絡(luò)延遲和時鐘抖動。

客戶端調(diào)用時間=cr-cs
服務(wù)端處理時間=sr-ss

分布式系統(tǒng)調(diào)用跟蹤的基本架構(gòu)要求

低侵入性,高性能,高可用容錯,低丟失率等。

基于GRPC的分布式系統(tǒng)調(diào)用跟蹤實踐

創(chuàng)建TraceContext

TraceContext通過Threadlocal對span進行保存,并且將traceid和spanid向底層服務(wù)傳遞,zebra對線程上下文傳遞進行了封裝,具體參照GRPC如何實現(xiàn)公共參數(shù)與業(yè)務(wù)參數(shù)分離傳遞下面是TraceContext具體代碼

public class TraceContext{

   private static final String SPAN_LIST_KEY = "spanList";

   public static final String TRACE_ID_KEY = "traceId";

   public static final String SPAN_ID_KEY = "spanId";

   public static final String ANNO_CS = "cs";

   public static final String ANNO_CR = "cr";

   public static final String ANNO_SR = "sr";

   public static final String ANNO_SS = "ss";

   private TraceContext(){}

   public static void setTraceId(String traceId) {
       RpcContext.getContext().set(TRACE_ID_KEY, traceId);
   }

   public static String getTraceId() {
       return (String) RpcContext.getContext().get(TRACE_ID_KEY);
   }

   public static String getSpanId() {
       return (String) RpcContext.getContext().get(SPAN_ID_KEY);
   }

   public static void setSpanId(String spanId) {
       RpcContext.getContext().set(SPAN_ID_KEY, spanId);
   }

   @SuppressWarnings("unchecked")
   public static void addSpan(Span span){
       ((List<Span>)RpcContext.getContext().get(SPAN_LIST_KEY)).add(span);
   }

   @SuppressWarnings("unchecked")
   public static List<Span> getSpans(){
       return (List<Span>) RpcContext.getContext().get(SPAN_LIST_KEY);
   }

   public static void clear(){
       RpcContext.getContext().remove(TRACE_ID_KEY);
       RpcContext.getContext().remove(SPAN_ID_KEY);
       RpcContext.getContext().remove(SPAN_LIST_KEY);
   }

   public static void start(){
       clear();
       RpcContext.getContext().set(SPAN_LIST_KEY, new ArrayList<Span>());
   }
}

創(chuàng)建TraceAgent

TraceAgent將span信息上傳至kafka,代碼如下:

public class TraceAgent {
   private GrpcProperties grpcProperties;
   private KafkaSender sender;
   private AsyncReporter<zipkin2.Span> report;

   public TraceAgent() {
       grpcProperties = SpringContextUtils.getBean(GrpcProperties.class);
       sender = KafkaSender.newBuilder().bootstrapServers(grpcProperties.getCallChainUpdAddr()).topic("zipkin").encoding(Encoding.JSON).build();
       report = AsyncReporter.builder(sender).build();
   }

   public void send(final List<Span> spans){
       spans.forEach(item ->{
           report.report(item);
       });
   }
}

創(chuàng)建ZebraClientTracing

ZebraClientTracing用于記錄調(diào)用端的span信息,具體代碼如下:

@Component
public class ZebraClientTracing {
   public Span startTrace(String method) {
       String id = IdUtils.get() + "";
       String traceId = null;
       if (null == TraceContext.getTraceId()) {
           TraceContext.start();
           traceId = id;
       } else {
           traceId = TraceContext.getTraceId();
       }
       long timestamp = System.currentTimeMillis() * 1000;
       // 注冊本地信息
       Endpoint endpoint = Endpoint.newBuilder().ip(NetUtils.getLocalHost()).serviceName(EtcdRegistry.serviceName)
               .port(50003).build();
       // 初始化span
       Span consumerSpan = Span.newBuilder().localEndpoint(endpoint).id(id).traceId(traceId)
               .parentId(TraceContext.getSpanId() + "").name(EtcdRegistry.serviceName).timestamp(timestamp)
               .addAnnotation(timestamp, TraceContext.ANNO_CS).putTag("method", method)
               .putTag("pkgId", RpcContext.getContext().getAttachment("pkg")).build();
       // 將tracing id和spanid放到上下文
       RpcContext.getContext().get().put(TraceContext.TRACE_ID_KEY, consumerSpan.traceId());
       RpcContext.getContext().get().put(TraceContext.SPAN_ID_KEY, String.valueOf(consumerSpan.id()));
       return consumerSpan;
   }

   public void endTrace(Span span, Stopwatch watch,int code) {
       span = span.toBuilder().addAnnotation(System.currentTimeMillis() * 1000, TraceContext.ANNO_CR)
               .duration(watch.stop().elapsed(TimeUnit.MICROSECONDS)).putTag("code", code+"").build();
       TraceAgent traceAgent = new TraceAgent();
       traceAgent.send(TraceContext.getSpans());
   }
}

創(chuàng)建ZebraServerTracing

ZebraServerTracing用于記錄服務(wù)端的span信息,具體代碼如下:

@Component
public class ZebraServerTracing {
   public Span startTrace(String method) {
       String traceId = (String) RpcContext.getContext().get(TraceContext.TRACE_ID_KEY);
       String parentSpanId = (String) RpcContext.getContext().get(TraceContext.SPAN_ID_KEY);

       String id = IdUtils.get() + "";
       TraceContext.start();
       TraceContext.setTraceId(traceId);
       TraceContext.setSpanId(parentSpanId);

       long timestamp = System.currentTimeMillis() * 1000;
       Endpoint endpoint = Endpoint.newBuilder().ip(NetUtils.getLocalHost()).serviceName(EtcdRegistry.serviceName)
               .port(50003).build();
       Span providerSpan = Span.newBuilder().id(id).parentId(parentSpanId).traceId(traceId)
               .name(EtcdRegistry.serviceName).timestamp(timestamp).localEndpoint(endpoint)
               .addAnnotation(timestamp, TraceContext.ANNO_SR).putTag("method", method)
               .putTag("pkgId", RpcContext.getContext().getAttachment("pkg"))
               .build();
       TraceContext.addSpan(providerSpan);
       return providerSpan;
   }

   public void endTrace(Span span, Stopwatch watch,int code) {
       span = span.toBuilder().addAnnotation(System.currentTimeMillis() * 1000, TraceContext.ANNO_SS)
               .duration(watch.stop().elapsed(TimeUnit.MICROSECONDS)).putTag("code", code+"").build();
       TraceAgent traceAgent = new TraceAgent();
       traceAgent.send(TraceContext.getSpans());
   }
}

創(chuàng)建grpc client攔截器

public class HeaderClientInterceptor implements ClientInterceptor {

    private static final Logger log = LogManager.getLogger(HeaderClientInterceptor.class);
    private final ZebraClientTracing clientTracing;
    
    public static ClientInterceptor instance() {
        return new HeaderClientInterceptor();
    }

    private HeaderClientInterceptor() {
        clientTracing = SpringContextUtils.getBean(ZebraClientTracing.class);
    }

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
            CallOptions callOptions, Channel next) {
        return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
            //判斷API網(wǎng)關(guān)是否要打開調(diào)用鏈
            boolean isGatewayTracing = "1".equals(RpcContext.getContext().getAttachment(ZebraConstants.ZEBRA_OPEN_TRACING))?true:false;
            boolean isSubTracing = RpcContext.getContext().get(TraceContext.TRACE_ID_KEY)!=null?true:false;
            Stopwatch watch =null;
            Span span =null;
            
            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
                if(isSubTracing||isGatewayTracing){
                    span =clientTracing.startTrace(method.getFullMethodName());
                    watch = Stopwatch.createStarted();
                }
                copyThreadLocalToMetadata(headers);
                super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
                    @Override
                    public void onHeaders(Metadata headers) {
                        super.onHeaders(headers);
                    }

                    @Override
                    public void onClose(Status status, Metadata trailers) {
                        super.onClose(status, trailers);
                        if(isSubTracing||isGatewayTracing)
                            clientTracing.endTrace(span, watch,status.getCode().value());
                    }
                }, headers);
            }
        };
    }

    private void copyThreadLocalToMetadata(Metadata headers) {
        Map<String, String> attachments = RpcContext.getContext().getAttachments();
        Map<String, Object> values = RpcContext.getContext().get();
        try {
            if (!attachments.isEmpty()) {
                headers.put(GrpcUtil.GRPC_CONTEXT_ATTACHMENTS, SerializerUtil.toJson(attachments));
            }
            if (!values.isEmpty()) {
                headers.put(GrpcUtil.GRPC_CONTEXT_VALUES, SerializerUtil.toJson(values));
            }
        } catch (Throwable e) {
            log.error(e.getMessage(), e);
        }
    }
}

創(chuàng)建grpc server攔截器

public class HeaderServerInterceptor implements ServerInterceptor {

    private static final Logger log = LogManager.getLogger(HeaderServerInterceptor.class);

    private final ZebraServerTracing serverTracing;

    public static ServerInterceptor instance() {
        return new HeaderServerInterceptor();
    }

    private HeaderServerInterceptor() {
        serverTracing = SpringContextUtils.getBean(ZebraServerTracing.class);
    }

    @Override
    public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, final Metadata headers,
            ServerCallHandler<ReqT, RespT> next) {
        return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
            boolean isSubTracing = RpcContext.getContext().get(TraceContext.TRACE_ID_KEY) != null ? true : false;
            Stopwatch watch = null;
            Span span = null;

            @Override
            public void request(int numMessages) {
                if (isSubTracing) {
                    span = serverTracing.startTrace(call.getMethodDescriptor().getFullMethodName());
                    watch = Stopwatch.createStarted();
                }
                InetSocketAddress remoteAddress = (InetSocketAddress) call.getAttributes()
                        .get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
                RpcContext.getContext().setAttachment(ZebraConstants.REMOTE_ADDRESS, remoteAddress.getHostString());
                copyMetadataToThreadLocal(headers);
                log.debug("FullMethodName:{},RemoteAddress={},attachments={},context={}",
                        call.getMethodDescriptor().getFullMethodName(), remoteAddress.getHostString(),
                        headers.get(GrpcUtil.GRPC_CONTEXT_ATTACHMENTS), headers.get(GrpcUtil.GRPC_CONTEXT_VALUES));
                super.request(numMessages);
            }

            @Override
            public void close(Status status, Metadata trailers) {
                delegate().close(status, trailers);
                if(isSubTracing)
                    serverTracing.endTrace(span, watch,status.getCode().value());
            }

        }, headers);
    }

    private void copyMetadataToThreadLocal(Metadata headers) {
        String attachments = headers.get(GrpcUtil.GRPC_CONTEXT_ATTACHMENTS);
        String values = headers.get(GrpcUtil.GRPC_CONTEXT_VALUES);
        try {
            if (attachments != null) {
                Map<String, String> attachmentsMap = SerializerUtil.fromJson(attachments,
                        new TypeToken<Map<String, String>>() {
                        }.getType());
                RpcContext.getContext().setAttachments(attachmentsMap);
            }
            if (values != null) {
                Map<String, Object> valuesMap = SerializerUtil.fromJson(values, new TypeToken<Map<String, Object>>() {
                }.getType());
                for (Map.Entry<String, Object> entry : valuesMap.entrySet()) {
                    RpcContext.getContext().set(entry.getKey(), entry.getValue());
                }
            }
        } catch (Throwable e) {
            log.error(e.getMessage(), e);
        }
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容