一.Spark UI 選項(xiàng)卡的組成
1.Jobs
1.1 首頁(yè)

補(bǔ)充:
- Scheduling Mode:
- application 中 task 任務(wù)的調(diào)度策略,由參數(shù) spark.scheduler.mode 來(lái)設(shè)置,可選的參數(shù)有 FAIR 和 FIFO。這與 Yarn 的資源調(diào)度策略的層級(jí)不同,Yarn 的資源調(diào)度是針對(duì)集群中不同 application 間的,而 spark scheduler mode 則是針對(duì) application 內(nèi)部 task set(stage)級(jí)別的資源分配
1.2 Event TImeline
- Event Timeline:
- 在 application 應(yīng)用運(yùn)行期間,Job 和 Exector 的增加和刪除事件進(jìn)行圖形化的展現(xiàn)。這個(gè)就是用來(lái)表示調(diào)度 Job 何時(shí)啟動(dòng)何時(shí)結(jié)束,以及 Excutor 何時(shí)加入何時(shí)移除。
-
一般用來(lái)查看資源是否充足,通過(guò)看add executor時(shí)間是否很長(zhǎng)
image.png
例子1:
image.png
例子2
image.png
1.3 Job Detail
- Stauts:展示 Job 的當(dāng)前狀態(tài)信息。
- Active Stages:正在運(yùn)行的 stages 信息,點(diǎn)擊某個(gè) stage 可進(jìn)入查看具體的 stage 信息。
- Pending Stages:排隊(duì)的 stages 信息,根據(jù)解析的 DAG 圖 stage 可并發(fā)提交運(yùn)行,而有依賴的 stage 未運(yùn)行完時(shí)則處于等待隊(duì)列中。
示例:


- input:
- 上圖中發(fā)現(xiàn)input=35.6G,表示該stage有讀表或文件的操作。
- 上面這個(gè)是input stage,有input 、
- shuffle write
- 為下游stage輸出
2. stages
- Completed Stages:
- 已完成 Stage 的基本信息。
- Skipped Stages:
- 1.因?yàn)?shuffle 的落地文件如果還有對(duì)應(yīng) RDD 使用,它就不會(huì)被垃圾回收掉,stage 發(fā)現(xiàn)數(shù)據(jù)已經(jīng)在磁盤(pán)或內(nèi)存中了,那么就不會(huì)重新計(jì)算了。
- 如果發(fā)現(xiàn)下游fetch數(shù)據(jù)失敗了,此時(shí)需要通過(guò)血緣關(guān)系重新計(jì)算,那么上游的stage如果還在內(nèi)存中,那么就不用跑了
- 2.在將 Stage 分解成 TaskSet 的時(shí)候,如果一個(gè) RDD 已經(jīng)Cache 到了 BlockManager(BlockManager 是 spark 管理自己的存儲(chǔ)的組件,包括 memory 和 disk,比如RDD-Cache、 Shuffle-output、broadcast 都由它管理),則這個(gè) RDD 對(duì)應(yīng)的所有祖宗 Stage 都不會(huì)分解成TaskSet 進(jìn)行執(zhí)行,所以這些祖宗 Stage 和它們對(duì)應(yīng)的 Task 就會(huì)在 Spark UI 上顯示為 skipped。
- 因?yàn)閷tage cache到內(nèi)存,那么通過(guò)血緣關(guān)系,就不需要從源頭重新讀數(shù)據(jù),那么這個(gè)rdd對(duì)應(yīng)的祖宗stage都可以跳過(guò)。
- 3.大多數(shù) skipped stage都是input stage或有一些shuffle read 的stage,因?yàn)槎糲ache過(guò)了,那么久可以跳過(guò)。
- 1.因?yàn)?shuffle 的落地文件如果還有對(duì)應(yīng) RDD 使用,它就不會(huì)被垃圾回收掉,stage 發(fā)現(xiàn)數(shù)據(jù)已經(jīng)在磁盤(pán)或內(nèi)存中了,那么就不會(huì)重新計(jì)算了。

運(yùn)行中的界面

2.1stage detail


總覽

- Total time across all tasks:當(dāng)前 stage 中所有 task 花費(fèi)的時(shí)間和。
- Locality Level Summary:不同本地化級(jí)別下的任務(wù)數(shù),本地化級(jí)別是指數(shù)據(jù)與計(jì)算間的關(guān)系。
- PROCESS_LOCAL 進(jìn)程本地化:task 與計(jì)算的數(shù)據(jù)在同一個(gè) Executor 中。
- NODE_LOCAL 節(jié)點(diǎn)本地化:
- 情況一:task 要計(jì)算的數(shù)據(jù)是在同一個(gè) Worker 的不同 Executor 進(jìn)程中;
- 情況二:task 要計(jì)算的數(shù)據(jù)是在同一個(gè) Worker 的磁盤(pán)上,或在 HDFS 上,恰好有 block 在同一個(gè)節(jié)點(diǎn)上。
- RACK_LOCAL 機(jī)架本地化,數(shù)據(jù)在同一機(jī)架的不同節(jié)點(diǎn)上:
- 情況一:task 計(jì)算的數(shù)據(jù)在 Worker2 的 Executor 中;
- 情況二:task 計(jì)算的數(shù)據(jù)在 Worker2 的磁盤(pán)上。
- ANY 跨機(jī)架,數(shù)據(jù)在非同一機(jī)架的網(wǎng)絡(luò)上,速度最慢。
- 補(bǔ)充:
- 一般生產(chǎn)環(huán)境中網(wǎng)絡(luò)環(huán)境比較好,本地化這個(gè)功能是關(guān)閉的,不是很有用
- 本地化的實(shí)現(xiàn)是通過(guò)等待,一個(gè)task過(guò)來(lái)后,先等10秒(配置的)看是否能獲取一個(gè)資源恰好和該task是同一個(gè)機(jī)器,如果有就提交作業(yè),沒(méi)有的話就接著等等
- 因這個(gè)等待會(huì)比較耗時(shí),所以一般都是讓其走網(wǎng)絡(luò),不強(qiáng)調(diào)本地化。
- 一般生產(chǎn)環(huán)境中網(wǎng)絡(luò)環(huán)境比較好,本地化這個(gè)功能是關(guān)閉的,不是很有用
- Input Size/Records:輸入的數(shù)據(jù)字節(jié)數(shù)大小/記錄條數(shù)。
- Output Size / Records:輸出的數(shù)據(jù)字節(jié)數(shù)大小/記錄條數(shù)。
- Shuffle Write:
- 為下一個(gè)依賴的 stage 提供輸入數(shù)據(jù),shuffle 過(guò)程中通過(guò)網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)字節(jié)數(shù)/記錄條數(shù)。
- 應(yīng)該盡量減少 shuffle 的數(shù)據(jù)量及其操作次數(shù),這是 spark 任務(wù)優(yōu)化的一條基本原則。
- Shuffle read:
- 總的 shuffle 字節(jié)數(shù),包括本地節(jié)點(diǎn)和遠(yuǎn)程節(jié)點(diǎn)的數(shù)據(jù)。
- 一般第一個(gè)stage就不會(huì)有shffle read,肯定是寫(xiě),下游stage一般是讀
DAG

