ThreadLocal源碼分析

一、簡介

ThreadLocal提供了線程本地變量,通過get或者set操作的這些變量在每個(gè)不同線程間是不相同的,各個(gè)線程獨(dú)立地初始化這些變量。ThreadLocal實(shí)例通常在類中是聲明為private static域的,用于在同一個(gè)線程內(nèi)關(guān)聯(lián)相同的狀態(tài)(e.g. 一個(gè)User ID或者Transaction ID)。ThreadLocal相當(dāng)于提供了一種線程隔離,將變量與線程相綁定。

只要線程還存活并且ThreadLocal實(shí)例還能被獲取到,那么每個(gè)線程會(huì)持有一個(gè)ThreadLocal變量弱引用。當(dāng)線程結(jié)束生命周期時(shí),所有的線程本地實(shí)例都會(huì)被GC。

二、簡單示例

/**
 * 不同線程持有一個(gè)不同的UUID
 */
 public class ThreadLocalTest {
     private static ThreadLocal<String> uuidLocal = new ThreadLocal<String>(){
         protected String initialValue() {
             return UUID.randomUUID().toString();
         }
     };

     public static void main(String[] args) {
         UUIDThread t1 = new UUIDThread();
         UUIDThread t2 = new UUIDThread();
         t1.start();
         t2.start();
     }

     public static class UUIDThread extends Thread {
         @Override
         public void run() {
             System.out.println(Thread.currentThread().getName() + " uuid: " + uuidLocal.get());
         }
     }
 }

輸出結(jié)果兩個(gè)uuid不同,如下所示:

Thread-1 uuid: d8d2006f-0a8a-4999-90c0-de2648c742da
Thread-0 uuid: 5061e4bd-8f57-4ef6-8b74-e7571f9efb93

三、我司的用法

使用用戶公司來做分庫,不同的公司數(shù)據(jù)分在不同的業(yè)務(wù)庫中,將companyID存入DataSourceContext中,查詢數(shù)據(jù)庫的時(shí)候從DataSourceContext獲取對(duì)應(yīng)的companyID,根據(jù)companyID獲取對(duì)應(yīng)的數(shù)據(jù)庫鏈接。

public class DataSourceContext {
    static final ThreadLocal<String> local = new ThreadLocal<>();
    public DataSourceContext() {
    }
    
    public static String getCompany() {
        return local.get();
    }

    public static  String setCompany(String companyID) {
        return companyID == null ? null : set(companyID);
    }
}

四、成員變量

private final int threadLocalHashCode = nextHashCode(); // 初始值為0

private static AtomicInteger nextHashCode =
    new AtomicInteger();

private static final int HASH_INCREMENT = 0x61c88647;

private static int nextHashCode() {
    return nextHashCode.getAndAdd(HASH_INCREMENT);
}

ThreadLocal通過自定義threadLocalHashCode減少線性探測的沖突,每次實(shí)例化一個(gè)ThreadLocal,threadLocalHashCode都會(huì)新增HASH_INCREMENT(0x61c88647)。

五、幾個(gè)方法

1. initialValue方法

/**
 * 返回當(dāng)前線程的線程本地變量初始化值,在一個(gè)線程首次調(diào)用get方法時(shí)被調(diào)用。
 * 如果調(diào)用get之前調(diào)用了set方法,就不會(huì)調(diào)用initialValue方法了。
 * 默認(rèn)實(shí)現(xiàn)返回null,如果有需要,可以繼承ThreadLocal,并覆蓋該方法。
 * 一般是使用匿名內(nèi)部類的形式子類化。
 */
protected T initialValue() {
    return null;
}

2. get方法

/**
 * 返回當(dāng)前線程的線程本地變量
 */
public T get() {
    // 獲取當(dāng)前線程的引用
    Thread t = Thread.currentThread();
    // 從當(dāng)前線程中獲取到關(guān)聯(lián)的ThreadLocalMap
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null)
            return (T)e.value;
    }
    // 如果當(dāng)前線程沒有線程本地變量,就設(shè)置初始值
    return setInitialValue();
}

