一、簡介
ThreadLocal提供了線程本地變量,通過get或者set操作的這些變量在每個(gè)不同線程間是不相同的,各個(gè)線程獨(dú)立地初始化這些變量。ThreadLocal實(shí)例通常在類中是聲明為private static域的,用于在同一個(gè)線程內(nèi)關(guān)聯(lián)相同的狀態(tài)(e.g. 一個(gè)User ID或者Transaction ID)。ThreadLocal相當(dāng)于提供了一種線程隔離,將變量與線程相綁定。
只要線程還存活并且ThreadLocal實(shí)例還能被獲取到,那么每個(gè)線程會(huì)持有一個(gè)ThreadLocal變量弱引用。當(dāng)線程結(jié)束生命周期時(shí),所有的線程本地實(shí)例都會(huì)被GC。
二、簡單示例
/**
* 不同線程持有一個(gè)不同的UUID
*/
public class ThreadLocalTest {
private static ThreadLocal<String> uuidLocal = new ThreadLocal<String>(){
protected String initialValue() {
return UUID.randomUUID().toString();
}
};
public static void main(String[] args) {
UUIDThread t1 = new UUIDThread();
UUIDThread t2 = new UUIDThread();
t1.start();
t2.start();
}
public static class UUIDThread extends Thread {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " uuid: " + uuidLocal.get());
}
}
}
輸出結(jié)果兩個(gè)uuid不同,如下所示:
Thread-1 uuid: d8d2006f-0a8a-4999-90c0-de2648c742da
Thread-0 uuid: 5061e4bd-8f57-4ef6-8b74-e7571f9efb93
三、我司的用法
使用用戶公司來做分庫,不同的公司數(shù)據(jù)分在不同的業(yè)務(wù)庫中,將companyID存入DataSourceContext中,查詢數(shù)據(jù)庫的時(shí)候從DataSourceContext獲取對(duì)應(yīng)的companyID,根據(jù)companyID獲取對(duì)應(yīng)的數(shù)據(jù)庫鏈接。
public class DataSourceContext {
static final ThreadLocal<String> local = new ThreadLocal<>();
public DataSourceContext() {
}
public static String getCompany() {
return local.get();
}
public static String setCompany(String companyID) {
return companyID == null ? null : set(companyID);
}
}
四、成員變量
private final int threadLocalHashCode = nextHashCode(); // 初始值為0
private static AtomicInteger nextHashCode =
new AtomicInteger();
private static final int HASH_INCREMENT = 0x61c88647;
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}
ThreadLocal通過自定義threadLocalHashCode來減少線性探測的沖突,每次實(shí)例化一個(gè)ThreadLocal,threadLocalHashCode都會(huì)新增HASH_INCREMENT(0x61c88647)。
五、幾個(gè)方法
1. initialValue方法
/**
* 返回當(dāng)前線程的線程本地變量初始化值,在一個(gè)線程首次調(diào)用get方法時(shí)被調(diào)用。
* 如果調(diào)用get之前調(diào)用了set方法,就不會(huì)調(diào)用initialValue方法了。
* 默認(rèn)實(shí)現(xiàn)返回null,如果有需要,可以繼承ThreadLocal,并覆蓋該方法。
* 一般是使用匿名內(nèi)部類的形式子類化。
*/
protected T initialValue() {
return null;
}
2. get方法
/**
* 返回當(dāng)前線程的線程本地變量
*/
public T get() {
// 獲取當(dāng)前線程的引用
Thread t = Thread.currentThread();
// 從當(dāng)前線程中獲取到關(guān)聯(lián)的ThreadLocalMap
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null)
return (T)e.value;
}
// 如果當(dāng)前線程沒有線程本地變量,就設(shè)置初始值
return setInitialValue();
}
/**
* 從ThreadLocal中獲取一個(gè)關(guān)聯(lián)的Map
*/
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
可以從代碼中看出來,get()方法就是從當(dāng)前線程中獲取一個(gè)和當(dāng)前線程相關(guān)聯(lián)的ThreadLocalMap,然后以this為key,從ThreadLocalMap中取出相應(yīng)的值,并返回。如果沒有值,就設(shè)置一個(gè)初始值。
/* ThreadLocal values pertaining to this thread. This map is maintained
* by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;
threadLocals是Thread的成員變量,每個(gè)線程通過ThreadLocal.ThreadLocalMap與ThreadLocal相綁定,這樣可以確保每個(gè)線程訪問到的thread-local variable都是本線程的。
3. set方法
/**
* 設(shè)置初始值
*/
private T setInitialValue() {
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
// 如果線程t不存在ThreadLocalMap實(shí)例,就創(chuàng)建一個(gè)
createMap(t, value);
return value;
}
/**
* 實(shí)例化一個(gè)ThreadLocalMap并賦值給t.threadLocals
*/
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
/**
* 設(shè)置當(dāng)前線程的線程本地變量值
*/
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
可以看出set()方法和setInitialValue()方法類似,如果當(dāng)前線程存在threadLocals,那么直接把設(shè)置的值put到這個(gè)ThreadLocalMap中。否則,創(chuàng)建一個(gè)帶有這個(gè)value的ThreadLocalMap。
4. remove方法
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null)
m.remove(this);
}
首先獲取當(dāng)前線程,并從當(dāng)前線程中獲取ThreadLocalMap,如果不為空,則調(diào)用ThreadLocalMap的remove方法,把以this為key的Entry移除掉。如果隨后在當(dāng)前線程中被調(diào)用了get方法,那么因?yàn)樵鹊?code>Entry已經(jīng)被移除掉了,所以還會(huì)調(diào)用一次initialValue()方法初始化值。
5. 小結(jié)
從這些方法可以看出ThreadLocal類的設(shè)計(jì),Thread中有ThreadLocalMap成員變量,ThreadLocalMap又以ThreadLocal作為key來存放值。也就是說ThreadLocal把自身實(shí)例作為key,和需要保存的value存放到當(dāng)前線程的一個(gè)Map中,來保證每個(gè)線程訪問到的線程本地變量值都是各自線程的。ThreadLocal#set方法可以簡單理解為Thread.currentThread().threadLocals.put(this, value),ThreadLocal#get方法可以簡單理解為Thread.currentThread().threadLocals.get(this)。
六、ThreadLocalMap
ThreadLocal實(shí)現(xiàn)中,核心還是ThreadLocalMap。ThreadLocal只是作為ThreadLocalMap的key, 從ThreadLocalMap中獲取到相應(yīng)的值。下面簡單看下ThreadLocalMap的實(shí)現(xiàn)。
1. 成員變量
/**
* 初始容量,必須是2^n
*/
private static final int INITIAL_CAPACITY = 16;
/**
* 必要時(shí)會(huì)擴(kuò)容,但必須是2^n
*/
private Entry[] table;
/**
* table中entry的數(shù)量,也就是ThreadLocalMap的大小
*/
private int size = 0;
/**
* 下一次擴(kuò)容的閾值
*/
private int threshold; // Default to 0
其中INITIAL_CAPACITY代表這個(gè)ThreadLocalMap的初始容量;table是一個(gè)Entry類型的數(shù)組,用于存儲(chǔ)數(shù)據(jù);size代表表中的存儲(chǔ)數(shù)目,也就是ThreadLocalMap的大小;threshold代表需要擴(kuò)容時(shí)對(duì)應(yīng)size的閾值。
2. 靜態(tài)內(nèi)部類
static class Entry extends WeakReference<ThreadLocal> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal k, Object v) {
super(k);
value = v;
}
}
Entry繼承了WeakReference,當(dāng)除了Entry以外沒有其它地方強(qiáng)引用ThreadLocal實(shí)例,那么ThreadLocal實(shí)例就會(huì)被GC回收,避免造成內(nèi)存溢出。
3. 構(gòu)造函數(shù)
ThreadLocalMap(ThreadLocal firstKey, Object firstValue) {
table = new Entry[INITIAL_CAPACITY];
// 對(duì)hashcode“取?!庇?jì)算出table中索引值
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
table[i] = new Entry(firstKey, firstValue);
size = 1;
setThreshold(INITIAL_CAPACITY);
}
計(jì)算索引值i的時(shí)候里面采用了hashCode & (size - 1)的算法,這相當(dāng)于取模運(yùn)算hashCode % size的一個(gè)更高效的實(shí)現(xiàn)(和HashMap中的思路相同)。正是因?yàn)檫@種算法,要求size必須是2^n。
4. set方法
大致思路為:
- 通過
key的hashcode計(jì)算出索引值 - 從索引值
i開始,通過線性探測法從table中找到一個(gè)可以存放value的地方,然后設(shè)置值 - 因?yàn)?code>ThreadLocalMap的
key是WeakReference,所以會(huì)存在Entry存在,但是key已經(jīng)被回收的情況,這時(shí)候需要進(jìn)行一些清理工作,把這些Entry清理掉。 - 如果
size大于閾值(threshold),就要進(jìn)行擴(kuò)容,并rehash,從新計(jì)算映射。
private void set(ThreadLocal key, Object value) {
Entry[] tab = table;
int len = tab.length;
// 計(jì)算索引值
int i = key.threadLocalHashCode & (len-1);
// 使用線性探測法來解決沖突,而不是HashMap中采用的拉鏈法
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal k = e.get();
// 如果欲設(shè)置的key和table[i]中的相同,則更新value
if (k == key) {
e.value = value;
return;
}
// 如果k==null,證明key(WeakReference)已經(jīng)被GC回收,所以替換新的key和value
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
// 創(chuàng)建新的Entry
tab[i] = new Entry(key, value);
int sz = ++size;
// 如果更新之后的size大于閾值threshold,則需要rehash
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
/** 線性探測法的套路,找到下一個(gè)索引,如果越界了,就從0開始 */
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}
5. getEntry方法
ThreadLocal#get方法就是調(diào)用了ThreadLocalMap#getEntry方法。
大致思路為:
- 通過
key的hashcode計(jì)算出索引值i - 為了提高性能,直接判斷索引值下的
Entry是不是需要找的 - 否則,用線性探測的方式找到相應(yīng)的
value
private Entry getEntry(ThreadLocal key) {
// 計(jì)算出索引值
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
// 命中,則直接返回
if (e != null && e.get() == key)
return e;
else
// 按線性探測的方式查找
return getEntryAfterMiss(key, i, e);
}
/**
* 和getEntry類似,用于當(dāng)key的hash直接計(jì)算出的索引值上找不到Entry時(shí)
*/
private Entry getEntryAfterMiss(ThreadLocal key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;
while (e != null) {
ThreadLocal k = e.get();
if (k == key)
return e;
if (k == null)
// 清理過期的Entry
expungeStaleEntry(i);
else
i = nextIndex(i, len);
e = tab[i];
}
return null;
}
6. remove方法
remove方法和getEntry方法類似,計(jì)算索引值i,用線性探測的防止,找到Entry后,清理Entry。
private void remove(ThreadLocal key) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
if (e.get() == key) {
e.clear();
expungeStaleEntry(i);
return;
}
}
}
7. 擴(kuò)容方法
擴(kuò)容方法也很清楚,判斷目前使用的容量是否大于一定的值(size >= 3 / 4 * threshold),如果大于,則需要resize。
resize方法的思路如下:
-
size擴(kuò)大為兩倍,創(chuàng)建一個(gè)新的table表,將oldTab上的Entry轉(zhuǎn)移到newTab上。 - 轉(zhuǎn)移過程中,如果發(fā)現(xiàn)
e.get() == null,則證明key已經(jīng)被GC回收,那么這個(gè)Entry就不轉(zhuǎn)移。 - 否則,用線性探測法找到
Entry在newTab存放的位置,并設(shè)置。 - 最后設(shè)置新的
threshold。
可以看出threshold的大小為len * 2 / 3,所以每次size >= 0.5 * len的時(shí)候就要進(jìn)行擴(kuò)容(resize)。
private void rehash() {
expungeStaleEntries();
// 如果size > 3/4 * threshold,則擴(kuò)容
if (size >= threshold - threshold / 4)
resize();
}
/**
* table容量翻倍
*/
private void resize() {
Entry[] oldTab = table;
int oldLen = oldTab.length;
int newLen = oldLen * 2;
Entry[] newTab = new Entry[newLen];
int count = 0;
for (int j = 0; j < oldLen; ++j) {
Entry e = oldTab[j];
if (e != null) {
ThreadLocal k = e.get();
// 如果k已經(jīng)被GC回收,那么把value也設(shè)置為null,幫助GC回收,防止內(nèi)存泄漏
if (k == null) {
e.value = null; // Help the GC
} else {
// 從新計(jì)算索引值,并通過線性探測的方式存放到table中
int h = k.threadLocalHashCode & (newLen - 1);
while (newTab[h] != null)
h = nextIndex(h, newLen);
newTab[h] = e;
count++;
}
}
}
setThreshold(newLen);
size = count;
table = newTab;
}
private void setThreshold(int len) {
threshold = len * 2 / 3;
}
8. 一些清理方法
1) expungeStaleEntry
首先會(huì)清理tab[staleSlot]上過期的Entry,然后需要再散列(rehash),中間可能還會(huì)遇到一些過期的Entry,這些也要清理掉,知道遇到table[i] == null,中間的所有Entry都要rehash。
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;
// 清理掉過期位置的Entry
tab[staleSlot].value = null;
tab[staleSlot] = null;
size--;
// 再散列,知道遇到table[i] == null
Entry e;
int i;
for (i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal k = e.get();
if (k == null) {
e.value = null;
tab[i] = null;
size--;
} else {
int h = k.threadLocalHashCode & (len - 1);
// 如果h==i,那么證明這個(gè)Entry就是要放在table[i]上的,就不要rehash這個(gè)Entry
// 否則,rehash
if (h != i) {
// 先把當(dāng)前位置tab[i]釋放出來,再把Entry放到新的位置
tab[i] = null;
// Unlike Knuth 6.4 Algorithm R, we must scan until
// null because multiple entries could have been stale.
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}
return i;
}
2) cleanSomeSlots
/**
* 探索式掃描尋找過期的entry,當(dāng)增加新元素或者另一個(gè)過期entry被清理的時(shí)候會(huì)被調(diào)用
*/
private boolean cleanSomeSlots(int i, int n) {
boolean removed = false;
Entry[] tab = table;
int len = tab.length;
do {
i = nextIndex(i, len);
Entry e = tab[i];
// 判斷是否過期,如果是則處理
if (e != null && e.get() == null) {
n = len;
removed = true;
i = expungeStaleEntry(i);
}
} while ( (n >>>= 1) != 0);
return removed;
}
每次新增元素的時(shí)候會(huì)進(jìn)行探索式掃描,尋找過期Entry并清理。
3) 何時(shí)會(huì)清理過期Entry
處理Thread實(shí)例被GC回收,ThreadLocalMap同時(shí)被回收之外,下面這些條件下,會(huì)清理過期的Entry。
-
getEntry時(shí),線性探索尋找Entry的時(shí)候發(fā)現(xiàn)Entry過期。 -
set的時(shí)候發(fā)現(xiàn),key對(duì)應(yīng)索引值的Entry已過期,則會(huì)清理并替換 - 每次調(diào)用
set方法的時(shí)候,會(huì)探索式掃描Entry,如果發(fā)現(xiàn)過期,則清理。 -
size > threshold,rehash的時(shí)候。 - 調(diào)用
remove方法的時(shí)候。
當(dāng)前的應(yīng)用開發(fā)過程中,出于復(fù)用的目的,常常會(huì)使用線程池的技術(shù),線程中ThreadLocalMap可能會(huì)長期存在。因?yàn)?code>Entry中的key被WeakReference包裝,在key不存在強(qiáng)引用的時(shí)候,會(huì)回收key,但是Entry和value并不會(huì)被回收。所以在ThreadLocalMap中需要不時(shí)地清理過期的Entry,來保證內(nèi)存不泄露。當(dāng)然,如果我們在代碼中每次使用完ThreadLocal,都可以remove一下,那么就可以盡早釋放不需要的內(nèi)存。