RxJava 源碼分析系列(三) - SpscLinkedArrayQueue原理分析

??在上一個文章中分析BufferAsyncEmitter時,說到BufferAsyncEmitter使用了SpscLinkedArrayQueue隊列來緩存數(shù)據(jù)。當時在文末時,只是簡單的提了一句,并沒有詳細介紹SpscLinkedArrayQueue隊列的原理,在本文,將詳細介紹SpscLinkedArrayQueue隊列的神奇之處。

1.數(shù)據(jù)結(jié)構(gòu)

??在分析SpscLinkedArrayQueue隊列之前,我們先來了解一下SpscLinkedArrayQueue的第一個神奇之處,它神奇的數(shù)據(jù)結(jié)構(gòu)。
??SpscLinkedArrayQueue的數(shù)據(jù)結(jié)構(gòu)主要神奇在它既不是傳統(tǒng)的數(shù)組,又不是傳統(tǒng)的鏈表,而是數(shù)組+鏈表。我說的好像過于玄乎了,還是具體來看看吧。


??在SpscLinkedArrayQueue內(nèi)部維持著類似于上面的數(shù)據(jù)結(jié)構(gòu),鏈表的每個節(jié)點是一個數(shù)組,而每個節(jié)點數(shù)組,最后兩位不是用來存儲數(shù)據(jù),而是倒數(shù)第二位用來存儲一個標記對象,倒數(shù)第一位用來存儲下一個節(jié)點引用。
??在整個數(shù)據(jù)結(jié)構(gòu)中,SpscLinkedArrayQueue是不會遍歷鏈表的,而是用一個producerBuffer或者consumerBuffer對象用來指向當前的節(jié)點。所以這里存在一個問題,一旦前一個節(jié)點被填充滿了,producerBuffer就指向了下一個節(jié)點,同時一旦前一個節(jié)點被消費完畢,consumerBuffer也指向下一個節(jié)點,此時前一個節(jié)點不會被SpscLinkedArrayQueue復(fù)用,而是安安靜靜的等待自己被GC回收。
??實際上,上面圖中的HASH_NEXT不是在固定的位置,也就是說,它不一定在倒數(shù)第二位,這種情況待會我們在下面分析時,會詳細的解釋。但是next指針絕對在該數(shù)組的最后一位,這個是毋庸置疑的。

2.成員變量

??了解了SpscLinkedArrayQueue的數(shù)據(jù)結(jié)構(gòu),我們開始正式來分析SpscLinkedArrayQueue,當然,我們還是從它的成員變量開始,來看看它成員變量有哪些,分別表示什么含義。

變量名 類型 含義
producerIndex AtomicLong 這個用來表示當前生產(chǎn)者生成數(shù)據(jù)的index,實際上這個變量不是指生成數(shù)據(jù)的index,而是要跟相應(yīng)的mask計算才是,此變量只增不減。(對哦,你沒有看錯,只增不減)
producerLookAheadStep int 這個變量用來表示生產(chǎn)者可以往前看的數(shù)量,默認為容量的1/4,最大為4096。
producerLookAhead long 這個變量用來表示index最大的值,也就是說在擴容之前,index能達到的最大值。
producerMask int 這個變量用跟index計算offset,這個offset才是真正的位置。默認值二進制全為1,也就是2^n - 1。
producerBuffer AtomicReferenceArray 表示生產(chǎn)者生成的數(shù)據(jù)放入的節(jié)點。這個變量是鏈表的一個節(jié)點。
consumerMask int 消費者的mask,用來計算當前消費需要消費的數(shù)據(jù)的位置。默認跟producerMask一樣。
consumerBuffer AtomicReferenceArray 表示消費者當前需要消費的那個數(shù)組節(jié)點。意義跟producerBuffer差不多
consumerIndex AtomicLong 表示當前消費者需要消費的數(shù)據(jù)的index,意義跟producerIndex差不多。
HAS_NEXT Object 用來表示當前數(shù)組節(jié)點有下一個節(jié)點。

3.構(gòu)造方法

??我們先來看看SpscLinkedArrayQueue的構(gòu)造方法,看看它為我們做了哪些初始化。

    public SpscLinkedArrayQueue(final int bufferSize) {
        int p2capacity = Pow2.roundToPowerOfTwo(Math.max(8, bufferSize));
        int mask = p2capacity - 1;
        AtomicReferenceArray<Object> buffer = new AtomicReferenceArray<Object>(p2capacity + 1);
        producerBuffer = buffer;
        producerMask = mask;
        adjustLookAheadStep(p2capacity);
        consumerBuffer = buffer;
        consumerMask = mask;
        producerLookAhead = mask - 1; // we know it's all empty to start with
        soProducerIndex(0L);
    }

