為什么需要TransmittableThreadLocal

1. 正確的ThreadLocal的使用

1.1 如何使用

同一個線程中,通過ThreadLocal傳遞上下文,在使用后進(jìn)行釋放

1.2 使用場景

  • 示意圖


    image.png
  • 示例說明
    • 如果web應(yīng)用在處理tom的請求的時候,bob又進(jìn)來,使用了線程t2, t1和t2的threadlocal變量是隔離的,也就是線程安全的
    • 線程使用完后將threadlocal釋放,避免內(nèi)存泄漏
    • 這種情況下,線程的thradlocal可以從上層流轉(zhuǎn)到下層,上層的修改對下層是可見的

1.3 代碼示例

package com.zihao.controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * @author tangzihao
 * @date 2020/9/5 8:08 下午
 */
@RestController
public class TestThreadLocalController {
    private static final ThreadLocal<String> threadLocal = new ThreadLocal<>();
    /**
     * 處理用戶的請求
     * @param uid 用戶的uid
     */
    @RequestMapping("/testThreadLocal")
    public void handleRequest(String uid){
        System.out.println("current user uid:"+uid);
        //將uid放入ThreadLocal中,通過ThreadLocal進(jìn)行傳值,避免共享成員變量產(chǎn)生線程不安全
        threadLocal.set(uid);
        query();
    }
    /**
     * 查詢用戶信息
     */
    private void query(){
        try{
            queryCache();
            queryDb();
        }finally {
            //用完后一定要釋放該線程的threadLocal
            threadLocal.remove();
            //移除后輸出下內(nèi)容
            System.out.println("after remove,current threadLocal:"+threadLocal.get());
            System.out.println();
        }
    }
    /**
     * 模擬查詢緩存
     */
    private void queryCache(){
        System.out.println("thread ["+Thread.currentThread().getName()+"] query cache, uid:"+threadLocal.get());
    }
    /**
     * 模擬查詢數(shù)據(jù)庫
     */
    private void queryDb(){
        System.out.println("thread ["+Thread.currentThread().getName()+"] query db, uid:"+threadLocal.get());
    }
}

我是在springboot項(xiàng)目中,簡單寫了個controller模擬用戶的請求

1.4 輸出結(jié)果

image.png

1.5 其他聯(lián)想

  • tomcat的線程池的設(shè)計策略
    • tomcat的線程池是如何擴(kuò)展jdk的線程池的,具體見tomcat源碼的org.apache.tomcat.util.threads.ThreadPoolExecutor,tomcat源碼編譯本博客有可以看下喔
  • springboot內(nèi)嵌的tomcat配置
    • 配置項(xiàng)見ServerProperties
  • Spring boot如何切換不同的web容器
    • 使用不同的web容器,線程池的命名是不同的
    • 容器可以切換成jetty和undertow
      • 排除spring-boot-starter-web的spring-boot-starter-tomcat
      • 引入spring-boot-starter-jetty或者spring-boot-starter-undertow

2. 父子線程傳值的問題

2.1 示例代碼

package com.test.zihao;
/**
 * 父子線程傳遞ThreadLocal變量
 *
 * @author tangzihao
 * @date 2020/9/5 9:42 下午
 */
public class ParentChildThreadLocalCase {
    private static final ThreadLocal<String> threadLocal = new ThreadLocal<>();
    public static void main(String[] args) {
        //main線程 設(shè)置threadLocal
        threadLocal.set("hello");
        Thread childThread = new Thread(() -> {
            System.out.println("current thread: " + Thread.currentThread().getName() + ", value from thread local: " + threadLocal.get());
        }, "childThread");
        childThread.start();
        System.out.println(Thread.currentThread().getName() + ", value from thread local: " + threadLocal.get());
    }
}

2.2 輸出結(jié)果

main, value from thread local: hello
current thread: childThread, value from thread local: null

2.3 原因分析

  • ThreadLocal是對當(dāng)前線程有效
  • 這個case中有兩個線程
    • 線程1,main線程,main程序的執(zhí)行線程,main線程設(shè)置了自己線程的threadLocal,只有main線程自己get才可以獲取到之前它自己設(shè)置的值
    • 線程2,childThread線程,是main線程的子線程,子線程嘗試獲取父線程main的threadlocal變量,獲取為null

3. InheritableThreadLocal解決父子線程的傳值

3.1 示例代碼