- DAG Visualization:
- 當(dāng)前 stage 中包含的詳細(xì)的 tranformation 操作流程圖。
- 顯示的比較初級(jí),一般也不怎么看
- Show Additional Metrics:顯示額外的一些指標(biāo)。鼠標(biāo)移上去會(huì)有相應(yīng)的解釋。
- Scheduler Delay:調(diào)度延遲時(shí)間,包含把任務(wù)從調(diào)度器輸送給 excutor,并且把任務(wù)的結(jié)果從 excutor 返回給調(diào)度器。如果調(diào)度時(shí)間比較久,則考慮降低任務(wù)的數(shù)量,并且降低任務(wù)結(jié)果大小
- Task Deserialization Time:反序列化 excutor 的任務(wù),也包含讀取廣播任務(wù)的時(shí)間
- Result Serialization Time:在 executor 上序列化task結(jié)果所花費(fèi)的時(shí)間。
- Getting Result Time:從 executor 中獲取結(jié)果的時(shí)間。
- Event Timeline: 清楚地展示在每個(gè) Executor 上各個(gè) task 的各個(gè)階段的時(shí)間統(tǒng)計(jì)信息,可以清楚地看到 task 任務(wù)時(shí)間是否有明顯傾斜,以及傾斜的時(shí)間主要是屬于哪個(gè)階段,從而有針對(duì)性的進(jìn)行優(yōu)化。
- 以上幾個(gè)點(diǎn)都不怎么看
Summary Metrics(重點(diǎn))

- Summary Metrics for XXX Completed Tasks:已完成的 Task 的指標(biāo)摘要。
- Duration:task 持續(xù)時(shí)間
- GC Time:task GC 消耗時(shí)間
- Output Write Time:輸出寫(xiě)時(shí)間
- Output Size/ Records:輸出數(shù)量大小,條數(shù)。
- Shuffle HDFS Read Time:shuffle 中間結(jié)果從 HDFS 讀取時(shí)間
- Shuffle Read Size/Records:shuffle 讀入數(shù)據(jù)大小/條數(shù)
- Shuffle Spill Time:shuffle 中間結(jié)果溢寫(xiě)時(shí)間。
- 折現(xiàn)指標(biāo)只會(huì)在有值時(shí)展示
- 主要通看 input 、shuffle write的幾個(gè)分位線的數(shù)據(jù)是否平均,是否有數(shù)據(jù)傾斜的問(wèn)題
Aggregated Metrics by Executor

- Aggregated Metrics by Executor:executor維度做聚合(看的不是很多)
- 匯總指標(biāo)。將 task 運(yùn)行的指標(biāo)信息按 Executor 做聚合后的統(tǒng)計(jì)信息,并可查看某個(gè) Excutor 上任務(wù)運(yùn)行的日志信息。這里可以看到 executor 完成 task 的情況,讀入的數(shù)據(jù)量,shufflewrite、shuffle read 數(shù)據(jù)量,根據(jù)這些指標(biāo)判斷節(jié)點(diǎn)是否健康。
Tasks

- Tasks:
- 當(dāng)前 stage 中所有任務(wù)運(yùn)行的明細(xì)信息。右邊部分都是需要關(guān)注的點(diǎn)。
- 因?yàn)橛信判蛐Ч?,一般可以通過(guò)列排序找找一些關(guān)鍵指標(biāo):
- input size 查看哪些task輸入比較多,該task也極有可能是比較慢的task
- erros 排序,查找錯(cuò)誤都在哪些機(jī)器上分布,如果都在一臺(tái)機(jī)器,則會(huì)很快定位出現(xiàn)問(wèn)題的機(jī)器
- 通過(guò) index、task id在對(duì)應(yīng) log中檢索對(duì)應(yīng)的task日志

speculative:推測(cè)執(zhí)行

因?yàn)橛型茰y(cè)執(zhí)行,所以如果發(fā)現(xiàn)task killed是正?,F(xiàn)象
3.利用 Web UI 定位問(wèn)題
查看stages

