本地線程變量

中間件興趣圈 , 作者 丁威

說(shuō)起本地線程變量,我相信大家首先會(huì)想到的是JDK默認(rèn)提供的ThreadLocal,用來(lái)存儲(chǔ)在整個(gè)調(diào)用鏈中都需要訪問(wèn)的數(shù)據(jù),并且是線程安全的。由于本文的寫(xiě)作背景是筆者需要在公司落地全鏈路壓測(cè)平臺(tái),一個(gè)基本并核心的功能需求是壓測(cè)標(biāo)記需要在整個(gè)調(diào)用鏈中進(jìn)行傳遞,線程上下文環(huán)境成為解決這個(gè)問(wèn)題最合適的技術(shù)。

溫馨提示:
本從從ThreadLocal原理入手分析,并拋出其缺點(diǎn),再逐一引出InheritableThreadLocal、TransmittableThreadLocal。文章篇幅稍長(zhǎng),但由于循序漸進(jìn),層層遞進(jìn),故精華部分在后面。

ThreadLocal詳解


ThreadLocal對(duì)外提供的API如下:

  • public T get()
    從線程上下文環(huán)境中獲取設(shè)置的值。

  • public void set(T value)
    將值存儲(chǔ)到線程上下文環(huán)境中,供后續(xù)使用。

  • public void remove()
    清除線程本地上下文環(huán)境。

上述API使用簡(jiǎn)單,關(guān)鍵是要理解ThreadLocal的內(nèi)部存儲(chǔ)結(jié)果。

1.1 ThreadLocal存儲(chǔ)結(jié)構(gòu)

image

圖的幾個(gè)關(guān)鍵點(diǎn)如下:

  • 數(shù)據(jù)存儲(chǔ)
    當(dāng)線程調(diào)用threadLocal對(duì)象的set(Object value)方法時(shí),數(shù)據(jù)并不是存儲(chǔ)在ThreadLocal對(duì)象中,而是存儲(chǔ)在Thread對(duì)象中,這也是ThreadLocal的由來(lái),具體存儲(chǔ)在線程對(duì)象的threadLocals屬性中,其類(lèi)型為T(mén)hreadLocal.ThreadLocalMap。

  • ThreadLocal.ThreadLocalMap,Map結(jié)構(gòu),即鍵值對(duì),鍵為threadLocal對(duì)象,值為需要存儲(chǔ)到線程上下文的值(threadLocal#set)方法的參數(shù)。

1.2 源碼分析ThreadLocal

1.2.1 源碼分析get
public T get() {
  Thread t = Thread.currentThread();  // @1
     ThreadLocalMap map = getMap(t);  // @2
     if (map != null) {                                // @3
         ThreadLocalMap.Entry e = map.getEntry(this);
         if (e != null) {
             @SuppressWarnings("unchecked")
             T result = (T)e.value;
             return result;
        }
    }
    return setInitialValue();  // @4
}

代碼@1:獲取當(dāng)前線程。

代碼@2:獲取線程的threadLocals屬性,在上圖中已展示其存儲(chǔ)結(jié)構(gòu)。

代碼@3:如果線程對(duì)象的threadLocals屬性不為空,則從該Map結(jié)構(gòu)中,用threadLocal對(duì)象為鍵去查找值,如果能找到,則返回其value值,否則執(zhí)行代碼@4。

代碼@4:如果線程對(duì)象的threadLocals屬性為空,或未從threadLocals中找到對(duì)應(yīng)的鍵值對(duì),則調(diào)用該方法執(zhí)行初始化。

ThreadLocal#setInitialValue

 private T setInitialValue() {
     T value = initialValue();    // @1
     Thread t = Thread.currentThread();    // @2
     ThreadLocalMap map = getMap(t);    // @3
     if (map != null)                                     //@4
        map.set(this, value);
     else
        createMap(t, value);                        // @5
     return value;
}

代碼@1:調(diào)用initialValue()獲取默認(rèn)初始化值,該方法默認(rèn)返回null,子類(lèi)可以重寫(xiě),實(shí)現(xiàn)線程本地變量的初始化。

代碼@2:獲取當(dāng)前線程。

代碼@3:獲取該線程對(duì)象的threadLocals屬性。

代碼@4:如果不為空,則將threadLocal:value存入線程對(duì)象的threadLocals屬性中。

代碼@5:否則初始化線程對(duì)象的threadLocals,然后將threadLocal:value鍵值對(duì)存入線程對(duì)象的threadLocals屬性中。

1.2.2 源碼分析set
public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}

