http://www.csdn.net/article/2015-04-24/2824545
HIve on spark 總體設(shè)計(jì)思路,盡可能重用Hive邏輯層面的功能;從省城物理計(jì)劃開始,提供一整套針對(duì)spark的實(shí)現(xiàn),比如SparkCompiler,SparkTask等,這樣HIve的查詢就可以作為Spark的任務(wù)來(lái)執(zhí)行了。一下是幾點(diǎn)主要的設(shè)計(jì)原則。
1、盡可能減少對(duì)Hive原有代碼的修改,這是和之前的Shark設(shè)計(jì)思路最大的不同,Shark對(duì)HIve的修改動(dòng)太大以至于無(wú)法被Hive社區(qū)接受,HIve on spark盡可能減少改動(dòng)HIve的代碼,從而不影響Hive目前對(duì)MapReduce和Tez的支持,同時(shí),HIve on spark 保證對(duì)現(xiàn)有的MapReduce和Tez模式在功能性能方面不會(huì)有任何影響。
2、對(duì)于選擇Spark的用戶,應(yīng)使其能夠自動(dòng)的獲取hive現(xiàn)有和未來(lái)新增的功能。
3、盡可能降低維護(hù)成本,保證對(duì)spark依賴的松耦合。
新的計(jì)算引擎
Hive的用戶可以通過(guò)hive.execution.engine來(lái)設(shè)置計(jì)算引擎,目前該參數(shù)可選的值為mr何tez,為了實(shí)現(xiàn)Hive on spark,我們將spark作為參數(shù)第三個(gè)選項(xiàng)。要開啟Hive on Spark模式,用戶僅需將這個(gè)參數(shù)設(shè)置為Spark即可。
以Hive的表作為RDD
Spark以分布式可靠數(shù)據(jù)集合作為其數(shù)據(jù)抽象,因此我們需要將Hive的表轉(zhuǎn)換為RDD以便spark處理。本質(zhì)上,hive表和spark的HadoopRDD都是HDFS上的一組文件,通過(guò)inputFormat和RecordReader讀取其中的數(shù)據(jù),因此這個(gè)轉(zhuǎn)化為自然而然的。
使用Hive原語(yǔ)
這里主要是指Hive的操作符對(duì)數(shù)據(jù)進(jìn)行處理。Spark為RDD提供了一些列的轉(zhuǎn)換Transformation,其中有些轉(zhuǎn)換也是面向SQL的,比如groupByKey、join等如果使用這些轉(zhuǎn)換(就如Shark所做的那樣),那就意味著我們要重新實(shí)現(xiàn)一些Hive已有的功能;而且當(dāng)Hive增加新的功能時(shí),我們需要響應(yīng)的Hive on spark模式。有鑒于此,我們選擇將Hive的操作包裝為Function,然后應(yīng)用到RDD上,這樣,我們只需要依賴較少的集中RDD的轉(zhuǎn)換,而主要的計(jì)算邏輯任由Hive提供。
由于使用了Hive的原語(yǔ),因此我們需要顯示的調(diào)用一些Transformation來(lái)實(shí)現(xiàn)Shuffle的功能。下表中列舉了Hive on Spark使用的所有轉(zhuǎn)換。
Transformation ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?功能 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 應(yīng)用場(chǎng)景
mapPartitionsToPair ? ? ? ? ? ? ? ? ? 將function應(yīng)用到RDD的每個(gè)partition上 ? ? ? ? ? 主要的計(jì)算邏輯
union ? ? ? ? ? ? ? ? ? ? 返回兩個(gè)RDD的聯(lián)合 ? ? ? ? ? ? ? ? ?多表查詢
groupByKey ? ? ? ?按照key對(duì)RDD進(jìn)行g(shù)roup,group后的結(jié)果不保證有序,使用Hash Partitioner ? ? ?Shuffle 使用用不要求排序的情況
sortByKey ? ? ? ? ? 按照key對(duì)RDD進(jìn)行全局排序,使用Range Partitioner ? ? Shuffle時(shí)使,適用于全局排序的查詢,且由于使用了Range Partitioner,因此可以用多個(gè)reducer來(lái)實(shí)現(xiàn)全局排序
repartitionAndSortWithinPartitions ? 對(duì)RDD進(jìn)行重新分區(qū),并對(duì)每個(gè)分區(qū)進(jìn)行排序,使用hash Partition ?shuffle時(shí)使用,適用于需要排序的情況
repartitionAndSortWithinPartitions簡(jiǎn)單說(shuō)明,這個(gè)功能由SPARK-2978引入,目的是提供一種MapReduce風(fēng)格的Shuffle.雖然sortBySort也提供了排序的功能,但某些情況下我們并不需要全局有序,另外其使用的Range Partitioner對(duì)于某些Hive查詢并不適用。
物理執(zhí)行計(jì)劃:
通過(guò)SparkCompiler將Operator Tree轉(zhuǎn)換為Task Tree,其中需要提交給spark執(zhí)行的任務(wù)即為sparkTask,不同于MapReduce中Map+Reduce的兩階段執(zhí)行模式,spark采用DAG的執(zhí)行模式,因此一個(gè)sparktask包含了一個(gè)表示RDD轉(zhuǎn)換的DAG,我們將這個(gè)DAG包裝為SparkWork。執(zhí)行SparkTask時(shí),就根據(jù)SparkWork所表示的DAG計(jì)算出最終的RDD,然后通過(guò)RDD的foreachAsync來(lái)出阿發(fā)原酸,使用foreachAsync是因?yàn)槲覀兪褂昧薍ive原語(yǔ),因此不需要RDD返回結(jié)果;此外foreachAsync異步提交任務(wù)便于我們對(duì)任務(wù)進(jìn)行監(jiān)控。
SparkContext聲明周期
sparkContext是用戶與spark集群進(jìn)行交互的接口,HIve on spark應(yīng)該為每個(gè)用戶會(huì)話創(chuàng)建一個(gè)SparkContext。但是Spark目前的使用方式假設(shè)SparkContext的生命周期是spark應(yīng)用級(jí)別的,而且目前同一個(gè)JVM中不能創(chuàng)建多個(gè)SparkContext,這明顯無(wú)法滿足HiveServer2的應(yīng)用場(chǎng)景,因?yàn)槎鄠€(gè)客戶端需要通過(guò)一個(gè)HiveServer2來(lái)提供服務(wù)。鑒于此,我們需要在單獨(dú)JVM中啟動(dòng)SparkContext,并通過(guò)RPC與遠(yuǎn)程的SparkContext進(jìn)行通信。
任務(wù)監(jiān)控與統(tǒng)計(jì)信息收集
spark提供了SparkListener接口來(lái)監(jiān)聽任務(wù)執(zhí)行期間的各種事件,因此我們可以實(shí)現(xiàn)一個(gè)Listener來(lái)監(jiān)控任務(wù)執(zhí)行進(jìn)度以及收集任務(wù)級(jí)別的統(tǒng)計(jì)的統(tǒng)計(jì)信息(目前任務(wù)級(jí)別的統(tǒng)計(jì)由sparkListener采集,任務(wù)進(jìn)度則由spark提供專門的api來(lái)監(jiān)控)另外Hive還提供了Operatior級(jí)別的統(tǒng)計(jì)數(shù)據(jù)信息,比如讀取的行數(shù)等。在MapReduce模式下,這些信息通過(guò)Hadoop Counter收集。我們可以使用Spark提供的Accumulator來(lái)實(shí)現(xiàn)該功能。
測(cè)試
除了一般的單元測(cè)試以外,hive還提供了Qfile Test,即圓形一些事先定義好的查詢,并根據(jù)結(jié)果判斷測(cè)試是否通過(guò)。HIve on spark的Qfile Test應(yīng)該盡可能接近真實(shí)的spark部署環(huán)境。目前我們采用的是local-cluster方式(該不是)。。。。
實(shí)現(xiàn)細(xì)節(jié):
sparkTask的生成執(zhí)行

