Skywalking源碼研究之上報(bào)和采集

關(guān)于Agent端的上報(bào)和OAP的采集,Skywalking技術(shù)比較成熟,重點(diǎn)關(guān)注插件在攔截器中如何進(jìn)行上報(bào)

ContextManager

插件進(jìn)行數(shù)據(jù)上報(bào)(上傳span),主要是通過ContextManager提供的API完成的,重點(diǎn)方法如下

beforeMethod中常用的方法:

  • ContextManager#createEntrySpan 創(chuàng)建入口span
  • ContextManager#createLocalSpan 創(chuàng)建本地span
  • ContextManager#createExitSpan 創(chuàng)建出口span
  • Tags#URL#set 設(shè)置span的url屬性
  • span#setComponent 設(shè)置span的component屬性
  • SpanLayer#asRPCFramework 標(biāo)記span為RPC
  • SpanLayer#asHttp 標(biāo)記span為http
  • 等等

注意:beforeMethod調(diào)用的各種span的create方法并沒有實(shí)際的進(jìn)行數(shù)據(jù)上報(bào),只是暫存在ThreadLocal中

實(shí)際觸發(fā)上報(bào)一般是在afterMethod中,常用的方法

  • ContextManager#stopSpan

比較直觀,即在方法結(jié)束時(shí)標(biāo)記span結(jié)束,重點(diǎn)是該方法內(nèi)部會(huì)調(diào)用TracingContext#finish方法,這個(gè)方法會(huì)實(shí)際觸發(fā)數(shù)據(jù)上報(bào)

以Dubbo插件為例,看一下插件對(duì)上報(bào)API的使用

@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                         MethodInterceptResult result) throws Throwable {
    // dubbo的rpcContext,使用它可以通過網(wǎng)絡(luò)傳遞附件,這里就是傳遞上下文載體
    RpcContext rpcContext = RpcContext.getContext();
    
    if (isConsumer) {// 消費(fèi)方
        final ContextCarrier contextCarrier = new ContextCarrier();
        // 調(diào)用createExitSpan創(chuàng)建出口span
        span = ContextManager.createExitSpan(generateOperationName(requestURL, invocation), contextCarrier, host + ":" + port);
        // 使用附件傳輸上下文載體
        CarrierItem next = contextCarrier.items();
        while (next.hasNext()) {
            next = next.next();
            rpcContext.setAttachment(next.getHeadKey(), next.getHeadValue());
        }
    } else { // 服務(wù)方
        ContextCarrier contextCarrier = new ContextCarrier();
        // 跟據(jù)附件獲取上下文載體
        CarrierItem next = contextCarrier.items();
        while (next.hasNext()) {
            next = next.next();
            next.setHeadValue(rpcContext.getAttachment(next.getHeadKey()));
        }
        // 調(diào)用createEntrySpan創(chuàng)建入口span
        span = ContextManager.createEntrySpan(generateOperationName(requestURL, invocation), contextCarrier);
        span.setPeer(rpcContext.getRemoteAddressString()); // span設(shè)置peer屬性
    }

    Tags.URL.set(span, generateRequestURL(requestURL, invocation)); // 設(shè)置url屬性
    collectArguments(needCollectArguments, argumentsLengthThreshold, span, invocation); // 收集參數(shù)
    span.setComponent(ComponentsDefine.DUBBO); // 設(shè)置component屬性
    SpanLayer.asRPCFramework(span); // 設(shè)置layer屬性
}

@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                          Object ret) throws Throwable {
    
    // 標(biāo)記span結(jié)束
    ContextManager.stopSpan();
    return ret;
}

數(shù)據(jù)上報(bào)

finish

上文提到ContextManager#stopSpan方法會(huì)觸發(fā)數(shù)據(jù)上報(bào),因?yàn)閮?nèi)部會(huì)調(diào)用TracingContext#finish方法:

private void finish() {
    // 正在運(yùn)行的span是否為空,即segment下的span全部完成
    boolean isFinishedInMainThread = activeSpanStack.isEmpty() && running;
    if (isFinishedInMainThread) { // 如果已完成
        // 發(fā)布segment完成通知
        TracingThreadListenerManager.notifyFinish(this); 
    }
}

可以看到span在stop之后并不一定會(huì)上報(bào),而是整個(gè)segment下的span全部完成才觸發(fā)上報(bào),并且使用發(fā)布監(jiān)聽模式通知消費(fèi)者處理,其中消費(fèi)者就是TraceSegmentServiceClient

TraceSegmentServiceClient

TraceSegmentServiceClient接收到segment結(jié)束信號(hào)后,同樣不是直接建立網(wǎng)絡(luò)連接上報(bào),而是將消息發(fā)給了skywalking內(nèi)部的消息隊(duì)列

@Override
public void afterFinished(TraceSegment traceSegment) {
    if (!carrier.produce(traceSegment)) { // 發(fā)布到carrier消息隊(duì)列
        ...
    }
}

