Netty源碼分析2 - ThreadLocal 源碼解析

netty新增了FastThreadLocal和FastThreadLocalThread等類來實現(xiàn)了一個FastThread框架,相較于java.lang.Thread和java.lang.ThreadLocal速度更快。
在看FastThread框架之前,先看一下java.lang.ThreadLocal的源碼。
  • 一、使用姿勢
  • 二、數(shù)據(jù)結(jié)構(gòu)
  • 三、源碼分析
  • 四、回收機制
  • 總結(jié)

一、使用姿勢

public class ThreadLocalTest {
    /**
     * 最佳實踐一:使用private static
     * {@code ThreadLocal} instances are typically private static fields in classes
     * that wish to associate state with a thread (e.g., a user ID or Transaction ID)
     *
     * java8寫法:
     * private static final ThreadLocal<Integer> integerThreadLocal = ThreadLocal.withInitial(() -> 100);
     *
     * java8之前的寫法:
     * <pre>
     *     private static final ThreadLocal<Integer> integerThreadLocal = new ThreadLocal<Integer>() {
     *          @Override
     *          protected Integer initialValue() {
     *              return 100;
     *          }
     *      };
     * </pre>
     */
    private static final ThreadLocal<Integer> integerThreadLocal = new ThreadLocal<Integer>() {
        /**
         * 最佳實踐二:重寫initialValue()
         */
        @Override
        protected Integer initialValue() {
            return 100;
        }
    };

    @Test
    public void testThreadLocal() {
        System.out.println(integerThreadLocal.get());
        /**
         * 最佳實踐三:用完如果可以刪除,則手動刪除
         */
        integerThreadLocal.remove();
        System.out.println(integerThreadLocal.get());
    }
}

最佳實踐

  • 在類中定義ThreadLocal,用private static修飾;
  • 根據(jù)源碼分析,由于ThreadLocal對象本身會作為Entry的key去獲取數(shù)據(jù),所以最好也要用final去修飾。key通常都是不可變對象,否則當作為key的對象發(fā)生了變化之后,之前存儲的數(shù)據(jù)將無法get,造成資源泄露;
  • 使用匿名內(nèi)部類實現(xiàn)initialValue(),在執(zhí)行g(shù)et()時,如果當前的Thread的ThreadLocalMap沒有integerThreadLocal這個key的Entry(不管是第一次get還是remove之后的第一次get),則直接使用initialValue()進行初始化操作;
  • 在使用完ThreadLocal后,及時手動remove()無用的ThreadLocal,防止資源泄露

二、數(shù)據(jù)結(jié)構(gòu)

threadlocal.png

說明

  • 每一個Thread類都有一個屬性:ThreadLocal.ThreadLocalMap threadLocals = null
  • 每一個ThreadLocalMap都有一個Entry[]數(shù)組
  • 每一個Entry繼承于WeakReference,Entry的key(ThreadLocal實例)作為WeakReference的referent,傳入的value作為value

java引用:

注意

  • 從第三條說明點發(fā)現(xiàn)每一個ThreadLocal只能存儲一個值,對于dubbo的RpcContext.LOCAL來講,存儲了一個RpcContext上下文信息;如果需要存儲多個值,根據(jù)情況,將多個值合并為對象進行存儲或者使用多個ThreadLocal實例進行存儲

三、源碼分析

3.1、ThreadLocal的創(chuàng)建

基礎屬性

    /**
     * 每一個ThreadLocal對象都有一個唯一的threadLocalHashCode。
     * ThreadLocal對象會作為ThreadLocalMap的Entry的key     
     */
    private final int threadLocalHashCode = nextHashCode();

    private static AtomicInteger nextHashCode = new AtomicInteger();

    /**
     * 魔數(shù):與斐波那契數(shù)列和黃金分割點有關。用于散列均勻,減少hash沖突
     */
    private static final int HASH_INCREMENT = 0x61c88647;

    private static int nextHashCode() {
        return nextHashCode.getAndAdd(HASH_INCREMENT);
    }

說明

  • 每一個ThreadLocal對象都有一個唯一的threadLocalHashCode,該值用于計算Entry元素存儲的直接索引。
    • 直接索引:元素根據(jù)threadLocalHashCode&(table.len-1)算出來的值
    • 真實索引:元素可能由于hash沖突,使用線性探測法,放置在不是直接索引的位置
  • hash算法:每一個ThreadLocal的threadLocalHashCode都是前一個的threadLocalHashCode + 0x61c88647。這樣的hash算法可以很大程度的避免hash沖突,散列很均勻。0x61c88647這個數(shù)是怎么算出來的,見:https://www.javaspecialists.eu/archive/Issue164.html

普通構(gòu)造函數(shù)

    public ThreadLocal() {
    }

java8提供的構(gòu)造函數(shù)

    public static <S> ThreadLocal<S> withInitial(Supplier<? extends S> supplier) {
        return new SuppliedThreadLocal<>(supplier);
    }
    
    static final class SuppliedThreadLocal<T> extends ThreadLocal<T> {

        private final Supplier<? extends T> supplier;

        SuppliedThreadLocal(Supplier<? extends T> supplier) {
            this.supplier = Objects.requireNonNull(supplier);
        }

        @Override
        protected T initialValue() {
            return supplier.get();
        }
    }

