重拾jdk源碼重點系列-8:ThreadLocal源碼分析

1. ThreadLocal的作用

ThreadLocal的作用是提供線程內(nèi)的局部變量,說白了,就是在各線程內(nèi)部創(chuàng)建一個變量的副本,相比于使用各種鎖機制訪問變量,ThreadLocal的思想就是用空間換時間,使各線程都能訪問屬于自己這一份的變量副本,變量值不互相干擾,減少同一個線程內(nèi)的多個函數(shù)或者組件之間一些公共變量傳遞的復(fù)雜度。我們看看源碼對于ThreadLocal的描述.

This class provides thread-local variables. These variables differ from their normal counterparts in that each thread that accesses one (via its {@code get} or {@code set} method) has its own, independently initialized copy of the variable. {@code ThreadLocal} instances are typically private static fields in classes that wish to associate state with a thread (e.g.,a user ID or Transaction ID).

2. 基本用法

實現(xiàn)的功能是給每個線程都有自己唯一的id,且是自增的.

public class ThreadId {

    public static final AtomicInteger NEXTId = new AtomicInteger(0);
    public static final ThreadLocal<Integer> THREADID = new ThreadLocal<Integer>() {
        @Override
        protected Integer initialValue() {
            return NEXTId.incrementAndGet();
        }

    };
    public static int getThreadId() {

        return THREADID.get();
    }
    public static void main(String[] args) throws Exception {

        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {

                public void run() {
                    System.out.println(ThreadId.getThreadId());
                }

            }).start();
        }
        Thread.sleep(1000 * 3);
    }
}
3. ThreadLocal的數(shù)據(jù)結(jié)構(gòu)
private final int threadLocalHashCode = nextHashCode();

private static AtomicInteger nextHashCode =
        new AtomicInteger();
        
private static final int HASH_INCREMENT = 0x61c88647;

private static int nextHashCode() {
       return nextHashCode.getAndAdd(HASH_INCREMENT);  //
   }

從上面可以看出,每創(chuàng)建一個ThreadLocal變量,hashcode就會增加0x61c88647.hashcode的作用就是在后面根據(jù)在map中根據(jù)hash比較ThreadLocalMap的key,從而判定是否相等.之所以用這個數(shù)是因為可以是2的冪盡可能分布均勻
在每個線程內(nèi)部,都會維護一個 ThreadLocal.ThreadLocalMap threadLocals的成員變量,參考下面這個實例圖.每個變量能夠?qū)⒆兞克接谢母驹蜻€是在于ThreadLocalMap.

image.png

如圖所示,實線是強引用,虛線是弱引用,如果ThreadLocalRef的引用沒有了,則只剩下Entry對ThreadLocal有弱引用,我們知道弱引用活不過下次Gc(Entry是弱引用)

static class ThreadLocalMap {

        static class Entry extends WeakReference<ThreadLocal<?>> {
            /** The value associated with this ThreadLocal. */
            Object value;

            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
        }
        /**
         * The initial capacity -- MUST be a power of two.
         */
        private static final int INITIAL_CAPACITY = 16;
        /**
         * The table, resized as necessary.
         * table.length MUST always be a power of two.
         */
        private Entry[] table;
        /**
         * The number of entries in the table.
         */
        private int size = 0;
        /**
         * The next size value at which to resize.
         */
        private int threshold; // Default to 0
        /**
         * Set the resize threshold to maintain at worst a 2/3 load factor.
         */
        private void setThreshold(int len) {
            threshold = len * 2 / 3;
        }
4. get()返回存儲在ThreadLocalMap中value
public T get() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        return setInitialValue();
    }

從ThreadLocal中獲取值的步驟分為如下幾步.

  1. 獲取當(dāng)前線程的ThreadLocalMap
  2. 把當(dāng)前的ThreadLocal對象為key,去獲取值.若存在,且不為null,則返回.否則設(shè)置map,初始化
setInitialValue()
private T setInitialValue() {
        T value = initialValue();  //1
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);  //若存在,則設(shè)置key,value就可以
        else
            createMap(t, value); //不存在則創(chuàng)建ThreadLocalMap
        return value;
    }
  • initialValue()返回值為null,說明初始值為null

createMap()

 ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
            table = new Entry[INITIAL_CAPACITY];  // 1 初始化數(shù)組,初始大小為16
            int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);  //定位到數(shù)組下標(biāo)
            table[i] = new Entry(firstKey, firstValue);
            size = 1;
            setThreshold(INITIAL_CAPACITY);  //設(shè)置閾值
        }
  • firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1)相當(dāng)于一個求余的方法,這要求INITIAL_CAPACITY為2的n次冪.經(jīng)常采用這種方法來求響應(yīng)的hash值對應(yīng)在數(shù)組中的位置.
