前言
在上一篇文章 多線程篇-父子線程的上下文傳遞 的文末,我們了解到JDK提供的InheritableThreadLocal 在線程池中的使用情況并不是太理想,因?yàn)樵趶?fù)用線程的情況下,得到的值很有可能不是我們想要的,接下來我要給大家介紹一款開源組件,阿里開源的,用的感覺還不錯。
TransmittableThreadLocal
一般情況下,ThreadLocal都可以滿足我們的需求,當(dāng)我們出現(xiàn)需要 在使用線程池等會池化復(fù)用線程的執(zhí)行組件情況下傳遞ThreadLocal ,
這個場景就是TransmittableThreadLocal解決的問題。
Github地址:https://github.com/alibaba/transmittable-thread-local
感興趣的可以去下載的玩一玩,接下來我們來介紹一下這個組件的神奇之處。
首先看個demo, 通過demo,我們先了解了解怎么用
demo
/**
* ttl測試
*
* @author zhangyunhe
* @date 2020-04-23 12:47
*/
public class Test {
// 1. 初始化一個TransmittableThreadLocal,這個是繼承了InheritableThreadLocal的
static TransmittableThreadLocal<String> local = new TransmittableThreadLocal<>();
// 初始化一個長度為1的線程池
static ExecutorService poolExecutor = Executors.newFixedThreadPool(1);
public static void main(String[] args) throws ExecutionException, InterruptedException {
Test test = new Test();
test.test();
}
private void test() throws ExecutionException, InterruptedException {
// 設(shè)置初始值
local.set("天王老子");
//?。。。?注意:這個地方的Task是使用了TtlRunnable包裝的
Future future = poolExecutor.submit(TtlRunnable.get(new Task("任務(wù)1")));
future.get();
Future future2 = poolExecutor.submit(TtlRunnable.get(new Task("任務(wù)2")));
future2.get();
System.out.println("父線程的值:"+local.get());
poolExecutor.shutdown();
}
class Task implements Runnable{
String str;
Task(String str){
this.str = str;
}
@Override
public void run() {
// 獲取值
System.out.println(Thread.currentThread().getName()+":"+local.get());
// 重新設(shè)置一波
local.set(str);
}
}
}
輸出結(jié)果:
pool-1-thread-1:天王老子
pool-1-thread-1:天王老子
父線程的值:天王老子
原理分析
我們首先看一下TransmittableThreadLocal的源碼,
public class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> implements TtlCopier<T> {
// 1. 此處的holder是他的主要設(shè)計(jì)點(diǎn),后續(xù)在構(gòu)建TtlRunnable
private static InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder =
new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
@Override
protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() {
return new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
}
@Override
protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) {
return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue);
}
};
@SuppressWarnings("unchecked")
private void addThisToHolder() {
if (!holder.get().containsKey(this)) {
holder.get().put((TransmittableThreadLocal<Object>) this, null); // WeakHashMap supports null value.
}
}
@Override
public final T get() {
T value = super.get();
if (disableIgnoreNullValueSemantics || null != value) addThisToHolder();
return value;
}
/**
* see {@link InheritableThreadLocal#set}
*/
@Override
public final void set(T value) {
if (!disableIgnoreNullValueSemantics && null == value) {
// may set null to remove value
remove();
} else {
super.set(value);
addThisToHolder();
}
}
/**
* see {@link InheritableThreadLocal#remove()}
*/
@Override
public final void remove() {
removeThisFromHolder();
super.remove();
}
private void superRemove() {
super.remove();
}
}
步驟說明:
- 在代碼中,作者構(gòu)建了一個holder對象,這個對象是一個
InheritableThreadLocal, 里面的類型是一個弱引用的WeakHashMap , 這個map的va lu就是TransmittableThreadLocal, 至于value永遠(yuǎn)都是空的
holder里面存儲的是這個應(yīng)用里面,所有關(guān)于TransmittableThreadLocal的引用。
- 從上面可以看到,每次get, set ,remove都會操作holder對象,這樣做的目的是為了保持
TransmittableThreadLocal所有的這個引用都在holder里面存一份。
TtlRunnable
回到我們上面的代碼
Future future = poolExecutor.submit(TtlRunnable.get(new Task("任務(wù)1")));
細(xì)心的朋友可能已經(jīng)發(fā)現(xiàn)了,我們調(diào)用了TtlRunnable 對象的get方法,下面看一下這個方法有啥作用吧
public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {
if (null == runnable) return null;
if (runnable instanceof TtlEnhanced) {
// avoid redundant decoration, and ensure idempotency
if (idempotent) return (TtlRunnable) runnable;
else throw new IllegalStateException("Already TtlRunnable!");
}
// 重點(diǎn)在這里
return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun);
}
看上面的代碼,細(xì)節(jié)上我們不看,我們看大致的思路, 這個地方主要就是根據(jù)傳入的runnable構(gòu)建了一個TtlRunnable對象。
private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
//重點(diǎn)在這里
this.capturedRef = new AtomicReference<Object>(capture());
this.runnable = runnable;
this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}
下面這行代碼,運(yùn)行到這里的時候,還是在主線程里面,調(diào)用了capture方法
this.capturedRef = new AtomicReference<Object>(capture());
capture
public static Object capture() {
// 構(gòu)建一個臨時對象,主要看captureTtlValues方法
return new Snapshot(captureTtlValues(), captureThreadLocalValues());
}
private static WeakHashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
// 構(gòu)建一個WeakHashMap方法,
WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
// 在主線程里面,調(diào)用holder變量,循環(huán)獲取里面所有的key和value
for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
ttl2Value.put(threadLocal, threadLocal.copyValue());
}
// 返回出去
return ttl2Value;
}
步驟說明:
1.調(diào)用靜態(tài)變量holder, 循環(huán)獲取里面所有的key和value, value的獲取就比較巧妙一點(diǎn)。
private T copyValue() {
// 這里的get方法,調(diào)用的是父類的方法,可以在父類里面最終獲取到當(dāng)前TransmittableThreadLocal所對應(yīng)的value
return copy(get());
}
2.組裝好一個WeakHashMap出去,最終就會到了我們上面的構(gòu)造方法里面,針對capturedRef 的賦值操作。
run
@Override
public void run() {
//1. 獲取到剛剛構(gòu)造TtlRunnable對象的時候初始化的capturedRef對象。包含了從submit丟任務(wù)進(jìn)來的時候父線程的數(shù)據(jù)
Object captured = capturedRef.get();
if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
throw new IllegalStateException("TTL value reference is released after run!");
}
// 清除不在captured里面的key,同時在這個子線程中,對所有的ThreadLocal進(jìn)行重新設(shè)置值
Object backup = replay(captured);
try {
// 執(zhí)行實(shí)際的線程方法
runnable.run();
} finally {
// 做好還原工作,根據(jù)backup
restore(backup);
}
}
private static WeakHashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> captured) {
WeakHashMap<TransmittableThreadLocal<Object>, Object> backup = new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
TransmittableThreadLocal<Object> threadLocal = iterator.next();
// 做好當(dāng)前線程的local備份
backup.put(threadLocal, threadLocal.get());
// 清除數(shù)據(jù),不在captured里面的。
if (!captured.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// 這里就是把值設(shè)置到當(dāng)前線程的TransmittableThreadLocal里面。
setTtlValuesTo(captured);
// 一個鉤子
doExecuteCallback(true);
return backup;
}
總結(jié):
一些朋友看了上面的那么多源碼,可能有點(diǎn)蒙,我可能說的也有點(diǎn)亂,在這里總結(jié)一下。
1.通過繼承InheritableThreadLocal,新成立一個TransmittableThreadLocal類, 該類中有一個hodel變量,用來維護(hù)所有的TransmittableThreadLocal引用。
2.在實(shí)際submit任務(wù)到線程池的時候,我們是需要調(diào)用TtlRunnable.get方法,構(gòu)建一個任務(wù)的包裝類。這里使用裝飾者模式,對runnable線程對象進(jìn)行裝飾包裝,在初始化這個包裝對象的時候,會獲取主線程里面所有的TransmittableThreadLocal引用,以及里面所有的值,這個值其實(shí)就是當(dāng)前父線程里面的(跟你當(dāng)時創(chuàng)建這個線程的父線程沒有任何關(guān)系,注意,這里講的是線程池的場景)。
3.對數(shù)據(jù)做規(guī)整,根據(jù)收集到的captured (這個對象里面存儲的都是主線程里面能夠獲取到TransmittableThreadLocal以及對應(yīng)的值) 做規(guī)整,去掉當(dāng)前線程里面不需要的,同時將剩余的key和value ,更新到當(dāng)前線程的ThreadLocal里面。這樣就達(dá)到了在池化技術(shù)里面父子線程傳值的安全性