在掌握了get方法實(shí)現(xiàn)細(xì)節(jié),set方法、remove其實(shí)現(xiàn)的邏輯基本一樣,就是對(duì)線程對(duì)象的threadLocals屬性進(jìn)行操作(Map結(jié)構(gòu))。

1.3 ThreadLocal局限性

經(jīng)過(guò)上面的剖析,對(duì)ThreadLocal的內(nèi)部存儲(chǔ)、set、get、remove等實(shí)現(xiàn)細(xì)節(jié)都已做了詳細(xì)的解讀,但ThreadLocal無(wú)法在父子線程之間傳遞,示例代碼如下:

public class Service {
    private static ThreadLocal<Integer> requestIdThreadLocal = new ThreadLocal<>();
    public static void main(String[] args) {
        Integer reqId = new Integer(5);
        Service a = new Service();
        a.setRequestId(reqId);
    }

    public void setRequestId(Integer requestId) {
        requestIdThreadLocal.set(requestId);
        doBussiness();
    }

    public void doBussiness() {
        System.out.println("首先打印requestId:" + requestIdThreadLocal.get());
        (new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("子線程啟動(dòng)");
                System.out.println("在子線程中訪問(wèn)requestId:" + requestIdThreadLocal.get());
            }
        })).start();
    }
}

運(yùn)行結(jié)果如下:

image

從結(jié)果上來(lái)看,在子線程中無(wú)法訪問(wèn)在父線程中設(shè)置的本地線程變量,那我們?cè)撊绾蝸?lái)解決該問(wèn)題呢?

為了解決該問(wèn)題,JDK引入了另外一個(gè)線程本地變量實(shí)現(xiàn)類(lèi)InheritableThreadLocal,接下來(lái)將重點(diǎn)介紹InheritableThreadLocal的實(shí)現(xiàn)原理。

InheritableThreadLocal


由于ThreadLocal在父子線程交互中子線程無(wú)法訪問(wèn)到存儲(chǔ)在父線程中的值,無(wú)法滿足某些場(chǎng)景的需求,例如鏈路跟蹤,例如如下場(chǎng)景:

image

為了解決上述問(wèn)題,JDK引入了InheritableThreadLocal,即子線程可以訪問(wèn)父線程中的線程本地變量,更嚴(yán)謹(jǐn)?shù)恼f(shuō)法是子線程可以訪問(wèn)在創(chuàng)建子線程時(shí)父線程當(dāng)時(shí)的本地線程變量,因?yàn)槠鋵?shí)現(xiàn)原理就是在創(chuàng)建子線程將父線程當(dāng)前存在的本地線程變量拷貝到子線程的本地線程變量中。

2.1 類(lèi)圖

image

<figcaption style="margin: 10px 0px 0px; padding: 0px; max-width: 100%; box-sizing: border-box !important; word-wrap: break-word !important; line-height: inherit; text-align: center; color: rgb(153, 153, 153); font-size: 0.7em;"></figcaption>

從類(lèi)的繼承層次來(lái)看,InheritableThreadLocal只是在ThreadLocal的get、set、remove流程中,重寫(xiě)了getMap、createMap方法,整體流程與ThreadLocal保持一致,故我們初步來(lái)看一下InheritableThreadLocal是如何重寫(xiě)上述這兩個(gè)方法的。

ThreadLocalMap getMap(Thread t) {
    return t.inheritableThreadLocals;
}
void createMap(Thread t, T firstValue) {
    t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
}

從代碼得知,ThreadLocal操作的是Thread對(duì)象的threadLocals屬性,而InheritableThreadLocal操作的是Thread對(duì)象的inheritableThreadLocals屬性。

溫馨提示:createMap被執(zhí)行的條件是調(diào)用InheritableThreadLocal#get、set時(shí)如果線程的inheritableThreadLocals屬性為空時(shí)才會(huì)被調(diào)用。