java8提供了Supplier(提供者) FunctionalInterface?;谠摻涌诳梢苑奖愕奶娲羰纠械氖褂媚涿麅?nèi)部類來重寫initialValue()的方法。需要注意的是,supplier函數(shù)不可為null。

@FunctionalInterface
public interface Supplier<T> {

    /**
     * Gets a result.
     */
    T get();
}

3.2 數(shù)據(jù)的初始化與設置

    /**
     * 設置初始值
     * 與set(T value)幾乎相同,下邊代碼等價于
     * <pre>
     *     private T setInitialValue() {
     *          T value = initialValue();
     *          set(value);
     *          return value;
     *     }
     * </pre>
     */
    private T setInitialValue() {
        // 獲取初始值(通常是調(diào)用使用方重寫的initialValue(),否則使用{@link ThreadLocal#initialValue()返回null})
        T value = initialValue();
        // 獲取當前線程
        Thread t = Thread.currentThread();
        // 獲取當前線程的ThreadLocalMap對象
        ThreadLocalMap map = getMap(t);
        // 如果當前線程的ThreadLocalMap對象不為null,則直接將{當前的ThreadLocal對象,value}設置到ThreadLocalMap對象中去
        if (map != null)
            map.set(this, value);
        // 如果當前線程的ThreadLocalMap為null,則直接新建ThreadLocalMap對象,并將{當前的ThreadLocal對象,value}設置到ThreadLocalMap對象中去
        else
            createMap(t, value);
        // 返回初始值
        return value;
    }
    
    protected T initialValue() {
        return null;
    }
    
    /**
     * 設置值
     */
    public void set(T value) {
        // 獲取當前線程
        Thread t = Thread.currentThread();
        // 獲取當前線程的ThreadLocalMap對象
        ThreadLocalMap map = getMap(t);
        // 如果當前線程的ThreadLocalMap對象不為null,則直接將{當前的ThreadLocal對象,value}設置到ThreadLocalMap對象中去
        if (map != null)
            map.set(this, value);
        // 如果當前線程的ThreadLocalMap為null,則直接新建ThreadLocalMap對象,并將{當前的ThreadLocal對象,value}設置到ThreadLocalMap對象中去
        else
            createMap(t, value);
    }
    
    /**
     * 獲取當前線程t的ThreadLocalMap
     */
    ThreadLocalMap getMap(Thread t) {
        return t.threadLocals;
    }
    
    /**
     * 創(chuàng)建ThreadLocalMap,并將 {當前的ThreadLocal對象,firstValue} 設置到ThreadLocalMap對象中去
     * this:當前的ThreadLocal對象
     */
    void createMap(Thread t, T firstValue) {
        t.threadLocals = new ThreadLocalMap(this, firstValue);
    }

setInitialValue()流程:

  • 首先調(diào)用initialValue()獲取初始值(通常是調(diào)用使用方重寫的initialValue(),否則返回null)
  • 然后獲取當前線程
  • 然后獲取當前線程的ThreadLocalMap對象屬性
  • 如果當前線程的ThreadLocalMap對象不為null,則直接將{當前的ThreadLocal對象,value}設置到ThreadLocalMap對象中去
  • 如果當前線程的ThreadLocalMap為null,則直接新建ThreadLocalMap對象,并將{當前的ThreadLocal對象,value}設置到ThreadLocalMap對象中去
  • 最后返回initialValue()獲取到的初始值

setInitialValue()被使用的時機:ThreadLocal.get()。

關于ThreadLocalMap的創(chuàng)建和向ThreadLocalMap中設置參數(shù)的源碼后續(xù)再說。

3.3 數(shù)據(jù)的獲取

    public T get() {
        // 獲取當前線程
        Thread t = Thread.currentThread();
        // 獲取當前線程的ThreadLocalMap屬性
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            // 從ThreadLocalMap屬性中獲取key為當前的ThreadLocal的Entry
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                // 如果Entry不為null,直接獲取entry的value,返回即可
                @SuppressWarnings("unchecked")
                T result = (T) e.value;
                return result;
            }
        }
        // 如果當前線程的ThreadLocalMap為null或者key為當前的ThreadLocal的Entry為null
        return setInitialValue();
    }

get()流程:

  • 獲取當前線程
  • 獲取當前線程的ThreadLocalMap屬性
  • 如果當前線程的ThreadLocalMap對象不為null,從ThreadLocalMap對象中獲取key為當前的ThreadLocal的Entry
  • 如果該Entry不為null,直接獲取entry的value,返回即可
  • 如果當前線程的ThreadLocalMap為null(還未createMap(Thread t, T firstValue))或者key為當前的ThreadLocal的Entry為null(沒有調(diào)用map.set(this, value)),則執(zhí)行設置初始化值的方法。

關于根據(jù)ThreadLocal從ThreadLocalMap中獲取Entry的源碼后續(xù)再說。

3.4 數(shù)據(jù)的刪除

    public void remove() {
        ThreadLocalMap m = getMap(Thread.currentThread());
        if (m != null)
            m.remove(this);
    }

remove()步驟:

  • 獲取當前的線程
  • 獲取當前線程的ThreadLocalMap
  • 如果當前的ThreadLocalMap不為null,直接執(zhí)行調(diào)用ThreadLocalMap的remove(ThreadLocal<?> key)

關于根據(jù)ThreadLocal從ThreadLocalMap中remove Entry的源碼后續(xù)再說。

