探索HyperLogLog算法(含Java實(shí)現(xiàn))

引言


HyperLogLog算法經(jīng)常在數(shù)據(jù)庫中被用來統(tǒng)計(jì)某一字段的Distinct Value(下文簡稱DV),比如Redis的HyperLogLog結(jié)構(gòu),出于好奇探索了一下這個算法的原理,無奈中文資料很少,只能直接去閱讀論文以及一些英文資料,總結(jié)成此文。

介紹


HyperLogLog算法來源于論文《HyperLogLog the analysis of a near-optimal cardinality estimation algorithm》(下載地址見文末的參考文獻(xiàn)),可以使用固定大小的字節(jié)計(jì)算任意大小的DV,本文先介紹該算法的原理,然后通過剖析stream-lib(一個Java實(shí)現(xiàn)的實(shí)時計(jì)算庫)對此算法的實(shí)現(xiàn)來進(jìn)一步理解該算法。本文追求直觀理解,所以不會太過于糾結(jié)一些數(shù)學(xué)細(xì)節(jié),如果關(guān)心數(shù)學(xué)細(xì)節(jié)的話可以直接去看論文,論文里會有具體的證明。

基數(shù)


基數(shù)就是指一個集合中不同值的數(shù)目,比如[a,b,c,d]的基數(shù)就是4,[a,b,c,d,a]的基數(shù)還是4,因?yàn)閍重復(fù)了一個,不算。基數(shù)也可以稱之為Distinct Value,簡稱DV。下文中可能有時候稱呼為基數(shù),有時候稱之為DV,但都是同一個意思。HyperLogLog算法就是用來計(jì)算基數(shù)的。

生活中的啟發(fā)-以拋硬幣為例


拋硬幣

HyperLogLog本質(zhì)上來源于生活中一個小的發(fā)現(xiàn),假設(shè)你拋了很多次硬幣,你告訴在這次拋硬幣的過程中最多只有兩次扔出連續(xù)的反面,讓我猜你總共拋了多少次硬幣,我敢打賭你拋硬幣的總次數(shù)不會太多,相反,如果你和我說最多出現(xiàn)了100次連續(xù)的反面,那么我敢肯定扔硬盤的總次數(shù)非常的多,甚至我還可以給出一個估計(jì),這個估計(jì)要怎么給呢?其實(shí)是一個很簡單的概率問題,假設(shè)1代表拋出正面,0代表反面:


以序列1110100110為例

上圖中以拋硬幣序列"1110100110"為例,其中最長的反面序列是"00",我們順手把后面那個1也給帶上,也就是"001",因?yàn)樗诵蛄兄凶铋L的一串0,所以在序列中肯定只出現(xiàn)過一次,而它在任意序列出現(xiàn)出現(xiàn)且僅出現(xiàn)一次的概率顯然是上圖所示的三個二分之一相乘,也就是八分之一,所以我可以給出一個估計(jì)值,你大概總共拋了8次硬幣。
很顯然,上面這種做法雖然能夠估計(jì)拋硬幣的總數(shù),但是顯然誤差是比較大的,很容易受到突發(fā)事件(比如突然連續(xù)拋出好多0)的影響,HyperLogLog算法研究的就是如何減小這個誤差。

之前說過,HyperLogLog算法是用來計(jì)算基數(shù)的,這個拋硬幣的序列和基數(shù)有什么關(guān)系呢?比如在數(shù)據(jù)庫中,我只要在每次插入一條新的記錄時,計(jì)算這條記錄的hash,并且轉(zhuǎn)換成二進(jìn)制,就可以將其看成一個硬幣序列了,如下(0b前綴表示二進(jìn)制數(shù)):

計(jì)算hash

最簡單的想法


根據(jù)上面拋硬幣的啟發(fā)我可以想到如下的估計(jì)基數(shù)的算法(這里先給出偽代碼,后面會有Java實(shí)現(xiàn)):

輸入:一個集合
輸出:集合的基數(shù)
算法:
     max = 0
     對于集合中的每個元素:
               hashCode = hash(元素)
               num = hashCode二進(jìn)制表示中最前面連續(xù)的0的數(shù)量
               if num > max:
                   max = num
     最后的結(jié)果是2的(max + 1)次冪  

