SkyWalking之Tracing

上文我們說到作為一個全鏈路監(jiān)控系統(tǒng),Tracing是必不可少的部分,今天我們從客戶端和服務端的源代碼角度分析一下SkyWalking如何實現(xiàn)這塊的。

Agent 代理原理

在介紹Traceing真正的實現(xiàn)之前,我們有必要知道SkyWalking是如何幫助我們完成agent代理的,從上面的文章我們知道,agent的入口是SkyWalkingAgent,我們之前提到了PluginFinder這個加載類,其實它是用來加載agent代理類的總管。如果需要給不同的框架或者代碼增加埋點入口,我們需要對框架的代碼或者原理非常了解才行,這樣才能最大限度抓取到我們需要的trace信息,舉例來說,如果我們需要抓取慢SQL,你使用的是mysql驅(qū)動,那么需要在創(chuàng)建preparestatement或者statement的時候最好,SkyWalking為我們準備CreatePreparedStatementInterceptor和CreateStatementInterceptor兩個解析類,用來攔截ConnectionImpl 的prepareStatement和createStatement方法。

  • 這兩個解析類的基類是InstanceMethodsAroundInterceptor,這個類是skywalking用來攔截非靜態(tài)類的入口基類,這些

  • 對于靜態(tài)類的攔截積基類是StaticMethodsInterceptPoint,比如ConnectionCreateInterceptor,這個類是為了攔截com.mysql.jdbc.ConnectionImpl.getInstance() 這個靜態(tài)方法而準備的。

那么這些解析器什么時候使用呢?那么就涉及到這些攔截類的加載,SkyWalking使用攔截類的加載基類AbstractClassEnhancePluginDefine來加載這些攔截類,這個類對攔截類的加載又分為三類

  • 構造器方法 AbstractClassEnhancePluginDefine
  • 實例方法 InstanceMethodsInterceptPoint
  • 靜態(tài)方法 StaticMethodsInterceptPoint

這個類還有其它方法,define,enhanceInstance,enhanceClass,因為我們最終目的是對實例或者靜態(tài)方法進行增強,所以這三個方法分別就是做這兩件事情的,最要入口方法define,是返回的是net.bytebuddy.dynamic.DynamicType.Builder,從名字我們看出這個是bytebuddy的,如果不了解bytebuddy是如何對類進行增加的可以參考我之前寫的文章。

我們再次回到SkyWalkingAgent的PluginFinder插件查找類,SkyWalking基于插件擴展開發(fā)框架方便我們定義攔截類,這樣設計非常靈活,我們來看一下PluginFinder的初始化過程:

pluginFinder = new PluginFinder(new PluginBootstrap().loadPlugins());

入口參數(shù)是通過new PluginBootstrap().loadPlugins()來加載所有插件,PluginBootstrap使用PluginResourcesResolver加載所有插件目錄,PluginResourcesResolver最終調(diào)用的是AgentClassLoader從所有插件里面的resource目錄找到skywalking-plugin.def的攔截定義類的定義文件,那么AgentClassLoader是從哪些目錄加載這些插件的呢?我們知道,AgentClassLoader是SkyWalking自定的ClassLoader加載器,重寫的findResource方法,這個方法其實返回就是啟動目錄的父目錄,所以啟動目錄的所有子目錄來找到插件定義文件,其實主要的插件定義都在Plugin目錄。