那問(wèn)題來(lái)了,InheritableThreadLocal是如何繼承自父對(duì)象的線程本地變量的呢?

2.2 線程上下文環(huán)境如何從父線程傳遞到子線程

這部分的代碼入口為:Thread#init方法

Thread parent = currentThread();                // @1
 
 // 省略部分代碼
 
 if (inheritThreadLocals && parent.inheritableThreadLocals != null)    // @2
     this.inheritableThreadLocals =
         ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
 /* Stash the specified stack size in case the VM cares */
 this.stackSize = stackSize;

/* Set thread ID */
tid = nextThreadID();

子線程是通過(guò)在父線程中通過(guò)調(diào)用new Thread()方法來(lái)創(chuàng)建子線程,Thread#init方法在Thread的構(gòu)造方法中被調(diào)用。

代碼@1:獲取當(dāng)前線程對(duì)象,即待創(chuàng)建的線程的父線程。

代碼@2:如果父線程的inheritableThreadLocals不為空并且inheritThreadLocals為true(該值默認(rèn)為true),則使用父線程的inherit本地變量的值來(lái)創(chuàng)建子線程的inheritableThreadLocals結(jié)構(gòu),即將父線程中的本地變量復(fù)制到子線程中。

static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
     return new ThreadLocalMap(parentMap);
 }
 private ThreadLocalMap(ThreadLocalMap parentMap) {
     Entry[] parentTable = parentMap.table;
     int len = parentTable.length;
     setThreshold(len);
     table = new Entry[len];
 
    for (int j = 0; j < len; j++) {
        Entry e = parentTable[j];
        if (e != null) {
            @SuppressWarnings("unchecked")
            ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
            if (key != null) {
                Object value = key.childValue(e.value);
                Entry c = new Entry(key, value);
                int h = key.threadLocalHashCode & (len - 1);
                while (table[h] != null)
                    h = nextIndex(h, len);

        table[h] = c;
                size++;
            }
        }
    }
}

上述代碼就不一一分析,類(lèi)似于Map的復(fù)制,只不過(guò)其在Hash沖突時(shí),不是使用鏈表結(jié)構(gòu),而是直接在數(shù)組中找下一個(gè)為null的槽位。

溫馨提示:子線程默認(rèn)拷貝父線程的方式是淺拷貝,如果需要使用深拷貝,需要使用自定義ThreadLocal,繼承InheritableThreadLocal并重寫(xiě)childValue方法。

2.3 驗(yàn)證InheritableThreadLocal的特性

驗(yàn)證代碼如下:

public class Service {
     private static InheritableThreadLocal<Integer> requestIdThreadLocal = new InheritableThreadLocal<>();
     public static void main(String[] args) {
         Integer reqId = new Integer(5);
         Service a = new Service();
         a.setRequestId(reqId);
     }
 
     public void setRequestId(Integer requestId) {
        requestIdThreadLocal.set(requestId);
        doBussiness();
    }

    public void doBussiness() {
        System.out.println("首先打印requestId:" + requestIdThreadLocal.get());
        (new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("子線程啟動(dòng)");
                System.out.println("在子線程中訪問(wèn)requestId:" + requestIdThreadLocal.get());
            }
        })).start();
    }
}

執(zhí)行結(jié)果如下:

image

符合預(yù)期,在子線程中如愿訪問(wèn)到了在主線程中設(shè)置的本地環(huán)境變量。

2.4 InheritableThreadLocal局限性

InheritableThreadLocal支持子線程訪問(wèn)在父線程中設(shè)置的線程上下文環(huán)境的實(shí)現(xiàn)原理是在創(chuàng)建子線程時(shí)將父線程中的本地變量值復(fù)制到子線程,即復(fù)制的時(shí)機(jī)為創(chuàng)建子線程時(shí)。