舉個例子,對于集合{ele1, ele2},先求hash(ele1)=0b00110111,它最前面的連續(xù)的0的數(shù)量為2(又稱為前導(dǎo)0),然后求hash(ele2)=0b10010000111,它的前導(dǎo)0數(shù)量為0,我們始終只保存前導(dǎo)零數(shù)量的最大值,所以最后max是2,我們估計(jì)的基數(shù)就是2的(2+1)次冪,即8。
為什么最后的max要加1呢?這是一個數(shù)學(xué)細(xì)節(jié),具體要看論文,簡單的理解的話,可以像之前拋硬幣的例子那樣理解,把最長的一串零的后面的一個1或者前面的一個1"順手"帶上進(jìn)行概率估計(jì)。
顯然這個算法是非常不準(zhǔn)確的,但是這個想法還是很有啟發(fā)性的,從這個簡單的想法跟隨下文一步一步優(yōu)化即可得到最終的比較高精度的HyperLogLog算法。

分桶


最簡單的一種優(yōu)化方法顯然就是把數(shù)據(jù)分成m個均等的部分,分別估計(jì)其總數(shù)求平均后再乘以m,稱之為分桶。對應(yīng)到前面拋硬幣的例子,其實(shí)就是把硬幣序列分成m個均等的部分,分別用之前提到的那個方法估計(jì)總數(shù)求平均后再乘以m,這樣就能一定程度上避免單一突發(fā)事件造成的誤差。
具體要怎么分桶呢?我們可以將每個元素的hash值的二進(jìn)制表示的前幾位用來指示數(shù)據(jù)屬于哪個桶,然后把剩下的部分再按照之前最簡單的想法處理。
還是以剛剛的那個集合{ele1,ele2}為例,假設(shè)我要分2個桶,那么我只要去ele1的hash值的第一位來確定其分桶即可,之后用剩下的部分進(jìn)行前導(dǎo)零的計(jì)算,如下圖:
假設(shè)ele1和ele2的hash值二進(jìn)制表示如下:

hash(ele1) = 00110111
hash(ele2) = 10010001
分桶算法

到這里,你大概已經(jīng)理解了LogLog算法的基本思想,LogLog算法是在HyperLogLog算法之前提出的一個基數(shù)估計(jì)算法,HyperLogLog算法其實(shí)就是LogLog算法的一個改進(jìn)版。

LogLog算法完整的基數(shù)計(jì)算公式如下:


LogLog算法

其中m代表分桶數(shù),R頭上一道橫杠的記號就代表每個桶的結(jié)果(其實(shí)就是桶中數(shù)據(jù)的最長前導(dǎo)零+1)的均值,相比我之前舉的簡單的例子,LogLog算法還乘了一個常數(shù)constant進(jìn)行修正,這個constant具體是多少等我講到Java實(shí)現(xiàn)的時候再說。

調(diào)和平均數(shù)


前面的LogLog算法中我們是使用的是平均數(shù)來將每個桶的結(jié)果匯總起來,但是平均數(shù)有一個廣為人知的缺點(diǎn),就是容易受到大的數(shù)值的影響,一個常見的例子是,假如我的工資是1000元一個月,我老板的工資是100000元一個月,那么我和老板的平均工資就是(100000 + 1000)/2,即50500元,顯然這離我的工資相差甚遠(yuǎn),我肯定不服這個平均工資。
用調(diào)和平均數(shù)就可以解決這一問題,調(diào)和平均數(shù)的結(jié)果會傾向于集合中比較小的數(shù),x1到xn的調(diào)和平均數(shù)的公式如下:


調(diào)和平均數(shù)

再用這個公式算一下我和老板的平均工資:


使用調(diào)和平均數(shù)計(jì)算平均工資

最后的結(jié)果是1980元,這和我的工資水平還比較接近,這樣的平均工資水平我才比較信服。

再回到前面的LogLog算法,從前面的舉的例子可以看出,
影響LogLog算法精度的一個重要因素就是,hash值的前導(dǎo)零的數(shù)量顯然是有很大的偶然性的,經(jīng)常會出現(xiàn)一兩數(shù)據(jù)前導(dǎo)零的數(shù)目比較多的情況,所以HyperLogLog算法相比LogLog算法一個重要的改進(jìn)就是使用調(diào)和平均數(shù)而不是平均數(shù)來聚合每個桶中的結(jié)果,HyperLogLog算法的公式如下:


HyperLogLog算法

其中constant常數(shù)和m的含義和之前的LogLog算法公式中的含義一致,Rj代表(第j個桶中的數(shù)據(jù)的最大前導(dǎo)零數(shù)目+1),為了方便理解,我將公式再拆解一下:


HyperLogLog公式的理解

其實(shí)從算術(shù)平均數(shù)改成調(diào)和平均數(shù)這個優(yōu)化是很容易想到的,但是為什么LogLog算法沒有直接使用調(diào)和平均數(shù)嗎?網(wǎng)上看到一篇英文文章里說大概是因?yàn)槭褂盟阈g(shù)平均數(shù)的話證明比較容易一些,畢竟科學(xué)家們出論文每一步都是要證明的,不像我們這里簡單理解一下,猜一猜就可以了。

細(xì)節(jié)微調(diào)


關(guān)于HyperLogLog算法的大體思想到這里你就已經(jīng)全部理解了。
不過算法中還有一些細(xì)微的校正,在數(shù)據(jù)總量比較小的時候,很容易就預(yù)測偏大,所以我們做如下校正:
(DV代表估計(jì)的基數(shù)值,m代表桶的數(shù)量,V代表結(jié)果為0的桶的數(shù)目,log表示自然對數(shù))

if DV < (5 / 2) * m:
    DV = m * log(m/V)

我再詳細(xì)解釋一下V的含義,假設(shè)我分配了64個桶(即m=64),當(dāng)數(shù)據(jù)量很小時(比方說只有兩三個),那肯定有大量桶中沒有數(shù)據(jù),也就說他們的估計(jì)值是0,V就代表這樣的桶的數(shù)目。
事實(shí)證明,這個校正的效果是非常好,在數(shù)據(jù)量小的時,估計(jì)得非常準(zhǔn)確,有興趣可以去玩一下外國大佬制作的一個HyperLogLog算法的仿真:
http://content.research.neustar.biz/blog/hll.html

constant常數(shù)的選擇


constant常數(shù)的選擇與分桶的數(shù)目有關(guān),具體的數(shù)學(xué)證明請看論文,這里就直接給出結(jié)論:
假設(shè):m為分桶數(shù),p是m的以2為底的對數(shù)


p

則按如下的規(guī)則計(jì)算constant

switch (p) {
   case 4:
       constant = 0.673 * m * m;
   case 5:
       constant = 0.697 * m * m;
   case 6:
       constant = 0.709 * m * m;
   default:
       constant = (0.7213 / (1 + 1.079 / m)) * m * m;
}

分桶數(shù)m的選擇


如果理解了之前的分桶算法,那么很顯然分桶數(shù)只能是2的整數(shù)次冪。
如果分桶越多,那么估計(jì)的精度就會越高,統(tǒng)計(jì)學(xué)上用來衡量估計(jì)精度的一個指標(biāo)是“相對標(biāo)準(zhǔn)誤差”(relative standard deviation,簡稱RSD),RSD的計(jì)算公式這里就不給出了,百科上一搜就可以知道,從直觀上理解,RSD的值其實(shí)就是((每次估計(jì)的值)在(估計(jì)均值)上下的波動)占(估計(jì)均值)的比例(這句話加那么多括號是為了方便大家斷句)。RSD的值與分桶數(shù)m存在如下的計(jì)算關(guān)系:

RSD

有了這個公式,你可以先確定你想要達(dá)到的RSD的值,然后再推出分桶的數(shù)目m。

合并


假設(shè)有兩個數(shù)據(jù)流,分別構(gòu)建了兩個HyperLogLog結(jié)構(gòu),稱為a和b,他們的桶數(shù)是一樣的,為n,現(xiàn)在要計(jì)算兩個數(shù)據(jù)流總體的基數(shù)。

數(shù)據(jù)流a:"a" "b" "c" "d"  基數(shù):4
數(shù)據(jù)流b:"b" "c" "d" "e"  基數(shù):4
兩個數(shù)據(jù)流的總體基數(shù):5

從前文我們可以知道,HyperLogLog算法在內(nèi)存中的結(jié)構(gòu)其實(shí)就是一個桶數(shù)組,需要先用下面的算法從a和我b的桶數(shù)組中構(gòu)建出新的桶數(shù)組c,其實(shí)就是從a,b的對應(yīng)位置取最大的:

輸入:桶數(shù)組a,b。它們的長度都是n
輸出:新的桶數(shù)組c
算法:
     c = c[n];
     for (i=0; i<n; i++){
         c[i]=max(a[i], b[i]);
     }
     return c;

