netty新增了FastThreadLocal和FastThreadLocalThread等類來實現(xiàn)了一個FastThread框架,相較于java.lang.Thread和java.lang.ThreadLocal速度更快。
在看FastThread框架之前,先看一下java.lang.ThreadLocal的源碼。
- 一、使用姿勢
- 二、數(shù)據(jù)結(jié)構(gòu)
- 三、源碼分析
- 四、回收機制
- 總結(jié)
一、使用姿勢
public class ThreadLocalTest {
/**
* 最佳實踐一:使用private static
* {@code ThreadLocal} instances are typically private static fields in classes
* that wish to associate state with a thread (e.g., a user ID or Transaction ID)
*
* java8寫法:
* private static final ThreadLocal<Integer> integerThreadLocal = ThreadLocal.withInitial(() -> 100);
*
* java8之前的寫法:
* <pre>
* private static final ThreadLocal<Integer> integerThreadLocal = new ThreadLocal<Integer>() {
* @Override
* protected Integer initialValue() {
* return 100;
* }
* };
* </pre>
*/
private static final ThreadLocal<Integer> integerThreadLocal = new ThreadLocal<Integer>() {
/**
* 最佳實踐二:重寫initialValue()
*/
@Override
protected Integer initialValue() {
return 100;
}
};
@Test
public void testThreadLocal() {
System.out.println(integerThreadLocal.get());
/**
* 最佳實踐三:用完如果可以刪除,則手動刪除
*/
integerThreadLocal.remove();
System.out.println(integerThreadLocal.get());
}
}
最佳實踐
- 在類中定義ThreadLocal,用private static修飾;
- 根據(jù)源碼分析,由于ThreadLocal對象本身會作為Entry的key去獲取數(shù)據(jù),所以最好也要用final去修飾。key通常都是不可變對象,否則當作為key的對象發(fā)生了變化之后,之前存儲的數(shù)據(jù)將無法get,造成資源泄露;
- 使用匿名內(nèi)部類實現(xiàn)initialValue(),在執(zhí)行g(shù)et()時,如果當前的Thread的ThreadLocalMap沒有integerThreadLocal這個key的Entry(不管是第一次get還是remove之后的第一次get),則直接使用initialValue()進行初始化操作;
- 在使用完ThreadLocal后,及時手動remove()無用的ThreadLocal,防止資源泄露
二、數(shù)據(jù)結(jié)構(gòu)