但我們提到并發(fā)、多線程就離不開(kāi)線程池的使用,因?yàn)榫€程池能夠復(fù)用線程,減少線程的頻繁創(chuàng)建與銷(xiāo)毀,如果使用InheritableThreadLocal,那么線程池中的線程拷貝的數(shù)據(jù)來(lái)自于第一個(gè)提交任務(wù)的外部線程,即后面的外部線程向線程池中提交任務(wù)時(shí),子線程訪問(wèn)的本地變量都來(lái)源于第一個(gè)外部線程,造成線程本地變量混亂,驗(yàn)證代碼如下:

 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 public class Service {
     /**
      * 模擬tomcat線程池
      */
     private static ExecutorService tomcatExecutors = Executors.newFixedThreadPool(10);
     /**
      * 業(yè)務(wù)線程池,默認(rèn)Control中異步任務(wù)執(zhí)行線程池
     */
    private static ExecutorService businessExecutors = Executors.newFixedThreadPool(5);
    /**
     * 線程上下文環(huán)境,模擬在Control這一層,設(shè)置環(huán)境變量,然后在這里提交一個(gè)異步任務(wù),模擬在子線程中,是否可以訪問(wèn)到剛設(shè)置的環(huán)境變量值。
     */
    private static InheritableThreadLocal<Integer> requestIdThreadLocal = new InheritableThreadLocal<>();

    public static void main(String[] args) {

        for(int i = 0; i < 10; i ++ ) {  // 模式10個(gè)請(qǐng)求,每個(gè)請(qǐng)求執(zhí)行ControlThread的邏輯,其具體實(shí)現(xiàn)就是,先輸出父線程的名稱(chēng),
                                                  //  然后設(shè)置本地環(huán)境變量,并將父線程名稱(chēng)傳入到子線程中,在子線程中嘗試獲取在父線程中的設(shè)置的環(huán)境變量
            tomcatExecutors.submit(new ControlThread(i));
        }

        //簡(jiǎn)單粗暴的關(guān)閉線程池
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        businessExecutors.shutdown();
        tomcatExecutors.shutdown();
    }


    /**
     * 模擬Control任務(wù)
     */
    static class ControlThread implements Runnable {
        private int i;

        public ControlThread(int i) {
            this.i = i;
        }
        @Override
        public void run() {
         System.out.println(Thread.currentThread().getName() + ":" + i);
                requestIdThreadLocal.set(i);
                //使用線程池異步處理任務(wù)
                businessExecutors.submit(new BusinessTask(Thread.currentThread().getName()));
        }
    }

    /**
     * 業(yè)務(wù)任務(wù),主要是模擬在Control控制層,提交任務(wù)到線程池執(zhí)行
     */
    static class BusinessTask implements Runnable {
        private String parentThreadName;

        public BusinessTask(String parentThreadName) {
            this.parentThreadName = parentThreadName;
        }

        @Override
       public void run() {
            //如果與上面的能對(duì)應(yīng)上來(lái),則說(shuō)明正確,否則失敗
            System.out.println("parentThreadName:" + parentThreadName + ":" + requestIdThreadLocal.get());
        }
    }
}

執(zhí)行效果如下:

 pool-1-thread-1:0
 pool-1-thread-2:1
 pool-1-thread-3:2
 pool-1-thread-4:3
 pool-1-thread-5:4
 pool-1-thread-6:5
 pool-1-thread-7:6
 pool-1-thread-8:7
 pool-1-thread-9:8
 pool-1-thread-10:9
parentThreadName:pool-1-thread-7:6
parentThreadName:pool-1-thread-4:6
parentThreadName:pool-1-thread-3:6
parentThreadName:pool-1-thread-2:6
parentThreadName:pool-1-thread-1:6
parentThreadName:pool-1-thread-9:6
parentThreadName:pool-1-thread-10:6
parentThreadName:pool-1-thread-8:7
parentThreadName:pool-1-thread-6:5
parentThreadName:pool-1-thread-5:4

從這里可以出thread-7、thread-4、thread-3、thread-2、thread-1、thread-9、thread-10獲取的都是6,在子線程中出現(xiàn)出現(xiàn)了線程本地變量混亂的現(xiàn)象,在全鏈路跟蹤與壓測(cè)出現(xiàn)這種情況是致命的。

問(wèn)題:大家通過(guò)上面的學(xué)習(xí),應(yīng)該能解釋這個(gè)現(xiàn)象?此處可以稍微停下來(lái)思考一番。