之后用桶數(shù)組c代入前面的算法即可得到合并的總體基數(shù)。

Redis中的實(shí)現(xiàn)


Redis中和HyperLogLog相關(guān)的命令有三個:

  • PFADD hll ele:將ele添加進(jìn)hll的基數(shù)計(jì)算中。流程:
    • 先對ele求hash(使用的是一種叫做MurMurHash的算法)
    • 將hash的低14位(因?yàn)榭偣灿?的14次方個桶)作為桶的編號,選桶,記桶中當(dāng)前的值為count
    • 從的hash的第15位開始數(shù)0,假設(shè)從第15位開始有n個連續(xù)的0(即前導(dǎo)0)
    • 如果n大于count,則把選中的桶的值置為n,否則不變
  • PFCOUNT hll:計(jì)算hll的基數(shù)。就是使用上面給出的DV公式根據(jù)桶中的數(shù)值,計(jì)算基數(shù)
  • PFMERGE hll3 hll1 hll2:將hll1和hll2合并成hll3。用的就是上面說的合并算法。

Redis的所有HyperLogLog結(jié)構(gòu)都是固定的16384個桶(2的14次方),并且有兩種存儲格式:

  • 稀疏格式:HyperLogLog算法在剛開始的時候,大多數(shù)桶其實(shí)都是0,稀疏格式通過存儲連續(xù)的0的數(shù)目,而不是每個0存一遍,大大減小了HyperLogLog剛開始時需要占用的內(nèi)存
  • 緊湊格式:用6個bit表示一個桶,需要占用12KB內(nèi)存

如果還想更詳細(xì)地了解Redis中的實(shí)現(xiàn)細(xì)節(jié)的話,可以閱讀我的另一篇博客Redis源碼走馬觀花(5)HyperLogLog

HyperLogLog索引


之前在螞蟻實(shí)習(xí)的時候,用的一個自研數(shù)據(jù)庫號稱支持HyperLogLog索引.(目前還不知道有什么開源的數(shù)據(jù)庫支持這玩意,如果你知道,歡迎在評論里告訴我)。

所謂HyperLogLog索引,比如你在user列上建立了一個hyperLogLog索引,那么當(dāng)你使用如下的查詢時:

SELECT COUNT(DISTINCT user) FROM users WHERE age >= 10 and city = "shanghai";

在計(jì)算COUNT(DISTINCT)時,會自動使用之前構(gòu)建好的HyperLogLog索引來加速,據(jù)說能夠獲得數(shù)量級上的查詢速度提升。

如果仔細(xì)看了之前的算法,到這里可能會產(chǎn)生困惑,通過HyperLogLog似乎只能得到user的基數(shù)是多少,那又怎么能知道含有一定含有一定篩選條件(WHERE age > 10 and city = "shanghai")的user基數(shù)是多少呢?

其實(shí)再仔細(xì)想想,也很簡單,通過前面介紹過的“合并”就可以完成,對每個不同的city都構(gòu)建了一個關(guān)于user的HyperLogLog結(jié)構(gòu),因?yàn)閍ge的基數(shù)相對大一些,數(shù)據(jù)庫可以根據(jù)范圍在每個范圍構(gòu)建了一個HyperLogLog結(jié)構(gòu),比如分別是0~10,10~20,20~30,這樣只需要將上面查詢涉及到的三個HyperLogLog結(jié)構(gòu)合并即可(三個分別是指city為"guangzhou",age為10~20和age為20~30)。

這個只是我的個人猜測,也可能不是這樣。

Java實(shí)現(xiàn)分析


stream-lib是一個開源的Java流式計(jì)算庫,里面有很多大數(shù)據(jù)估值算法的實(shí)現(xiàn),其中當(dāng)然包括HyperLogLog算法,HyperLogLog實(shí)現(xiàn)類的代碼地址如下:
https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/stream/cardinality/HyperLogLog.java

這個實(shí)現(xiàn)類中還包含很多與算法無關(guān)的序列化之類的代碼,所以不建議你直接去看,我把它的算法主干抽取了出來,變成了如下的三個類,你把這三個類的代碼復(fù)制下來放到項(xiàng)目的同一個包下即可,HyperLogLog類中還包含一個main函數(shù),你可以運(yùn)行一下看看代碼是否正確,代碼如下:
HyperLogLog.java

public class HyperLogLog {

    private final RegisterSet registerSet;
    private final int log2m;   //log(m)
    private final double alphaMM;


