設(shè)想場(chǎng)景
假如我們需要跟蹤某條請(qǐng)求的所有后臺(tái)日志,其中這些日志的埋點(diǎn)有同步的,也有異步的,甚至是使用Reactor的,那這個(gè)時(shí)候,我們應(yīng)該怎么跟蹤?這個(gè)在分布式服務(wù)和微服務(wù)下叫全鏈路監(jiān)控-APM,現(xiàn)在我們就在單機(jī)環(huán)境下即同一jvm下說明這個(gè)問題。
同步線程
SLF4J 日志框架提供了一個(gè) MDC(Mapped Diagnostic Contexts) 工具類
public class Main {
private static final String KEY = "requestId";
private static final Logger logger = LoggerFactory.getLogger(Main.class);
public static void main(String[] args) {
// 入口傳入請(qǐng)求ID
MDC.put(KEY, UUID.randomUUID().toString());
// 打印日志
logger.debug("log in main thread 1");
logger.debug("log in main thread 2");
logger.debug("log in main thread 3");
// 出口移除請(qǐng)求ID
MDC.remove(KEY);
}
}
具體可以參考如何快速過濾出一次請(qǐng)求的所有日志
異步線程
由于 Logback 的 MDC 實(shí)際上是一個(gè) ThreadLocal 的實(shí)現(xiàn),因此,當(dāng)異步執(zhí)行產(chǎn)生線程切換時(shí),需要將 MDC 保存的信息進(jìn)行切換。
Spring 中有一個(gè)可用的線程裝飾器TaskDecorator,這個(gè)是 Spring Core 4.3 版本才加入的接口,通過實(shí)現(xiàn)這個(gè)接口,可以自己控制傳播那些變量
/**
* 解決異步執(zhí)行時(shí)MDC內(nèi)容延續(xù)的問題
*/
public class MDCTaskDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
return new MDCContinueRunableDecorator(runnable);
}
/**
* 執(zhí)行線程裝飾器
*/
protected class MDCContinueRunableDecorator implements Runnable {
private final Runnable delegate;
protected final Map<String, String> logContextMap;
public MDCContinueRunableDecorator(Runnable runnable) {
this.delegate = runnable;
this.logContextMap = MDC.getCopyOfContextMap();
}
@Override
public void run() {
MDC.setContextMap(this.logContextMap);
this.delegate.run();
MDC.clear();
}
}
}
然后,需要自定義實(shí)現(xiàn)一個(gè) TaskExecutor,替換 Spring 提供的默認(rèn)實(shí)現(xiàn),代碼如下。
/**
* 自定義線程池
* <p>
* 用于線程切換時(shí)的MDC延續(xù)
*/
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(maxPoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setTaskDecorator(new MDCTaskDecorator());
executor.setThreadNamePrefix("MDCAdaptTaskExcutor-");
executor.initialize();
return executor;
}
只要異步處理使用了自定義的 TaskExecutor ,即可實(shí)現(xiàn)上下文的自動(dòng)傳遞。
Reactor
spring5引入webflux,其底層是基于reactor,那么reactor如何進(jìn)行上下文變量的傳播呢?官方提供了Context對(duì)象來替代threadlocal。
其特性如下:
- 類似map的kv操作,比如put(Object key, Object value),putAll(Context), hasKey(Object key)
- immutable,即同一個(gè)key,后面put不會(huì)覆蓋
- 提供getOrDefault,getOrEmpty方法
- Context與作用鏈上的每個(gè)Subscriber綁定
- 通過subscriberContext(Context)來訪問
- Context的作用是自底向上
實(shí)例
設(shè)置及讀取
@Test
public void testSubscriberContext(){
String key = "message";
Mono<String> r = Mono.just("Hello")
.flatMap( s -> Mono.subscriberContext()
.map( ctx -> s + " " + ctx.get(key)))
.subscriberContext(ctx -> ctx.put(key, "World"));
StepVerifier.create(r)
.expectNext("Hello World")
.verifyComplete();
}
這里從最底部的subscriberContext設(shè)置message值為World,然后flatMap里頭通過subscriberContext來訪問。
自底向上
@Test
public void testContextSequence(){
String key = "message";
Mono<String> r = Mono.just("Hello")
//NOTE 這個(gè)subscriberContext設(shè)置的太高了
.subscriberContext(ctx -> ctx.put(key, "World"))
.flatMap( s -> Mono.subscriberContext()
.map( ctx -> s + " " + ctx.getOrDefault(key, "Stranger")));
StepVerifier.create(r)
.expectNext("Hello Stranger")
.verifyComplete();
}
復(fù)制代碼
由于這個(gè)例子的subscriberContext設(shè)置的太高了,不能作用在flatMap里頭的Mono.subscriberContext()
不可變
@Test
public void testContextImmutable(){
String key = "message";
Mono<String> r = Mono.subscriberContext()
.map( ctx -> ctx.put(key, "Hello"))
//這里返回了一個(gè)新的,因此上面的設(shè)置失效了
.flatMap( ctx -> Mono.subscriberContext())
.map( ctx -> ctx.getOrDefault(key,"Default"));
StepVerifier.create(r)
.expectNext("Default")
.verifyComplete();
}
subscriberContext永遠(yuǎn)返回一個(gè)新的
多個(gè)連續(xù)的subscriberContext
@Test
public void testReadOrder(){
String key = "message";
Mono<String> r = Mono.just("Hello")
.flatMap( s -> Mono.subscriberContext()
.map( ctx -> s + " " + ctx.get(key)))
.subscriberContext(ctx -> ctx.put(key, "Reactor"))
.subscriberContext(ctx -> ctx.put(key, "World"));
StepVerifier.create(r)
.expectNext("Hello Reactor")
.verifyComplete();
}
operator只會(huì)讀取離它最近的一個(gè)context
flatMap間的subscriberContext
@Test
public void testContextBetweenFlatMap(){
String key = "message";
Mono<String> r = Mono.just("Hello")
.flatMap( s -> Mono.subscriberContext()
.map( ctx -> s + " " + ctx.get(key)))
.subscriberContext(ctx -> ctx.put(key, "Reactor"))
.flatMap( s -> Mono.subscriberContext()
.map( ctx -> s + " " + ctx.get(key)))
.subscriberContext(ctx -> ctx.put(key, "World"));
StepVerifier.create(r)
.expectNext("Hello Reactor World")
.verifyComplete();
}
flatMap讀取離它最近的context
flatMap中的subscriberContext
@Test
public void testContextInFlatMap(){
String key = "message";
Mono<String> r =
Mono.just("Hello")
.flatMap( s -> Mono.subscriberContext()
.map( ctx -> s + " " + ctx.get(key))
)
.flatMap( s -> Mono.subscriberContext()
.map( ctx -> s + " " + ctx.get(key))
.subscriberContext(ctx -> ctx.put(key, "Reactor"))
)
.subscriberContext(ctx -> ctx.put(key, "World"));
StepVerifier.create(r)
.expectNext("Hello World Reactor")
.verifyComplete();
}
這里第一個(gè)flatMap無法讀取第二個(gè)flatMap內(nèi)部的context
具體可以參考聊聊reactor異步線程的變量傳遞