怎么解決這個(gè)問(wèn)題呢?

TransmittableThreadLocal ”閃亮登場(chǎng)“。

TransmittableThreadLocal


3.1 TransmittableThreadLocal“何許人也”

TransmittableThreadLocal何許人也,它可是阿里巴巴開(kāi)源的專(zhuān)門(mén)解決InheritableThreadLocal的局限性,實(shí)現(xiàn)線程本地變量在線程池的執(zhí)行過(guò)程中,能正常的訪問(wèn)父線程設(shè)置的線程變量。實(shí)踐是檢驗(yàn)整理的唯一標(biāo)準(zhǔn),我們還是以上面的示例來(lái)進(jìn)行驗(yàn)證,看看TransmittableThreadLocal是否支持上述場(chǎng)景:

首先需要在pom.xml文件中引入如下maven依賴(lài):

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>transmittable-thread-local</artifactId>
    <version>2.10.2</version>
</dependency>

示例代碼如下:

 public class Service {
 
     /**
      * 模擬tomcat線程池
      */
     private static ExecutorService tomcatExecutors = Executors.newFixedThreadPool(10);
 
     /**
      * 業(yè)務(wù)線程池,默認(rèn)Control中異步任務(wù)執(zhí)行線程池
     */
    private static ExecutorService businessExecutors = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(4)); // 使用ttl線程池,該框架的使用,請(qǐng)查閱官方文檔。

    /**
     * 線程上下文環(huán)境,模擬在Control這一層,設(shè)置環(huán)境變量,然后在這里提交一個(gè)任務(wù),模擬在子線程中,是否可以訪問(wèn)到剛設(shè)置的環(huán)境變量值。
     */
   private static TransmittableThreadLocal<Integer> requestIdThreadLocal = new TransmittableThreadLocal<>();

//    private static InheritableThreadLocal<Integer> requestIdThreadLocal = new InheritableThreadLocal<>();

    public static void main(String[] args) {

        for(int i = 0; i < 10; i ++ ) {
            tomcatExecutors.submit(new ControlThread(i));
        }

        //簡(jiǎn)單粗暴的關(guān)閉線程池
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        businessExecutors.shutdown();
        tomcatExecutors.shutdown();

    }


    /**
     * 模擬Control任務(wù)
     */
    static class ControlThread implements Runnable {
        private int i;

        public ControlThread(int i) {
            this.i = i;
        }
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + ":" + i);
            requestIdThreadLocal.set(i);

            //使用線程池異步處理任務(wù)

            businessExecutors.submit(new BusinessTask(Thread.currentThread().getName()));
        }
    }

    /**
     * 業(yè)務(wù)任務(wù),主要是模擬在Control控制層,提交任務(wù)到線程池執(zhí)行
     */
    static class BusinessTask implements Runnable {
        private String parentThreadName;

        public BusinessTask(String parentThreadName) {
            this.parentThreadName = parentThreadName;
        }

        @Override
        public void run() {
            //如果與上面的能對(duì)應(yīng)上來(lái),則說(shuō)明正確,否則失敗
            System.out.println("parentThreadName:" + parentThreadName + ":" + requestIdThreadLocal.get());
        }
    }
}

其運(yùn)行結(jié)果如下:

 pool-1-thread-10:9
 pool-1-thread-8:7
 pool-1-thread-7:6
 pool-1-thread-9:8
 pool-1-thread-6:5
 pool-1-thread-5:4
 pool-1-thread-4:3
 pool-1-thread-3:2
 pool-1-thread-2:1
 pool-1-thread-1:0
parentThreadName:pool-1-thread-5:4
parentThreadName:pool-1-thread-9:8
parentThreadName:pool-1-thread-3:2
parentThreadName:pool-1-thread-2:1
parentThreadName:pool-1-thread-7:6
parentThreadName:pool-1-thread-8:7
parentThreadName:pool-1-thread-1:0
parentThreadName:pool-1-thread-6:5
parentThreadName:pool-1-thread-10:9
parentThreadName:pool-1-thread-4:3

執(zhí)行結(jié)果符合預(yù)期。那TransmittableThreadLocal是如何實(shí)現(xiàn)的呢?