    /**
     *
     *  rsd = 1.04/sqrt(m)
     * @param rsd  相對標(biāo)準(zhǔn)偏差
     */
    public HyperLogLog(double rsd) {
        this(log2m(rsd));
    }

    /**
     * rsd = 1.04/sqrt(m)
     * m = (1.04 / rsd)^2
     * @param rsd 相對標(biāo)準(zhǔn)偏差
     * @return
     */
    private static int log2m(double rsd) {
        return (int) (Math.log((1.106 / rsd) * (1.106 / rsd)) / Math.log(2));
    }

    private static double rsd(int log2m) {
        return 1.106 / Math.sqrt(Math.exp(log2m * Math.log(2)));
    }


    /**
     * accuracy = 1.04/sqrt(2^log2m)
     *
     * @param log2m
     */
    public HyperLogLog(int log2m) {
        this(log2m, new RegisterSet(1 << log2m));
    }

    /**
     *
     * @param registerSet
     */
    public HyperLogLog(int log2m, RegisterSet registerSet) {
        this.registerSet = registerSet;
        this.log2m = log2m;
        int m = 1 << this.log2m; //從log2m中算出m

        alphaMM = getAlphaMM(log2m, m);
    }


    public boolean offerHashed(int hashedValue) {
        // j 代表第幾個桶,取hashedValue的前l(fā)og2m位即可
        // j 介于 0 到 m
        final int j = hashedValue >>> (Integer.SIZE - log2m);
        // r代表 除去前l(fā)og2m位剩下部分的前導(dǎo)零 + 1
        final int r = Integer.numberOfLeadingZeros((hashedValue << this.log2m) | (1 << (this.log2m - 1)) + 1) + 1;
        return registerSet.updateIfGreater(j, r);
    }

    /**
     * 添加元素
     * @param o  要被添加的元素
     * @return
     */
    public boolean offer(Object o) {
        final int x = MurmurHash.hash(o);
        return offerHashed(x);
    }


    public long cardinality() {
        double registerSum = 0;
        int count = registerSet.count;
        double zeros = 0.0;
        //count是桶的數(shù)量
        for (int j = 0; j < registerSet.count; j++) {
            int val = registerSet.get(j);
            registerSum += 1.0 / (1 << val);
            if (val == 0) {
                zeros++;
            }
        }

        double estimate = alphaMM * (1 / registerSum);

        if (estimate <= (5.0 / 2.0) * count) {  //小數(shù)據(jù)量修正
            return Math.round(linearCounting(count, zeros));
        } else {
            return Math.round(estimate);
        }
    }


    /**
     *  計(jì)算constant常數(shù)的取值
     * @param p   log2m
     * @param m   m
     * @return
     */
    protected static double getAlphaMM(final int p, final int m) {
        // See the paper.
        switch (p) {
            case 4:
                return 0.673 * m * m;
            case 5:
                return 0.697 * m * m;
            case 6:
                return 0.709 * m * m;
            default:
                return (0.7213 / (1 + 1.079 / m)) * m * m;
        }
    }

    /**
     *
     * @param m   桶的數(shù)目
     * @param V   桶中0的數(shù)目
     * @return
     */
    protected static double linearCounting(int m, double V) {
        return m * Math.log(m / V);
    }

    public static void main(String[] args) {
        HyperLogLog hyperLogLog = new HyperLogLog(0.1325);//64個桶
        //集合中只有下面這些元素
        hyperLogLog.offer("hhh");
        hyperLogLog.offer("mmm");
        hyperLogLog.offer("ccc");
        //估算基數(shù)
        System.out.println(hyperLogLog.cardinality());
    }
}

MurmurHash.java


/**
 * 一種快速的非加密hash
 * 適用于對保密性要求不高以及不在意hash碰撞攻擊的場合
 */
public class MurmurHash {

    public static int hash(Object o) {
        if (o == null) {
            return 0;
        }
        if (o instanceof Long) {
            return hashLong((Long) o);
        }
        if (o instanceof Integer) {
            return hashLong((Integer) o);
        }
        if (o instanceof Double) {
            return hashLong(Double.doubleToRawLongBits((Double) o));
        }
        if (o instanceof Float) {
            return hashLong(Float.floatToRawIntBits((Float) o));
        }
        if (o instanceof String) {
            return hash(((String) o).getBytes());
        }
        if (o instanceof byte[]) {
            return hash((byte[]) o);
        }
        return hash(o.toString());
    }