我們通過(guò)一個(gè)例子看下一個(gè)簡(jiǎn)單的兩表JOIN查詢?nèi)绾伪晦D(zhuǎn)換成為sparkTask并被執(zhí)行。下圖左半部分展示了這個(gè)查詢的operation tree,以及該operation Tree如何被轉(zhuǎn)化成SparkTask;右半部分展示了該SparkTask執(zhí)行時(shí)如何得到最終的RDD并通過(guò)foreachAsync提交spark任務(wù)
SparkCompiler遍歷Operator Tree,將其劃分為不同的MapWork和reducework,Mapwork為根節(jié)點(diǎn),總是由TableScanOperator(hive中對(duì)表進(jìn)行掃描操作符)開始;后續(xù)的work均為reducework。ReduceSinkOperator(Hive中進(jìn)行Shuffle輸出的操作符)用來(lái)標(biāo)記兩個(gè)Work之間的界限,出現(xiàn)ReduceSinkOperator表示當(dāng)前work到下一個(gè)work之間的數(shù)據(jù)需要進(jìn)行shuffle,因此當(dāng)我們發(fā)現(xiàn)FileSinkOperator(hive中將結(jié)果輸出到文件的操作符)的work為葉子節(jié)點(diǎn)。與MapReduce最大的不同在于,我們并不要求ReduceWork一定是葉子節(jié)點(diǎn),即ReduceWork之后可以連接更多的reducework,并在同一個(gè)SParkTask中執(zhí)行。
這個(gè)查詢的OperatorTree 被轉(zhuǎn)化為兩個(gè)MapWork和一個(gè)ReduceWork。在執(zhí)行SparkTask時(shí),首先根據(jù)MapWork來(lái)生成最底層的HadoopRDD,然后將各個(gè)MapWork和ReduceWork包裝成Function應(yīng)用到RDD上,在有依賴的Work之間,需要顯示的調(diào)用shuffle轉(zhuǎn)換,具體選用哪種shuffle則根據(jù)查詢的類型來(lái)確定。另外,由于這個(gè)例子涉及多表查詢,因此在shuffle之前還要對(duì)RDD進(jìn)行Union,經(jīng)過(guò)一系列的轉(zhuǎn)換后,得到最終的RDD,并通過(guò)foreachAsync提交到Spark集群上進(jìn)行計(jì)算。
運(yùn)行模式