3.2 TransmittableThreadLocal實(shí)現(xiàn)原理

從InheritableThreadLocal不支持線程池的根本原因是InheritableThreadLocal是在父線程創(chuàng)建子線程時(shí)復(fù)制的,由于線程池的復(fù)用機(jī)制,“子線程”只會(huì)復(fù)制一次。要支持線程池中能訪問(wèn)提交任務(wù)線程的本地變量,其實(shí)只需要在父線程向線程池提交任務(wù)時(shí)復(fù)制父線程的上下環(huán)境,那在子線程中就能夠如愿訪問(wèn)到父線程中的本地變量,實(shí)現(xiàn)本地環(huán)境變量在線程池調(diào)用中的透?jìng)?,從而為?shí)現(xiàn)鏈路跟蹤打下堅(jiān)實(shí)的基礎(chǔ),這也就是TransmittableThreadLocal最本質(zhì)的實(shí)現(xiàn)原理。

3.2.1 TransmittableThreadLocal類(lèi)圖

image

TransmittableThreadLocal繼承自InheritableThreadLocal,接下來(lái)將從set方法為入口,開(kāi)始探究TransmittableThreadLocal實(shí)現(xiàn)原理。

3.2.2 set方法詳解

public final void set(T value) {
    super.set(value);                              // @1
   // may set null to remove value
   if (null == value)                               // @2
       removeValue();
   else 
   addValue();

代碼@1:首先調(diào)用父類(lèi)的set方法,將value存入線程本地遍歷,即Thread對(duì)象的inheritableThreadLocals中。

代碼@2:如果value為空,則調(diào)用removeValue()否則調(diào)用addValue。

那接下來(lái)重點(diǎn)看看這兩個(gè)方法有什么名堂:

private void addValue() {
    if (!holder.get().containsKey(this)) {    // @1
        holder.get().put(this, null); // WeakHashMap supports null value.
    }
}
private void removeValue() {
    holder.get().remove(this);
}

代碼@1:當(dāng)前線程在調(diào)用threadLocal方法的set方法(即向線程本地遍歷存儲(chǔ)數(shù)據(jù)時(shí)),如果需要設(shè)置的值不為null,則調(diào)用addValue方法,將當(dāng)前ThreadLocal存儲(chǔ)到TransmittableThreadLocal的全局靜態(tài)變量holder。holder的定義如下:

private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder =
             new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() {
                 @Override
                 protected Map<TransmittableThreadLocal<?>, ?> initialValue() {
                     return new WeakHashMap<TransmittableThreadLocal<?>, Object>();
                 }
 
                 @Override
                 protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) {
                    return new WeakHashMap<TransmittableThreadLocal<?>, Object>(parentValue);
                }
            };

從中可以看出,使用了線程本地變量,內(nèi)部存放的結(jié)構(gòu)為Map, ?>,即該對(duì)象緩存了線程執(zhí)行過(guò)程中所有的TransmittableThreadLocal對(duì)象,并且其關(guān)聯(lián)的值不為空。但這樣做有什么用呢?

為了解開(kāi)這個(gè)難題,可能需要大家對(duì)ttl這個(gè)框架的使用有一定的理解,本文由于篇幅的原因,將不會(huì)詳細(xì)介紹,如有大家有興趣,可以查閱其官網(wǎng)解其使用:

https://github.com/alibaba/transmittable-thread-local

 ExecutorService executorService = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(4));
 TransmittableThreadLocal<String> parent = new TransmittableThreadLocal<String>();
 parent.set("value-set-in-parent");
 Runnable task = new Task("1");
 Callable call = new Call("2");
 executorService.submit(task);
 executorService.submit(call);
 
 我們從submit為突破口,來(lái)嘗試解開(kāi)holder屬性用途。
class ExecutorTtlWrapper implements Executor, TtlEnhanced {
   private final Executor executor;

    ExecutorTtlWrapper(@Nonnull Executor executor) {
        this.executor = executor;
    }

    @Override
    public void execute(@Nonnull Runnable command) {
        executor.execute(TtlRunnable.get(command));  // @1
    }

    @Nonnull
    public Executor unwrap() {
        return executor;
    }
}