/**
 * 從ThreadLocal中獲取一個(gè)關(guān)聯(lián)的Map
 */
ThreadLocalMap getMap(Thread t) {
    return t.threadLocals;
}

可以從代碼中看出來,get()方法就是從當(dāng)前線程中獲取一個(gè)和當(dāng)前線程相關(guān)聯(lián)的ThreadLocalMap,然后以thiskey,從ThreadLocalMap中取出相應(yīng)的值,并返回。如果沒有值,就設(shè)置一個(gè)初始值。

/* ThreadLocal values pertaining to this thread. This map is maintained
 * by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;

threadLocalsThread的成員變量,每個(gè)線程通過ThreadLocal.ThreadLocalMapThreadLocal相綁定,這樣可以確保每個(gè)線程訪問到的thread-local variable都是本線程的。

3. set方法

/**
 * 設(shè)置初始值
 */
private T setInitialValue() {
    T value = initialValue();
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        // 如果線程t不存在ThreadLocalMap實(shí)例,就創(chuàng)建一個(gè)
        createMap(t, value);
    return value;
}

/**
 * 實(shí)例化一個(gè)ThreadLocalMap并賦值給t.threadLocals
 */
void createMap(Thread t, T firstValue) {
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}

/**
 * 設(shè)置當(dāng)前線程的線程本地變量值
 */
public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}

可以看出set()方法和setInitialValue()方法類似,如果當(dāng)前線程存在threadLocals,那么直接把設(shè)置的值put到這個(gè)ThreadLocalMap中。否則,創(chuàng)建一個(gè)帶有這個(gè)valueThreadLocalMap

4. remove方法

public void remove() {
    ThreadLocalMap m = getMap(Thread.currentThread());
    if (m != null)
        m.remove(this);
}

首先獲取當(dāng)前線程,并從當(dāng)前線程中獲取ThreadLocalMap,如果不為空,則調(diào)用ThreadLocalMapremove方法,把以thiskeyEntry移除掉。如果隨后在當(dāng)前線程中被調(diào)用了get方法,那么因?yàn)樵鹊?code>Entry已經(jīng)被移除掉了,所以還會(huì)調(diào)用一次initialValue()方法初始化值。

5. 小結(jié)

從這些方法可以看出ThreadLocal類的設(shè)計(jì),Thread中有ThreadLocalMap成員變量,ThreadLocalMap又以ThreadLocal作為key來存放值。也就是說ThreadLocal把自身實(shí)例作為key,和需要保存的value存放到當(dāng)前線程的一個(gè)Map中,來保證每個(gè)線程訪問到的線程本地變量值都是各自線程的。ThreadLocal#set方法可以簡單理解為Thread.currentThread().threadLocals.put(this, value),ThreadLocal#get方法可以簡單理解為Thread.currentThread().threadLocals.get(this)

六、ThreadLocalMap

ThreadLocal實(shí)現(xiàn)中,核心還是ThreadLocalMap。ThreadLocal只是作為ThreadLocalMapkey, 從ThreadLocalMap中獲取到相應(yīng)的值。下面簡單看下ThreadLocalMap的實(shí)現(xiàn)。

1. 成員變量

/**
 * 初始容量,必須是2^n
 */
private static final int INITIAL_CAPACITY = 16;

/**
 * 必要時(shí)會(huì)擴(kuò)容,但必須是2^n
 */
private Entry[] table;

/**
 * table中entry的數(shù)量,也就是ThreadLocalMap的大小
 */
private int size = 0;

/**
 * 下一次擴(kuò)容的閾值
 */
private int threshold; // Default to 0