找到所有插件定義類之后PluginFinder現(xiàn)在持有了所有插件定義類,它將增強的準備工作交給了BootstrapInstrumentBoost.inject方法處理,這個方法主要是為了創(chuàng)建bytebuddy的 AgentBuilder,真正做事的方法是prepareJREInstrumentation

    /**
     * Generate dynamic delegate for ByteBuddy
     *
     * @param pluginFinder   gets the whole plugin list.
     * @param classesTypeMap hosts the class binary.
     * @return true if have JRE instrumentation requirement.
     * @throws PluginException when generate failure.
     */
    private static boolean prepareJREInstrumentation(PluginFinder pluginFinder,
        Map<String, byte[]> classesTypeMap) throws PluginException {
        TypePool typePool = TypePool.Default.of(BootstrapInstrumentBoost.class.getClassLoader());
        List<AbstractClassEnhancePluginDefine> bootstrapClassMatchDefines = pluginFinder.getBootstrapClassMatchDefine();
        for (AbstractClassEnhancePluginDefine define : bootstrapClassMatchDefines) {
            if (Objects.nonNull(define.getInstanceMethodsInterceptPoints())) {
                for (InstanceMethodsInterceptPoint point : define.getInstanceMethodsInterceptPoints()) {
                    if (point.isOverrideArgs()) {
                        generateDelegator(
                            classesTypeMap, typePool, INSTANCE_METHOD_WITH_OVERRIDE_ARGS_DELEGATE_TEMPLATE, point
                                .getMethodsInterceptor());
                    } else {
                        generateDelegator(
                            classesTypeMap, typePool, INSTANCE_METHOD_DELEGATE_TEMPLATE, point.getMethodsInterceptor());
                    }
                }
            }

            if (Objects.nonNull(define.getConstructorsInterceptPoints())) {
                for (ConstructorInterceptPoint point : define.getConstructorsInterceptPoints()) {
                    generateDelegator(
                        classesTypeMap, typePool, CONSTRUCTOR_DELEGATE_TEMPLATE, point.getConstructorInterceptor());
                }
            }

            if (Objects.nonNull(define.getStaticMethodsInterceptPoints())) {
                for (StaticMethodsInterceptPoint point : define.getStaticMethodsInterceptPoints()) {
                    if (point.isOverrideArgs()) {
                        generateDelegator(
                            classesTypeMap, typePool, STATIC_METHOD_WITH_OVERRIDE_ARGS_DELEGATE_TEMPLATE, point
                                .getMethodsInterceptor());
                    } else {
                        generateDelegator(
                            classesTypeMap, typePool, STATIC_METHOD_DELEGATE_TEMPLATE, point.getMethodsInterceptor());
                    }
                }
            }
        }
        return bootstrapClassMatchDefines.size() > 0;
    }

上面的代碼主要是分三步,主要是我們上面介紹的AbstractClassEnhancePluginDefine的三個方法增強

  • 調(diào)用getInstanceMethodsInterceptPoints 獲取到定義類,通過 point.getMethodsInterceptor拿到攔截類generateDelegator和對實例方法進行增強
  • 調(diào)用getConstructorsInterceptPoints獲取到定義類,通過point.getConstructorInterceptor拿到攔截類generateDelegator和對實例方法進行增強
  • 調(diào)用getStaticMethodsInterceptPoints獲取到定義類,通過point.getMethodsInterceptor拿到攔截類generateDelegator和對實例方法進行增強

其實真正的增加是在SkyWalkingAgent的內(nèi)部類Transformer處理,這個類繼承bytebuddy AgentBuilder.Transformer實現(xiàn),本質(zhì)是調(diào)用我們剛才介紹的AbstractClassEnhancePluginDefine的define類完成實質(zhì)性的代碼增強


@Override
public DynamicType.Builder<?> transform(final DynamicType.Builder<?> builder,final TypeDescription typeDescription,final ClassLoader classLoader,final JavaModule module) {
        LoadedLibraryCollector.registerURLClassLoader(classLoader);
        List<AbstractClassEnhancePluginDefine> pluginDefines = pluginFinder.find(typeDescription);
        if (pluginDefines.size() > 0) {
            DynamicType.Builder<?> newBuilder = builder;
            EnhanceContext context = new EnhanceContext();
            for (AbstractClassEnhancePluginDefine define : pluginDefines) {
                DynamicType.Builder<?> possibleNewBuilder = define.define(
                    typeDescription, newBuilder, classLoader, context);
                    if (possibleNewBuilder != null) {
                        newBuilder = possibleNewBuilder;
                    }
            }
            if (context.isEnhanced()) {
                LOGGER.debug("Finish the prepare stage for {}.", typeDescription.getName());
            }

            return newBuilder;
        }

        LOGGER.debug("Matched class {}, but ignore by finding mechanism.", typeDescription.getTypeName());
        return builder;
}

最后我們來看一下真正增強實例類的方法org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassEnhancePluginDefine#enhanceInstance


// ...方法太長,自己看一下
/**
 * Manipulate class source code.<br/>
 *
 * new class need:<br/>
 * 1.Add field, name {@link #CONTEXT_ATTR_NAME}.
 * 2.Add a field accessor for this field.
 *
 * And make sure the source codes manipulation only occurs once.
 *
 */