在向線程池提交任務(wù)時(shí),會(huì)使用TtlRunnable對(duì)提交任務(wù)進(jìn)行包裝。接下來(lái)將重點(diǎn)探討TtlRunnable。

3.2.2 TtlRunnable詳解

3.2.2.1 類(lèi)圖
image

下面一一來(lái)介紹其核心屬性:

  • AtomicReference capturedRef
    “捕獲”的引用,根據(jù)下文的解讀,該引用指向的數(shù)據(jù)結(jié)構(gòu)包含了父線程在執(zhí)行過(guò)程中,通過(guò)使用TransmittableThreadLocal存儲(chǔ)的本地線程變量,但這里的觸發(fā)時(shí)機(jī)是向線程池提交任務(wù)時(shí)捕獲。

  • Runnable runnable
    提交到線程池中待執(zhí)行的業(yè)務(wù)邏輯。

  • boolean releaseTtlValueReferenceAfterRun
    默認(rèn)為false。

接下來(lái)重點(diǎn)看一下其構(gòu)造方法:

private TtlRunnable(@Nonnull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
    this.capturedRef = new AtomicReference<Object>(capture());   // @1
    this.runnable = runnable;
    this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}

構(gòu)造方法沒(méi)什么特別,重點(diǎn)看一下子線程是如何“捕獲”父線程中已設(shè)置的本地線程變量。

TransmittableThreadLocal$Transmitter#capture
public static Object capture() {
    Map<TransmittableThreadLocal<?>, Object> captured = new HashMap<TransmittableThreadLocal<?>, Object>();  // @1
    for (TransmittableThreadLocal<?> threadLocal : holder.get().keySet()) {     // @2
        captured.put(threadLocal, threadLocal.copyValue());                              // @3
    }
    return captured;
}

代碼@1:先創(chuàng)建Map容器,用來(lái)存儲(chǔ)父線程的本地線程變量,鍵為在父線程執(zhí)行過(guò)程中使用到的TransmittableThreadLocal線程。

代碼@2:holder.get(),獲取父線程中使用中的ThreadLocal,因?yàn)槲覀儚?.2.2節(jié)中發(fā)現(xiàn),在當(dāng)前線程在調(diào)用TransmittableThreadLocal的set方法,并且其值不為空的時(shí)候,會(huì)將TransmittableThreadLocal對(duì)象存儲(chǔ)存儲(chǔ)在當(dāng)前線程的本地變量中。故這里使用holder.get()方法能獲取父線程中已使用的ThreadLocal,并其值不為null。

代碼@3:遍歷父線程已使用的線程本地,將其值存入到captured中,注意默認(rèn)是淺拷貝,如果需要實(shí)現(xiàn)深度拷貝,請(qǐng)重寫(xiě)TransmittableThreadLocal的copyValue方法。

溫馨提示:從這里看出TransmittableThreadLocal的靜態(tài)屬性holder的用處吧,請(qǐng)重點(diǎn)理解holder的屬性類(lèi)型為:InheritableThreadLocal, ?>>。

在向線程池提交任務(wù)時(shí),會(huì)先捕獲父線程(提交任務(wù)到線程池的線程)中的本地環(huán)境變量,接下來(lái)重點(diǎn)來(lái)看一下其run方法。

3.2.2.2 run方法
 public void run() {
     Object captured = capturedRef.get();             
     if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
         throw new IllegalStateException("TTL value reference is released after run!");
     }
 
     Object backup = replay(captured);           // @1
     try {
         runnable.run();                                     // @2
    } finally {
        restore(backup);                                    // @3
    }
}

代碼@1:"重放"父線程的本地環(huán)境變量,即使用從父線程中捕獲過(guò)來(lái)的上下文環(huán)境,在子線程中重新執(zhí)行一遍,并返回原先存在與子線程中的上下文環(huán)境變量。

代碼@2:執(zhí)行業(yè)務(wù)邏輯。

代碼@3:恢復(fù)線程池中當(dāng)前執(zhí)行任務(wù)的線程的上下文環(huán)境,即代碼@1,會(huì)直接繼承父線程中的上下文環(huán)境,但會(huì)將原先存在該線程的線程上下文環(huán)境進(jìn)行備份,在任務(wù)執(zhí)行完后通過(guò)執(zhí)行restore方法進(jìn)行恢復(fù)。