package com.test.zihao;
/**
 * 父子線程傳遞ThreadLocal變量
 *
 * @author tangzihao
 * @date 2020/9/5 9:42 下午
 */
public class ParentChildThreadLocalCase {
    private static final InheritableThreadLocal<String> threadLocal = new InheritableThreadLocal<>();
    public static void main(String[] args) throws InterruptedException {
        //main線程 設(shè)置threadLocal
        threadLocal.set("hello");
        Thread childThread = new Thread(() -> {
            System.out.println("current thread: " + Thread.currentThread().getName() + ", value from thread local: " + threadLocal.get());
        }, "childThread");
        childThread.start();
        System.out.println(Thread.currentThread().getName() + ", value from thread local: " + threadLocal.get());
    }
}

3.2 輸出結(jié)果

current thread: childThread, value from thread local: hello
main, value from thread local: hello

3.3 結(jié)論

  • 使用InheritableThreadLocal可以在父子線程之間傳遞ThreadLocal變量
  • 子線程修改的InheritableThreadLocal對父線程不可見
  • 子線程之間的InheritableThreadLocal彼此不可見

3.4 InheritableThreadLocal傳值的源碼

(1) 如果父線程的inheritableThreadLocals不為null同時允許繼承inheritThreadLocals的話,將父線程的inheritableThreadLocals復(fù)制到子線程
Thread源碼的418行

if (inheritThreadLocals && parent.inheritableThreadLocals != null)
    this.inheritableThreadLocals =
        ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);

(2) InheritableThreadLocal通過重寫childValue方法來將父線程的值注入
ThreadLocal的ThreadLocalMap(ThreadLocalMap parentMap)方法

private ThreadLocalMap(ThreadLocalMap parentMap) {
            //構(gòu)造table數(shù)組
            Entry[] parentTable = parentMap.table;
            int len = parentTable.length;
            setThreshold(len);
            table = new Entry[len];
            
            //復(fù)制parentMap的entry
            for (int j = 0; j < len; j++) {
                Entry e = parentTable[j];
                if (e != null) {
                    @SuppressWarnings("unchecked")
                    ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
                    if (key != null) {
                        //key.childValue childValue是InheritableThreadLocal重寫的方法
                        Object value = key.childValue(e.value);
                        Entry c = new Entry(key, value);
                        //找到table中空的索引位置
                        int h = key.threadLocalHashCode & (len - 1);
                        while (table[h] != null)
                            h = nextIndex(h, len);
                        //設(shè)置table對應(yīng)索引位置的entry
                        table[h] = c;
                        size++;
                    }
                }
            }
        }

InheritableThreadLocal line61

protected T childValue(T parentValue) {
    return parentValue;
}

(3) 通過重寫getMap方法,將threadLocal的get和set方法由線程的inheritableThreadLocals維護(hù)

ThreadLocalMap getMap(Thread t) {
   return t.inheritableThreadLocals;
}

4. 線程池復(fù)用的傳值問題

4.1 問題簡單描述

對于使用線程池等會池化復(fù)用線程的組件的情況,線程由線程池創(chuàng)建好,并且線程是池化起來反復(fù)使用的;這時父子線程關(guān)系的ThreadLocal值傳遞已經(jīng)沒有意義,應(yīng)用需要的實(shí)際上是把 任務(wù)提交給線程池時的ThreadLocal值傳遞到 任務(wù)執(zhí)行時。

4.2 使用場景

  • 假設(shè)我有個需求需要記錄程序執(zhí)行的方法調(diào)用順序,在方法調(diào)用中需要使用線程池進(jìn)行并發(fā)調(diào)用
  • 執(zhí)行示意圖


    image.png

4.3 問題代碼

package com.test.zihao;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * 線程池使用threadLocal
 *
 * @author tangzihao
 * @date 2020/9/5 10:40 下午
 */