if (!typeDescription.isAssignableTo(EnhancedInstance.class)) {
    if (!context.isObjectExtended()) {
        newClassBuilder = newClassBuilder.defineField(
           CONTEXT_ATTR_NAME, Object.class, ACC_PRIVATE | ACC_VOLATILE)
                          .implement(EnhancedInstance.class)
                                    .intercept(FieldAccessor.ofField(CONTEXT_ATTR_NAME));
        context.extendObjectCompleted();
    }
}
// 其它邏輯
 newClassBuilder = newClassBuilder.method(junction)
 .intercept(MethodDelegation.withDefaultConfiguration()
 .to(new InstMethodsInter(interceptor, classLoader)));

// ...其它邏輯

方法太長,我們只關注主重要的一些代碼,第一塊是讓目標類實現(xiàn)EnhancedInstance接口,在目標方法里面定義一個名稱是CONTEXT_ATTR_NAME即_$EnhancedClassField_ws的字段, 定義getSkyWalkingDynamicField() 和setSkyWalkingDynamicField() 兩個方法,分別讀寫新增的_$EnhancedClassField_ws 字段,這個很重要,是用來承載Tracing信息的字段,下面一行是使用bytebuddy 方法攔截類InstMethodsInter,bytebuddy幫我們調(diào)用這個攔截類的intercept方法


/**
     * Intercept the target instance method.
     *
     * @param obj          target class instance.
     * @param allArguments all method arguments
     * @param method       method description.
     * @param zuper        the origin call ref.
     * @return the return value of target instance method.
     * @throws Exception only throw exception because of zuper.call() or unexpected exception in sky-walking ( This is a
     *                   bug, if anything triggers this condition ).
     */
    @RuntimeType
    public Object intercept(@This Object obj, @AllArguments Object[] allArguments, @SuperCall Callable<?> zuper,
        @Origin Method method) throws Throwable {
        EnhancedInstance targetObject = (EnhancedInstance) obj;

        MethodInterceptResult result = new MethodInterceptResult();
        try {
            interceptor.beforeMethod(targetObject, method, allArguments, method.getParameterTypes(), result);
        } catch (Throwable t) {
            LOGGER.error(t, "class[{}] before method[{}] intercept failure", obj.getClass(), method.getName());
        }

        Object ret = null;
        try {
            if (!result.isContinue()) {
                ret = result._ret();
            } else {
                ret = zuper.call();
            }
        } catch (Throwable t) {
            try {
                interceptor.handleMethodException(targetObject, method, allArguments, method.getParameterTypes(), t);
            } catch (Throwable t2) {
                LOGGER.error(t2, "class[{}] handle method[{}] exception failure", obj.getClass(), method.getName());
            }
            throw t;
        } finally {
            try {
                ret = interceptor.afterMethod(targetObject, method, allArguments, method.getParameterTypes(), ret);
            } catch (Throwable t) {
                LOGGER.error(t, "class[{}] after method[{}] intercept failure", obj.getClass(), method.getName());
            }
        }
        return ret;
    }

從代碼可以看出,主要是調(diào)用SkyWalking定義的攔截類實例 基類是InstanceMethodsAroundInterceptor,比如我們上面提到的CreatePreparedStatementInterceptor和CreateStatementInterceptor

Tracing上報解析

我們還是以上面的慢SQL上報為例進行說明,上面我們說到增加類會繼承接口EnhancedInstance,在JDBC執(zhí)行的過程中,SkyWalking分別對Connection,PreparedStatement或者createStatement方法進行增強,最后對PreparedStatement的executeQuery,executeUpdate executeLargeUpdate增強的org.apache.skywalking.apm.plugin.jdbc.mysql.PreparedStatementExecuteMethodsInterceptor或Statement的executeQuery,executeUpdate,executeLargeUpdate,executeBatchInternal,executeUpdateInternal,executeQuery,executeBatch方法進行增強的org.apache.skywalking.apm.plugin.jdbc.mysql.StatementExecuteMethodsInterceptor,前面對Connection,PreparedStatement增加主要是為了將鏈接信息,SQL參數(shù)信息放到上下文進行傳遞,最后PreparedStatementExecuteMethodsInterceptor或者StatementExecuteMethodsInterceptor進行上報處理,我們以PreparedStatementExecuteMethodsInterceptor為例來看一下代碼