    public static int hash(byte[] data) {
        return hash(data, data.length, -1);
    }

    public static int hash(byte[] data, int seed) {
        return hash(data, data.length, seed);
    }

    public static int hash(byte[] data, int length, int seed) {
        int m = 0x5bd1e995;
        int r = 24;

        int h = seed ^ length;

        int len_4 = length >> 2;

        for (int i = 0; i < len_4; i++) {
            int i_4 = i << 2;
            int k = data[i_4 + 3];
            k = k << 8;
            k = k | (data[i_4 + 2] & 0xff);
            k = k << 8;
            k = k | (data[i_4 + 1] & 0xff);
            k = k << 8;
            k = k | (data[i_4 + 0] & 0xff);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // avoid calculating modulo
        int len_m = len_4 << 2;
        int left = length - len_m;

        if (left != 0) {
            if (left >= 3) {
                h ^= (int) data[length - 3] << 16;
            }
            if (left >= 2) {
                h ^= (int) data[length - 2] << 8;
            }
            if (left >= 1) {
                h ^= (int) data[length - 1];
            }

            h *= m;
        }

        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;

        return h;
    }

    public static int hashLong(long data) {
        int m = 0x5bd1e995;
        int r = 24;

        int h = 0;

        int k = (int) data * m;
        k ^= k >>> r;
        h ^= k * m;

        k = (int) (data >> 32) * m;
        k ^= k >>> r;
        h *= m;
        h ^= k * m;

        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;

        return h;
    }

    public static long hash64(Object o) {
        if (o == null) {
            return 0l;
        } else if (o instanceof String) {
            final byte[] bytes = ((String) o).getBytes();
            return hash64(bytes, bytes.length);
        } else if (o instanceof byte[]) {
            final byte[] bytes = (byte[]) o;
            return hash64(bytes, bytes.length);
        }
        return hash64(o.toString());
    }

    // 64 bit implementation copied from here:  https://github.com/tnm/murmurhash-java

    /**
     * Generates 64 bit hash from byte array with default seed value.
     *
     * @param data   byte array to hash
     * @param length length of the array to hash
     * @return 64 bit hash of the given string
     */
    public static long hash64(final byte[] data, int length) {
        return hash64(data, length, 0xe17a1465);
    }


    /**
     * Generates 64 bit hash from byte array of the given length and seed.
     *
     * @param data   byte array to hash
     * @param length length of the array to hash
     * @param seed   initial seed value
     * @return 64 bit hash of the given array
     */
    public static long hash64(final byte[] data, int length, int seed) {
        final long m = 0xc6a4a7935bd1e995L;
        final int r = 47;

        long h = (seed & 0xffffffffl) ^ (length * m);

        int length8 = length / 8;

        for (int i = 0; i < length8; i++) {
            final int i8 = i * 8;
            long k = ((long) data[i8 + 0] & 0xff) + (((long) data[i8 + 1] & 0xff) << 8)
                    + (((long) data[i8 + 2] & 0xff) << 16) + (((long) data[i8 + 3] & 0xff) << 24)
                    + (((long) data[i8 + 4] & 0xff) << 32) + (((long) data[i8 + 5] & 0xff) << 40)
                    + (((long) data[i8 + 6] & 0xff) << 48) + (((long) data[i8 + 7] & 0xff) << 56);

            k *= m;
            k ^= k >>> r;
            k *= m;

            h ^= k;
            h *= m;
        }

        switch (length % 8) {
            case 7:
                h ^= (long) (data[(length & ~7) + 6] & 0xff) << 48;
            case 6:
                h ^= (long) (data[(length & ~7) + 5] & 0xff) << 40;
            case 5:
                h ^= (long) (data[(length & ~7) + 4] & 0xff) << 32;
            case 4:
                h ^= (long) (data[(length & ~7) + 3] & 0xff) << 24;
            case 3:
                h ^= (long) (data[(length & ~7) + 2] & 0xff) << 16;
            case 2:
                h ^= (long) (data[(length & ~7) + 1] & 0xff) << 8;
            case 1:
                h ^= (long) (data[length & ~7] & 0xff);
                h *= m;
        }
        ;

        h ^= h >>> r;
        h *= m;
        h ^= h >>> r;

        return h;
    }
}

RegisterSet.java

public class RegisterSet {

