Netty對(duì)象池實(shí)現(xiàn)分析

什么是對(duì)象池技術(shù)?對(duì)象池應(yīng)用在哪些地方?

對(duì)象池其實(shí)就是緩存一些對(duì)象從而避免大量創(chuàng)建同一個(gè)類型的對(duì)象,類似線程池的概念。對(duì)象池緩存了一些已經(jīng)創(chuàng)建好的對(duì)象,避免需要時(shí)才創(chuàng)建對(duì)象,同時(shí)限制了實(shí)例的個(gè)數(shù)。池化技術(shù)最終要的就是重復(fù)的使用池內(nèi)已經(jīng)創(chuàng)建的對(duì)象。從上面的內(nèi)容就可以看出對(duì)象池適用于以下幾個(gè)場(chǎng)景:

  1. 創(chuàng)建對(duì)象的開銷大
  2. 會(huì)創(chuàng)建大量的實(shí)例
  3. 限制一些資源的使用

如果創(chuàng)建一個(gè)對(duì)象的開銷特別大,那么提前創(chuàng)建一些可以使用的并且緩存起來(池化技術(shù)就是重復(fù)使用對(duì)象,提前創(chuàng)建并緩存起來重復(fù)使用就是池化)可以降低創(chuàng)建對(duì)象時(shí)的開銷。

會(huì)大量創(chuàng)建實(shí)例的場(chǎng)景,重復(fù)的使用對(duì)象可減少創(chuàng)建的對(duì)象數(shù)量,降低GC的壓力(如果這些對(duì)象的生命周期都很短暫,那么可以降低YoungGC的頻率;如果生命周期很長,那么可以避免掉這些對(duì)象被FullGC——生命周期長,且大量創(chuàng)建,這里就要結(jié)合系統(tǒng)的TPS等考慮池的大小了)。

對(duì)于限制資源的使用更多的是一種保護(hù)策略,比如數(shù)據(jù)庫鏈接池。除去這些對(duì)象本身的開銷外,他們對(duì)外部系統(tǒng)也會(huì)造成壓力,比如大量創(chuàng)建鏈接對(duì)DB也是有壓力的。那么池化除了優(yōu)化資源以外,本身限制了資源數(shù),對(duì)外部系統(tǒng)也起到了一層保護(hù)作用。

如何實(shí)現(xiàn)對(duì)象池?

開源實(shí)現(xiàn):Apache Commons Pool
自己實(shí)現(xiàn):Netty輕量級(jí)對(duì)象池實(shí)現(xiàn)

Apache Commons Pool開源軟件庫提供了一個(gè)對(duì)象池API和一系列對(duì)象池的實(shí)現(xiàn),支持各種配置,比如活躍對(duì)象數(shù)或者閑置對(duì)象個(gè)數(shù)等。DBCP數(shù)據(jù)庫連接池基于Apache Commons Pool實(shí)現(xiàn)。

Netty自己實(shí)現(xiàn)了一套輕量級(jí)的對(duì)象池。在Netty中,通常會(huì)有多個(gè)IO線程獨(dú)立工作,基于NioEventLoop的實(shí)現(xiàn),每個(gè)IO線程輪詢單獨(dú)的Selector實(shí)例來檢索IO事件,并在IO來臨時(shí)開始處理。最常見的IO操作就是讀寫,具體到NIO就是從內(nèi)核緩沖區(qū)拷貝數(shù)據(jù)到用戶緩沖區(qū)或者從用戶緩沖區(qū)拷貝數(shù)據(jù)到內(nèi)核緩沖區(qū)。這里會(huì)涉及到大量的創(chuàng)建和回收Buffer,Netty對(duì)Buffer進(jìn)行了池化從而降低系統(tǒng)開銷。

<h3>Netty對(duì)象池實(shí)現(xiàn)分析</h3>

