一、廣播變量
1、廣播變量的優(yōu)點(diǎn)
不需要每個(gè)task帶上一份變量副本,而是變成每個(gè)節(jié)點(diǎn)的executor存一份副本。這樣的話, 就可以讓變量產(chǎn)生的副本數(shù)量大大減少。
2、廣播變量的用法
//將mapRdd廣播后返回broadcastValue
val broadcastValue: Broadcast[Array[(String, String)]] = sc.broadcast(mapRdd)
//獲取廣播變量的值
val getBroadCastMap: Map[String, String] = broadcastValue.value.toMap
3、廣播變量的原理
初始的時(shí)候,在Driver端有一個(gè)副本數(shù)據(jù)。廣播變量后,task運(yùn)行的時(shí)候,在使用副本數(shù)據(jù)前,首先在所在本地Executor對(duì)應(yīng)的BlockManager中,嘗試獲取副本數(shù)據(jù);如果本地沒(méi)有,即從Driver端拉取副本數(shù)據(jù),并且保存在所在本地的BlockManager中;此后這個(gè)Executor上所有的task,都會(huì)直接使用本地BlockManager中的副本數(shù)據(jù)。另Executor的BlockManager除了從Driver端拉取數(shù)據(jù),也可能從其他節(jié)點(diǎn)的BlockManager中拉去副本數(shù)據(jù)。
BlockManager:負(fù)責(zé)管理某個(gè)Executor對(duì)應(yīng)的內(nèi)存和磁盤的數(shù)據(jù),嘗試本地BlockManager中招map數(shù)據(jù)。
4、優(yōu)化說(shuō)明
假設(shè)有50個(gè)Executor,共1000個(gè)task;若每個(gè)map數(shù)據(jù)10M。默認(rèn)情況下,1000個(gè)副本10M共10G數(shù)據(jù)。在集群中,通過(guò)網(wǎng)絡(luò)傳輸,耗費(fèi)10G的內(nèi)存資源;如果使用了廣播變量,50個(gè)Executor即50個(gè)副本10M共500M數(shù)據(jù)。而且Executor的BlockManager不一定都從Driver傳輸?shù)奖镜兀€可能從最近的節(jié)點(diǎn)的Executor的BlockManager中拉取數(shù)據(jù),網(wǎng)絡(luò)傳輸速度大大增加,傳輸數(shù)據(jù)大大減少。
10G/500M=20倍,極大的提高了性能。
二、代碼實(shí)例
1、準(zhǔn)備數(shù)據(jù)
//訂單數(shù)據(jù)
1001,20150710,p0001,2
1002,20150710,p0002,3
1002,20150710,p0003,3
//產(chǎn)品數(shù)據(jù)
p0001,xiaomi,1000,2
p0002,appale,1000,3
p0003,samsung,1000,4
2、代碼開(kāi)發(fā)
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object SparkBroadCast {
def main(args: Array[String]): Unit = {
//構(gòu)造Spark程序執(zhí)行環(huán)境
val conf = new SparkConf().setAppName("appName").setMaster("local[*]")
//如果集群運(yùn)行,則不需要設(shè)置setMaster("local[*]")
val sc = new SparkContext(conf)
//設(shè)置日志級(jí)別
sc.setLogLevel("WARN")
//創(chuàng)建RDD,讀取產(chǎn)品信息數(shù)據(jù)
//產(chǎn)品記錄樣例:p0001,xiaomi,1000,2
val productRdd: RDD[String] = sc.textFile(path = "D:\\03 Knowledge Related\\02 Learning\\02 Bigdata\\00 kkb\\02 Projects\\IDEA_WORKSPACE\\bigdata_hadoop\\Spark_core\\src\\main\\resources\\prts.txt")
val productMapRdd: Array[(String, String)] = productRdd.map(x => {
(x.split(",")(0), x)
}).collect()
// productMapRdd.foreach(println)
/**
* (p0001,p0001,xiaomi,1000,2)
* (p0002,p0002,appale,1000,3)
* (p0003,p0003,samsung,1000,4)
*/
//將產(chǎn)品數(shù)據(jù)作為廣播變量
val broadcastValue: Broadcast[Array[(String, String)]] = sc.broadcast(productMapRdd)
//讀取訂單記錄:1001,20150710,p0001,2
val ordersRdd: RDD[String] = sc.textFile(path = "D:\\03 Knowledge Related\\02 Learning\\02 Bigdata\\00 kkb\\02 Projects\\IDEA_WORKSPACE\\bigdata_hadoop\\Spark_core\\src\\main\\resources\\orders.txt")
//將訂單記錄按照分區(qū)處理
val productAndOrderRdd: RDD[String] = ordersRdd.mapPartitions(eachPartition => {
//獲取產(chǎn)品廣播變量的數(shù)據(jù)并轉(zhuǎn)換為map類型,目的是通過(guò)getOrElse獲取產(chǎn)品數(shù)據(jù)
val getBroadCastMap: Map[String, String] = broadcastValue.value.toMap
//處理分區(qū)內(nèi)的訂單數(shù)據(jù)記錄
val finalStr = eachPartition.map(eachLine => {
//將每條訂單記錄按照逗號(hào)拆分,返回集合類型
val ordersGet: Array[String] = eachLine.split(",")
//產(chǎn)品的map類型,通過(guò)key(訂單的產(chǎn)品id)獲取對(duì)應(yīng)的產(chǎn)品記錄,返回產(chǎn)品數(shù)據(jù)記錄
val getProductStr: String = getBroadCastMap.getOrElse(ordersGet(2), "")
//訂單記錄拼接產(chǎn)品記錄
eachLine + "\t" + getProductStr
})
finalStr
})
productAndOrderRdd.foreach(println)
/**
* 1001,20150710,p0001,2 p0001,xiaomi,1000,2
* 1002,20150710,p0002,3 p0002,appale,1000,3
* 1002,20150710,p0003,3 p0003,samsung,1000,4
*/
//關(guān)閉Spark環(huán)境
sc.stop()
}
}
三、注意事項(xiàng)
- 能不能將一個(gè)RDD使用廣播變量廣播出去?
不能,因?yàn)镽DD是不存儲(chǔ)數(shù)據(jù)的??梢詫DD的結(jié)果廣播出去。 - 廣播變量只能在Driver端定義,不能在Executor端定義。
- 在Driver端可以修改廣播變量的值,在Executor端無(wú)法修改廣播變量的值。
- 當(dāng)Executor端用到了Driver的變量,如果不使用廣播變量在Executor有多少task就有多少Driver端的變量副本。
- 當(dāng)Executor端用到了Driver的變量,如果使用廣播變量在每個(gè)Executor中只有一份Driver端的變量副本。