可以看到具體的操作全部委托到了ThreadLocalMap中來執(zhí)行。

3.5 ThreadLocalMap的創(chuàng)建

基礎屬性

        /**
         * The initial capacity -- MUST be a power of two.
         */
        private static final int INITIAL_CAPACITY = 16;

        /**
         * The table, resized as necessary.
         * table.length MUST always be a power of two.
         */
        private Entry[] table;

        /**
         * The number of entries in the table.
         */
        private int size = 0;

        /**
         * threshold == table大小的2/3,
         * 1、當size >= threshold,遍歷table并刪除所有key為null的元素(rehash()),
         * 2、如果刪除后size >= threshold*3/4 == 數(shù)組長度*1/2 時,需要對table進行擴容
         */
        private int threshold; // Default to 0

        private void setThreshold(int len) {
            threshold = len * 2 / 3;
        }

靜態(tài)內(nèi)部類Entry

        static class Entry extends WeakReference<ThreadLocal<?>> {
            Object value;
            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
        }

其中value就是set的value,而k是當前的ThreadLocal對象,作為WeakReference的referent屬性。

構(gòu)造器

        ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
            // 創(chuàng)建數(shù)組實例,默認長度16
            table = new Entry[INITIAL_CAPACITY];
            // 求余計算數(shù)組索引i
            int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
            // 第一次插入,沒有hash沖突,直接放置
            table[i] = new Entry(firstKey, firstValue);
            // 數(shù)組實際元素長度 + 1
            size = 1;
            // 設置擴容的閾值為 16*2/3 = 10,當有10個元素時,發(fā)生擴容
            setThreshold(INITIAL_CAPACITY);
        }

步驟:

  • 創(chuàng)建數(shù)組實例,默認長度16
  • 計算firstKey的直接索引,當len=2的n次方時,x&(len-1) <=> x%len,所以這里相當于直接求余,只是&運算效率更高
  • 第一次插入,沒有hash沖突,直接創(chuàng)建Entry并放置就ok了
  • 然后數(shù)組中元素個數(shù)size+1
  • 設置rehash()的閾值

索引操作

        /**
         * i:數(shù)組索引
         * len:數(shù)組長度
         * 如果 i+1<數(shù)組長度,則返回i+1, 如果 i+1>=數(shù)組長度,則直接取0(即開頭table[0]),
         * 在實際使用中,實際上就是不斷向后的循環(huán)取,到了末尾,再回到table[0],再向后循環(huán)取
         */
        private static int nextIndex(int i, int len) {
            return ((i + 1 < len) ? i + 1 : 0);
        }

        /**
         * 如果 i-1>=0,則返回i-1, 如果 i<1,則直接取數(shù)組長度-1(即末尾table[len-1]),
         * 在實際使用中,實際上就是不斷向前的循環(huán)取,到了開頭,再回到table[len-1],再向前循環(huán)取
         */
        private static int prevIndex(int i, int len) {
            return ((i - 1 >= 0) ? i - 1 : len - 1);
        }

從上述的邏輯可知,我們可以將table[]看做是一個循環(huán)數(shù)組。

3.6 向ThreadLocalMap中設置Entry

        /**
         * 設置 {key, value} 到當前線程的ThreadLocalMap中
         */
        private void set(ThreadLocal<?> key, Object value) {
            // 成員變量局部化,提高性能
            Entry[] tab = table;
            // 獲取數(shù)組容量
            int len = tab.length;
            // 獲取當前的threadlocal位于數(shù)組的下標,當len=2的n次方時,
            // key.threadLocalHashCode & (len - 1) <=> key.threadLocalHashCode % len
            int i = key.threadLocalHashCode & (len - 1);

            /**
             * 先獲取tab[i],判斷如果不為null,則進行循環(huán)體內(nèi)邏輯;
             * 若沒有return,則進行二次循環(huán),tab[i+1],判斷如果不為null,則進行循環(huán)體內(nèi)邏輯;
             * 當 i + 1 >= len 時,tab[0]回到頭再進行循環(huán)。
             *
             * 這就是解決hash沖突的另一種方式:線性探測法。如果當前槽沒有元素,直接插入元素;如果當前槽有元素,則向后尋找第一個為null的槽,放置該元素。
             * 在這里,還添加了hash沖突時替換原有元素或者替換無效元素的邏輯。
             */
            for (Entry e = tab[i];
                 e != null;
                 e = tab[i = nextIndex(i, len)]) {
                // 獲取當前的Entry的key,使用e.get(),是因為當前的key是一個WeakReference,使用get()獲取key
                ThreadLocal<?> k = e.get();
                // 如果當前的key就是剛剛查詢出來的Entry的key(即修改的是同一個Entry),則直接替換值
                if (k == key) {
                    e.value = value;
                    return;
                }
                // 如果剛剛查詢出來的Entry的key是null,則表明該Entry的key(ThreadLocal)已經(jīng)被回收了
                // 使用replaceStaleEntry替換掉已經(jīng)無效的Entry
                if (k == null) {
                    replaceStaleEntry(key, value, i);
                    return;
                }
            }

            // 通過上述for循環(huán)解決了hash沖突之后,創(chuàng)建新的Entry,放置到當前的table[i]節(jié)點上
            tab[i] = new Entry(key, value);
            // table的size+1
            int sz = ++size;
            // 回收部分無效的Entry,如果成功,則當前的數(shù)組中的元素個數(shù)將小于閾值,則肯定不需要擴容,
            // 否則判斷當前的數(shù)組中的元素個數(shù)是否達到了擴容閾值
            if (!cleanSomeSlots(i, sz) && sz >= threshold)
                rehash();
        }