public class ThreadPoolUseInheritableThreadLocalCase {
    private static final InheritableThreadLocal<String> threadLocal = new InheritableThreadLocal<>();
    private static ExecutorService executorService = Executors.newFixedThreadPool(2);
    public static void main(String[] args) {
        threadLocal.set("main-->");
        executorService.submit(ThreadPoolUseInheritableThreadLocalCase::queryShop);
        executorService.submit(ThreadPoolUseInheritableThreadLocalCase::queryItem);
        executorService.submit(ThreadPoolUseInheritableThreadLocalCase::queryCoupon);
    }
    /**
     * 查詢店鋪信息
     */
    private static void queryShop() {
        threadLocal.set(threadLocal.get() + "queryShop");
        record();
    }
    /**
     * 查詢商品
     */
    private static void queryItem() {
        threadLocal.set(threadLocal.get() + "queryItem");
        record();
    }
    /**
     * 查詢優(yōu)惠券
     */
    private static void queryCoupon() {
        threadLocal.set(threadLocal.get() + "queryCoupon");
        record();
    }
    /**
     * 記錄日志
     */
    private static void record() {
        threadLocal.set(threadLocal.get() + "-->record");
        System.out.println(Thread.currentThread().getName() + " method call chain[ " + threadLocal.get() + " ]");
    }
}

4.4 輸出結(jié)果

明顯的發(fā)現(xiàn)在復(fù)用pool-1-thread-1的時候,無法獲取提交線程池任務(wù)時候的threadlocal值,而是“殘留”了上一次使用記錄的方法調(diào)用信息

pool-1-thread-1 method call chain[ main-->queryShop-->record ]
pool-1-thread-2 method call chain[ main-->queryItem-->record ]
pool-1-thread-1 method call chain[ main-->queryShop-->recordqueryCoupon-->record ]

4.5 結(jié)論

我們需要一種機(jī)制把 任務(wù)提交給線程池時的ThreadLocal值傳遞到 任務(wù)執(zhí)行時。

5. 使用TransmittableThreadLocal實(shí)現(xiàn)線程池復(fù)用的傳值

5.1 修飾Runnable

  • 用法簡介
    • 獲取原生的實(shí)現(xiàn)runnable的task
    • 使用TtlRunnable修飾原生的runnable
  • 代碼示例
package com.test.zihao;
import com.alibaba.ttl.TransmittableThreadLocal;
import com.alibaba.ttl.TtlRunnable;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TtlRunnableTest {
    private static final CountDownLatch latch = new CountDownLatch(2);
    public static void main(String[] args) throws InterruptedException {
        TransmittableThreadLocal<String> parent = new TransmittableThreadLocal<String>();
        parent.set("value-set-in-parent");
        ConcurrentHashMap<String, TransmittableThreadLocal<String>> ttlInstance =
                new ConcurrentHashMap<String, TransmittableThreadLocal<String>>();
        ttlInstance.put("1",parent);
        Runnable task = new MyTask("thread1","1",ttlInstance);
        Runnable task1 = new MyTask("thread2","1",ttlInstance);
        Runnable ttlRunnable = TtlRunnable.get(task);
        Runnable ttlRunnable1 = TtlRunnable.get(task1);
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.submit(ttlRunnable);
        executorService.submit(ttlRunnable1);
        //等待子線程全部完成后 嘗試從main線程獲取對應(yīng)的TransmittableThreadLocal
        latch.await();
        
        System.out.println("after task and task1 finished, parent : "+ttlInstance.get("1").get());
        executorService.shutdown();
    }
    static class MyTask implements Runnable{
        private ConcurrentHashMap<String, TransmittableThreadLocal<String>> ttlInstance;
        private String tag;
        public MyTask(String name,String tag, ConcurrentHashMap<String, TransmittableThreadLocal<String>> ttlInstance){
            Thread.currentThread().setName(name);
            this.tag = tag;
            this.ttlInstance = ttlInstance;
        }
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+" child before set: "+ttlInstance.get("1").get());
            ttlInstance.get("1").set("value-set-in-child"+new Random().nextInt(10));
            System.out.println(Thread.currentThread().getName()+" child after set: "+ttlInstance.get("1").get());
            latch.countDown();
        }
    }
}
  • 代碼輸出
pool-1-thread-1 child before set: value-set-in-parent
pool-1-thread-2 child before set: value-set-in-parent
pool-1-thread-1 child after set: value-set-in-child4
pool-1-thread-2 child after set: value-set-in-child1
after task and task1 finished, parent : value-set-in-parent

5.2 修飾Callable

  • 用法簡介

    • 獲取原生的實(shí)現(xiàn)callable的task
    • 使用TtlCallable修飾原生的callable
  • 代碼示例

package com.test.zihao;
import com.alibaba.ttl.TransmittableThreadLocal;
import com.alibaba.ttl.TtlCallable;
import java.util.Random;
import java.util.concurrent.*;
/**
 * @author tangzihao
 * @date 2020/9/6 9:34 下午
 */