@Override
public final void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) {
        StatementEnhanceInfos cacheObject = (StatementEnhanceInfos) objInst.getSkyWalkingDynamicField();
        /**
         * For avoid NPE. In this particular case, Execute sql inside the {@link com.mysql.jdbc.ConnectionImpl} constructor,
         * before the interceptor sets the connectionInfo.
         * When invoking prepareCall, cacheObject is null. Because it will determine procedures's parameter types by executing sql in mysql 
         * before the interceptor sets the statementEnhanceInfos.
         * @see JDBCDriverInterceptor#afterMethod(EnhancedInstance, Method, Object[], Class[], Object)
         */
        if (cacheObject != null && cacheObject.getConnectionInfo() != null) {
            ConnectionInfo connectInfo = cacheObject.getConnectionInfo();
            AbstractSpan span = ContextManager.createExitSpan(
                buildOperationName(connectInfo, method.getName(), cacheObject
                    .getStatementName()), connectInfo.getDatabasePeer());
            Tags.DB_TYPE.set(span, "sql");
            Tags.DB_INSTANCE.set(span, connectInfo.getDatabaseName());
            Tags.DB_STATEMENT.set(span, SqlBodyUtil.limitSqlBodySize(cacheObject.getSql()));
            span.setComponent(connectInfo.getComponent());

            if (JDBCPluginConfig.Plugin.JDBC.TRACE_SQL_PARAMETERS) {
                final Object[] parameters = cacheObject.getParameters();
                if (parameters != null && parameters.length > 0) {
                    int maxIndex = cacheObject.getMaxIndex();
                    String parameterString = getParameterString(parameters, maxIndex);
                    SQL_PARAMETERS.set(span, parameterString);
                }
            }

            SpanLayer.asDB(span);
        }
}

@Override
public final Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,Class<?>[] argumentsTypes, Object ret) {
    StatementEnhanceInfos cacheObject = (StatementEnhanceInfos) objInst.getSkyWalkingDynamicField();
    if (cacheObject != null && cacheObject.getConnectionInfo() != null) {
            ContextManager.stopSpan();
    }

    return ret;
}

@Override
public final void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,Class<?>[] argumentsTypes, Throwable t) {
    StatementEnhanceInfos cacheObject = (StatementEnhanceInfos) objInst.getSkyWalkingDynamicField();
    if (cacheObject != null && cacheObject.getConnectionInfo() != null) {
        ContextManager.activeSpan().log(t);
    }
}

在解釋上面的代碼之前,首先我們來了解幾個概念:

Span

Span 分為 3 類:

  • EntrySpan:當請求進入服務時會創(chuàng)建 EntrySpan 類型的 Span,它也是 TraceSegment 中的第一個 Span。例如,HTTP 服務、RPC 服務、MQ-Consumer 等入口服務的插件在接收到請求時都會創(chuàng)建相應的 EntrySpan。
  • LocalSpan:它是在本地方法調(diào)用時可能創(chuàng)建的 Span 類型,在后面介紹 @Trace 注解的時候我們還會看到 LocalSpan。
  • ExitSpan:當請求離開當前服務、進入其他服務時會創(chuàng)建 ExitSpan 類型的 Span。例如, Http Client 、RPC Client 發(fā)起遠程調(diào)用或是 MQ-producer 生產(chǎn)消息時,都會產(chǎn)生該類型的 Span。