    public final static int LOG2_BITS_PER_WORD = 6;  //2的6次方是64
    public final static int REGISTER_SIZE = 5;     //每個register占5位,代碼里有一些細(xì)節(jié)涉及到這個5位,所以僅僅改這個參數(shù)是會報錯的

    public final int count;
    public final int size;

    private final int[] M;

    //傳入m
    public RegisterSet(int count) {
        this(count, null);
    }

    public RegisterSet(int count, int[] initialValues) {
        this.count = count;

        if (initialValues == null) {
            /**
             * 分配(m / 6)個int給M
             *
             * 因?yàn)橐粋€register占五位,所以每個int(32位)有6個register
             */
            this.M = new int[getSizeForCount(count)];
        } else {
            this.M = initialValues;
        }
        //size代表RegisterSet所占字的大小
        this.size = this.M.length;
    }

    public static int getBits(int count) {
        return count / LOG2_BITS_PER_WORD;
    }

    public static int getSizeForCount(int count) {
        int bits = getBits(count);
        if (bits == 0) {
            return 1;
        } else if (bits % Integer.SIZE == 0) {
            return bits;
        } else {
            return bits + 1;
        }
    }

    public void set(int position, int value) {
        int bucketPos = position / LOG2_BITS_PER_WORD;
        int shift = REGISTER_SIZE * (position - (bucketPos * LOG2_BITS_PER_WORD));
        this.M[bucketPos] = (this.M[bucketPos] & ~(0x1f << shift)) | (value << shift);
    }

    public int get(int position) {
        int bucketPos = position / LOG2_BITS_PER_WORD;
        int shift = REGISTER_SIZE * (position - (bucketPos * LOG2_BITS_PER_WORD));
        return (this.M[bucketPos] & (0x1f << shift)) >>> shift;
    }

    public boolean updateIfGreater(int position, int value) {
        int bucket = position / LOG2_BITS_PER_WORD;    //M下標(biāo)
        int shift = REGISTER_SIZE * (position - (bucket * LOG2_BITS_PER_WORD));  //M偏移
        int mask = 0x1f << shift;      //register大小為5位

        // 這里使用long是為了避免int的符號位的干擾
        long curVal = this.M[bucket] & mask;
        long newVal = value << shift;
        if (curVal < newVal) {
            //將M的相應(yīng)位置為新的值
            this.M[bucket] = (int) ((this.M[bucket] & ~mask) | newVal);
            return true;
        } else {
            return false;
        }
    }

    public void merge(RegisterSet that) {
        for (int bucket = 0; bucket < M.length; bucket++) {
            int word = 0;
            for (int j = 0; j < LOG2_BITS_PER_WORD; j++) {
                int mask = 0x1f << (REGISTER_SIZE * j);

                int thisVal = (this.M[bucket] & mask);
                int thatVal = (that.M[bucket] & mask);
                word |= (thisVal < thatVal) ? thatVal : thisVal;
            }
            this.M[bucket] = word;
        }
    }

    int[] readOnlyBits() {
        return M;
    }

    public int[] bits() {
        int[] copy = new int[size];
        System.arraycopy(M, 0, copy, 0, M.length);
        return copy;
    }
}

這里hash算法使用的是MurmurHash算法,可能很多人沒聽說過,其實(shí)在開源項(xiàng)目中使用的非常廣泛,這個算法在只追求速度和hash的隨機(jī)性,而不在意安全性和保密性的時候非常有效,我們不去深究這個算法的原理了,這個類的代碼也不必仔細(xì)看,就把它看成一個hash函數(shù)就好了。
還有需要稍微注意一下這里的RegisterSet類,我們把存放一個桶的結(jié)果的地方叫做一個register,類中M數(shù)組就是存放這些register內(nèi)容的地方,在這里我們設(shè)置一個register占5位,所以每個int(32位)總共可以存放6個register。
重點(diǎn)去閱讀HyperLogLog類,我添加了相關(guān)注釋方便你閱讀,希望能夠幫助你了解更多細(xì)節(jié)。

參考資料


1.論文《HyperLogLog: the analysis of a near-optimal cardinality estimation algorithm》
http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf
如果有興趣去了解算法的數(shù)學(xué)證明的大佬可以去看一下

2.老外寫的一篇博客:
https://research.neustar.biz/2012/10/25/sketch-of-the-day-hyperloglog-cornerstone-of-a-big-data-infrastructure/

3.老外做的一個仿真,玩一玩有助理解
http://content.research.neustar.biz/blog/hll.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容