Spark map-side-join 關聯(lián)優(yōu)化

將多份數據進行關聯(lián)是數據處理過程中非常普遍的用法,不過在分布式計算系統(tǒng)中,這個問題往往會變的非常麻煩,因為框架提供的 join 操作一般會將所有數據根據 key 發(fā)送到所有的 reduce 分區(qū)中去,也就是 shuffle 的過程。造成大量的網絡以及磁盤IO消耗,運行效率極其低下,這個過程一般被稱為 reduce-side-join。

如果其中有張表較小的話,我們則可以自己實現(xiàn)在 map 端實現(xiàn)數據關聯(lián),跳過大量數據進行 shuffle 的過程,運行時間得到大量縮短,根據不同數據可能會有幾倍到數十倍的性能提升。

下文將會以一個 demo 進行說明。

何時使用

在海量數據中匹配少量特定數據

原理

以前寫過一篇關于spark-sql中利用broadcast join進行優(yōu)化的文章,原理與那篇文章相同,這里重新畫了圖。

sparkSql broadcast join

reduce-side-join 的缺陷在于會將key相同的數據發(fā)送到同一個partition中進行運算,大數據集的傳輸需要長時間的IO,同時任務并發(fā)度收到限制,還可能造成數據傾斜。

reduce-side-join

reduce-side-join

map-side-join

map-side-join

代碼說明

數據1(個別人口信息):

身份證 姓名 ...
110   lsw 
222   yyy

數據2(全國學生信息):

身份證 學校名稱 學號 ...         
110   s1      211
111   s2      222
112   s3      233
113   s2      244

期望得到的數據 :

身份證 姓名 學校名稱
110 lsw s1

將少量的數據轉化為Map進行廣播,廣播會將此 Map 發(fā)送到每個節(jié)點中,如果不進行廣播,每個task執(zhí)行時都會去獲取該Map數據,造成了性能浪費。

val people_info = sc.parallelize(Array(("110","lsw"),("222","yyy"))).collectAsMap()
val people_bc = sc.broadcast(people_info)

對大數據進行遍歷,使用mapPartition而不是map,因為mapPartition是在每個partition中進行操作,因此可以減少遍歷時新建broadCastMap.value對象的空間消耗,同時匹配不到的數據也不會返回()。

val res = student_all.mapPartitions(iter =>{
    val stuMap = people_bc.value
    val arrayBuffer = ArrayBuffer[(String,String,String)]()
    iter.foreach{case (idCard,school,sno) =>{
        if(stuMap.contains(idCard)){
        arrayBuffer.+= ((idCard, stuMap.getOrElse(idCard,""),school))
    }
    }}
    arrayBuffer.iterator
})

也可以使用 for 的守衛(wèi)機制來實現(xiàn)上述代碼

val res1 = student_all.mapPartitions(iter => {
    val stuMap = people_bc.value
    for{
        (idCard, school, sno) <- iter
        if(stuMap.contains(idCard))
        } yield (idCard, stuMap.getOrElse(idCard,""),school)
})

完整代碼

import org.apache.spark.{SparkContext, SparkConf}
import scala.collection.mutable.ArrayBuffer

object joinTest extends App{

  val conf = new SparkConf().setMaster("local[2]").setAppName("test")
  val sc = new SparkContext(conf)

  /**
   * map-side-join
   * 取出小表中出現(xiàn)的用戶與大表關聯(lián)后取出所需要的信息
   * */
  //部分人信息(身份證,姓名)
  val people_info = sc.parallelize(Array(("110","lsw"),("222","yyy"))).collectAsMap()
  //全國的學生詳細信息(身份證,學校名稱,學號...)
  val student_all = sc.parallelize(Array(("110","s1","211"),
                                              ("111","s2","222"),
                                              ("112","s3","233"),
                                              ("113","s2","244")))

  //將需要關聯(lián)的小表進行關聯(lián)
  val people_bc = sc.broadcast(people_info)

  /**
   * 使用mapPartition而不是用map,減少創(chuàng)建broadCastMap.value的空間消耗
   * 同時匹配不到的數據也不需要返回()
   * */
  val res = student_all.mapPartitions(iter =>{
    val stuMap = people_bc.value
    val arrayBuffer = ArrayBuffer[(String,String,String)]()
    iter.foreach{case (idCard,school,sno) =>{
      if(stuMap.contains(idCard)){
        arrayBuffer.+= ((idCard, stuMap.getOrElse(idCard,""),school))
      }
    }}
    arrayBuffer.iterator
  })

  /**
   * 使用另一種方式實現(xiàn)
   * 使用for的守衛(wèi)
   * */
  val res1 = student_all.mapPartitions(iter => {
    val stuMap = people_bc.value
    for{
      (idCard, school, sno) <- iter
      if(stuMap.contains(idCard))
    } yield (idCard, stuMap.getOrElse(idCard,""),school)
  })

  res.foreach(println)

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容