Hive on Spark支持兩種運(yùn)行模式;本地和遠(yuǎn)程。當(dāng)用戶吧Spark Master URL設(shè)置為local時(shí),采用本地模式;其余情況則采用遠(yuǎn)程模式。本地模式下,SparkContext與客戶端運(yùn)行在同一個(gè)JVM中,遠(yuǎn)程模式下,SparkContext運(yùn)行在一個(gè)獨(dú)立的JVM中,本地模式提供了主要為了調(diào)試,一般用戶不應(yīng)該選擇該模式。因此我們這里主要介紹遠(yuǎn)程模式(Remote SparkContext。RSC)
用戶的每個(gè)Session都會(huì)創(chuàng)建一個(gè)SparkClient,SparkClient會(huì)啟動(dòng)RemoteDriver進(jìn)程,并由RemoteDriver創(chuàng)建SparkContext。SparkTask執(zhí)行時(shí),通過(guò)session提交任務(wù),任務(wù)的主體就是對(duì)應(yīng)的SparkWork。SparkClient將任務(wù)提交給RemoteDriver,并返回一個(gè)SparkJobRef,通過(guò)該SparkJobRef,客戶端可以監(jiān)控任務(wù)的進(jìn)度,進(jìn)行錯(cuò)誤處理,以及采集統(tǒng)計(jì)信息等。由于最終的RDD計(jì)算沒有返回結(jié)果,因此客戶端只需要監(jiān)控執(zhí)行進(jìn)度而不需要處理返回值。RemoteDriver通過(guò)SparkListenner收集任務(wù)級(jí)別的統(tǒng)計(jì)數(shù)據(jù),通過(guò)Accumulator收集Operator級(jí)別的統(tǒng)計(jì)數(shù)據(jù)(Accumulator被包裝為SparkCounter),并在任務(wù)結(jié)束時(shí),返回給SparkClient。
SparkClient與RemoteDriver之間通過(guò)Netty的RPC進(jìn)行通信。除了提交任務(wù),SparkClient還提供了諸如添加jar包,獲取集群信息的接口。如果客戶端需要使用更一般的SparkContext的功能,可以自定義一個(gè)任務(wù)并通過(guò)SparkClient發(fā)送到Driver上執(zhí)行。
理論上來(lái)說(shuō),HIve on Spark對(duì)Spark集群的部署方式?jīng)]有特別的要求,除了local以外,RemoteDriver可以連接到任務(wù)的Spark集群來(lái)執(zhí)行任務(wù)。在我們的測(cè)試中,HIve on Spark在Standalone和spark on yarn的計(jì)算上都能正常工作(需要?jiǎng)討B(tài)添加jar包的查詢?cè)趛arn-cluster模式下還不能運(yùn)行)
優(yōu)化
Map join
Map join是Hive中一個(gè)重要的優(yōu)化,其原理是,如果參與join比較小的表可以存放如內(nèi)存,因?yàn)檫@些小表在內(nèi)存中生成Hash Table,這樣較大的表只需要通過(guò)一個(gè)MapWork唄掃描一次,然后與內(nèi)存中的Hash Table進(jìn)行join了,生氣了Shuffle和ReduceWork的開銷。在MapReduce模式下,通過(guò)一個(gè)在客戶端本地執(zhí)行的任務(wù)來(lái)小表生成Hash table,并保存在文件系統(tǒng)上。后續(xù)的MapWork首先將Hash Table上傳至Distibuted Cache中,最后只要讀取大表和Distributed cache中的數(shù)據(jù)進(jìn)行join就可以。
Hive on spark 對(duì)Map Join的實(shí)現(xiàn)與Map Reduce不同。當(dāng)初我們考慮使用spark提供的廣播功能來(lái)把小表的Hash table分發(fā)到各個(gè)節(jié)點(diǎn)上。使用廣播的有點(diǎn)是spark采用高效的廣播算法,其性能應(yīng)該優(yōu)于使用distributed cache。而使用廣播的缺點(diǎn)會(huì)為Driver何計(jì)算及誒點(diǎn)帶來(lái)很大的內(nèi)存開銷。為了使用廣播,Hash table的數(shù)據(jù)需要先被傳送到Driver端,然后由Driver進(jìn)行廣播;而且即使在廣播之后,Driver仍需保留部分?jǐn)?shù)據(jù),以便應(yīng)對(duì)計(jì)算節(jié)點(diǎn)的錯(cuò)誤,雖然支持spill,但廣播數(shù)據(jù)仍會(huì)加劇Driver的內(nèi)存壓力。此外廣播相對(duì)的開發(fā)成本比較高,不利于對(duì)已有的代碼的服用。
因此Hive on Spark選擇類似于Distributed cache的方式來(lái)實(shí)現(xiàn)Map join,而且為小表生成Hash table的任務(wù)可以分布式的執(zhí)行,進(jìn)一步減輕客戶端的壓力。