其中INITIAL_CAPACITY代表這個(gè)ThreadLocalMap的初始容量;table是一個(gè)Entry類型的數(shù)組,用于存儲(chǔ)數(shù)據(jù);size代表表中的存儲(chǔ)數(shù)目,也就是ThreadLocalMap的大小;threshold代表需要擴(kuò)容時(shí)對(duì)應(yīng)size的閾值。

2. 靜態(tài)內(nèi)部類

static class Entry extends WeakReference<ThreadLocal> {
    /** The value associated with this ThreadLocal. */
    Object value;

    Entry(ThreadLocal k, Object v) {
        super(k);
        value = v;
    }
}

Entry繼承了WeakReference,當(dāng)除了Entry以外沒有其它地方強(qiáng)引用ThreadLocal實(shí)例,那么ThreadLocal實(shí)例就會(huì)被GC回收,避免造成內(nèi)存溢出。

3. 構(gòu)造函數(shù)

ThreadLocalMap(ThreadLocal firstKey, Object firstValue) {
    table = new Entry[INITIAL_CAPACITY];
    // 對(duì)hashcode“取?!庇?jì)算出table中索引值
    int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
    table[i] = new Entry(firstKey, firstValue);
    size = 1;
    setThreshold(INITIAL_CAPACITY);
}

計(jì)算索引值i的時(shí)候里面采用了hashCode & (size - 1)的算法,這相當(dāng)于取模運(yùn)算hashCode % size的一個(gè)更高效的實(shí)現(xiàn)(和HashMap中的思路相同)。正是因?yàn)檫@種算法,要求size必須是2^n。

4. set方法

大致思路為:

  1. 通過keyhashcode計(jì)算出索引值
  2. 從索引值i開始,通過線性探測法table中找到一個(gè)可以存放value的地方,然后設(shè)置值
  3. 因?yàn)?code>ThreadLocalMap的keyWeakReference,所以會(huì)存在Entry存在,但是key已經(jīng)被回收的情況,這時(shí)候需要進(jìn)行一些清理工作,把這些Entry清理掉。
  4. 如果size大于閾值(threshold),就要進(jìn)行擴(kuò)容,并rehash,從新計(jì)算映射。
private void set(ThreadLocal key, Object value) {
    Entry[] tab = table;
    int len = tab.length;
    // 計(jì)算索引值
    int i = key.threadLocalHashCode & (len-1);

    // 使用線性探測法來解決沖突,而不是HashMap中采用的拉鏈法
    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) {
        ThreadLocal k = e.get();

        // 如果欲設(shè)置的key和table[i]中的相同,則更新value
        if (k == key) {
            e.value = value;
            return;
        }

        // 如果k==null,證明key(WeakReference)已經(jīng)被GC回收,所以替換新的key和value
        if (k == null) {
            replaceStaleEntry(key, value, i);
            return;
        }
    }
    // 創(chuàng)建新的Entry
    tab[i] = new Entry(key, value);
    int sz = ++size;
    // 如果更新之后的size大于閾值threshold,則需要rehash
    if (!cleanSomeSlots(i, sz) && sz >= threshold)
        rehash();
}

/** 線性探測法的套路,找到下一個(gè)索引,如果越界了,就從0開始 */
private static int nextIndex(int i, int len) {
    return ((i + 1 < len) ? i + 1 : 0);
}

5. getEntry方法

ThreadLocal#get方法就是調(diào)用了ThreadLocalMap#getEntry方法。

大致思路為:

  1. 通過keyhashcode計(jì)算出索引值i
  2. 為了提高性能,直接判斷索引值下的Entry是不是需要找的
  3. 否則,用線性探測的方式找到相應(yīng)的value
private Entry getEntry(ThreadLocal key) {
    // 計(jì)算出索引值
    int i = key.threadLocalHashCode & (table.length - 1);
    Entry e = table[i];
    // 命中,則直接返回
    if (e != null && e.get() == key)
        return e;
    else
        // 按線性探測的方式查找
        return getEntryAfterMiss(key, i, e);
}