它們都繼承自AbstractSpan ,其主要的方法有:

  • setComponent() 方法:用于設置組件類型。它有兩個重載,在 AbstractTracingSpan 實現(xiàn)中,有 componentId 和 componentName 兩個字段,兩個重載分別用于設置這兩個字段。在 ComponentsDefine 中可以找到 SkyWalking 目前支持的組件類型。
  • setLayer() 方法:用于設置 SpanLayer,也就是當前 Span 所處的位置。SpanLayer 是個枚舉,可選項有 DB、RPC_FRAMEWORK、HTTP、MQ、CACHE。
  • tag(AbstractTag, String) 方法:用于為當前 Span 添加鍵值對的 Tags。一個 Span 可以有多個 Tags。AbstractTag 中不僅包含了 String 類型的 Key 值,還包含了 Tag 的 ID 以及 canOverwrite 標識。AbstractTracingSpan 實現(xiàn)通過維護一個 List<TagValuePair> 集合(tags 字段)來記錄 Tag 信息,TagValuePair 中則封裝了 AbstractTag 類型的 Key 以及 String 類型的 Value。
  • log() 方法:用于向當前 Span 中添加 Log,一個 Span 可以包含多條日志。在 AbstractTracingSpan 實現(xiàn)中通過維護一個 List<LogDataEntity> 集合(logs 字段)來記錄 Log。LogDataEntity 會記錄日志的時間戳以及 KV 信息,以異常日志為例,其中就會包含一個 Key 為“stack”的 KV,其 value 為異常堆棧。
  • start() 方法:開啟 Span,其中會設置當前 Span 的開始時間以及調(diào)用層級等信息。
  • isEntry() 方法:判斷當前是否是 EntrySpan。
  • isExit() 方法:判斷當前是否是 ExitSpan。
  • ref() 方法:用于設置關聯(lián)的 TraceSegment 。

TraceSegment

在 SkyWalking 中,TraceSegment 是一個介于 Trace 與 Span 之間的概念,它是一條 Trace 的一段,可以包含多個 Span。在微服務架構中,一個請求基本都會涉及跨進程(以及跨線程)的操作,例如, RPC 調(diào)用、通過 MQ 異步執(zhí)行、HTTP 請求遠端資源等,處理一個請求就需要涉及到多個服務的多個線程。TraceSegment 記錄了一個請求在一個線程中的執(zhí)行流程(即 Trace 信息)。將該請求關聯(lián)的 TraceSegment 串聯(lián)起來,就能得到該請求對應的完整 Trace。

Context

SkyWalking 中的每個 TraceSegment 都與一個 Context 上下文對象一對一綁定,Context 上下文不僅記錄了 TraceSegment 的上下文信息,還提供了管理 TraceSegment 生命周期、創(chuàng)建 Span 以及跨進程(跨線程)傳播相關的功能。

AbstractTracerContext 是對上下文概念的抽象,其中定義了 Context 上下文的基本行為:

  • inject(ContextCarrier) 方法:在跨進程調(diào)用之前,調(diào)用方會通過 inject() 方法將當前 Context 上下文記錄的全部信息注入到 ContextCarrier 參數(shù)中,Agent 后續(xù)會將 ContextCarrier 序列化并隨遠程調(diào)用進行傳播。

  • extract(ContextCarrier) 方法:跨進程調(diào)用的接收方會反序列化得到 ContextCarrier 對象,然后通過 extract() 方法從 ContextCarrier 中讀取上游傳遞下來的 Trace 信息并記錄到當前的 Context 上下文中。

  • ContextSnapshot capture() 方法:在跨線程調(diào)用之前,SkyWalking Agent 會通過 capture() 方法將當前 Context 進行快照,然后將快照傳遞給其他線程。

  • continued(ContextSnapshot) 方法:跨線程調(diào)用的接收方會從收到的 ContextSnapshot 中讀取 Trace 信息并填充到當前 Context 上下文中。

  • getReadableGlobalTraceId() 方法: 用于獲取當前 Context 關聯(lián)的 TraceId。

  • createEntrySpan()、createLocalSpan() 方法、createExitSpan() 方法:用于創(chuàng)建 Span。

  • activeSpan() 方法:用于獲得當前活躍的 Span。在 TraceSegment 中,Span 也是按照棧的方式進行維護的,因為 Span 的生命周期符合棧的特性,即:先創(chuàng)建的 Span 后結(jié)束。

  • stopSpan(AbstractSpan) 方法:用于停止指定 Span。

AbstractTraceContext 有兩個實現(xiàn)類IgnoredTracerContext,TracingContext,IgnoredTracerContext 表示該 Trace 將會被丟失,所以其中不會記錄任何信息,里面所有方法也都是空實現(xiàn)。這里重點來看 TracingContext,其核心字段如下:

  • samplingService(SamplingService 類型):負責完成 Agent 端的 Trace 采樣。

  • segment(TraceSegment 類型):它是與當前 Context 上下文關聯(lián)的 TraceSegment 對象,在 TracingContext 的構造方法中會創(chuàng)建該對象。

  • activeSpanStack(LinkedList<AbstractSpan> 類型):用于記錄當前 TraceSegment 中所有活躍的 Span(即未關閉的 Span)。實際上 activeSpanStack 字段是作為棧使用的,TracingContext 提供了 push() 、pop() 、peek() 三個標準的棧方法,以及 first() 方法來訪問棧底元素。

  • spanIdGenerator(int 類型):它是 Span ID 自增序列,初始值為 0。該字段的自增操作都是在一個線程中完成的,所以無需加鎖。