5. set()往ThreadLocalMap設(shè)置值
public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
    }

set()的邏輯如下

  1. 獲取當(dāng)前線程的ThreadLocalMap
  2. 如果map不為null,則把傳入的值設(shè)置進去
  3. 否則創(chuàng)建新的map,createMap()和前面get()createMap()中的一樣.

set(ThreadLocal<?> key, Object value)

private void set(ThreadLocal<?> key, Object value) {

            Entry[] tab = table;
            int len = tab.length;
            int i = key.threadLocalHashCode & (len-1);  //找到在數(shù)組中的位置

            for (Entry e = tab[i];
                 e != null;
                 e = tab[i = nextIndex(i, len)]) {
                ThreadLocal<?> k = e.get();    //hash的線性探測法

                if (k == key) {   //遇到相等,則替換
                    e.value = value;
                    return;
                }

                if (k == null) { //發(fā)現(xiàn)key為null,則需要把這個key所在的Entry設(shè)置為null,然后把這個key后面的元素做再hash往前移動
                    replaceStaleEntry(key, value, i);
                    return;
                }
            }

            tab[i] = new Entry(key, value);  //第一次遇到Entry為空,則放入進去.運行到這里,說明這個過程中沒有key為null的Entry
            int sz = ++size;
            if (!cleanSomeSlots(i, sz) && sz >= threshold)  //在清理完成后,看當(dāng)前大小有沒有超過閾值,看是否需要rehash
                rehash();
  }

set()方法的邏輯是:

  1. 找到在數(shù)組中的位置
  2. 遇到相等則替換,如果在這過程中遇到key為null,執(zhí)行第三步
  3. 執(zhí)行replaceStaleEntry()
  4. 經(jīng)過2,3兩步還沒終止,說明遇到Entry為null,則把key,value組成Entry,放入到這個位置.
  5. 添加了新的元素,需要判斷達沒達到閾值,達到則需要再hash
replaceStaleEntry()
private void replaceStaleEntry(ThreadLocal<?> key, Object value,
                                       int staleSlot) {
            Entry[] tab = table;
            int len = tab.length;
            Entry e;

            int slotToExpunge = staleSlot;   //key為null的Entry,在數(shù)組中的下標(biāo)
            for (int i = prevIndex(staleSlot, len);   //從該位置往前找
                 (e = tab[i]) != null;
                 i = prevIndex(i, len))
                if (e.get() == null)
                    slotToExpunge = i;     //記錄下key為null的點

            for (int i = nextIndex(staleSlot, len);   //從該位置往后走
                 (e = tab[i]) != null;
                 i = nextIndex(i, len)) {
                ThreadLocal<?> k = e.get();

                if (k == key) {   
                    e.value = value;

                    tab[i] = tab[staleSlot];   //若找到,把該Entry與傳入進來位置的Entry做個交換
                    tab[staleSlot] = e;

                
                    if (slotToExpunge == staleSlot)
                        slotToExpunge = i;  //從交換之后,此時key為nul,正好從這里清理
                    cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
                    return;
                }

                if (k == null && slotToExpunge == staleSlot)
                    slotToExpunge = i;   //遇到第一個key為null的位置,記錄下來
            }
           //運行到這里,說明沒有遇到key相等的,則在slot處新建一個新的Entry,把key,value設(shè)置進去.
          
            tab[staleSlot].value = null;  //方便GC
            tab[staleSlot] = new Entry(key, value);
          //如果還有key為null的Entry,則清理
            if (slotToExpunge != staleSlot)   說明存在key為null的Entry,則清理
                cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
        }

slotToExpunge主要用來記錄從前到后key為null的位置,方便清理

  • 第1個for循環(huán):我們向前找到key為null的位置,記錄為slotToExpunge,這里是為了后面的清理過程,可以不關(guān)注了;
  • 第2個for循環(huán):我們從staleSlot起到下一個null為止,若是找到key和傳入key相等的Entry,就給這個Entry賦新的value值,并且把它和staleSlot位置的Entry交換,然后調(diào)用CleanSomeSlots清理key為null的Entry。
  • 若是一直沒有key和傳入key相等的Entry,那么就在staleSlot處新建一個Entry。函數(shù)最后再清理一遍空key的Entry。
    cleanSomeSlots這個函數(shù)是以log(n)的速度去發(fā)現(xiàn)key為null的點.如果找到則調(diào)用expungeStaleEntry取清除和再hash,它里面就是不斷的折半查找.
