skywalking agent 插件開發(fā)基本邏輯

插件工程結(jié)構(gòu)

代碼結(jié)構(gòu)分為以下部分

  1. 定義攔截形式
  2. 實現(xiàn)攔截形式的攔截器
  3. 在 resources 目錄下定義 skywalking-plugin.def 文件, 讓 agent 發(fā)現(xiàn)并加載探針

基本規(guī)范:

  1. 定義攔截形式使用 *Instrumentation 定義
  2. 實現(xiàn)攔截形式使用 *Interceptor

核心api

核心對象的相關(guān)API
本節(jié)主要介紹java探針開發(fā)過程中涉及重要的API使用, 進(jìn)而使用正確的API 完成探針開發(fā)

(1) ContextCarrier#items

在跨進(jìn)程鏈路追蹤的案例中, 我們使用 ContextCarrier#items 完成兩個進(jìn)程鏈路數(shù)據(jù)的管理, 以HTTP請求為例, 我們需要處理一下兩個場景

場景一, 將發(fā)送進(jìn)程的鏈路綁定到header中并通過客戶端發(fā)送出去, 具體代碼如下

CarrierItem next = contextCarrier.items();
while (next.hasNext())) {
    next = next.next();
    httpRequest.setHeader(next.getHeadKey(), next.getHeadValue))
}

常見而, 接受服務(wù)器解析header并將鏈路綁定到本次接受處理中, 具體如下

CarrierItem next = contextCarrier.items();
while (next.hasNext())) {
    next = next.next();
    next.setHeaderValue(request.getHeader(next.getHeadKey())));
}

(2) ContextManager#createEntrySpan

一個應(yīng)用服務(wù)的提供端或服務(wù)端的接收端點, 如web容器的服務(wù)端入口, RPC服務(wù)或消息隊列的消費者, 在被調(diào)用時, 都需要創(chuàng)建 EntrySpan, 這是需要使用 ContextManager#createEntrySpan 完成

ContextManager#createEntrySpan(operationName, contextCarrier)

有兩個關(guān)鍵入?yún)?/p>

  • operationName: 定義 EntrySpan 的操作方法名稱 如 http 接口的請求路徑, 注意, operationName 必須是有窮盡的, , 比如 restful 接口匹配 /path/{id}, 一定不要將id真實值記錄, 因為 skywalking 上報數(shù)據(jù)時, 處于減少 operationName長度和鏈路消息傳輸性能的考慮, 將 operationName 緩存在本地映射字典中, 因此需要保證 operationName 是有窮盡的, 否則導(dǎo)致 map 過大
  • contextCarrier: 為了綁定跨進(jìn)程的追蹤, 需要將上游的追蹤消息通過 ContextCarrier#items綁定到本鏈路中

(3) ContextManager#createExitSpan

在一個應(yīng)用服務(wù)的客戶端或消息隊列生產(chǎn)者的發(fā)送端, 如redis客戶端訪問, mysql查詢, rpc組件請求, 客戶端都需要使用createExitSpan 來創(chuàng)建 ExitSpan

createExitSpan(operationName, contextCarrier, peer);

有三個參數(shù)

  • operationName: 和 EntrySpan 一樣
  • contextCarrier: 為了綁定跨進(jìn)程追蹤, 需要將鏈路信息放入header中, 具體看 ContextCarrierItems()
  • peer: 下游地址: 具體格式為 ip:port, 若系統(tǒng)下游無法下探針, 如 reids,mysql, 則需要將下游地址寫入peer中, 格式為 ip:port,ip:port

定義攔截形式

攔截形式定義一般通過繼承 ClassInstanceMethodsEnhancePluginDefine 實現(xiàn)

  1. 需要增強哪些類, 通過 ClassMatch 類匹配, 支持以下幾種方法
    • byName: 通過類路徑+類名, 通過常量指定, 不要用 *.class.getName()
    • byClassAnnotationMatch: 類注解匹配, 不支持父類繼承注解
    • byHierarchyMatch 父類或接口, 在多層繼承情況會導(dǎo)致多次攔截, 一般不用
  2. 需要增強的構(gòu)造方法切入點, 需要指定以下幾個部分
    • getConstructorMatcher 構(gòu)造方法匹配器
    • getConstructorInterceptor 構(gòu)造方法探針插件攔截器
  3. 需要增強的實例方法切入點 需要指定以下幾個部分
    • getMethodsMatcher 攔截的方法
    • getMethodsInterceptor 方法的攔截器
    • isOverrideArgs 是否重寫參數(shù)
  4. 需要增強的靜態(tài)方法切入點, 靜態(tài)方法基本和實例方法一致

