Spark廣播變量應(yīng)用

一、廣播變量

1、廣播變量的優(yōu)點(diǎn)

不需要每個(gè)task帶上一份變量副本,而是變成每個(gè)節(jié)點(diǎn)的executor存一份副本。這樣的話, 就可以讓變量產(chǎn)生的副本數(shù)量大大減少。

2、廣播變量的用法
//將mapRdd廣播后返回broadcastValue
val broadcastValue: Broadcast[Array[(String, String)]] = sc.broadcast(mapRdd)
//獲取廣播變量的值
val getBroadCastMap: Map[String, String] = broadcastValue.value.toMap
3、廣播變量的原理

初始的時(shí)候,在Driver端有一個(gè)副本數(shù)據(jù)。廣播變量后,task運(yùn)行的時(shí)候,在使用副本數(shù)據(jù)前,首先在所在本地Executor對(duì)應(yīng)的BlockManager中,嘗試獲取副本數(shù)據(jù);如果本地沒(méi)有,即從Driver端拉取副本數(shù)據(jù),并且保存在所在本地的BlockManager中;此后這個(gè)Executor上所有的task,都會(huì)直接使用本地BlockManager中的副本數(shù)據(jù)。另Executor的BlockManager除了從Driver端拉取數(shù)據(jù),也可能從其他節(jié)點(diǎn)的BlockManager中拉去副本數(shù)據(jù)。
BlockManager:負(fù)責(zé)管理某個(gè)Executor對(duì)應(yīng)的內(nèi)存和磁盤的數(shù)據(jù),嘗試本地BlockManager中招map數(shù)據(jù)。

4、優(yōu)化說(shuō)明

假設(shè)有50個(gè)Executor,共1000個(gè)task;若每個(gè)map數(shù)據(jù)10M。默認(rèn)情況下,1000個(gè)副本10M共10G數(shù)據(jù)。在集群中,通過(guò)網(wǎng)絡(luò)傳輸,耗費(fèi)10G的內(nèi)存資源;如果使用了廣播變量,50個(gè)Executor即50個(gè)副本10M共500M數(shù)據(jù)。而且Executor的BlockManager不一定都從Driver傳輸?shù)奖镜兀€可能從最近的節(jié)點(diǎn)的Executor的BlockManager中拉取數(shù)據(jù),網(wǎng)絡(luò)傳輸速度大大增加,傳輸數(shù)據(jù)大大減少。
10G/500M=20倍,極大的提高了性能。

二、代碼實(shí)例

1、準(zhǔn)備數(shù)據(jù)
//訂單數(shù)據(jù)
1001,20150710,p0001,2
1002,20150710,p0002,3
1002,20150710,p0003,3
//產(chǎn)品數(shù)據(jù)
p0001,xiaomi,1000,2
p0002,appale,1000,3
p0003,samsung,1000,4