不得不佩服這里設(shè)計(jì)的巧妙。筆者有理由相信能看到這里的諸位讀者一定是有實(shí)力并且有強(qiáng)烈求知的欲望的人,那我們?cè)趤?lái)看一下replay、restore方法的實(shí)現(xiàn)。

3.2.2.3 replay
 public static Object replay(@Nonnull Object captured) {
     @SuppressWarnings("unchecked")
     Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured;      // @1
     Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>();              
 
     for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();               // @2
                  iterator.hasNext(); ) {
         Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
         TransmittableThreadLocal<?> threadLocal = next.getKey();

        backup.put(threadLocal, threadLocal.get());    // @3

        // clear the TTL values that is not in captured
        // avoid the extra TTL values after replay when run task
        if (!capturedMap.containsKey(threadLocal)) {            // @4
            iterator.remove();
            threadLocal.superRemove();
        }
     // set values to captured TTL
      setTtlValuesTo(capturedMap);       // @5

    // call beforeExecute callback
    doExecuteCallback(true);      // @6

    return backup;                 // @7
}

代碼@1:首先解釋一下兩個(gè)局部變量的含義:

  • capturedMap
    子線程從父線程捕獲的線程本地遍歷。

  • backup
    線程池中處理本次任務(wù)的線程中原先存在的本地線程變量。

代碼@2:holder.get(),這里是子線程中原先存在的本地線程變量(即線程池中分配來(lái)執(zhí)行本次任務(wù)的線程),然后遍歷它,將其存儲(chǔ)在backUp(@3)。

代碼@4:從這里開(kāi)始,開(kāi)始根據(jù)父線程的本地變量來(lái)重放當(dāng)前線程,如果父線程中不包含的threadlocal對(duì)象,將從本地線程變量中移除。

代碼@5:遍歷父線程中的本地線程變量,在子線程中重新執(zhí)行一次threadlocal.set方法。

代碼@6:執(zhí)行beforeExecute()鉤子函數(shù)。

代碼@7:返回線程池原線程的本地線程變量,供本次調(diào)用后恢復(fù)上下文環(huán)境。

3.2.2.4 restore

恢復(fù)線程中子線程原先的本地線程變量,即恢復(fù)線程,本次執(zhí)行并不會(huì)污染線程池中線程原先的上下文環(huán)境,精妙。我們來(lái)看看其代碼實(shí)現(xiàn):

 public static void restore(@Nonnull Object backup) {
     @SuppressWarnings("unchecked")
     Map<TransmittableThreadLocal<?>, Object> backupMap = (Map<TransmittableThreadLocal<?>, Object>) backup;      // @1
     // call afterExecute callback
     doExecuteCallback(false);   // @2
 
     for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();           // @3
                  iterator.hasNext(); ) {
         Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
        TransmittableThreadLocal<?> threadLocal = next.getKey();

        // clear the TTL values that is not in bac1kup
        // avoid the extra TTL values after restore
        if (!backupMap.containsKey(threadLocal)) {          // @4
            iterator.remove();
            threadLocal.superRemove();
        }
    }

    // restore TTL values
    setTtlValuesTo(backupMap);          // @5
} 

代碼@1:獲取備份好的線程本地上下文。

代碼@2:執(zhí)行afterExecute()鉤子函數(shù)。

代碼@3:遍歷本地線程變量,將不屬于backUpMap中存在的線程本地上下文移除(@4)。

代碼@5:遍歷備份的本地線程本地,在本地線程中重新執(zhí)行threadlocal#set方法,實(shí)現(xiàn)線程本地變量的還原。

總結(jié)


本文介紹到這里了,詳細(xì)介紹了ThreadLocal、InheritableThreadLocal、TransmittableThreadLocal的實(shí)現(xiàn)原理,并從ThreadLocal、InheritableThreadLocal的局限性,最終引出TransmittableThreadLocal,為全鏈路壓測(cè)中壓測(cè)標(biāo)記的透?jìng)鞔蛳聢?jiān)實(shí)的基礎(chǔ)。

相關(guān)文章:

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容