上面提到了IO操作中會(huì)涉及到大量的緩沖區(qū)操作,NIO提供了兩種Buffer最為緩沖區(qū):DirectByteBuffer和HeapByteBuffer。Netty在兩種緩沖區(qū)的基礎(chǔ)上進(jìn)行了池化進(jìn)而提升性能。

DirectByteBuffer
DirectByteBuffer顧名思義是直接內(nèi)存(Direct Memory)上的Byte緩存區(qū),直接內(nèi)存不是JVM Runtime數(shù)據(jù)區(qū)域的一部分,也不是Java虛擬機(jī)規(guī)范中定義的內(nèi)存區(qū)域。簡(jiǎn)單的說這部分就是機(jī)器內(nèi)存,分配的大小等都和虛擬機(jī)限制無關(guān)。JDK1.4中開始我們可以使用native方法在直接內(nèi)存上來分配內(nèi)存,并在JVM堆內(nèi)存上維持一個(gè)引用來進(jìn)行訪問,當(dāng)JVM堆內(nèi)存上的引用被回收后,這塊內(nèi)存被操作系統(tǒng)回收。

HeapByteBuffer
HeapByteBuffer是在JVM堆內(nèi)存上分配的Byte緩沖區(qū),可以簡(jiǎn)單的理解為byte[]數(shù)組的一種封裝。基于HeapByteBuffer的寫流程通常要先在直接內(nèi)存上分配一個(gè)臨時(shí)的緩沖區(qū),將數(shù)據(jù)從Heap拷貝到直接內(nèi)存,然后再將直接內(nèi)存的數(shù)據(jù)發(fā)送到IO設(shè)備的緩沖區(qū),之后回收直接內(nèi)存。讀流程也類似。使用DirectByteBuffer避免了不必要的拷貝工作,所以在性能上會(huì)有提升。

DirectByteBuffer的缺點(diǎn)在于分配和回收的的代價(jià)相對(duì)較大,因此DirectByteBuffer適用于緩沖區(qū)可以重復(fù)使用的場(chǎng)景。

Netty的池化實(shí)現(xiàn)

以Buffer為例,對(duì)應(yīng)直接內(nèi)存和堆內(nèi)存,Netty的池化分別為PooledDirectByteBuffer和PolledHeapByteBuffer。

ByteBuffer繼承關(guān)系

通過PooledDirectByteBuffer的API定義可以看到,它的構(gòu)造方法是私有的,而創(chuàng)建一個(gè)實(shí)例的入口是:

    static PooledDirectByteBuf newInstance(int maxCapacity) {
        PooledDirectByteBuf buf = RECYCLER.get();
        buf.reuse(maxCapacity);
        return buf;
    }

可見RECYCLER是池化的核心,創(chuàng)建對(duì)象時(shí)都通過RECYCLER.get來獲得一個(gè)實(shí)例(Recycler就是Netty實(shí)輕量級(jí)池化技術(shù)的核心)。

Recycler實(shí)現(xiàn)分析(源碼分析)

/**
 * Light-weight object pool based on a thread-local stack.
 *
 * @param <T> the type of the pooled object
 */
public abstract class Recycler<T>

從注釋可以看出Netty基于thread-local實(shí)現(xiàn)了輕量級(jí)的對(duì)象池。

Recycler成員

Recycler的API非常簡(jiǎn)單:

  • get():獲取一個(gè)實(shí)例
  • recycle(T, Handle<T>):回收一個(gè)實(shí)例
  • newObject(Handle<T>):創(chuàng)建一個(gè)實(shí)例

get流程

    @SuppressWarnings("unchecked")
    public final T get() {
        if (maxCapacity == 0) {
            return newObject((Handle<T>) NOOP_HANDLE);
        }
        Stack<T> stack = threadLocal.get();
        DefaultHandle<T> handle = stack.pop();
        if (handle == null) {
            handle = stack.newHandle();
            handle.value = newObject(handle);
        }
        return (T) handle.value;
    }