expungeStaleEntry(int staleSlot)
private int expungeStaleEntry(int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;

    // 把該位置設(shè)置為null
    tab[staleSlot].value = null;
    tab[staleSlot] = null;
    size--;

    // Rehash until we encounter null
    Entry e;
    int i;
    for (i = nextIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = nextIndex(i, len)) {
        ThreadLocal<?> k = e.get();
        if (k == null) {   //遇到key為null的,則設(shè)置為null,方便gc
            e.value = null;
            tab[i] = null;
            size--;
        } else {
            int h = k.threadLocalHashCode & (len - 1);  //如果有值,再hash
            if (h != i) {
                tab[i] = null;
                while (tab[h] != null)
                    h = nextIndex(h, len);
                tab[h] = e;
            }
        }
    }
    return i;
}

expungeStaleEntry的邏輯是:

  1. 先把該位置設(shè)置為null,方便GC
  2. 從當(dāng)前位置順著往下走,直到第一為null的Entry.在這過程中,如果遇到key為null,則把該位置的Entry設(shè)置為null,有利于GC.
  3. 如果key不為null,則把該元素重新hash(線性探測法)
rehash
private void rehash() {
            expungeStaleEntries();  //清除過時的Entry,這里只要是key為null,這調(diào)用expungeStaleEntry(int staleSlot),也就是上面這個方法

            if (size >= threshold - threshold / 4)  //清理后,如果size還大于3/4的threshold,那么就擴容
                resize();
        }

private void resize() {
            Entry[] oldTab = table;
            int oldLen = oldTab.length;
            int newLen = oldLen * 2;
            Entry[] newTab = new Entry[newLen];   //開辟一個數(shù)組大小是原來兩倍大的數(shù)組
            int count = 0;

            for (int j = 0; j < oldLen; ++j) {
                Entry e = oldTab[j];
                if (e != null) {
                    ThreadLocal<?> k = e.get();
                    if (k == null) {
                        e.value = null; // 幫助GC
                    } else {                //重新hash到新數(shù)組中
                        int h = k.threadLocalHashCode & (newLen - 1);
                        while (newTab[h] != null)
                            h = nextIndex(h, newLen);
                        newTab[h] = e;
                        count++;
                    }
                }
            }

rehash的邏輯是:

  1. 先嘗試清除key為null的位置
  2. 再觀察是否達到3/4的閾值,從而來擴容

擴容的邏輯是;

  1. 開辟一個長度是以前數(shù)組兩倍的數(shù)組,重新hash,放入到新數(shù)組中.
  2. 這個過程中,如果遇到key為空,則把值賦值為null,方便GC
remove
 private void remove(ThreadLocal<?> key) {
            Entry[] tab = table;
            int len = tab.length;
            int i = key.threadLocalHashCode & (len-1);
            for (Entry e = tab[i];
                 e != null;
                 e = tab[i = nextIndex(i, len)]) {
                if (e.get() == key) {
                    e.clear();   //把引用設(shè)為null,方便GC
                    expungeStaleEntry(i);  //上面已經(jīng)談到
                    return;
                }
            }
        }

remove的處理邏輯是把應(yīng)用設(shè)置為null,方便GC.然后在調(diào)用 expungeStaleEntry(i)去掉key為null的Entry,再hash.

5. 關(guān)于expungeStaleEntry中當(dāng)key不為空,為什么要重新hash

是因為,如果不重新hash,那么后來再取尋找的時候,遇到Null就會停止搜索,這就造成原本能夠找到的,現(xiàn)在找不到.歸根結(jié)底采用了鏈地址法.

6. 使用ThreadLocal的最佳實踐

我們發(fā)現(xiàn)無論是set,get還是remove方法,過程中key為null的Entry都會被擦除,那么Entry內(nèi)的value也就沒有強引用鏈,GC時就會被回收。那么怎么會存在內(nèi)存泄露呢?但是以上的思路是假設(shè)你調(diào)用get或者set方法了,很多時候我們都沒有調(diào)用過,所以最佳實踐就是

  • 1 .使用者需要手動調(diào)用remove函數(shù),刪除不再使用的ThreadLocal.
  • 2 .還有盡量將ThreadLocal設(shè)置成private static的,這樣ThreadLocal會盡量和線程本身一起消亡。

參考文章:
ThreadLocal源碼深度剖析

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容