如何復(fù)制Pinpoint中一條調(diào)用鏈的完整數(shù)據(jù)

如何復(fù)制Pinpoint中一條調(diào)用鏈的完整數(shù)據(jù)

如果你熟悉Pinpoint的話,你應(yīng)該知道一條調(diào)用鏈包含哪些數(shù)據(jù)
在這里我指的是com.navercorp.pinpoint.web.controller.BusinessTransactionController#transactionInfo方法中查詢會(huì)涉及到的HBase數(shù)據(jù)。
為什么會(huì)產(chǎn)生這個(gè)需求呢,HBase中的數(shù)據(jù)都是配置了TTL的,過(guò)一段時(shí)間會(huì)被清理,你可能就和要這條骨骼驚奇的調(diào)用鏈說(shuō)拜拜了。
如果能夠離線或者保存另外的HBase里,可以更快的復(fù)現(xiàn)場(chǎng)景進(jìn)行調(diào)試和排查。


image.png

先看一下到底會(huì)查哪些表吧。

  • TraceV2
  • ApiMetaData
  • StringMetaData
  • SqlMetaData_Ver2

Pinpoint在構(gòu)造調(diào)用鏈界面需要的信息的時(shí)候,TraceV2是一行數(shù)據(jù),用事務(wù)號(hào)就能查詢出來(lái)。
然后遍歷這行數(shù)據(jù)里的Span和SpanEvent,使用包含的apiId,stringId,sqlId去后三個(gè)表對(duì)應(yīng)查詢所關(guān)聯(lián)的數(shù)據(jù)。
后面三個(gè),一個(gè)復(fù)雜的調(diào)用鏈會(huì)查詢很多次。怎么才能知道呢?

下面只是記錄一下本地試驗(yàn)的方法,僅在測(cè)試環(huán)境中使用。

利用Spring AOP 將hbase查詢結(jié)果 插入到另外的hbase中

我想了下,如果在hbase查詢的時(shí)候進(jìn)行AOP攔截,并且把數(shù)據(jù)發(fā)送到另外一個(gè)hbase的話,這樣不就能把一條調(diào)用鏈的數(shù)據(jù)給剝離出來(lái)了么?
我把將查詢出來(lái)的數(shù)據(jù)插入到別的hbase的過(guò)程叫做逆轉(zhuǎn)。

我們要尋找一些合適的spring bean,因?yàn)閟pring的aop只能作用在spring創(chuàng)建的對(duì)象上。

首先我注意到 com.navercorp.pinpoint.common.hbase.RowMapper#mapRow

public interface RowMapper<T> {

    T mapRow(Result result, int rowNum) throws Exception;
}

第一個(gè)參數(shù)org.apache.hadoop.hbase.client.Result包含了查詢出來(lái)的一行數(shù)據(jù),現(xiàn)在就差個(gè)表名了。