說明:

  • 首先將table[]成員變量局部化(在后續(xù)的操作中如果會對table成員變量進行多次操作,局部化會提高性能)
  • 獲取數(shù)組長度,計算直接索引
  • 然后進行hash沖突檢測與處理(線性探測法)
    • 獲取直接索引位置的Entry,如果不為null,表示發(fā)生了hash沖突,則先獲取Entry中的key,如果該key與當前的ThreadLocal是同一個對象,則直接替換value,返回;
    • 如果Entry中的key==null,表示當前的Entry已經(jīng)是一個無效的Entry了,執(zhí)行replaceStaleEntry方法進行處理(如果從該Entry后的第一個元素開始在一個連續(xù)的Entry子數(shù)組內(nèi)找到一個key與當前ThreadLocal相等的元素,則替換值,然后互換該Entry和當前的key==null的Entry位置;如果沒找到,則將當前的key==null的Entry設置為新的將設置的Entry)返回;
    • 如果發(fā)生了hash沖突,但是直接索引位置上的Entry即不是當前的ThreadLocal的Entry,也不是一個無效的Entry,則需要向后循環(huán)遍歷下一個Entry元素,再進行上述邏輯的處理。直到找到一個Entry為null的位置或者Entry失效的位置或者Entry就是當前的ThreadLocal的Entry的位置。
  • 假設沒有發(fā)生hash沖突或者經(jīng)過上述的for循環(huán)找到了一個Entry為null的位置的時候,在該位置創(chuàng)建設置新添加的Entry,數(shù)組元素+1;
  • 嘗試回收部分無效的Entry,如果成功,則當前的數(shù)組中的元素個數(shù)將小于閾值,則肯定不需要rehash();如果失敗,且數(shù)組元素個數(shù) > threshold,執(zhí)行rehash() -- 即執(zhí)行部分資源清理與可能的擴容操作。

replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot)

        /**
         * 對一段連續(xù)的Entry子數(shù)組進行操作
         * Replace a stale entry encountered during a set operation
         * with an entry for the specified key.
         */
        private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) {
            // 獲取當前的table及其容量
            Entry[] tab = table;
            int len = tab.length;
            Entry e;

            // 設置需要被回收的槽索引(無效的Entry索引)
            int slotToExpunge = staleSlot;
            /**
             * 向前遍歷,直到找到第一個為null的槽
             * 在遍歷的過程中,如果找到了無效的Entry,則將slotToExpunge設置為當前的被回收的無效索引
             */
            for (int i = prevIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = prevIndex(i, len))
                if (e.get() == null)
                    slotToExpunge = i;

            /**
             * 向后遍歷,直到找到第一個為null的槽或者找到與當前Entry key相等的槽
             */
            for (int i = nextIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = nextIndex(i, len)) {
                ThreadLocal<?> k = e.get();

                // The newly stale slot, or any other stale slot
                // encountered above it, can then be sent to expungeStaleEntry
                // to remove or rehash all of the other entries in run.
                // 如果key相等,直接替換
                if (k == key) {
                    // 替換值
                    e.value = value;

                    /**
                     * 互換 tab[i] 和 tab[staleSlot],將無效的tab[staleSlot]中的Entry后移;
                     * 將正確的 tab[i]中的Entry前移,方便下一次的查找(可以根據(jù)直接槽索引查找到了)
                     */
                    tab[i] = tab[staleSlot];
                    tab[staleSlot] = e;

                    // 如果待回收的槽索引就是當前的無效索引,則設置待回收的槽索引為i
                    if (slotToExpunge == staleSlot)
                        slotToExpunge = i;
                    cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
                    return;
                }

                // 如果 "k==null && 待回收的槽索引就是當前的無效索引",則設置待回收的槽索引為i
                if (k == null && slotToExpunge == staleSlot)
                    slotToExpunge = i;
            }

            // 置空原本的entry的value,將當前的Entry添加到這個槽上,原本的Entry等待GC
            tab[staleSlot].value = null;
            tab[staleSlot] = new Entry(key, value);

            // If there are any other stale entries in run, expunge them
            if (slotToExpunge != staleSlot)
                cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
        }

步驟:

  • 首先從當前索引staleSlot的前一個元素開始向前遍歷一個連續(xù)的Entry子數(shù)組,一直找到該連續(xù)子數(shù)組最靠前的那個無效索引,設置為slotToExpunge
  • 然后從staleSlot的 后一個元素開始 向后遍歷一個連續(xù)的Entry子數(shù)組
  • 如果找到的Entry的key是當前的ThreadLocal,則直接替換value,之后將找到的Entry放置到table[staleSlot]處,也就是Entry本身的直接索引處,而Entry本身的真實索引的位置放置為原來的table[staleSlot];然后進行一次連續(xù)段的回收,之后進行一次log次數(shù)級別的回收,最后返回
  • 如果經(jīng)過循環(huán),沒有找到key與ThreadLocal相等的Entry,則直接將要set的Entry放置到table[staleSlot]處,最后根據(jù)需要進行一次連續(xù)段的回收,之后在進行一次log次數(shù)級別的回收