在 stage 頁(yè)面可以看到 stage 44、58 兩個(gè)執(zhí)行時(shí)間很長(zhǎng),并且 shuffle 數(shù)據(jù)量也很大。
- stage id 44 shuffle read 6.8TB,但是寫(xiě)出18TB,有一個(gè)明顯的數(shù)據(jù)膨脹(數(shù)據(jù)翻了一番多)
- stage id 58 shffle read 18T,最終output表/文件中的數(shù)據(jù)是5.4TB
- 但是也只能發(fā)現(xiàn)兩個(gè)stage shuffle數(shù)據(jù)量大,執(zhí)行時(shí)間長(zhǎng),具體的信息就看不見(jiàn)了,具體需要看詳情
查看stage id 58的詳情
Summary Metrics for n Completed Tasks(通過(guò)分位線判斷是否發(fā)生數(shù)據(jù)傾斜)

可以看出 shuffle spill 耗時(shí)最多,其他指標(biāo)都比較均勻,沒(méi)有出現(xiàn)數(shù)據(jù)傾斜。

- shuffle spill 中位線 40s,但是最大是1.2h,這說(shuō)明有一個(gè)或者幾個(gè)task在shuffle spill 上花了1.2小時(shí)
- spill 多跟內(nèi)存緊張和磁盤(pán)有關(guān),如果在不同節(jié)點(diǎn)上有幾個(gè) task spill 特別慢,且 GC 時(shí)間長(zhǎng),則可能內(nèi)存不足。
- 如果 spill 慢發(fā)生在同一個(gè)節(jié)點(diǎn)上,則有可能和節(jié)點(diǎn)運(yùn)行狀態(tài)相關(guān),如磁盤(pán)問(wèn)題。
- 同時(shí)得出一個(gè)比較關(guān)鍵的結(jié)論,該stage中的task沒(méi)有發(fā)生數(shù)據(jù)傾斜
- 沒(méi)有 shuffle write、input
- shuffle read、output 中位線觀察都是很均勻
- 通過(guò)讀或者寫(xiě)的幾個(gè)中位線觀察是否有數(shù)據(jù)傾斜
Tasks

按照時(shí)間倒敘,Detail stage 頁(yè)面可以找到 task 執(zhí)行時(shí)間很長(zhǎng),shuffle spill 數(shù)據(jù)很大,這里產(chǎn)生了大量的 IO。
stage 58 總結(jié)
- 此時(shí)可以得出一個(gè)結(jié)論:
- stage id 58 沒(méi)有發(fā)生數(shù)據(jù)傾斜,但是shuffle read 18TB,output 5TB,數(shù)據(jù)量實(shí)在太大,在10000個(gè)task中跑批是比較慢的,總共花了3.4小時(shí),甚至有一些task要跑 1.8h、1.6h、1.4h等,都是小時(shí)級(jí)別的。
- 解決方法:
- 在資源允許的情況下,可以進(jìn)一步擴(kuò)大并行度,增加 Executor 來(lái)進(jìn)行優(yōu)化。后續(xù)并行度改為10000->15000,觀察時(shí)間有明顯提升。
查看 stage 44 的詳情
Summary Metrics for n Completed Tasks(通過(guò)分位線判斷是否發(fā)生數(shù)據(jù)傾斜)

Stage44 看出 shuffle 數(shù)據(jù)并不均勻,發(fā)生來(lái)數(shù)據(jù)傾斜,觀察依據(jù):
- shuffle read 中位線只有600MB,90線、95線、99線相差不多,但是max線居然有8.2GB,比中位線大了10倍多
- shuffle write 中位線只有1G多,但是max居然有15G多,比中位線大了將近15倍,比99線大了3倍多
- duration 大部分執(zhí)行時(shí)間都是分鐘級(jí)別,但是max是小時(shí)級(jí)別
接下來(lái)要通過(guò)sql ui觀察那部分執(zhí)行邏輯發(fā)生傾斜
sql ui中觀察
通過(guò)sql 標(biāo)簽 定位

stage no 44 的執(zhí)行部分:

我們可以根據(jù) SQL 執(zhí)行圖上找到 stage44,并查到發(fā)生 shuffle 操作的 join key(item_id),順著其上游可找到其中一個(gè)表 device_item。
找sql代碼

接著可以從任務(wù) SQL 中找到問(wèn)題出現(xiàn)的部分。定位到了問(wèn)題所在,就可以通過(guò)一些針對(duì)數(shù)據(jù)傾斜的優(yōu)化參數(shù)或者處理方法進(jìn)行一一實(shí)驗(yàn),并通過(guò)對(duì)比得出最優(yōu)解。
- 在新版本中可以通過(guò)AE做優(yōu)化
- 也可以讓客戶做優(yōu)化,如拆sql,加鹽,id拆分等
- 做分桶避免shuffle
- 也可以找業(yè)務(wù)確認(rèn),是否接收慢
4.Environment
介紹:
- Environment 選項(xiàng)卡提供有關(guān) Spark 應(yīng)用程序中使用的各種屬性和環(huán)境變量的信息。
- 用戶可以通過(guò)這個(gè)選項(xiàng)卡得到非常有用的各種 Spark 屬性信息,而不用去翻找屬性配置文件。通過(guò)平臺(tái)工具提交任務(wù)時(shí),其默認(rèn)參數(shù)配置可以在這里查找。
- Environment 除了展示 Spark 參數(shù)意外,有一個(gè)重要的功能是明確參數(shù)是由誰(shuí)設(shè)置的。
- 例如用戶設(shè)置了參數(shù)spark.sql.shuffle.partitions=1000, 平臺(tái)默認(rèn)參數(shù)是 spark.sql.shuffle.partitions=2000,智能優(yōu)化器根據(jù)作業(yè)運(yùn)行歷史為本次執(zhí)行設(shè)置了 spark.sql.shuffle.partitions=3000,那么最后生效的參數(shù)可以從 Environment 獲取,例如是 3000,同時(shí)還可以找出到底是誰(shuí)設(shè)置了最后生效的值。
- 如:spark.final.spark.sql.shuffle.partitions=HBO,標(biāo)注是HBO設(shè)置的
- 例如用戶設(shè)置了參數(shù)spark.sql.shuffle.partitions=1000, 平臺(tái)默認(rèn)參數(shù)是 spark.sql.shuffle.partitions=2000,智能優(yōu)化器根據(jù)作業(yè)運(yùn)行歷史為本次執(zhí)行設(shè)置了 spark.sql.shuffle.partitions=3000,那么最后生效的參數(shù)可以從 Environment 獲取,例如是 3000,同時(shí)還可以找出到底是誰(shuí)設(shè)置了最后生效的值。

