一、引子
項(xiàng)目中遇到這樣一張表:user
| Sid | id | id_Type | tag |
|---|---|---|---|
| s_1 | a@qq.com | 性別:男 | |
| s_1 | a@qq.com | 年齡:12 | |
| s_1 | 13866660000 | phone | 會(huì)員:是 |
| s_2 | b@qq.com | 性別:男 |
要求對(duì)這個(gè)表按照sid 進(jìn)行聚合,將所有的id聚合成一個(gè)json,所有的tag聚合成一個(gè)json。
在hive和Spark中,對(duì)tag的聚合相對(duì)簡(jiǎn)單,用聚合函數(shù)collect_list 或者collect_set(hive sql 聚合函數(shù))或者直接基于Spark算子計(jì)算。
二、思路
id和id_Type 如何聚合呢?有以下幾種方案:
1. 將id,idType 使用特殊字符拼接成字符串,看待成一列聚合;
2. 同上,也是拼接成字符串,但使用Json保留原始數(shù)據(jù)結(jié)構(gòu);
3. 將id,idType 通過nestRow 構(gòu)造成Row 結(jié)構(gòu)。
4. 轉(zhuǎn)換成rdd,基于rdd算子聚合和Json化
對(duì)于方法1:如果目標(biāo)json 結(jié)構(gòu)要求 按照
{ "idType":
[
{"11111111"},
{"2222222"}
]
}
這種格式存儲(chǔ)當(dāng)id中存在特殊字符時(shí),聚合后的結(jié)果如何切割是個(gè)棘手的問題,但同時(shí)這種方式處理簡(jiǎn)單,且節(jié)省計(jì)算時(shí)內(nèi)存空間。
對(duì)于方法2:對(duì)比方法1,該方法不存在切割字符串的問題,但在聚合時(shí),由于Json串本身的格式,存在存儲(chǔ)空間的浪費(fèi)
對(duì)于方法3:目前發(fā)現(xiàn)Hive和Spark SQL 都還不支持嵌套的數(shù)據(jù)類型。對(duì)于arrary[Row] 這種數(shù)據(jù)結(jié)構(gòu),目前的版本(spark 2.2) 不支持。
對(duì)于方法4:會(huì)在Spark SQL和rdd 操作做對(duì)比,此次不詳述
三、實(shí)現(xiàn)
package org.hhl.example
import org.apache.spark.sql.functions.{collect_list, collect_set}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.json4s.DefaultFormats
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, parse, render}
import scala.collection.JavaConverters._
/**
* Created by huanghl4 on 2017/11/6.
*/
object SparkSQL {
// 獲取SparkSession, spark 操作得入口
val spark = SparkSession.builder()
.appName(s"${this.getClass.getSimpleName}")
.enableHiveSupport().getOrCreate()
// 通過字符串拼接,實(shí)現(xiàn)多列聚合
def multiColumnAggWithConcatStr = {
// 拼接
val data = spark.sql("select sid,id,idType,tag from hive.user").as[UserTag].map(x=> (x.sid,x.id + "|" + x.idType,x.tag)).toDF("sid","vid","tag")
// 或
//val data = spark.sql("select sid,concat(id,'|',idType),tag from hive.user").map(x=> (x.getString(0),x.getString(1),x.getString(2))
// 聚合, 聚合函數(shù)必須導(dǎo)入org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
val dataAgg = data
.groupBy("sid")
.agg(
collect_set("vid") as "ids",
collect_list("tag") as "tags"
).select("sid","ids","tags").map(x =>{
val sid = x.getString(0)
val ids = x.getList[String](1).asScala.toList
val tag = x.getList[String](2).asScala.toList
(sid,strToJson(ids),listToJson(tag))
}).toDF("sid","ids","tags")
// 數(shù)據(jù)傳輸?shù)紼lasticSearch
saveToES(dataAgg)
}
//通過Json實(shí)現(xiàn)多列聚合
def multiColumnAggWithJson = {
val data = spark.sql("select sid,id,idType,tag from hive.user").as[UserTag].map(x=>
(x.sid,listToJson(List(x.id,x.idType)),x.tag))
val dataAgg = data
.groupBy("sid")
.agg(
collect_set("vid") as "ids",
collect_list("tag") as "tags"
).select("sid","ids","tags").map(x =>{
val sid = x.getString(0)
val ids = x.getList[String](1).asScala.toList
val tag = x.getList[String](2).asScala.toList
(sid,strJsonToJson(ids),listToJson(tag))
}).toDF("sid","ids","tags")
// 數(shù)據(jù)傳輸?shù)紼lasticSearch
saveToES(dataAgg)
}
type strList= List[String]
def strToJson(ids:strList):String = {
// 構(gòu)造ids 的Json 結(jié)構(gòu)
val id = ids.map(x=>{
val vid = x.split("\\|")
(vid(0),vid(1))
}).groupBy(_._2).map(x=>(x._1,x._2.map(_._1)))
val json = id.map{x =>(
x._1-> x._2
)}
compact(render(json))
}
def strJsonToJson(ids:strList):String = {
// 構(gòu)造ids 的Json 結(jié)構(gòu)
val id = ids.map(x=>{
val vid = jsonToList(x)
(vid(0),vid(1))
}).groupBy(_._2).map(x=>(x._1,x._2.map(_._1)))
val json = id.map{x =>(
x._1-> x._2
)}
compact(render(json))
}
def listToJson(l:strList):String = compact(render(l))
def jsonToList(str:String):strList = {
implicit val formats = DefaultFormats
val json = parse(str)
json.extract[strList]
}
def saveToES(df:DataFrame) = {
}
case class UserTag(sid:String,id:String,idType:String,tag:String)
}
四、總結(jié)
- 多列聚合可采用字符串拼接或者Json化后再聚合
- 字符串拼接難點(diǎn)在于需判斷數(shù)據(jù)中是否可能存在拼接字符;Json 化的難點(diǎn)在于聚合時(shí)由于數(shù)據(jù)量巨大,有可能帶來數(shù)據(jù)傾斜問題,且處理起來較為復(fù)雜。
可參考GitHub上實(shí)現(xiàn):https://github.com/Smallhi/example/blob/master/src/main/scala/org/hhl/example/SparkSQL.scala
如有問題聯(lián)系:huanghl0817@gmail.com