A Brief Look at HystrixRollingNumber (a Data Structure for Counting QPS)

Introduction

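Consider a common requirement: we need to track system QPS, the average error rate per second, and similar metrics. QPS is the number of requests per second, and the simplest approach is to count the total number of requests over some period and divide by the length of that period, so counting is the core of the problem. Our systems usually run in a multithreaded environment, so for counting we can reach for the AtomicInteger/AtomicLong family. The AtomicXXX classes use no locks; they use a loop plus CAS, which under multithreading avoids some of the performance cost of locking. Under very heavy contention, however, many CAS attempts fail and must be retried, which costs performance. To spread the write pressure across threads even further, JDK 8 introduced LongAdder (covered in an earlier post). LongAdder spreads updates over multiple cells: each thread is hashed to one cell (and moved to another if it keeps colliding), so concurrent threads mostly update different cells, and the count is the sum of all cells. As mentioned, the simplest way to compute QPS is to divide the total number of requests in a period by its length. That method is simple but has problems: for example, the resulting QPS jumps around a lot and is not smooth. This article introduces HystrixRollingNumber, a data structure that is very useful for QPS and similar summing statistics.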
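To make the two counter types concrete, here is a small Java sketch (CounterDemo and its methods are illustrative names, not from the original post) in which several threads increment an AtomicLong and a LongAdder side by side, followed by the naive "total / elapsed time" QPS formula. It only demonstrates the APIs; it does not benchmark the contention difference.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

class CounterDemo {
    // Starts `threads` workers; each increments both counters `perThread` times.
    static long[] countWithBoth(int threads, int perThread) {
        AtomicLong atomic = new AtomicLong();   // one shared value: every update is a CAS loop on it
        LongAdder adder = new LongAdder();      // striped cells: contending threads tend to hit different cells
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    atomic.incrementAndGet();
                    adder.increment();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // sum() adds up every cell; it is exact here because all writers have finished
        return new long[] { atomic.get(), adder.sum() };
    }

    // The naive approach: total requests divided by the elapsed time in seconds.
    static double naiveQps(long totalRequests, long elapsedMillis) {
        return totalRequests * 1000.0 / elapsedMillis;
    }

    public static void main(String[] args) {
        long[] counts = countWithBoth(4, 10_000);
        System.out.println(counts[0] + " " + counts[1]);  // 40000 40000
        System.out.println(naiveQps(counts[1], 2_000));   // 20000.0
    }
}
```

Both counters end up with the same value; the difference is only in how much the threads interfere with each other while writing.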

Basic Principle

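HystrixRollingNumber uses LongAdder internally and also borrows LongAdder's idea of splitting a counter into segments, except that here the segments are slices of time. The basic idea of HystrixRollingNumber is segmented counting. Suppose we want QPS, i.e. the total number of requests within 1 second. As shown in the figure (which draws 8 segments; just read it as 10), we can divide 1 s into 10 segments of 100 ms each. During the first 100 ms we count into the first segment, during the second 100 ms into the second segment, and so on. Then, to get the QPS at the current moment, we can always sum the counts of the segments covering the preceding 1 s (10 segments in total). Let's look at how HystrixRollingNumber actually does this.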
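A minimal single-threaded sketch of the segmented idea, assuming a 1000 ms window split into 10 buckets of 100 ms. SegmentedCounter is a hypothetical name, and the slot choice `(now / bucketMillis) % numBuckets` with recycling of stale slots is a choice of this sketch, not how HystrixRollingNumber lays out its buckets; there is also no thread safety here.

```java
import java.util.Arrays;

// Single-threaded illustration: a window split into fixed-length buckets, reused in a ring.
class SegmentedCounter {
    private final int numBuckets;
    private final long bucketMillis;
    private final long[] windowStart;   // start time of the window each slot currently holds
    private final long[] counts;        // events counted in that window

    SegmentedCounter(long windowMillis, int numBuckets) {
        this.numBuckets = numBuckets;
        this.bucketMillis = windowMillis / numBuckets;
        this.windowStart = new long[numBuckets];
        this.counts = new long[numBuckets];
        Arrays.fill(windowStart, -1L);  // -1 marks a slot that has never been used
    }

    // Records one event at time nowMillis (assumed non-negative).
    void increment(long nowMillis) {
        long start = nowMillis - nowMillis % bucketMillis;           // align to a bucket boundary
        int slot = (int) ((nowMillis / bucketMillis) % numBuckets);
        if (windowStart[slot] != start) {   // slot still holds an older window: recycle it
            windowStart[slot] = start;
            counts[slot] = 0;
        }
        counts[slot]++;
    }

    // Sums the current bucket plus the numBuckets - 1 buckets before it.
    long sumLastWindow(long nowMillis) {
        long currentStart = nowMillis - nowMillis % bucketMillis;
        long oldestStart = currentStart - (numBuckets - 1) * bucketMillis;
        long sum = 0;
        for (int i = 0; i < numBuckets; i++) {
            if (windowStart[i] >= oldestStart && windowStart[i] <= currentStart) {
                sum += counts[i];
            }
        }
        return sum;
    }
}
```

For example, with events at 0, 50, 150 and 950 ms, `sumLastWindow(999)` returns 4; at 1000 ms the [0, 100) bucket has slid out of the window and the sum drops to 2. The window moves forward 100 ms at a time instead of restarting from zero, which is what makes the result smoother than the naive formula.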

Bucket

HystrixRollingNumber describes a Bucket as "Counters for a given 'bucket' of time", i.e. the counters for one time bucket; this is the "segment" described above. A Bucket has three important fields:

  • final long windowStart;

  • final LongAdder[] adderForCounterType;

  • final LongMaxUpdater[] updaterForCounterType;
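    windowStart records the start time of the time slice the Bucket belongs to. adderForCounterType is a LongAdder array in which each element holds the count for one event type. updaterForCounterType is organized the same way, but each element tracks a maximum value.
    The length of adderForCounterType equals the number of event types; see the HystrixRollingNumberEvent enum for the concrete event types. The relevant methods are listed below (everything related to LongMaxUpdater is omitted; LongMaxUpdater tracks a maximum value and works analogously to LongAdder):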

  • long get(HystrixRollingNumberEvent type): returns the sum of the LongAdder for the given event type

  • LongAdder getAdder(HystrixRollingNumberEvent type): returns the LongAdder object for the given event type
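A stripped-down Bucket along the lines described above, for illustration only. DemoEvent is a hypothetical stand-in for HystrixRollingNumberEvent, and LongMaxUpdater is left out just as in the description. The sketch gives every event type a LongAdder, which may not match exactly how the real class allocates them.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical, reduced event enum standing in for HystrixRollingNumberEvent.
enum DemoEvent { SUCCESS, FAILURE, TIMEOUT }

class DemoBucket {
    final long windowStart;                 // start of the time slice this bucket covers
    final LongAdder[] adderForCounterType;  // one counter per event type, indexed by ordinal

    DemoBucket(long startTime) {
        this.windowStart = startTime;
        this.adderForCounterType = new LongAdder[DemoEvent.values().length];
        for (DemoEvent type : DemoEvent.values()) {
            adderForCounterType[type.ordinal()] = new LongAdder();
        }
    }

    // Current total for one event type within this bucket.
    long get(DemoEvent type) {
        return adderForCounterType[type.ordinal()].sum();
    }

    // The adder itself, so callers can increment or add to it.
    LongAdder getAdder(DemoEvent type) {
        return adderForCounterType[type.ordinal()];
    }
}
```

Indexing by `ordinal()` keeps the lookup a plain array access, so recording an event costs one array read plus one LongAdder update.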

ListState

HystrixRollingNumber describes ListState as "Immutable object that is atomically set every time the state of the BucketCircularArray changes. This handles the compound operations". In other words, ListState is an immutable class: every time the state of the BucketCircularArray changes, a new ListState is created and atomically set into the BucketCircularArray, and it is used to handle compound operations. The important fields of ListState are:

  • private final AtomicReferenceArray<Bucket> data: the official comment says "this is an AtomicReferenceArray and not a normal Array because we're copying the reference between ListState objects and multiple threads could maintain references across these compound operations so I want the visibility/concurrency guarantees". That is, ListState holds the Bucket array, but as an AtomicReferenceArray rather than a plain array, because the array reference is copied between ListState objects and multiple threads may hold references to it across these compound operations, so visibility and concurrency guarantees are needed (each element of an AtomicReferenceArray is read and written with volatile semantics and can be updated atomically)
  • private final int size; (the number of Buckets currently held; it can grow, up to a maximum of numBuckets)
  • private final int tail; (the tail index of the array)
  • private final int head; (the head index of the array)

ListState has several important methods:

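  • public Bucket tail(): returns the element at the tail of the array (the newest Bucket)
  • public ListState clear(): returns a new, empty ListState
  • public ListState addBucket(Bucket b): appends a Bucket at the tail and returns the resulting new ListState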

ListState is an immutable class and follows the rules for immutable classes:

  • All fields are final and are set exactly once, in the constructor
  • Copy on write: the write method (addBucket) returns a new ListState

ListState is essentially a helper class: it maintains a Bucket array, defines useful operations around that array, and, being immutable, is naturally thread-safe.
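The head/tail bookkeeping can be sketched as a small generic class. This is an illustrative reconstruction, not the Hystrix source: ListStateSketch is a hypothetical name, and the spare slot (array length numBuckets + 1) is an assumption of the sketch, used so that a full ring and an empty ring have different head/tail values. As the comment on `data` suggests, successive states share one AtomicReferenceArray; only head, tail and size are new in each instance.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Sketch of a ListState-like immutable view over a ring buffer.
// head/tail/size are final; every "write" returns a new instance,
// while the backing AtomicReferenceArray is shared between instances.
final class ListStateSketch<T> {
    private final AtomicReferenceArray<T> data;
    private final int numBuckets;   // logical capacity
    private final int dataLength;   // numBuckets + 1: one spare slot separates "full" from "empty"
    private final int head;         // physical index of the oldest element
    private final int tail;         // physical index where the next element goes
    private final int size;

    static <T> ListStateSketch<T> empty(int numBuckets) {
        return new ListStateSketch<T>(new AtomicReferenceArray<T>(numBuckets + 1), numBuckets, 0, 0);
    }

    private ListStateSketch(AtomicReferenceArray<T> data, int numBuckets, int head, int tail) {
        this.data = data;
        this.numBuckets = numBuckets;
        this.dataLength = numBuckets + 1;
        this.head = head;
        this.tail = tail;
        this.size = (tail + dataLength - head) % dataLength;
    }

    // Newest element, or null if empty.
    T tail() {
        return size == 0 ? null : data.get(convert(size - 1));
    }

    // Element at logical position i (0 = oldest).
    T get(int i) {
        return data.get(convert(i));
    }

    int size() {
        return size;
    }

    // Returns a fresh, empty state backed by a new array.
    ListStateSketch<T> clear() {
        return empty(numBuckets);
    }

    // Writes into the shared array, then returns a new state with advanced indices;
    // once the ring is full, head advances too, dropping the oldest element.
    ListStateSketch<T> addBucket(T b) {
        data.set(tail, b);
        int newTail = (tail + 1) % dataLength;
        int newHead = (size == numBuckets) ? (head + 1) % dataLength : head;
        return new ListStateSketch<T>(data, numBuckets, newHead, newTail);
    }

    // Maps a logical index (0 = oldest) to a physical array index.
    private int convert(int logicalIndex) {
        return (logicalIndex + head) % dataLength;
    }
}
```

Appending "d" to a full ring of capacity 3 holding a, b, c yields a state holding b, c, d. Older instances keep their own head, tail and size, although the shared slots they point to can later be overwritten by newer states.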

BucketCircularArray

As the name suggests, this is a circular array in which each element is a Bucket. The important fields of BucketCircularArray are:

  • private final AtomicReference<ListState> state: holds the current ListState in an AtomicReference
  • private final int numBuckets: the size of the ring

Its most important method is:

  • public void addLast(Bucket o): appends a Bucket to the tail of the ListState and CASes the newly returned ListState into state. It has one unusual twist: if the CAS fails, the failure is simply ignored. The source comment explains: "we failed, someone else was adding or removing instead of trying again and risking multiple addLast concurrently (which shouldn't be the case) we'll just return and let the other thread 'win' and if the timing is off the next call to getCurrentBucket will fix things". Roughly: if the CAS failed, another thread was adding or removing; instead of retrying, we just return and let the other thread "win", and if the timing is off, the next call to getCurrentBucket compensates (see the analysis of getCurrentBucket below).
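The "one CAS, no retry" pattern can be shown with a much simpler immutable state: an unmodifiable List held in an AtomicReference. CircularSnapshot is a hypothetical class, the list copy stands in for ListState's ring, and the boolean return value exists only to make the outcome visible (the addLast described above returns void).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Keeps the last numBuckets elements as an immutable snapshot in an AtomicReference.
class CircularSnapshot<T> {
    private final AtomicReference<List<T>> state =
            new AtomicReference<List<T>>(Collections.<T>emptyList());
    private final int numBuckets;

    CircularSnapshot(int numBuckets) {
        this.numBuckets = numBuckets;
    }

    // Builds a new snapshot and attempts a single CAS. If another thread changed
    // the state in the meantime, the CAS fails and we do NOT retry: that thread "wins".
    boolean addLast(T o) {
        List<T> current = state.get();
        List<T> next = new ArrayList<>(current);
        next.add(o);
        if (next.size() > numBuckets) {
            next.remove(0);                   // drop the oldest element
        }
        return state.compareAndSet(current, Collections.unmodifiableList(next));
    }

    // Newest element, or null if nothing has been added yet.
    T peekLast() {
        List<T> s = state.get();
        return s.isEmpty() ? null : s.get(s.size() - 1);
    }

    List<T> snapshot() {
        return state.get();
    }
}
```

Skipping the retry is safe only because callers re-check: a caller that needs a fresh element, as getCurrentBucket does, looks at `peekLast()` again on its next call.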

HystrixRollingNumber

The official doc defines it as "A number which can be used to track counters (increment) or set values over time.", i.e. it keeps counts over a period of time. Its important fields are:

  • private final Time time: source of the current time in milliseconds
  • final int timeInMilliseconds: length of the statistics window (in milliseconds)
  • final int numberOfBuckets: number of Buckets (how many segments the window is split into)
  • final int bucketSizeInMillseconds: time slice covered by each Bucket (in milliseconds), i.e. timeInMilliseconds / numberOfBuckets
  • final BucketCircularArray buckets: the BucketCircularArray that maintains the ring of Buckets
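The relationship between these fields is simple arithmetic: with a 1000 ms window and 10 buckets, each bucket covers 100 ms. The sketch below rejects windows that do not divide evenly, so every bucket covers the same length of time. The Time interface, ManualTime and RollingConfig are hypothetical here; they show why injecting the clock through a field like `time` is convenient when testing time-based code.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical minimal time source, playing the role of the `time` field.
interface Time {
    long getCurrentTimeInMillis();
}

// Test clock that only moves when advanced by hand.
class ManualTime implements Time {
    private final AtomicLong now = new AtomicLong();

    public long getCurrentTimeInMillis() {
        return now.get();
    }

    void advance(long millis) {
        now.addAndGet(millis);
    }
}

class RollingConfig {
    final int timeInMilliseconds;
    final int numberOfBuckets;
    final int bucketSizeInMillseconds;   // spelling kept from the original field name

    RollingConfig(int timeInMilliseconds, int numberOfBuckets) {
        if (timeInMilliseconds % numberOfBuckets != 0) {
            throw new IllegalArgumentException("timeInMilliseconds must divide evenly into numberOfBuckets");
        }
        this.timeInMilliseconds = timeInMilliseconds;
        this.numberOfBuckets = numberOfBuckets;
        this.bucketSizeInMillseconds = timeInMilliseconds / numberOfBuckets;
    }
}
```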
The core method is getCurrentBucket(), which finds (or creates) the Bucket for the current time:

```java
Bucket getCurrentBucket() {
    // Get the current time in milliseconds
    long currentTime = time.getCurrentTimeInMillis();

    // Get the last Bucket (i.e. the newest one)
    Bucket currentBucket = buckets.peekLast();
    if (currentBucket != null && currentTime < currentBucket.windowStart + this.bucketSizeInMillseconds) {
        // The current time falls inside currentBucket's time window, so return it directly
        return currentBucket;
    }

    /* if we didn't find the current bucket above, then we have to create one */

    // No Bucket covers the current time yet, so one has to be created
    if (newBucketLock.tryLock()) {
        // Only a single, non-blocking attempt to take the lock
        try {
            if (buckets.peekLast() == null) {
                // the list is empty so create the first bucket
                Bucket newBucket = new Bucket(currentTime);
                buckets.addLast(newBucket);
                return newBucket;
            } else {
                // We go into a loop so that it will create as many buckets as needed to catch up to the current time
                // as we want the buckets complete even if we don't have transactions during a period of time.
                // (Creates one or more Buckets until the newest window catches up with the current time.)
                for (int i = 0; i < numberOfBuckets; i++) {
                    // we have at least 1 bucket so retrieve it
                    Bucket lastBucket = buckets.peekLast();
                    if (currentTime < lastBucket.windowStart + this.bucketSizeInMillseconds) {
                        // if we're within the bucket 'window of time' return the current one
                        // NOTE: We do not worry if we are BEFORE the window in a weird case of where thread scheduling causes that to occur,
                        // we'll just use the latest as long as we're not AFTER the window
                        return lastBucket;
                    } else if (currentTime - (lastBucket.windowStart + this.bucketSizeInMillseconds) > timeInMilliseconds) {
                        // the time passed is greater than the entire rolling counter so we want to clear it all and start from scratch
                        reset();
                        // recursively call getCurrentBucket which will create a new bucket and return it
                        return getCurrentBucket();
                    } else { // we're past the window so we need to create a new bucket
                        // create a new bucket and add it as the new 'last'
                        buckets.addLast(new Bucket(lastBucket.windowStart + this.bucketSizeInMillseconds));
                        // add the lastBucket values to the cumulativeSum
                        cumulativeSum.addBucket(lastBucket);
                    }
                }
                // we have finished the for-loop and created all of the buckets, so return the lastBucket now
                return buckets.peekLast();
            }
        } finally {
            // Release the lock
            newBucketLock.unlock();
        }
    } else {
        // Could not get the lock: fall back to the newest existing Bucket
        currentBucket = buckets.peekLast();
        if (currentBucket != null) {
            // we didn't get the lock so just return the latest bucket while another thread creates the next one
            return currentBucket;
        } else {
            // the rare scenario where multiple threads raced to create the very first bucket
            // wait slightly and then use recursion while the other thread finishes creating a bucket
            try {
                Thread.sleep(5);
            } catch (Exception e) {
                // ignore
            }
            return getCurrentBucket();
        }
    }
}
```

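HystrixRollingNumber actually contains many helpful comments explaining why things are done this way. getCurrentBucket above returns the Bucket for the current time window, but to reduce contention it only uses tryLock(): if the lock is not acquired, it simply returns the newest non-null Bucket. If the lock is acquired, it keeps adding Buckets until the newest Bucket's window covers the current time (and if more time has passed than the whole rolling window, it resets and starts from scratch). This has one small drawback: the returned Bucket may not cover the current time. That trade-off is made deliberately to reduce contention and improve throughput, and it is tolerable for statistics: recording a count in the previous time window usually has little effect on the computed QPS and similar values (numberOfBuckets is usually more than one).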
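Putting the pieces together, here is a compact sketch of the tryLock() strategy, not the Hystrix implementation. It assumes a LongSupplier clock (hypothetical; Hystrix uses its `time` field) and a ConcurrentLinkedDeque in place of BucketCircularArray/ListState, keeps a single counter per bucket, resets inline instead of calling reset() and recursing, and yields instead of sleeping 5 ms in the first-bucket race.

```java
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.LongSupplier;

// Simplified rolling counter following the getCurrentBucket() strategy:
// only the thread that wins tryLock() creates buckets; the others use the newest
// existing bucket, even if it is slightly stale.
class SimpleRollingCounter {
    static final class Bucket {
        final long windowStart;
        final LongAdder count = new LongAdder();   // one counter per bucket (simplification)
        Bucket(long windowStart) { this.windowStart = windowStart; }
    }

    private final LongSupplier clock;              // injectable time source (assumption)
    private final int timeInMilliseconds;
    private final int numberOfBuckets;
    private final int bucketSizeInMillseconds;
    private final ConcurrentLinkedDeque<Bucket> buckets = new ConcurrentLinkedDeque<>();
    private final ReentrantLock newBucketLock = new ReentrantLock();

    SimpleRollingCounter(LongSupplier clock, int timeInMilliseconds, int numberOfBuckets) {
        this.clock = clock;
        this.timeInMilliseconds = timeInMilliseconds;
        this.numberOfBuckets = numberOfBuckets;
        this.bucketSizeInMillseconds = timeInMilliseconds / numberOfBuckets;
    }

    void increment() {
        getCurrentBucket().count.increment();
    }

    // Sum over the retained buckets (at most numberOfBuckets of them).
    long getRollingSum() {
        getCurrentBucket();                        // roll the window forward first
        long sum = 0;
        for (Bucket b : buckets) {
            sum += b.count.sum();
        }
        return sum;
    }

    // Events per second over the rolling window.
    double perSecond() {
        return getRollingSum() * 1000.0 / timeInMilliseconds;
    }

    // Only called while holding newBucketLock.
    private void addLast(Bucket b) {
        buckets.addLast(b);
        if (buckets.size() > numberOfBuckets) {
            buckets.pollFirst();                   // evict the oldest bucket
        }
    }

    Bucket getCurrentBucket() {
        long now = clock.getAsLong();
        Bucket current = buckets.peekLast();
        if (current != null && now < current.windowStart + bucketSizeInMillseconds) {
            return current;                        // fast path: still inside the newest window
        }
        if (newBucketLock.tryLock()) {
            try {
                if (buckets.peekLast() == null) {  // very first bucket
                    Bucket first = new Bucket(now);
                    addLast(first);
                    return first;
                }
                for (int i = 0; i < numberOfBuckets; i++) {
                    Bucket last = buckets.peekLast();
                    long lastEnd = last.windowStart + bucketSizeInMillseconds;
                    if (now < lastEnd) {
                        return last;               // caught up with the current time
                    } else if (now - lastEnd > timeInMilliseconds) {
                        buckets.clear();           // idle longer than the whole window: start over
                        Bucket fresh = new Bucket(now);
                        addLast(fresh);
                        return fresh;
                    } else {
                        addLast(new Bucket(lastEnd));  // next contiguous (possibly empty) bucket
                    }
                }
                return buckets.peekLast();
            } finally {
                newBucketLock.unlock();
            }
        }
        // Lost the race for the lock: use the newest bucket, which may be slightly stale.
        current = buckets.peekLast();
        if (current != null) {
            return current;
        }
        Thread.yield();                            // another thread is creating the first bucket
        return getCurrentBucket();
    }
}
```

With a 1000 ms window and 10 buckets, two events at 0 ms and one at 250 ms give a rolling sum of 3. At 1150 ms the catch-up loop has created buckets up to 1100 and evicted the [0, 100) and [100, 200) buckets, so the sum is 1; after a long idle gap the counter resets to 0. Because the window is exactly one second here, `perSecond()` equals the rolling sum.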

Summary

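HystrixRollingNumber is very useful for computing QPS. Statistics like this (for rate limiting, monitoring, and similar QPS use cases) must not slow down the main business logic, so performance requirements are high. HystrixRollingNumber uses several techniques to avoid locking and contention between threads: LongAdder for the counters, only a non-blocking tryLock() when a new Bucket is needed, and a single CAS without retry when updating the ring. As a result it is very efficient.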