不同于MapReduce,對(duì)于Hive on spark而言,LocalWork只是為了提供一些優(yōu)化時(shí)必要的信息,并不會(huì)真正被執(zhí)行。對(duì)于小表的掃描以獨(dú)立的SparkTask分布式地執(zhí)行,為此,我們也實(shí)現(xiàn)了能夠分布式運(yùn)行HashTableSinkOperator(Hive中輸出小表Hash Table的操作符),其主要原理是通過(guò)提高HDFS Replication Factor的方式,是的生成的HashTable能夠被每個(gè)節(jié)點(diǎn)在本地訪問(wèn)。
雖然目前采取了類似Distributed的這種實(shí)現(xiàn)方式,但如果再后期的測(cè)試中發(fā)現(xiàn)廣播的方式確實(shí)能帶來(lái)較大的性能提升,而且其引入的內(nèi)存開銷可以被接受,我們也會(huì)考慮改用廣播來(lái)實(shí)現(xiàn)Map Join
Table cache
Spark的一個(gè)優(yōu)勢(shì)是可以利用充分利用內(nèi)存,允許用戶顯示地把一個(gè)RDD保存到內(nèi)存或者磁盤上,以便于在多次訪問(wèn)時(shí)候提高性能。另外在目前的RDD轉(zhuǎn)換模式弘,一個(gè)RDD的數(shù)據(jù)是無(wú)法同時(shí)被多個(gè)下游使用,當(dāng)一個(gè)RDD需要通過(guò)不同的轉(zhuǎn)換得到不同的子節(jié)點(diǎn)時(shí),就要被計(jì)算多次。這時(shí)我們應(yīng)該使用cache來(lái)避免重復(fù)計(jì)算。
在Shark和SparkSQL中,都允許用戶顯式吧一張表cache來(lái)提高對(duì)該表的查詢性能。比如以下查詢,在

在這種情況下,對(duì)應(yīng)的SparkWork中,一個(gè)MapWork/ReduceWork會(huì)有多個(gè)下游的Work,如果不進(jìn)行cache,那么共享的數(shù)據(jù)就會(huì)被計(jì)算多次。為了避免這種情況,我們會(huì)將這些MapWork/ReduceWork復(fù)制成多個(gè),每個(gè)對(duì)應(yīng)一個(gè)下游的Work,并對(duì)其共享的數(shù)據(jù)進(jìn)行cache(由于IOContext的同步問(wèn)題,該工功能尚未完成)
更為一般的應(yīng)用場(chǎng)景是一張表在查詢中被使用了多次的情況,Hive on spark 目前還不會(huì)針對(duì)這種查詢進(jìn)行cache,
初步性能測(cè)試:
測(cè)試集群由10臺(tái)虛擬機(jī)組成,測(cè)試數(shù)據(jù)為320GB的TPC-DS數(shù)據(jù)。目前的測(cè)試用例為6條,包含了自定義的查詢以及TPC-DS數(shù)據(jù)集合。目前的測(cè)試用例為6條,包含了自定義的查詢以及TPC-DS中兩條查詢。由于Hive主要用于處理ETL查詢,因此我們?cè)赥PC-DS中選取用例時(shí),選用較為接近ETL查詢用例(TPC-DS的用例主要針對(duì)交互式查詢,impala,sparkSQL等引擎更合適此類查詢),主要針對(duì)Hive on Spark 和Hive on Tez進(jìn)行性能對(duì)比

圖中橫坐標(biāo)為各個(gè)測(cè)試用例,縱坐標(biāo)為所用時(shí)間,以秒為單位。
總結(jié):
Hive on spark由多家公司協(xié)作開發(fā),從項(xiàng)目開始以來(lái),受到社區(qū)的廣泛關(guān)注。