get的簡(jiǎn)化流程(這里先不深究細(xì)節(jié)):

  1. 拿到當(dāng)前線程對(duì)應(yīng)的stack
  2. 從stack中pop出一個(gè)元素
  3. 如果不為空則返回,否則創(chuàng)建一個(gè)新的實(shí)例

可以大概明白Stack是對(duì)象池化背后存儲(chǔ)實(shí)例的數(shù)據(jù)結(jié)構(gòu):如果能從stack中拿到可用的實(shí)例就不再創(chuàng)建新的實(shí)例。

recycle流程

一個(gè)“池子”最核心的就是做兩件事情,第一個(gè)是上面的Get,即從池子中拿出一個(gè)可用的實(shí)例。另一個(gè)就是在用完后將數(shù)據(jù)放回到池子中(線程池、連接池都是這樣)。

    public final boolean recycle(T o, Handle<T> handle) {
        if (handle == NOOP_HANDLE) {
            return false;
        }

        DefaultHandle<T> h = (DefaultHandle<T>) handle;
        if (h.stack.parent != this) {
            return false;
        }

        h.recycle(o);
        return true;
    }


    public void recycle(Object object) {
        if (object != value) {
            throw new IllegalArgumentException("object does not belong to handle");
        }
        Thread thread = Thread.currentThread();
        if (thread == stack.thread) {
            stack.push(this);
            return;
        }
        // we don't want to have a ref to the queue as the value in our weak map
        // so we null it out; to ensure there are no races with restoring it later
        // we impose a memory ordering here (no-op on x86)
        Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
        WeakOrderQueue queue = delayedRecycled.get(stack);
        if (queue == null) {
            delayedRecycled.put(stack, queue = new WeakOrderQueue(stack, thread));
        }
        queue.add(this);
    }

回收一個(gè)實(shí)例核心的步驟由以上兩個(gè)方法組成:Recycler的recycle方法和DefaultHandle的recycle方法。
Recycler的recycle方法主要做了一些參數(shù)驗(yàn)證。
DefaultHandle的recycle方法流程如下:

  1. 如果當(dāng)前線程是當(dāng)前stack對(duì)象的線程,那么將實(shí)例放入stack中,否則:
  2. 獲取當(dāng)前線程對(duì)應(yīng)的Map<Stack, WeakOrderQueue>,并將實(shí)例加入到Stack對(duì)應(yīng)的Queue中。

從獲取實(shí)例和回收實(shí)例的代碼可以看出,整個(gè)對(duì)象池的核心實(shí)現(xiàn)由ThreadLocal和Stack及WrakOrderQueue構(gòu)成,接著來看Stack和WrakOrderQueue的具體實(shí)現(xiàn),最后概括整體實(shí)現(xiàn)。

Stack實(shí)體

Stack<T>
    parent:Recycler               // 關(guān)聯(lián)對(duì)應(yīng)的Recycler
    thread:Thread                 // 對(duì)應(yīng)的Thread
    elements:DefaultHandle<?>[]   // 存儲(chǔ)DefaultHandle的數(shù)組
    head:WeakOrderQueue           // 指向WeakOrderQueue元素組成的鏈表的頭部“指針”
    cursor,prev:WrakOrderQueue    // 當(dāng)前游標(biāo)和前一元素的“指針”

