背景
在網(wǎng)站性能測(cè)試中,我們經(jīng)常會(huì)選擇 TP50、TP95 或者 TP99 等作為性能指標(biāo)。接下來我們講講這些指標(biāo)的含義、以及在flink中如何實(shí)時(shí)統(tǒng)計(jì):
- TP50,top percent 50,即 50% 的數(shù)據(jù)都滿足某一條件;
- TP95,top percent 95,即 95% 的數(shù)據(jù)都滿足某一條件;
- TP99,top percent 99,即 99% 的數(shù)據(jù)都滿足某一條件;
我們舉一個(gè)例子,我們要統(tǒng)計(jì)網(wǎng)站一分鐘之內(nèi)的的響應(yīng)時(shí)間的TP90,正常的處理邏輯就是把這一分鐘之內(nèi)所有的網(wǎng)站的響應(yīng)時(shí)間從小到大排序,然后計(jì)算出總條數(shù)count,然后計(jì)算出排名在90%的響應(yīng)時(shí)間是多少(count*0.9),就是我們要的值。
自定義聚合函數(shù)
這個(gè)需求很明顯就是一個(gè)使用聚合函數(shù)來做的案例,F(xiàn)link中提供了大量的聚合函數(shù),比如count,max,min等等,但是對(duì)于這個(gè)需求,卻無法滿足,所以我們需要自定義一個(gè)聚合函數(shù)來實(shí)現(xiàn)我們的需求。
在前段時(shí)間,我們聊了聊flink的聚合算子,具體可參考: flink實(shí)戰(zhàn)-聊一聊flink中的聚合算子 , 聚合算子是我們?cè)趯懘a的時(shí)候用來實(shí)現(xiàn)一個(gè)聚合功能,聚合函數(shù)其實(shí)和聚合算子類似,只不過聚合函數(shù)用于在寫sql的時(shí)候使用。
自定義聚合函數(shù)需要繼承抽象類org.apache.flink.table.functions.AggregateFunction。并實(shí)現(xiàn)下面幾個(gè)方法。
- createAccumulator():這個(gè)方法會(huì)在一次聚合操作的開始調(diào)用一次,主要用于構(gòu)造一個(gè)Accumulator,用于存儲(chǔ)在聚合過程中的臨時(shí)對(duì)象。
- accumulate() 這個(gè)方法,每來一條數(shù)據(jù)會(huì)調(diào)用一次這個(gè)方法,我們就在這個(gè)方法里實(shí)現(xiàn)我們的聚合函數(shù)的具體邏輯。
- getValue() 這個(gè)方法是在聚合結(jié)束以后,對(duì)中間結(jié)果做處理,然后將結(jié)果返回,最終sql中得到的結(jié)果數(shù)據(jù)就是這個(gè)值。
實(shí)例講解
對(duì)于TP指標(biāo),正常的思路我們可以先創(chuàng)建一個(gè)臨時(shí)變量,里面有一個(gè)list,每來一個(gè)數(shù)據(jù),就放到這個(gè)list里面,在getValue方法里,進(jìn)行排序,取相應(yīng)的TP值。
但是這種思路會(huì)有一個(gè)問題,就是如果要聚合的時(shí)間范圍內(nèi),數(shù)據(jù)過多的話。就會(huì)在list存儲(chǔ)大量的數(shù)據(jù),會(huì)造成checkpoint過大,時(shí)間過長(zhǎng),最后導(dǎo)致程序失敗。得不到正確的結(jié)果。
所以我們需要換一個(gè)思路,既然最后我們想要的是一個(gè)有序列表,那么我們是不是可以把這個(gè)list結(jié)構(gòu)優(yōu)化一下,使用Treemap來存儲(chǔ),map的key就是指標(biāo),比如響應(yīng)時(shí)間。value就是對(duì)應(yīng)的指標(biāo)出現(xiàn)的次數(shù)。這樣getValue方法里,只需要將map的value值累加,就能得到總數(shù)count,然后計(jì)算出來相應(yīng)的tp值的位置position,最后我們?cè)購(gòu)念^累加map的value,直到累加結(jié)果大于相應(yīng)的位置position,則map的key即為所求。
示例如下:
我們先構(gòu)建一個(gè)source,只是隨機(jī)生成一個(gè)變量,網(wǎng)站的相應(yīng)時(shí)間response_time。
String sql = "CREATE TABLE source (\n" +
" response_time INT,\n" +
" ts AS localtimestamp,\n" +
" WATERMARK FOR ts AS ts," +
"proctime as proctime()\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second'='1000',\n" +
" 'fields.response_time.min'='1',\n" +
" 'fields.response_time.max'='1000'" +
")";
定義一個(gè)聚合函數(shù)用的臨時(shí)變量:
public static class TPAccum{
public Integer tp;
public Map<Integer,Integer> map = new HashMap<>();
}
實(shí)現(xiàn)自定義聚合函數(shù)類
public static class TP extends AggregateFunction<Integer,TPAccum>{
@Override
public TPAccum createAccumulator(){
return new TPAccum();
}
@Override
public Integer getValue(TPAccum acc){
if (acc.map.size() == 0){
return null;
} else {
Map<Integer,Integer> map = new TreeMap<>(acc.map);
int sum = map.values().stream().reduce(0, Integer::sum);
int tp = acc.tp;
int responseTime = 0;
int p = 0;
Double d = sum * (tp / 100D);
for (Map.Entry<Integer,Integer> entry: map.entrySet()){
p += entry.getValue();
int position = d.intValue() - 1;
if (p >= position){
responseTime = entry.getKey();
break;
}
}
return responseTime;
}
}
public void accumulate(TPAccum acc, Integer iValue, Integer tp){
acc.tp = tp;
if (acc.map.containsKey(iValue)){
acc.map.put(iValue, acc.map.get(iValue) + 1);
} else {
acc.map.put(iValue, 1);
}
}
}
實(shí)際的查詢sql如下:
String sqlSelect =
"select TUMBLE_START(proctime,INTERVAL '1' SECOND) as starttime,mytp(response_time,50) from source" +
" group by TUMBLE(proctime,INTERVAL '1' SECOND)";
完整代碼請(qǐng)參考:
https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/function/UdafTP.java
更多內(nèi)容,歡迎關(guān)注我的公眾號(hào)【大數(shù)據(jù)技術(shù)與應(yīng)用實(shí)戰(zhàn)】