插件工程結(jié)構(gòu)
代碼結(jié)構(gòu)分為以下部分
- 定義攔截形式
- 實現(xiàn)攔截形式的攔截器
- 在 resources 目錄下定義 skywalking-plugin.def 文件, 讓 agent 發(fā)現(xiàn)并加載探針
基本規(guī)范:
- 定義攔截形式使用 *Instrumentation 定義
- 實現(xiàn)攔截形式使用 *Interceptor
核心api
核心對象的相關(guān)API
本節(jié)主要介紹java探針開發(fā)過程中涉及重要的API使用, 進(jìn)而使用正確的API 完成探針開發(fā)
(1) ContextCarrier#items
在跨進(jìn)程鏈路追蹤的案例中, 我們使用 ContextCarrier#items 完成兩個進(jìn)程鏈路數(shù)據(jù)的管理, 以HTTP請求為例, 我們需要處理一下兩個場景
場景一, 將發(fā)送進(jìn)程的鏈路綁定到header中并通過客戶端發(fā)送出去, 具體代碼如下
CarrierItem next = contextCarrier.items();
while (next.hasNext())) {
next = next.next();
httpRequest.setHeader(next.getHeadKey(), next.getHeadValue))
}
常見而, 接受服務(wù)器解析header并將鏈路綁定到本次接受處理中, 具體如下
CarrierItem next = contextCarrier.items();
while (next.hasNext())) {
next = next.next();
next.setHeaderValue(request.getHeader(next.getHeadKey())));
}
(2) ContextManager#createEntrySpan
一個應(yīng)用服務(wù)的提供端或服務(wù)端的接收端點, 如web容器的服務(wù)端入口, RPC服務(wù)或消息隊列的消費者, 在被調(diào)用時, 都需要創(chuàng)建 EntrySpan, 這是需要使用 ContextManager#createEntrySpan 完成
ContextManager#createEntrySpan(operationName, contextCarrier)
有兩個關(guān)鍵入?yún)?/p>
- operationName: 定義 EntrySpan 的操作方法名稱 如 http 接口的請求路徑, 注意, operationName 必須是有窮盡的, , 比如 restful 接口匹配 /path/{id}, 一定不要將id真實值記錄, 因為 skywalking 上報數(shù)據(jù)時, 處于減少 operationName長度和鏈路消息傳輸性能的考慮, 將 operationName 緩存在本地映射字典中, 因此需要保證 operationName 是有窮盡的, 否則導(dǎo)致 map 過大
- contextCarrier: 為了綁定跨進(jìn)程的追蹤, 需要將上游的追蹤消息通過 ContextCarrier#items綁定到本鏈路中
(3) ContextManager#createExitSpan
在一個應(yīng)用服務(wù)的客戶端或消息隊列生產(chǎn)者的發(fā)送端, 如redis客戶端訪問, mysql查詢, rpc組件請求, 客戶端都需要使用createExitSpan 來創(chuàng)建 ExitSpan
createExitSpan(operationName, contextCarrier, peer);
有三個參數(shù)
- operationName: 和 EntrySpan 一樣
- contextCarrier: 為了綁定跨進(jìn)程追蹤, 需要將鏈路信息放入header中, 具體看 ContextCarrierItems()
- peer: 下游地址: 具體格式為 ip:port, 若系統(tǒng)下游無法下探針, 如 reids,mysql, 則需要將下游地址寫入peer中, 格式為 ip:port,ip:port
定義攔截形式
攔截形式定義一般通過繼承 ClassInstanceMethodsEnhancePluginDefine 實現(xiàn)
- 需要增強哪些類, 通過 ClassMatch 類匹配, 支持以下幾種方法
- byName: 通過類路徑+類名, 通過常量指定, 不要用 *.class.getName()
- byClassAnnotationMatch: 類注解匹配, 不支持父類繼承注解
- byHierarchyMatch 父類或接口, 在多層繼承情況會導(dǎo)致多次攔截, 一般不用
- 需要增強的構(gòu)造方法切入點, 需要指定以下幾個部分
- getConstructorMatcher 構(gòu)造方法匹配器
- getConstructorInterceptor 構(gòu)造方法探針插件攔截器
- 需要增強的實例方法切入點 需要指定以下幾個部分
- getMethodsMatcher 攔截的方法
- getMethodsInterceptor 方法的攔截器
- isOverrideArgs 是否重寫參數(shù)
- 需要增強的靜態(tài)方法切入點, 靜態(tài)方法基本和實例方法一致
實現(xiàn)攔截形式的攔截器
可以對匹配的方法, 在 執(zhí)行前, 執(zhí)行后, 執(zhí)行異常, 進(jìn)行無侵入的攔截, 通過調(diào)用 agent 核心 api 來完成鏈路追蹤
實例方法攔截器接口 InstanceMethodsAroundInterceptor 定義
public interface InstanceMethodsAroundInterceptor {
// 方法前
void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable;
// 方法后
Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable;
// 方法異常
void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t);
}
想要修改入?yún)⑹? 將 isOverrideArgs 改為true, 否則修改參數(shù)不會生效
dubbo-2.7.x 插件實例
攔截形式: DubboInstrumentation 繼承 ClassInstanceMethodsEnhancePluginDefine
// 定義增強的類
@Override
protected ClassMatch enhanceClass() {
// 通過類名匹配 org.apache.dubbo.monitor.support.MonitorFilter
return NameMatch.byName(ENHANCE_CLASS);
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
// 定義攔截的方法, 這里是 MonitorFilter#invoke
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("invoke");
}
// 定義實現(xiàn)攔截形式的攔截器 DubboInterceptor
@Override
public String getMethodsInterceptor() {
return INTERCEPT_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
攔截方法 DubboInterceptor
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
Invoker invoker = (Invoker) allArguments[0];
Invocation invocation = (Invocation) allArguments[1];
RpcContext rpcContext = RpcContext.getContext();
boolean isConsumer = rpcContext.isConsumerSide();
URL requestURL = invoker.getUrl();
AbstractSpan span;
final String host = requestURL.getHost();
final int port = requestURL.getPort();
boolean needCollectArguments;
int argumentsLengthThreshold;
if (isConsumer) {
// 調(diào)用方創(chuàng)建 ExitSpan
final ContextCarrier contextCarrier = new ContextCarrier();
span = ContextManager.createExitSpan(generateOperationName(requestURL, invocation), contextCarrier, host + ":" + port);
// 將上下文序列化放入 dubbo attachments
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
rpcContext.getAttachments().put(next.getHeadKey(), next.getHeadValue());
if (invocation.getAttachments().containsKey(next.getHeadKey())) {
invocation.getAttachments().remove(next.getHeadKey());
}
}
needCollectArguments = DubboPluginConfig.Plugin.Dubbo.COLLECT_CONSUMER_ARGUMENTS;
argumentsLengthThreshold = DubboPluginConfig.Plugin.Dubbo.CONSUMER_ARGUMENTS_LENGTH_THRESHOLD;
} else {
// 將數(shù)據(jù)反序列化回 ContextCarrier
ContextCarrier contextCarrier = new ContextCarrier();
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
next.setHeadValue(rpcContext.getAttachment(next.getHeadKey()));
}
// 服務(wù)方創(chuàng)建 EntrySpan
span = ContextManager.createEntrySpan(generateOperationName(requestURL, invocation), contextCarrier);
span.setPeer(rpcContext.getRemoteAddressString());
needCollectArguments = DubboPluginConfig.Plugin.Dubbo.COLLECT_PROVIDER_ARGUMENTS;
argumentsLengthThreshold = DubboPluginConfig.Plugin.Dubbo.PROVIDER_ARGUMENTS_LENGTH_THRESHOLD;
}
Tags.URL.set(span, generateRequestURL(requestURL, invocation));
// 收集參數(shù)
collectArguments(needCollectArguments, argumentsLengthThreshold, span, invocation);
// 將 span 設(shè)置為 dubbo 以及 RPC 調(diào)用
span.setComponent(ComponentsDefine.DUBBO);
SpanLayer.asRPCFramework(span);
}
skywalking-plugin.def 定義
dubbo=org.apache.skywalking.apm.plugin.asf.dubbo.DubboInstrumentation
toolkit 工具箱
截止8.7, 目前 toolkit 支持, 功能上為 agent 提供各豐富的自定義實現(xiàn)
- apm-toolkit-kafka: kafka plugin 抓的是 spring 的 KafkaTemplate, 如果自定義則用它, 使用 @KafkaPollAndInvoke 注解實現(xiàn)
- apm-toolkit-log4j: log4j日志收集
- apm-toolkit-logback: logback 日志收集
- apm-toolkit-meter
- apm-toolkit-micrometer-registry
- apm-toolkit-opentracing
- apm-toolkit-trace: 通過代碼方式對 trace 信息進(jìn)行補充
skywalking agent 中有對應(yīng)的激活包, 在 /agent/activations 目錄下
- apm-toolkit-kafka-activation-8.7.0
- apm-toolkit-log4j-1.x-activation-8.7.0
- ...
下面以 trace 包方式解釋下處理流程
trace 包處理方式
首先代碼一般會導(dǎo)入 toolkit 包
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-trace</artifactId>
<version>${project.version}</version>
</dependency>
挑選 AcitveSpan 來進(jìn)行說明
/**
* provide custom api that set tag for current active span.
*/
public class ActiveSpan {
/**
* 為 span 增加自定義屬性
* @param key tag key
* @param value tag value
*/
public static void tag(String key, String value) {
}
...
}
可以看到每個方法內(nèi)容都為空, 具體的業(yè)務(wù)在 apm-toolkit-trace-activation-8.7.0 包中 ActiveSpanActivation實現(xiàn)
ActiveSpanActivation 激活類繼承了用于插件開發(fā)的 ClassStaticMethodsEnhancePluginDefine, 并定義 ActiveSpan 中每個靜態(tài)方法的 StaticMethodsInterceptPoint 用于處理
@Override
public StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints() {
return new StaticMethodsInterceptPoint[] {
new StaticMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
// tag 方法
return named(TAG_INTERCEPTOR_METHOD_NAME);
}
@Override
public String getMethodsInterceptor() {
// tag 方法處理類 ActiveSpanTagInterceptor
return TAG_INTERCEPTOR_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
},
...
}
實際業(yè)務(wù)處理類 ActiveSpanTagInterceptor 用于自定義屬性添加
public class ActiveSpanTagInterceptor implements StaticMethodsAroundInterceptor {
@Override
public void beforeMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
MethodInterceptResult result) {
try {
// 獲取當(dāng)前在用的 Span
AbstractSpan activeSpan = ContextManager.activeSpan();
// 為 Span 添加自定義屬性
activeSpan.tag(Tags.ofKey(String.valueOf(allArguments[0])), String.valueOf(allArguments[1]));
} catch (NullPointerException ignored) {
}
}
@Override
public Object afterMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
Object ret) {
return ret;
}
@Override
public void handleMethodException(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
Throwable t) {
}
}