??初始化的東西還真的不少,但是我們這里挑比較重要的說。
??首先,是對傳遞進來的bufferSize進行了重新計算的操作,讓它始終為2^n。也就是如下的代碼。

 int p2capacity = Pow2.roundToPowerOfTwo(Math.max(8, bufferSize));

??其實這個調(diào)整為2^n的操作也不是什么騷操作,就是最普通的位運算。我們先來看看Pow2roundToPowerOfTwo方法里面究竟是怎么計算的。

    public static int roundToPowerOfTwo(final int value) {
        return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
    }

??是不是一臉懵逼?其實我們這樣來考慮,一個int為32bit,其中32個bit中,有且只有1,那么這個數(shù)字肯定是2^n。如果我們這樣想的話,就非常的簡單。
??在這個構(gòu)造方法里面,還有幾個地方需要我們注意。

        AtomicReferenceArray<Object> buffer = new AtomicReferenceArray<Object>(p2capacity + 1);

??數(shù)組的容量為2 ^ n + 1,這個得需要我們特別注意,如果這里沒有注意,后面就會有理解上的誤差。
??還有就是需要注意producerLookAhead:

        producerLookAhead = mask - 1; // we know it's all empty to start with

?? producerLookAheadmask - 1,也就是 2 ^ n - 2,至于為什么是,這里也有很大的學(xué)問咯。

4. offer方法

??在SpscLinkedArrayQueue中,offer方法和poll方法是占據(jù)非常重要的地位,所以分析這兩個方法是非常有必要的,當然我們也可以通過分析這兩個方法來了解SpscLinkedArrayQueue的本質(zhì)。我們首先來看看offer方法。

    public boolean offer(final T e) {
        if (null == e) {
            throw new NullPointerException("Null is not a valid element");
        }
        // local load of field to avoid repeated loads after volatile reads
        final AtomicReferenceArray<Object> buffer = producerBuffer;
        final long index = lpProducerIndex();
        final int mask = producerMask;
        final int offset = calcWrappedOffset(index, mask);
        if (index < producerLookAhead) {
            return writeToQueue(buffer, e, index, offset);
        } else {
            final int lookAheadStep = producerLookAheadStep;
            // go around the buffer or resize if full (unless we hit max capacity)
            int lookAheadElementOffset = calcWrappedOffset(index + lookAheadStep, mask);
            if (null == lvElement(buffer, lookAheadElementOffset)) { // LoadLoad
                producerLookAhead = index + lookAheadStep - 1; // joy, there's plenty of room
                return writeToQueue(buffer, e, index, offset);
            } else if (null == lvElement(buffer, calcWrappedOffset(index + 1, mask))) { // buffer is not full
                return writeToQueue(buffer, e, index, offset);
            } else {
                resize(buffer, index, offset, e, mask); // add a buffer and link old to new
                return true;
            }
        }
    }

??整個offer過程,我將它分為兩種情形。
??1.第一情形是index還未超過producerLookAhead,這種情形下,直接通過插入到相應(yīng)位置當中去就行了。
??2.第二情形就是index超過producerLookAhead,這種情形比較復(fù)雜,既要考慮到producerLookAheadStep的存在,又要考慮到是否達到必須擴容的條件。

(1).當index還未超過producerLookAhead

??這種情況比較簡單,在這里主要講解offset的計算。offset的計算主要通過calcWrappedOffset方法來計算,我們來看看這個方法到底為我們做了什么吧。

    private static int calcWrappedOffset(long index, int mask) {
        return calcDirectOffset((int)index & mask);
    }
    private static int calcDirectOffset(int index) {
        return index;
    }

??方法比較簡單,歸根結(jié)底就是index & mask。這個計算有什么特殊的含義嗎?當然有了,還記得mask的值為多少嗎?mask2^n - 1,index & mask相當于index % 2 ^ n。
??在統(tǒng)一說明成員變量那里,我曾說過index是只增不減的,這里的計算就能體現(xiàn)出來。當index超出了這個數(shù)組的長度時,通過mask來取模又能定位一個位置。
??但是在哪種情況下可能會出現(xiàn)index超過數(shù)組的長度呢?我們從producerLookAhead這個變量里面尋找答案,之前也說過,producerLookAhead是index能達到的最大位置。當生產(chǎn)者產(chǎn)生數(shù)組已經(jīng)達到了數(shù)組末尾時,此時還不能立即進行擴容,而是得看看這個數(shù)組節(jié)點的前面部分是否已經(jīng)被消費者消費了,如果已經(jīng)消費了,我們此時可以往前部分產(chǎn)生數(shù)據(jù),而沒必要去擴容。這個相當于是一個循環(huán)隊列的設(shè)計思想。


