異步線程的變量傳遞

設(shè)想場(chǎng)景

假如我們需要跟蹤某條請(qǐng)求的所有后臺(tái)日志,其中這些日志的埋點(diǎn)有同步的,也有異步的,甚至是使用Reactor的,那這個(gè)時(shí)候,我們應(yīng)該怎么跟蹤?這個(gè)在分布式服務(wù)和微服務(wù)下叫全鏈路監(jiān)控-APM,現(xiàn)在我們就在單機(jī)環(huán)境下即同一jvm下說明這個(gè)問題。

同步線程

SLF4J 日志框架提供了一個(gè) MDC(Mapped Diagnostic Contexts) 工具類

public class Main {

    private static final String KEY = "requestId";
    private static final Logger logger = LoggerFactory.getLogger(Main.class);
    
    public static void main(String[] args) {

        // 入口傳入請(qǐng)求ID
        MDC.put(KEY, UUID.randomUUID().toString());
        
        // 打印日志
        logger.debug("log in main thread 1");
        logger.debug("log in main thread 2");
        logger.debug("log in main thread 3");

        // 出口移除請(qǐng)求ID
        MDC.remove(KEY);
    }
}

具體可以參考如何快速過濾出一次請(qǐng)求的所有日志

異步線程

由于 Logback 的 MDC 實(shí)際上是一個(gè) ThreadLocal 的實(shí)現(xiàn),因此,當(dāng)異步執(zhí)行產(chǎn)生線程切換時(shí),需要將 MDC 保存的信息進(jìn)行切換。
Spring 中有一個(gè)可用的線程裝飾器TaskDecorator,這個(gè)是 Spring Core 4.3 版本才加入的接口,通過實(shí)現(xiàn)這個(gè)接口,可以自己控制傳播那些變量

/**
 * 解決異步執(zhí)行時(shí)MDC內(nèi)容延續(xù)的問題
 */
public class MDCTaskDecorator implements TaskDecorator {
    
    @Override
    public Runnable decorate(Runnable runnable) {
        return new MDCContinueRunableDecorator(runnable);
    }
    
    /**
     * 執(zhí)行線程裝飾器
     */
    protected class MDCContinueRunableDecorator implements Runnable {
        
        private final Runnable delegate;
        
        protected final Map<String, String> logContextMap;
        
        public MDCContinueRunableDecorator(Runnable runnable) {
            this.delegate = runnable;
            this.logContextMap = MDC.getCopyOfContextMap();
        }
        
        @Override
        public void run() {
            MDC.setContextMap(this.logContextMap);
            this.delegate.run();
            MDC.clear();
        }
    }
}

然后,需要自定義實(shí)現(xiàn)一個(gè) TaskExecutor,替換 Spring 提供的默認(rèn)實(shí)現(xiàn),代碼如下。

 /**
     * 自定義線程池
     * <p>
     * 用于線程切換時(shí)的MDC延續(xù)
     */
    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(maxPoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setTaskDecorator(new MDCTaskDecorator());
        executor.setThreadNamePrefix("MDCAdaptTaskExcutor-");
        executor.initialize();
        return executor;
    }

只要異步處理使用了自定義的 TaskExecutor ,即可實(shí)現(xiàn)上下文的自動(dòng)傳遞。

Reactor

spring5引入webflux,其底層是基于reactor,那么reactor如何進(jìn)行上下文變量的傳播呢?官方提供了Context對(duì)象來替代threadlocal。

其特性如下:

  • 類似map的kv操作,比如put(Object key, Object value),putAll(Context), hasKey(Object key)
  • immutable,即同一個(gè)key,后面put不會(huì)覆蓋
  • 提供getOrDefault,getOrEmpty方法
  • Context與作用鏈上的每個(gè)Subscriber綁定
  • 通過subscriberContext(Context)來訪問
  • Context的作用是自底向上

實(shí)例

設(shè)置及讀取

    @Test
    public void testSubscriberContext(){
        String key = "message";
        Mono<String> r = Mono.just("Hello")
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "World"));

        StepVerifier.create(r)
                .expectNext("Hello World")
                .verifyComplete();
    }

這里從最底部的subscriberContext設(shè)置message值為World,然后flatMap里頭通過subscriberContext來訪問。

自底向上

    @Test
    public void testContextSequence(){
        String key = "message";
        Mono<String> r = Mono.just("Hello")
                //NOTE 這個(gè)subscriberContext設(shè)置的太高了
                .subscriberContext(ctx -> ctx.put(key, "World"))
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.getOrDefault(key, "Stranger")));

        StepVerifier.create(r)
                .expectNext("Hello Stranger")
                .verifyComplete();
    }
復(fù)制代碼

由于這個(gè)例子的subscriberContext設(shè)置的太高了,不能作用在flatMap里頭的Mono.subscriberContext()

不可變

    @Test
    public void testContextImmutable(){
        String key = "message";

        Mono<String> r = Mono.subscriberContext()
                .map( ctx -> ctx.put(key, "Hello"))
                //這里返回了一個(gè)新的,因此上面的設(shè)置失效了
                .flatMap( ctx -> Mono.subscriberContext())
                .map( ctx -> ctx.getOrDefault(key,"Default"));

        StepVerifier.create(r)
                .expectNext("Default")
                .verifyComplete();
    }

subscriberContext永遠(yuǎn)返回一個(gè)新的

多個(gè)連續(xù)的subscriberContext

    @Test
    public void testReadOrder(){
        String key = "message";
        Mono<String> r = Mono.just("Hello")
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "Reactor"))
                .subscriberContext(ctx -> ctx.put(key, "World"));

        StepVerifier.create(r)
                .expectNext("Hello Reactor")
                .verifyComplete();
    }

operator只會(huì)讀取離它最近的一個(gè)context

flatMap間的subscriberContext

    @Test
    public void testContextBetweenFlatMap(){
        String key = "message";
        Mono<String> r = Mono.just("Hello")
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "Reactor"))
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "World"));

        StepVerifier.create(r)
                .expectNext("Hello Reactor World")
                .verifyComplete();
    }

flatMap讀取離它最近的context

flatMap中的subscriberContext

    @Test
    public void testContextInFlatMap(){
        String key = "message";
        Mono<String> r =
                Mono.just("Hello")
                        .flatMap( s -> Mono.subscriberContext()
                                .map( ctx -> s + " " + ctx.get(key))
                        )
                        .flatMap( s -> Mono.subscriberContext()
                                .map( ctx -> s + " " + ctx.get(key))
                                .subscriberContext(ctx -> ctx.put(key, "Reactor"))
                        )
                        .subscriberContext(ctx -> ctx.put(key, "World"));

        StepVerifier.create(r)
                .expectNext("Hello World Reactor")
                .verifyComplete();
    }

這里第一個(gè)flatMap無法讀取第二個(gè)flatMap內(nèi)部的context

具體可以參考聊聊reactor異步線程的變量傳遞

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容