Agent 收集到 Trace 數(shù)據(jù)后,不是寫入外部消息隊(duì)列( 例如,Kafka )或者日志文件,而是 Agent寫入內(nèi)存消息隊(duì)列,后臺(tái)線程【異步】發(fā)送給 Collector

這個(gè)消息隊(duì)列就是skywalking自實(shí)現(xiàn)的:DataCarrier

同時(shí)TraceSegmentServiceClient本身即是消息的發(fā)布者,又是消費(fèi)者

// TraceSegmentServiceClient的初始化方法
public void boot() {
    ...
    carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE, BufferStrategy.IF_POSSIBLE); // 創(chuàng)建消息隊(duì)列
    carrier.consume(this, 1); // 消費(fèi)者就是this, 即TraceSegmentServiceClient本身
}

而消費(fèi)的方法就是TraceSegmentServiceClient#consume方法

@Override
public void consume(List<TraceSegment> data) { // 參數(shù)就是Segment,因?yàn)槭褂藐?duì)列,可能是多個(gè)
    // 使用Grpc包工具
    final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
    StreamObserver<SegmentObject> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(
        Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
    ).collect(new StreamObserver<Commands>() {
        ...
    });
    
    
    try {
        // 循環(huán)流式上報(bào)segment
        for (TraceSegment segment : data) {
            SegmentObject upstreamSegment = segment.transform();
            upstreamSegmentStreamObserver.onNext(upstreamSegment);
        }
    } catch (Throwable t) {
        LOGGER.error(t, "Transform and send UpstreamSegment to collector fail.");
    }
    // 結(jié)束
    upstreamSegmentStreamObserver.onCompleted();
}

這里的upstreamSegmentStreamObserver工具是使用grpc-stub的grpc協(xié)議

<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-stub</artifactId>
    <version>${grpc.version}</version>
</dependency>

使用流式數(shù)據(jù)傳輸?shù)姆绞綄?shí)現(xiàn)數(shù)據(jù)上報(bào),上傳的對(duì)象SegmentObject

OAP采集

OAP采集器主要在server-receiver-plugin包中,依然是使用插件的方式應(yīng)對(duì)不同數(shù)據(jù)的采集,比如

  • skywalking-trace-receiver-plugin 分布式鏈路采集
  • skywalking-jvm-receiver-plugin jvm采集
  • 等等大概10幾個(gè)采集插件

這里主要分析分布式鏈路采集插件:skywalking-trace-receiver-plugin

TraceSegmentReportServiceHandler

分布式鏈路trace的采集主要入口TraceSegmentReportServiceHandler#collect方法,接受流式數(shù)據(jù)對(duì)象SegmentObject(對(duì)應(yīng)了Agent端的上傳對(duì)象)

@Override
public StreamObserver<SegmentObject> collect(StreamObserver<Commands> responseObserver) {
    // 使用grpc.stub的服務(wù)端
    return new StreamObserver<SegmentObject>() {
        // 收到數(shù)據(jù)的處理
        @Override
        public void onNext(SegmentObject segment) {
            try {
                // 調(diào)用segmentParserService.send
                segmentParserService.send(segment);
            } catch (Exception e) {
                ...
            }
        }
        ...
    };
}

具體的Segment信息處理交由segmentParserService#send

SegmentParse

Segment信息的解析器,send方法如下

public void send(SegmentObject segment) {
    final TraceAnalyzer traceAnalyzer = new TraceAnalyzer(moduleManager, listenerManager, config);
    traceAnalyzer.doAnalysis(segment);
}

轉(zhuǎn)交給TraceAnalyzer分析并處理

TraceAnalyzer

其中doAnalysis負(fù)責(zé)分析Segment對(duì)象,方法如下

// 解析SegmentO對(duì)象,基本都是訂閱發(fā)布模式
public void doAnalysis(SegmentObject segmentObject) {
    if (segmentObject.getSpansList().size() == 0) {
        return;
    }

    createSpanListeners();

    notifySegmentListener(segmentObject);

    // 循環(huán)所有的span
    segmentObject.getSpansList().forEach(spanObject -> {
        if (spanObject.getSpanId() == 0) {
            // 解析第一個(gè) Span
            notifyFirstListener(spanObject, segmentObject);
        }

        if (SpanType.Exit.equals(spanObject.getSpanType())) {
            // 解析出口Span
            notifyExitListener(spanObject, segmentObject);
        } else if (SpanType.Entry.equals(spanObject.getSpanType())) {
            // 解析入口Span
            notifyEntryListener(spanObject, segmentObject);
        } else if (SpanType.Local.equals(spanObject.getSpanType())) {
            // 解析本地Span
            notifyLocalListener(spanObject, segmentObject);
        } else {
            log.error("span type value was unexpected, span type name: {}", spanObject.getSpanType()
                                                                                      .name());
        }
    });
    
    // 通知 Span 監(jiān)聽器們,執(zhí)行構(gòu)建各自的數(shù)據(jù)
    notifyListenerToBuild();
}

總結(jié)就是通知 Span 監(jiān)聽器們,去構(gòu)建各自的數(shù)據(jù),經(jīng)過流式處理,最終存儲(chǔ)到存儲(chǔ)器。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容