實現(xiàn)攔截形式的攔截器

可以對匹配的方法, 在 執(zhí)行前, 執(zhí)行后, 執(zhí)行異常, 進(jìn)行無侵入的攔截, 通過調(diào)用 agent 核心 api 來完成鏈路追蹤

實例方法攔截器接口 InstanceMethodsAroundInterceptor 定義

public interface InstanceMethodsAroundInterceptor {
    // 方法前
    void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
        MethodInterceptResult result) throws Throwable;

    // 方法后
    Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
        Object ret) throws Throwable;

    // 方法異常
    void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
        Class<?>[] argumentsTypes, Throwable t);
}

想要修改入?yún)⑹? 將 isOverrideArgs 改為true, 否則修改參數(shù)不會生效

dubbo-2.7.x 插件實例

攔截形式: DubboInstrumentation 繼承 ClassInstanceMethodsEnhancePluginDefine

// 定義增強的類
@Override
protected ClassMatch enhanceClass() {
    // 通過類名匹配 org.apache.dubbo.monitor.support.MonitorFilter
    return NameMatch.byName(ENHANCE_CLASS);
}

@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
    return new InstanceMethodsInterceptPoint[] {
        new InstanceMethodsInterceptPoint() {
            // 定義攔截的方法, 這里是 MonitorFilter#invoke
            @Override
            public ElementMatcher<MethodDescription> getMethodsMatcher() {
                return named("invoke");
            }

            // 定義實現(xiàn)攔截形式的攔截器 DubboInterceptor
            @Override
            public String getMethodsInterceptor() {
                return INTERCEPT_CLASS;
            }

            @Override
            public boolean isOverrideArgs() {
                return false;
            }
        }
    };
}

攔截方法 DubboInterceptor

@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                         MethodInterceptResult result) throws Throwable {
    Invoker invoker = (Invoker) allArguments[0];
    Invocation invocation = (Invocation) allArguments[1];
    RpcContext rpcContext = RpcContext.getContext();
    boolean isConsumer = rpcContext.isConsumerSide();
    URL requestURL = invoker.getUrl();

    AbstractSpan span;

    final String host = requestURL.getHost();
    final int port = requestURL.getPort();

    boolean needCollectArguments;
    int argumentsLengthThreshold;
    if (isConsumer) {
        // 調(diào)用方創(chuàng)建 ExitSpan
        final ContextCarrier contextCarrier = new ContextCarrier();
        span = ContextManager.createExitSpan(generateOperationName(requestURL, invocation), contextCarrier, host + ":" + port);
        // 將上下文序列化放入 dubbo attachments
        CarrierItem next = contextCarrier.items();
        while (next.hasNext()) {
            next = next.next();
            rpcContext.getAttachments().put(next.getHeadKey(), next.getHeadValue());
            if (invocation.getAttachments().containsKey(next.getHeadKey())) {
                invocation.getAttachments().remove(next.getHeadKey());
            }
        }
        needCollectArguments = DubboPluginConfig.Plugin.Dubbo.COLLECT_CONSUMER_ARGUMENTS;
        argumentsLengthThreshold = DubboPluginConfig.Plugin.Dubbo.CONSUMER_ARGUMENTS_LENGTH_THRESHOLD;
    } else {
        // 將數(shù)據(jù)反序列化回 ContextCarrier
        ContextCarrier contextCarrier = new ContextCarrier();
        CarrierItem next = contextCarrier.items();
        while (next.hasNext()) {
            next = next.next();
            next.setHeadValue(rpcContext.getAttachment(next.getHeadKey()));
        }
        // 服務(wù)方創(chuàng)建 EntrySpan
        span = ContextManager.createEntrySpan(generateOperationName(requestURL, invocation), contextCarrier);
        span.setPeer(rpcContext.getRemoteAddressString());
        needCollectArguments = DubboPluginConfig.Plugin.Dubbo.COLLECT_PROVIDER_ARGUMENTS;
        argumentsLengthThreshold = DubboPluginConfig.Plugin.Dubbo.PROVIDER_ARGUMENTS_LENGTH_THRESHOLD;
    }

    Tags.URL.set(span, generateRequestURL(requestURL, invocation));
    // 收集參數(shù)
    collectArguments(needCollectArguments, argumentsLengthThreshold, span, invocation);
    // 將 span 設(shè)置為 dubbo 以及 RPC 調(diào)用
    span.setComponent(ComponentsDefine.DUBBO);
    SpanLayer.asRPCFramework(span);
}

