Spark 基礎(下篇)

上篇介紹了spark的突出特點以及基本框架,下面給大家介紹下spark的基本數(shù)據(jù)結構、spark任務調度的詳細流程以及spark中stage的劃分。

5. spark的基本數(shù)據(jù)類型

RDD、DataFrame和DataSet可以說是spark獨有的三種基本的數(shù)據(jù)類型。Spark的核心概念是RDD (resilientdistributed dataset),指的是一個只讀的,可分區(qū)的分布式數(shù)據(jù)集,這個數(shù)據(jù)集的全部或部分可以緩存在內存中,在多次計算間重用。DataFrame是一個以RDD為基礎的,但卻是一種類似二維數(shù)據(jù)表的一種分布式數(shù)據(jù)集。與RDD不同的是,前者帶有schema元信息,即DataFrame所表示的二維表數(shù)據(jù)集的每一列都帶有名稱和類型。這樣,spark就可以使用sql操作dataframe,像操作數(shù)據(jù)庫中的表一樣。目前,spark sql支持大多數(shù)的sql數(shù)據(jù)庫的操作。Dataset可以認為是DataFrame的一個特例,主要區(qū)別是Dataset每一個record存儲的是一個強類型值而不是一個Row。后面版本DataFrame會繼承DataSet,DataFrame和DataSet可以相互轉化,df.as[ElementType]這樣可以把DataFrame轉化為DataSet,ds.toDF()這樣可以把DataSet轉化為DataFrame。創(chuàng)建Dataframe的代碼如下所示:

val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

創(chuàng)建Dataset的代碼如下所示:

// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface
case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

6. spark scheduler(spark任務調度)

(1) 在使用spark-summit提交spark程序后,根據(jù)提交時指定(deploy-mode)的位置,創(chuàng)建driver進程,driver進程根據(jù)sparkconf中的配置,初始化sparkcontext。Sparkcontext的啟動后,創(chuàng)建DAG Scheduler(將DAG圖分解成stage)和Task Scheduler(提交和監(jiān)控task)兩個調度模塊。
  (2) driver進程根據(jù)配置參數(shù)向resource manager(資源管理器)申請資源(主要是用來執(zhí)行的executor),resource manager接到到了Application的注冊請求之后,會使用自己的資源調度算法,在spark集群的worker上,通知worker為application啟動多個Executor。
  (3) executor創(chuàng)建后,會向resource manager進行資源及狀態(tài)反饋,以便resource manager對executor進行狀態(tài)監(jiān)控,如監(jiān)控到有失敗的executor,則會立即重新創(chuàng)建。
  (4) Executor會向taskScheduler反向注冊,以便獲取taskScheduler分配的task。
  (5) Driver完成SparkContext初始化,繼續(xù)執(zhí)行application程序,當執(zhí)行到Action時,就會創(chuàng)建Job。并且由DAGScheduler將Job劃分多個Stage,每個Stage 由TaskSet 組成,并將TaskSet提交給taskScheduler,taskScheduler把TaskSet中的task依次提交給Executor, Executor在接收到task之后,會使用taskRunner(封裝task的線程池)來封裝task,然后,從Executor的線程池中取出一個線程來執(zhí)行task。
   就這樣Spark的每個Stage被作為TaskSet提交給Executor執(zhí)行,每個Task對應一個RDD的partition,執(zhí)行我們的定義的算子和函數(shù)。直到所有操作執(zhí)行完為止。如下圖所示:

圖4. Spark 任務調度流程

7. Spark作業(yè)調度中stage劃分

Spark在接收到提交的作業(yè)后,DAGScheduler會根據(jù)RDD之間的依賴關系將作業(yè)劃分成多個stage,DAGSchedule在將劃分的stage提交給TASKSchedule,TASKSchedule將每個stage分成多個task,交給executor執(zhí)行。task的個數(shù)等于stage末端的RDD的分區(qū)個數(shù)。因此對了解stage的劃分尤為重要。
  在spark中,RDD之間的依賴關系有兩種:一種是窄依賴,一種是寬依賴。窄依賴的描述是:父RDD的分區(qū)最多只會被子RDD的一個分區(qū)使用。寬依賴是:父RDD的一個分區(qū)會被子RDD的多個分區(qū)使用。如下圖所示:

圖5. RDD的兩種依賴關系

  上圖中,以一豎線作為分界,左邊是窄依賴,右邊是寬依賴。
  Stage的劃分不僅根據(jù)RDD的依賴關系,還有一個原則是將依賴鏈斷開,每個stage內部可以并行運行,整個作業(yè)按照stage順序依次執(zhí)行,最終完成整個Job。

實際劃分時,DAGScheduler就是根據(jù)DAG圖,從圖的末端逆向遍歷整個依賴鏈,一般是以一次shuffle為邊界來劃分的。一般劃分stage是從程序執(zhí)行流程的最后往前劃分,遇到寬依賴就斷開,遇到窄依賴就將將其加入當前stage中。一個典型的RDD Graph如下圖所示:其中實線框是RDD,RDD內的實心矩形是各個分區(qū),實線箭頭表示父子分區(qū)間依賴關系,虛線框表示stage。針對下圖流程首先根據(jù)最后一步join(寬依賴)操作來作為劃分stage的邊界,再往左走,A和B之間有個group by也為寬依賴,也可作為stage劃分的邊界,所以我們將下圖劃分為三個stage。

圖6. Spark中的Stage劃分示例
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容