結(jié)合上面的解析以及前一篇的介紹,我們知道SkyWalking使用堆棧進行Span管理,EntrySpan為TraceSegment入口,ExitSpan為TraceSegment出口,如果調(diào)用鏈復雜,我們可能會同時用EntrySpan和ExitSpan,但是對于上面的例子,我們只需要創(chuàng)建一個ExitSpan就可以了,所以上面代碼不用解析已經(jīng)不言自明。

那么數(shù)據(jù)是如何上報的呢?我們關注一下afterMethod方法, ContextManager.stopSpan()這個方法最要是調(diào)用org.apache.skywalking.apm.agent.core.context.TracingContext#finish方法


/**
     * Finish this context, and notify all {@link TracingContextListener}s, managed by {@link
     * TracingContext.ListenerManager} and {@link TracingContext.TracingThreadListenerManager}
     */
    private void finish() {
        if (isRunningInAsyncMode) {
            asyncFinishLock.lock();
        }
        try {
            boolean isFinishedInMainThread = activeSpanStack.isEmpty() && running;
            if (isFinishedInMainThread) {
                /*
                 * Notify after tracing finished in the main thread.
                 */
                TracingThreadListenerManager.notifyFinish(this);
            }

            if (isFinishedInMainThread && (!isRunningInAsyncMode || asyncSpanCounter == 0)) {
                TraceSegment finishedSegment = segment.finish(isLimitMechanismWorking());
                TracingContext.ListenerManager.notifyFinish(finishedSegment);
                running = false;
            }
        } finally {
            if (isRunningInAsyncMode) {
                asyncFinishLock.unlock();
            }
        }
    }

當 TracingContext 通過 stopSpan() 方法關閉最后一個 Span 時,會調(diào)用 finish() 方法關閉相應的 TraceSegment,與此同時,還會調(diào)用 TracingContext.ListenerManager.notifyFinish() 方法通知所有監(jiān)聽 TracingContext 關閉事件的監(jiān)聽器 —— TracingContextListener,TraceSegmentServiceClient 是 TracingContextListener 接口的實現(xiàn)之一,其主要功能就是在 TraceSegment 結(jié)束時對其進行收集,并發(fā)送到后端的 OAP 集群。TraceSegmentServiceClient 底層維護了一個 DataCarrier 對象,其底層 Channels 默認有 5 個 Buffer,每個 Buffer 長度為 300,使用的是 IF_POSSIBLE 阻塞寫入策略,底層會啟動一個 ConsumerThread 線程。

TraceSegmentServiceClient 作為一個 TracingContextListener 接口的實現(xiàn),會在 notifyFinish() 方法中,將剛剛結(jié)束的 TraceSegment 寫入到 DataCarrier 中緩存。同時,TraceSegmentServiceClient 實現(xiàn)了 IConsumer 接口,封裝了消費 Channels 中數(shù)據(jù)的邏輯,在 consume() 方法中會首先將消費到的 TraceSegment 對象序列化,然后通過 gRPC 請求發(fā)送到后端 OAP 集群,最后我們看一下TraceSegmentServiceClient的consume() 方法