關于回收與擴容相關的操作,后續(xù)分析。

3.7 從ThreadLocalMap中獲取Entry

        private Entry getEntry(ThreadLocal<?> key) {
            // 計算所查找的Entry在table[]中的索引
            int i = key.threadLocalHashCode & (table.length - 1);
            Entry e = table[i];
            // 如果快速查找到的Entry!=null并且key也相等,則直接返回(也就是說在直接的hash槽找到了Entry)
            if (e != null && e.get() == key)
                return e;
            //
            else
                return getEntryAfterMiss(key, i, e);
        }

        private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
            Entry[] tab = table;
            int len = tab.length;

            /**
             * 循環(huán)遍歷,直到找到下一個不為null的Entry
             */
            while (e != null) {
                ThreadLocal<?> k = e.get();
                // 如果key相等,表示找到了,直接返回
                if (k == key)
                    return e;
                // 如果key==null,則嘗試回收或者整理"i到i之后的第一個Entry為null的索引(即返回值)之間"的Entry數(shù)據(jù)
                if (k == null)
                    expungeStaleEntry(i);
                // 繼續(xù)向后遍歷
                else
                    i = nextIndex(i, len);
                e = tab[i];
            }
            // 如果沒找到,返回null
            return null;
        }

步驟:

  • 計算當前的ThreadLocal的直接索引,獲取直接索引位置的Entry
  • 如果該位置的Entry不為null且Entry的key與當前的ThreadLocal是同一個元素,則直接返回該Entry
  • 如果該位置的Entry為null,直接返回null(看getEntryAfterMiss邏輯)
  • 如果該位置的Entry不為null,但是Entry的key與當前的ThreadLocal不是同一個元素,則表明發(fā)生了hash沖突。此時,會不斷的向后循環(huán)尋找,直到找到了要查找的Entry或者遍歷到的Entry為null,如果找到了,返回Entry,如果沒找到,返回null。

注意:

  • 在沒有根據(jù)直接索引找到Entry后的while循環(huán)中,如果檢測到無效的Entry時,會使用expungeStaleEntry方法嘗試回收或者整理"i到i之后的一段連續(xù)的不為null的索引(即返回值)之間"的Entry數(shù)據(jù)。
  • 對于get來講,線性探測法是以Entry!=null為循環(huán)查找結(jié)束條件的,所以要注意remove時的操作。

3.8 從ThreadLocalMap中刪除Entry

        private void remove(ThreadLocal<?> key) {
            Entry[] tab = table;
            int len = tab.length;
            int i = key.threadLocalHashCode & (len - 1);
            for (Entry e = tab[i];
                 e != null;
                 e = tab[i = nextIndex(i, len)]) {
                if (e.get() == key) {
                    // key - ThreadLocal置為null
                    e.clear();
                    // 嘗試回收或者整理"i到i之后的一段連續(xù)的不為null的索引(即返回值)之間"的Entry數(shù)據(jù)
                    expungeStaleEntry(i);
                    return;
                }
            }
        }

步驟:

  • 計算key的直接索引,從直接索引的Entry開始,依舊在一段連續(xù)的不為null的Entry進行循環(huán),如果找到的Entry的key與當前的key相同,則進行刪除操作:
    • 首先將Entry的key置空(e.clear())
    • 嘗試回收或者整理"i到i之后的一段連續(xù)的不為null的索引(即返回值)之間"的Entry數(shù)據(jù)

注意:

  • 這里的expungeStaleEntry中的整理邏輯非常重要,直接影響get查詢的查詢鏈是否會斷掉的問題。

3.9 rehash()與resize()

        private void rehash() {
            // 回收table中所有無效的Entry,如果可以有效減小size到不滿足下邊的擴容條件,則下邊的resize()不需要執(zhí)行
            expungeStaleEntries();

            // 實際上擴容是發(fā)生在size>=threshold*3/4==length*(2/3)*(3/4)==lenth*1/2
            if (size >= threshold - threshold / 4)
                resize();
        }

        /**
         * 擴容兩倍數(shù)組,之后rehash
         */
        private void resize() {
            // 獲取舊數(shù)組及其長度
            Entry[] oldTab = table;
            int oldLen = oldTab.length;
            // 新數(shù)組擴容2倍
            int newLen = oldLen * 2;
            Entry[] newTab = new Entry[newLen];
            int count = 0;

            for (int j = 0; j < oldLen; ++j) {
                Entry e = oldTab[j];
                if (e != null) {
                    ThreadLocal<?> k = e.get();
                    // 無效的Entry
                    if (k == null) {
                        e.value = null; // Help the GC
                    } else {
                        int h = k.threadLocalHashCode & (newLen - 1);
                        while (newTab[h] != null)
                            h = nextIndex(h, newLen);
                        newTab[h] = e;
                        count++;
                    }
                }
            }
            // 設置新的閾值
            setThreshold(newLen);
            size = count;
            table = newTab;
        }

rehash()僅用在ThreadLocalMap#set(ThreadLocal<?> key, Object value)的末尾部分。

rehash()步驟:

  • 首先進行一次全表回收無效的Entry操作,如果可以有效減小數(shù)組中的元素個數(shù)size到數(shù)組長度的一半以下,則不需要進行擴容,如果達到一半及以上,則進行擴容操作。

