將多份數據進行關聯(lián)是數據處理過程中非常普遍的用法,不過在分布式計算系統(tǒng)中,這個問題往往會變的非常麻煩,因為框架提供的 join 操作一般會將所有數據根據 key 發(fā)送到所有的 reduce 分區(qū)中去,也就是 shuffle 的過程。造成大量的網絡以及磁盤IO消耗,運行效率極其低下,這個過程一般被稱為 reduce-side-join。
如果其中有張表較小的話,我們則可以自己實現(xiàn)在 map 端實現(xiàn)數據關聯(lián),跳過大量數據進行 shuffle 的過程,運行時間得到大量縮短,根據不同數據可能會有幾倍到數十倍的性能提升。
下文將會以一個 demo 進行說明。
何時使用
在海量數據中匹配少量特定數據
原理
以前寫過一篇關于spark-sql中利用broadcast join進行優(yōu)化的文章,原理與那篇文章相同,這里重新畫了圖。
reduce-side-join 的缺陷在于會將key相同的數據發(fā)送到同一個partition中進行運算,大數據集的傳輸需要長時間的IO,同時任務并發(fā)度收到限制,還可能造成數據傾斜。
reduce-side-join
map-side-join
代碼說明
數據1(個別人口信息):
身份證 姓名 ...
110 lsw
222 yyy
數據2(全國學生信息):
身份證 學校名稱 學號 ...
110 s1 211
111 s2 222
112 s3 233
113 s2 244
期望得到的數據 :
身份證 姓名 學校名稱
110 lsw s1
將少量的數據轉化為Map進行廣播,廣播會將此 Map 發(fā)送到每個節(jié)點中,如果不進行廣播,每個task執(zhí)行時都會去獲取該Map數據,造成了性能浪費。
val people_info = sc.parallelize(Array(("110","lsw"),("222","yyy"))).collectAsMap()
val people_bc = sc.broadcast(people_info)
對大數據進行遍歷,使用mapPartition而不是map,因為mapPartition是在每個partition中進行操作,因此可以減少遍歷時新建broadCastMap.value對象的空間消耗,同時匹配不到的數據也不會返回()。
val res = student_all.mapPartitions(iter =>{
val stuMap = people_bc.value
val arrayBuffer = ArrayBuffer[(String,String,String)]()
iter.foreach{case (idCard,school,sno) =>{
if(stuMap.contains(idCard)){
arrayBuffer.+= ((idCard, stuMap.getOrElse(idCard,""),school))
}
}}
arrayBuffer.iterator
})
也可以使用 for 的守衛(wèi)機制來實現(xiàn)上述代碼
val res1 = student_all.mapPartitions(iter => {
val stuMap = people_bc.value
for{
(idCard, school, sno) <- iter
if(stuMap.contains(idCard))
} yield (idCard, stuMap.getOrElse(idCard,""),school)
})
完整代碼
import org.apache.spark.{SparkContext, SparkConf}
import scala.collection.mutable.ArrayBuffer
object joinTest extends App{
val conf = new SparkConf().setMaster("local[2]").setAppName("test")
val sc = new SparkContext(conf)
/**
* map-side-join
* 取出小表中出現(xiàn)的用戶與大表關聯(lián)后取出所需要的信息
* */
//部分人信息(身份證,姓名)
val people_info = sc.parallelize(Array(("110","lsw"),("222","yyy"))).collectAsMap()
//全國的學生詳細信息(身份證,學校名稱,學號...)
val student_all = sc.parallelize(Array(("110","s1","211"),
("111","s2","222"),
("112","s3","233"),
("113","s2","244")))
//將需要關聯(lián)的小表進行關聯(lián)
val people_bc = sc.broadcast(people_info)
/**
* 使用mapPartition而不是用map,減少創(chuàng)建broadCastMap.value的空間消耗
* 同時匹配不到的數據也不需要返回()
* */
val res = student_all.mapPartitions(iter =>{
val stuMap = people_bc.value
val arrayBuffer = ArrayBuffer[(String,String,String)]()
iter.foreach{case (idCard,school,sno) =>{
if(stuMap.contains(idCard)){
arrayBuffer.+= ((idCard, stuMap.getOrElse(idCard,""),school))
}
}}
arrayBuffer.iterator
})
/**
* 使用另一種方式實現(xiàn)
* 使用for的守衛(wèi)
* */
val res1 = student_all.mapPartitions(iter => {
val stuMap = people_bc.value
for{
(idCard, school, sno) <- iter
if(stuMap.contains(idCard))
} yield (idCard, stuMap.getOrElse(idCard,""),school)
})
res.foreach(println)