flink實(shí)戰(zhàn)-使用自定義聚合函數(shù)統(tǒng)計(jì)網(wǎng)站TP指標(biāo)

背景

在網(wǎng)站性能測(cè)試中,我們經(jīng)常會(huì)選擇 TP50、TP95 或者 TP99 等作為性能指標(biāo)。接下來我們講講這些指標(biāo)的含義、以及在flink中如何實(shí)時(shí)統(tǒng)計(jì):

  • TP50,top percent 50,即 50% 的數(shù)據(jù)都滿足某一條件;
  • TP95,top percent 95,即 95% 的數(shù)據(jù)都滿足某一條件;
  • TP99,top percent 99,即 99% 的數(shù)據(jù)都滿足某一條件;

我們舉一個(gè)例子,我們要統(tǒng)計(jì)網(wǎng)站一分鐘之內(nèi)的的響應(yīng)時(shí)間的TP90,正常的處理邏輯就是把這一分鐘之內(nèi)所有的網(wǎng)站的響應(yīng)時(shí)間從小到大排序,然后計(jì)算出總條數(shù)count,然后計(jì)算出排名在90%的響應(yīng)時(shí)間是多少(count*0.9),就是我們要的值。

自定義聚合函數(shù)

這個(gè)需求很明顯就是一個(gè)使用聚合函數(shù)來做的案例,F(xiàn)link中提供了大量的聚合函數(shù),比如count,max,min等等,但是對(duì)于這個(gè)需求,卻無法滿足,所以我們需要自定義一個(gè)聚合函數(shù)來實(shí)現(xiàn)我們的需求。

在前段時(shí)間,我們聊了聊flink的聚合算子,具體可參考: flink實(shí)戰(zhàn)-聊一聊flink中的聚合算子 , 聚合算子是我們?cè)趯懘a的時(shí)候用來實(shí)現(xiàn)一個(gè)聚合功能,聚合函數(shù)其實(shí)和聚合算子類似,只不過聚合函數(shù)用于在寫sql的時(shí)候使用。

自定義聚合函數(shù)需要繼承抽象類org.apache.flink.table.functions.AggregateFunction。并實(shí)現(xiàn)下面幾個(gè)方法。

  • createAccumulator():這個(gè)方法會(huì)在一次聚合操作的開始調(diào)用一次,主要用于構(gòu)造一個(gè)Accumulator,用于存儲(chǔ)在聚合過程中的臨時(shí)對(duì)象。
  • accumulate() 這個(gè)方法,每來一條數(shù)據(jù)會(huì)調(diào)用一次這個(gè)方法,我們就在這個(gè)方法里實(shí)現(xiàn)我們的聚合函數(shù)的具體邏輯。
  • getValue() 這個(gè)方法是在聚合結(jié)束以后,對(duì)中間結(jié)果做處理,然后將結(jié)果返回,最終sql中得到的結(jié)果數(shù)據(jù)就是這個(gè)值。

實(shí)例講解

對(duì)于TP指標(biāo),正常的思路我們可以先創(chuàng)建一個(gè)臨時(shí)變量,里面有一個(gè)list,每來一個(gè)數(shù)據(jù),就放到這個(gè)list里面,在getValue方法里,進(jìn)行排序,取相應(yīng)的TP值。

但是這種思路會(huì)有一個(gè)問題,就是如果要聚合的時(shí)間范圍內(nèi),數(shù)據(jù)過多的話。就會(huì)在list存儲(chǔ)大量的數(shù)據(jù),會(huì)造成checkpoint過大,時(shí)間過長(zhǎng),最后導(dǎo)致程序失敗。得不到正確的結(jié)果。

所以我們需要換一個(gè)思路,既然最后我們想要的是一個(gè)有序列表,那么我們是不是可以把這個(gè)list結(jié)構(gòu)優(yōu)化一下,使用Treemap來存儲(chǔ),map的key就是指標(biāo),比如響應(yīng)時(shí)間。value就是對(duì)應(yīng)的指標(biāo)出現(xiàn)的次數(shù)。這樣getValue方法里,只需要將map的value值累加,就能得到總數(shù)count,然后計(jì)算出來相應(yīng)的tp值的位置position,最后我們?cè)購(gòu)念^累加map的value,直到累加結(jié)果大于相應(yīng)的位置position,則map的key即為所求。

示例如下:
我們先構(gòu)建一個(gè)source,只是隨機(jī)生成一個(gè)變量,網(wǎng)站的相應(yīng)時(shí)間response_time。

    String sql = "CREATE TABLE source (\n" +
                     " response_time INT,\n" +
                     " ts AS localtimestamp,\n" +
                     " WATERMARK FOR ts AS ts," +
                     "proctime as proctime()\n" +
                     ") WITH (\n" +
                     " 'connector' = 'datagen',\n" +
                     " 'rows-per-second'='1000',\n" +
                     " 'fields.response_time.min'='1',\n" +
                     " 'fields.response_time.max'='1000'" +
                     ")";

定義一個(gè)聚合函數(shù)用的臨時(shí)變量:

    public static class TPAccum{
        public Integer tp;
        public Map<Integer,Integer> map = new HashMap<>();
    }

實(shí)現(xiàn)自定義聚合函數(shù)類

    public static class TP extends AggregateFunction<Integer,TPAccum>{

        @Override
        public TPAccum createAccumulator(){
            return new TPAccum();
        }

        @Override
        public Integer getValue(TPAccum acc){
            if (acc.map.size() == 0){
                return null;
            } else {
                Map<Integer,Integer> map = new TreeMap<>(acc.map);
                int sum = map.values().stream().reduce(0, Integer::sum);

                int tp = acc.tp;
                int responseTime = 0;
                int p = 0;
                Double d = sum * (tp / 100D);
                for (Map.Entry<Integer,Integer> entry: map.entrySet()){
                    p += entry.getValue();
                    int position = d.intValue() - 1;
                    if (p >= position){
                        responseTime = entry.getKey();
                        break;
                    }

                }
                return responseTime;
            }
        }

        public void accumulate(TPAccum acc, Integer iValue, Integer tp){
            acc.tp = tp;
            if (acc.map.containsKey(iValue)){
                acc.map.put(iValue, acc.map.get(iValue) + 1);
            } else {
                acc.map.put(iValue, 1);
            }
        }

    }

實(shí)際的查詢sql如下:

    String sqlSelect =
                "select TUMBLE_START(proctime,INTERVAL '1' SECOND)  as starttime,mytp(response_time,50) from source" +
                " group by TUMBLE(proctime,INTERVAL '1' SECOND)";

完整代碼請(qǐng)參考:
https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/function/UdafTP.java

更多內(nèi)容,歡迎關(guān)注我的公眾號(hào)【大數(shù)據(jù)技術(shù)與應(yīng)用實(shí)戰(zhàn)】

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容