2、代碼開(kāi)發(fā)

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkBroadCast {
  def main(args: Array[String]): Unit = {
    //構(gòu)造Spark程序執(zhí)行環(huán)境
    val conf = new SparkConf().setAppName("appName").setMaster("local[*]")
    //如果集群運(yùn)行,則不需要設(shè)置setMaster("local[*]")
    val sc = new SparkContext(conf)
    //設(shè)置日志級(jí)別
    sc.setLogLevel("WARN")
    //創(chuàng)建RDD,讀取產(chǎn)品信息數(shù)據(jù)
    //產(chǎn)品記錄樣例:p0001,xiaomi,1000,2
    val productRdd: RDD[String] = sc.textFile(path = "D:\\03 Knowledge Related\\02 Learning\\02 Bigdata\\00 kkb\\02 Projects\\IDEA_WORKSPACE\\bigdata_hadoop\\Spark_core\\src\\main\\resources\\prts.txt")
    val productMapRdd: Array[(String, String)] = productRdd.map(x => {
      (x.split(",")(0), x)
    }).collect()
//    productMapRdd.foreach(println)
    /**
     * (p0001,p0001,xiaomi,1000,2)
     * (p0002,p0002,appale,1000,3)
     * (p0003,p0003,samsung,1000,4)
     */
    //將產(chǎn)品數(shù)據(jù)作為廣播變量
    val broadcastValue: Broadcast[Array[(String, String)]] = sc.broadcast(productMapRdd)

    //讀取訂單記錄:1001,20150710,p0001,2
    val ordersRdd: RDD[String] = sc.textFile(path = "D:\\03 Knowledge Related\\02 Learning\\02 Bigdata\\00 kkb\\02 Projects\\IDEA_WORKSPACE\\bigdata_hadoop\\Spark_core\\src\\main\\resources\\orders.txt")
    //將訂單記錄按照分區(qū)處理
    val productAndOrderRdd: RDD[String] = ordersRdd.mapPartitions(eachPartition => {
      //獲取產(chǎn)品廣播變量的數(shù)據(jù)并轉(zhuǎn)換為map類型,目的是通過(guò)getOrElse獲取產(chǎn)品數(shù)據(jù)
      val getBroadCastMap: Map[String, String] = broadcastValue.value.toMap
      //處理分區(qū)內(nèi)的訂單數(shù)據(jù)記錄
      val finalStr = eachPartition.map(eachLine => {
        //將每條訂單記錄按照逗號(hào)拆分,返回集合類型
        val ordersGet: Array[String] = eachLine.split(",")
        //產(chǎn)品的map類型,通過(guò)key(訂單的產(chǎn)品id)獲取對(duì)應(yīng)的產(chǎn)品記錄,返回產(chǎn)品數(shù)據(jù)記錄
        val getProductStr: String = getBroadCastMap.getOrElse(ordersGet(2), "")
        //訂單記錄拼接產(chǎn)品記錄
        eachLine + "\t" + getProductStr
      })
      finalStr
    })
    productAndOrderRdd.foreach(println)

    /**
     * 1001,20150710,p0001,2    p0001,xiaomi,1000,2
     * 1002,20150710,p0002,3    p0002,appale,1000,3
     * 1002,20150710,p0003,3    p0003,samsung,1000,4
     */


    //關(guān)閉Spark環(huán)境
    sc.stop()
  }

}

三、注意事項(xiàng)

  • 能不能將一個(gè)RDD使用廣播變量廣播出去?
    不能,因?yàn)镽DD是不存儲(chǔ)數(shù)據(jù)的??梢詫DD的結(jié)果廣播出去。
  • 廣播變量只能在Driver端定義,不能在Executor端定義。
  • 在Driver端可以修改廣播變量的值,在Executor端無(wú)法修改廣播變量的值。
  • 當(dāng)Executor端用到了Driver的變量,如果不使用廣播變量在Executor有多少task就有多少Driver端的變量副本。
  • 當(dāng)Executor端用到了Driver的變量,如果使用廣播變量在每個(gè)Executor中只有一份Driver端的變量副本。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 一、背景 舉例來(lái)說(shuō),(雖然是舉例,但是基本都是用我們實(shí)際在企業(yè)中用的生產(chǎn)環(huán)境中的配置和經(jīng)驗(yàn)來(lái)說(shuō)明的)。50個(gè)exe...
    文子軒閱讀 1,343評(píng)論 0 1
  • 1.分配更多的資源 -- 性能調(diào)優(yōu)的王道 真實(shí)項(xiàng)目里的腳本: bin/spark-submit \ --c...
    evan_355e閱讀 2,085評(píng)論 0 0
  • 在默認(rèn)情況下,當(dāng)Spark在集群的多個(gè)不同節(jié)點(diǎn)的多個(gè)任務(wù)上并行運(yùn)行一個(gè)函數(shù)時(shí),它會(huì)把函數(shù)中涉及到的每個(gè)變量,在每個(gè)...
    tracy_668閱讀 339評(píng)論 0 1
  • 概述本文介紹spark中Broadcast Variables的實(shí)現(xiàn)原理。 基本概念在spark中廣播變量屬于共享...
    達(dá)微閱讀 958評(píng)論 0 0
  • 廣播數(shù)據(jù)變量 ? 在App中經(jīng)常會(huì)用到List、MaP等變量。如果不適用廣播變量,默認(rèn)每個(gè)task都會(huì)拉取一份...
    孤單的阿怪閱讀 1,624評(píng)論 0 0

友情鏈接更多精彩內(nèi)容