SkyWalking's agent-side reporting and OAP-side collection are both mature. This section focuses on how plugins report data from within their interceptors.
ContextManager
Plugins report data (upload spans) mainly through the API provided by ContextManager. The key methods are as follows.
Methods commonly used in beforeMethod:
- ContextManager#createEntrySpan: creates an entry span
- ContextManager#createLocalSpan: creates a local span
- ContextManager#createExitSpan: creates an exit span
- Tags#URL#set: sets the span's url tag
- span#setComponent: sets the span's component
- SpanLayer#asRPCFramework: marks the span as RPC
- SpanLayer#asHttp: marks the span as HTTP
- and so on
Note: the span create methods called in beforeMethod do not actually report anything. The spans are only held in a ThreadLocal.
Reporting is usually triggered in afterMethod, through:
ContextManager#stopSpan
This one is straightforward: it marks the span as finished when the method returns. The key point is that it internally calls TracingContext#finish, which is what actually triggers reporting.
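To make the "stage now, report later" behaviour concrete, here is a minimal sketch. MiniContextManager, createSpan and REPORTED are hypothetical names, not SkyWalking's API; the sketch assumes one segment per thread and uses plain strings as spans. Created spans are only pushed onto a ThreadLocal stack, and the segment is handed off once the last active span on that thread is stopped, which is roughly what stopSpan and TracingContext#finish do together.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for ContextManager/TracingContext, not SkyWalking code.
final class MiniContextManager {
    // Spans created on this thread that have not been stopped yet.
    private static final ThreadLocal<Deque<String>> ACTIVE =
            ThreadLocal.withInitial(ArrayDeque::new);
    // Every span of the current segment on this thread, in creation order.
    private static final ThreadLocal<List<String>> SEGMENT =
            ThreadLocal.withInitial(ArrayList::new);
    // Segments handed off for reporting; a real agent would enqueue them instead.
    static final List<List<String>> REPORTED = new ArrayList<>();

    private MiniContextManager() {}

    // Like the create*Span methods: stage the span in the ThreadLocal only.
    static void createSpan(String operationName) {
        ACTIVE.get().push(operationName);
        SEGMENT.get().add(operationName);
    }

    // Like stopSpan: pop the span, and finish the segment once none is active.
    static void stopSpan() {
        ACTIVE.get().pop();
        if (ACTIVE.get().isEmpty()) {
            synchronized (REPORTED) {
                REPORTED.add(new ArrayList<>(SEGMENT.get()));
            }
            SEGMENT.remove();
            ACTIVE.remove();
        }
    }
}
```

Stopping the inner span leaves REPORTED empty; only stopping the outermost span hands off the whole segment.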
Taking the Dubbo plugin as an example, here is how a plugin uses the reporting API:
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                         MethodInterceptResult result) throws Throwable {
    // Excerpt: isConsumer, requestURL, invocation, host, port, span and the argument-collection
    // settings are local variables set up in code omitted here.
    // Dubbo's RpcContext can carry attachments across the network; here it carries the context carrier.
    RpcContext rpcContext = RpcContext.getContext();
    if (isConsumer) { // consumer side
        final ContextCarrier contextCarrier = new ContextCarrier();
        // createExitSpan creates the exit span and fills contextCarrier for propagation
        span = ContextManager.createExitSpan(generateOperationName(requestURL, invocation), contextCarrier, host + ":" + port);
        // Send the context carrier to the provider as RPC attachments
        CarrierItem next = contextCarrier.items();
        while (next.hasNext()) {
            next = next.next();
            rpcContext.setAttachment(next.getHeadKey(), next.getHeadValue());
        }
    } else { // provider side
        ContextCarrier contextCarrier = new ContextCarrier();
        // Rebuild the context carrier from the received attachments
        CarrierItem next = contextCarrier.items();
        while (next.hasNext()) {
            next = next.next();
            next.setHeadValue(rpcContext.getAttachment(next.getHeadKey()));
        }
        // createEntrySpan creates the entry span, continuing the caller's trace
        span = ContextManager.createEntrySpan(generateOperationName(requestURL, invocation), contextCarrier);
        span.setPeer(rpcContext.getRemoteAddressString()); // set the span's peer
    }
    Tags.URL.set(span, generateRequestURL(requestURL, invocation)); // set the url tag
    collectArguments(needCollectArguments, argumentsLengthThreshold, span, invocation); // collect call arguments
    span.setComponent(ComponentsDefine.DUBBO); // set the component
    SpanLayer.asRPCFramework(span); // set the layer
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                          Object ret) throws Throwable {
    // Mark the span as finished
    ContextManager.stopSpan();
    return ret;
}
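The two carrier loops in beforeMethod amount to serializing the trace context into RPC attachments on the consumer and reading it back on the provider. The sketch below assumes a single header. A plain Map stands in for RpcContext attachments, and SimpleCarrier and the key "sw8-demo" are made up for illustration. The real ContextCarrier iterates over several CarrierItems instead.

```java
import java.util.Map;

// Hypothetical stand-in for ContextCarrier; a Map plays the role of RpcContext attachments.
final class SimpleCarrier {
    static final String HEAD_KEY = "sw8-demo"; // hypothetical header key
    String traceId;
    String parentSpan;

    // Consumer side: write the context into attachments (cf. rpcContext.setAttachment).
    void inject(Map<String, String> attachments) {
        attachments.put(HEAD_KEY, traceId + "|" + parentSpan);
    }

    // Provider side: rebuild the context from attachments (cf. rpcContext.getAttachment).
    static SimpleCarrier extract(Map<String, String> attachments) {
        SimpleCarrier carrier = new SimpleCarrier();
        String value = attachments.get(HEAD_KEY);
        if (value != null) {
            String[] parts = value.split("\\|", 2);
            carrier.traceId = parts[0];
            carrier.parentSpan = parts.length > 1 ? parts[1] : null;
        }
        return carrier;
    }
}
```

When no header arrives, extract returns an empty carrier, and the provider-side span then starts a new trace instead of continuing one.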
Data reporting
finish
As mentioned above, ContextManager#stopSpan triggers reporting because it internally calls TracingContext#finish:
private void finish() {
    // No active spans left means every span in this segment has finished
    boolean isFinishedInMainThread = activeSpanStack.isEmpty() && running;
    if (isFinishedInMainThread) { // the segment is complete
        // Publish the segment-finished notification
        TracingThreadListenerManager.notifyFinish(this);
    }
}
So a span is not necessarily reported the moment it stops. Reporting is triggered only once every span in the segment has finished, and a publish/listen pattern notifies the consumer, which is TraceSegmentServiceClient.
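A sketch of this check-then-publish step, assuming a plain counter in place of activeSpanStack and ignoring the running flag. SegmentListener, SegmentListenerManager and MiniTracingContext are hypothetical names. In the agent, TraceSegmentServiceClient is the registered listener.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical listener contract; TraceSegmentServiceClient plays this role in the agent.
interface SegmentListener {
    void afterFinished(String segmentId);
}

final class SegmentListenerManager {
    // Copy-on-write so listeners can register while notifications are in flight.
    private static final List<SegmentListener> LISTENERS = new CopyOnWriteArrayList<>();

    private SegmentListenerManager() {}

    static void add(SegmentListener listener) {
        LISTENERS.add(listener);
    }

    static void notifyFinish(String segmentId) {
        for (SegmentListener listener : LISTENERS) {
            listener.afterFinished(segmentId);
        }
    }
}

final class MiniTracingContext {
    private final String segmentId;
    private int activeSpans; // stands in for activeSpanStack

    MiniTracingContext(String segmentId) { this.segmentId = segmentId; }

    void startSpan() { activeSpans++; }

    void stopSpan() {
        activeSpans--;
        finish();
    }

    // Mirrors the check in TracingContext#finish: publish only when no span is active.
    private void finish() {
        if (activeSpans == 0) {
            SegmentListenerManager.notifyFinish(segmentId);
        }
    }
}
```

Decoupling through a listener list keeps the tracing context unaware of how, or whether, segments are shipped.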
TraceSegmentServiceClient
When TraceSegmentServiceClient receives the segment-finished signal, it again does not open a network connection to report right away. Instead, it puts the segment into SkyWalking's internal message queue.
@Override
public void afterFinished(TraceSegment traceSegment) {
    if (!carrier.produce(traceSegment)) { // publish to the carrier message queue
        ...
    }
}
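After collecting trace data, the agent does not write it to an external message queue (such as Kafka) or to a log file. It writes it to an in-memory message queue, and background threads send it to the collector asynchronously.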
This message queue is SkyWalking's own implementation: DataCarrier.
TraceSegmentServiceClient is both the producer and the consumer of these messages.
// Initialization method of TraceSegmentServiceClient
public void boot() {
    ...
    carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE, BufferStrategy.IF_POSSIBLE); // create the message queue
    carrier.consume(this, 1); // the consumer is this, i.e. TraceSegmentServiceClient itself, with one consumer thread
}
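DataCarrier itself supports multiple channels and several buffer strategies. The sketch below keeps only the idea described above, under the assumption of one bounded buffer and one consumer thread. MiniCarrier is a hypothetical name. Its produce returns false instead of blocking when the buffer is full (roughly what IF_POSSIBLE suggests), and the consumer receives whatever has accumulated as a batch, which is why consume takes a List.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical, much simplified stand-in for DataCarrier.
final class MiniCarrier<T> {
    private final BlockingQueue<T> buffer;
    private volatile boolean running = true;
    private Thread worker;

    MiniCarrier(int capacity) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    // Non-blocking produce: drop (return false) when the buffer is full.
    boolean produce(T data) {
        return buffer.offer(data);
    }

    // Start one background thread that hands batches to the consumer.
    void consume(Consumer<List<T>> consumer) {
        worker = new Thread(() -> {
            while (running || !buffer.isEmpty()) {
                List<T> batch = new ArrayList<>();
                try {
                    T first = buffer.poll(20, TimeUnit.MILLISECONDS);
                    if (first == null) {
                        continue; // nothing buffered yet, re-check the loop condition
                    }
                    batch.add(first);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                buffer.drainTo(batch); // take everything else that is already waiting
                consumer.accept(batch);
            }
        }, "mini-carrier-consumer");
        worker.setDaemon(true);
        worker.start();
    }

    // Stop after the consumer thread has drained what is still buffered.
    void shutdown() {
        running = false;
        if (worker == null) {
            return;
        }
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The application thread only pays for an offer into memory; network latency is absorbed by the background thread, and under overload data is dropped rather than slowing the traced application.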
The consuming method is TraceSegmentServiceClient#consume:
@Override
public void consume(List<TraceSegment> data) { // a batch of segments: the queue may deliver several at once
    // Open a gRPC client stream to the collector
    final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
    StreamObserver<SegmentObject> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(
        Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
    ).collect(new StreamObserver<Commands>() {
        ...
    });
    try {
        // Stream the segments one by one
        for (TraceSegment segment : data) {
            SegmentObject upstreamSegment = segment.transform();
            upstreamSegmentStreamObserver.onNext(upstreamSegment);
        }
    } catch (Throwable t) {
        LOGGER.error(t, "Transform and send UpstreamSegment to collector fail.");
    }
    // Close the stream
    upstreamSegmentStreamObserver.onCompleted();
}
Here upstreamSegmentStreamObserver is a gRPC StreamObserver from grpc-stub:
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-stub</artifactId>
    <version>${grpc.version}</version>
</dependency>
Reporting is implemented as streaming data transfer, and the uploaded object is SegmentObject.
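The client-streaming shape of consume (one stream per batch, one onNext per segment, onCompleted at the end) can be shown without any gRPC dependency. SimpleStreamObserver below is a hypothetical stand-in that mirrors the method names of io.grpc.stub.StreamObserver. BatchUploader and RecordingObserver are also illustrative, and segments are plain strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in mirroring io.grpc.stub.StreamObserver's methods.
interface SimpleStreamObserver<V> {
    void onNext(V value);
    void onError(Throwable t);
    void onCompleted();
}

// Sketch of the loop in consume: one onNext per segment, then onCompleted.
final class BatchUploader {
    private BatchUploader() {}

    static void upload(List<String> segments, SimpleStreamObserver<String> upstream) {
        try {
            for (String segment : segments) {
                // Stand-in for segment.transform(), which builds a SegmentObject.
                upstream.onNext("SegmentObject(" + segment + ")");
            }
        } catch (Throwable t) {
            System.err.println("Transform and send UpstreamSegment to collector fail: " + t);
        }
        upstream.onCompleted(); // the stream is closed even if sending failed midway
    }
}

// Records what would have gone over the wire.
final class RecordingObserver implements SimpleStreamObserver<String> {
    final List<String> sent = new ArrayList<>();
    boolean completed;

    @Override public void onNext(String value) { sent.add(value); }
    @Override public void onError(Throwable t) { /* unused in this sketch */ }
    @Override public void onCompleted() { completed = true; }
}
```

Sending a whole batch over one stream avoids a round trip per segment, which fits the batched delivery from the in-memory queue.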
OAP collection
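The OAP collectors live mainly in the server-receiver-plugin module, which again uses plugins to handle the collection of different kinds of data, for example:
- skywalking-trace-receiver-plugin: distributed trace collection
- skywalking-jvm-receiver-plugin: JVM metrics collection
- and so on, roughly a dozen or more receiver plugins in total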
Here we focus on the distributed trace receiver plugin: skywalking-trace-receiver-plugin.
TraceSegmentReportServiceHandler
The main entry point for trace collection is TraceSegmentReportServiceHandler#collect, which receives the stream of SegmentObject messages (the same objects the agent uploads).
@Override
public StreamObserver<SegmentObject> collect(StreamObserver<Commands> responseObserver) {
    // Server side of the grpc-stub stream: one observer per incoming agent stream
    return new StreamObserver<SegmentObject>() {
        // Called for every SegmentObject the agent sends
        @Override
        public void onNext(SegmentObject segment) {
            try {
                // Hand the segment to segmentParserService
                segmentParserService.send(segment);
            } catch (Exception e) {
                ...
            }
        }
        ...
    };
}
The actual segment processing is delegated to segmentParserService#send.
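On the receiving side, collect returns an observer whose onNext forwards each segment to the parser. The sketch assumes string segments and a Consumer callback in place of segmentParserService. InboundObserver, MiniReportHandler and the "ack" reply are hypothetical; the real response observer's type is StreamObserver<Commands>.

```java
import java.util.function.Consumer;

// Hypothetical stand-in with the same shape as io.grpc.stub.StreamObserver.
interface InboundObserver<V> {
    void onNext(V value);
    void onError(Throwable t);
    void onCompleted();
}

// Sketch of the server side of TraceSegmentReportServiceHandler#collect.
final class MiniReportHandler {
    private final Consumer<String> parser; // stands in for segmentParserService.send

    MiniReportHandler(Consumer<String> parser) { this.parser = parser; }

    InboundObserver<String> collect(InboundObserver<String> responseObserver) {
        return new InboundObserver<String>() {
            @Override
            public void onNext(String segment) {
                try {
                    parser.accept(segment);
                } catch (Exception e) {
                    // A bad segment is logged and skipped; the stream stays open.
                    System.err.println("parse failed: " + e.getMessage());
                }
            }

            @Override
            public void onError(Throwable t) {
                responseObserver.onCompleted();
            }

            @Override
            public void onCompleted() {
                responseObserver.onNext("ack"); // hypothetical reply payload
                responseObserver.onCompleted();
            }
        };
    }
}
```

Catching per segment means one malformed segment does not abort the rest of the agent's batch.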
SegmentParse
This is the segment parser. Its send method is:
public void send(SegmentObject segment) {
    final TraceAnalyzer traceAnalyzer = new TraceAnalyzer(moduleManager, listenerManager, config);
    traceAnalyzer.doAnalysis(segment);
}
It hands each segment to a new TraceAnalyzer for analysis and processing.
TraceAnalyzer
Its doAnalysis method analyzes the segment object:
// Parse the SegmentObject; almost everything here follows the publish/subscribe pattern
public void doAnalysis(SegmentObject segmentObject) {
    if (segmentObject.getSpansList().size() == 0) {
        return;
    }
    createSpanListeners();
    notifySegmentListener(segmentObject);
    // Iterate over all spans
    segmentObject.getSpansList().forEach(spanObject -> {
        if (spanObject.getSpanId() == 0) {
            // The first span
            notifyFirstListener(spanObject, segmentObject);
        }
        if (SpanType.Exit.equals(spanObject.getSpanType())) {
            // Exit span
            notifyExitListener(spanObject, segmentObject);
        } else if (SpanType.Entry.equals(spanObject.getSpanType())) {
            // Entry span
            notifyEntryListener(spanObject, segmentObject);
        } else if (SpanType.Local.equals(spanObject.getSpanType())) {
            // Local span
            notifyLocalListener(spanObject, segmentObject);
        } else {
            log.error("span type value was unexpected, span type name: {}", spanObject.getSpanType().name());
        }
    });
    // Tell the span listeners to build their own data
    notifyListenerToBuild();
}
In short, the span listeners are notified to build their own data, which then goes through stream processing and is finally persisted to storage.
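The dispatch in doAnalysis reduces to the following sketch, assuming a simplified span record that carries only an id and a type. SpanKind, SpanRecord, AnalysisListener and MiniTraceAnalyzer are hypothetical; the real listeners receive the full SpanObject and SegmentObject. Span 0 additionally goes to the "first span" hook, and build runs once after the whole segment has been seen.

```java
import java.util.List;

// Hypothetical, simplified types for illustration only.
enum SpanKind { ENTRY, EXIT, LOCAL }

final class SpanRecord {
    final int spanId;
    final SpanKind kind;

    SpanRecord(int spanId, SpanKind kind) {
        this.spanId = spanId;
        this.kind = kind;
    }
}

// Listeners override only the hooks they care about.
interface AnalysisListener {
    default void onFirst(SpanRecord span) {}
    default void onEntry(SpanRecord span) {}
    default void onExit(SpanRecord span) {}
    default void onLocal(SpanRecord span) {}
    void build();
}

final class MiniTraceAnalyzer {
    private final List<AnalysisListener> listeners;

    MiniTraceAnalyzer(List<AnalysisListener> listeners) { this.listeners = listeners; }

    void doAnalysis(List<SpanRecord> spans) {
        if (spans.isEmpty()) {
            return; // same early exit as the original: nothing to build
        }
        for (SpanRecord span : spans) {
            for (AnalysisListener listener : listeners) {
                if (span.spanId == 0) {
                    listener.onFirst(span);
                }
                switch (span.kind) {
                    case ENTRY: listener.onEntry(span); break;
                    case EXIT:  listener.onExit(span);  break;
                    case LOCAL: listener.onLocal(span); break;
                }
            }
        }
        // Each listener turns what it accumulated into its own output.
        listeners.forEach(AnalysisListener::build);
    }
}
```

Because each listener both accumulates and builds, adding a new kind of derived data means adding a listener rather than changing the dispatch loop.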