??所以,producerLookAhead不一定固定為數(shù)組長度 - 2,當前面已經(jīng)被消費者消費了,此時producerLookAhead就需要增大了。但是這個增加了多少了,在哪種情況下增加,這些都可以從我們接下來要說的第二種情形中找到答案。

(2).當index超過producerLookAhead

??這種情形下,我們還可以分為三種小情形:
??1.判斷當前index + producerLookAheadStep的位置上是否為null,如果為null,那么表示producerLookAheadStep可以增大,同時index也可以繼續(xù)增大到新的producerLookAheadStep;如果不為null,就是第二種情形
??2.如果第一種情形的條件不能達到,那么看看是否存在有消費者被消費了的位置,但是還不足producerLookAheadStep。如果存在,就進行index + 1操作,同時producerLookAheadStep不能變;如果不存在,那么進行第三種情形的操作。
??3.如果上面兩種情形都不成立的話,那么進行此種情形的操作,那就是擴容。
??這里先只講解前面兩種情形,擴容操作比較特殊,單獨來講。
??先來看看第一種情形:

            final int lookAheadStep = producerLookAheadStep;
            // go around the buffer or resize if full (unless we hit max capacity)
            int lookAheadElementOffset = calcWrappedOffset(index + lookAheadStep, mask);
            if (null == lvElement(buffer, lookAheadElementOffset)) { // LoadLoad
                producerLookAhead = index + lookAheadStep - 1; // joy, there's plenty of room
                return writeToQueue(buffer, e, index, offset);
            } 

??這里的邏輯非常簡單,簡單來說就是我們上面所說的。先是通過取模計算了index + producerLookAheadStep的offset,然后判斷offset位置上是否為null,如果為null,表示達到producerLookAhead增大的條件,然后就是,producerLookAhead增大了lookAheadStep - 1,雖然這里是index + lookAheadStep - 1,但是我認為,在只要符合這種條件的,index等于producerLookAhead。因為如果index不等于producerLookAhead,肯定不是第一次進入這個判斷語句,而第一次進來的話,如果符合的話,就已經(jīng)擴容了,就變成了index還未超過producerLookAhead的情況。
??再來看看第二種情形:

 else if (null == lvElement(buffer, calcWrappedOffset(index + 1, mask))) { // buffer is not full
                return writeToQueue(buffer, e, index, offset);
            } 

??這種情況比較簡單,此時index只是簡單的做加1操作。這種情形就像是,前面步子邁大了,扯著X了,開始一步一步的邁????。
??第三種情形便是我們的擴容操作,這個也是我們SpscLinkedArrayQueue神奇之處之一。接下來,我們慢慢看這個擴容騷操作。

5.擴容騷操作

??說這個騷,不是一般的騷,不得不佩服大佬們寫的代碼。
??在這里,我們會知道數(shù)組的容量為什么為2 ^ n + 1,而不是2 ^ n。
??首先,我們還是先來看看整個擴容的過程。

    private void resize(final AtomicReferenceArray<Object> oldBuffer, final long currIndex, final int offset, final T e,
            final long mask) {
        final int capacity = oldBuffer.length();
        final AtomicReferenceArray<Object> newBuffer = new AtomicReferenceArray<Object>(capacity);
        producerBuffer = newBuffer;
        producerLookAhead = currIndex + mask - 1;
        soElement(newBuffer, offset, e);// StoreStore
        soNext(oldBuffer, newBuffer);
        soElement(oldBuffer, offset, HAS_NEXT); // new buffer is visible after element is
                                                                 // inserted
        soProducerIndex(currIndex + 1);// this ensures correctness on 32bit platforms
    }

??整個方法的比較簡單,就是創(chuàng)建了一個新的AtomicReferenceArray對象,跟原對象容量是一模一樣的,然后通過soNext方法將兩個節(jié)點連接起來,我們來看看放的位置:

    private void soNext(AtomicReferenceArray<Object> curr, AtomicReferenceArray<Object> next) {
        soElement(curr, calcDirectOffset(curr.length() - 1), next);
    }

??沒錯,將新的AtomicReferenceArray對象放在了原對象的最后一位,這樣相當于是將兩個節(jié)點起來,也應(yīng)證我們前面的總結(jié)。

        soElement(newBuffer, offset, e);// StoreStore
        soElement(oldBuffer, offset, HAS_NEXT); // new buffer is visible after element is

??然后接下來,你會看到極其騷的操作,將原數(shù)組的offset位置上設(shè)置為HAS_NEXT,同時在新數(shù)組的offset位置上放入需要加入的數(shù)據(jù)。這樣做有什么好處呢?
??這個有利于poll操作,當poll操作操作到這個位置上時,發(fā)現(xiàn)是HAS_NEXT,會到下一個節(jié)點的offset位置上去尋找。因為offer操作也是offset開始,所以必須保證poll操作從offer操作開始的地方進行。
??整個過程差不多就是這樣的,接下來我們分析上面的兩個問題。