resize()步驟:

  • 創(chuàng)建新的Entry[]是舊的長度的兩倍;之后遍歷舊數(shù)組,之后根據(jù)新的數(shù)組的長度重新計算每個有效Entry的直接索引,根據(jù)線性探測法重新填充數(shù)組。
  • 最后設置新的Entry[]的閾值threshold、數(shù)組元素個數(shù)size,將新數(shù)組賦值給全局變量table

四、回收機制

ThreadLocal 回收機制 就是如下的四個函數(shù)以及WeakReference<ThreadLocal<?>>

1、expungeStaleEntry(int staleSlot) 
從staleSlot開始的連續(xù)Entry子數(shù)組中的無效Entry的清理和 整理:

2、expungeStaleEntries()
清理整個table中的無效Entry

3、cleanSomeSlots(int i, int n)
進行l(wèi)og2(n)次循環(huán)(如果正好遍歷到無效的Entry,則n=table.length,調(diào)用expungeStaleEntry進行連續(xù)段的清理和整理);
是一種介于完全不掃描和完全掃描(expungeStaleEntries)之間的一種掃描方式

4、replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot)
該函數(shù)已在set部分分析:從table[staleSlot]后的第一個元素開始在一個連續(xù)的Entry子數(shù)組內(nèi)如果找到一個key與當前ThreadLocal相等的Entry,則替換值,然后互換該Entry和table[staleSlot](①),返回;如果沒找到,則將table[staleSlot]直接設置為新的將設置的Entry(②)
①②處都會對該Entry所在的連續(xù)段,從連續(xù)段的第一個無效Entry(可能在當前的Entry之前)開始或者從該Entry的后一個元素開始(表示該Entry之前的元素全部有效)進行一次連續(xù)段的 回收和整理,之后在進行一次log級別的回收;

4.1 expungeStaleEntry(int staleSlot)

        /**
         * 嘗試回收或者整理"staleSlot到staleSlot之后的第一個Entry為null的索引(即返回值)之間"的Entry數(shù)據(jù)
         *
         * @return the index of the next null slot after staleSlot
         * (all between staleSlot and this slot will have been checked
         * for expunging).
         */
        private int expungeStaleEntry(int staleSlot) {
            Entry[] tab = table;
            int len = tab.length;

            // 回收table[staleSlot]處的值,value置為null,整體置為null
            tab[staleSlot].value = null;
            tab[staleSlot] = null;
            // size-1
            size--;

            // Rehash until we encounter null
            Entry e;
            int i;
            for (i = nextIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = nextIndex(i, len)) {
                ThreadLocal<?> k = e.get();
                // 如果當前的table[i] Entry無用,則直接回收
                if (k == null) {
                    e.value = null;
                    tab[i] = null;
                    size--;
                } else {
                    /**
                     * 問題:
                     * 假設ThreadLocalMap的table長度為4的,其中table[1]、table[2]已經(jīng)有值,而且key都不為null,
                     * 這時候set一個threadlocal1,算出來的索引為1,根據(jù)線性探測法,最后會設置到table[3];
                     * 然后,table[2]被remove,此時table[2]==null;
                     * 然后,get threadlocal1,此時算出索引還是1, 從table[1]向后查找,結(jié)果table[2]==null,則查找鏈就斷了,結(jié)果返回null,
                     * 而沒有辦法查找到真正的table[3],這種情況threadlocal是怎么處理的呢?
                     *
                     * 答案就是下邊的代碼:會對remove元素的后邊的元素循環(huán)遍歷,直到遍歷到非null為止。對每個元素做如下邏輯:
                     * 如果當前元素的k==null,則回收,如果不是無效元素,則計算其真實的直接索引,例如重新計算table[3]的索引,此處為1,與當前遍歷的索引3不同,
                     * 所以將table[3]先置空,然后從table[1]開始向后尋找第一個為null的位置,重新放置table[3]的元素,此處就是table[2]=table[3]
                     *
                     * 簡言之:remove一個Entry之后,會整理remove的Entry元素之后的一塊兒連續(xù)的(中間沒有null的entry)元素子數(shù)組,全部使用線性探測法重新計算,
                     * 這樣remove掉的那一塊兒就會被后續(xù)的Entry重新填充,查詢鏈就不會斷
                     */
                    // 計算真實的直接索引位置,因為可能k是因為hash沖突循環(huán)后移的
                    int h = k.threadLocalHashCode & (len - 1);
                    // 如果h和i,不相等,表示確實k是循環(huán)后移了的
                    if (h != i) {
                        // 首先將之前的Entry置為null
                        tab[i] = null;

                        // 一直向后循環(huán)找到第一個為null的索引,之后table[h] = e(將當前的Entry放置到table[h]新的hash位置上)
                        while (tab[h] != null)
                            h = nextIndex(h, len);
                        tab[h] = e;
                    }
                }
            }
            return i;
        }

步驟:

  • 將table[staleSlot]的value和Entry都置為null,元素個數(shù)size - 1
  • 遍歷staleSlot之后的一段連續(xù)的Entry,如果Entry的key==null,將該Entry的value和Entry本身置空,如果key是有效的,將要進行 整理操作
    • 計算直接索引的位置,如果當前的Entry的真實索引與直接索引不相等,則表示該Entry之前是因為hash沖突被沖到后邊的slot里的,此處會重新從直接索引位置開始向后找到第一個不為null的的位置,將當前的Entry元素放置到這里,這樣會保證原本連續(xù)的Entry子數(shù)組,在從中刪除一個Entry后,子數(shù)組中剩下的元素還可以組成一個連續(xù)的Entry數(shù)組,這保證了get操作的查詢鏈不會斷。