public class TtlCallableTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        TransmittableThreadLocal<String> parent = new TransmittableThreadLocal<String>();
        parent.set("value-set-in-parent");
        ConcurrentHashMap<String, TransmittableThreadLocal<String>> ttlInstance =
                new ConcurrentHashMap<String, TransmittableThreadLocal<String>>();
        ttlInstance.put("1", parent);
        Callable<String> task = new MyTask("thread1", "1", ttlInstance);
        Callable<String> task1 = new MyTask("thread2", "1", ttlInstance);
        Callable<String> ttlCallable = TtlCallable.get(task);
        Callable<String> ttlCallable1 = TtlCallable.get(task1);
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        Future<String> future = executorService.submit(ttlCallable);
        Future<String> future1 = executorService.submit(ttlCallable1);
        System.out.println("thread1 after set: " + future.get());
        System.out.println("thread2 after set: " + future1.get());
        System.out.println("after task and task1 finished, parent : " + ttlInstance.get("1").get());
        executorService.shutdown();
    }
    static class MyTask implements Callable<String> {
        private ConcurrentHashMap<String, TransmittableThreadLocal<String>> ttlInstance;
        private String tag;
        public MyTask(String name, String tag, ConcurrentHashMap<String, TransmittableThreadLocal<String>> ttlInstance) {
            Thread.currentThread().setName(name);
            this.tag = tag;
            this.ttlInstance = ttlInstance;
        }
        @Override
        public String call() throws Exception {
            System.out.println(Thread.currentThread().getName() + " child before set: " + ttlInstance.get("1").get());
            ttlInstance.get("1").set("value-set-in-child" + new Random().nextInt(10));
            return ttlInstance.get("1").get();
        }
    }
}
  • 代碼輸出
pool-1-thread-1 child before set: value-set-in-parent
pool-1-thread-2 child before set: value-set-in-parent
thread1 after set: value-set-in-child9
thread2 after set: value-set-in-child4
after task and task1 finished, parent : value-set-in-parent

5.3 修飾線程池

  • 使用方法
    • 使用原生的ExecutorService
    • 使用TtlExecutors包裝ExecutorService
  • 代碼示例
package com.test.zihao;
import com.alibaba.ttl.TransmittableThreadLocal;
import com.alibaba.ttl.threadpool.TtlExecutors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * 調(diào)用鏈
 *        |----queryShop----record
 * main---|----queryItem----record
 *        |----queryCoupon---record
 * @author tangzihao
 * @date 2020/9/5 11:17 下午
 */
public class TtlExecutorTest {
    private static final TransmittableThreadLocal<String> threadLocal = new TransmittableThreadLocal<>();
    private static ExecutorService executorService = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(2));
    public static void main(String[] args) {
        threadLocal.set("main-->");
        executorService.submit(TtlExecutorTest::queryShop);
        executorService.submit(TtlExecutorTest::queryItem);
        executorService.submit(TtlExecutorTest::queryCoupon);
    }
    /**
     * 查詢店鋪信息
     */
    private static void queryShop() {
        threadLocal.set(threadLocal.get() + "queryShop");
        record();
    }
    /**
     * 查詢商品
     */
    private static void queryItem() {
        threadLocal.set(threadLocal.get() + "queryItem");
        record();
    }
    /**
     * 查詢優(yōu)惠券
     */
    private static void queryCoupon() {
        threadLocal.set(threadLocal.get() + "queryCoupon");
        record();
    }
    /**
     * 記錄日志
     */
    private static void record() {
        threadLocal.set(threadLocal.get() + "-->record");
        System.out.println(Thread.currentThread().getName() + " method call chain[ " + threadLocal.get() + " ]");
    }
}
  • 輸出
pool-1-thread-1 method call chain[ main-->queryShop-->record ]
pool-1-thread-2 method call chain[ main-->queryItem-->record ]
pool-1-thread-1 method call chain[ main-->queryCoupon-->record ]

*其他

  • 可以看下Guava的ListeningExecutorService的線程池的包裝來實(shí)現(xiàn)異步回調(diào)
//創(chuàng)建Java線程池
ExecutorService jPool = Executors.newFixedThreadPool(10);
//包裝Java線程池,構(gòu)造Guava線程池
ListeningExecutorService gPool = MoreExecutors.listeningDecorator(jPool);
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容