skywalking-plugin.def 定義

dubbo=org.apache.skywalking.apm.plugin.asf.dubbo.DubboInstrumentation

toolkit 工具箱

截止8.7, 目前 toolkit 支持, 功能上為 agent 提供各豐富的自定義實現(xiàn)

  • apm-toolkit-kafka: kafka plugin 抓的是 spring 的 KafkaTemplate, 如果自定義則用它, 使用 @KafkaPollAndInvoke 注解實現(xiàn)
  • apm-toolkit-log4j: log4j日志收集
  • apm-toolkit-logback: logback 日志收集
  • apm-toolkit-meter
  • apm-toolkit-micrometer-registry
  • apm-toolkit-opentracing
  • apm-toolkit-trace: 通過代碼方式對 trace 信息進(jìn)行補充

skywalking agent 中有對應(yīng)的激活包, 在 /agent/activations 目錄下

  • apm-toolkit-kafka-activation-8.7.0
  • apm-toolkit-log4j-1.x-activation-8.7.0
  • ...

下面以 trace 包方式解釋下處理流程

trace 包處理方式

首先代碼一般會導(dǎo)入 toolkit 包

<dependency>
    <groupId>org.apache.skywalking</groupId>
    <artifactId>apm-toolkit-trace</artifactId>
    <version>${project.version}</version>
</dependency>

挑選 AcitveSpan 來進(jìn)行說明

/**
 * provide custom api that set tag for current active span.
 */
public class ActiveSpan {
    /**
     * 為 span 增加自定義屬性
     * @param key   tag key
     * @param value tag value
     */
    public static void tag(String key, String value) {
    }

    ...
}

可以看到每個方法內(nèi)容都為空, 具體的業(yè)務(wù)在 apm-toolkit-trace-activation-8.7.0 包中 ActiveSpanActivation實現(xiàn)

ActiveSpanActivation 激活類繼承了用于插件開發(fā)的 ClassStaticMethodsEnhancePluginDefine, 并定義 ActiveSpan 中每個靜態(tài)方法的 StaticMethodsInterceptPoint 用于處理

 @Override
public StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints() {
    return new StaticMethodsInterceptPoint[] {
        new StaticMethodsInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getMethodsMatcher() {
                // tag 方法
                return named(TAG_INTERCEPTOR_METHOD_NAME);
            }

            @Override
            public String getMethodsInterceptor() {
                // tag 方法處理類 ActiveSpanTagInterceptor
                return TAG_INTERCEPTOR_CLASS;
            }

            @Override
            public boolean isOverrideArgs() {
                return false;
            }
        },
        ...
}        

實際業(yè)務(wù)處理類 ActiveSpanTagInterceptor 用于自定義屬性添加


public class ActiveSpanTagInterceptor implements StaticMethodsAroundInterceptor {
    @Override
    public void beforeMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
        MethodInterceptResult result) {
        try {
            // 獲取當(dāng)前在用的 Span
            AbstractSpan activeSpan = ContextManager.activeSpan();
            // 為 Span 添加自定義屬性
            activeSpan.tag(Tags.ofKey(String.valueOf(allArguments[0])), String.valueOf(allArguments[1]));
        } catch (NullPointerException ignored) {
        }
    }

    @Override
    public Object afterMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
        Object ret) {
        return ret;
    }

    @Override
    public void handleMethodException(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
        Throwable t) {

    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容