spark-submit的時候如何引入外部jar包
在通過spark-submit提交任務時,可以通過添加配置參數(shù)來指定
–driver-class-path 外部jar包 –jars 外部jar包
方法一:spark-submit –jars
根據(jù)spark官網(wǎng),在提交任務的時候指定–jars,用逗號分開。這樣做的缺點是每次都要指定jar包,如果jar包少的話可以這么做,但是如果多的話會很麻煩。
命令:spark-submit --master yarn-client --jars .jar,.jar
方法二:extraClassPath
提交時在spark-default中設定參數(shù),將所有需要的jar包考到一個文件里,然后在參數(shù)中指定該目錄就可以了,較上一個方便很多:
spark.executor.extraClassPath=/home/hadoop/wzq_workspace/lib/* spark.driver.extraClassPath=/home/hadoop/wzq_workspace/lib/*
需要注意的是,你要在所有可能運行spark任務的機器上保證該目錄存在,并且將jar包考到所有機器上。這樣做的好處是提交代碼的時候不用再寫一長串jar了,缺點是要把所有的jar包都拷一遍。
談談spark中的寬窄依賴
RDD和它依賴的父RDD(s)的關系有兩種不同的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。
寬依賴:指的是多個子RDD的Partition會依賴同一個父RDD的Partition
窄依賴:指的是每一個父RDD的Partition最多被子RDD的一個Partition使用。
spark中如何劃分stage
Spark Application中可以因為不同的Action觸發(fā)眾多的job,一個Application中可以有很多的job,每個job是由一個或者多個Stage構成的,后面的Stage依賴于前面的Stage,也就是說只有前面依賴的Stage計算完畢后,后面的Stage才會運行。
Stage劃分的依據(jù)就是寬依賴,何時產生寬依賴,例如reduceByKey,groupByKey的算子,會導致寬依賴的產生。
由Action(例如collect)導致了SparkContext.runJob的執(zhí)行,最終導致了DAGScheduler中的submitJob的執(zhí)行,其核心是通過發(fā)送一個case class JobSubmitted對象給eventProcessLoop。
eventProcessLoop是DAGSchedulerEventProcessLoop的具體實例,而DAGSchedulerEventProcessLoop是eventLoop的子類,具體實現(xiàn)EventLoop的onReceive方法,onReceive方法轉過來回調doOnReceive在doOnReceive中通過模式匹配的方法把執(zhí)行路由到
在handleJobSubmitted中首先創(chuàng)建finalStage,創(chuàng)建finalStage時候會建立父Stage的依賴鏈條
spark 如何防止內存溢出
driver端的內存溢出
可以增大driver的內存參數(shù):spark.driver.memory (default 1g) 這個參數(shù)用來設置Driver的內存。
在Spark程序中,SparkContext,DAGScheduler都是運行在Driver端的。對應rdd的Stage切分也是在Driver端運行,如果用戶自己寫的程序有過多的步驟,切分出過多的Stage,這部分信息消耗的是Driver的內存,這個時候就需要調大Driver的內存。
map過程產生大量對象導致內存溢出
這種溢出的原因是在單個map中產生了大量的對象導致的
通過 rdd.repartition(10000).map(x=>for(i <- 1 to 10000) yield i.toString)。
shuffle后內存溢出
shuffle內存溢出的情況可以說都是shuffle后,單個文件過大導致的。
解釋:
在Spark中,join,reduceByKey這一類型的過程,都會有shuffle的過程,在shuffle的使用,需要傳入一個partitioner,大部分Spark中的shuffle操作,默認的partitioner都是HashPatitioner,默認值是父RDD中最大的分區(qū)數(shù),這個參數(shù)通過spark.default.parallelism控制(在spark-sql中用spark.sql.shuffle.partitions) , spark.default.parallelism參數(shù)只對HashPartitioner有效,所以如果是別的Partitioner或者自己實現(xiàn)的Partitioner就不能使用spark.default.parallelism這個參數(shù)來控制shuffle的并發(fā)量了。如果是別的partitioner導致的shuffle內存溢出,就需要從partitioner的代碼增加partitions的數(shù)量。
可以通過設置Partitioner,充分區(qū)解決
standalone模式下資源分配不均勻導致內存溢出
在standalone的模式下如果配置了–total-executor-cores 和 –executor-memory 這兩個參數(shù),但是沒有配置–executor-cores這個參數(shù)的話,就有可能導致,每個Executor的memory是一樣的,但是cores的數(shù)量不同,那么在cores數(shù)量多的Executor中,由于能夠同時執(zhí)行多個Task,就容易導致內存溢出的情況。這種情況的解決方法就是同時配置–executor-cores或者spark.executor.cores參數(shù),確保Executor資源分配均勻。
使用rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)代替rdd.cache()
rdd.cache()和rdd.persist(Storage.MEMORY_ONLY)是等價的,在內存不足的時候rdd.cache()的數(shù)據(jù)會丟失,再次使用的時候會重算,而rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)在內存不足的時候會存儲在磁盤,避免重算,只是消耗點IO時間。
spark中cache和persist的區(qū)別
cache:緩存數(shù)據(jù),默認是緩存在內存中,其本質還是調用persist persist:緩存數(shù)據(jù),有豐富的數(shù)據(jù)緩存策略。數(shù)據(jù)可以保存在內存也可以保存在磁盤中,使用的時候指定對應的緩存級別就可以了。
Spark分布式集群搭建的步驟
spark中的數(shù)據(jù)傾斜的現(xiàn)象、原因、后果
(1)、數(shù)據(jù)傾斜的現(xiàn)象
多數(shù)task執(zhí)行速度較快,少數(shù)task執(zhí)行時間非常長,或者等待很長時間后提示你內存不足,執(zhí)行失敗。
(2)、數(shù)據(jù)傾斜的原因
數(shù)據(jù)問題
1、key本身分布不均衡(包括大量的key為空) 2、key的設置不合理 spark使用問題
1、shuffle時的并發(fā)度不夠 2、計算方式有誤
(3)、數(shù)據(jù)傾斜的后果
1、spark中的stage的執(zhí)行時間受限于最后那個執(zhí)行完成的task,因此運行緩慢的任務會拖垮整個程序的運行速度(分布式程序運行的速度是由最慢的那個task決定的)。 2、過多的數(shù)據(jù)在同一個task中運行,將會把executor撐爆。
如何解決spark中的數(shù)據(jù)傾斜問題
發(fā)現(xiàn)數(shù)據(jù)傾斜的時候,不要急于提高executor的資源,修改參數(shù)或是修改程序,首先要檢查數(shù)據(jù)本身,是否存在異常數(shù)據(jù)。
1、數(shù)據(jù)問題造成的數(shù)據(jù)傾斜
找出異常的key
如果任務長時間卡在最后最后1個(幾個)任務,首先要對key進行抽樣分析,判斷是哪些key造成的。
選取key,對數(shù)據(jù)進行抽樣,統(tǒng)計出現(xiàn)的次數(shù),根據(jù)出現(xiàn)次數(shù)大小排序取出前幾個。 比如: df.select(“key”).sample(false,0.1).(k=>(k,1)).reduceBykey(+).map(k=>(k._2,k._1)).sortByKey(false).take(10) 如果發(fā)現(xiàn)多數(shù)數(shù)據(jù)分布都較為平均,而個別數(shù)據(jù)比其他數(shù)據(jù)大上若干個數(shù)量級,則說明發(fā)生了數(shù)據(jù)傾斜。
經過分析,傾斜的數(shù)據(jù)主要有以下三種情況:
1、null(空值)或是一些無意義的信息()之類的,大多是這個原因引起。
2、無效數(shù)據(jù),大量重復的測試數(shù)據(jù)或是對結果影響不大的有效數(shù)據(jù)。
3、有效數(shù)據(jù),業(yè)務導致的正常數(shù)據(jù)分布。
解決辦法
第1,2種情況,直接對數(shù)據(jù)進行過濾即可(因為該數(shù)據(jù)對當前業(yè)務不會產生影響)。 第3種情況則需要進行一些特殊操作,常見的有以下幾種做法
(1) 隔離執(zhí)行,將異常的key過濾出來單獨處理,最后與正常數(shù)據(jù)的處理結果進行union操作。
(2) 對key先添加隨機值,進行操作后,去掉隨機值,再進行一次操作。
(3) 使用reduceByKey 代替 groupByKey(reduceByKey用于對每個key對應的多個value進行merge操作,最重要的是它能夠在本地先進行merge操作,并且merge操作可以通過函數(shù)自定義.)
(4) 使用map join。
案例
如果使用reduceByKey因為數(shù)據(jù)傾斜造成運行失敗的問題。具體操作流程如下:
(1) 將原始的 key 轉化為 key + 隨機值(例如Random.nextInt) (2) 對數(shù)據(jù)進行 reduceByKey(func) (3) 將 key + 隨機值 轉成 key (4) 再對數(shù)據(jù)進行 reduceByKey(func)
2、spark使用不當造成的數(shù)據(jù)傾斜
提高shuffle并行度
dataFrame和sparkSql可以設置spark.sql.shuffle.partitions參數(shù)控制shuffle的并發(fā)度,默認為200。 rdd操作可以設置spark.default.parallelism控制并發(fā)度,默認參數(shù)由不同的Cluster Manager控制。 局限性: 只是讓每個task執(zhí)行更少的不同的key。無法解決個別key特別大的情況造成的傾斜,如果某些key的大小非常大,即使一個task單獨執(zhí)行它,也會受到數(shù)據(jù)傾斜的困擾。
使用map join 代替reduce join
在小表不是特別大(取決于你的executor大小)的情況下使用,可以使程序避免shuffle的過程,自然也就沒有數(shù)據(jù)傾斜的困擾了. 局限性: 因為是先將小數(shù)據(jù)發(fā)送到每個executor上,所以數(shù)據(jù)量不能太大。
flume整合sparkStreaming問題
如何實現(xiàn)sparkStreaming讀取flume中的數(shù)據(jù)
有兩種方式push和pull方式:
push方式
LogLevel.setStreamingLogLevels()
val Array(host, port) = args
val batchInterval = Milliseconds(2000)
// Create the context and set the batch size
val sparkConf = new SparkConf().setAppName("FlumeEventCount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, batchInterval)
// Create a flume stream
val stream = FlumeUtils.createStream(ssc, host, port.toInt, StorageLevel.MEMORY_ONLY_SER_2)
// Print out the count of events received from this server in each batch
stream.count().map(cnt => "Received " + cnt + " flume events.").print()
//拿到消息中的event,從event中拿出body,body是真正的消息體
stream.flatMap(t=>{new String(t.event.getBody.array()).split(" ")}).map((_,1)).reduceByKey(_+_).print
ssc.start()
ssc.awaitTermination()
pull 方式
//當sink有多個的時候
val flumesinklist = Array[InetSocketAddress](new InetSocketAddress("mini1", 8888))
val flumeStream = FlumeUtils.createPollingStream(ssc, flumesinklist, StorageLevel.MEMORY_ONLY_2)
flumeStream.count().map(cnt => "Received " + cnt + " flume events.").print()
flumeStream.flatMap(t => {
new String(t.event.getBody.array()).split(" ")
}).map((_, 1)).reduceByKey(_ + _).print()
// Print out the count of events received from this server in each batch
//stream.count().map(cnt => "Received " + cnt + " flume events.").print()
//拿到消息中的event,從event中拿出body,body是真正的消息體
//stream.flatMap(t=>{new String(t.event.getBody.array()).split(" ")}).map((_,1)).reduceByKey(_+_).print
ssc.start()
ssc.awaitTermination()
如何保證flume數(shù)據(jù)不丟失
1.flume那邊采用的channel是將數(shù)據(jù)落地到磁盤中,保證數(shù)據(jù)源端安全性(可以在補充一下,flume在這里的channel可以設置為memory內存中,提高數(shù)據(jù)接收處理的效率,但是由于數(shù)據(jù)在內存中,安全機制保證不了,故選擇channel為磁盤存儲。整個流程運行有一點的延遲性)
- sparkStreaming通過拉模式整合的時候,使用了FlumeUtils這樣一個類,該類是需要依賴一個額外的jar包(spark-streaming-flume_2.10)
3.利用StreamingContext.getOrCreate(checkpoint, creatingFunc: () => StreamingContext)來創(chuàng)建一個StreamingContext,設置checkpoin地址
4.spark.streaming.receiver.writeAheadLogs.enable這個property設置為true。
Spark Streaming的數(shù)據(jù)可靠性
有了checkpoint機制、write ahead log機制、Receiver緩存機器、可靠的Receiver(即數(shù)據(jù)接收并備份成功后會發(fā)送ack),可以保證無論是worker失效還是driver失效,都是數(shù)據(jù)0丟失。原因是:如果沒有Receiver服務的worker失效了,RDD數(shù)據(jù)可以依賴血統(tǒng)來重新計算;如果Receiver所在worker失敗了,由于Reciever是可靠的,并有write ahead log機制,則收到的數(shù)據(jù)可以保證不丟;如果driver失敗了,可以從checkpoint中恢復數(shù)據(jù)重新構建。
kafka整合sparkStreaming問題
receiver 方式
val kafkaStream = KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
注意點:
1.在Receiver的方式中,Spark中的partition和kafka中的partition并不是相關的,
所以如果我們加大每個topic的partition數(shù)量,僅僅是增加線程來處理由單一Receiver消費的主題。
但是這并沒有增加Spark在處理數(shù)據(jù)上的并行度.
對于不同的Group和topic我們可以使用多個Receiver創(chuàng)建不同的Dstream來并行接收數(shù)據(jù),
之后可以利用union來統(tǒng)一成一個Dstream
2.可能因為execture ,driver死掉而丟失
3.增加(spark.streaming.receiver.writeAheadLog.enable=true)
KafkaUtils.createStream(…, StorageLevel.MEMORY_AND_DISK_SER)
減少數(shù)據(jù)丟失
Direct 方式
//創(chuàng)建DStream,返回接收到的輸入數(shù)據(jù)
var stream=KafkaUtils.createDirectStream[String,String](ssc, PreferConsistent,SubscribeString,String)
兩種方式優(yōu)缺點:
receiver 保證數(shù)據(jù)不丟失,但是可能數(shù)據(jù)會重復消費
Direct 相比Receiver模式而言能夠確保機制更加健壯. 區(qū)別于使用Receiver來被動接收數(shù)據(jù), Direct模式會周期性地主動查詢Kafka, 來獲得每個topic+partition的最新的offset, 從而定義每個batch的offset的范圍. 當處理數(shù)據(jù)的job啟動時, 就會使用Kafka的簡單consumer api來獲取Kafka指定offset范圍的數(shù)據(jù)。
receiver 對于多個分區(qū)需要增加多個recevier然后union使用
Direct 不需要創(chuàng)建多個輸入DStream然后對它們進行union操作. Spark會創(chuàng)建跟Kafka partition一樣多的RDD partition, 并且會并行從Kafka中讀取數(shù)據(jù). 所以在Kafka partition和RDD partition之間, 有一個一對一的映射關系
receiver沒有direc高性能,dircet不需要設置spark.streaming.receiver.writeAheadLog.enable=true,而是利用kafka本身的數(shù)據(jù)備份保證數(shù)據(jù)的唯一性,
Spark master使用zookeeper進行HA的,有哪些元數(shù)據(jù)保存在Zookeeper?
答:spark通過這個參數(shù)spark.deploy.zookeeper.dir指定master元數(shù)據(jù)在zookeeper中保存的位置,包括Worker,Driver和Application以及Executors。standby節(jié)點要從zk中,獲得元數(shù)據(jù)信息,恢復集群運行狀態(tài),才能對外繼續(xù)提供服務,作業(yè)提交資源申請等,在恢復前是不能接受請求的。另外,Master切換需要注意2點
1)在Master切換的過程中,所有的已經在運行的程序皆正常運行!因為Spark Application在運行前就已經通過Cluster Manager獲得了計算資源,所以在運行時Job本身的調度和處理和Master是沒有任何關系的!
2) 在Master的切換過程中唯一的影響是不能提交新的Job:一方面不能夠提交新的應用程序給集群,因為只有Active Master才能接受新的程序的提交請求;另外一方面,已經運行的程序中也不能夠因為Action操作觸發(fā)新的Job的提交請求;
Spark master HA 主從切換過程不會影響集群已有的作業(yè)運行,為什么?
因為程序在運行之前,已經申請過資源了,driver和Executors通訊,不需要和master進行通訊的。
如何配置spark master的HA?
1)配置zookeeper
2)修改spark_env.sh文件,spark的master參數(shù)不在指定,添加如下代碼到各個master節(jié)點
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=zk01:2181,zk02:2181,zk03:2181 -Dspark.deploy.zookeeper.dir=/spark"
- 將spark_env.sh分發(fā)到各個節(jié)點
4)找到一個master節(jié)點,執(zhí)行./start-all.sh,會在這里啟動主master,其他的master備節(jié)點,啟動master命令: ./sbin/start-master.sh
5)提交程序的時候指定master的時候要指定三臺master,例如
./spark-shell --master spark://master01:7077,master02:7077,master03:7077
driver的功能是什么?
1)一個Spark作業(yè)運行時包括一個Driver進程,也是作業(yè)的主進程,具有main函數(shù),并且有SparkContext的實例,是程序的人口點;2)功能:負責向集群申請資源,向master注冊信息,負責了作業(yè)的調度,,負責作業(yè)的解析、生成Stage并調度Task到Executor上。包括DAGScheduler,TaskScheduler。
Spark中Work的主要工作是什么?
答:主要功能:管理當前節(jié)點內存,CPU的使用狀況,接收master分配過來的資源指令,通過ExecutorRunner啟動程序分配任務,worker就類似于包工頭,管理分配新進程,做計算的服務,相當于process服務。需要注意的是:1)worker會不會匯報當前信息給master,worker心跳給master主要只有workid,它不會發(fā)送資源信息以心跳的方式給mater,master分配的時候就知道work,只有出現(xiàn)故障的時候才會發(fā)送資源。2)worker不會運行代碼,具體運行的是Executor是可以運行具體appliaction寫的業(yè)務邏輯代碼,操作代碼的節(jié)點,它不會運行程序的代碼的。
Spark為什么比mapreduce快?
1)基于內存計算,減少低效的磁盤交互;
2)高效的調度算法,基于DAG;
3)容錯機制Linage,精華部分就是DAG和Lingae
Mapreduce和Spark的都是并行計算,那么他們有什么相同和區(qū)別
答:兩者都是用mr模型來進行并行計算:
1)hadoop的一個作業(yè)稱為job,job里面分為map task和reduce task,每個task都是在自己的進程中運行的,當task結束時,進程也會結束。
2)spark用戶提交的任務成為application,一個application對應一個sparkcontext,app中存在多個job,每觸發(fā)一次action操作就會產生一個job。這些job可以并行或串行執(zhí)行,每個job中有多個stage,stage是shuffle過程中DAGSchaduler通過RDD之間的依賴關系劃分job而來的,每個stage里面有多個task,組成taskset有TaskSchaduler分發(fā)到各個executor中執(zhí)行,executor的生命周期是和app一樣的,即使沒有job運行也是存在的,所以task可以快速啟動讀取內存進行計算。
3)hadoop的job只有map和reduce操作,表達能力比較欠缺而且在mr過程中會重復的讀寫hdfs,造成大量的io操作,多個job需要自己管理關系。
spark的迭代計算都是在內存中進行的,API中提供了大量的RDD操作如join,groupby等,而且通過DAG圖可以實現(xiàn)良好的容錯。
spark工作機制?
用戶在client端提交作業(yè)后,會由Driver運行main方法并創(chuàng)建spark context上下文。
執(zhí)行add算子,形成dag圖輸入dagscheduler,按照add之間的依賴關系劃分stage輸入task scheduler。 task scheduler會將stage劃分為task set分發(fā)到各個節(jié)點的executor中執(zhí)行。
cache后面能不能接其他算子,它是不是action操作?
cache可以接其他算子,但是接了算子之后,起不到緩存應有的效果,因為會重新觸發(fā)cache。
cache不是action操作
RDD的彈性表現(xiàn)在哪幾點?
1)自動的進行內存和磁盤的存儲切換;
2)基于Lingage的高效容錯;
3)task如果失敗會自動進行特定次數(shù)的重試;
4)stage如果失敗會自動進行特定次數(shù)的重試,而且只會計算失敗的分片;
5)checkpoint和persist,數(shù)據(jù)計算之后持久化緩存
6)數(shù)據(jù)調度彈性,DAG TASK調度和資源無關
7)數(shù)據(jù)分片的高度彈性,a.分片很多碎片可以合并成大的,b.par
RDD通過Linage(記錄數(shù)據(jù)更新)的方式為何很高效?
1)lazy記錄了數(shù)據(jù)的來源,RDD是不可變的,且是lazy級別的,且rDD
之間構成了鏈條,lazy是彈性的基石。由于RDD不可變,所以每次操作就
產生新的rdd,不存在全局修改的問題,控制難度下降,所有有計算鏈條
將復雜計算鏈條存儲下來,計算的時候從后往前回溯
900步是上一個stage的結束,要么就checkpoint
2)記錄原數(shù)據(jù),是每次修改都記錄,代價很大
如果修改一個集合,代價就很小,官方說rdd是
粗粒度的操作,是為了效率,為了簡化,每次都是
操作數(shù)據(jù)集合,寫或者修改操作,都是基于集合的
rdd的寫操作是粗粒度的,rdd的讀操作既可以是粗粒度的
也可以是細粒度,讀可以讀其中的一條條的記錄。
3)簡化復雜度,是高效率的一方面,寫的粗粒度限制了使用場景
如網(wǎng)絡爬蟲,現(xiàn)實世界中,大多數(shù)寫是粗粒度的場景
Spark的Shuffle過程介紹
http://www.cnblogs.com/jxhd1/p/6528540.html
collect功能是什么,其底層是怎么實現(xiàn)的?
driver通過collect把集群中各個節(jié)點的內容收集過來匯總成結果,collect返回結果是Array類型的,collect把各個節(jié)點上的數(shù)據(jù)抓過來,抓過來數(shù)據(jù)是Array型,collect對Array抓過來的結果進行合并,合并后Array中只有一個元素,是tuple類型(KV類型的)的。
Spaek程序執(zhí)行,有時候默認為什么會產生很多task,怎么修改默認task執(zhí)行個數(shù)?
1)因為輸入數(shù)據(jù)有很多task,尤其是有很多小文件的時候,有多少個輸入
block就會有多少個task啟動;2)spark中有partition的概念,每個partition都會對應一個task,task越多,在處理大規(guī)模數(shù)據(jù)的時候,就會越有效率。不過task并不是越多越好,如果平時測試,或者數(shù)據(jù)量沒有那么大,則沒有必要task數(shù)量太多。3)參數(shù)可以通過spark_home/conf/spark-default.conf配置文件設置:
spark.sql.shuffle.partitions 50 spark.default.parallelism 10
第一個是針對spark sql的task數(shù)量
第二個是非spark sql程序設置生效
為什么Spark Application在沒有獲得足夠的資源,job就開始執(zhí)行了,可能會導致什么什么問題發(fā)生?
會導致執(zhí)行該job時候集群資源不足,導致執(zhí)行job結束也沒有分配足夠的資源,分配了部分Executor,該job就開始執(zhí)行task,應該是task的調度線程和Executor資源申請是異步的;如果想等待申請完所有的資源再執(zhí)行job的:需要將spark.scheduler.maxRegisteredResourcesWaitingTime設置的很大;spark.scheduler.minRegisteredResourcesRatio 設置為1,但是應該結合實際考慮
否則很容易出現(xiàn)長時間分配不到資源,job一直不能運行的情況。
.map與flatMap的區(qū)別
map:對RDD每個元素轉換,文件中的每一行數(shù)據(jù)返回一個數(shù)組對象
flatMap:對RDD每個元素轉換,然后再扁平化
將所有的對象合并為一個對象,文件中的所有行數(shù)據(jù)僅返回一個數(shù)組
對象,會拋棄值為null的值
Spark為什么要持久化,一般什么場景下要進行persist操作?
為什么要進行持久化?
spark所有復雜一點的算法都會有persist身影,spark默認數(shù)據(jù)放在內存,spark很多內容都是放在內存的,非常適合高速迭代,1000個步驟
只有第一個輸入數(shù)據(jù),中間不產生臨時數(shù)據(jù),但分布式系統(tǒng)風險很高,所以容易出錯,就要容錯,rdd出錯或者分片可以根據(jù)血統(tǒng)算出來,如果沒有對父rdd進行persist 或者cache的化,就需要重頭做。
以下場景會使用persist
1)某個步驟計算非常耗時,需要進行persist持久化
2)計算鏈條非常長,重新恢復要算很多步驟,很好使,persist
3)checkpoint所在的rdd要持久化persist,
lazy級別,框架發(fā)現(xiàn)有checnkpoint,checkpoint時單獨觸發(fā)一個job,需要重算一遍,checkpoint前
要持久化,寫個rdd.cache或者rdd.persist,將結果保存起來,再寫checkpoint操作,這樣執(zhí)行起來會非???,不需要重新計算rdd鏈條了。checkpoint之前一定會進行persist。
4)shuffle之后為什么要persist,shuffle要進性網(wǎng)絡傳輸,風險很大,數(shù)據(jù)丟失重來,恢復代價很大
5)shuffle之前進行persist,框架默認將數(shù)據(jù)持久化到磁盤,這個是框架自動做的。
為什么要進行序列化
序列化可以減少數(shù)據(jù)的體積,減少存儲空間,高效存儲和傳輸數(shù)據(jù),不好的是使用的時候要反序列化,非常消耗CPU
介紹一下join操作優(yōu)化經驗?
join其實常見的就分為兩類: map-side join 和 reduce-side join。當大表和小表join時,用map-side join能顯著提高效率。將多份數(shù)據(jù)進行關聯(lián)是數(shù)據(jù)處理過程中非常普遍的用法,不過在分布式計算系統(tǒng)中,這個問題往往會變的非常麻煩,因為框架提供的 join 操作一般會將所有數(shù)據(jù)根據(jù) key 發(fā)送到所有的 reduce 分區(qū)中去,也就是 shuffle 的過程。造成大量的網(wǎng)絡以及磁盤IO消耗,運行效率極其低下,這個過程一般被稱為 reduce-side-join。如果其中有張表較小的話,我們則可以自己實現(xiàn)在 map 端實現(xiàn)數(shù)據(jù)關聯(lián),跳過大量數(shù)據(jù)進行 shuffle 的過程,運行時間得到大量縮短,根據(jù)不同數(shù)據(jù)可能會有幾倍到數(shù)十倍的性能提升。
介紹一下cogroup rdd實現(xiàn)原理,你在什么場景下用過這個rdd?
cogroup的函數(shù)實現(xiàn):這個實現(xiàn)根據(jù)兩個要進行合并的兩個RDD操作,生成一個CoGroupedRDD的實例,這個RDD的返回結果是把相同的key中兩個RDD分別進行合并操作,最后返回的RDD的value是一個Pair的實例,這個實例包含兩個Iterable的值,第一個值表示的是RDD1中相同KEY的值,第二個值表示的是RDD2中相同key的值.由于做cogroup的操作,需要通過partitioner進行重新分區(qū)的操作,因此,執(zhí)行這個流程時,需要執(zhí)行一次shuffle的操作(如果要進行合并的兩個RDD的都已經是shuffle后的rdd,同時他們對應的partitioner相同時,就不需要執(zhí)行shuffle,),
場景:表關聯(lián)查詢
Spark使用parquet文件存儲格式能帶來哪些好處
如果說HDFS 是大數(shù)據(jù)時代分布式文件系統(tǒng)首選標準,那么parquet則是整個大數(shù)據(jù)時代文件存儲格式實時首選標準
速度更快:從使用spark sql操作普通文件CSV和parquet文件速度對比上看,絕大多數(shù)情況
會比使用csv等普通文件速度提升10倍左右,在一些普通文件系統(tǒng)無法在spark上成功運行的情況
下,使用parquet很多時候可以成功運行
- parquet的壓縮技術非常穩(wěn)定出色,在spark sql中對壓縮技術的處理可能無法正常的完成工作
(例如會導致lost task,lost executor)但是此時如果使用parquet就可以正常的完成
- 極大的減少磁盤I/o,通常情況下能夠減少75%的存儲空間,由此可以極大的減少spark sql處理
數(shù)據(jù)的時候的數(shù)據(jù)輸入內容,尤其是在spark1.6x中有個下推過濾器在一些情況下可以極大的
減少磁盤的IO和內存的占用,(下推過濾器)
spark 1.6x parquet方式極大的提升了掃描的吞吐量,極大提高了數(shù)據(jù)的查找速度spark1.6和spark1.5x相比而言,提升了大約1倍的速度,在spark1.6X中,操作parquet時候cpu也進行了極大的優(yōu)化,有效的降低了cpu
采用parquet可以極大的優(yōu)化spark的調度和執(zhí)行。我們測試spark如果用parquet可以有效的減少stage的執(zhí)行消耗,同時可以優(yōu)化執(zhí)行路徑
Executor之間如何共享數(shù)據(jù)?
基于hdfs或者基于tachyon
Spark累加器有哪些特點?
1)累加器在全局唯一的,只增不減,記錄全局集群的唯一狀態(tài)
2)在exe中修改它,在driver讀取
3)executor級別共享的,廣播變量是task級別的共享
兩個application不可以共享累加器,但是同一個app不同的job可以共享
spark hashParitioner的弊端是什么?
HashPartitioner分區(qū)的原理很簡單,對于給定的key,計算其hashCode,并除于分區(qū)的個數(shù)取余,如果余數(shù)小于0,則用余數(shù)+分區(qū)的個數(shù),最后返回的值就是這個key所屬的分區(qū)ID;弊端是數(shù)據(jù)不均勻,容易導致數(shù)據(jù)傾斜,極端情況下某幾個分區(qū)會擁有rdd的所有數(shù)據(jù)
RangePartitioner分區(qū)的原理?
RangePartitioner分區(qū)則盡量保證每個分區(qū)中數(shù)據(jù)量的均勻,而且分區(qū)與分區(qū)之間是有序的,也就是說一個分區(qū)中的元素肯定都是比另一個分區(qū)內的元素小或者大;但是分區(qū)內的元素是不能保證順序的。簡單的說就是將一定范圍內的數(shù)映射到某一個分區(qū)內。其原理是水塘抽樣??梢詤⒖歼@篇博文
https://www.iteblog.com/archives/1522.html
介紹parition和block有什么關聯(lián)關系?
1)hdfs中的block是分布式存儲的最小單元,等分,可設置冗余,這樣設計有一部分磁盤空間的浪費,但是整齊的block大小,便于快速找到、讀取對應的內容;
2)Spark中的partion是彈性分布式數(shù)據(jù)集RDD的最小單元,RDD是由分布在各個節(jié)點上的partion組成的。partion是指的spark在計算過程中,生成的數(shù)據(jù)在計算空間內最小單元,同一份數(shù)據(jù)(RDD)的partion大小不一,數(shù)量不定,是根據(jù)application里的算子和最初讀入的數(shù)據(jù)分塊數(shù)量決定;
3)block位于存儲空間、partion位于計算空間,block的大小是固定的、partion大小是不固定的,是從2個不同的角度去看數(shù)據(jù)。
Spark應用程序的執(zhí)行過程是什么?
1)構建Spark Application的運行環(huán)境(啟動SparkContext),SparkContext向資源管理器(可以是Standalone、Mesos或YARN)注冊并申請運行Executor資源;
2).資源管理器分配Executor資源并啟動StandaloneExecutorBackend,Executor運行情況將隨著心跳發(fā)送到資源管理器上;
3).SparkContext構建成DAG圖,將DAG圖分解成Stage,并把Taskset發(fā)送給Task Scheduler。Executor向SparkContext申請Task,Task Scheduler將Task發(fā)放給Executor運行同時SparkContext將應用程序代碼發(fā)放給Executor。
4).Task在Executor上運行,運行完畢釋放所有資源。
Spark如何自定義partitioner分區(qū)器?
1)spark默認實現(xiàn)了HashPartitioner和RangePartitioner兩種分區(qū)策略,我們也可以自己擴展分區(qū)策略,自定義分區(qū)器的時候繼承org.apache.spark.Partitioner類,實現(xiàn)類中的三個方法
def numPartitions: Int:這個方法需要返回你想要創(chuàng)建分區(qū)的個數(shù);
def getPartition(key: Any): Int:這個函數(shù)需要對輸入的key做計算,然后返回該key的分區(qū)ID,范圍一定是0到numPartitions-1;
equals():這個是Java標準的判斷相等的函數(shù),之所以要求用戶實現(xiàn)這個函數(shù)是因為Spark內部會比較兩個RDD的分區(qū)是否一樣。
2)使用,調用parttionBy方法中傳入自定義分區(qū)對象
參考:http://blog.csdn.net/high2011/article/details/68491115
spark中task有幾種類型?
1)result task類型,最后一個task,
2)是shuffleMapTask類型,除了最后一個task都是
什么是二次排序,你是如何用spark實現(xiàn)二次排序的?(互聯(lián)網(wǎng)公司常面)
http://blog.csdn.net/sundujing/article/details/51399606
class SecondarySort(val first:Int, val second:Int) extends Ordered[SecondarySort] with Serializable{
override def compare(that: SecondarySort): Int = {
if(this.first - that.first != 0)
{
this.first - that.first
} else {
this.second - that.second
}
`}
}
object SecondarySortApp {
def main (args: Array[String]) {
//第一步;創(chuàng)建spark的配置對象sparkconf
val conf = new SparkConf()//創(chuàng)建sparkconf對象
conf.setAppName("SecondarySortApp")//設置應用程序的名稱
conf.setMaster("local")//設置本地運行
//創(chuàng)建sparkcontext對象,sparkcontext是程序的唯一入口
val sc = new SparkContext(conf)
val lines = sc.textFile("D:\\JavaWorkspaces\\sparkproject\\sparktest.txt")
val pairWithSortkey = lines.map(line =>(
new SecondarySort( line.split(" ")(0).toInt,line.split(" ")(1).toInt),line
))
val sorted = pairWithSortkey.sortByKey(false)
val sortedResult = sorted.map(sortedline => sortedline._2)
sortedResult.collect.foreach(println)
}
}
如何使用Spark解決TopN問題?(互聯(lián)網(wǎng)公司常面)
函數(shù)partitionFunc、keyFunc準備好之后,我們可以開始調用repartitionAndSortWithinPartitions:
umPartitions值為10,該值取決于分區(qū)(產品線)的個數(shù);ascending值為False,該值表示分區(qū)內排序時使用降序。
如何使用Spark解決分組排序問題?(互聯(lián)網(wǎng)公司常面)
1、對上述數(shù)據(jù)按key值進行分組
2、對分組后的值進行排序
3、截取分組后值得top 3位以key-value形式返回結果
窄依賴父RDD的partition和子RDD的parition是不是都是一對一的關系?
不一定,除了一對一的窄依賴,還包含一對固定個數(shù)的窄依賴(就是對父RDD的依賴的Partition的數(shù)量不會隨著RDD數(shù)量規(guī)模的改變而改變),比如join操作的每個partiion僅僅和已知的partition進行join,這個join操作是窄依賴,依賴固定數(shù)量的父rdd,因為是確定的partition關系
不需要排序的hash shuffle是否一定比需要排序的sort shuffle速度快?
不一定??!當數(shù)據(jù)規(guī)模小,Hash shuffle快于Sorted Shuffle數(shù)據(jù)規(guī)模大的時候;當數(shù)據(jù)量大,sorted Shuffle會比Hash shuffle快很多,因為數(shù)量大的有很多小文件,不均勻,甚至出現(xiàn)數(shù)據(jù)傾斜,消耗內存大,1.x之前spark使用hash,適合處理中小規(guī)模,1.x之后,增加了Sorted shuffle,Spark更能勝任大規(guī)模處理了。
conslidate是如何優(yōu)化Hash shuffle時在map端產生的小文件?
1)conslidate為了解決Hash Shuffle同時打開過多文件導致Writer handler內存使用過大以及產生過多文件導致大量的隨機讀寫帶來的低效磁盤IO;2)conslidate根據(jù)CPU的個數(shù)來決定每個task shuffle map端產生多少個文件,假設原來有10個task,100個reduce,每個CPU有10個CPU
那么使用hash shuffle會產生10100=1000個文件,conslidate產生1010=100個文件
備注:conslidate部分減少了文件和文件句柄,并行讀很高的情況下(task很多時)還是會很多文件
Sort-basesd shuffle產生多少個臨時文件
2*Map階段所有的task數(shù)量,Mapper階段中并行的Partition的總數(shù)量,其實就是Mapper端task
https://blog.csdn.net/kxr0502/article/details/50616805
Sort-based shuffle的缺陷?
- 如果mapper中task的數(shù)量過大,依舊會產生很多小文件,此時在shuffle傳遞數(shù)據(jù)的過程中reducer段,reduce會需要同時大量的記錄進行反序列化,導致大量的內存消耗和GC的巨大負擔,造成系統(tǒng)緩慢甚至崩潰
2)如果需要在分片內也進行排序,此時需要進行mapper段和reducer段的兩次排序
spark.storage.memoryFraction參數(shù)的含義,實際生產中如何調優(yōu)?
1)用于設置RDD持久化數(shù)據(jù)在Executor內存中能占的比例,默認是0.6,,默認Executor 60%的內存,可以用來保存持久化的RDD數(shù)據(jù)。根據(jù)你選擇的不同的持久化策略,如果內存不夠時,可能數(shù)據(jù)就不會持久化,或者數(shù)據(jù)會寫入磁盤。
2)如果持久化操作比較多,可以提高spark.storage.memoryFraction參數(shù),使得更多的持久化數(shù)據(jù)保存在內存中,提高數(shù)據(jù)的讀取性能,如果shuffle的操作比較多,有很多的數(shù)據(jù)讀寫操作到JVM中,那么應該調小一點,節(jié)約出更多的內存給JVM,避免過多的JVM gc發(fā)生。在web ui中觀察如果發(fā)現(xiàn)gc時間很長,可以設置spark.storage.memoryFraction更小一點。
介紹一下你對Unified Memory Management內存管理模型的理解?
Spark中的內存使用分為兩部分:執(zhí)行(execution)與存儲(storage)
執(zhí)行內存主要用于shuffles、joins、sorts和aggregations,存儲內存則用于緩存或者跨節(jié)點的內部數(shù)據(jù)傳輸。1.6之前,對于一個Executor,內存都有哪些部分構成:
1)ExecutionMemory。這片內存區(qū)域是為了解決 shuffles,joins, sorts and aggregations 過程中為了避免頻繁IO需要的buffer。 通過spark.shuffle.memoryFraction(默認 0.2) 配置。
2)StorageMemory。這片內存區(qū)域是為了解決 block cache(就是你顯示調用dd.cache, rdd.persist等方法), 還有就是broadcasts,以及task results的存儲。可以通過參數(shù) spark.storage.memoryFraction(默認0.6)。設置
3)OtherMemory。給系統(tǒng)預留的,因為程序本身運行也是需要內存的。 (默認為0.2).
描述Yarn執(zhí)行一個任務的過程?
1)客戶端client向ResouceManager提交Application,ResouceManager接受Application
并根據(jù)集群資源狀況選取一個node來啟動Application的任務調度器driver(ApplicationMaster)
2)ResouceManager找到那個node,命令其該node上的nodeManager來啟動一個新的
JVM進程運行程序的driver(ApplicationMaster)部分,driver(ApplicationMaster)啟動時會首先向ResourceManager注冊,說明由自己來負責當前程序的運行
3)driver(ApplicationMaster)開始下載相關jar包等各種資源,基于下載的jar等信息決定向ResourceManager申請具體的資源內容。
4)ResouceManager接受到driver(ApplicationMaster)提出的申請后,會最大化的滿足
資源分配請求,并發(fā)送資源的元數(shù)據(jù)信息給driver(ApplicationMaster);
5)driver(ApplicationMaster)收到發(fā)過來的資源元數(shù)據(jù)信息后會根據(jù)元數(shù)據(jù)信息發(fā)指令給具體
機器上的NodeManager,讓其啟動具體的container。
6)NodeManager收到driver發(fā)來的指令,啟動container,container啟動后必須向driver(ApplicationMaster)注冊。
7)driver(ApplicationMaster)收到container的注冊,開始進行任務的調度和計算,直到
任務完成。
Yarn中的container是由誰負責銷毀的,在Hadoop Mapreduce中container可以復用么?
ApplicationMaster負責銷毀,在Hadoop Mapreduce不可以復用,在spark on yarn程序container可以復用
提交任務時,如何指定Spark Application的運行模式?
1)cluster模式:./spark-submit --class xx.xx.xx --master yarn --deploy-mode cluster xx.jar
- client模式:./spark-submit --class xx.xx.xx --master yarn --deploy-mode client xx.jar
不啟動Spark集群Master和work服務,可不可以運行Spark程序?
可以,只要資源管理器第三方管理就可以,如由yarn管理,spark集群不啟動也可以使用spark;spark集群啟動的是work和master,這個其實就是資源管理框架,yarn中的resourceManager相當于master,NodeManager相當于worker,做計算是Executor,和spark集群的work和manager可以沒關系,歸根接底還是JVM的運行,只要所在的JVM上安裝了spark就可以。
Spark中的4040端口由什么功能?
收集Spark作業(yè)運行的信息
spark on yarn Cluster 模式下,ApplicationMaster和driver是在同一個進程么?
是,driver 位于ApplicationMaster進程中。該進程負責申請資源,還負責監(jiān)控程序、資源的動態(tài)情況。
Spark on Yarn 模式有哪些優(yōu)點?
1)與其他計算框架共享集群資源(eg.Spark框架與MapReduce框架同時運行,如果不用Yarn進行資源分配,MapReduce分到的內存資源會很少,效率低下);資源按需分配,進而提高集群資源利用等。
2)相較于Spark自帶的Standalone模式,Yarn的資源分配更加細致
3)Application部署簡化,例如Spark,Storm等多種框架的應用由客戶端提交后,由Yarn負責資源的管理和調度,利用Container作為資源隔離的單位,以它為單位去使用內存,cpu等。
4)Yarn通過隊列的方式,管理同時運行在Yarn集群中的多個服務,可根據(jù)不同類型的應用程序負載情況,調整對應的資源使用量,實現(xiàn)資源彈性管理。
談談你對container的理解?
1)Container作為資源分配和調度的基本單位,其中封裝了的資源如內存,CPU,磁盤,網(wǎng)絡帶寬等。 目前yarn僅僅封裝內存和CPU
2)Container由ApplicationMaster向ResourceManager申請的,由ResouceManager中的資源調度器異步分配給ApplicationMaster
- Container的運行是由ApplicationMaster向資源所在的NodeManager發(fā)起的,Container運行時需提供內部執(zhí)行的任務命令.
運行在yarn中Application有幾種類型的container?
1) 運行ApplicationMaster的Container:這是由ResourceManager(向內部的資源調度器)申請和啟動的,用戶提交應用程序時,可指定唯一的ApplicationMaster所需的資源;
2) 運行各類任務的Container:這是由ApplicationMaster向ResourceManager申請的,并由ApplicationMaster與NodeManager通信以啟動之。
Spark on Yarn架構是怎么樣的?(要會畫哦,這個圖)
Executor啟動時,資源通過哪幾個參數(shù)指定?
1)num-executors是executor的數(shù)量
2)executor-memory 是每個executor使用的內存
3)executor-cores 是每個executor分配的CPU
Mapreduce的執(zhí)行過程?
階段1:input/map/partition/sort/spill
階段2:mapper端merge
階段3:reducer端merge/reduce/output
詳細過程參考這個http://www.cnblogs.com/hipercomer/p/4516581.html
一個task的map數(shù)量由誰來決定?
一般情況下,在輸入源是文件的時候,一個task的map數(shù)量由splitSize來決定的,那么splitSize是由以下幾個來決定的
goalSize = totalSize / mapred.map.tasks
inSize = max {mapred.min.split.size, minSplitSize}
splitSize = max (minSize, min(goalSize, dfs.block.size))
一個task的reduce數(shù)量,由partition決定。
你的項目提交到job的時候數(shù)據(jù)量有多大?
1)回答出數(shù)據(jù)是什么格式,有沒有采用什么壓縮,采用了壓縮的話,壓縮比大概是多少;2)文件大概多大:大概起了多少個map,起了多少個reduce,map階段讀取了多少數(shù)據(jù),reduce階段讀取了多少數(shù)據(jù),程序大約執(zhí)行了多久,3)集群什么規(guī)模,集群有多少節(jié)點,多少內存,多少CPU核數(shù)等。把這些點回答進去,而不是給個數(shù)字了事。
如何殺死一個正在運行的job
殺死一個job
MRV1:Hadoop job kill jobid
YARN: yarn application -kill applicationId
列出你所知道的調度器,說明其工作原理
a) Fifo schedular 默認的調度器 先進先出
b) Capacity schedular 計算能力調度器 選擇占用內存小 優(yōu)先級高的
c) Fair schedular 調肚臍 公平調度器 所有job 占用相同資源
導致Executor產生FULL gc 的原因,可能導致什么問題?
可能導致Executor僵死問題,海量數(shù)據(jù)的shuffle和數(shù)據(jù)傾斜等都可能導致full gc。以shuffle為例,伴隨著大量的Shuffle寫操作,JVM的新生代不斷GC,Eden Space寫滿了就往Survivor Space寫,同時超過一定大小的數(shù)據(jù)會直接寫到老生代,當新生代寫滿了之后,也會把老的數(shù)據(jù)搞到老生代,如果老生代空間不足了,就觸發(fā)FULL GC,還是空間不夠,那就OOM錯誤了,此時線程被Blocked,導致整個Executor處理數(shù)據(jù)的進程被卡住
Spark執(zhí)行任務時出現(xiàn)java.lang.OutOfMemoryError: GC overhead limit exceeded和java.lang.OutOfMemoryError: java heap space原因和解決方法?
原因:加載了太多資源到內存,本地的性能也不好,gc時間消耗的較多
解決方法:
1)增加參數(shù),-XX:-UseGCOverheadLimit,關閉這個特性,同時增加heap大小,-Xmx1024m
2)下面這個兩個參數(shù)調大點
export SPARK_EXECUTOR_MEMORY=6000M
export SPARK_DRIVER_MEMORY=7000M
可以參考這個:http://www.cnblogs.com/hucn/p/3572384.html
hadoop的TextInputFormat作用是什么,如何自定義實現(xiàn)
InputFormat會在map操作之前對數(shù)據(jù)進行兩方面的預處理
1是getSplits,返回的是InputSplit數(shù)組,對數(shù)據(jù)進行split分片,每片交給map操作一次
2是getRecordReader,返回的是RecordReader對象,對每個split分片進行轉換為key-value鍵值對格式傳遞給map
常用的InputFormat是TextInputFormat,使用的是LineRecordReader對每個分片進行鍵值對的轉換,以行偏移量作為鍵,行內容作為值
自定義類繼承InputFormat接口,重寫createRecordReader和isSplitable方法
在createRecordReader中可以自定義分隔符
Hbase行健列族的概念,物理模型,表的設計原則?
行?。菏莌base表自帶的,每個行健對應一條數(shù)據(jù)。
列族:是創(chuàng)建表時指定的,為列的集合,每個列族作為一個文件單獨存儲,存儲的數(shù)據(jù)都是字節(jié)數(shù)組,其中的數(shù)據(jù)可以有很多,通過時間戳來區(qū)分。
物理模型:整個hbase表會拆分為多個region,每個region記錄著行健的起始點保存在不同的節(jié)點上,查詢時就是對各個節(jié)點的并行查詢,當region很大時使用.META表存儲各個region的起始點,-ROOT又可以存儲.META的起始點。
rowkey的設計原則:各個列簇數(shù)據(jù)平衡,長度原則、相鄰原則,創(chuàng)建表的時候設置表放入regionserver緩存中,避免自動增長和時間,使用字節(jié)數(shù)組代替string,最大長度64kb,最好16字節(jié)以內,按天分表,兩個字節(jié)散列,四個字節(jié)存儲時分毫秒。
列族的設計原則:盡可能少(按照列族進行存儲,按照region進行讀取,不必要的io操作),經常和不經常使用的兩類數(shù)據(jù)放入不同列族中,列族名字盡可能短。
Hadoop性能調優(yōu)?
調優(yōu)可以通過系統(tǒng)配置、程序編寫和作業(yè)調度算法來進行。
hdfs的block.size可以調到128/256(網(wǎng)絡很好的情況下,默認為64)
調優(yōu)的大頭:mapred.map.tasks、mapred.reduce.tasks設置mr任務數(shù)(默認都是1)
mapred.tasktracker.map.tasks.maximum每臺機器上的最大map任務數(shù)
mapred.tasktracker.reduce.tasks.maximum每臺機器上的最大reduce任務數(shù)
mapred.reduce.slowstart.completed.maps配置reduce任務在map任務完成到百分之幾的時候開始進入
這個幾個參數(shù)要看實際節(jié)點的情況進行配置,reduce任務是在33%的時候完成copy,要在這之前完成map任務,(map可以提前完成)
mapred.compress.map.output,mapred.output.compress配置壓縮項,消耗cpu提升網(wǎng)絡和磁盤io
合理利用combiner
注意重用writable對象
spark的優(yōu)化怎么做?
通過spark-env文件、程序中sparkconf和set property設置。
(1)計算量大,形成的lineage過大應該給已經緩存了的rdd添加checkpoint,以減少容錯帶來的開銷。
(2)小分區(qū)合并,過小的分區(qū)造成過多的切換任務開銷,使用repartition。
ALS算法原理?
對于user-product-rating數(shù)據(jù),als會建立一個稀疏的評分矩陣,其目的就是通過一定的規(guī)則填滿這個稀疏矩陣。
als會對稀疏矩陣進行分解,分為用戶-特征值,產品-特征值,一個用戶對一個產品的評分可以由這兩個矩陣相乘得到。
通過固定一個未知的特征值,計算另外一個特征值,然后交替反復進行最小二乘法,直至差平方和最小,即可得想要的矩陣。