5.Executor


- Executors 選項(xiàng)卡:
- 提供了關(guān)于內(nèi)存、CPU 核和其他被 Executors 使用的資源的信息。
- 這些信息在 Executor 級(jí)別和匯總級(jí)別都可以獲取到。
- 一方面通過(guò)它可以看出來(lái)每個(gè) Excutor 是否發(fā)生了數(shù)據(jù)傾斜。
- 另一方面可以具體分析目前的應(yīng)用是否產(chǎn)生了大量的 shuffle,是否可以通過(guò)增加并行度來(lái)減少 shuffle 的數(shù)據(jù)量。
- 提供了關(guān)于內(nèi)存、CPU 核和其他被 Executors 使用的資源的信息。
- Summary:
- 該 application 運(yùn)行過(guò)程中使用 Executor 的統(tǒng)計(jì)信息。
- Executors:
- 每個(gè) Excutor的詳細(xì)信息(包含 driver),可以點(diǎn)擊查看某個(gè) Executor 中任務(wù)運(yùn)行的詳細(xì)日志。
二.如何通過(guò) Spark UI 定位問(wèn)題
1.如何找 Web UI地址


2.怎么看 Spark Driver/Executor 日志

因?yàn)閑xecutor里面會(huì)跑很多task,當(dāng)task有問(wèn)題的時(shí)候,需要通過(guò)看executor log定位task日志

3.如何定位分配資源不足

如果 UI 頁(yè)面上申請(qǐng)到的 executors 數(shù)量遠(yuǎn)小于配置的數(shù)量(靜態(tài)資源:spark.executor.instances,動(dòng)態(tài)資源:spark.dynamicAllocation.maxExecutors),則表示隊(duì)列資源不充足。
- 如配置的數(shù)量是300,但是active(6),遠(yuǎn)遠(yuǎn)小于配置數(shù)量,則表示隊(duì)列資源不足。
4.如何定位Executor太少

- 如果 Active Tasks 等于或接近 Cores,則表示申請(qǐng)的資源被占滿。
- 當(dāng)任務(wù)的 task 較多時(shí),可以適當(dāng)增加 executors 數(shù)提高任務(wù)的并行度。
5.如何定位Input讀慢

tasks標(biāo)簽觀察
5.1 task很少的場(chǎng)景

- 如果一個(gè)一個(gè)stage中有大量的spill 操作,就說(shuō)明input stage 邏輯是比較復(fù)雜的
- input stage 必然有 input size列
- 現(xiàn)象:Input Stage 包含較復(fù)雜的計(jì)算,task 數(shù)不多,單個(gè) task 很慢。
- 如果input stage執(zhí)行的很慢,可以適當(dāng)增加input stage個(gè)數(shù)
- 如果 task 數(shù)不太多的情況下,可以下調(diào) spark.sql.files.maxPartitionBytes 以擴(kuò)大并發(fā)為代價(jià),縮短整體運(yùn)行時(shí)間。
5.2 task 很多的場(chǎng)景

