Spark Chapter 6: Advanced Spark Core

1 Spark core concepts

1 Application

An application built on Spark = 1 driver + n executors

User program built on Spark.

Consists of a driver program and executors on the cluster.

For example: a .py script, or an interactive pyspark / spark-shell session

2 Application jar

Common in Java or Scala development; rarely used with Python.

A jar containing the user's Spark application. In some cases users will want to create an "uber jar" containing their application along with its dependencies. The user's jar should never include Hadoop or Spark libraries, however, these will be added at runtime.

3 Driver program

Runs the main method and creates a SparkContext (sc).

The process running the main() function of the application and creating the SparkContext

4 Cluster manager

Acquires external resources; you can specify how much you need.

An external service for acquiring resources on the cluster (e.g. standalone manager, Mesos, YARN)

An application has to request resources, e.g. memory for the driver and memory for the executors.

5 Deploy mode

Deploy mode: where the driver runs.

Distinguishes where the driver process runs. In "cluster" mode, the framework launches the driver inside of the cluster. In "client" mode, the submitter launches the driver outside of the cluster.

6 Worker node

In standalone mode a worker node corresponds to a slave node; on YARN it corresponds to a NodeManager.

Any node that can run application code in the cluster

7 Executor

A process launched for an application on a worker node, that runs tasks and keeps data in memory or disk storage across them. Each application has its own executors.

Executors are not shared between different applications.

8 Task

A unit of work that will be sent to one executor

For example, map tasks and shuffle tasks.

9 Job

Each action corresponds to one job.

A parallel computation consisting of multiple tasks that gets spawned in response to a Spark action (e.g. save, collect); you'll see this term used in the driver's logs.


10 Stage

Stage boundaries: a stage starts where data is read and ends at a shuffle.

Each job gets divided into smaller sets of tasks called stages that depend on each other (similar to the map and reduce stages in MapReduce); you'll see this term used in the driver's logs.

2 Spark runtime architecture and caveats

[Important]

Spark applications run as independent sets of processes on a cluster, coordinated by the SparkContext object in your main program (called the driver program).

Specifically, to run on a cluster, the SparkContext can connect to several types of cluster managers (either Spark’s own standalone cluster manager, Mesos or YARN), which allocate resources across applications.

Step1 Once connected, Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application.

Once connected, Spark obtains executors on the worker nodes. The executors run tasks and store data.

Step2 Next, it sends your application code (defined by JAR or Python files passed to SparkContext) to the executors.

Next, the SparkContext sends the application code to the executors.

Step3 Finally, SparkContext sends tasks to the executors to run.

Finally, the SparkContext sends tasks to the executors for execution. A task is the concrete unit of work to run.
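The three steps above can be sketched as a toy model in plain Python. This is not Spark's API. The class `Driver` and its method `run_job` are hypothetical. Executors are modeled as a thread pool inside one process, whereas real executors are separate processes on the worker nodes.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


class Driver:
    """Toy stand-in for a driver that owns a SparkContext (hypothetical, not Spark's API)."""

    def __init__(self, num_executor_threads: int) -> None:
        # Step 1: "acquire executors", modeled here as a local thread pool.
        self.pool = ThreadPoolExecutor(max_workers=num_executor_threads)

    def run_job(self, partitions: List[List[int]], code: Callable[[List[int]], int]) -> List[int]:
        # Step 2: "ship the application code": the function is simply passed along.
        # Step 3: send one task per partition, then collect results in partition order.
        futures = [self.pool.submit(code, part) for part in partitions]
        return [f.result() for f in futures]

    def stop(self) -> None:
        self.pool.shutdown()


driver = Driver(num_executor_threads=2)
print(driver.run_job([[1, 2, 3], [4, 5], [6]], sum))  # prints [6, 9, 6]
driver.stop()
```

The results come back in partition order because the driver waits on each task's future in the order the tasks were submitted.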





Each application gets its own executor processes, which stay up for the duration of the whole application and run tasks in multiple threads. This has the benefit of isolating applications from each other, on both the scheduling side (each driver schedules its own tasks) and executor side (tasks from different applications run in different JVMs). However, it also means that data cannot be shared across different Spark applications (instances of SparkContext) without writing it to an external storage system.

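Each application has its own executor processes, and its tasks run as threads inside them, so applications are isolated from one another. Data cannot be shared across applications without writing it to external storage.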

Spark is agnostic to the underlying cluster manager. As long as it can acquire executor processes, and these communicate with each other, it is relatively easy to run it even on a cluster manager that also supports other applications (e.g. Mesos/YARN).

Spark does not care which cluster manager it runs on.

The driver program must listen for and accept incoming connections from its executors throughout its lifetime (e.g., see spark.driver.port in the network config section). As such, the driver program must be network addressable from the worker nodes.

In other words, the driver listens for executor connections for as long as it runs, so the worker nodes must be able to reach it over the network.

Because the driver schedules tasks on the cluster, it should be run close to the worker nodes, preferably on the same local area network. If you’d like to send requests to the cluster remotely, it’s better to open an RPC to the driver and have it submit operations from nearby than to run a driver far away from the worker nodes.

The driver should run as close to the worker nodes as possible. To send requests remotely, open an RPC to the driver rather than running the driver far away from the cluster.



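3 Key differences between Spark and Hadoop concepts

Hadoop

1 One MapReduce program = one job

2 One job = 1 to N tasks (map/reduce)

3 Each task corresponds to one process

4 A process is started when a task runs and destroyed when the task finishes, so the overhead is high when there are many tasks (JVM reuse does not help here)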


Spark

1 Application = Driver (whose main method creates the SparkContext) + Executors

2 One application = 0 to n jobs

3 One job = one action

4 One job = 1 to n stages

5 One stage = 1 to n tasks

6 Each task runs in one thread; multiple tasks can run in parallel inside one executor process
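The hierarchy above can be illustrated with a toy planner. It assumes one action (one job) over a simple linear chain of transformations. The `Op` type and both functions are hypothetical. Real Spark builds a DAG and may skip stages whose shuffle output already exists, but the core rule is the same: a shuffle starts a new stage, and each stage runs one task per partition.

```python
from typing import List, NamedTuple


class Op(NamedTuple):
    name: str
    shuffles: bool       # True if this op needs a wide (shuffle) dependency
    num_partitions: int  # partitions of the RDD this op produces


def split_into_stages(ops: List[Op]) -> List[List[Op]]:
    """Cut a job's chain of RDDs into stages: every shuffle op starts a new stage."""
    stages: List[List[Op]] = []
    for op in ops:
        if op.shuffles or not stages:
            stages.append([])
        stages[-1].append(op)
    return stages


def count_tasks(stages: List[List[Op]]) -> List[int]:
    """One task per partition of the last RDD in each stage."""
    return [stage[-1].num_partitions for stage in stages]


# A word count triggered by a single collect() = one job.
word_count = [
    Op("textFile", False, 2),
    Op("flatMap", False, 2),
    Op("map", False, 2),
    Op("reduceByKey", True, 3),
]
stages = split_into_stages(word_count)
print([[op.name for op in s] for s in stages])
# [['textFile', 'flatMap', 'map'], ['reduceByKey']]
print(count_tasks(stages))  # [2, 3]
```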

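4 Spark cache in detail

How to call it: rdd.cache() / rdd.persist()

cache() sets the RDD's storageLevel to MEMORY_ONLY.

It is lazy: no job is submitted until an action is encountered.

If an RDD will be reused in later computations, caching it is recommended.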

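from pyspark import StorageLevel

# Assumes a running SparkContext `sc`, e.g. inside the pyspark shell
lines = sc.textFile("file:///home/hadoop/data/page_views.dat")
lines.count()    # no cache yet: the file is read from disk

lines.cache()    # lazy: only marks the RDD as MEMORY_ONLY
lines.count()    # this action reads the file and fills the cache
lines.count()    # now served from the cached partitions

# An RDD's storage level cannot be changed once assigned,
# so unpersist() before persisting with a different level
lines.unpersist()
lines.persist(StorageLevel.MEMORY_ONLY_2)  # in memory, two replicas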

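With persistence in place, the file is read from disk only once; later actions reuse the cached partitions.


rdd.unpersist() is not lazy: it does not wait for an action before dropping the cached data.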


RDD Persistence

One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.


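Under the hood, cache() calls persist(). The cache is fault tolerant, because lost partitions are recomputed from the lineage. It also supports multiple replicas.

If memory is tight, a serialized storage level uses less memory. In PySpark, cached objects are always serialized with pickle, so this choice only matters in Java/Scala.

StorageLevel options for persist include: MEMORY_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK, MEMORY_AND_DISK_2, DISK_ONLY, and DISK_ONLY_2.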
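Each storage-level name above is shorthand for a few flags. The sketch below mirrors the fields of `pyspark.StorageLevel` (useDisk, useMemory, useOffHeap, deserialized, replication) with a plain `NamedTuple`. The flag values assume the definitions in recent PySpark releases, where `deserialized` is False because Python objects are always pickled. The `Level` class and `describe` helper are hypothetical.

```python
from typing import NamedTuple


class Level(NamedTuple):
    """Mirrors pyspark.StorageLevel's fields, in its constructor's order (hypothetical copy)."""
    use_disk: bool
    use_memory: bool
    use_off_heap: bool
    deserialized: bool
    replication: int = 1


# Assumed to match PySpark's definitions; the "_2" suffix means replication=2.
LEVELS = {
    "MEMORY_ONLY": Level(False, True, False, False),
    "MEMORY_ONLY_2": Level(False, True, False, False, 2),
    "MEMORY_AND_DISK": Level(True, True, False, False),
    "MEMORY_AND_DISK_2": Level(True, True, False, False, 2),
    "DISK_ONLY": Level(True, False, False, False),
    "DISK_ONLY_2": Level(True, False, False, False, 2),
}


def describe(name: str) -> str:
    """Spell out where a level stores data and how many copies it keeps."""
    lvl = LEVELS[name]
    where = " and ".join(w for w, on in (("memory", lvl.use_memory), ("disk", lvl.use_disk)) if on)
    return f"{name}: {where}, {lvl.replication} replica(s)"


for name in LEVELS:
    print(describe(name))
```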


Persistence strategy: in memory, single replica, serialized.


P.S. Check the storage status in the Spark web UI (Storage tab): 192.168.199.102:4040

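5 Spark lineage in detail

Lineage: the dependency relationships between RDDs.

For fault tolerance, lost data is recomputed partition by partition; there is no need to recompute everything.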

Q1: Without persistence, if the second partition is lost, does it still have to be read from disk and recomputed?

Q2: Is persistence computationally expensive?
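This toy model of lineage-based recovery in a narrow chain bears on the first question above. Each partition is rebuilt by re-reading only its own input split and replaying the transformations. `SOURCE`, `LINEAGE` and the helper functions are hypothetical. Real Spark records lineage per RDD rather than as a list of lambdas.

```python
from typing import Callable, Dict, List

# Hypothetical input splits, standing in for the blocks of a file on disk.
SOURCE: Dict[int, List[int]] = {0: [1, 2], 1: [3, 4], 2: [5, 6]}
source_reads: List[int] = []  # records which splits were (re-)read


def read_split(i: int) -> List[int]:
    source_reads.append(i)
    return list(SOURCE[i])


# Lineage of a narrow chain: read -> map(x * 10) -> filter(x > 15).
LINEAGE: List[Callable[[List[int]], List[int]]] = [
    lambda part: [x * 10 for x in part],
    lambda part: [x for x in part if x > 15],
]


def compute_partition(i: int) -> List[int]:
    """Rebuild partition i from its own input split by replaying the lineage."""
    part = read_split(i)
    for step in LINEAGE:
        part = step(part)
    return part


result = {i: compute_partition(i) for i in SOURCE}  # initial computation
del result[1]                                       # simulate losing partition 1
source_reads.clear()
result[1] = compute_partition(1)                    # recovery
print(result[1], source_reads)  # [30, 40] [1]: only split 1 was re-read
```

Without persistence the lost partition is still re-read from its source split, but only that split, not the whole input.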


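6 Spark Dependencies

Narrow dependency: can be pipelined

Each partition of the parent RDD is used by at most one partition of the child RDD

e.g. map, filter, union, join with inputs co-partitioned

Wide dependency: shuffle

A partition of the parent RDD is used by multiple partitions of the child RDD

e.g. groupByKey on non-partitioned data, join with inputs not co-partitioned, repartition, coalesce (with shuffle=True), cogroup, reduceByKey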

The reduceByKey operation generates a new RDD where all values for a single key are combined into a tuple - the key and the result of executing a reduce function against all values associated with that key.

The Shuffle is an expensive operation since it involves disk I/O, data serialization, and network I/O.

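Differences

With a narrow dependency, a failure only requires recomputing the affected parent partitions. With a wide dependency, a lost partition may require recomputing all partitions of the parent RDD.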
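You can see the difference by recording which parent partitions each child partition reads. This toy sketch uses hypothetical helpers and integer keys, and a `key % n` partitioner stands in for Spark's hash partitioner. A map keeps a one-to-one mapping. A shuffle makes each child partition depend on every parent partition that holds one of its keys.

```python
from collections import defaultdict
from typing import Dict, List, Set, Tuple

Partition = List[Tuple[int, str]]  # (key, value) records


def narrow_map(parent: List[Partition]) -> Tuple[List[Partition], Dict[int, Set[int]]]:
    """map: child partition i is built only from parent partition i."""
    child = [[(k, v.upper()) for k, v in part] for part in parent]
    deps = {i: {i} for i in range(len(parent))}
    return child, deps


def wide_shuffle(parent: List[Partition], num_children: int) -> Tuple[List[Partition], Dict[int, Set[int]]]:
    """Toy shuffle: record (k, v) goes to child partition k % num_children."""
    child: List[Partition] = [[] for _ in range(num_children)]
    deps: Dict[int, Set[int]] = defaultdict(set)
    for p_idx, part in enumerate(parent):
        for k, v in part:
            c = k % num_children
            child[c].append((k, v))
            deps[c].add(p_idx)  # child c now depends on parent partition p_idx
    return child, dict(deps)


parent = [[(0, "a"), (1, "b")], [(2, "c"), (3, "d")]]
_, narrow_deps = narrow_map(parent)
_, wide_deps = wide_shuffle(parent, num_children=2)
print(narrow_deps)  # {0: {0}, 1: {1}}
print(wide_deps)    # {0: {0, 1}, 1: {0, 1}}
```

Losing child partition 0 costs one parent partition in the narrow case, but both parent partitions in the shuffle case.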

Q: reduceByKey redistributes the data, so how many partitions should its output have?


Coding exercise

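# Assumes a running SparkContext `sc`; the input path is a placeholder
lines = sc.textFile("file:///path/to/input.txt")
words = lines.flatMap(lambda line: line.split())
pairs = words.map(lambda word: (word, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
counts.collect()

# The same pipeline written as one chain
(sc.textFile("file:///path/to/input.txt")
   .flatMap(lambda line: line.split())
   .map(lambda word: (word, 1))
   .reduceByKey(lambda a, b: a + b)
   .collect())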

The web UI shows two stages: the shuffle required by reduceByKey splits the job.
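The two stages of the word count can be simulated in plain Python. Stage 1 pipelines flatMap and map inside one task per input partition, including the map-side combining that reduceByKey performs. The shuffle then routes each word to a reducer partition, and stage 2 sums the counts. The input data is hypothetical, and so is the character-code partitioner; Spark's default is a hash partitioner, and Python's built-in str hash is randomized per process, which is why it isn't used here.

```python
from collections import Counter
from typing import Dict, List, Tuple

# Hypothetical input: two partitions of text lines (stand-ins for textFile splits).
INPUT: List[List[str]] = [
    ["spark core", "spark shuffle"],
    ["core stage", "spark"],
]


def stage1(lines: List[str]) -> Dict[str, int]:
    """flatMap + map((word, 1)) pipelined in one task, plus map-side combining."""
    counts: Counter = Counter()
    for line in lines:              # flatMap: line -> words
        for word in line.split():
            counts[word] += 1       # map to (word, 1) and pre-sum locally
    return dict(counts)


def shuffle(map_outputs: List[Dict[str, int]], num_reducers: int) -> List[List[Tuple[str, int]]]:
    """Route each (word, count) to a reducer using a deterministic toy partitioner."""
    buckets: List[List[Tuple[str, int]]] = [[] for _ in range(num_reducers)]
    for out in map_outputs:
        for word, n in out.items():
            buckets[sum(map(ord, word)) % num_reducers].append((word, n))
    return buckets


def stage2(bucket: List[Tuple[str, int]]) -> Dict[str, int]:
    """reduceByKey(lambda a, b: a + b) on one reducer partition."""
    totals: Dict[str, int] = {}
    for word, n in bucket:
        totals[word] = totals.get(word, 0) + n
    return totals


map_outputs = [stage1(p) for p in INPUT]                # stage 1: one task per input partition
reduced = [stage2(b) for b in shuffle(map_outputs, 2)]  # stage 2: one task per reducer
result = {w: n for part in reduced for w, n in part.items()}
print(sorted(result.items()))
# [('core', 2), ('shuffle', 1), ('spark', 3), ('stage', 1)]
```

This also bears on the partition-count question above. In real PySpark, reduceByKey's numPartitions argument sets the number of output partitions. If it is omitted, PySpark uses spark.default.parallelism when that is set, and otherwise the parent RDD's partition count.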

[Homework]: Read the shuffle section of the official documentation.


Tips

進程: process

線程: thread

血緣: lineage
