筆記匯總

Hive Join

  1. common join
    如果不指定MapJoin或者不符合MapJoin的條件,那么Hive解析器會將Join操作轉換成Common Join,即:在Reduce階段完成join.整個過程包含Map、Shuffle、Reduce階段。
  • Map階段:讀取源表的數(shù)據(jù),Map輸出時候以Join on條件中的列為key,如果Join有多個關聯(lián)鍵,則以這些關聯(lián)鍵的組合作為key;Map輸出的value為join之后所關心的列即(select或者where中需要用到的);同時在value中還會包含表的Tag信息,用于標明此value對應哪個表;
  • shuffle階段:根據(jù)key的值進行hash,并將key/value按照hash值推送至不同的reduce中,這樣確保兩個表中相同的key位于同一個reduce中。
  • reduce階段:通過Tag來判斷每一個value是來自table1還是table2,在內(nèi)部分成2組,做集合笛卡爾乘積。
  • 缺點:shuffle的網(wǎng)絡傳輸和排序性能很低,reduce 端對2個集合做乘積計算,很耗內(nèi)存,容易導致OOM。
  1. map join
    在map 端進行join,其原理是broadcast join,即把小表作為一個完整的驅(qū)動表來進行join操作。通常情況下,要連接的各個表里面的數(shù)據(jù)會分布在不同的Map中進行處理。即同一個Key對應的Value可能存在不同的Map中。這樣就必須等到 Reduce中去連接。要使MapJoin能夠順利進行,那就必須滿足這樣的條件:除了一份表的數(shù)據(jù)分布在不同的Map中外,其他連接的表的數(shù)據(jù)必須在每 個Map中有完整的拷貝。MAPJION會把小表全部讀入內(nèi)存中,在map階段直接拿另外一個表的數(shù)據(jù)和內(nèi)存中表數(shù)據(jù)做匹配,由于在map是進行了join操作,省去了reduce運行的效率也會高很多。
  • 首先會啟動一個Local Task(在客戶端本地執(zhí)行的Task),負責掃描小表b的數(shù)據(jù),將其轉換成一個HashTable的數(shù)據(jù)結構,并寫入本地的文件中,之后將該文件加載到DistributeCache中(使用靜態(tài)方法DistributedCache.addCacheFile()指定要復制的文件,它的參數(shù)是文件的URI)。
  • 接下來是Task B,該任務是一個沒有Reduce的MR,啟動MapTasks掃描大表a,在Map階段,根據(jù)a的每一條記錄去和DistributeCache中b表對應的HashTable關聯(lián),并直接輸出結果。

Spark Join

spark提供了三種join實現(xiàn):hash join,sort merge join以及broadcast join。

  1. hash join:
    通過分區(qū)的形式將大批量的數(shù)據(jù)通過hash劃分成n份較小的數(shù)據(jù)集進行并行計算。
  • 對兩張表分別按照join keys進行重分區(qū),即shuffle,目的是為了讓有相同join keys值的記錄分到對應的分區(qū)中
  • 對對應分區(qū)中的數(shù)據(jù)進行join,此處先將小表分區(qū)構造為一張hash表,然后根據(jù)大表分區(qū)中記錄的join keys值拿出來進行匹配
  • 總結:要將來自buildIter的記錄放到hash表中,那么每個分區(qū)來自buildIter的記錄不能太大,否則就存不下,默認情況下hash join的實現(xiàn)是關閉狀態(tài)。buildIter總體估計大小以及分區(qū)后的大小要超過spark.sql.autoBroadcastJoinThreshold設定的值,即不滿足broadcast join條件

2、sort join:
hash join對于實現(xiàn)大小表比較合適,但是兩個表都非常大時,對內(nèi)存計算造成很大的壓力。

  • 實現(xiàn)方式:不需要將一側數(shù)據(jù)全部加載后再進行hash join,但需要在join前將數(shù)據(jù)排序
  • 在shuffle read階段,分別對streamIter和buildIter進行merge sort,在遍歷streamIter時,對于每條記錄,都采用順序查找的方式從buildIter查找對應的記錄,由于兩個表都是排序的,每次處理完streamIter的一條記錄后,對于streamIter的下一條記錄,只需從buildIter中上一次查找結束的位置開始查找,所以說每次在buildIter中查找不必重頭開始。