- Input Stage 單個(gè) task 處理數(shù)據(jù)量很小,每個(gè) task 很快,但 task 數(shù)特別多,時(shí)間在調(diào)度上浪費(fèi)很多。
- 此時(shí)如果調(diào)大 spark.sql.files.maxPartitionBytes 雖然可以減少task數(shù),但參數(shù)設(shè)置會(huì)影響全局,如果要讀多個(gè)表,可能對(duì)其他 input stage 造成影響。
- 對(duì)于這種情況,可以設(shè)置 spark.datasource.splits.max。
- 在本例中,通過(guò)設(shè)置 setspark.datasource.splits.max=40000,將 task 數(shù)從 17W + 減少到 4W+,單個(gè) task 數(shù)據(jù)量增加,但總體調(diào)度時(shí)間節(jié)約,stage 時(shí)間明顯縮短。
- 即調(diào)高單個(gè)task能處理的數(shù)據(jù)量
5.6 SparkSQL Input Split 劃分原理
Partition / Split 大小取決于四個(gè)條件:
- 1. spark.sql.files.maxPartitionBytes(默認(rèn)值128MB)
- 每個(gè)rdd partition能讀多少數(shù)據(jù),一般都是通過(guò)該值直接影響并發(fā)度,具體如何生效可以看maxSplitBytes部分
- 2. spark.sql.files.openCostInBytes(默認(rèn)值4MB)
- io讀文件是有cost的,不建議小于4M開(kāi)文件
- 3. spark.default.parallelism (默認(rèn)并行度,取決于調(diào)度平臺(tái)(local standalone yarn mesos)。
- 對(duì)于Yarn,該值等于當(dāng)前可用 core 總數(shù)。在啟動(dòng) DynamicAllocation 的情況下,該值取決于啟動(dòng) Job 時(shí)的實(shí)時(shí)可用 core 的數(shù)量)
- 一般不調(diào),交給調(diào)度平臺(tái)。
- 4. 待讀取數(shù)據(jù)總量 totalBytes
Split 的最大值取決于如下公式:
- val bytesPerCore = totalBytes / defaultParallelism
- 總值/默認(rèn)分區(qū),這個(gè)一般沒(méi)法控制
- var maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
- 如像增大并發(fā)度,即增加task,那就調(diào)小maxSplitBytes
- 如果想減少并發(fā)度,那就增大maxSplitBytes
Spark 將輸入文件按每 maxSplitBytes 切分成一個(gè)個(gè) partition
補(bǔ)充:
1.spark sql如何為每個(gè)job設(shè)置executor的cpu、內(nèi)存?
spark 中是沒(méi)法為所有job定制化設(shè)置這些資源,一般需要在每個(gè)app粒度啟動(dòng)的時(shí)候設(shè)置:
executor:
- spark.executor.memory 相當(dāng)于 JVM XMX
- spark.executor.cores=4 相當(dāng)于 thread =4
driver:
- spark.driver.memory
- spark.driver.cores=4
需要每個(gè)application中去調(diào)這些參數(shù),最小單元是app這個(gè)維度,無(wú)法為每個(gè)job定制資源。
- 都有默認(rèn)值,一般放在spark安裝包的conf/spark-defaults.conf中設(shè)置
- 當(dāng)然也可以在提交sql的時(shí)候指定
一般在yarn里面,一個(gè)個(gè)spark作業(yè)就是application,一個(gè)application可以啟很多個(gè)job
三.深入理解SQL頁(yè)面
1.介紹
- Spark UI 的 SQL 頁(yè)面是非常重要的一個(gè)頁(yè)面,通過(guò)該頁(yè)面我們可以看到一個(gè)查詢計(jì)劃的執(zhí)行過(guò)程。
- 查詢計(jì)劃是了解查詢執(zhí)行細(xì)節(jié)的入口點(diǎn)。 它包含許多有用的信息,并提供有關(guān)如何執(zhí)行查詢的見(jiàn)解。
- 這非常重要,尤其是在復(fù)雜的查詢中或執(zhí)行時(shí)間過(guò)長(zhǎng)且成本高昂的情況下。根據(jù)查詢計(jì)劃中的信息,我們可能會(huì)找出效率不高的地方,并決定重寫(xiě)部分查詢以獲得更好的性能。
- 根據(jù)前面課程的內(nèi)容,我們知道 SQL 查詢計(jì)劃組織成樹(shù)結(jié)構(gòu),每個(gè)節(jié)點(diǎn)代表一個(gè)運(yùn)算符,提供有關(guān)執(zhí)行的基本細(xì)節(jié)。
2.一個(gè)例子:
sql
val questionsDF = spark.read.json("/test/sparksql/simple/data/questions-json")
val usersDF = spark.read.parquet("/test/sparksql/simple/data/users")
val res = questionsDF.filter(col("score") > 0).groupBy("user_id").agg(count("*").alias("cnt")).join(usersDF, "user_id")
spark-shell
[root@hadoop102 spark-3.2.1]# spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2022-05-29 22:38:24,941 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://hadoop102:4040
Spark context available as 'sc' (master = local[*], app id = local-1653835106685).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.2.1
/_/
Using Scala version 2.12.15 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_144)
Type in expressions to have them evaluated.
Type :help for more information.
scala>

建表questionsDF
scala> val questionsDF = spark.read.json("/test/sparksql/simple/data/questions-json")
questionsDF: org.apache.spark.sql.DataFrame = [accepted_answer_id: bigint, answers: bigint ... 9 more fields]
通過(guò)ui觀看,4個(gè)任務(wù)

查看stages


建表usersDF
val usersDF = spark.read.parquet("/test/sparksql/simple/data/users")
查看列信息


執(zhí)行sql
val res = questionsDF.filter(col("score") > 0).groupBy("user_id").agg(count("*").alias("cnt")).join(usersDF, "user_id")

查看plan