仔細(xì)看pinpoint的代碼,每次執(zhí)行hbase操作前都會(huì)調(diào)用com.navercorp.pinpoint.common.hbase.TableDescriptor#getTableName獲取表名。
這樣設(shè)置一個(gè)線程上下文(https://github.com/apache/shiro/blob/master/core/src/main/java/org/apache/shiro/util/ThreadContext.java),就能將表名和多行數(shù)據(jù)完整聯(lián)系在一起了。

最后線程上下文里面還需要設(shè)置一個(gè)是否逆轉(zhuǎn)查詢數(shù)據(jù)的標(biāo)志。
對(duì)于查詢單條調(diào)用鏈來(lái)說(shuō)就是com.navercorp.pinpoint.web.service.SpanService#selectSpan方法進(jìn)入的時(shí)候開(kāi)啟標(biāo)志。

實(shí)現(xiàn)

首先仿照shiro弄個(gè)線程上下文。

package com.navercorp.pinpoint.web.dao;

import java.util.HashMap;
import java.util.Map;

/**
 * @author tankilo
 * https://github.com/apache/shiro/blob/master/core/src/main/java/org/apache/shiro/util/ThreadContext.java
 */
public final class ThreadContext {
    private ThreadContext() {

    }

    public static final String REVERSE = "REVERSE";
    public static final String TABLE_NAME = "TABLE_NAME";
    private static ThreadLocal<Map<String, Object>> resources = new InheritableThreadLocal<Map<String, Object>>() {
        @Override
        protected Map<String, Object> initialValue() {
            return new HashMap<>(8);
        }
    };

    public static void put(String key, Object value) {
        if (key == null) {
            throw new IllegalArgumentException("key cannot be null");
        }

        if (value == null) {
            remove(key);
            return;
        }
        ensureResourcesInitialized();
        resources.get().put(key, value);
    }

    private static void ensureResourcesInitialized() {
        if (resources.get() == null) {
            resources.set(new HashMap<>(8));
        }
    }

    public static void remove(String key) {
        Map<String, Object> map = resources.get();
        if (map != null) {
            map.remove(key);
        }
    }

    public static Object get(String key) {
        Map<String, Object> map = resources.get();
        if (map != null) {
            return map.get(key);
        } else {
            return null;
        }
    }

    private static Object getValue(Object key) {
        Map<String, Object> perThreadResources = resources.get();
        return perThreadResources != null ? perThreadResources.get(key) : null;
    }

    private static Boolean getBoolean(String key, Boolean defaultValue) {
        Object value = getValue(key);
        if (null != value) {
            return Boolean.valueOf(value.toString());
        }
        return defaultValue;
    }

    private static String getString(String key) {
        Object value = getValue(key);
        if (null != value) {
            return value.toString();
        } else {
            return null;
        }
    }

    public static void setReverse(boolean reverse) {
        put(REVERSE, reverse);

    }

    public static boolean isReverse() {
        return getBoolean(REVERSE, false);
    }

    public static void setTableName(String tableName) {
        put(TABLE_NAME, tableName);
    }

    public static String getTableName() {
        return getString(TABLE_NAME);
    }

    public static void remove() {
        resources.remove();
    }
}

切面

@Aspect
public class HbaseTemplateReverseAspect {

    @Autowired
    @Qualifier("hbaseTemplateReverse")
    private HbaseOperations2 template2;

    @Autowired
    private HbaseTableNameProvider tableNameProvider;

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Pointcut("execution(public * com.navercorp.pinpoint.common.hbase.RowMapper.mapRow(..))")
    public void pointCut() {
    }

    @After("pointCut() && args(result,rowNum)")
    public void doBefore(Result result, int rowNum) {
        if (ThreadContext.isReverse()) {
            String tableNameStr = ThreadContext.getTableName();
            TableName tableName = tableNameProvider.getTableName(tableNameStr);
            Put put = new Put(result.getRow());
            Cell[] rawCells = result.rawCells();
            for (Cell cell : rawCells) {
                put.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), cell.getTimestamp(), CellUtil.cloneValue(cell));
            }
            template2.asyncPut(tableName, put);
        }
    }

    @Before("execution(public * com.navercorp.pinpoint.web.dao.hbase.HbaseTraceDaoV2.selectSpan(..))")
    public void selectSpan() {
        ThreadContext.setTableName(HbaseTable.TRACE_V2.getName());
    }

    @AfterReturning(returning = "tableName", pointcut = "execution(public * com.navercorp.pinpoint.common.hbase.TableDescriptor.getTableName())")
    public void getTableName(TableName tableName) {
        ThreadContext.setTableName(tableName.getNameAsString());
    }

    @Around("execution(public * com.navercorp.pinpoint.web.service.SpanService.selectSpan(..))")
    public Object dealContext(ProceedingJoinPoint jp) throws Throwable {
        Object result;
        try {
            ThreadContext.setReverse(true);
            result = jp.proceed();
        } finally {
            ThreadContext.remove();
        }
        return result;
    }
}

遺憾

里面有些曲折,看上面代碼也知道。
com.navercorp.pinpoint.web.dao.hbase.HbaseTraceDaoV2#spanMapperV2 不是spring bean,而是手動(dòng)構(gòu)造的。
這里我就沒(méi)有繼續(xù)弄下去了,因?yàn)楦杏X(jué)spring aop的局限性很大,計(jì)劃用java instrumentation api,和pinpoint的agent一樣重新弄下。

代碼提交備份在我的github上 https://github.com/tankilo/pinpoint/tree/spring-aop-hbase-reverse

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容