Spark SQL 多列聚合的幾種方法

一、引子

項(xiàng)目中遇到這樣一張表:user

Sid id id_Type tag
s_1 a@qq.com email 性別:男
s_1 a@qq.com email 年齡:12
s_1 13866660000 phone 會(huì)員:是
s_2 b@qq.com email 性別:男

要求對(duì)這個(gè)表按照sid 進(jìn)行聚合,將所有的id聚合成一個(gè)json,所有的tag聚合成一個(gè)json。
在hive和Spark中,對(duì)tag的聚合相對(duì)簡(jiǎn)單,用聚合函數(shù)collect_list 或者collect_set(hive sql 聚合函數(shù))或者直接基于Spark算子計(jì)算。

二、思路

id和id_Type 如何聚合呢?有以下幾種方案:

  1. 將id,idType 使用特殊字符拼接成字符串,看待成一列聚合;
  2. 同上,也是拼接成字符串,但使用Json保留原始數(shù)據(jù)結(jié)構(gòu);
  3. 將id,idType 通過nestRow 構(gòu)造成Row 結(jié)構(gòu)。
  4. 轉(zhuǎn)換成rdd,基于rdd算子聚合和Json化

對(duì)于方法1:如果目標(biāo)json 結(jié)構(gòu)要求 按照
{ "idType":
[
{"11111111"},
{"2222222"}
]
}
這種格式存儲(chǔ)當(dāng)id中存在特殊字符時(shí),聚合后的結(jié)果如何切割是個(gè)棘手的問題,但同時(shí)這種方式處理簡(jiǎn)單,且節(jié)省計(jì)算時(shí)內(nèi)存空間。
對(duì)于方法2:對(duì)比方法1,該方法不存在切割字符串的問題,但在聚合時(shí),由于Json串本身的格式,存在存儲(chǔ)空間的浪費(fèi)
對(duì)于方法3:目前發(fā)現(xiàn)Hive和Spark SQL 都還不支持嵌套的數(shù)據(jù)類型。對(duì)于arrary[Row] 這種數(shù)據(jù)結(jié)構(gòu),目前的版本(spark 2.2) 不支持。
對(duì)于方法4:會(huì)在Spark SQL和rdd 操作做對(duì)比,此次不詳述

三、實(shí)現(xiàn)

package org.hhl.example
import org.apache.spark.sql.functions.{collect_list, collect_set}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.json4s.DefaultFormats
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, parse, render}
import scala.collection.JavaConverters._
/**
  * Created by huanghl4 on 2017/11/6.
  */
object SparkSQL {
  // 獲取SparkSession, spark 操作得入口
  val spark = SparkSession.builder()
    .appName(s"${this.getClass.getSimpleName}")
    .enableHiveSupport().getOrCreate()
  // 通過字符串拼接,實(shí)現(xiàn)多列聚合
  def multiColumnAggWithConcatStr = {
    // 拼接
    val data = spark.sql("select sid,id,idType,tag from hive.user").as[UserTag].map(x=> (x.sid,x.id + "|" + x.idType,x.tag)).toDF("sid","vid","tag")
    // 或
    //val data = spark.sql("select sid,concat(id,'|',idType),tag from hive.user").map(x=> (x.getString(0),x.getString(1),x.getString(2))
    // 聚合, 聚合函數(shù)必須導(dǎo)入org.apache.spark.sql.functions._
    import org.apache.spark.sql.functions._
    val dataAgg = data
      .groupBy("sid")
      .agg(
        collect_set("vid") as "ids",
        collect_list("tag") as "tags"
      ).select("sid","ids","tags").map(x =>{
      val sid = x.getString(0)
      val ids = x.getList[String](1).asScala.toList
      val tag = x.getList[String](2).asScala.toList
      (sid,strToJson(ids),listToJson(tag))
    }).toDF("sid","ids","tags")
    // 數(shù)據(jù)傳輸?shù)紼lasticSearch
    saveToES(dataAgg)
  }
  //通過Json實(shí)現(xiàn)多列聚合
  def multiColumnAggWithJson = {
    val data = spark.sql("select sid,id,idType,tag from hive.user").as[UserTag].map(x=>
      (x.sid,listToJson(List(x.id,x.idType)),x.tag))
    val dataAgg = data
      .groupBy("sid")
      .agg(
        collect_set("vid") as "ids",
        collect_list("tag") as "tags"
      ).select("sid","ids","tags").map(x =>{
      val sid = x.getString(0)
      val ids = x.getList[String](1).asScala.toList
      val tag = x.getList[String](2).asScala.toList
      (sid,strJsonToJson(ids),listToJson(tag))
    }).toDF("sid","ids","tags")
    // 數(shù)據(jù)傳輸?shù)紼lasticSearch
    saveToES(dataAgg)
  }

  type strList= List[String]

  def strToJson(ids:strList):String = {
    // 構(gòu)造ids 的Json 結(jié)構(gòu)
     val id = ids.map(x=>{
       val vid = x.split("\\|")
       (vid(0),vid(1))
     }).groupBy(_._2).map(x=>(x._1,x._2.map(_._1)))
    val json = id.map{x =>(
      x._1-> x._2
    )}
    compact(render(json))
  }
  def strJsonToJson(ids:strList):String = {
    // 構(gòu)造ids 的Json 結(jié)構(gòu)
    val id = ids.map(x=>{
      val vid = jsonToList(x)
      (vid(0),vid(1))
    }).groupBy(_._2).map(x=>(x._1,x._2.map(_._1)))
    val json = id.map{x =>(
      x._1-> x._2
      )}
    compact(render(json))
  }
  def listToJson(l:strList):String = compact(render(l))
  def jsonToList(str:String):strList = {
    implicit val formats = DefaultFormats
    val json = parse(str)
    json.extract[strList]
  }
  def saveToES(df:DataFrame) = {
  }
  case class UserTag(sid:String,id:String,idType:String,tag:String)
}

四、總結(jié)

  1. 多列聚合可采用字符串拼接或者Json化后再聚合
  2. 字符串拼接難點(diǎn)在于需判斷數(shù)據(jù)中是否可能存在拼接字符;Json 化的難點(diǎn)在于聚合時(shí)由于數(shù)據(jù)量巨大,有可能帶來數(shù)據(jù)傾斜問題,且處理起來較為復(fù)雜。
    可參考GitHub上實(shí)現(xiàn):https://github.com/Smallhi/example/blob/master/src/main/scala/org/hhl/example/SparkSQL.scala
    如有問題聯(lián)系:huanghl0817@gmail.com
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容