線程池分布式追蹤方案實(shí)現(xiàn)

場景:在生產(chǎn)環(huán)境種,查詢發(fā)現(xiàn)錯誤日志時,需要定位跟蹤到最終問題,但是項目種用到了很多的異步線程池,導(dǎo)致定位問題時根據(jù)traceId,只能找到一部分日志內(nèi)容,無法最終定位到問題所在;

原因:這是由于MDC的實(shí)現(xiàn)是通過ThreadLocal實(shí)現(xiàn)的,然后線程池種的線程和用戶線程不是同一個線程,所以在異步調(diào)用的時候,用戶線程的內(nèi)容沒有同步到線程池線程中,導(dǎo)致最終沒有打印出關(guān)聯(lián)的tracId,從而導(dǎo)致最終無法關(guān)聯(lián)相關(guān)日志,無法定位到具體問題;

解決方案:因為實(shí)現(xiàn)異步的方式有很多種,例如線程池和ThreadPoolExecutor和ThreadPoolTaskExecutor,spring的@Aync等,但是根本原因都是因為兩個線程不是同一個線程引起的,所以我這里只例舉線程池ThreadPoolExecutor的實(shí)現(xiàn),其他實(shí)現(xiàn)相差無幾;
ThreadMdcUtil.class

import org.slf4j.MDC;
import java.util.Map;
import java.util.concurrent.Callable;

/**
 * className:ThreadMdcUtil
 * description: mdcUtil
 *
 * @author: david
 * date: 2020/12/31 10:42 上午
 */
public class ThreadMdcUtil {


    public static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) {
        return () -> {
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                return callable.call();
            } finally {
                MDC.clear();
            }
        };
    }

    public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
        return () -> {
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                runnable.run();
            } finally {
                MDC.clear();
            }
        };
    }
}

ThreadPoolExecutorMdcWrapper.class

import org.slf4j.MDC;
import java.util.concurrent.*;

/**
 * className:ThreadPoolExecutorMdcWrapper
 * description: 線程池包裝類
 *
 * @author: david
 * date: 2020/12/31 10:41 上午
 */
public class ThreadPoolExecutorMdcWrapper extends ThreadPoolExecutor {
    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
                                        RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    public void execute(Runnable task) {
        super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()), result);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }
}

具體使用方式如下:
在需要使用線程池的時候,使用ThreadPoolExecutorMdcWrapper類創(chuàng)建線程池去執(zhí)行相關(guān)邏輯;

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容