Spark2.1和2.2 SQL物理執(zhí)行策略關鍵源碼分析

1. 文章開始之前

先附上一句SQL,使用tpc-ds的表結構,我們圍繞這句SQL講。

  • SQL:

SQL> select
avg(cs_ext_discount_amt)
from
catalog_sales, date_dim
where
d_date between '1999-02-22'
and
cast('1999-05-22' as date)
and
d_date_sk = cs_sold_date_sk
group by cs_sold_date_sk;

  • 邏輯計劃:
Aggregate [cs_sold_date_sk#24], [cast((avg(UnscaledValue(cs_ext_discount_amt#46)) / 100.0) as decimal(11,6)) AS avg(cs_ext_discount_amt)#149]
+- Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
   +- Join Inner, (d_date_sk#58 = cs_sold_date_sk#24)
      :- Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
      :  +- Filter isnotnull(cs_sold_date_sk#24)
      :     +- Relation[cs_sold_date_sk#24,cs_sold_time_sk#25,cs_ship_date_sk#26,cs_bill_customer_sk#27,cs_bill_cdemo_sk#28,cs_bill_hdemo_sk#29,cs_bill_addr_sk#30,cs_ship_customer_sk#31,cs_ship_cdemo_sk#32,cs_ship_hdemo_sk#33,cs_ship_addr_sk#34,cs_call_center_sk#35,cs_catalog_page_sk#36,cs_ship_mode_sk#37,cs_warehouse_sk#38,cs_item_sk#39,cs_promo_sk#40,cs_order_number#41,cs_quantity#42,cs_wholesale_cost#43,cs_list_price#44,cs_sales_price#45,cs_ext_discount_amt#46,cs_ext_sales_price#47,... 10 more fields]
      +- Project [d_date_sk#58]
         +- Filter (((isnotnull(d_date#60) && (cast(d_date#60 as string) >= 1999-02-22)) && (d_date#60 <= 10733)) && isnotnull(d_date_sk#58))
            +- Relation[d_date_sk#58,d_date_id#59,d_date#60,d_month_seq#61,d_week_seq#62,d_quarter_seq#63,d_year#64,d_dow#65,d_moy#66,d_dom#67,d_qoy#68,d_fy_year#69,d_fy_quarter_seq#70,d_fy_week_seq#71,d_day_name#72,d_quarter_name#73,d_holiday#74,d_weekend#75,d_following_holiday#76,d_first_dom#77,d_last_dom#78,d_same_day_ly#79,d_same_day_lq#80,d_current_day#81,... 4 more fields]

2. 物理計劃源碼分析

2.1 物理策略

def strategies: Seq[Strategy] =
      extraStrategies ++ (
      FileSourceStrategy ::
      DataSourceStrategy ::
      DDLStrategy ::
      SpecialLimits ::
      Aggregation ::
      JoinSelection ::
      InMemoryScans ::
      BasicOperators :: Nil)

其中,extraStrategies是提供給外部人員可以自己添加的策略。調(diào)用這些strategies的代碼如下:

// Collect physical plan candidates.
val candidates = strategies.iterator.flatMap(_(plan))

將strategies逐個去應用在邏輯計劃上,然后做flat操作,返回一個PhysicalPlan的iterator。那么每個策略什么作用?

2.1.1 FileSourceStrategy

一個針對Hadoop文件系統(tǒng)做的策略,當執(zhí)行計劃的底層Relation是HadoopFsRelation時會調(diào)用到,用來掃描文件。

2.1.2 DataSourceStrategy

Spark針對DataSource預定義了四種scan接口,TableScan、PrunedScanPrunedFilteredScan、CatalystScan(其中CatalystScan是unstable的,也是不常用的),如果開發(fā)者(用戶)自己實現(xiàn)的DataSource是實現(xiàn)了這四種接口之一的,在scan到執(zhí)行計劃的底層Relation時,就會調(diào)用來掃描文件。

2.1.3 DDLStrategy(2.2中已經(jīng)消失了,2.1中有)

會在create table的時候調(diào)用,因為后續(xù)版本不會存在,所以不做解釋。

2.1.4 SpecialLimits

在Spark SQL中加limit n時候回調(diào)用到(如果不指定,Spark 默認也會limit 20),在源碼中,會給每種case的limit節(jié)點的子節(jié)點使用PlanLater,這是個很神奇的東西下文會講到。

2.1.5 Aggregation

顧名思義,執(zhí)行聚合函數(shù)的策略。

2.1.6 JoinSelection

執(zhí)行join的策略。Join的執(zhí)行策略也同樣分BroadcastJoin(也就是MapSideJoin),和ShuffledJoin,這個之后的文章會展開講。

2.1.7 InMemoryScans

當數(shù)據(jù)在內(nèi)存中被緩存過,就會用到該策略。

2.1.8 BasicOperators

一些基本操作的執(zhí)行策略,如flatMap,sort,project等,但是實際上大都是給這些節(jié)點的子節(jié)點套上一個PlanLater。

2.2 PlanLater

Spark SQL物理計劃里一個非常重要的概念。字面意思很好理解,就是之后再計劃。那么經(jīng)過以上策略逐個去執(zhí)行以后,原來的邏輯計劃會變成什么樣呢?

ReturnAnswer
+- GlobalLimit 21
   +- LocalLimit 21
      +- PlanLater Aggregate [cs_sold_date_sk#24], [cast((avg(UnscaledValue(cs_ext_discount_amt#46)) / 100.0) as decimal(11,6)) AS avg(cs_ext_discount_amt)#149]
         , Aggregate [cs_sold_date_sk#24], [cast((avg(UnscaledValue(cs_ext_discount_amt#46)) / 100.0) as decimal(11,6)) AS avg(cs_ext_discount_amt)#149]
         +- PlanLater Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
            , Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
            +- PlanLater Join Inner, (d_date_sk#58 = cs_sold_date_sk#24)
               :- PlanLater Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
                  , Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
               :  +- Filter isnotnull(cs_sold_date_sk#24)
               :     +- Relation[cs_sold_date_sk#24,cs_sold_time_sk#25,cs_ship_date_sk#26,cs_bill_customer_sk#27,cs_bill_cdemo_sk#28,cs_bill_hdemo_sk#29,cs_bill_addr_sk#30,cs_ship_customer_sk#31,cs_ship_cdemo_sk#32,cs_ship_hdemo_sk#33,cs_ship_addr_sk#34,cs_call_center_sk#35,cs_catalog_page_sk#36,cs_ship_mode_sk#37,cs_warehouse_sk#38,cs_item_sk#39,cs_promo_sk#40,cs_order_number#41,cs_quantity#42,cs_wholesale_cost#43,cs_list_price#44,cs_sales_price#45,cs_ext_discount_amt#46,cs_ext_sales_price#47,... 10 more fields]
               +- PlanLater Project [d_date_sk#58]
                  , Project [d_date_sk#58]
                  +- Filter (((isnotnull(d_date#60) && (cast(d_date#60 as string) >= 1999-02-22)) && (d_date#60 <= 10733)) && isnotnull(d_date_sk#58))
                     +- Relation[d_date_sk#58,d_date_id#59,d_date#60,d_month_seq#61,d_week_seq#62,d_quarter_seq#63,d_year#64,d_dow#65,d_moy#66,d_dom#67,d_qoy#68,d_fy_year#69,d_fy_quarter_seq#70,d_fy_week_seq#71,d_day_name#72,d_quarter_name#73,d_holiday#74,d_weekend#75,d_following_holiday#76,d_first_dom#77,d_last_dom#78,d_same_day_ly#79,d_same_day_lq#80,d_current_day#81,... 4 more fields]

有什么差別呢?主要有二:

    1. 頂層多了個ReturnAnswerLimit節(jié)點
    1. Aggregate、ProjectJoin節(jié)點都用了PlanLater

(其實Filter節(jié)點也是可以用PlanLater的,但是由于邏輯計劃已經(jīng)將Filter下推至底部,所以最底部的Project->Filter->Relation的三層節(jié)點是可以直接調(diào)用一個策略去執(zhí)行的,因此只需要三層節(jié)點的最上層也就是Project節(jié)點使用PlanLater即可。)

言歸正傳,語法樹頂部多了ReturnAnswerLimit節(jié)點,很容易理解,Limit是Spark SQL默認限制行數(shù),ReturnAnswer是將結果返回。那么加的PlanLater有什么作用?我的理解是,將物理計劃分割成一段段,每一段物理計劃會有其對應策略來執(zhí)行。具體源碼如下:

  def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
    // Obviously a lot to do here still...

    // Collect physical plan candidates.
    val candidates = strategies.iterator.flatMap(_(plan))

    // The candidates may contain placeholders marked as [[planLater]],
    // so try to replace them by their child plans.
    val plans = candidates.flatMap { candidate =>
      val placeholders = collectPlaceholders(candidate)

      if (placeholders.isEmpty) {
        // Take the candidate as is because it does not contain placeholders.
        Iterator(candidate)
      } else {
        // Plan the logical plan marked as [[planLater]] and replace the placeholders.
        placeholders.iterator.foldLeft(Iterator(candidate)) {
          case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
            // Plan the logical plan for the placeholder.
            val childPlans = this.plan(logicalPlan)

            candidatesWithPlaceholders.flatMap { candidateWithPlaceholders =>
              childPlans.map { childPlan =>
                // Replace the placeholder by the child plan
                candidateWithPlaceholders.transformUp {
                  case p if p == placeholder => childPlan
                }
              }
            }
        }
      }
    }

    val pruned = prunePlans(plans)
    assert(pruned.hasNext, s"No plan for $plan")
    pruned
  }

可以看到,經(jīng)過策略迭代器和flat過后的candidates候選計劃們(一般來說只有一個,是最頂層的planLater),然后收集placeholder(其實就是planlater),這個時候對placeholders進行迭代,并對每個placeholder的child plan遞歸調(diào)用plan方法。舉例文章這句SQL,遞歸調(diào)用plan方法,得到每個placeholder及其child plan節(jié)點(也就是 case (candidatesWithPlaceholders, (placeholder, logicalPlan))這句話的placeholder和logicalPlan兩個變量)如下:

placeholder:
PlanLater Aggregate [cs_sold_date_sk#24], [cast((avg(UnscaledValue(cs_ext_discount_amt#46)) / 100.0) as decimal(11,6)) AS avg(cs_ext_discount_amt)#149]

logicalPlan:
Aggregate [cs_sold_date_sk#24], [cast((avg(UnscaledValue(cs_ext_discount_amt#46)) / 100.0) as decimal(11,6)) AS avg(cs_ext_discount_amt)#149]
+- Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
   +- Join Inner, (d_date_sk#58 = cs_sold_date_sk#24)
      :- Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
      :  +- Filter isnotnull(cs_sold_date_sk#24)
      :     +- Relation[cs_sold_date_sk#24,cs_sold_time_sk#25,cs_ship_date_sk#26,cs_bill_customer_sk#27,cs_bill_cdemo_sk#28,cs_bill_hdemo_sk#29,cs_bill_addr_sk#30,cs_ship_customer_sk#31,cs_ship_cdemo_sk#32,cs_ship_hdemo_sk#33,cs_ship_addr_sk#34,cs_call_center_sk#35,cs_catalog_page_sk#36,cs_ship_mode_sk#37,cs_warehouse_sk#38,cs_item_sk#39,cs_promo_sk#40,cs_order_number#41,cs_quantity#42,cs_wholesale_cost#43,cs_list_price#44,cs_sales_price#45,cs_ext_discount_amt#46,cs_ext_sales_price#47,... 10 more fields]
      +- Project [d_date_sk#58]
         +- Filter (((isnotnull(d_date#60) && (cast(d_date#60 as string) >= 1999-02-22)) && (d_date#60 <= 10733)) && isnotnull(d_date_sk#58))
            +- Relation[d_date_sk#58,d_date_id#59,d_date#60,d_month_seq#61,d_week_seq#62,d_quarter_seq#63,d_year#64,d_dow#65,d_moy#66,d_dom#67,d_qoy#68,d_fy_year#69,d_fy_quarter_seq#70,d_fy_week_seq#71,d_day_name#72,d_quarter_name#73,d_holiday#74,d_weekend#75,d_following_holiday#76,d_first_dom#77,d_last_dom#78,d_same_day_ly#79,d_same_day_lq#80,d_current_day#81,... 4 more fields]
placeholder:
PlanLater Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]

logicalPlan:
Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
+- Join Inner, (d_date_sk#58 = cs_sold_date_sk#24)
   :- Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
   :  +- Filter isnotnull(cs_sold_date_sk#24)
   :     +- Relation[cs_sold_date_sk#24,cs_sold_time_sk#25,cs_ship_date_sk#26,cs_bill_customer_sk#27,cs_bill_cdemo_sk#28,cs_bill_hdemo_sk#29,cs_bill_addr_sk#30,cs_ship_customer_sk#31,cs_ship_cdemo_sk#32,cs_ship_hdemo_sk#33,cs_ship_addr_sk#34,cs_call_center_sk#35,cs_catalog_page_sk#36,cs_ship_mode_sk#37,cs_warehouse_sk#38,cs_item_sk#39,cs_promo_sk#40,cs_order_number#41,cs_quantity#42,cs_wholesale_cost#43,cs_list_price#44,cs_sales_price#45,cs_ext_discount_amt#46,cs_ext_sales_price#47,... 10 more fields]
   +- Project [d_date_sk#58]
      +- Filter (((isnotnull(d_date#60) && (cast(d_date#60 as string) >= 1999-02-22)) && (d_date#60 <= 10733)) && isnotnull(d_date_sk#58))
         +- Relation[d_date_sk#58,d_date_id#59,d_date#60,d_month_seq#61,d_week_seq#62,d_quarter_seq#63,d_year#64,d_dow#65,d_moy#66,d_dom#67,d_qoy#68,d_fy_year#69,d_fy_quarter_seq#70,d_fy_week_seq#71,d_day_name#72,d_quarter_name#73,d_holiday#74,d_weekend#75,d_following_holiday#76,d_first_dom#77,d_last_dom#78,d_same_day_ly#79,d_same_day_lq#80,d_current_day#81,... 4 more fields]
placeholder:
PlanLater Join Inner, (d_date_sk#58 = cs_sold_date_sk#24)

logicalPlan:
Join Inner, (d_date_sk#58 = cs_sold_date_sk#24)
:- Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
:  +- Filter isnotnull(cs_sold_date_sk#24)
:     +- Relation[cs_sold_date_sk#24,cs_sold_time_sk#25,cs_ship_date_sk#26,cs_bill_customer_sk#27,cs_bill_cdemo_sk#28,cs_bill_hdemo_sk#29,cs_bill_addr_sk#30,cs_ship_customer_sk#31,cs_ship_cdemo_sk#32,cs_ship_hdemo_sk#33,cs_ship_addr_sk#34,cs_call_center_sk#35,cs_catalog_page_sk#36,cs_ship_mode_sk#37,cs_warehouse_sk#38,cs_item_sk#39,cs_promo_sk#40,cs_order_number#41,cs_quantity#42,cs_wholesale_cost#43,cs_list_price#44,cs_sales_price#45,cs_ext_discount_amt#46,cs_ext_sales_price#47,... 10 more fields]
+- Project [d_date_sk#58]
   +- Filter (((isnotnull(d_date#60) && (cast(d_date#60 as string) >= 1999-02-22)) && (d_date#60 <= 10733)) && isnotnull(d_date_sk#58))
      +- Relation[d_date_sk#58,d_date_id#59,d_date#60,d_month_seq#61,d_week_seq#62,d_quarter_seq#63,d_year#64,d_dow#65,d_moy#66,d_dom#67,d_qoy#68,d_fy_year#69,d_fy_quarter_seq#70,d_fy_week_seq#71,d_day_name#72,d_quarter_name#73,d_holiday#74,d_weekend#75,d_following_holiday#76,d_first_dom#77,d_last_dom#78,d_same_day_ly#79,d_same_day_lq#80,d_current_day#81,... 4 more fields]
placeholder:
PlanLater Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]

logicalPlan:
Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
+- Filter isnotnull(cs_sold_date_sk#24)
   +- Relation[cs_sold_date_sk#24,cs_sold_time_sk#25,cs_ship_date_sk#26,cs_bill_customer_sk#27,cs_bill_cdemo_sk#28,cs_bill_hdemo_sk#29,cs_bill_addr_sk#30,cs_ship_customer_sk#31,cs_ship_cdemo_sk#32,cs_ship_hdemo_sk#33,cs_ship_addr_sk#34,cs_call_center_sk#35,cs_catalog_page_sk#36,cs_ship_mode_sk#37,cs_warehouse_sk#38,cs_item_sk#39,cs_promo_sk#40,cs_order_number#41,cs_quantity#42,cs_wholesale_cost#43,cs_list_price#44,cs_sales_price#45,cs_ext_discount_amt#46,cs_ext_sales_price#47,... 10 more fields]
placeholder:
PlanLater Project [d_date_sk#58]

logicalPlan:
Project [d_date_sk#58]
+- Filter (((isnotnull(d_date#60) && (cast(d_date#60 as string) >= 1999-02-22)) && (d_date#60 <= 10733)) && isnotnull(d_date_sk#58))
   +- Relation[d_date_sk#58,d_date_id#59,d_date#60,d_month_seq#61,d_week_seq#62,d_quarter_seq#63,d_year#64,d_dow#65,d_moy#66,d_dom#67,d_qoy#68,d_fy_year#69,d_fy_quarter_seq#70,d_fy_week_seq#71,d_day_name#72,d_quarter_name#73,d_holiday#74,d_weekend#75,d_following_holiday#76,d_first_dom#77,d_last_dom#78,d_same_day_ly#79,d_same_day_lq#80,d_current_day#81,... 4 more fields]

那么可以看到,遞歸到最底處,就是project->filter->relation的三層節(jié)點組合,由于我實際是重寫過了DataSource,這個時候會調(diào)用DataSourceStrategy,去讀取獲取數(shù)據(jù),然后遞歸逐個返回根據(jù)每個planLater分割點會有對應的策略去對數(shù)據(jù)進行相應的操作。

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容