scala> res.queryExecution.logical
res4: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
'Join UsingJoin(Inner,List(user_id)) ##inner join ,join id user_id,根節(jié)點(diǎn),向下看
:- Aggregate [user_id#16L], [user_id#16L, count(1) AS cnt#58L] ##join前做聚合,聚合后有count
: +- Filter (score#13L > cast(0 as bigint)) ##聚合前 做過(guò)濾
: +- Relation [accepted_answer_id#7L,answers#8L,body#9,comments#10L,creation_date#11,question_id#12L,score#13L,tags#14,title#15,user_id#16L,views#17L] json ## 讀questionsDF表
+- Relation [user_id#29L,display_name#30,about#31,location#32,downvotes#33L,upvotes#34L,reputation#35L,views#36L] parquet ##讀usersDF 表
這只是計(jì)劃,還沒(méi)有執(zhí)行,需要觸發(fā)action,如show
觸發(fā)action,查看sql
show默認(rèn)展示20行,結(jié)果如下


此時(shí)真正執(zhí)行sql了

通過(guò)spark ui查看sql

detail
Physical Plan

每個(gè)節(jié)點(diǎn)的操作,根據(jù)計(jì)劃樹(shù)的(n)查找對(duì)應(yīng)的算子操作

SQL 查詢計(jì)劃圖像化
圖像化的具體細(xì)節(jié)可以通過(guò)上面的detail查看

詳解

兩個(gè)scan


查找stage

exchange


3.知識(shí)補(bǔ)充
WholeStageCodegen
- 在物理計(jì)劃的圖形表示中,可以看到運(yùn)算符被分組為藍(lán)色的大矩形。 這些大矩形對(duì)應(yīng)于代碼生成階段。
- 這是一個(gè)優(yōu)化功能,發(fā)生在物理規(guī)劃階段。
- 其想法是采用支持代碼生成的運(yùn)算符并將其折疊在一起,以通過(guò)消除虛函數(shù)調(diào)用來(lái)加速執(zhí)行。
- 并非所有運(yùn)算符都支持代碼生成,因此某些運(yùn)算符(例如 Exchange)不是大矩形的一部分。
- 在我們的示例中,有三個(gè) codegen stage 對(duì)應(yīng)于三個(gè)大矩形,在格式化的計(jì)劃輸出中,可以在 operator 的括號(hào)中看到 codegenstage 的 id。
- 同樣從樹(shù)中,可以判斷算子是否支持 codegen,因?yàn)槿绻С?codegen,算子前會(huì)帶 * 號(hào)。

Scan Parquet
- Scan parquet 運(yùn)算符表示從 parquet 文件格式讀取數(shù)據(jù)。
- 我們可以從圖中直接看到選擇哪些列。
- 盡管我們沒(méi)有在查詢中選擇特定字段,但優(yōu)化器中有一個(gè)ColumnPruning 規(guī)則還是會(huì)被應(yīng)用,它確保僅從源中選擇那些實(shí)際需要的列。
- 另外需要關(guān)心的是兩種過(guò)濾器:
- PartitionFilters
- PartitionFilters 是應(yīng)用于對(duì)表的分區(qū)列進(jìn)行過(guò)濾。允許跳過(guò)我們不需要的分區(qū)。所以很多時(shí)候我們需要把鼠標(biāo)移上去查看下。
- PushedFilters
- PushedFilters 是可以直接下推到 parquet 文件的字段過(guò)濾器,如果 parquet 文件按這些過(guò)濾列排序,它們會(huì)很有用。另外,parquet foot 包含row group的元數(shù)據(jù)(如統(tǒng)計(jì)信息,每個(gè) row group 的最小值和最大值)?;谶@些元信息,Spark 可以決定是否讀取該 row group。
- PartitionFilters

Filter
- Filter 算子理解起來(lái)非常直觀,它只是表示過(guò)濾條件。
- 可能不太明顯的是運(yùn)算符是如何創(chuàng)建的,因?yàn)樗ǔ2恢苯訉?duì)應(yīng)于查詢中使用的過(guò)濾條件。
- 原因是所有過(guò)濾器首先由 Catalyst 優(yōu)化器處理,該優(yōu)化器可能會(huì)修改和重新定位它們。
- 在將邏輯過(guò)濾器轉(zhuǎn)換為物理運(yùn)算符之前,有幾個(gè)規(guī)則應(yīng)用于邏輯過(guò)濾器:
- PushDownPredicates:
- 此規(guī)則將通過(guò)其他幾個(gè)運(yùn)算符將過(guò)濾器推到更靠近源的位置,但不是全部。
- 例如,下推不會(huì)穿過(guò)nondeterministic表達(dá)式/算子。
- 如果我們使用諸如 first、last、collect_set、collect_list、rand之類的函數(shù),過(guò)濾器將不會(huì)被推送通過(guò)它們,因?yàn)檫@些函數(shù)在 Spark 中是nondeterministic(不確定的,每次執(zhí)行結(jié)果不一樣)的。
- 此規(guī)則將通過(guò)其他幾個(gè)運(yùn)算符將過(guò)濾器推到更靠近源的位置,但不是全部。
- CombineFilters:
- 將兩個(gè)相鄰的運(yùn)算符組合為一個(gè)(它將來(lái)自兩個(gè)過(guò)濾器的條件收集到一個(gè)復(fù)雜條件中)。
- InferFiltersFromConstraints:
- 這個(gè)規(guī)則實(shí)際上創(chuàng)建了一個(gè)新的過(guò)濾器運(yùn)算符,例如從一個(gè)連接條件(從一個(gè)簡(jiǎn)單的內(nèi)部連接它將創(chuàng)建一個(gè)過(guò)濾條件連接鍵不為空)。
- PruneFilters:
- 刪除多余的過(guò)濾器(例如,如果過(guò)濾器總是評(píng)估為 True)。
- PushDownPredicates:

Project
- 此運(yùn)算符表示將投影(選擇)哪些列。 每次我們?cè)?DataFrame 上調(diào)用 select、withColumn 或 drop 轉(zhuǎn)換時(shí),Spark 都會(huì)將 Project 運(yùn)算符添加到邏輯計(jì)劃中,然后將其轉(zhuǎn)換為物理計(jì)劃中的對(duì)應(yīng)項(xiàng)。
- 同樣,在轉(zhuǎn)換之前對(duì)其應(yīng)用了一些優(yōu)化規(guī)則:
- ColumnPruning:修剪不需要的列以減少將要掃描的數(shù)據(jù)量。
- CollapseProject:它將相鄰的 Project 運(yùn)算符合并為一個(gè)。
- PushProjectionThroughUnion:下推 Project 到 Union 的兩側(cè)
(記住,下推的下是指往葉子結(jié)點(diǎn)推)。

上面這個(gè)物理計(jì)劃圖形的數(shù)據(jù)是從上往下流入,即最上面是葉子結(jié)點(diǎn),最下面是根節(jié)點(diǎn)。
detail中與之展現(xiàn)相反,根在上面

Exchange
- Exchange 運(yùn)算符代表 shuffle,即物理數(shù)據(jù)移動(dòng)。
- 這個(gè)操作被認(rèn)為是相當(dāng)昂貴的,因?yàn)樗ㄟ^(guò)網(wǎng)絡(luò)移動(dòng)數(shù)據(jù)。 查詢計(jì)劃中的信息還包含有關(guān)如何重新分區(qū)數(shù)據(jù)的詳細(xì)信息。 在我們的示例中,它是 hashpartitioning(user_id, 200),如下所示:

- 這意味著數(shù)據(jù)將根據(jù) user_id 列重新分區(qū)為 200 個(gè)分區(qū),所有具有相同 user_id 值的行都屬于同一個(gè)分區(qū),并將位于同一個(gè) executor 上。
- 為確保準(zhǔn)確創(chuàng)建 200 個(gè)分區(qū),Spark 計(jì)算 user_id 的哈希值并模 200。這樣做的結(jié)果有可能發(fā)生的某些分區(qū)可能為空。
- 其他 partitioning 方式還有:
- RoundRobinPartitioning:數(shù)據(jù)將隨機(jī)分布到 n 個(gè)大小大致相等的分區(qū)中,其中 n 由用戶在 repartition(n) 函數(shù)中指定。
- SinglePartition:所有數(shù)據(jù)都被移動(dòng)到單個(gè)分區(qū)到單個(gè)執(zhí)行程序。 例如,當(dāng)調(diào)用窗口成為整個(gè) DataFrame 的窗口函數(shù)時(shí)(當(dāng)您沒(méi)有在窗口定義中為 partitionBy() 函數(shù)提供參數(shù)時(shí)),就會(huì)發(fā)生這種情況。
- RangePartitioning:在調(diào)用 orderBy 或排序轉(zhuǎn)換后對(duì)數(shù)據(jù)進(jìn)行排序時(shí)使用此方式。
ShuffleQueryStage/AQEShuffleRead
- 在3.0以上版本,在開(kāi)啟了 AQE 優(yōu)化后,Exchange 算子之后還會(huì)跟一個(gè)ShuffleQueryStage,但是在查詢圖上是不顯示的。
detail中有,但是上面的查詢圖中不顯示


- 出現(xiàn) ShuffleQueryStage,Spark 會(huì)截?cái)嗖樵冇?jì)劃,可以將ShuffleQueryStage 理解為葉子結(jié)點(diǎn)。
- 雖然通過(guò)上圖發(fā)現(xiàn) 算子(1)是葉子結(jié)點(diǎn),1-5會(huì)先執(zhí)行,AE會(huì)截?cái)嗖樵冇?jì)劃,將1-5重新計(jì)劃變成了(6),所以算子(6)就代表了算子(1)-(5),所以可以將之理解為葉子結(jié)點(diǎn)。
- 以上規(guī)則僅對(duì)于AE有效,即ShuffleQueryStage在AE中就是葉子結(jié)點(diǎn)。
- 在 ShuffleQueryStage 之后,可能出現(xiàn) AQEShuffleRead(3.0叫CostumShuffleRead)。
- 會(huì)根據(jù)上一個(gè) Stage 的 shuffle write 數(shù)據(jù)進(jìn)行計(jì)劃調(diào)整,如本例中的將200個(gè) partition 進(jìn)行合并(coalesced),合并成1個(gè)分區(qū)。
- 另外,Exchange 上的明細(xì)信息也能幫我們判斷有沒(méi)有數(shù)據(jù)傾斜:
- shuffle bytes written total (min, med, max (stageId: taskId))416.9 KiB (103.9 KiB, 104.1 KiB, 104.9 KiB (stage 8.0: task 13))
- 如果max 明顯大于 med那么就有可能數(shù)據(jù)傾斜

HashAggregate
- HashAggregate 表示使用 Hash 方式進(jìn)行的數(shù)據(jù)聚合。
- 通常兩個(gè) HashAggregate 成對(duì)出現(xiàn)。

- 有兩個(gè) HashAggregate 的原因是:
- 第一個(gè)進(jìn)行部分聚合,它聚合每個(gè) partition,或者叫 map 端聚合,用于減少數(shù)據(jù)的 shuffle。
- 即先本地聚合,map端聚合,減少數(shù)據(jù)廣播過(guò)程中的傳輸。
- 第一個(gè)進(jìn)行部分聚合,它聚合每個(gè) partition,或者叫 map 端聚合,用于減少數(shù)據(jù)的 shuffle。
- 在示例中,F(xiàn)unctions 字段中顯示 partial_count(1)。部分聚合的結(jié)果的最終在第二個(gè) HashAggregate 上合并。
- 該運(yùn)算符還具有 results 字段顯示聚合后可用的列。
BroadcastHashJoin & BroadcastExchange
- BroadcastHashJoin(BHJ)是一個(gè)表示特定連接算法的算子。 除了這個(gè)之外,Spark 中還有其他可用的連接算法,
- 例如 SortMergeJoin 或 ShuffleHashJoin。 BHJ 總是與 BroadcastExchange 成對(duì)出現(xiàn),后者是一個(gè)表示廣播shuffle 的算子——數(shù)據(jù)將被收集到 Driver,然后發(fā)送到每個(gè)執(zhí)行程序,它可以用于連接。

- 為什么會(huì)有broadcastexchange?
- 多了這么個(gè)算子是為了復(fù)用。
- 相當(dāng)于多個(gè)作業(yè)

三個(gè)作業(yè),broadcastexchange必然是一個(gè)作業(yè),看作業(yè)不是很重要
ColumnarToRow
- 這是 Spark 3.0 中引入的一個(gè)新算子,它用作列執(zhí)行和行執(zhí)行之間的轉(zhuǎn)換。在 Spark 中,Scan Parquet 是可以進(jìn)行向量化讀的。
- 但是 Spark 內(nèi)部其他算子并沒(méi)有實(shí)現(xiàn)向量化,所以需要將列執(zhí)行再轉(zhuǎn)換成行執(zhí)行模式。