(1).數(shù)組容量為什么為2 ^ n + 1

??我們知道index是自增不減的,同時offset是通過index & mask計算得到的。同時HAS_NEXT坐標也是offset,所以,我們可以知道,在擴容是,HAS_NEXT的坐標是不定的。那數(shù)組容量為129為例來說,HAS_NEXT可能出現(xiàn)在0 ~ 127任何一個位置,但是128位置上始終是為next指針準備。
??這是為什么?我們可以這樣來理解,將HAS_NEXT當成一個特殊的數(shù)據(jù),它也屬于生產(chǎn)者生成的數(shù)據(jù)其中一個,但是next指針不可能當成其中一個,因為消費者不能正確找到next指針,除非將整個數(shù)組遍歷,顯然這是一個愚蠢的做法。所以next指針放在一個固定位置上,哪個位置不可能被占據(jù)呢?在0 ~ 127的范圍里面顯然是不可能的,所以得單獨找一個不在0 ~ 127范圍里面的位置,哪個位置呢?肯定是128,所以數(shù)組容量才為2 ^ n + 1。

6.poll方法

??看完了offer方法,現(xiàn)在我們再來看看poll方法。

    public T poll() {
        // local load of field to avoid repeated loads after volatile reads
        final AtomicReferenceArray<Object> buffer = consumerBuffer;
        final long index = lpConsumerIndex();
        final int mask = consumerMask;
        final int offset = calcWrappedOffset(index, mask);
        final Object e = lvElement(buffer, offset);// LoadLoad
        boolean isNextBuffer = e == HAS_NEXT;
        if (null != e && !isNextBuffer) {
            soElement(buffer, offset, null);// StoreStore
            soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms
            return (T) e;
        } else if (isNextBuffer) {
            return newBufferPoll(lvNextBufferAndUnlink(buffer, mask + 1), index, mask);
        }

        return null;
    }

??整個poll方法比較簡單,通過獲取offset的數(shù)據(jù),先判斷是否是HAS_NEXT,如果不是那么就可以取出;如果是的話,那么就到mask + 1的位置找到下一個節(jié)點,再到下一個節(jié)點的offset位置上去取數(shù)據(jù)。

7.總結(jié)

??總的來說,SpscLinkedArrayQueue涉及過于神奇。這里我來做一個簡單的總結(jié)。
??1.SpscLinkedArrayQueue的數(shù)據(jù)結(jié)構(gòu)為數(shù)組+鏈表,其中SpscLinkedArrayQueue不會遍歷數(shù)組,這個是SpscLinkedArrayQueue涉及的神奇之處。
??2.SpscLinkedArrayQueue擴容必須同時達到三個條件,一是index大于producerLookAhead,二是index + lookAheadStep位置上不為null,三是index + 1不為null,也就是說,當前0 ~ 2^n - 1范圍內(nèi),只有index位置上為null。在這種情況下,才會擴容。
??3.擴容時,會將原數(shù)組的offset位置上設(shè)置為HAS_NEXT,同時將新數(shù)組的offset位置上設(shè)置為新添加的數(shù)據(jù),然后就是,將新數(shù)組的指針設(shè)置在原數(shù)組的最后一位。
??4.poll時,當發(fā)現(xiàn)是HAS_NEXT,此時就去下一個數(shù)組相應(yīng)的offset位置上去找。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 一、基本數(shù)據(jù)類型 注釋 單行注釋:// 區(qū)域注釋:/* */ 文檔注釋:/** */ 數(shù)值 對于byte類型而言...
    龍貓小爺閱讀 4,469評論 0 16
  • 前言 最先接觸編程的知識是在大學(xué)里面,大學(xué)里面學(xué)了一些基礎(chǔ)的知識,c語言,java語言,單片機的匯編語言等;大學(xué)畢...
    oceanfive閱讀 3,398評論 0 7
  • 關(guān)于記憶 漸漸發(fā)現(xiàn) 初中高中的記憶逐漸消散 至于小學(xué)的就不用提了 我的記憶大概只有70秒 突然很害怕 害怕某一天醒...
    楊洛閱讀 517評論 0 0
  • 1.div元素,也被稱作division(層)元素,是一個盛裝其他元素的通用容器。所以可以利用CSS的繼承關(guān)系把d...
    Gorden_x閱讀 2,724評論 0 0
  • “智慧不起煩惱,慈悲沒有敵人”。如果還有看不慣的事,說明你沒有智慧;如果還有看不起的人,說明你沒有慈悲。 ???
    鵬城祁林閱讀 391評論 0 0

友情鏈接更多精彩內(nèi)容