pop實(shí)現(xiàn)

    DefaultHandle<T> pop() {
        int size = this.size;
        if (size == 0) {
            if (!scavenge()) {
                return null;
            }
            size = this.size;
        }
        size --;
        DefaultHandle ret = elements[size];
        if (ret.lastRecycledId != ret.recycleId) {
            throw new IllegalStateException("recycled multiple times");
        }
        ret.recycleId = 0;
        ret.lastRecycledId = 0;
        this.size = size;
        return ret;
    }

  1. 如果size為0(這里的size表示stack中可用的元素),嘗試進(jìn)行scavenge。
  2. 返回elements中的最后一個(gè)元素。
    boolean scavenge() {
        // continue an existing scavenge, if any
        if (scavengeSome()) {
            return true;
        }

        // reset our scavenge cursor
        prev = null;
        cursor = head;
        return false;
    }

    boolean scavengeSome() {
        WeakOrderQueue cursor = this.cursor;
        if (cursor == null) {
            cursor = head;
            if (cursor == null) {
                return false;
            }
        }

        boolean success = false;
        WeakOrderQueue prev = this.prev;
        do {
            if (cursor.transfer(this)) {
                success = true;
                break;
            }

            WeakOrderQueue next = cursor.next;
            if (cursor.owner.get() == null) {
                // If the thread associated with the queue is gone, unlink it, after
                // performing a volatile read to confirm there is no data left to collect.
                // We never unlink the first queue, as we don't want to synchronize on updating the head.
                if (cursor.hasFinalData()) {
                    for (;;) {
                        if (cursor.transfer(this)) {
                            success = true;
                        } else {
                            break;
                        }
                    }
                }
                if (prev != null) {
                    prev.next = next;
                }
            } else {
                prev = cursor;
            }

            cursor = next;

        } while (cursor != null && !success);

        this.prev = prev;
        this.cursor = cursor;
        return success;
    }

簡(jiǎn)要概括上面的流程就是Stack從“背后”的Queue中獲取可用的實(shí)例,如果Queue中沒有可用實(shí)例就遍歷到下一個(gè)Queue(Queue組成了一個(gè)鏈表)。

push實(shí)現(xiàn)

    void push(DefaultHandle<?> item) {
        if ((item.recycleId | item.lastRecycledId) != 0) {
            throw new IllegalStateException("recycled already");
        }
        item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
        
        int size = this.size;
        if (size >= maxCapacity) {
            // Hit the maximum capacity - drop the possibly youngest object.
            return;
        }
        if (size == elements.length) {
            elements = Arrays.copyOf(elements, Math.min(size << 1, maxCapacity));
        }

        elements[size] = item;
        this.size = size + 1;
    }

push相對(duì)pop流程要更加簡(jiǎn)單,直接將回收的元素放到隊(duì)尾(實(shí)際是一個(gè)數(shù)組)。

WeakOrderQueue實(shí)體

WeakOrderQueue
    head,tail:Link          // 內(nèi)部元素的指針(WeakOrderQueue內(nèi)部存儲(chǔ)的是一個(gè)Link的鏈表)
    next:WeakOrderQueue     // 指向下一個(gè)WeakOrderQueue的指針
    owner:Thread            // 對(duì)應(yīng)的線程

WeakOrderQueue核心包含兩個(gè)方法,add方法將元素添加到自身的“隊(duì)列”中,transfer方法將自己擁有的元素“傳輸”到Stack中。

Linke結(jié)構(gòu)如下

    private static final class Link extends AtomicInteger {
        private final DefaultHandle<?>[] elements = new DefaultHandle[LINK_CAPACITY];

        private int readIndex;
        private Link next;
    }

Link內(nèi)部包含了一個(gè)數(shù)組用于存放實(shí)例,同時(shí)標(biāo)記了讀取位置的索引和下一個(gè)Link元素的指針。
結(jié)合Link的結(jié)構(gòu),Weak的結(jié)構(gòu)如下:


Link結(jié)構(gòu)

add方法

    void add(DefaultHandle<?> handle) {
        handle.lastRecycledId = id;

        Link tail = this.tail;
        int writeIndex;
        if ((writeIndex = tail.get()) == LINK_CAPACITY) {
            this.tail = tail = tail.next = new Link();
            writeIndex = tail.get();
        }
        tail.elements[writeIndex] = handle;
        handle.stack = null;
        // we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread;
        // this also means we guarantee visibility of an element in the queue if we see the index updated
        tail.lazySet(writeIndex + 1);
    }

add操作將元素添加到tail指向的Link對(duì)象中,如果Link已滿則創(chuàng)建一個(gè)新的Link實(shí)例。