4.問(wèn)題排查
關(guān)鍵字:number of output rows。
- 解釋:數(shù)據(jù)膨脹時(shí)通過(guò)此關(guān)鍵字確定膨脹的位置。
- 影響:作業(yè)運(yùn)行慢/卡住。
- 排查:在問(wèn)題stage的所有算子里找到 exchange 部分,看到他們的 number of output rows,然后上下對(duì)比 rows 條數(shù)看看哪個(gè)環(huán)節(jié)有膨脹。
- 很多算子中都有,一般在join的過(guò)程中容易出現(xiàn)數(shù)據(jù)膨脹,所以重點(diǎn)看join前后數(shù)據(jù)變化
- 建議:用戶自行排查處理。

如果上圖中BroadcastHashJoin 最終是200多明顯大于HashAggregate,那就是發(fā)生膨脹,一般聚合是將減少數(shù)據(jù),但是join有可能增加數(shù)據(jù),所以也是比較容易發(fā)生數(shù)據(jù)膨脹的算子。
關(guān)鍵字:limit。
- 解釋:導(dǎo)致 task 數(shù)量少。
- 影響:作業(yè)運(yùn)行慢/卡住。
- 排查:limit 會(huì)導(dǎo)致這個(gè)算子只能有一個(gè) task。
- 建議:要求用戶減少 limit 的數(shù)量,這樣盡量避免單 task 處理太多數(shù)據(jù)。
關(guān)鍵字:AQEShuffleReader/CustomShuffleReader。
- 解釋:發(fā)生數(shù)據(jù)傾斜。
- 影響:作業(yè)運(yùn)行慢/卡住。
- 排查:
- 通過(guò) CustomShuffleReader 里的 med 和 max 的差距可以判斷出是否有數(shù)據(jù)傾斜。
- 確定數(shù)據(jù)傾斜的字段的SQL:Select count(distinct *) from xxx groupby '關(guān)聯(lián)字段‘。
- 看哪個(gè)建數(shù)據(jù)多
- 建議:告訴用戶問(wèn)題所在,并要求用戶自行調(diào)整。未開(kāi)啟 AQE 的開(kāi)啟 AQE。

關(guān)鍵字:aggregate。
- 解釋:導(dǎo)致 task 數(shù)量少。
- 影響:作業(yè)運(yùn)行慢/卡住。
- 排查:
- aggregate 有可能造成 task 少的原因是可能做 groupby 的字段本身只有6個(gè)值,所以shuffle 完只能有6個(gè) task。
- 這個(gè)時(shí)候可能每個(gè)task中數(shù)據(jù)都非常多,但是沒(méi)法提高并行度,因?yàn)榉纸M只有6個(gè)。
- 建議:
- 確定 groupby 的字段數(shù)量:Select count(distinct 關(guān)聯(lián)字段) from xxx。
- 通過(guò) SQL 找到字段值少的 groupby 字段,讓用戶自行優(yōu)化 SQL。
關(guān)鍵字:BroadcastNestedLoopJoin 或 Cartesian。
- 解釋:產(chǎn)生笛卡爾積。
- 影響:作業(yè)運(yùn)行慢 / driver oom。
- 排查:分別從左右兩邊往上找,找到做 Join 的表是哪兩個(gè),之后可推測(cè)出 “左表.關(guān)聯(lián)字段” 和“右表.關(guān)聯(lián)字段”。
- 通常會(huì)造成笛卡爾積的情況:
- 1. 沒(méi)指定關(guān)聯(lián)條件;
- 沒(méi)寫(xiě)join條件
- 2. 關(guān)聯(lián)字段用 or 關(guān)聯(lián);
- 3. 非等值關(guān)聯(lián) a.id > b.id。
- 1. 沒(méi)指定關(guān)聯(lián)條件;
- 建議:提示用戶自行優(yōu)化。
關(guān)鍵字:skew。
- 解釋:發(fā)生數(shù)據(jù)傾斜并被 Spark 識(shí)別。
- 影響:可能導(dǎo)致作業(yè)慢。
- 排查:skew 存在,說(shuō)明有傾斜,并被 Spark 識(shí)別了,且已經(jīng)做過(guò)一定處理,但如果依舊存在傾斜問(wèn)題,可以從 skew 的上下游找一下 Exchange 或 CustomShuffleReader 算子,然后對(duì)比 med 和 max 判斷是否依舊存在數(shù)據(jù)傾斜。
- 建議:? 確定數(shù)據(jù)傾斜的字段:Select count(distinct *) from xxx groupby '關(guān)聯(lián)字段‘;
- 告訴用戶問(wèn)題所在,并要求用戶自行調(diào)整。
關(guān)鍵字:sort。
- 解釋:導(dǎo)致 task 數(shù)量少。
- 影響:當(dāng) sort 數(shù)據(jù)記錄很多時(shí),作業(yè)運(yùn)行慢/卡住。
- 建議:通過(guò) SQL 找到 orderby 命令的位置,讓用戶自行優(yōu)化 SQL。
關(guān)鍵字:number of files read。
- 解釋:算子讀的文件數(shù)。
- 影響:讀了超級(jí)多的文件,每個(gè)文件一個(gè) task,導(dǎo)致任務(wù)時(shí)間長(zhǎng)。
- 排查:如果紅框中的指標(biāo)超級(jí)多,說(shuō)明讀了太多文件,建立了太多 task。
- 建議:讓作業(yè) owner 減少讀取分區(qū)量,分成多個(gè) SQL 跑。

關(guān)鍵字:BroadcastHashJoin。
- 解釋:走廣播模式。
- 影響:driver OOM。
- 排查:分別從左右兩邊往上找,找到做 Join 的表是哪兩個(gè),之后可推測(cè)出 "左表.關(guān)聯(lián)字段" 和 "右表.關(guān)聯(lián)字段"
- 建議:強(qiáng)行關(guān)閉廣播模式。
- set spark.sql.adaptiveBroadcastJoinThreshold=-1;
- set spark.sql.autoBroadcastJoinThreshold=-1;