注意:

  • remove一個元素后,會調(diào)用這里的 整理操作 進行連續(xù)Entry子數(shù)組的整理,不會使查詢鏈斷掉。要仔細的看注釋中的那個例子。
  • 整理的過程相當于一次從staleSlot開始的連續(xù)段 內(nèi)存碎片整理,將無效的Entry刪除,有效的Entry按照原本的順序聚集。

4.2 expungeStaleEntries()

        /**
         * 回收table中所有的無效Entry.
         */
        private void expungeStaleEntries() {
            Entry[] tab = table;
            int len = tab.length;
            for (int j = 0; j < len; j++) {
                Entry e = tab[j];
                if (e != null && e.get() == null)
                    expungeStaleEntry(j);
            }
        }

遍歷table中所有的元素,發(fā)現(xiàn)無效的Entry后,就調(diào)用expungeStaleEntry(int staleSlot)進行連續(xù)子數(shù)組的資源清理與整理。該方法影響較大,僅僅用在rehash()中。

4.3 cleanSomeSlots(int i, int n)

        /**
         * 對數(shù)級別的掃描,介于 "不掃描(fast but retains garbage)" 和 "完全掃描(find all garbage but would cause some insertions to take O(n) time)" 之間
         *
         * @param i a position known NOT to hold a stale entry. The
         *          scan starts at the element after i.
         * @param n scan control: {@code log2(n)} cells are scanned,
         *          unless a stale entry is found, in which case
         *          {@code log2(table.length)-1} additional cells are scanned.
         *          When called from insertions, this parameter is the number
         *          of elements, but when from replaceStaleEntry, it is the
         *          table length. (Note: all this could be changed to be either
         *          more or less aggressive by weighting n instead of just
         *          using straight log n. But this version is simple, fast, and
         *          seems to work well.)
         * @return true if any stale entries have been removed.
         */
        private boolean cleanSomeSlots(int i, int n) {
            boolean removed = false;
            Entry[] tab = table;
            int len = tab.length;
            do {
                // 向后循環(huán)獲取下一個索引,如果已經(jīng)到了末尾,則返回table[0]
                i = nextIndex(i, len);
                Entry e = tab[i];
                // 如果此Entry已經(jīng)無效了,key-ThreadLocal已經(jīng)被回收掉了,無法再通過該key獲取Entry,如果不回收此Entry,則會造成內(nèi)存泄露
                if (e != null && e.get() == null) {
                    n = len;
                    removed = true;
                    // 嘗試回收或者rehash"i到i之后的第一個Entry為null的索引(即返回值)之間"的Entry數(shù)據(jù)
                    i = expungeStaleEntry(i);
                }
            } while ((n >>>= 1) != 0); // n / 2,假設n=16,當然set的時候n=size,則log2(16)=4,也就是說需要循環(huán)4次(也可以直接理解為16/2/2/2/2=1,第5次1/2==0則不再進行循環(huán))
            return removed;
        }

該方法只掃描一部分Entry,進行l(wèi)og2(n)次循環(huán),如果恰好掃描到無效的Entry,則n=table.length,即進行l(wèi)og2(table.length)次循環(huán)。每個循環(huán)的步驟:

  • 獲取下一個索引位置i及該位置上的Entry
  • 如果該Entry==null或者Entry依然有效,則進行下一次循環(huán);
  • 如果該Entry!=null且無效,則先將n設置為長度(在set方法中n本來是)調(diào)用expungeStaleEntry(i)回收或者 整理 i之后的一段連續(xù)的Entry,之后再進行一下輪的回收和整理操作。

4.4 弱引用WeakReference

由于Entry繼承了弱引用,他的key是referent,當發(fā)生gc時,如果該key已經(jīng)不可達了,則直接回收(key==null),之后在如下操作時刪除key==null的Entry。

  • get時調(diào)用expungeStaleEntry
  • set時調(diào)用replaceStaleEntry或者cleanSomeSlots或者expungeStaleEntries
  • remove時調(diào)用expungeStaleEntry

五、總結(jié)