transfer方法

boolean transfer(Stack<?> dst) {

    Link head = this.head;
    if (head == null) {
        return false;
    }

    if (head.readIndex == LINK_CAPACITY) {
        if (head.next == null) {
            return false;
        }
        this.head = head = head.next;
    }

    final int srcStart = head.readIndex;
    int srcEnd = head.get();
    final int srcSize = srcEnd - srcStart;
    if (srcSize == 0) {
        return false;
    }

    final int dstSize = dst.size;
    final int expectedCapacity = dstSize + srcSize;

    if (expectedCapacity > dst.elements.length) {
        final int actualCapacity = dst.increaseCapacity(expectedCapacity);
        srcEnd = Math.min(srcStart + actualCapacity - dstSize, srcEnd);
    }

    if (srcStart != srcEnd) {
        final DefaultHandle[] srcElems = head.elements;
        final DefaultHandle[] dstElems = dst.elements;
        int newDstSize = dstSize;
        for (int i = srcStart; i < srcEnd; i++) {
            DefaultHandle element = srcElems[i];
            if (element.recycleId == 0) {
                element.recycleId = element.lastRecycledId;
            } else if (element.recycleId != element.lastRecycledId) {
                throw new IllegalStateException("recycled already");
            }
            element.stack = dst;
            dstElems[newDstSize ++] = element;
            srcElems[i] = null;
        }
        dst.size = newDstSize;

        if (srcEnd == LINK_CAPACITY && head.next != null) {
            this.head = head.next;
        }

        head.readIndex = srcEnd;
        return true;
    } else {
        // The destination stack is full already.
        return false;
    }
}

transfer方法收件根據(jù)stack的容量和自身擁有的實(shí)例數(shù),計(jì)算出最終需要轉(zhuǎn)移的實(shí)例數(shù)。之后就是數(shù)組的拷貝和指標(biāo)的調(diào)整。
基本上所有的流程有個(gè)大致的了解,下面從整體的角度回顧一下Netty對(duì)象池的實(shí)現(xiàn)。

整體實(shí)現(xiàn)
結(jié)構(gòu)

整體結(jié)構(gòu)

整個(gè)設(shè)計(jì)上核心的幾點(diǎn):
1. Stack相當(dāng)于是一級(jí)緩存,同一個(gè)線程內(nèi)的使用和回收都將使用一個(gè)Stack
2. 每個(gè)線程都會(huì)有一個(gè)自己對(duì)應(yīng)的Stack,如果回收的線程不是Stack的線程,將元素放入到Queue中
3. 所有的Queue組合成一個(gè)鏈表,Stack可以從這些鏈表中回收元素(實(shí)現(xiàn)了多線程之間共享回收的實(shí)例)

整體結(jié)構(gòu)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 從三月份找實(shí)習(xí)到現(xiàn)在,面了一些公司,掛了不少,但最終還是拿到小米、百度、阿里、京東、新浪、CVTE、樂視家的研發(fā)崗...
    時(shí)芥藍(lán)閱讀 42,747評(píng)論 11 349
  • 1、Netty基礎(chǔ)入門 Netty是由JBOSS提供的一個(gè)java開源框架。Netty提供異步的、事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)...
    我是嘻哈大哥閱讀 4,771評(píng)論 0 31
  • Java SE 基礎(chǔ): 封裝、繼承、多態(tài) 封裝: 概念:就是把對(duì)象的屬性和操作(或服務(wù))結(jié)合為一個(gè)獨(dú)立的整體,并盡...
    Jayden_Cao閱讀 2,234評(píng)論 0 8
  • 一 我終于下決心要去那個(gè)城市了。 那個(gè)城市在哪其實(shí)我并不知道,只是我覺得非去不可了。一無所知,毫無記憶,卻在某一天...
    西江無鋒閱讀 392評(píng)論 5 2

友情鏈接更多精彩內(nèi)容