/**
 * 和getEntry類似,用于當(dāng)key的hash直接計(jì)算出的索引值上找不到Entry時(shí)
 */
private Entry getEntryAfterMiss(ThreadLocal key, int i, Entry e) {
    Entry[] tab = table;
    int len = tab.length;

    while (e != null) {
        ThreadLocal k = e.get();
        if (k == key)
            return e;
        if (k == null)
            // 清理過期的Entry
            expungeStaleEntry(i);
        else
            i = nextIndex(i, len);
        e = tab[i];
    }
    return null;
}

6. remove方法

remove方法和getEntry方法類似,計(jì)算索引值i,用線性探測的防止,找到Entry后,清理Entry。

private void remove(ThreadLocal key) {
    Entry[] tab = table;
    int len = tab.length;
    int i = key.threadLocalHashCode & (len-1);
    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) {
        if (e.get() == key) {
            e.clear();
            expungeStaleEntry(i);
            return;
        }
    }
}

7. 擴(kuò)容方法

擴(kuò)容方法也很清楚,判斷目前使用的容量是否大于一定的值(size >= 3 / 4 * threshold),如果大于,則需要resize

resize方法的思路如下:

  1. size擴(kuò)大為兩倍,創(chuàng)建一個(gè)新的table表,將oldTab上的Entry轉(zhuǎn)移到newTab上。
  2. 轉(zhuǎn)移過程中,如果發(fā)現(xiàn)e.get() == null,則證明key已經(jīng)被GC回收,那么這個(gè)Entry就不轉(zhuǎn)移。
  3. 否則,用線性探測法找到EntrynewTab存放的位置,并設(shè)置。
  4. 最后設(shè)置新的threshold

可以看出threshold的大小為len * 2 / 3,所以每次size >= 0.5 * len的時(shí)候就要進(jìn)行擴(kuò)容(resize)。

private void rehash() {
    expungeStaleEntries();

    // 如果size > 3/4 * threshold,則擴(kuò)容
    if (size >= threshold - threshold / 4)
        resize();
}
/**
 * table容量翻倍
 */
private void resize() {
    Entry[] oldTab = table;
    int oldLen = oldTab.length;
    int newLen = oldLen * 2;
    Entry[] newTab = new Entry[newLen];
    int count = 0;

    for (int j = 0; j < oldLen; ++j) {
        Entry e = oldTab[j];
        if (e != null) {
            ThreadLocal k = e.get();
            // 如果k已經(jīng)被GC回收,那么把value也設(shè)置為null,幫助GC回收,防止內(nèi)存泄漏
            if (k == null) {
                e.value = null; // Help the GC
            } else {
                // 從新計(jì)算索引值,并通過線性探測的方式存放到table中
                int h = k.threadLocalHashCode & (newLen - 1);
                while (newTab[h] != null)
                    h = nextIndex(h, newLen);
                newTab[h] = e;
                count++;
            }
        }
    }

    setThreshold(newLen);
    size = count;
    table = newTab;
}

private void setThreshold(int len) {
    threshold = len * 2 / 3;
}

8. 一些清理方法

1) expungeStaleEntry

首先會(huì)清理tab[staleSlot]上過期的Entry,然后需要再散列(rehash),中間可能還會(huì)遇到一些過期的Entry,這些也要清理掉,知道遇到table[i] == null,中間的所有Entry都要rehash。

private int expungeStaleEntry(int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;

    // 清理掉過期位置的Entry
    tab[staleSlot].value = null;
    tab[staleSlot] = null;
    size--;

    // 再散列,知道遇到table[i] == null
    Entry e;
    int i;
    for (i = nextIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = nextIndex(i, len)) {
        ThreadLocal k = e.get();
        if (k == null) {
            e.value = null;
            tab[i] = null;
            size--;
        } else {
            int h = k.threadLocalHashCode & (len - 1);
            // 如果h==i,那么證明這個(gè)Entry就是要放在table[i]上的,就不要rehash這個(gè)Entry
            // 否則,rehash
            if (h != i) {
                // 先把當(dāng)前位置tab[i]釋放出來,再把Entry放到新的位置
                tab[i] = null;

                // Unlike Knuth 6.4 Algorithm R, we must scan until
                // null because multiple entries could have been stale.
                while (tab[h] != null)
                    h = nextIndex(h, len);
                tab[h] = e;
            }
        }
    }
    return i;
}