3、broadCast join:
Broadcast不會內(nèi)存溢出,因為數(shù)據(jù)保存級別StoreageLevel是MEMORY_AND_DISK模式

  • 設計思想:避免大量的shuffle。若buildIter是一個非常小的表,其實沒必要做shuffle了,直接將buildIter廣播到每個計算節(jié)點,然后將buildIter放到hash表中。
  • 步驟:
    a. broadcast階段:將小表廣播分發(fā)到大表所在的所有主機。分發(fā)方式可以有driver分發(fā)。
    b. 在每個executor上執(zhí)行單機版hash join,小表映射,大表試探。

Full Outer Join

  • 對于fullouter,IterA和IterB同時為streamedIter和hashedIter,即先IterA=streamedIter,IterB=hashedIter進行l(wèi)eftouter,然后再用先IterB=streamedIter,IterA=hashedIter進行l(wèi)eftouter,再把兩次結果進行合并
  • 對于FullOuterJoin,如果采用HashJoin方式來實現(xiàn),代價較大,需要建立雙向的Hash表,而基于SortJoin,它的代價與其他幾種Join相差不大,因此`FullOuter默認都是基于SortJon來實現(xiàn)。

Spark RDD

彈性分布式數(shù)據(jù)集,是Spark中最基本的數(shù)據(jù)抽象,它代表一個不可變、可分區(qū)、里面的元素可并行計算的集合。

  • 它是一組分區(qū),分區(qū)是Spark中數(shù)據(jù)集最小的單位。也就是說Spark當中數(shù)據(jù)是以分區(qū)為單位存儲的,不同的分區(qū)被存儲在不同的節(jié)點上。這也是分布式計算的基礎。
  • 它是一個應用在這個分區(qū)上的計算任務。在Spark當中數(shù)據(jù)和執(zhí)行的操作是分開的,并且Spark基于懶加載的機制,也就是在真正觸發(fā)計算的行動操作出現(xiàn)前,Spark會存儲起來哪對哪些數(shù)據(jù)執(zhí)行哪些計算。數(shù)據(jù)和計算之間的映射關系就存儲在RDD中。
  • RDD之間的依賴關系,RDD之間存在轉化關系,一個RDD可以通過轉化操作轉化成其他RDD,這些轉化操作都會被記錄下來。當部分數(shù)據(jù)丟失的時候,Spark可以通過記錄的依賴關系重新計算丟失部分的數(shù)據(jù),而不是重新計算所有數(shù)據(jù)。
  • 一個分區(qū)的方法,也就是計算分區(qū)的函數(shù)。Spark中支持基于hash的hash分區(qū)和基于范圍的range分區(qū)。
  • 一個列表,存儲每個分區(qū)的存儲位置

Spark RDD緩存

Spark RDD 是惰性求值的,而有時候希望能多次使用同一個 RDD。如果簡單地對 RDD 調(diào)用行動操作,Spark 每次都會重算 RDD 及它的依賴,這樣就會帶來太大的消耗。為了避免多次計算同一個 RDD,可以讓 Spark 對數(shù)據(jù)進行持久化。
Spark 可以使用 persist 和 cache 方法將任意 RDD 緩存到內(nèi)存、磁盤文件系統(tǒng)中。緩存是容錯的,如果一個 RDD 分片丟失,則可以通過構建它的轉換來自動重構。被緩存的 RDD 被使用時,存取速度會被大大加速。一般情況下,Executor 內(nèi)存的 60% 會分配給 cache,剩下的 40% 用來執(zhí)行任務。
cache 是 persist 的特例,將該 RDD 緩存到內(nèi)存中。persist 可以讓用戶根據(jù)需求指定一個持久化級別。

  • MEMORY_ONLY : 將 RDD 以反序列化 Java 對象的形式存儲在 JVM 中。如果內(nèi)存空間不夠,部分數(shù)據(jù)分區(qū)將不再緩存,在每次需要用到這些數(shù)據(jù)時重新進行計算。這是默認的級別。
  • MEMORY_AND_DISK : 將 RDD 以反序列化 Java 對象的形式存儲在 JVM 中。如果內(nèi)存空間不夠,將未緩存的數(shù)據(jù)分區(qū)存儲到磁盤,在需要使用這些分區(qū)時從磁盤讀取。
  • MEMORY_ONLY_SER : 將 RDD 以序列化的 Java 對象的形式進行存儲(每個分區(qū)為一個 byte 數(shù)組)。這種方式會比反序列化對象的方式節(jié)省很多空間,尤其是在使用 fast serializer時會節(jié)省更多的空間,但是在讀取時會增加 CPU 的計算負擔。
  • MEMORY_AND_DISK_SER : 類似于 MEMORY_ONLY_SER ,但是溢出的分區(qū)會存儲到磁盤,而不是在用到它們時重新計算。
  • DISK_ONLY : 只在磁盤上緩存 RDD。
  • MEMORY_ONLY_2,MEMORY_AND_DISK_2,等等 : 與上面的級別功能相同,只不過每個分區(qū)在集群中兩個節(jié)點上建立副本。
  • OFF_HEAP(實驗中): 類似于 MEMORY_ONLY_SER ,但是將數(shù)據(jù)存儲在 off-heap memory,這需要啟動 off-heap 內(nèi)存。

謂詞下推

謂詞下推的基本思想:將過濾表達式盡可能移動至靠近數(shù)據(jù)源的位置,以使真正執(zhí)行時能直接跳過無關的數(shù)據(jù)。
hive官網(wǎng)上給出了outer join的謂詞下推規(guī)則

  • Join(只包括left join ,right join,full join)中的謂詞如果是保留表的,則不會下推
  • Join(只包括left join ,right join,full join)之后的謂詞如果是Null Supplying tables的,則不會下推

Spark DAG&Stage

https://www.studytime.xin/article/spark-knowledge-rdd-stage.html

Spark Shuffle機制及調(diào)優(yōu)

https://blog.csdn.net/qichangjian/article/details/88039576

MR&Spark Shuffle詳解

https://mp.weixin.qq.com/s?__biz=MzUxOTU5Mjk2OA==&mid=2247485991&idx=1&sn=79c9370801739813b4a624ae6fa55d6c&chksm=f9f60740ce818e56a18f8782d21d376d027928e434f065ac2c251df09d2d4283710679364639&scene=21#wechat_redirect

Spark為什么快

Spark SQL 比 Hadoop Hive 快,是有一定條件的,而且不是 Spark SQL 的引擎比 Hive 的引擎快,相反,Hive 的 HQL 引擎還比 Spark SQL 的引擎更快。其實,關鍵還是在于 Spark 本身快。

  • 消除了冗余的 HDFS 讀寫: Hadoop 每次 shuffle 操作后,必須寫到磁盤,而 Spark 在 shuffle 后不一定落盤,可以 cache 到內(nèi)存中,以便迭代時使用。如果操作復雜,很多的 shufle 操作,那么 Hadoop 的讀寫 IO 時間會大大增加,也是 Hive 更慢的主要原因了。
  • 消除了冗余的 MapReduce 階段: Hadoop 的 shuffle 操作一定連著完整的 MapReduce 操作,冗余繁瑣。而 Spark 基于 RDD 提供了豐富的算子操作,且 reduce 操作產(chǎn)生 shuffle 數(shù)據(jù),可以緩存在內(nèi)存中。+
    JVM 的優(yōu)化: Hadoop 每次 MapReduce 操作,啟動一個 Task 便會啟動一次 JVM,基于進程的操作。而 Spark 每次 MapReduce 操作是基于線程的,只在啟動 Executor 是啟動一次 JVM,內(nèi)存的 Task 操作是在線程復用的。每次啟動 JVM 的時間可能就需要幾秒甚至十幾秒,那么當 Task 多了,這個時間 Hadoop 不知道比 Spark 慢了多少。
    記住一種反例 考慮一種極端查詢:
Select month_id, sum(sales) from T group by month_id;

這個查詢只有一次 shuffle 操作,此時,也許 Hive HQL 的運行時間也許比 Spark 還快,反正 shuffle 完了都會落一次盤,或者都不落盤。
結論 :Spark 快不是絕對的,但是絕大多數(shù),Spark 都比 Hadoop 計算要快。這主要得益于其對 mapreduce 操作的優(yōu)化以及對 JVM 使用的優(yōu)化

Spark reduce by key& group by key

  • 相同點
  1. 都作用于RDD[k,v]
  2. 都根據(jù)Key來分組聚合
  3. 默認分區(qū)數(shù)量是不變的,但都可以通過參數(shù)指定分區(qū)數(shù)量
  • 不同點
  1. group by key默認沒有聚合函數(shù),得到的返回值是RDD[k,Iterable[V]]
  2. reduce by key必須傳聚合函數(shù),得到的返回值是RDD[k,聚合后的V]
    3.groupbykey.map()=reducebykey
  • 最重要的區(qū)別
    他們都是要經(jīng)過shuffle的,groupByKey在方法shuffle之間不會合并原樣進行shuffle,。reduceByKey進行shuffle之前會先做合并,這樣就減少了shuffle的io傳送,所以效率高一點

Spark讀取文件,分片

  1. 調(diào)用textFile方法,需要傳入文件路徑,分區(qū)數(shù)
  2. 調(diào)用hadoopFile方法,獲取Hadoop configuration并廣播,設置讀取的文件路徑,實例化Hadoop RDD
  3. HadoopRDD的getPartitions()方法,設置分片,分片規(guī)則如下:
    Math.max(minSize,Math.min(goalSize,blockSize))
  • goalSize:是根據(jù)用戶期望的分區(qū)數(shù)算出來的,每個分區(qū)的大小,總文件大小/用戶期望分區(qū)數(shù)
  • minSize :InputSplit的最小值,由配置參數(shù)mapred.min.split.size(在/conf/mapred-site.xml文件中配置)確定,默認是1(字節(jié))
  • blockSize :文件在HDFS中存儲的block大?。ㄔ?conf/hdfs-site.xml文件中配置),不同文件可能不同,默認是64MB或者128MB。

Spark統(tǒng)一內(nèi)存模型

1.Spark統(tǒng)一內(nèi)存模型
相關內(nèi)存設置參數(shù):
spark.executor.memory=8G;
spark.executor.memoryOverhead=6G;
spark.memory.fraction=0.6;
其中spark.memory.fraction不能設置太高,需要為otherememory留一些富裕內(nèi)存因為spark內(nèi)存統(tǒng)計信息收集是由延遲的,如果該值太大,且spill較重的情況下,會導致內(nèi)存釋放不及時而oom。
jvm堆內(nèi)的內(nèi)存分為四個部分:
Unified Memory:統(tǒng)一內(nèi)存,包含Storage內(nèi)存和Execution內(nèi)存,由spark.memory.fraction控制(spark2.0+默認為0.6,占可用內(nèi)存(系統(tǒng)內(nèi)存減去預留內(nèi)存)的60%,spark1.6默認0.75)
Storage內(nèi)存:主要用于rdd的緩存(比如Broadcast的數(shù)據(jù)),緩存數(shù)據(jù),由spark.storage.storageFraction控制(默認0.5,占統(tǒng)一內(nèi)存的50%)
Execution內(nèi)存:用于緩存在執(zhí)行shuffle過程中產(chǎn)生的中間數(shù)據(jù)(由1-spark.storage.storageFraction控制),用戶spark的計算,shuffle,sort,aggregation這些計算會用到的內(nèi)存
Storage內(nèi)存和Execution內(nèi)存之間存在動態(tài)占用機制,若己方不足對方空余則可占用對方,Execution內(nèi)存被對方占用后可強制回收
Other Memory:其它,默認占可用內(nèi)存的40%,用于spark內(nèi)部的一些元數(shù)據(jù),用戶的數(shù)據(jù)結構,防止在稀疏和異常大的記錄的情況下出現(xiàn)對內(nèi)存估計不足導致oom時的內(nèi)存緩沖
reservedMemory:預留內(nèi)存300M,用于保障spark的正常運行

execution內(nèi)存和storage內(nèi)存動態(tài)占用機制的理解:
1.不適用緩存(storage)的應用程序可以將整個空間用于執(zhí)行(execution),從而避免不必要的磁盤溢寫
2.storage曾經(jīng)想execution借用了空間,它緩存的數(shù)據(jù)可能非常的多,然后execution又不需要那么大的空間,假設現(xiàn)在storage占了80%,execution占了20%,然后當execution空間不足時,execution會向內(nèi)存管理器發(fā)信號把storage曾經(jīng)占用的超過50%數(shù)據(jù)的那部分強制擠掉(注意:drop后數(shù)據(jù)會不會丟失主要是看你在程序設置的storage_level來決定你是Drop到哪里,可能是
drop到磁盤)
3.execution空間不足的情況下,除了選擇向storage借用空間以外,也可以把一部分數(shù)據(jù)spill到磁盤,但很多時候基于性能調(diào)優(yōu)的考慮不想把數(shù)據(jù)溢寫到磁盤,會優(yōu)先選擇向storage借空間。如果此時storage實際占用不足50%,則會借空間給execution。但當storage發(fā)現(xiàn)自己空間不足時(指不能放下一個完整的block),只能等execution釋放空間。

Flink相關問題

https://www.cnblogs.com/qiu-hua/p/13767131.html

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容