說明
- 每一個Thread類都有一個屬性:ThreadLocal.ThreadLocalMap threadLocals = null
- 每一個ThreadLocalMap都有一個Entry[]數(shù)組
- 每一個Entry繼承于WeakReference,Entry的key(ThreadLocal實例)作為WeakReference的referent,傳入的value作為value
java引用:
注意
- 從第三條說明點發(fā)現(xiàn)每一個ThreadLocal只能存儲一個值,對于dubbo的RpcContext.LOCAL來講,存儲了一個RpcContext上下文信息;如果需要存儲多個值,根據(jù)情況,將多個值合并為對象進行存儲或者使用多個ThreadLocal實例進行存儲
三、源碼分析
3.1、ThreadLocal的創(chuàng)建
基礎屬性
/**
* 每一個ThreadLocal對象都有一個唯一的threadLocalHashCode。
* ThreadLocal對象會作為ThreadLocalMap的Entry的key
*/
private final int threadLocalHashCode = nextHashCode();
private static AtomicInteger nextHashCode = new AtomicInteger();
/**
* 魔數(shù):與斐波那契數(shù)列和黃金分割點有關。用于散列均勻,減少hash沖突
*/
private static final int HASH_INCREMENT = 0x61c88647;
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}
說明
- 每一個ThreadLocal對象都有一個唯一的threadLocalHashCode,該值用于計算Entry元素存儲的直接索引。
- 直接索引:元素根據(jù)threadLocalHashCode&(table.len-1)算出來的值
- 真實索引:元素可能由于hash沖突,使用線性探測法,放置在不是直接索引的位置
- hash算法:每一個ThreadLocal的threadLocalHashCode都是前一個的threadLocalHashCode + 0x61c88647。這樣的hash算法可以很大程度的避免hash沖突,散列很均勻。0x61c88647這個數(shù)是怎么算出來的,見:https://www.javaspecialists.eu/archive/Issue164.html
普通構(gòu)造函數(shù)
public ThreadLocal() {
}
java8提供的構(gòu)造函數(shù)
public static <S> ThreadLocal<S> withInitial(Supplier<? extends S> supplier) {
return new SuppliedThreadLocal<>(supplier);
}
static final class SuppliedThreadLocal<T> extends ThreadLocal<T> {
private final Supplier<? extends T> supplier;
SuppliedThreadLocal(Supplier<? extends T> supplier) {
this.supplier = Objects.requireNonNull(supplier);
}
@Override
protected T initialValue() {
return supplier.get();
}
}
java8提供了Supplier(提供者) FunctionalInterface?;谠摻涌诳梢苑奖愕奶娲羰纠械氖褂媚涿麅?nèi)部類來重寫initialValue()的方法。需要注意的是,supplier函數(shù)不可為null。
@FunctionalInterface
public interface Supplier<T> {
/**
* Gets a result.
*/
T get();
}
3.2 數(shù)據(jù)的初始化與設置
/**
* 設置初始值
* 與set(T value)幾乎相同,下邊代碼等價于
* <pre>
* private T setInitialValue() {
* T value = initialValue();
* set(value);
* return value;
* }
* </pre>
*/
private T setInitialValue() {
// 獲取初始值(通常是調(diào)用使用方重寫的initialValue(),否則使用{@link ThreadLocal#initialValue()返回null})
T value = initialValue();
// 獲取當前線程
Thread t = Thread.currentThread();
// 獲取當前線程的ThreadLocalMap對象
ThreadLocalMap map = getMap(t);
// 如果當前線程的ThreadLocalMap對象不為null,則直接將{當前的ThreadLocal對象,value}設置到ThreadLocalMap對象中去
if (map != null)
map.set(this, value);
// 如果當前線程的ThreadLocalMap為null,則直接新建ThreadLocalMap對象,并將{當前的ThreadLocal對象,value}設置到ThreadLocalMap對象中去
else
createMap(t, value);
// 返回初始值
return value;
}
protected T initialValue() {
return null;
}
/**
* 設置值
*/
public void set(T value) {
// 獲取當前線程
Thread t = Thread.currentThread();
// 獲取當前線程的ThreadLocalMap對象
ThreadLocalMap map = getMap(t);
// 如果當前線程的ThreadLocalMap對象不為null,則直接將{當前的ThreadLocal對象,value}設置到ThreadLocalMap對象中去
if (map != null)
map.set(this, value);
// 如果當前線程的ThreadLocalMap為null,則直接新建ThreadLocalMap對象,并將{當前的ThreadLocal對象,value}設置到ThreadLocalMap對象中去
else
createMap(t, value);
}
/**
* 獲取當前線程t的ThreadLocalMap
*/
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
/**
* 創(chuàng)建ThreadLocalMap,并將 {當前的ThreadLocal對象,firstValue} 設置到ThreadLocalMap對象中去
* this:當前的ThreadLocal對象
*/
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
setInitialValue()流程:
- 首先調(diào)用initialValue()獲取初始值(通常是調(diào)用使用方重寫的initialValue(),否則返回null)
- 然后獲取當前線程
- 然后獲取當前線程的ThreadLocalMap對象屬性
- 如果當前線程的ThreadLocalMap對象不為null,則直接將{當前的ThreadLocal對象,value}設置到ThreadLocalMap對象中去
- 如果當前線程的ThreadLocalMap為null,則直接新建ThreadLocalMap對象,并將{當前的ThreadLocal對象,value}設置到ThreadLocalMap對象中去
- 最后返回initialValue()獲取到的初始值
setInitialValue()被使用的時機:ThreadLocal.get()。
關于ThreadLocalMap的創(chuàng)建和向ThreadLocalMap中設置參數(shù)的源碼后續(xù)再說。
3.3 數(shù)據(jù)的獲取
public T get() {
// 獲取當前線程
Thread t = Thread.currentThread();
// 獲取當前線程的ThreadLocalMap屬性
ThreadLocalMap map = getMap(t);
if (map != null) {
// 從ThreadLocalMap屬性中獲取key為當前的ThreadLocal的Entry
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
// 如果Entry不為null,直接獲取entry的value,返回即可
@SuppressWarnings("unchecked")
T result = (T) e.value;
return result;
}
}
// 如果當前線程的ThreadLocalMap為null或者key為當前的ThreadLocal的Entry為null
return setInitialValue();
}
get()流程:
- 獲取當前線程
- 獲取當前線程的ThreadLocalMap屬性
- 如果當前線程的ThreadLocalMap對象不為null,從ThreadLocalMap對象中獲取key為當前的ThreadLocal的Entry
- 如果該Entry不為null,直接獲取entry的value,返回即可
- 如果當前線程的ThreadLocalMap為null(還未
createMap(Thread t, T firstValue))或者key為當前的ThreadLocal的Entry為null(沒有調(diào)用map.set(this, value)),則執(zhí)行設置初始化值的方法。
關于根據(jù)ThreadLocal從ThreadLocalMap中獲取Entry的源碼后續(xù)再說。
3.4 數(shù)據(jù)的刪除
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null)
m.remove(this);
}
remove()步驟:
- 獲取當前的線程
- 獲取當前線程的ThreadLocalMap
- 如果當前的ThreadLocalMap不為null,直接執(zhí)行調(diào)用ThreadLocalMap的remove(ThreadLocal<?> key)
關于根據(jù)ThreadLocal從ThreadLocalMap中remove Entry的源碼后續(xù)再說。
可以看到具體的操作全部委托到了ThreadLocalMap中來執(zhí)行。
3.5 ThreadLocalMap的創(chuàng)建
基礎屬性
/**
* The initial capacity -- MUST be a power of two.
*/
private static final int INITIAL_CAPACITY = 16;
/**
* The table, resized as necessary.
* table.length MUST always be a power of two.
*/
private Entry[] table;
/**
* The number of entries in the table.
*/
private int size = 0;
/**
* threshold == table大小的2/3,
* 1、當size >= threshold,遍歷table并刪除所有key為null的元素(rehash()),
* 2、如果刪除后size >= threshold*3/4 == 數(shù)組長度*1/2 時,需要對table進行擴容
*/
private int threshold; // Default to 0
private void setThreshold(int len) {
threshold = len * 2 / 3;
}
靜態(tài)內(nèi)部類Entry
static class Entry extends WeakReference<ThreadLocal<?>> {
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
其中value就是set的value,而k是當前的ThreadLocal對象,作為WeakReference的referent屬性。
構(gòu)造器
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
// 創(chuàng)建數(shù)組實例,默認長度16
table = new Entry[INITIAL_CAPACITY];
// 求余計算數(shù)組索引i
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
// 第一次插入,沒有hash沖突,直接放置
table[i] = new Entry(firstKey, firstValue);
// 數(shù)組實際元素長度 + 1
size = 1;
// 設置擴容的閾值為 16*2/3 = 10,當有10個元素時,發(fā)生擴容
setThreshold(INITIAL_CAPACITY);
}
步驟:
- 創(chuàng)建數(shù)組實例,默認長度16
- 計算firstKey的直接索引,當len=2的n次方時,x&(len-1) <=> x%len,所以這里相當于直接求余,只是&運算效率更高
- 第一次插入,沒有hash沖突,直接創(chuàng)建Entry并放置就ok了
- 然后數(shù)組中元素個數(shù)size+1
- 設置rehash()的閾值
索引操作
/**
* i:數(shù)組索引
* len:數(shù)組長度
* 如果 i+1<數(shù)組長度,則返回i+1, 如果 i+1>=數(shù)組長度,則直接取0(即開頭table[0]),
* 在實際使用中,實際上就是不斷向后的循環(huán)取,到了末尾,再回到table[0],再向后循環(huán)取
*/
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}
/**
* 如果 i-1>=0,則返回i-1, 如果 i<1,則直接取數(shù)組長度-1(即末尾table[len-1]),
* 在實際使用中,實際上就是不斷向前的循環(huán)取,到了開頭,再回到table[len-1],再向前循環(huán)取
*/
private static int prevIndex(int i, int len) {
return ((i - 1 >= 0) ? i - 1 : len - 1);
}
從上述的邏輯可知,我們可以將table[]看做是一個循環(huán)數(shù)組。
3.6 向ThreadLocalMap中設置Entry
/**
* 設置 {key, value} 到當前線程的ThreadLocalMap中
*/
private void set(ThreadLocal<?> key, Object value) {
// 成員變量局部化,提高性能
Entry[] tab = table;
// 獲取數(shù)組容量
int len = tab.length;
// 獲取當前的threadlocal位于數(shù)組的下標,當len=2的n次方時,
// key.threadLocalHashCode & (len - 1) <=> key.threadLocalHashCode % len
int i = key.threadLocalHashCode & (len - 1);
/**
* 先獲取tab[i],判斷如果不為null,則進行循環(huán)體內(nèi)邏輯;
* 若沒有return,則進行二次循環(huán),tab[i+1],判斷如果不為null,則進行循環(huán)體內(nèi)邏輯;
* 當 i + 1 >= len 時,tab[0]回到頭再進行循環(huán)。
*
* 這就是解決hash沖突的另一種方式:線性探測法。如果當前槽沒有元素,直接插入元素;如果當前槽有元素,則向后尋找第一個為null的槽,放置該元素。
* 在這里,還添加了hash沖突時替換原有元素或者替換無效元素的邏輯。
*/
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
// 獲取當前的Entry的key,使用e.get(),是因為當前的key是一個WeakReference,使用get()獲取key
ThreadLocal<?> k = e.get();
// 如果當前的key就是剛剛查詢出來的Entry的key(即修改的是同一個Entry),則直接替換值
if (k == key) {
e.value = value;
return;
}
// 如果剛剛查詢出來的Entry的key是null,則表明該Entry的key(ThreadLocal)已經(jīng)被回收了
// 使用replaceStaleEntry替換掉已經(jīng)無效的Entry
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
// 通過上述for循環(huán)解決了hash沖突之后,創(chuàng)建新的Entry,放置到當前的table[i]節(jié)點上
tab[i] = new Entry(key, value);
// table的size+1
int sz = ++size;
// 回收部分無效的Entry,如果成功,則當前的數(shù)組中的元素個數(shù)將小于閾值,則肯定不需要擴容,
// 否則判斷當前的數(shù)組中的元素個數(shù)是否達到了擴容閾值
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
說明:
- 首先將table[]成員變量局部化(在后續(xù)的操作中如果會對table成員變量進行多次操作,局部化會提高性能)
- 獲取數(shù)組長度,計算直接索引
- 然后進行hash沖突檢測與處理(線性探測法)
- 獲取直接索引位置的Entry,如果不為null,表示發(fā)生了hash沖突,則先獲取Entry中的key,如果該key與當前的ThreadLocal是同一個對象,則直接替換value,返回;
- 如果Entry中的key==null,表示當前的Entry已經(jīng)是一個無效的Entry了,執(zhí)行replaceStaleEntry方法進行處理(如果從該Entry后的第一個元素開始在一個連續(xù)的Entry子數(shù)組內(nèi)找到一個key與當前ThreadLocal相等的元素,則替換值,然后互換該Entry和當前的key==null的Entry位置;如果沒找到,則將當前的key==null的Entry設置為新的將設置的Entry)返回;
- 如果發(fā)生了hash沖突,但是直接索引位置上的Entry即不是當前的ThreadLocal的Entry,也不是一個無效的Entry,則需要向后循環(huán)遍歷下一個Entry元素,再進行上述邏輯的處理。直到找到一個Entry為null的位置或者Entry失效的位置或者Entry就是當前的ThreadLocal的Entry的位置。
- 假設沒有發(fā)生hash沖突或者經(jīng)過上述的for循環(huán)找到了一個Entry為null的位置的時候,在該位置創(chuàng)建設置新添加的Entry,數(shù)組元素+1;
- 嘗試回收部分無效的Entry,如果成功,則當前的數(shù)組中的元素個數(shù)將小于閾值,則肯定不需要rehash();如果失敗,且數(shù)組元素個數(shù) > threshold,執(zhí)行rehash() -- 即執(zhí)行部分資源清理與可能的擴容操作。
replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot)
/**
* 對一段連續(xù)的Entry子數(shù)組進行操作
* Replace a stale entry encountered during a set operation
* with an entry for the specified key.
*/
private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) {
// 獲取當前的table及其容量
Entry[] tab = table;
int len = tab.length;
Entry e;
// 設置需要被回收的槽索引(無效的Entry索引)
int slotToExpunge = staleSlot;
/**
* 向前遍歷,直到找到第一個為null的槽
* 在遍歷的過程中,如果找到了無效的Entry,則將slotToExpunge設置為當前的被回收的無效索引
*/
for (int i = prevIndex(staleSlot, len);
(e = tab[i]) != null;
i = prevIndex(i, len))
if (e.get() == null)
slotToExpunge = i;
/**
* 向后遍歷,直到找到第一個為null的槽或者找到與當前Entry key相等的槽
*/
for (int i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
// The newly stale slot, or any other stale slot
// encountered above it, can then be sent to expungeStaleEntry
// to remove or rehash all of the other entries in run.
// 如果key相等,直接替換
if (k == key) {
// 替換值
e.value = value;
/**
* 互換 tab[i] 和 tab[staleSlot],將無效的tab[staleSlot]中的Entry后移;
* 將正確的 tab[i]中的Entry前移,方便下一次的查找(可以根據(jù)直接槽索引查找到了)
*/
tab[i] = tab[staleSlot];
tab[staleSlot] = e;
// 如果待回收的槽索引就是當前的無效索引,則設置待回收的槽索引為i
if (slotToExpunge == staleSlot)
slotToExpunge = i;
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}
// 如果 "k==null && 待回收的槽索引就是當前的無效索引",則設置待回收的槽索引為i
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i;
}
// 置空原本的entry的value,將當前的Entry添加到這個槽上,原本的Entry等待GC
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);
// If there are any other stale entries in run, expunge them
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
步驟:
- 首先從當前索引staleSlot的前一個元素開始向前遍歷一個連續(xù)的Entry子數(shù)組,一直找到該連續(xù)子數(shù)組最靠前的那個無效索引,設置為slotToExpunge
- 然后從staleSlot的 后一個元素開始 向后遍歷一個連續(xù)的Entry子數(shù)組
- 如果找到的Entry的key是當前的ThreadLocal,則直接替換value,之后將找到的Entry放置到table[staleSlot]處,也就是Entry本身的直接索引處,而Entry本身的真實索引的位置放置為原來的table[staleSlot];然后進行一次連續(xù)段的回收,之后進行一次log次數(shù)級別的回收,最后返回
- 如果經(jīng)過循環(huán),沒有找到key與ThreadLocal相等的Entry,則直接將要set的Entry放置到table[staleSlot]處,最后根據(jù)需要進行一次連續(xù)段的回收,之后在進行一次log次數(shù)級別的回收
關于回收與擴容相關的操作,后續(xù)分析。
3.7 從ThreadLocalMap中獲取Entry
private Entry getEntry(ThreadLocal<?> key) {
// 計算所查找的Entry在table[]中的索引
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
// 如果快速查找到的Entry!=null并且key也相等,則直接返回(也就是說在直接的hash槽找到了Entry)
if (e != null && e.get() == key)
return e;
//
else
return getEntryAfterMiss(key, i, e);
}
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;
/**
* 循環(huán)遍歷,直到找到下一個不為null的Entry
*/
while (e != null) {
ThreadLocal<?> k = e.get();
// 如果key相等,表示找到了,直接返回
if (k == key)
return e;
// 如果key==null,則嘗試回收或者整理"i到i之后的第一個Entry為null的索引(即返回值)之間"的Entry數(shù)據(jù)
if (k == null)
expungeStaleEntry(i);
// 繼續(xù)向后遍歷
else
i = nextIndex(i, len);
e = tab[i];
}
// 如果沒找到,返回null
return null;
}
步驟:
- 計算當前的ThreadLocal的直接索引,獲取直接索引位置的Entry
- 如果該位置的Entry不為null且Entry的key與當前的ThreadLocal是同一個元素,則直接返回該Entry
- 如果該位置的Entry為null,直接返回null(看getEntryAfterMiss邏輯)
- 如果該位置的Entry不為null,但是Entry的key與當前的ThreadLocal不是同一個元素,則表明發(fā)生了hash沖突。此時,會不斷的向后循環(huán)尋找,直到找到了要查找的Entry或者遍歷到的Entry為null,如果找到了,返回Entry,如果沒找到,返回null。
注意:
- 在沒有根據(jù)直接索引找到Entry后的while循環(huán)中,如果檢測到無效的Entry時,會使用expungeStaleEntry方法嘗試回收或者整理"i到i之后的一段連續(xù)的不為null的索引(即返回值)之間"的Entry數(shù)據(jù)。
- 對于get來講,線性探測法是以Entry!=null為循環(huán)查找結(jié)束條件的,所以要注意remove時的操作。
3.8 從ThreadLocalMap中刪除Entry
private void remove(ThreadLocal<?> key) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len - 1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
if (e.get() == key) {
// key - ThreadLocal置為null
e.clear();
// 嘗試回收或者整理"i到i之后的一段連續(xù)的不為null的索引(即返回值)之間"的Entry數(shù)據(jù)
expungeStaleEntry(i);
return;
}
}
}
步驟:
- 計算key的直接索引,從直接索引的Entry開始,依舊在一段連續(xù)的不為null的Entry進行循環(huán),如果找到的Entry的key與當前的key相同,則進行刪除操作:
- 首先將Entry的key置空(e.clear())
- 嘗試回收或者整理"i到i之后的一段連續(xù)的不為null的索引(即返回值)之間"的Entry數(shù)據(jù)
注意:
- 這里的expungeStaleEntry中的整理邏輯非常重要,直接影響get查詢的查詢鏈是否會斷掉的問題。
3.9 rehash()與resize()
private void rehash() {
// 回收table中所有無效的Entry,如果可以有效減小size到不滿足下邊的擴容條件,則下邊的resize()不需要執(zhí)行
expungeStaleEntries();
// 實際上擴容是發(fā)生在size>=threshold*3/4==length*(2/3)*(3/4)==lenth*1/2
if (size >= threshold - threshold / 4)
resize();
}
/**
* 擴容兩倍數(shù)組,之后rehash
*/
private void resize() {
// 獲取舊數(shù)組及其長度
Entry[] oldTab = table;
int oldLen = oldTab.length;
// 新數(shù)組擴容2倍
int newLen = oldLen * 2;
Entry[] newTab = new Entry[newLen];
int count = 0;
for (int j = 0; j < oldLen; ++j) {
Entry e = oldTab[j];
if (e != null) {
ThreadLocal<?> k = e.get();
// 無效的Entry
if (k == null) {
e.value = null; // Help the GC
} else {
int h = k.threadLocalHashCode & (newLen - 1);
while (newTab[h] != null)
h = nextIndex(h, newLen);
newTab[h] = e;
count++;
}
}
}
// 設置新的閾值
setThreshold(newLen);
size = count;
table = newTab;
}
rehash()僅用在ThreadLocalMap#set(ThreadLocal<?> key, Object value)的末尾部分。
rehash()步驟:
- 首先進行一次全表回收無效的Entry操作,如果可以有效減小數(shù)組中的元素個數(shù)size到數(shù)組長度的一半以下,則不需要進行擴容,如果達到一半及以上,則進行擴容操作。
resize()步驟:
- 創(chuàng)建新的Entry[]是舊的長度的兩倍;之后遍歷舊數(shù)組,之后根據(jù)新的數(shù)組的長度重新計算每個有效Entry的直接索引,根據(jù)線性探測法重新填充數(shù)組。
- 最后設置新的Entry[]的閾值threshold、數(shù)組元素個數(shù)size,將新數(shù)組賦值給全局變量table
四、回收機制
ThreadLocal 回收機制 就是如下的四個函數(shù)以及WeakReference<ThreadLocal<?>>
1、expungeStaleEntry(int staleSlot)
從staleSlot開始的連續(xù)Entry子數(shù)組中的無效Entry的清理和 整理:
2、expungeStaleEntries()
清理整個table中的無效Entry
3、cleanSomeSlots(int i, int n)
進行l(wèi)og2(n)次循環(huán)(如果正好遍歷到無效的Entry,則n=table.length,調(diào)用expungeStaleEntry進行連續(xù)段的清理和整理);
是一種介于完全不掃描和完全掃描(expungeStaleEntries)之間的一種掃描方式
4、replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot)
該函數(shù)已在set部分分析:從table[staleSlot]后的第一個元素開始在一個連續(xù)的Entry子數(shù)組內(nèi)如果找到一個key與當前ThreadLocal相等的Entry,則替換值,然后互換該Entry和table[staleSlot](①),返回;如果沒找到,則將table[staleSlot]直接設置為新的將設置的Entry(②)
①②處都會對該Entry所在的連續(xù)段,從連續(xù)段的第一個無效Entry(可能在當前的Entry之前)開始或者從該Entry的后一個元素開始(表示該Entry之前的元素全部有效)進行一次連續(xù)段的 回收和整理,之后在進行一次log級別的回收;
4.1 expungeStaleEntry(int staleSlot)
/**
* 嘗試回收或者整理"staleSlot到staleSlot之后的第一個Entry為null的索引(即返回值)之間"的Entry數(shù)據(jù)
*
* @return the index of the next null slot after staleSlot
* (all between staleSlot and this slot will have been checked
* for expunging).
*/
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;
// 回收table[staleSlot]處的值,value置為null,整體置為null
tab[staleSlot].value = null;
tab[staleSlot] = null;
// size-1
size--;
// Rehash until we encounter null
Entry e;
int i;
for (i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
// 如果當前的table[i] Entry無用,則直接回收
if (k == null) {
e.value = null;
tab[i] = null;
size--;
} else {
/**
* 問題:
* 假設ThreadLocalMap的table長度為4的,其中table[1]、table[2]已經(jīng)有值,而且key都不為null,
* 這時候set一個threadlocal1,算出來的索引為1,根據(jù)線性探測法,最后會設置到table[3];
* 然后,table[2]被remove,此時table[2]==null;
* 然后,get threadlocal1,此時算出索引還是1, 從table[1]向后查找,結(jié)果table[2]==null,則查找鏈就斷了,結(jié)果返回null,
* 而沒有辦法查找到真正的table[3],這種情況threadlocal是怎么處理的呢?
*
* 答案就是下邊的代碼:會對remove元素的后邊的元素循環(huán)遍歷,直到遍歷到非null為止。對每個元素做如下邏輯:
* 如果當前元素的k==null,則回收,如果不是無效元素,則計算其真實的直接索引,例如重新計算table[3]的索引,此處為1,與當前遍歷的索引3不同,
* 所以將table[3]先置空,然后從table[1]開始向后尋找第一個為null的位置,重新放置table[3]的元素,此處就是table[2]=table[3]
*
* 簡言之:remove一個Entry之后,會整理remove的Entry元素之后的一塊兒連續(xù)的(中間沒有null的entry)元素子數(shù)組,全部使用線性探測法重新計算,
* 這樣remove掉的那一塊兒就會被后續(xù)的Entry重新填充,查詢鏈就不會斷
*/
// 計算真實的直接索引位置,因為可能k是因為hash沖突循環(huán)后移的
int h = k.threadLocalHashCode & (len - 1);
// 如果h和i,不相等,表示確實k是循環(huán)后移了的
if (h != i) {
// 首先將之前的Entry置為null
tab[i] = null;
// 一直向后循環(huán)找到第一個為null的索引,之后table[h] = e(將當前的Entry放置到table[h]新的hash位置上)
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}
return i;
}
步驟:
- 將table[staleSlot]的value和Entry都置為null,元素個數(shù)size - 1
- 遍歷staleSlot之后的一段連續(xù)的Entry,如果Entry的key==null,將該Entry的value和Entry本身置空,如果key是有效的,將要進行 整理操作
- 計算直接索引的位置,如果當前的Entry的真實索引與直接索引不相等,則表示該Entry之前是因為hash沖突被沖到后邊的slot里的,此處會重新從直接索引位置開始向后找到第一個不為null的的位置,將當前的Entry元素放置到這里,這樣會保證原本連續(xù)的Entry子數(shù)組,在從中刪除一個Entry后,子數(shù)組中剩下的元素還可以組成一個連續(xù)的Entry數(shù)組,這保證了get操作的查詢鏈不會斷。
注意:
- remove一個元素后,會調(diào)用這里的 整理操作 進行連續(xù)Entry子數(shù)組的整理,不會使查詢鏈斷掉。要仔細的看注釋中的那個例子。
- 整理的過程相當于一次從staleSlot開始的連續(xù)段 內(nèi)存碎片整理,將無效的Entry刪除,有效的Entry按照原本的順序聚集。
4.2 expungeStaleEntries()
/**
* 回收table中所有的無效Entry.
*/
private void expungeStaleEntries() {
Entry[] tab = table;
int len = tab.length;
for (int j = 0; j < len; j++) {
Entry e = tab[j];
if (e != null && e.get() == null)
expungeStaleEntry(j);
}
}
遍歷table中所有的元素,發(fā)現(xiàn)無效的Entry后,就調(diào)用expungeStaleEntry(int staleSlot)進行連續(xù)子數(shù)組的資源清理與整理。該方法影響較大,僅僅用在rehash()中。
4.3 cleanSomeSlots(int i, int n)
/**
* 對數(shù)級別的掃描,介于 "不掃描(fast but retains garbage)" 和 "完全掃描(find all garbage but would cause some insertions to take O(n) time)" 之間
*
* @param i a position known NOT to hold a stale entry. The
* scan starts at the element after i.
* @param n scan control: {@code log2(n)} cells are scanned,
* unless a stale entry is found, in which case
* {@code log2(table.length)-1} additional cells are scanned.
* When called from insertions, this parameter is the number
* of elements, but when from replaceStaleEntry, it is the
* table length. (Note: all this could be changed to be either
* more or less aggressive by weighting n instead of just
* using straight log n. But this version is simple, fast, and
* seems to work well.)
* @return true if any stale entries have been removed.
*/
private boolean cleanSomeSlots(int i, int n) {
boolean removed = false;
Entry[] tab = table;
int len = tab.length;
do {
// 向后循環(huán)獲取下一個索引,如果已經(jīng)到了末尾,則返回table[0]
i = nextIndex(i, len);
Entry e = tab[i];
// 如果此Entry已經(jīng)無效了,key-ThreadLocal已經(jīng)被回收掉了,無法再通過該key獲取Entry,如果不回收此Entry,則會造成內(nèi)存泄露
if (e != null && e.get() == null) {
n = len;
removed = true;
// 嘗試回收或者rehash"i到i之后的第一個Entry為null的索引(即返回值)之間"的Entry數(shù)據(jù)
i = expungeStaleEntry(i);
}
} while ((n >>>= 1) != 0); // n / 2,假設n=16,當然set的時候n=size,則log2(16)=4,也就是說需要循環(huán)4次(也可以直接理解為16/2/2/2/2=1,第5次1/2==0則不再進行循環(huán))
return removed;
}
該方法只掃描一部分Entry,進行l(wèi)og2(n)次循環(huán),如果恰好掃描到無效的Entry,則n=table.length,即進行l(wèi)og2(table.length)次循環(huán)。每個循環(huán)的步驟:
- 獲取下一個索引位置i及該位置上的Entry
- 如果該Entry==null或者Entry依然有效,則進行下一次循環(huán);
- 如果該Entry!=null且無效,則先將n設置為長度(在set方法中n本來是)調(diào)用expungeStaleEntry(i)回收或者 整理 i之后的一段連續(xù)的Entry,之后再進行一下輪的回收和整理操作。
4.4 弱引用WeakReference
由于Entry繼承了弱引用,他的key是referent,當發(fā)生gc時,如果該key已經(jīng)不可達了,則直接回收(key==null),之后在如下操作時刪除key==null的Entry。
- get時調(diào)用expungeStaleEntry
- set時調(diào)用replaceStaleEntry或者cleanSomeSlots或者expungeStaleEntries
- remove時調(diào)用expungeStaleEntry
五、總結(jié)
5.1 set整個流程
- 首先將table[]成員變量局部化(在后續(xù)的操作中如果會對table成員變量進行多次操作,局部化會提高性能)
- 獲取數(shù)組長度,計算直接索引
- 然后進行hash沖突檢測與處理(線性探測法)
- 獲取直接索引位置上的Entry,如果不為null,表示發(fā)生了hash沖突,則先獲取Entry中的key,如果該key與當前的ThreadLocal是同一個對象,則直接替換value,返回;
- 如果Entry中的key==null,表示當前的Entry已經(jīng)是一個無效的Entry了,執(zhí)行replaceStaleEntry方法進行處理(replaceStaleEntry:如果從該Entry后的第一個元素開始在一個連續(xù)的Entry子數(shù)組內(nèi)找到一個key與當前ThreadLocal相等的元素,則替換值,然后互換該Entry和當前的key==null的Entry位置;如果沒找到,則將當前的key==null的Entry設置為新的將設置的Entry)返回;
- 如果發(fā)生了hash沖突,但是直接索引位置上的Entry即不是當前的ThreadLocal的Entry,也不是一個無效的Entry,則需要向后循環(huán)遍歷下一個Entry元素,再進行上述邏輯的處理。直到找到一個Entry為null的位置或者Entry失效的位置或者Entry就是當前的ThreadLocal的Entry的位置。
- 假設沒有發(fā)生hash沖突或者經(jīng)過上述的for循環(huán)找到了一個Entry為null的位置的時候,在該位置創(chuàng)建設置新添加的Entry,數(shù)組元素+1;
- 嘗試log級別的回收部分無效的Entry,如果成功,則當前的數(shù)組中的元素個數(shù)將小于閾值,則肯定不需要rehash();如果失敗,且數(shù)組元素個數(shù) >= threshold,執(zhí)行rehash()
- 首先對整個table回收無效的Entry操作,如果可以有效減小數(shù)組中的元素個數(shù)size到數(shù)組長度的一半以下,則不需要進行擴容,如果size達到length的一半及以上,則進行擴容操作
- 擴容:創(chuàng)建新的Entry[]是舊的長度的兩倍;之后遍歷舊數(shù)組,根據(jù)新的數(shù)組的長度重新計算每個有效Entry的直接索引,根據(jù)線性探測法重新填充數(shù)組。
最后設置rehash()閾值、數(shù)組元素個數(shù)size,將新數(shù)組賦值給全局變量table。
5.2 get整個流程
- 獲取當前線程
- 獲取當前線程的ThreadLocalMap屬性
- 如果當前線程的ThreadLocalMap對象不為null,從ThreadLocalMap對象中獲取key為當前的ThreadLocal的Entry
- 計算當前的ThreadLocal的直接索引,獲取直接索引位置的Entry
- 如果該位置的Entry不為null且Entry的key與當前的ThreadLocal是同一個元素,則直接返回該Entry
- 如果該位置的Entry為null,直接返回null(看getEntryAfterMiss邏輯)
- 如果該位置的Entry不為null,但是Entry的key與當前的ThreadLocal不是同一個元素,則表明發(fā)生了hash沖突。此時,會不斷的向后循環(huán)尋找,直到找到了要查找的Entry或者遍歷到的Entry為null,如果找到了,返回Entry,如果沒找到,返回null。
- 如果該Entry不為null,直接獲取entry的value,返回即可
- 如果當前線程的ThreadLocalMap為null(還未createMap(Thread t, T firstValue))或者key為當前的ThreadLocal的Entry為null(沒有調(diào)用map.set(this, value)),則執(zhí)行設置初始化值的方法。
5.3 各類回收的時機
- set或者setInitialValue
- 如果從直接索引開始向后遍歷的一段連續(xù)Entry段內(nèi)發(fā)生hash沖突且沖突的Entry已經(jīng)是無效的,此時會執(zhí)行replaceStaleEntry,會執(zhí)行回收和整理操作
- 如果沒有發(fā)生hash沖突 或者 發(fā)生了hash沖突但是在從直接索引開始的一段連續(xù)Entry段內(nèi)沒有找到與設置的key相等的Entry或者無效的Entry,如果直接索引所在的整個連續(xù)Entry段如果有無效Entry,則在set的尾部都會執(zhí)行一次log級別的回收操作,如果回收不理想,需要進行rehash()
- rehash()會進行一次全table的回收
- get
- 在通過直接索引沒有g(shù)et到數(shù)據(jù)的時候,會循環(huán)遍歷一段連續(xù)的Entry,如果遍歷到的Entry是無效的,則進行一次連續(xù)Entry子數(shù)組的回收和整理
- remove
- 當找到需要被刪除的Entry時,進行一次連續(xù)Entry子數(shù)組的回收和整理
- gc時
- 由于Entry繼承了弱引用,他的key是referent,當發(fā)生gc時,如果該key已經(jīng)無引用指向了,則直接回收(key==null了),之后再根據(jù)上述1,2,3的操作刪除key==null的Entry。(這也是為什么ThreadLocal沒有為Entry綁定ReferenceQueue的原因,因為Entry的刪除已經(jīng)可以發(fā)生在1,2,3中了;但是WeakHashMap不一樣,WeakHashMap的Entry繼承于WeakReference,其key是referent,當發(fā)生gc時,如果key不可達了,則回收key,之后將key對應的Entry放入queue,在后續(xù)對WeakHashMap的操作 - getTable()/size()/resize(int newCapacity)的時候進行Entry的回收)
5.4 ThreadLocal與線程池
線程池中的線程由于會被復用,所以線程池中的每一條線程在執(zhí)行task結(jié)束后,要清理掉其ThreadLocalMap及其中的各個Entry,否則,當這條線程在下一次被復用的時候,其ThreadLocalMap信息還存儲著上一次被使用的時的信息;另外,假設這條線程不再被使用,但是這個線程有可能不會被銷毀(與線程池的類型和配置相關),那么其上的ThreadLocal將發(fā)生了資源泄露。所以netty的io線程池使用FastThreadLocalRunnable wrap了Runnable任務,當任務執(zhí)行結(jié)束后,會做InternalThreadLocalMap和FastThreadLocal的清理操作。