@Override
public void consume(List<TraceSegment> data) {
        if (CONNECTED.equals(status)) {
            final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
            StreamObserver<SegmentObject> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(
                Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
            ).collect(new StreamObserver<Commands>() {
                @Override
                public void onNext(Commands commands) {
                    ServiceManager.INSTANCE.findService(CommandService.class)
                                           .receiveCommand(commands);
                }

                @Override
                public void onError(
                    Throwable throwable) {
                    status.finished();
                    if (LOGGER.isErrorEnable()) {
                        LOGGER.error(
                            throwable,
                            "Send UpstreamSegment to collector fail with a grpc internal exception."
                        );
                    }
                    ServiceManager.INSTANCE
                        .findService(GRPCChannelManager.class)
                        .reportError(throwable);
                }

                @Override
                public void onCompleted() {
                    status.finished();
                }
            });

            try {
                for (TraceSegment segment : data) {
                    SegmentObject upstreamSegment = segment.transform();
                    upstreamSegmentStreamObserver.onNext(upstreamSegment);
                }
            } catch (Throwable t) {
                LOGGER.error(t, "Transform and send UpstreamSegment to collector fail.");
            }

            upstreamSegmentStreamObserver.onCompleted();

            status.wait4Finish();
            segmentUplinkedCounter += data.size();
        } else {
            segmentAbandonedCounter += data.size();
        }

        printUplinkStatus();
}

注意,TraceSegmentServiceClient 在批量發(fā)送完 UpstreamSegment 數(shù)據(jù)之后,會通過 GRPCStreamServiceStatus 進行自旋等待,直至該批 UpstreamSegment 全部發(fā)送完畢。

下面我們來分析一下TraceSegmentServiceClient在哪里啟動的以及consume是如何調(diào)用的,還記得上篇文章我們分析SkyWalking的微內(nèi)核架構嗎?我們列出了第一個啟動服務類就是TraceSegmentServiceClient,可以看到TraceSegmentServiceClient繼承于BootService的微內(nèi)核服務,這個服務就是用來消費是報數(shù)據(jù)使用。TracingContext將上報數(shù)據(jù)緩存到TraceSegmentServiceClient的DataCarrier,同時DataCarrier持有一個ConsumeDriver對象,這個對象相當于一個線程池,線程池里面實際的工作線程是ConsumerThread,這個繼承于Thread的線程,用來消費實現(xiàn)了org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer接口的實現(xiàn)類,TraceSegmentServiceClient就實現(xiàn)了這個接口,在構造DataCarrier傳了this,ConsumeDriver將DataCarrier傳的 Channels 轉(zhuǎn)為ConsumerThread持有的List<DataSource> 類型數(shù)組對象dataSources,這樣dataSources持有了Channels所持有的QueueBuffer<T>隊列,最后將QueueBuffer<T>隊列里面的元素drainTo到一個List數(shù)組,最終傳給IConsumer接口的實現(xiàn)類進行消費,下面是org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerThread#consume的實現(xiàn)。


    private boolean consume(List<T> consumeList) {
        for (DataSource dataSource : dataSources) {
            dataSource.obtain(consumeList);
        }

        if (!consumeList.isEmpty()) {
            try {
                consumer.consume(consumeList);
            } catch (Throwable t) {
                consumer.onError(consumeList, t);
            } finally {
                consumeList.clear();
            }
            return true;
        }
        consumer.nothingToConsume();
        return false;
    }

最后我們分析一下ContextManager,顧名思意,這是一個管理TraceSegment上報數(shù)據(jù)上下文的類,同樣它也是繼承自BootService,同樣回歸前一篇文章,我們列出的第二個服務器就是它,ContextManager里面的屬性有兩個ThreadLocal數(shù)組ThreadLocal<AbstractTracerContext> 類型的CONTEXT,ThreadLocal<RuntimeContext> 的RUNTIME_CONTEXT,CONTEXT具體的類型就是我們上面提到的IgnoredTracerContext,TracingContext,RUNTIME_CONTEXT用來傳遞trace過程中的中間數(shù)據(jù),我們可以發(fā)現(xiàn)ContextManager的prepare,boot,onComplete,shutdown都是空的,為什么這么設計?我猜測只是借助于初始化過程做一個ThreadLocal的預熱。

TraceSegment是如何填充數(shù)據(jù)的?我們發(fā)現(xiàn)TraceSegment只有archive方法做了數(shù)據(jù)的添加,最后跟蹤到org.apache.skywalking.apm.agent.core.context.TracingContext#stopSpan調(diào)用org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan#finish方法將數(shù)據(jù)裝入TraceSegment

    /**
     * Finish the active Span. When it is finished, it will be archived by the given {@link TraceSegment}, which owners
     * it.
     *
     * @param owner of the Span.
     */
    public boolean finish(TraceSegment owner) {
        this.endTime = System.currentTimeMillis();
        owner.archive(this);
        return true;
    }
最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容