5.1 set整個流程

  1. 首先將table[]成員變量局部化(在后續(xù)的操作中如果會對table成員變量進行多次操作,局部化會提高性能)
  2. 獲取數(shù)組長度,計算直接索引
  3. 然后進行hash沖突檢測與處理(線性探測法)
    • 獲取直接索引位置上的Entry,如果不為null,表示發(fā)生了hash沖突,則先獲取Entry中的key,如果該key與當前的ThreadLocal是同一個對象,則直接替換value,返回;
    • 如果Entry中的key==null,表示當前的Entry已經(jīng)是一個無效的Entry了,執(zhí)行replaceStaleEntry方法進行處理(replaceStaleEntry:如果從該Entry后的第一個元素開始在一個連續(xù)的Entry子數(shù)組內(nèi)找到一個key與當前ThreadLocal相等的元素,則替換值,然后互換該Entry和當前的key==null的Entry位置;如果沒找到,則將當前的key==null的Entry設置為新的將設置的Entry)返回;
    • 如果發(fā)生了hash沖突,但是直接索引位置上的Entry即不是當前的ThreadLocal的Entry,也不是一個無效的Entry,則需要向后循環(huán)遍歷下一個Entry元素,再進行上述邏輯的處理。直到找到一個Entry為null的位置或者Entry失效的位置或者Entry就是當前的ThreadLocal的Entry的位置。
  4. 假設沒有發(fā)生hash沖突或者經(jīng)過上述的for循環(huán)找到了一個Entry為null的位置的時候,在該位置創(chuàng)建設置新添加的Entry,數(shù)組元素+1;
  5. 嘗試log級別的回收部分無效的Entry,如果成功,則當前的數(shù)組中的元素個數(shù)將小于閾值,則肯定不需要rehash();如果失敗,且數(shù)組元素個數(shù) >= threshold,執(zhí)行rehash()
    • 首先對整個table回收無效的Entry操作,如果可以有效減小數(shù)組中的元素個數(shù)size到數(shù)組長度的一半以下,則不需要進行擴容,如果size達到length的一半及以上,則進行擴容操作
    • 擴容:創(chuàng)建新的Entry[]是舊的長度的兩倍;之后遍歷舊數(shù)組,根據(jù)新的數(shù)組的長度重新計算每個有效Entry的直接索引,根據(jù)線性探測法重新填充數(shù)組。
      最后設置rehash()閾值、數(shù)組元素個數(shù)size,將新數(shù)組賦值給全局變量table。

5.2 get整個流程

  1. 獲取當前線程
  2. 獲取當前線程的ThreadLocalMap屬性
  3. 如果當前線程的ThreadLocalMap對象不為null,從ThreadLocalMap對象中獲取key為當前的ThreadLocal的Entry
    • 計算當前的ThreadLocal的直接索引,獲取直接索引位置的Entry
    • 如果該位置的Entry不為null且Entry的key與當前的ThreadLocal是同一個元素,則直接返回該Entry
    • 如果該位置的Entry為null,直接返回null(看getEntryAfterMiss邏輯)
    • 如果該位置的Entry不為null,但是Entry的key與當前的ThreadLocal不是同一個元素,則表明發(fā)生了hash沖突。此時,會不斷的向后循環(huán)尋找,直到找到了要查找的Entry或者遍歷到的Entry為null,如果找到了,返回Entry,如果沒找到,返回null。
  4. 如果該Entry不為null,直接獲取entry的value,返回即可
  5. 如果當前線程的ThreadLocalMap為null(還未createMap(Thread t, T firstValue))或者key為當前的ThreadLocal的Entry為null(沒有調(diào)用map.set(this, value)),則執(zhí)行設置初始化值的方法。

5.3 各類回收的時機

  1. set或者setInitialValue
    • 如果從直接索引開始向后遍歷的一段連續(xù)Entry段內(nèi)發(fā)生hash沖突且沖突的Entry已經(jīng)是無效的,此時會執(zhí)行replaceStaleEntry,會執(zhí)行回收和整理操作
    • 如果沒有發(fā)生hash沖突 或者 發(fā)生了hash沖突但是在從直接索引開始的一段連續(xù)Entry段內(nèi)沒有找到與設置的key相等的Entry或者無效的Entry,如果直接索引所在的整個連續(xù)Entry段如果有無效Entry,則在set的尾部都會執(zhí)行一次log級別的回收操作,如果回收不理想,需要進行rehash()
    • rehash()會進行一次全table的回收
  2. get
    • 在通過直接索引沒有g(shù)et到數(shù)據(jù)的時候,會循環(huán)遍歷一段連續(xù)的Entry,如果遍歷到的Entry是無效的,則進行一次連續(xù)Entry子數(shù)組的回收和整理
  3. remove
    • 當找到需要被刪除的Entry時,進行一次連續(xù)Entry子數(shù)組的回收和整理
  4. gc時
    • 由于Entry繼承了弱引用,他的key是referent,當發(fā)生gc時,如果該key已經(jīng)無引用指向了,則直接回收(key==null了),之后再根據(jù)上述1,2,3的操作刪除key==null的Entry。(這也是為什么ThreadLocal沒有為Entry綁定ReferenceQueue的原因,因為Entry的刪除已經(jīng)可以發(fā)生在1,2,3中了;但是WeakHashMap不一樣,WeakHashMap的Entry繼承于WeakReference,其key是referent,當發(fā)生gc時,如果key不可達了,則回收key,之后將key對應的Entry放入queue,在后續(xù)對WeakHashMap的操作 - getTable()/size()/resize(int newCapacity)的時候進行Entry的回收)

5.4 ThreadLocal與線程池

線程池中的線程由于會被復用,所以線程池中的每一條線程在執(zhí)行task結(jié)束后,要清理掉其ThreadLocalMap及其中的各個Entry,否則,當這條線程在下一次被復用的時候,其ThreadLocalMap信息還存儲著上一次被使用的時的信息;另外,假設這條線程不再被使用,但是這個線程有可能不會被銷毀(與線程池的類型和配置相關),那么其上的ThreadLocal將發(fā)生了資源泄露。所以netty的io線程池使用FastThreadLocalRunnable wrap了Runnable任務,當任務執(zhí)行結(jié)束后,會做InternalThreadLocalMap和FastThreadLocal的清理操作。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容