2) cleanSomeSlots

/**
 * 探索式掃描尋找過期的entry,當(dāng)增加新元素或者另一個(gè)過期entry被清理的時(shí)候會(huì)被調(diào)用
 */
private boolean cleanSomeSlots(int i, int n) {
    boolean removed = false;
    Entry[] tab = table;
    int len = tab.length;
    do {
        i = nextIndex(i, len);
        Entry e = tab[i];
        // 判斷是否過期,如果是則處理
        if (e != null && e.get() == null) {
            n = len;
            removed = true;
            i = expungeStaleEntry(i);
        }
    } while ( (n >>>= 1) != 0);
    return removed;
}

每次新增元素的時(shí)候會(huì)進(jìn)行探索式掃描,尋找過期Entry并清理。

3) 何時(shí)會(huì)清理過期Entry

處理Thread實(shí)例被GC回收,ThreadLocalMap同時(shí)被回收之外,下面這些條件下,會(huì)清理過期的Entry。

  1. getEntry時(shí),線性探索尋找Entry的時(shí)候發(fā)現(xiàn)Entry過期。
  2. set的時(shí)候發(fā)現(xiàn),key對(duì)應(yīng)索引值的Entry已過期,則會(huì)清理并替換
  3. 每次調(diào)用set方法的時(shí)候,會(huì)探索式掃描Entry,如果發(fā)現(xiàn)過期,則清理。
  4. size > thresholdrehash的時(shí)候。
  5. 調(diào)用remove方法的時(shí)候。

當(dāng)前的應(yīng)用開發(fā)過程中,出于復(fù)用的目的,常常會(huì)使用線程池的技術(shù),線程中ThreadLocalMap可能會(huì)長期存在。因?yàn)?code>Entry中的keyWeakReference包裝,在key不存在強(qiáng)引用的時(shí)候,會(huì)回收key,但是Entryvalue并不會(huì)被回收。所以在ThreadLocalMap中需要不時(shí)地清理過期的Entry,來保證內(nèi)存不泄露。當(dāng)然,如果我們在代碼中每次使用完ThreadLocal,都可以remove一下,那么就可以盡早釋放不需要的內(nèi)存。

七、參考

并發(fā)編程 | ThreadLocal源碼深入分析

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 1. 背景 ThreadLocal源碼解讀,網(wǎng)上面早已經(jīng)泛濫了,大多比較淺,甚至有的連基本原理都說的很有問題,包括...
    時(shí)之令閱讀 698評(píng)論 1 5
  • java版本是 java1.8.0_181 每個(gè)版本的具體實(shí)現(xiàn)細(xì)節(jié)都大同小異,接著從以下幾個(gè)問題進(jìn)行分析。 為什...
    衣忌破閱讀 227評(píng)論 0 0
  • 系列文章:Android消息機(jī)制Handler源碼分析Android Handler消息機(jī)制中的ThreadLoc...
    Hengtao24閱讀 287評(píng)論 0 1
  • 一. 簡介 提醒篇幅較大需耐心。 簡介來自ThreadLocal類注釋 ThreadLocal類提供了線程局部 (...
    BrightLoong閱讀 9,614評(píng)論 2 14
  • 下面我就以面試問答的形式學(xué)習(xí)我們的——ThreadLocal類(源碼分析基于JDK8) 問答內(nèi)容 1、問:Thre...
    Sophia_dd35閱讀 2,150評(píng)論 1 36

友情鏈接更多精彩內(nèi)容