中間件興趣圈 , 作者 丁威
說(shuō)起本地線程變量,我相信大家首先會(huì)想到的是JDK默認(rèn)提供的ThreadLocal,用來(lái)存儲(chǔ)在整個(gè)調(diào)用鏈中都需要訪問(wèn)的數(shù)據(jù),并且是線程安全的。由于本文的寫(xiě)作背景是筆者需要在公司落地全鏈路壓測(cè)平臺(tái),一個(gè)基本并核心的功能需求是壓測(cè)標(biāo)記需要在整個(gè)調(diào)用鏈中進(jìn)行傳遞,線程上下文環(huán)境成為解決這個(gè)問(wèn)題最合適的技術(shù)。
溫馨提示:
本從從ThreadLocal原理入手分析,并拋出其缺點(diǎn),再逐一引出InheritableThreadLocal、TransmittableThreadLocal。文章篇幅稍長(zhǎng),但由于循序漸進(jìn),層層遞進(jìn),故精華部分在后面。
ThreadLocal詳解
ThreadLocal對(duì)外提供的API如下:
public T get()
從線程上下文環(huán)境中獲取設(shè)置的值。public void set(T value)
將值存儲(chǔ)到線程上下文環(huán)境中,供后續(xù)使用。public void remove()
清除線程本地上下文環(huán)境。
上述API使用簡(jiǎn)單,關(guān)鍵是要理解ThreadLocal的內(nèi)部存儲(chǔ)結(jié)果。
1.1 ThreadLocal存儲(chǔ)結(jié)構(gòu)
圖的幾個(gè)關(guān)鍵點(diǎn)如下:
數(shù)據(jù)存儲(chǔ)
當(dāng)線程調(diào)用threadLocal對(duì)象的set(Object value)方法時(shí),數(shù)據(jù)并不是存儲(chǔ)在ThreadLocal對(duì)象中,而是存儲(chǔ)在Thread對(duì)象中,這也是ThreadLocal的由來(lái),具體存儲(chǔ)在線程對(duì)象的threadLocals屬性中,其類(lèi)型為T(mén)hreadLocal.ThreadLocalMap。ThreadLocal.ThreadLocalMap,Map結(jié)構(gòu),即鍵值對(duì),鍵為threadLocal對(duì)象,值為需要存儲(chǔ)到線程上下文的值(threadLocal#set)方法的參數(shù)。
1.2 源碼分析ThreadLocal
1.2.1 源碼分析get
public T get() {
Thread t = Thread.currentThread(); // @1
ThreadLocalMap map = getMap(t); // @2
if (map != null) { // @3
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue(); // @4
}
代碼@1:獲取當(dāng)前線程。
代碼@2:獲取線程的threadLocals屬性,在上圖中已展示其存儲(chǔ)結(jié)構(gòu)。
代碼@3:如果線程對(duì)象的threadLocals屬性不為空,則從該Map結(jié)構(gòu)中,用threadLocal對(duì)象為鍵去查找值,如果能找到,則返回其value值,否則執(zhí)行代碼@4。
代碼@4:如果線程對(duì)象的threadLocals屬性為空,或未從threadLocals中找到對(duì)應(yīng)的鍵值對(duì),則調(diào)用該方法執(zhí)行初始化。
ThreadLocal#setInitialValue
private T setInitialValue() {
T value = initialValue(); // @1
Thread t = Thread.currentThread(); // @2
ThreadLocalMap map = getMap(t); // @3
if (map != null) //@4
map.set(this, value);
else
createMap(t, value); // @5
return value;
}
代碼@1:調(diào)用initialValue()獲取默認(rèn)初始化值,該方法默認(rèn)返回null,子類(lèi)可以重寫(xiě),實(shí)現(xiàn)線程本地變量的初始化。
代碼@2:獲取當(dāng)前線程。
代碼@3:獲取該線程對(duì)象的threadLocals屬性。
代碼@4:如果不為空,則將threadLocal:value存入線程對(duì)象的threadLocals屬性中。
代碼@5:否則初始化線程對(duì)象的threadLocals,然后將threadLocal:value鍵值對(duì)存入線程對(duì)象的threadLocals屬性中。
1.2.2 源碼分析set
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
在掌握了get方法實(shí)現(xiàn)細(xì)節(jié),set方法、remove其實(shí)現(xiàn)的邏輯基本一樣,就是對(duì)線程對(duì)象的threadLocals屬性進(jìn)行操作(Map結(jié)構(gòu))。
1.3 ThreadLocal局限性
經(jīng)過(guò)上面的剖析,對(duì)ThreadLocal的內(nèi)部存儲(chǔ)、set、get、remove等實(shí)現(xiàn)細(xì)節(jié)都已做了詳細(xì)的解讀,但ThreadLocal無(wú)法在父子線程之間傳遞,示例代碼如下:
public class Service {
private static ThreadLocal<Integer> requestIdThreadLocal = new ThreadLocal<>();
public static void main(String[] args) {
Integer reqId = new Integer(5);
Service a = new Service();
a.setRequestId(reqId);
}
public void setRequestId(Integer requestId) {
requestIdThreadLocal.set(requestId);
doBussiness();
}
public void doBussiness() {
System.out.println("首先打印requestId:" + requestIdThreadLocal.get());
(new Thread(new Runnable() {
@Override
public void run() {
System.out.println("子線程啟動(dòng)");
System.out.println("在子線程中訪問(wèn)requestId:" + requestIdThreadLocal.get());
}
})).start();
}
}
運(yùn)行結(jié)果如下:
從結(jié)果上來(lái)看,在子線程中無(wú)法訪問(wèn)在父線程中設(shè)置的本地線程變量,那我們?cè)撊绾蝸?lái)解決該問(wèn)題呢?
為了解決該問(wèn)題,JDK引入了另外一個(gè)線程本地變量實(shí)現(xiàn)類(lèi)InheritableThreadLocal,接下來(lái)將重點(diǎn)介紹InheritableThreadLocal的實(shí)現(xiàn)原理。
InheritableThreadLocal
由于ThreadLocal在父子線程交互中子線程無(wú)法訪問(wèn)到存儲(chǔ)在父線程中的值,無(wú)法滿足某些場(chǎng)景的需求,例如鏈路跟蹤,例如如下場(chǎng)景:
為了解決上述問(wèn)題,JDK引入了InheritableThreadLocal,即子線程可以訪問(wèn)父線程中的線程本地變量,更嚴(yán)謹(jǐn)?shù)恼f(shuō)法是子線程可以訪問(wèn)在創(chuàng)建子線程時(shí)父線程當(dāng)時(shí)的本地線程變量,因?yàn)槠鋵?shí)現(xiàn)原理就是在創(chuàng)建子線程將父線程當(dāng)前存在的本地線程變量拷貝到子線程的本地線程變量中。
2.1 類(lèi)圖
<figcaption style="margin: 10px 0px 0px; padding: 0px; max-width: 100%; box-sizing: border-box !important; word-wrap: break-word !important; line-height: inherit; text-align: center; color: rgb(153, 153, 153); font-size: 0.7em;"></figcaption>
從類(lèi)的繼承層次來(lái)看,InheritableThreadLocal只是在ThreadLocal的get、set、remove流程中,重寫(xiě)了getMap、createMap方法,整體流程與ThreadLocal保持一致,故我們初步來(lái)看一下InheritableThreadLocal是如何重寫(xiě)上述這兩個(gè)方法的。
ThreadLocalMap getMap(Thread t) {
return t.inheritableThreadLocals;
}
void createMap(Thread t, T firstValue) {
t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
}
從代碼得知,ThreadLocal操作的是Thread對(duì)象的threadLocals屬性,而InheritableThreadLocal操作的是Thread對(duì)象的inheritableThreadLocals屬性。
溫馨提示:createMap被執(zhí)行的條件是調(diào)用InheritableThreadLocal#get、set時(shí)如果線程的inheritableThreadLocals屬性為空時(shí)才會(huì)被調(diào)用。
那問(wèn)題來(lái)了,InheritableThreadLocal是如何繼承自父對(duì)象的線程本地變量的呢?
2.2 線程上下文環(huán)境如何從父線程傳遞到子線程
這部分的代碼入口為:Thread#init方法
Thread parent = currentThread(); // @1
// 省略部分代碼
if (inheritThreadLocals && parent.inheritableThreadLocals != null) // @2
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
/* Stash the specified stack size in case the VM cares */
this.stackSize = stackSize;
/* Set thread ID */
tid = nextThreadID();
子線程是通過(guò)在父線程中通過(guò)調(diào)用new Thread()方法來(lái)創(chuàng)建子線程,Thread#init方法在Thread的構(gòu)造方法中被調(diào)用。
代碼@1:獲取當(dāng)前線程對(duì)象,即待創(chuàng)建的線程的父線程。
代碼@2:如果父線程的inheritableThreadLocals不為空并且inheritThreadLocals為true(該值默認(rèn)為true),則使用父線程的inherit本地變量的值來(lái)創(chuàng)建子線程的inheritableThreadLocals結(jié)構(gòu),即將父線程中的本地變量復(fù)制到子線程中。
static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
return new ThreadLocalMap(parentMap);
}
private ThreadLocalMap(ThreadLocalMap parentMap) {
Entry[] parentTable = parentMap.table;
int len = parentTable.length;
setThreshold(len);
table = new Entry[len];
for (int j = 0; j < len; j++) {
Entry e = parentTable[j];
if (e != null) {
@SuppressWarnings("unchecked")
ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
if (key != null) {
Object value = key.childValue(e.value);
Entry c = new Entry(key, value);
int h = key.threadLocalHashCode & (len - 1);
while (table[h] != null)
h = nextIndex(h, len);
table[h] = c;
size++;
}
}
}
}
上述代碼就不一一分析,類(lèi)似于Map的復(fù)制,只不過(guò)其在Hash沖突時(shí),不是使用鏈表結(jié)構(gòu),而是直接在數(shù)組中找下一個(gè)為null的槽位。
溫馨提示:子線程默認(rèn)拷貝父線程的方式是淺拷貝,如果需要使用深拷貝,需要使用自定義ThreadLocal,繼承InheritableThreadLocal并重寫(xiě)childValue方法。
2.3 驗(yàn)證InheritableThreadLocal的特性
驗(yàn)證代碼如下:
public class Service {
private static InheritableThreadLocal<Integer> requestIdThreadLocal = new InheritableThreadLocal<>();
public static void main(String[] args) {
Integer reqId = new Integer(5);
Service a = new Service();
a.setRequestId(reqId);
}
public void setRequestId(Integer requestId) {
requestIdThreadLocal.set(requestId);
doBussiness();
}
public void doBussiness() {
System.out.println("首先打印requestId:" + requestIdThreadLocal.get());
(new Thread(new Runnable() {
@Override
public void run() {
System.out.println("子線程啟動(dòng)");
System.out.println("在子線程中訪問(wèn)requestId:" + requestIdThreadLocal.get());
}
})).start();
}
}
執(zhí)行結(jié)果如下:
符合預(yù)期,在子線程中如愿訪問(wèn)到了在主線程中設(shè)置的本地環(huán)境變量。
2.4 InheritableThreadLocal局限性
InheritableThreadLocal支持子線程訪問(wèn)在父線程中設(shè)置的線程上下文環(huán)境的實(shí)現(xiàn)原理是在創(chuàng)建子線程時(shí)將父線程中的本地變量值復(fù)制到子線程,即復(fù)制的時(shí)機(jī)為創(chuàng)建子線程時(shí)。
但我們提到并發(fā)、多線程就離不開(kāi)線程池的使用,因?yàn)榫€程池能夠復(fù)用線程,減少線程的頻繁創(chuàng)建與銷(xiāo)毀,如果使用InheritableThreadLocal,那么線程池中的線程拷貝的數(shù)據(jù)來(lái)自于第一個(gè)提交任務(wù)的外部線程,即后面的外部線程向線程池中提交任務(wù)時(shí),子線程訪問(wèn)的本地變量都來(lái)源于第一個(gè)外部線程,造成線程本地變量混亂,驗(yàn)證代碼如下:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Service {
/**
* 模擬tomcat線程池
*/
private static ExecutorService tomcatExecutors = Executors.newFixedThreadPool(10);
/**
* 業(yè)務(wù)線程池,默認(rèn)Control中異步任務(wù)執(zhí)行線程池
*/
private static ExecutorService businessExecutors = Executors.newFixedThreadPool(5);
/**
* 線程上下文環(huán)境,模擬在Control這一層,設(shè)置環(huán)境變量,然后在這里提交一個(gè)異步任務(wù),模擬在子線程中,是否可以訪問(wèn)到剛設(shè)置的環(huán)境變量值。
*/
private static InheritableThreadLocal<Integer> requestIdThreadLocal = new InheritableThreadLocal<>();
public static void main(String[] args) {
for(int i = 0; i < 10; i ++ ) { // 模式10個(gè)請(qǐng)求,每個(gè)請(qǐng)求執(zhí)行ControlThread的邏輯,其具體實(shí)現(xiàn)就是,先輸出父線程的名稱(chēng),
// 然后設(shè)置本地環(huán)境變量,并將父線程名稱(chēng)傳入到子線程中,在子線程中嘗試獲取在父線程中的設(shè)置的環(huán)境變量
tomcatExecutors.submit(new ControlThread(i));
}
//簡(jiǎn)單粗暴的關(guān)閉線程池
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
businessExecutors.shutdown();
tomcatExecutors.shutdown();
}
/**
* 模擬Control任務(wù)
*/
static class ControlThread implements Runnable {
private int i;
public ControlThread(int i) {
this.i = i;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ":" + i);
requestIdThreadLocal.set(i);
//使用線程池異步處理任務(wù)
businessExecutors.submit(new BusinessTask(Thread.currentThread().getName()));
}
}
/**
* 業(yè)務(wù)任務(wù),主要是模擬在Control控制層,提交任務(wù)到線程池執(zhí)行
*/
static class BusinessTask implements Runnable {
private String parentThreadName;
public BusinessTask(String parentThreadName) {
this.parentThreadName = parentThreadName;
}
@Override
public void run() {
//如果與上面的能對(duì)應(yīng)上來(lái),則說(shuō)明正確,否則失敗
System.out.println("parentThreadName:" + parentThreadName + ":" + requestIdThreadLocal.get());
}
}
}
執(zhí)行效果如下:
pool-1-thread-1:0
pool-1-thread-2:1
pool-1-thread-3:2
pool-1-thread-4:3
pool-1-thread-5:4
pool-1-thread-6:5
pool-1-thread-7:6
pool-1-thread-8:7
pool-1-thread-9:8
pool-1-thread-10:9
parentThreadName:pool-1-thread-7:6
parentThreadName:pool-1-thread-4:6
parentThreadName:pool-1-thread-3:6
parentThreadName:pool-1-thread-2:6
parentThreadName:pool-1-thread-1:6
parentThreadName:pool-1-thread-9:6
parentThreadName:pool-1-thread-10:6
parentThreadName:pool-1-thread-8:7
parentThreadName:pool-1-thread-6:5
parentThreadName:pool-1-thread-5:4
從這里可以出thread-7、thread-4、thread-3、thread-2、thread-1、thread-9、thread-10獲取的都是6,在子線程中出現(xiàn)出現(xiàn)了線程本地變量混亂的現(xiàn)象,在全鏈路跟蹤與壓測(cè)出現(xiàn)這種情況是致命的。
問(wèn)題:大家通過(guò)上面的學(xué)習(xí),應(yīng)該能解釋這個(gè)現(xiàn)象?此處可以稍微停下來(lái)思考一番。
怎么解決這個(gè)問(wèn)題呢?
TransmittableThreadLocal ”閃亮登場(chǎng)“。
TransmittableThreadLocal
3.1 TransmittableThreadLocal“何許人也”
TransmittableThreadLocal何許人也,它可是阿里巴巴開(kāi)源的專(zhuān)門(mén)解決InheritableThreadLocal的局限性,實(shí)現(xiàn)線程本地變量在線程池的執(zhí)行過(guò)程中,能正常的訪問(wèn)父線程設(shè)置的線程變量。實(shí)踐是檢驗(yàn)整理的唯一標(biāo)準(zhǔn),我們還是以上面的示例來(lái)進(jìn)行驗(yàn)證,看看TransmittableThreadLocal是否支持上述場(chǎng)景:
首先需要在pom.xml文件中引入如下maven依賴(lài):
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
<version>2.10.2</version>
</dependency>
示例代碼如下:
public class Service {
/**
* 模擬tomcat線程池
*/
private static ExecutorService tomcatExecutors = Executors.newFixedThreadPool(10);
/**
* 業(yè)務(wù)線程池,默認(rèn)Control中異步任務(wù)執(zhí)行線程池
*/
private static ExecutorService businessExecutors = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(4)); // 使用ttl線程池,該框架的使用,請(qǐng)查閱官方文檔。
/**
* 線程上下文環(huán)境,模擬在Control這一層,設(shè)置環(huán)境變量,然后在這里提交一個(gè)任務(wù),模擬在子線程中,是否可以訪問(wèn)到剛設(shè)置的環(huán)境變量值。
*/
private static TransmittableThreadLocal<Integer> requestIdThreadLocal = new TransmittableThreadLocal<>();
// private static InheritableThreadLocal<Integer> requestIdThreadLocal = new InheritableThreadLocal<>();
public static void main(String[] args) {
for(int i = 0; i < 10; i ++ ) {
tomcatExecutors.submit(new ControlThread(i));
}
//簡(jiǎn)單粗暴的關(guān)閉線程池
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
businessExecutors.shutdown();
tomcatExecutors.shutdown();
}
/**
* 模擬Control任務(wù)
*/
static class ControlThread implements Runnable {
private int i;
public ControlThread(int i) {
this.i = i;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ":" + i);
requestIdThreadLocal.set(i);
//使用線程池異步處理任務(wù)
businessExecutors.submit(new BusinessTask(Thread.currentThread().getName()));
}
}
/**
* 業(yè)務(wù)任務(wù),主要是模擬在Control控制層,提交任務(wù)到線程池執(zhí)行
*/
static class BusinessTask implements Runnable {
private String parentThreadName;
public BusinessTask(String parentThreadName) {
this.parentThreadName = parentThreadName;
}
@Override
public void run() {
//如果與上面的能對(duì)應(yīng)上來(lái),則說(shuō)明正確,否則失敗
System.out.println("parentThreadName:" + parentThreadName + ":" + requestIdThreadLocal.get());
}
}
}
其運(yùn)行結(jié)果如下:
pool-1-thread-10:9
pool-1-thread-8:7
pool-1-thread-7:6
pool-1-thread-9:8
pool-1-thread-6:5
pool-1-thread-5:4
pool-1-thread-4:3
pool-1-thread-3:2
pool-1-thread-2:1
pool-1-thread-1:0
parentThreadName:pool-1-thread-5:4
parentThreadName:pool-1-thread-9:8
parentThreadName:pool-1-thread-3:2
parentThreadName:pool-1-thread-2:1
parentThreadName:pool-1-thread-7:6
parentThreadName:pool-1-thread-8:7
parentThreadName:pool-1-thread-1:0
parentThreadName:pool-1-thread-6:5
parentThreadName:pool-1-thread-10:9
parentThreadName:pool-1-thread-4:3
執(zhí)行結(jié)果符合預(yù)期。那TransmittableThreadLocal是如何實(shí)現(xiàn)的呢?
3.2 TransmittableThreadLocal實(shí)現(xiàn)原理
從InheritableThreadLocal不支持線程池的根本原因是InheritableThreadLocal是在父線程創(chuàng)建子線程時(shí)復(fù)制的,由于線程池的復(fù)用機(jī)制,“子線程”只會(huì)復(fù)制一次。要支持線程池中能訪問(wèn)提交任務(wù)線程的本地變量,其實(shí)只需要在父線程向線程池提交任務(wù)時(shí)復(fù)制父線程的上下環(huán)境,那在子線程中就能夠如愿訪問(wèn)到父線程中的本地變量,實(shí)現(xiàn)本地環(huán)境變量在線程池調(diào)用中的透?jìng)?,從而為?shí)現(xiàn)鏈路跟蹤打下堅(jiān)實(shí)的基礎(chǔ),這也就是TransmittableThreadLocal最本質(zhì)的實(shí)現(xiàn)原理。
3.2.1 TransmittableThreadLocal類(lèi)圖
TransmittableThreadLocal繼承自InheritableThreadLocal,接下來(lái)將從set方法為入口,開(kāi)始探究TransmittableThreadLocal實(shí)現(xiàn)原理。
3.2.2 set方法詳解
public final void set(T value) {
super.set(value); // @1
// may set null to remove value
if (null == value) // @2
removeValue();
else
addValue();
代碼@1:首先調(diào)用父類(lèi)的set方法,將value存入線程本地遍歷,即Thread對(duì)象的inheritableThreadLocals中。
代碼@2:如果value為空,則調(diào)用removeValue()否則調(diào)用addValue。
那接下來(lái)重點(diǎn)看看這兩個(gè)方法有什么名堂:
private void addValue() {
if (!holder.get().containsKey(this)) { // @1
holder.get().put(this, null); // WeakHashMap supports null value.
}
}
private void removeValue() {
holder.get().remove(this);
}
代碼@1:當(dāng)前線程在調(diào)用threadLocal方法的set方法(即向線程本地遍歷存儲(chǔ)數(shù)據(jù)時(shí)),如果需要設(shè)置的值不為null,則調(diào)用addValue方法,將當(dāng)前ThreadLocal存儲(chǔ)到TransmittableThreadLocal的全局靜態(tài)變量holder。holder的定義如下:
private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder =
new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() {
@Override
protected Map<TransmittableThreadLocal<?>, ?> initialValue() {
return new WeakHashMap<TransmittableThreadLocal<?>, Object>();
}
@Override
protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) {
return new WeakHashMap<TransmittableThreadLocal<?>, Object>(parentValue);
}
};
從中可以看出,使用了線程本地變量,內(nèi)部存放的結(jié)構(gòu)為Map, ?>,即該對(duì)象緩存了線程執(zhí)行過(guò)程中所有的TransmittableThreadLocal對(duì)象,并且其關(guān)聯(lián)的值不為空。但這樣做有什么用呢?
為了解開(kāi)這個(gè)難題,可能需要大家對(duì)ttl這個(gè)框架的使用有一定的理解,本文由于篇幅的原因,將不會(huì)詳細(xì)介紹,如有大家有興趣,可以查閱其官網(wǎng)解其使用:
https://github.com/alibaba/transmittable-thread-local
ExecutorService executorService = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(4));
TransmittableThreadLocal<String> parent = new TransmittableThreadLocal<String>();
parent.set("value-set-in-parent");
Runnable task = new Task("1");
Callable call = new Call("2");
executorService.submit(task);
executorService.submit(call);
我們從submit為突破口,來(lái)嘗試解開(kāi)holder屬性用途。
class ExecutorTtlWrapper implements Executor, TtlEnhanced {
private final Executor executor;
ExecutorTtlWrapper(@Nonnull Executor executor) {
this.executor = executor;
}
@Override
public void execute(@Nonnull Runnable command) {
executor.execute(TtlRunnable.get(command)); // @1
}
@Nonnull
public Executor unwrap() {
return executor;
}
}
在向線程池提交任務(wù)時(shí),會(huì)使用TtlRunnable對(duì)提交任務(wù)進(jìn)行包裝。接下來(lái)將重點(diǎn)探討TtlRunnable。
3.2.2 TtlRunnable詳解
3.2.2.1 類(lèi)圖
下面一一來(lái)介紹其核心屬性:
AtomicReference capturedRef
“捕獲”的引用,根據(jù)下文的解讀,該引用指向的數(shù)據(jù)結(jié)構(gòu)包含了父線程在執(zhí)行過(guò)程中,通過(guò)使用TransmittableThreadLocal存儲(chǔ)的本地線程變量,但這里的觸發(fā)時(shí)機(jī)是向線程池提交任務(wù)時(shí)捕獲。Runnable runnable
提交到線程池中待執(zhí)行的業(yè)務(wù)邏輯。boolean releaseTtlValueReferenceAfterRun
默認(rèn)為false。
接下來(lái)重點(diǎn)看一下其構(gòu)造方法:
private TtlRunnable(@Nonnull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
this.capturedRef = new AtomicReference<Object>(capture()); // @1
this.runnable = runnable;
this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}
構(gòu)造方法沒(méi)什么特別,重點(diǎn)看一下子線程是如何“捕獲”父線程中已設(shè)置的本地線程變量。
TransmittableThreadLocal$Transmitter#capture
public static Object capture() {
Map<TransmittableThreadLocal<?>, Object> captured = new HashMap<TransmittableThreadLocal<?>, Object>(); // @1
for (TransmittableThreadLocal<?> threadLocal : holder.get().keySet()) { // @2
captured.put(threadLocal, threadLocal.copyValue()); // @3
}
return captured;
}
代碼@1:先創(chuàng)建Map容器,用來(lái)存儲(chǔ)父線程的本地線程變量,鍵為在父線程執(zhí)行過(guò)程中使用到的TransmittableThreadLocal線程。
代碼@2:holder.get(),獲取父線程中使用中的ThreadLocal,因?yàn)槲覀儚?.2.2節(jié)中發(fā)現(xiàn),在當(dāng)前線程在調(diào)用TransmittableThreadLocal的set方法,并且其值不為空的時(shí)候,會(huì)將TransmittableThreadLocal對(duì)象存儲(chǔ)存儲(chǔ)在當(dāng)前線程的本地變量中。故這里使用holder.get()方法能獲取父線程中已使用的ThreadLocal,并其值不為null。
代碼@3:遍歷父線程已使用的線程本地,將其值存入到captured中,注意默認(rèn)是淺拷貝,如果需要實(shí)現(xiàn)深度拷貝,請(qǐng)重寫(xiě)TransmittableThreadLocal的copyValue方法。
溫馨提示:從這里看出TransmittableThreadLocal的靜態(tài)屬性holder的用處吧,請(qǐng)重點(diǎn)理解holder的屬性類(lèi)型為:InheritableThreadLocal, ?>>。
在向線程池提交任務(wù)時(shí),會(huì)先捕獲父線程(提交任務(wù)到線程池的線程)中的本地環(huán)境變量,接下來(lái)重點(diǎn)來(lái)看一下其run方法。
3.2.2.2 run方法
public void run() {
Object captured = capturedRef.get();
if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
throw new IllegalStateException("TTL value reference is released after run!");
}
Object backup = replay(captured); // @1
try {
runnable.run(); // @2
} finally {
restore(backup); // @3
}
}
代碼@1:"重放"父線程的本地環(huán)境變量,即使用從父線程中捕獲過(guò)來(lái)的上下文環(huán)境,在子線程中重新執(zhí)行一遍,并返回原先存在與子線程中的上下文環(huán)境變量。
代碼@2:執(zhí)行業(yè)務(wù)邏輯。
代碼@3:恢復(fù)線程池中當(dāng)前執(zhí)行任務(wù)的線程的上下文環(huán)境,即代碼@1,會(huì)直接繼承父線程中的上下文環(huán)境,但會(huì)將原先存在該線程的線程上下文環(huán)境進(jìn)行備份,在任務(wù)執(zhí)行完后通過(guò)執(zhí)行restore方法進(jìn)行恢復(fù)。
不得不佩服這里設(shè)計(jì)的巧妙。筆者有理由相信能看到這里的諸位讀者一定是有實(shí)力并且有強(qiáng)烈求知的欲望的人,那我們?cè)趤?lái)看一下replay、restore方法的實(shí)現(xiàn)。
3.2.2.3 replay
public static Object replay(@Nonnull Object captured) {
@SuppressWarnings("unchecked")
Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured; // @1
Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>();
for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator(); // @2
iterator.hasNext(); ) {
Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
TransmittableThreadLocal<?> threadLocal = next.getKey();
backup.put(threadLocal, threadLocal.get()); // @3
// clear the TTL values that is not in captured
// avoid the extra TTL values after replay when run task
if (!capturedMap.containsKey(threadLocal)) { // @4
iterator.remove();
threadLocal.superRemove();
}
// set values to captured TTL
setTtlValuesTo(capturedMap); // @5
// call beforeExecute callback
doExecuteCallback(true); // @6
return backup; // @7
}
代碼@1:首先解釋一下兩個(gè)局部變量的含義:
capturedMap
子線程從父線程捕獲的線程本地遍歷。backup
線程池中處理本次任務(wù)的線程中原先存在的本地線程變量。
代碼@2:holder.get(),這里是子線程中原先存在的本地線程變量(即線程池中分配來(lái)執(zhí)行本次任務(wù)的線程),然后遍歷它,將其存儲(chǔ)在backUp(@3)。
代碼@4:從這里開(kāi)始,開(kāi)始根據(jù)父線程的本地變量來(lái)重放當(dāng)前線程,如果父線程中不包含的threadlocal對(duì)象,將從本地線程變量中移除。
代碼@5:遍歷父線程中的本地線程變量,在子線程中重新執(zhí)行一次threadlocal.set方法。
代碼@6:執(zhí)行beforeExecute()鉤子函數(shù)。
代碼@7:返回線程池原線程的本地線程變量,供本次調(diào)用后恢復(fù)上下文環(huán)境。
3.2.2.4 restore
恢復(fù)線程中子線程原先的本地線程變量,即恢復(fù)線程,本次執(zhí)行并不會(huì)污染線程池中線程原先的上下文環(huán)境,精妙。我們來(lái)看看其代碼實(shí)現(xiàn):
public static void restore(@Nonnull Object backup) {
@SuppressWarnings("unchecked")
Map<TransmittableThreadLocal<?>, Object> backupMap = (Map<TransmittableThreadLocal<?>, Object>) backup; // @1
// call afterExecute callback
doExecuteCallback(false); // @2
for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator(); // @3
iterator.hasNext(); ) {
Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
TransmittableThreadLocal<?> threadLocal = next.getKey();
// clear the TTL values that is not in bac1kup
// avoid the extra TTL values after restore
if (!backupMap.containsKey(threadLocal)) { // @4
iterator.remove();
threadLocal.superRemove();
}
}
// restore TTL values
setTtlValuesTo(backupMap); // @5
}
代碼@1:獲取備份好的線程本地上下文。
代碼@2:執(zhí)行afterExecute()鉤子函數(shù)。
代碼@3:遍歷本地線程變量,將不屬于backUpMap中存在的線程本地上下文移除(@4)。
代碼@5:遍歷備份的本地線程本地,在本地線程中重新執(zhí)行threadlocal#set方法,實(shí)現(xiàn)線程本地變量的還原。
總結(jié)
本文介紹到這里了,詳細(xì)介紹了ThreadLocal、InheritableThreadLocal、TransmittableThreadLocal的實(shí)現(xiàn)原理,并從ThreadLocal、InheritableThreadLocal的局限性,最終引出TransmittableThreadLocal,為全鏈路壓測(cè)中壓測(cè)標(biāo)記的透?jìng)鞔蛳聢?jiān)實(shí)的基礎(chǔ)。
相關(guān)文章: