Kylin 4.0 TopN實現(xiàn)原理介紹

引言

Apache Kylin 是一個開源的分布式分析引擎,提供 Hadoop 之上的 SQL 查詢接口及多維分析(OLAP)能力以支持超大規(guī)模數據。它能在亞秒內查詢巨大的數據集 。

從Kylin 1.5開始就已經加入了TopN的度量,一直到kylin 3.x,實現(xiàn)上沒有太大改變,想了解kylin3之前的TopN實現(xiàn)原理可以參考下面的文章:

https://www.infoq.cn/article/2016/08/Apache-Kylin-Top-N/?utm_source=tuicool

在2020年9月份Apache Kylin社區(qū)發(fā)布了Kylin 4.0.0-alpha版本,本文將詳細介紹 Apache Kylin 4.0.0-alpha 中TopN 的實現(xiàn)。

背景

我們先從一個典型的TopN應用場景入手,在電商平臺做數據分析的時候我們想要獲取可能會經常需要查看銷售額靠前100的賣家是哪些,SQL查詢示例如下:

SELECT kylin_sales.part_dt, seller_id

FROM kylin_sales

GROUP BY

kylin_sales.part_dt, kylin_sales.seller_id

ORDER BY SUM(kylin_sales.price) desc LIMIT 100;

在大數據量的場景下,想要求TopN的數據,如果先group by后再計算出所有的sum(price),然后再對sum(price)進行排序,這里總的計算開銷非常大的。

image

TopN介紹

通過對Kylin 3.x的TopN實現(xiàn)原理的介紹,我們知道Kylin 3及之前版本的TopN使用了Space-Saving的算法,并在此之上做了優(yōu)化,代碼實現(xiàn)可以查看org.apache.kylin.measure.topn.TopNCounter 。
Kylin 4.0繼續(xù)使用了Space-Saving的算法,并在Kylin 3.x的TopNCounter的基礎上做了優(yōu)化,不過同樣的當前TopN也是存在誤差的,這些在后面會有詳細介紹。

TopN實現(xiàn)

當前Kylin4的TopN UDAF注冊是在org.apache.kylin.engine.spark.job.CuboidAggregator#aggInternal, 代碼如下:

def aggInternal(ss: SparkSession,
                  dataSet: DataFrame,
                  dimensions: util.Set[Integer],
                  measures: util.Map[Integer, FunctionDesc],
                  isSparkSql: Boolean): DataFrame = {
      //省略
      measure.expression.toUpperCase(Locale.ROOT) match {
        //省略
        case "TOP_N" =>
          // Uses new TopN aggregate function
          // located in kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/udaf/TopN.scala
          val schema = StructType(measure.pra.map { col =>
            val dateType = col.dataType
            if (col == measure) {
              StructField(s"MEASURE_${col.columnName}", dateType)
            } else {
              StructField(s"DIMENSION_${col.columnName}", dateType)
            }
          })

          if (reuseLayout) {
            new Column(ReuseTopN(measure.returnType.precision, schema, columns.head.expr)
              .toAggregateExpression()).as(id.toString)
          } else {
            new Column(EncodeTopN(measure.returnType.precision, schema, columns.head.expr, columns.drop(1).map(_.expr))
              .toAggregateExpression()).as(id.toString)
          }
       //省略
        case _ =>
          max(columns.head).as(id.toString)
      }
    }.toSeq
//省略
    if (reuseLayout) {
      val columns = NSparkCubingUtil.getColumns(dimensions) ++ measureColumns(dataSet.schema, measures)
      df.select(columns: _*)
    } else {
      df
    }
  }

其實TopN最初的實現(xiàn)的在org.apache.kylin.engine.spark.job.TopNUDAF,但是可以看到目前TopN的實現(xiàn)是在org.apache.spark.sql.udaf.BaseTopN.scala,最新的實現(xiàn)主要針對舊的實現(xiàn)修復了性能問題,詳情可以查看KYLIN-4760。

Kylin 4.0的TopN是通過Spark UDAF的方式實現(xiàn)的,以下是實現(xiàn)類接口之間的關系,可以看到最終實現(xiàn)的是BaseTopN,繼承的是TypedImperativeAggregate。然后BaseTopN又有兩個子類,分別是EncodeTopN和ReuseTopN,當從平表(FlatTable)開始構建的時候,F(xiàn)latTable中沒有構建過TopN,這里會調用EncodeTopN,再次之后從已經構建好的cuboid構建下一層cuboid的時候會調用ReuseTopN,避免重復計算,接口關系圖如下:

image

繼承TypedImperativeAggregate實現(xiàn)TopN,而不是UserDefineAggregateFunction主要是因為UserDefinedAggregateFunction 是把 catalyst 內部 internalRow 類型轉換為了 Row 類型,然后使用用戶自己的 update 方法處理,然后TypedImperativeAggregate需要自己做序列化、反序列化處理,少了一層轉換。

TopNCounter介紹

前面提到Space-Saving算法是在TopNCounter中實現(xiàn)的,此處我們對TopNCounter的實現(xiàn)進行一個簡要的介紹。BaseTopN對象初始化的時候會創(chuàng)建TopNCounter對象,用戶保存計算過程中符合TopN條件的行,對應于Spark UDAF的概念是aggregate buffer。update,merge,eval都是處理的TopNCounter。TopNCounter在初始化的時候需要指定容量, 大小建議為N * TopNCounter.EXTRA_SPACE_RATE, 其中N為TopN定義的大小,EXTRA_SPACE_RATE為建議額外空間調整參數,默認為10, 也就是說如果定義的topn(10,4), 那么TopNCounter的初始化大小則為10 * 10 = 100 。

TopN的處理流程可以見下圖:

image

update()主要將傳入的行通過TopNCounter.offer() 將一行的內容插入到TopNCounter對象中,merge則是對兩個經過update()操作的group進行去重合并,最后在eval()的時候調用TopNCounter.sortAndRetain()來排序和調整TopNCounter大小,最終得到聚合結果。

存儲

Kylin 4.0目前使用的是parquet進行存儲,我們定義topn(10,4), TopNCounter.EXTRA_SPACE_RATE 設置為1。cuboid中維度和度量列明的映射關系為:

0 -> seller_id

1 -> item_id

2 -> id

3 -> price

4 -> Count

5 -> TopN

如下是只有TopN和只有SUM的cuboid內容:

image

值得注意的是第二行,Count為11,但是實際上TopN列只存儲了10個值,這是因為TopNCounter的容量只有10 * EXTRA_SPACE_RATE = 10, 超過10的內容不會被存儲,這也是當前TopN存在誤差的原因所在??梢钥吹絋opN將計算的維度和group by的維度放到了一起,然后用數組的形式進行存儲。

image

對于sum度量,kylin則是直接存儲的sum后的聚合值。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容