大數(shù)據(jù)Hadoop之——Spark SQL+Spark Streaming

一、Spark SQL概述

Spark SQL是Spark用來(lái)處理結(jié)構(gòu)化數(shù)據(jù)的一個(gè)模塊,它提供了兩個(gè)編程抽象叫做DataFrame和DataSet并且作為分布式SQL查詢(xún)引擎的作用,其實(shí)也是對(duì)RDD的再封裝。大數(shù)據(jù)Hadoop之——計(jì)算引擎Spark,官方文檔:https://spark.apache.org/sql/

二、SparkSQL版本

1)SparkSQL的演變之路

1.png
  • 1.0以前: Shark(入口:SQLContext和HiveContext)

    1. SQLContext:主要DataFrame的構(gòu)建以及DataFrame的執(zhí)行,SQLContext指的是spark中SQL模塊的程序入口。
    2. HiveContext:是SQLContext的子類(lèi),專(zhuān)門(mén)用于與Hive的集成,比如讀取Hive的元數(shù)據(jù),數(shù)據(jù)存儲(chǔ)到Hive表、Hive的窗口分析函數(shù)等。
  • 1.1.x開(kāi)始:SparkSQL(只是測(cè)試性的)

  • 1.3.x: SparkSQL(正式版本)+Dataframe

  • 1.5.x: SparkSQL 鎢絲計(jì)劃

  • 1.6.x: SparkSQL+DataFrame+DataSet(測(cè)試版本)

  • 2.x:

    1. 入口:SparkSession(spark應(yīng)用程序的一個(gè)整體入口),合并了SQLContext和HiveContext
    2. SparkSQL+DataFrame+DataSet(正式版本)
    3. Spark Streaming-》Structured Streaming(DataSet)

2)shark與SparkSQL對(duì)比

  • shark
    1. 執(zhí)行計(jì)劃優(yōu)化完全依賴(lài)于Hive,不方便添加新的優(yōu)化策略;
    2. Spark是線(xiàn)程級(jí)并行,而MapReduce是進(jìn)程級(jí)并行。
    3. Spark在兼容Hive的實(shí)現(xiàn)上存在線(xiàn)程安全問(wèn)題,導(dǎo)致Shark
      不得不使用另外一套獨(dú)立維護(hù)的打了補(bǔ)丁的Hive源碼分支;
  • Spark SQL
    1. 作為Spark生態(tài)的一員繼續(xù)發(fā)展,而不再受限于Hive,
    2. 只是兼容Hive;Hive on Spark作為Hive的底層引擎之一
    3. Hive可以采用Map-Reduce、Tez、Spark等引擎

3)SparkSession

  • SparkSession是Spark 2.0引如的新概念。SparkSession為用戶(hù)提供了統(tǒng)一的切入點(diǎn),來(lái)讓用戶(hù)學(xué)習(xí)spark的各項(xiàng)功能。
  • 在spark的早期版本中,SparkContext是spark的主要切入點(diǎn),由于RDD是主要的API,我們通過(guò)sparkcontext來(lái)創(chuàng)建和操作RDD。對(duì)于每個(gè)其他的API,我們需要使用不同的context。

【例如】對(duì)于Streming,我們需要使用StreamingContext;對(duì)于sql,使用sqlContext;對(duì)于Hive,使用hiveContext。但是隨著DataSet和DataFrame的API逐漸成為標(biāo)準(zhǔn)的API,就需要為他們建立接入點(diǎn)。所以在spark2.0中,引入SparkSession作為DataSet和DataFrame API的切入點(diǎn),SparkSession封裝了SparkConf、SparkContext和SQLContext。為了向后兼容,SQLContext和HiveContext也被保存下來(lái)。

  • SparkSession實(shí)質(zhì)上是SQLContext和HiveContext的組合(未來(lái)可能還會(huì)加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同樣是可以使用的。SparkSession內(nèi)部封裝了sparkContext,所以計(jì)算實(shí)際上是由sparkContext完成的,在spark 2.x中不推薦使用SparkContext對(duì)象讀取數(shù)據(jù),而是推薦SparkSession。

三、RDD、DataFrames和DataSet

1)三者關(guān)聯(lián)關(guān)系

DataFrame 和 DataSet 是 Spark SQL 提供的基于 RDD 的結(jié)構(gòu)化數(shù)據(jù)抽象。它既有 RDD 不可變、分區(qū)、存儲(chǔ)依賴(lài)關(guān)系等特性,又擁有類(lèi)似于關(guān)系型數(shù)據(jù)庫(kù)的結(jié)構(gòu)化信息。所以,基于 DataFrame 和 DataSet API 開(kāi)發(fā)出的程序會(huì)被自動(dòng)優(yōu)化,使得開(kāi)發(fā)人員不需要操作底層的 RDD API 來(lái)進(jìn)行手動(dòng)優(yōu)化,大大提升開(kāi)發(fā)效率。但是 RDD API 對(duì)于非結(jié)構(gòu)化的數(shù)據(jù)處理有獨(dú)特的優(yōu)勢(shì),比如文本流數(shù)據(jù),而且更方便我們做底層的操作。

2.png

3.png

1)RDD

RDD(Resilient Distributed Dataset)叫做彈性分布式數(shù)據(jù)集,是Spark中最基本的數(shù)據(jù)抽象,它代表一個(gè)不可變、可分區(qū)、里面的元素可并行計(jì)算的集合。RDD具有數(shù)據(jù)流模型的特點(diǎn):自動(dòng)容錯(cuò)、位置感知性調(diào)度和可伸縮性。RDD允許用戶(hù)在執(zhí)行多個(gè)查詢(xún)時(shí)顯式地將工作集緩存在內(nèi)存中,后續(xù)的查詢(xún)能夠重用工作集,這極大地提升了查詢(xún)速度。

1、核心概念

  • 一組分片(Partition):即數(shù)據(jù)集的基本組成單位。對(duì)于RDD來(lái)說(shuō),每個(gè)分片都會(huì)被一個(gè)計(jì)算任務(wù)處理,并決定并行計(jì)算的粒度。用戶(hù)可以在創(chuàng)建RDD時(shí)指定RDD的分片個(gè)數(shù),如果沒(méi)有指定,那么就會(huì)采用默認(rèn)值。默認(rèn)值就是程序所分配到的CPU Core的數(shù)目。

  • 一個(gè)計(jì)算每個(gè)分區(qū)的函數(shù)。Spark中RDD的計(jì)算是以分片為單位的,每個(gè)RDD都會(huì)實(shí)現(xiàn)compute函數(shù)以達(dá)到這個(gè)目的。compute函數(shù)會(huì)對(duì)迭代器進(jìn)行復(fù)合,不需要保存每次計(jì)算的結(jié)果。

  • RDD之間的依賴(lài)關(guān)系:RDD的每次轉(zhuǎn)換都會(huì)生成一個(gè)新的RDD,所以RDD之間就會(huì)形成類(lèi)似于流水線(xiàn)一樣的前后依賴(lài)關(guān)系。在部分分區(qū)數(shù)據(jù)丟失時(shí),Spark可以通過(guò)這個(gè)依賴(lài)關(guān)系重新計(jì)算丟失的分區(qū)數(shù)據(jù),而不是對(duì)RDD的所有分區(qū)進(jìn)行重新計(jì)算。

  • 一個(gè)Partitioner:即RDD的分片函數(shù)。當(dāng)前Spark中實(shí)現(xiàn)了兩種類(lèi)型的分片函數(shù),一個(gè)是基于哈希的HashPartitioner,另外一個(gè)是基于范圍的RangePartitioner。只有對(duì)于于key-value的RDD,才會(huì)有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數(shù)不但決定了RDD本身的分片數(shù)量,也決定了parent RDD Shuffle輸出時(shí)的分片數(shù)量。

  • 一個(gè)列表:存儲(chǔ)存取每個(gè)Partition的優(yōu)先位置(preferred location)。對(duì)于一個(gè)HDFS文件來(lái)說(shuō),這個(gè)列表保存的就是每個(gè)Partition所在的塊的位置。按照“移動(dòng)數(shù)據(jù)不如移動(dòng)計(jì)算”的理念,Spark在進(jìn)行任務(wù)調(diào)度的時(shí)候,會(huì)盡可能地將計(jì)算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲(chǔ)位置。

2、RDD簡(jiǎn)單操作

啟動(dòng)spark-shell,其實(shí)spark-shell低層也是調(diào)用spark-submit,首先需要配置好,當(dāng)然也可以寫(xiě)在命令行,但是不推薦。配置如下,僅供參考(這里使用yarn模式):

$ cat spark-defaults.conf
4.png

啟動(dòng)spark-shell(下面會(huì)詳解講解)

$ spark-shell
5.png

\color{red}{【問(wèn)題】發(fā)現(xiàn)有個(gè)WARN:WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.}
【原因】是因?yàn)镾park提交任務(wù)到y(tǒng)arn集群,需要上傳相關(guān)spark的jar包到HDFS。
【解決】 提前上傳到HDFS集群,并且在Spark配置文件指定文件路徑,就可以避免每次提交任務(wù)到Y(jié)arn都需要重復(fù)上傳文件。下面是解決的具體操作步驟:

### 打包jars,jar相關(guān)的參數(shù)說(shuō)明
#-c  創(chuàng)建一個(gè)jar包
# -t 顯示jar中的內(nèi)容列表
#-x 解壓jar包
#-u 添加文件到j(luò)ar包中
#-f 指定jar包的文件名
#-v  生成詳細(xì)的報(bào)造,并輸出至標(biāo)準(zhǔn)設(shè)備
#-m 指定manifest.mf文件.(manifest.mf文件中可以對(duì)jar包及其中的內(nèi)容作一些一設(shè)置)
#-0 產(chǎn)生jar包時(shí)不對(duì)其中的內(nèi)容進(jìn)行壓縮處理
#-M 不產(chǎn)生所有文件的清單文件(Manifest.mf)。這個(gè)參數(shù)與忽略掉-m參數(shù)的設(shè)置
#-i    為指定的jar文件創(chuàng)建索引文件
#-C 表示轉(zhuǎn)到相應(yīng)的目錄下執(zhí)行jar命令,相當(dāng)于cd到那個(gè)目錄,然后不帶-C執(zhí)行jar命令
$ cd /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2
$ jar cv0f spark-libs.jar -C ./jars/ .
$ ll
### 在hdfs上創(chuàng)建存放jar包目錄
$ hdfs dfs -mkdir -p /spark/jars
## 上傳jars到HDFS
$ hdfs dfs -put spark-libs.jar /spark/jars/
## 增加配置spark-defaults.conf 
spark.yarn.archive=hdfs:///spark/jars/spark-libs.jar

然后再啟動(dòng)spark-shell

在Spark Shell中,有一個(gè)專(zhuān)有的SparkContext已經(jīng)為您創(chuàng)建好了,變量名叫做sc,自己創(chuàng)建的SparkContext將無(wú)法工作。

$ spark-shell
6.png
### 由一個(gè)已經(jīng)存在的Scala集合創(chuàng)建。
val array = Array(1,2,3,4,5)
# spark使用parallelize方法創(chuàng)建RDD
val rdd = sc.parallelize(array)
7.png

這里只是簡(jiǎn)單的創(chuàng)建RDD操作,后面會(huì)有更多RDD相關(guān)的演示操作。

3、RDD API

Spark支持兩個(gè)類(lèi)型(算子)操作:Transformation和Action

1)Transformation

主要做的是就是將一個(gè)已有的RDD生成另外一個(gè)RDD。Transformation具有l(wèi)azy特性(延遲加載)。Transformation算子的代碼不會(huì)真正被執(zhí)行。只有當(dāng)我們的程序里面遇到一個(gè)action算子的時(shí)候,代碼才會(huì)真正的被執(zhí)行。這種設(shè)計(jì)讓Spark更加有效率地運(yùn)行。

常用的Transformation:

轉(zhuǎn)換 含義
map(func) 返回一個(gè)新的RDD,該RDD由每一個(gè)輸入元素經(jīng)過(guò)func函數(shù)轉(zhuǎn)換后組成
filter(func) 返回一個(gè)新的RDD,該RDD由經(jīng)過(guò)func函數(shù)計(jì)算后返回值為true的輸入元素組成
flatMap(func) 類(lèi)似于map,但是每一個(gè)輸入元素可以被映射為0或多個(gè)輸出元素(所以func應(yīng)該返回一個(gè)序列,而不是單一元素)
mapPartitions(func) 類(lèi)似于map,但獨(dú)立地在RDD的每一個(gè)分片上運(yùn)行,因此在類(lèi)型為T(mén)的RDD上運(yùn)行時(shí),func的函數(shù)類(lèi)型必須是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 類(lèi)似于mapPartitions,但func帶有一個(gè)整數(shù)參數(shù)表示分片的索引值,因此在類(lèi)型為T(mén)的RDD上運(yùn)行時(shí),func的函數(shù)類(lèi)型必須是(Int, Interator[T]) => Iterator[U]
sample(withReplacement, fraction, seed) 根據(jù)fraction指定的比例對(duì)數(shù)據(jù)進(jìn)行采樣,可以選擇是否使用隨機(jī)數(shù)進(jìn)行替換,seed用于指定隨機(jī)數(shù)生成器種子
union(otherDataset) 對(duì)源RDD和參數(shù)RDD求并集后返回一個(gè)新的RDD
intersection(otherDataset) 對(duì)源RDD和參數(shù)RDD求交集后返回一個(gè)新的RDD
distinct([numTasks])) 對(duì)源RDD進(jìn)行去重后返回一個(gè)新的RDD
groupByKey([numTasks]) 在一個(gè)(K,V)的RDD上調(diào)用,返回一個(gè)(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一個(gè)(K,V)的RDD上調(diào)用,返回一個(gè)(K,V)的RDD,使用指定的reduce函數(shù),將相同key的值聚合到一起,與groupByKey類(lèi)似,reduce任務(wù)的個(gè)數(shù)可以通過(guò)第二個(gè)可選的參數(shù)來(lái)設(shè)置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 先按分區(qū)聚合 再總的聚合 每次要跟初始值交流 例如:aggregateByKey(0)(+,+) 對(duì)k/y的RDD進(jìn)行操作
sortByKey([ascending], [numTasks]) 在一個(gè)(K,V)的RDD上調(diào)用,K必須實(shí)現(xiàn)Ordered接口,返回一個(gè)按照key進(jìn)行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]) 與sortByKey類(lèi)似,但是更靈活 第一個(gè)參數(shù)是根據(jù)什么排序 第二個(gè)是怎么排序 false倒序 第三個(gè)排序后分區(qū)數(shù) 默認(rèn)與原RDD一樣
join(otherDataset, [numTasks]) 在類(lèi)型為(K,V)和(K,W)的RDD上調(diào)用,返回一個(gè)相同key對(duì)應(yīng)的所有元素對(duì)在一起的(K,(V,W))的RDD 相當(dāng)于內(nèi)連接(求交集)
cogroup(otherDataset, [numTasks]) 在類(lèi)型為(K,V)和(K,W)的RDD上調(diào)用,返回一個(gè)(K,(Iterable<V>,Iterable<W>))類(lèi)型的RDD
cartesian(otherDataset) 兩個(gè)RDD的笛卡爾積 的成很多個(gè)K/V
pipe(command, [envVars]) 調(diào)用外部程序
coalesce(numPartitions) 重新分區(qū) 第一個(gè)參數(shù)是要分多少區(qū),第二個(gè)參數(shù)是否shuffle 默認(rèn)false 少分區(qū)變多分區(qū) true 多分區(qū)變少分區(qū) false
repartition(numPartitions)
重新分區(qū) 必須shuffle 參數(shù)是要分多少區(qū) 少變多
repartitionAndSortWithinPartitions(partitioner) 重新分區(qū)+排序 比先分區(qū)再排序效率高 對(duì)K/V的RDD進(jìn)行操作
foldByKey(zeroValue)(seqOp) 該函數(shù)用于K/V做折疊,合并處理 ,與aggregate類(lèi)似 第一個(gè)括號(hào)的參數(shù)應(yīng)用于每個(gè)V值 第二括號(hào)函數(shù)是聚合例如:+
combineByKey 合并相同的key的值 rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
partitionBy(partitioner) 對(duì)RDD進(jìn)行分區(qū) partitioner是分區(qū)器 例如new HashPartition(2)
cache/persist RDD緩存,可以避免重復(fù)計(jì)算從而減少時(shí)間,區(qū)別:cache內(nèi)部調(diào)用了persist算子,cache默認(rèn)就一個(gè)緩存級(jí)別MEMORY-ONLY ,而persist則可以選擇緩存級(jí)別
Subtract(rdd) 返回前rdd元素不在后rdd的rdd
leftOuterJoin leftOuterJoin類(lèi)似于SQL中的左外關(guān)聯(lián)left outer join,返回結(jié)果以前面的RDD為主,關(guān)聯(lián)不上的記錄為空。只能用于兩個(gè)RDD之間的關(guān)聯(lián),如果要多個(gè)RDD關(guān)聯(lián),多關(guān)聯(lián)幾次即可。
rightOuterJoin rightOuterJoin類(lèi)似于SQL中的有外關(guān)聯(lián)right outer join,返回結(jié)果以參數(shù)中的RDD為主,關(guān)聯(lián)不上的記錄為空。只能用于兩個(gè)RDD之間的關(guān)聯(lián),如果要多個(gè)RDD關(guān)聯(lián),多關(guān)聯(lián)幾次即可
subtractByKey substractByKey和基本轉(zhuǎn)換操作中的subtract類(lèi)似只不過(guò)這里是針對(duì)K的,返回在主RDD中出現(xiàn),并且不在otherRDD中出現(xiàn)的元素
2)Action

觸發(fā)代碼的運(yùn)行,我們一段spark代碼里面至少需要有一個(gè)action操作。

常用的Action:

動(dòng)作 含義
reduce(func) 通過(guò)func函數(shù)聚集RDD中的所有元素,這個(gè)功能必須是課交換且可并聯(lián)的
collect() 在驅(qū)動(dòng)程序中,以數(shù)組的形式返回?cái)?shù)據(jù)集的所有元素
count() 返回RDD的元素個(gè)數(shù)
first() 返回RDD的第一個(gè)元素(類(lèi)似于take(1))
take(n) 返回一個(gè)由數(shù)據(jù)集的前n個(gè)元素組成的數(shù)組
takeSample(withReplacement,num, [seed]) 返回一個(gè)數(shù)組,該數(shù)組由從數(shù)據(jù)集中隨機(jī)采樣的num個(gè)元素組成,可以選擇是否用隨機(jī)數(shù)替換不足的部分,seed用于指定隨機(jī)數(shù)生成器種子
takeOrdered(n, [ordering]) 返回原RDD排序(默認(rèn)升序排)后,前n個(gè)元素組成的數(shù)組
saveAsTextFile(path) 將數(shù)據(jù)集的元素以textfile的形式保存到HDFS文件系統(tǒng)或者其他支持的文件系統(tǒng),對(duì)于每個(gè)元素,Spark將會(huì)調(diào)用toString方法,將它裝換為文件中的文本
saveAsSequenceFile(path) 將數(shù)據(jù)集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統(tǒng)。
saveAsObjectFile(path) saveAsObjectFile用于將RDD中的元素序列化成對(duì)象,存儲(chǔ)到文件中。使用方法和saveAsTextFile類(lèi)似
countByKey() 針對(duì)(K,V)類(lèi)型的RDD,返回一個(gè)(K,Int)的map,表示每一個(gè)key對(duì)應(yīng)的元素個(gè)數(shù)。
foreach(func) 在數(shù)據(jù)集的每一個(gè)元素上,運(yùn)行函數(shù)func進(jìn)行更新。
aggregate 先對(duì)分區(qū)進(jìn)行操作,在總體操作
reduceByKeyLocally 返回一個(gè) dict 對(duì)象,同樣是將同 key 的元素進(jìn)行聚合
lookup lookup用于(K,V)類(lèi)型的RDD,指定K值,返回RDD中該K對(duì)應(yīng)的所有V值。
top top函數(shù)用于從RDD中,按照默認(rèn)(降序)或者指定的排序規(guī)則,返回前num個(gè)元素。
fold fold是aggregate的簡(jiǎn)化,將aggregate中的seqOp和combOp使用同一個(gè)函數(shù)op。
foreachPartition 遍歷原RDD元素經(jīng)過(guò)func函數(shù)運(yùn)算過(guò)后的結(jié)果集,foreachPartition算子分區(qū)操作

4、實(shí)戰(zhàn)操作

1、針對(duì)各個(gè)元素的轉(zhuǎn)化操作

我們最常用的轉(zhuǎn)化操作應(yīng)該是map() 和filter(),轉(zhuǎn)化操作map() 接收一個(gè)函數(shù),把這個(gè)函數(shù)用于RDD 中的每個(gè)元素,將函數(shù)的返回結(jié)果作為結(jié)果RDD 中對(duì)應(yīng)元素的值。而轉(zhuǎn)化操作filter() 則接收一個(gè)函數(shù),并將RDD 中滿(mǎn)足該函數(shù)的元素放入新的RDD 中返回。

讓我們看一個(gè)簡(jiǎn)單的例子,用map() 對(duì)RDD 中的所有數(shù)求平方

# 通過(guò)parallelize創(chuàng)建RDD對(duì)象
val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x * x)
println(result.collect().mkString(","))
8.png

2、對(duì)一個(gè)數(shù)據(jù)為{1,2,3,3}的RDD進(jìn)行基本的RDD轉(zhuǎn)化操作(去重)

var rdd = sc.parallelize(List(1,2,3,3))
rdd.distinct().collect().mkString(",")
9.png

3、對(duì)數(shù)據(jù)分別為{1,2,3}和{3,4,5}的RDD進(jìn)行針對(duì)兩個(gè)RDD的轉(zhuǎn)化操作

var rdd = sc.parallelize(List(1,2,3))
var other = sc.parallelize(List(3,4,5))
# 生成一個(gè)包含兩個(gè)RDD中所有元素的RDD
rdd.union(other).collect().mkString(",")
# 求兩個(gè)RDD共同的元素RDD
rdd.intersection(other).collect().mkString(",")
10.png

4、行動(dòng)操作

行動(dòng)操作reduce(),它接收一個(gè)函數(shù)作為參數(shù),這個(gè)函數(shù)要操作兩個(gè)RDD 的元素類(lèi)型的數(shù)據(jù)并返回一個(gè)同樣類(lèi)型的新元素。一個(gè)簡(jiǎn)單的例子就是函數(shù)+,可以用它來(lái)對(duì)我們的RDD 進(jìn)行累加。使用reduce(),可以很方便地計(jì)算出RDD中所有元素的總和、元素的個(gè)數(shù),以及其他類(lèi)型的聚合操作。

var rdd = sc.parallelize(List(1,2,3,4,5,6,7))
# 求和
var sum = rdd.reduce((x, y) => x + y)
# 求元素個(gè)數(shù)
var sum = rdd.count()

# 聚合操作
var rdd = sc.parallelize(List(1,2,3,4,5,6,7))
var result = rdd.aggregate((0,0))((acc,value) => (acc._1 + value,acc._2 + 1),(acc1,acc2) => (acc1._1 + acc2._1 , acc1._2 + acc2._2))
var avg = result._1/result._2.toDouble
11.png

這里只是演示幾個(gè)簡(jiǎn)單的示例,更多RDD的操作,可以參考官方文檔學(xué)習(xí)哦。

2)DataFrames

在Spark中,DataFrame提供了一個(gè)領(lǐng)域特定語(yǔ)言(DSL)和SQL來(lái)操作結(jié)構(gòu)化數(shù)據(jù),DataFrame是一種以RDD為基礎(chǔ)的分布式數(shù)據(jù)集,類(lèi)似于傳統(tǒng)數(shù)據(jù)庫(kù)中的二維表格。

12.png
  • RDD,由于無(wú)從得知所存數(shù)據(jù)元素的具體內(nèi)部結(jié)構(gòu),Spark Core只能在stage層面進(jìn)行簡(jiǎn)單、通用的流水線(xiàn)優(yōu)化。
  • DataFrame底層是以RDD為基礎(chǔ)的分布式數(shù)據(jù)集,和RDD的主要區(qū)別的是:RDD中沒(méi)有schema信息,而DataFrame中數(shù)據(jù)每一行都包含schema。DataFrame = RDD + shcema

1、DSL風(fēng)格語(yǔ)法操作

1)DataFrame創(chuàng)建

創(chuàng)建DataFrame的兩種基本方式:

  • 已存在的RDD調(diào)用toDF()方法轉(zhuǎn)換得到DataFrame。
  • 通過(guò)Spark讀取數(shù)據(jù)源直接創(chuàng)建DataFrame。

直接創(chuàng)建DataFarme對(duì)象

若使用SparkSession方式創(chuàng)建DataFrame,可以使用spark.read從不同類(lèi)型的文件中加載數(shù)據(jù)創(chuàng)建DataFrame。spark.read的具體操作,如下所示。

方法名 描述
spark.read.text(“people.txt”) 讀取txt格式文件,創(chuàng)建DataFrame
spark.read.csv (“people.csv”) 讀取csv格式文件,創(chuàng)建DataFrame
spark.read.text(“people.json”) 讀取json格式文件,創(chuàng)建DataFrame
spark.read.text(“people.parquet”) 讀取parquet格式文件,創(chuàng)建DataFrame

1、在本地創(chuàng)建一個(gè)person.txt文本文檔,用于讀取:運(yùn)行spark-shell:

# person.txt,Name,Age,Height
p1_name,18,165
p2_name,19,170
p3_name,20,188
p4_name,21,190
# 啟動(dòng)spark shell,默認(rèn)會(huì)創(chuàng)建一個(gè)spark名稱(chēng)的spark session對(duì)象
$ spark-shell
# 定義變量,【注意】所有節(jié)點(diǎn)都得創(chuàng)建這個(gè)person文件,要不然調(diào)度沒(méi)有這個(gè)文件的機(jī)器會(huì)報(bào)錯(cuò)
var inputFile = "file:///opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/test/person.txt"
# 讀取本地文件
val personDF = spark.read.text("file:///opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/test/person.txt")
val personDF = spark.read.text(inputFile)
# 顯示
personDF.show()
# 將文件put到hdfs上
# 讀取hdfs文件(推薦)
val psersonDF = spark.read.text("hdfs:///person.txt")
13.png

2、有RDD轉(zhuǎn)換成DataFrame

動(dòng)作 含義
show() 查看DataFrame中的具體內(nèi)容信息
printSchema() 查看DataFrame的Schema信息
select() 查看DataFrame中選取部分列的數(shù)據(jù)及進(jìn)行重命名
filter() 實(shí)現(xiàn)條件查詢(xún),過(guò)濾出想要的結(jié)果
groupBy() 對(duì)記錄進(jìn)行分組
sort() 對(duì)特定字段進(jìn)行排序操作
toDF() 把RDD數(shù)據(jù)類(lèi)型轉(zhuǎn)成DataFarme
# 讀取文本文檔,按逗號(hào)分割開(kāi)來(lái)
val lineRDD = sc.textFile("hdfs:///person.txt").map(_.split(","))
case class Person(name:String, age:Int, height:Int)
# 按照樣式類(lèi)對(duì)RDD數(shù)據(jù)進(jìn)行分割成map
val personRDD = lineRDD.map(x => Person(x(0).toString, x(1).toInt, x(2).toInt))
# 把RDD數(shù)據(jù)類(lèi)型轉(zhuǎn)成DataFarme
val personDF = personRDD.toDF()
# 查看這個(gè)表
personDF.show()
# 查看Schema數(shù)據(jù)
personDF.printSchema()
# 查看列
personDF.select(personDF.col("name")).show
# 過(guò)濾年齡小于25的
personDF.filter(col("age") >= 25).show
14.png

15.png

這里提供常用的spark dataframe方法:

方法名 含義
collect() 返回值是一個(gè)數(shù)組,返回dataframe集合所有的行
collectAsList() 返回值是一個(gè)java類(lèi)型的數(shù)組,返回dataframe集合所有的行
count() 返回一個(gè)number類(lèi)型的,返回dataframe集合的行數(shù)
describe(cols: String*) 返回一個(gè)通過(guò)數(shù)學(xué)計(jì)算的類(lèi)表值(count, mean, stddev, min, and max),這個(gè)可以傳多個(gè)參數(shù),中間用逗號(hào)分隔,如果有字段為空,那么不參與運(yùn)算,只這對(duì)數(shù)值類(lèi)型的字段。例如df.describe("age", "height").show()
first() 返回第一行 ,類(lèi)型是row類(lèi)型
head() 返回第一行 ,類(lèi)型是row類(lèi)型
head(n:Int) 返回n行 ,類(lèi)型是row 類(lèi)型
show() 返回dataframe集合的值 默認(rèn)是20行,返回類(lèi)型是unit
show(n:Int) 返回n行,返回值類(lèi)型是unit
table(n:Int) 返回n行 ,類(lèi)型是row 類(lèi)型
cache() 同步數(shù)據(jù)的內(nèi)存
columns 返回一個(gè)string類(lèi)型的數(shù)組,返回值是所有列的名字
dtypes 返回一個(gè)string類(lèi)型的二維數(shù)組,返回值是所有列的名字以及類(lèi)型
explan() 打印執(zhí)行計(jì)劃 物理的
explain(n:Boolean) 輸入值為 false 或者true ,返回值是unit 默認(rèn)是false ,如果輸入true 將會(huì)打印 邏輯的和物理的
isLocal 返回值是Boolean類(lèi)型,如果允許模式是local返回true 否則返回false
persist(newlevel:StorageLevel) 返回一個(gè)dataframe.this.type 輸入存儲(chǔ)模型類(lèi)型
printSchema() 打印出字段名稱(chēng)和類(lèi)型 按照樹(shù)狀結(jié)構(gòu)來(lái)打印
registerTempTable(tablename:String) 返回Unit ,將df的對(duì)象只放在一張表里面,這個(gè)表隨著對(duì)象的刪除而刪除了
schema 返回structType 類(lèi)型,將字段名稱(chēng)和類(lèi)型按照結(jié)構(gòu)體類(lèi)型返回
toDF() 返回一個(gè)新的dataframe類(lèi)型的
toDF(colnames:String*) 將參數(shù)中的幾個(gè)字段返回一個(gè)新的dataframe類(lèi)型的
unpersist() 返回dataframe.this.type 類(lèi)型,去除模式中的數(shù)據(jù)
unpersist(blocking:Boolean) 返回dataframe.this.type類(lèi)型 true 和unpersist是一樣的作用false 是去除RDD
agg(expers:column*) 返回dataframe類(lèi)型 ,同數(shù)學(xué)計(jì)算求值
agg(exprs: Map[String, String]) 返回dataframe類(lèi)型 ,同數(shù)學(xué)計(jì)算求值 map類(lèi)型的
agg(aggExpr: (String, String), aggExprs: (String, String)*) 返回dataframe類(lèi)型 ,同數(shù)學(xué)計(jì)算求值
apply(colName: String) 返回column類(lèi)型,捕獲輸入進(jìn)去列的對(duì)象
as(alias: String) 返回一個(gè)新的dataframe類(lèi)型,就是原來(lái)的一個(gè)別名
col(colName: String) 返回column類(lèi)型,捕獲輸入進(jìn)去列的對(duì)象
cube(col1: String, cols: String*) 返回一個(gè)GroupedData類(lèi)型,根據(jù)某些字段來(lái)匯總
distinct 去重 返回一個(gè)dataframe類(lèi)型
drop(col: Column) 刪除某列 返回dataframe類(lèi)型
dropDuplicates(colNames: Array[String]) 刪除相同的列 返回一個(gè)dataframe
except(other: DataFrame) 返回一個(gè)dataframe,返回在當(dāng)前集合存在的在其他集合不存在的
filter(conditionExpr: String) 刷選部分?jǐn)?shù)據(jù),返回dataframe類(lèi)型
groupBy(col1: String, cols: String*) 根據(jù)某寫(xiě)字段來(lái)匯總返回groupedate類(lèi)型
intersect(other: DataFrame) 返回一個(gè)dataframe,在2個(gè)dataframe都存在的元素
join(right: DataFrame, joinExprs: Column, joinType: String) 一個(gè)是關(guān)聯(lián)的dataframe,第二個(gè)關(guān)聯(lián)的條件,第三個(gè)關(guān)聯(lián)的類(lèi)型:inner, outer, left_outer, right_outer, leftsemi
limit(n: Int) 返回dataframe類(lèi)型 去n 條數(shù)據(jù)出來(lái)
orderBy(sortExprs: Column*) 做alise排序
sort(sortExprs: Column*) 排序 df.sort(df("age").desc).show(); 默認(rèn)是asc
select(cols:string*) dataframe 做字段的刷選 df.select("colA","colB" + 1)
withColumnRenamed(existingName: String, newName: String) 修改列表 df.withColumnRenamed("name","names").show();
withColumn(colName: String, col: Column) 增加一列 df.withColumn("aa",df("name")).show();

這里已經(jīng)列出了很多常用方法了,基本上涵蓋了大部分操作,當(dāng)然也可以參考官方文檔

2、SQL風(fēng)格語(yǔ)法操作

DataFrame的一個(gè)強(qiáng)大之處就是我們可以將它看作是一個(gè)關(guān)系型數(shù)據(jù)表,然后可以通過(guò)在程序中使用spark.sql() 來(lái)執(zhí)行SQL查詢(xún),結(jié)果將作為一個(gè)DataFrame返回。因?yàn)閟park session包含了Hive Context,所以spark.sql() 會(huì)自動(dòng)啟動(dòng)連接hive,默認(rèn)模式就是hive里的local模式(內(nèi)嵌derby)

啟動(dòng)spark-shell

$ spark-shell

會(huì)在執(zhí)行spark-shell當(dāng)前目錄下生成兩個(gè)文件:derby.log,metastore_db


16.png

接下來(lái)就可以happy的寫(xiě)sql了,這里就演示幾個(gè)命令,跟之前的hive一樣,把sql語(yǔ)句放在spark.sql()方法里執(zhí)行即可,不清楚hive sql的可以參考我之前的文章:大數(shù)據(jù)Hadoop之——數(shù)據(jù)倉(cāng)庫(kù)Hive

# 有個(gè)默認(rèn)default庫(kù)
$ spark.sql("show databases").show
# 默認(rèn)當(dāng)前庫(kù)是default
$ spark.sql("show tables").show
17.png

通過(guò)spark-sql啟動(dòng)spark shell

操作就更像sql語(yǔ)法了,已經(jīng)跟hive差不多了。接下來(lái)演示幾個(gè)命令,大家就很清楚了。

$ spark-sql
show databases;
create database test007

同樣也會(huì)在當(dāng)前目錄下自動(dòng)創(chuàng)建兩個(gè)文件:derby.log,metastore_db


18.png

3)DataSet

DataSet是分布式的數(shù)據(jù)集合,Dataset提供了強(qiáng)類(lèi)型支持,也是在RDD的每行數(shù)據(jù)加了類(lèi)型約束。DataSet是在Spark1.6中添加的新的接口。它集中了RDD的優(yōu)點(diǎn)(強(qiáng)類(lèi)型和可以用強(qiáng)大lambda函數(shù))以及使用了Spark SQL優(yōu)化的執(zhí)行引擎。DataSet可以通過(guò)JVM的對(duì)象進(jìn)行構(gòu)建,可以用函數(shù)式的轉(zhuǎn)換(map/flatmap/filter)進(jìn)行多種操作。

1、通過(guò)spark.createDataset通過(guò)集合進(jìn)行創(chuàng)建dataSet

val ds1 = spark.createDataset(1 to 10)
ds1.show
19.png

2、從已經(jīng)存在的rdd當(dāng)中構(gòu)建dataSet

官方文檔

val ds2 = spark.createDataset(sc.textFile("hdfs:////person.txt"))
20.png

3、通過(guò)樣例類(lèi)配合創(chuàng)建DataSet

case class Person(name:String,age:Int)
val personDataList = List(Person("zhangsan",18),Person("lisi",28))
val personDS = personDataList.toDS
personDS.show
21.png

4、通過(guò)DataFrame轉(zhuǎn)化生成
Music.json文件內(nèi)容如下:

{"name":"上海灘","singer":"葉麗儀","album":"香港電視劇主題歌","path":"mp3/shanghaitan.mp3"}
{"name":"一生何求","singer":"陳百?gòu)?qiáng)","album":"香港電視劇主題歌","path":"mp3/shanghaitan.mp3"}
{"name":"紅日","singer":"李克勤","album":"懷舊專(zhuān)輯","path":"mp3/shanghaitan.mp3"}
{"name":"愛(ài)如潮水","singer":"張信哲","album":"懷舊專(zhuān)輯","path":"mp3/airucaoshun.mp3"}
{"name":"紅茶館","singer":"陳惠嫻","album":"懷舊專(zhuān)輯","path":"mp3/redteabar.mp3"}

case class Music(name:String,singer:String,album:String,path:String)
# 注意把test.json傳到hdfs上
val jsonDF = spark.read.json("hdfs:///Music.json")
val jsonDS = jsonDF.as[Music]
jsonDS.show
22.png

RDD,DataFrame,DataSet互相轉(zhuǎn)化

23.png

四、RDD、DataFrame和DataSet的共性與區(qū)別

24.png
  • RDD[Person]:以Person為類(lèi)型參數(shù),但不了解 其內(nèi)部結(jié)構(gòu)。

  • DataFrame:提供了詳細(xì)的結(jié)構(gòu)信息schema(結(jié)構(gòu))列的名稱(chēng)和類(lèi)型。這樣看起來(lái)就像一張表了

  • DataSet[Person]:不光有schema(結(jié)構(gòu))信息,還有類(lèi)型信息

1)共性

  • 三者都是spark平臺(tái)下的分布式彈性數(shù)據(jù)集,為處理超大型數(shù)據(jù)提供便利
  • 三者都有惰性機(jī)制。在創(chuàng)建時(shí)、轉(zhuǎn)換時(shí)(如map)不會(huì)立即執(zhí)行,只有在遇到action算子的時(shí)候(比如foreach),才開(kāi)始進(jìn)行觸發(fā)計(jì)算。極端情況下,如果代碼中只有創(chuàng)建、轉(zhuǎn)換,但是沒(méi)有在后面的action中使用對(duì)應(yīng)的結(jié)果,在執(zhí)行時(shí)會(huì)被跳過(guò)。
  • 三者都有partition的概念,都有緩存(cache)的操作,還可以進(jìn)行檢查點(diǎn)操作(checkpoint)
  • 三者都有許多共同的函數(shù)(如map、filter,sorted等等)。
    在對(duì)DataFrame和DataSet操作的時(shí)候,大多數(shù)情況下需要引入隱式轉(zhuǎn)換(ssc.implicits._)

2)區(qū)別

  • DataFrame:DataFrame是DataSet的特例,也就是說(shuō)DataSet[Row]的別名;DataFrame = RDD + schema
    1. DataFrame的每一行的固定類(lèi)型為Row,只有通過(guò)解析才能獲得各個(gè)字段的值
    2. DataFrame與DataSet通常與spark ml同時(shí)使用
    3. DataFrame與DataSet均支持sparkSql操作,比如select,groupby等,也可以注冊(cè)成臨時(shí)表,進(jìn)行sql語(yǔ)句操作
    4. DataFrame與DateSet支持一些方便的保存方式,比如csv,可以帶上表頭,這樣每一列的字段名就可以一目了然
  • DataSet:DataSet = RDD + case class
    1. DataSet與DataFrame擁有相同的成員函數(shù),區(qū)別只是只是每一行的數(shù)據(jù)類(lèi)型不同。
    2. DataSet的每一行都是case class,在自定義case class之后可以很方便的獲取每一行的信息

五、spark-shell

Spark的shell作為一個(gè)強(qiáng)大的交互式數(shù)據(jù)分析工具,提供了一個(gè)簡(jiǎn)單的方式學(xué)習(xí)API。它可以使用Scala(在Java虛擬機(jī)上運(yùn)行現(xiàn)有的Java庫(kù)的一個(gè)很好方式)或Python。spark-shell的本質(zhì)是在后臺(tái)調(diào)用了spark-submit腳本來(lái)啟動(dòng)應(yīng)用程序的,在spark-shell中會(huì)創(chuàng)建了一個(gè)名為sc的SparkContext對(duì)象。

\color{red}{【注】spark-shell只能以client方式啟動(dòng)。}

查看幫助

$ spark-shell --help
25.png

spark-shell常用選項(xiàng)

--master MASTER_URL 指定模式(spark://host:port, mesos://host:port, yarn,
                              k8s://https://host:port, or local (Default: local[*]))
--executor-memory MEM 指定每個(gè)Executor的內(nèi)存,默認(rèn)1GB
--total-executor-cores NUM 指定所有Executor所占的核數(shù)
--num-executors NUM 指定Executor的個(gè)數(shù)
--help, -h 顯示幫助信息
--version 顯示版本號(hào)

從上面幫助看,spark有五種運(yùn)行模式:spark、mesos、yarn、k8s、local。這里主要講local和yarn模式

Master URL 含義
local 在本地運(yùn)行,只有一個(gè)工作進(jìn)程,無(wú)并行計(jì)算能力
local[K] 在本地運(yùn)行,有 K 個(gè)工作進(jìn)程,通常設(shè)置 K 為機(jī)器的CPU 核心數(shù)量
local[*] 在本地運(yùn)行,工作進(jìn)程數(shù)量等于機(jī)器的 CPU 核心數(shù)量。
spark://HOST:PORT 以 Standalone 模式運(yùn)行,這是 Spark 自身提供的集群運(yùn)行模式,默認(rèn)端口號(hào): 7077
mesos://HOST:PORT 在 Mesos 集群上運(yùn)行,Driver 進(jìn)程和 Worker 進(jìn)程運(yùn)行在 Mesos 集群上,部署模式必須使用固定值:--deploy-mode cluster
yarn 在yarn集群上運(yùn)行,依賴(lài)于hadoop集群,yarn資源調(diào)度框架,將應(yīng)用提交給yarn,在A(yíng)pplactionMaster(相當(dāng)于Stand alone模式中的Master)中運(yùn)行driver,在集群上調(diào)度資源,開(kāi)啟excutor執(zhí)行任務(wù)。
k8s 在k8s集群上運(yùn)行

1)local

在Spark Shell中,有一個(gè)專(zhuān)有的SparkContext已經(jīng)為您創(chuàng)建好了,變量名叫做sc。自己創(chuàng)建的SparkContext將無(wú)法工作??梢杂?-master參數(shù)來(lái)設(shè)置SparkContext要連接的集群,用--jars來(lái)設(shè)置需要添加到CLASSPATH的jar包,如果有多個(gè)jar包,可以使用逗號(hào)分隔符連接它們。例如,在一個(gè)擁有2核的環(huán)境上運(yùn)行spark-shell,使用:

#資源存儲(chǔ)的位置,默認(rèn)為本地,以及使用什么調(diào)度框架 ,默認(rèn)使用的是spark內(nèi)置的資源管理和調(diào)度框架Standalone 
# local單機(jī)版,只占用一個(gè)線(xiàn)程,local[*]占用當(dāng)前所有線(xiàn)程,local[2]:2個(gè)CPU核運(yùn)行
$ spark-shell --master local[2]
# --master 默認(rèn)為 local[*] 
#默認(rèn)使用集群最大的內(nèi)存大小
--executor-memorty
#默認(rèn)使用最大核數(shù)
--total-executor-cores 
$ spark-shell --master local[*] --executor-memory 1g --total-executor-cores 1
26.png

Web UI地址:http://hadoop-node1:4040

27.png

隨后,就可以使用spark-shell內(nèi)使用Scala語(yǔ)言完成一定的操作。這里做幾個(gè)簡(jiǎn)單的操作,有興趣的話(huà),可以自行去了解scala

val textFile = sc.textFile("file:///opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/README.md")
textFile.count()
textFile.first()
28.png

其中,count代表RDD中的總數(shù)據(jù)條數(shù);first代表RDD中的第一行數(shù)據(jù)。

2)on Yarn(推薦)

# on yarn,也可以在配置文件中修改這個(gè)字段spark.master
$ spark-shell --master yarn 

--master用來(lái)設(shè)置context將要連接并使用的資源主節(jié)點(diǎn),master的值是standalone模式中spark的集群地址、yarn或mesos集群的URL,或是一個(gè)local地址。

六、SparkSQL和Hive的集成(Spark on Hive)

1)創(chuàng)建軟鏈接

$ ln -s /opt/bigdata/hadoop/server/apache-hive-3.1.2-bin/conf/hive-site.xml /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/conf/hive-site.xml

2)復(fù)制 hive lib目錄 下的mysql連接jar包到spark的jars下

$ cp /opt/bigdata/hadoop/server/apache-hive-3.1.2-bin/lib/mysql-connector-java-5.1.49-bin.jar /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/jars/

3)配置

# 創(chuàng)建spark日志在hdfs存儲(chǔ)目錄
$ hadoop fs -mkdir -p /tmp/spark
$ cd /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/conf
$ cp spark-defaults.conf.template spark-defaults.conf

在spark-defaults.conf追加如下配置:

# 使用yarn模式
spark.master                     yarn
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://hadoop-node1:8082/tmp/spark
spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.driver.memory              512m
spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"

4)啟動(dòng) spark-shell操作Hive(local)

支持多用戶(hù)得啟動(dòng)metastore服務(wù)

$ nohup hive --service metastore &
$ ss -atnlp|grep 9083

在hive-site.xml加入如下配置:

<property>  
  <name>hive.metastore.uris</name>  
  <value>thrift://hadoop-node1:9083</value>  
</property>  

啟動(dòng)spark-sql

# yarn模式,--master yarn可以不帶,因?yàn)樯厦嬖谂渲梦募镆呀?jīng)配置了yarn模式了
$ spark-sql --master yarn
show databases;
29.png

從上圖就可發(fā)現(xiàn),已經(jīng)查到我之前創(chuàng)建的庫(kù)了,說(shuō)明已經(jīng)集成ok了。

七、Spark beeline

Spark Thrift Server 是 Spark 社區(qū)基于 HiveServer2 實(shí)現(xiàn)的一個(gè) Thrift 服務(wù)。旨在無(wú)縫兼容
HiveServer2。因?yàn)?Spark Thrift Server 的接口和協(xié)議都和 HiveServer2 完全一致,因此我們部署好Spark Thrift Server后,可以直接使用hive的beeline訪(fǎng)問(wèn)Spark Thrift Server執(zhí)行相關(guān)語(yǔ)句。Spark Thrift Server 的目的也只是取代 HiveServer2,因此它依舊可以和 Hive Metastore進(jìn)行交互,獲取到 hive 的元數(shù)據(jù)。

1)Spark Thrift Server架構(gòu)于HiveServer2架構(gòu)對(duì)比

30.png

2)Spark Thrift Server和HiveServer2的區(qū)別

Hive on Spark Spark Thrift Server
任務(wù)提交模式 每個(gè)session都會(huì)創(chuàng)建一個(gè)RemoteDriver,也就是對(duì)于一個(gè)Application。之后將sql解析成執(zhí)行的物理計(jì)劃序列化后發(fā)到RemoteDriver執(zhí)行 本身的Server服務(wù)就是一個(gè)Driver,直接接收sql執(zhí)行。也就是所有的session都共享一個(gè)Application
性能 性能一般 如果存儲(chǔ)格式是orc或者parquet,性能會(huì)比hive高幾倍,某些語(yǔ)句甚至?xí)邘资?。其他格式的?huà),性能相差不是很大,有時(shí)hive性能會(huì)更好
并發(fā) 如果任務(wù)執(zhí)行不是異步的,就是在thrift的worker線(xiàn)程中執(zhí)行,受worker線(xiàn)程數(shù)量的限制。異步的話(huà)則放到線(xiàn)程池執(zhí)行,并發(fā)度受異步線(xiàn)程池大小限制。 處理任務(wù)的模式和Hive一樣。
sql兼容 主要支持ANSI SQL 2003,但并不完全遵守,只是大部分支持。并擴(kuò)展了很多自己的語(yǔ)法 Spark SQL也有自己的實(shí)現(xiàn)標(biāo)準(zhǔn),因此和hive不會(huì)完全兼容。具體哪些語(yǔ)句會(huì)不兼容需要測(cè)試才能知道
HA 可以通過(guò)zk實(shí)現(xiàn)HA 沒(méi)有內(nèi)置的HA實(shí)現(xiàn),不過(guò)spark社區(qū)提了一個(gè)issue并帶上了patch,可以拿來(lái)用:https://issues.apache.org/jira/browse/SPARK-11100

【總結(jié)】Spark Thrift Server說(shuō)白了就是小小的改動(dòng)了下HiveServer2,代碼量也不多。雖然接口和HiveServer2完全一致,但是它以單個(gè)Application在集群運(yùn)行的方式還是比較奇葩的。可能官方也是為了實(shí)現(xiàn)簡(jiǎn)單而沒(méi)有再去做更多的優(yōu)化。

3)配置啟動(dòng)Spark Thrift Server

1、配置hive-site.xml

<!-- hs2端口 -->
<property>
  <name>hive.server2.thrift.port</name>
  <value>11000</value>
</property>

2、啟動(dòng)spark thriftserver服務(wù)(不能起hs2,因?yàn)榕渲檬且粯拥模瑫?huì)有沖突)

$ cd /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/sbin
$ ./start-thriftserver.sh
$ ss -tanlp|grep 11000
31.png

3、啟動(dòng)beeline操作

# 為了和hive的區(qū)別,這里使用絕對(duì)路徑啟動(dòng)
$ cd /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/bin
# 操作跟hive操作一模一樣,只是計(jì)算引擎不一樣了,換成了spark了
$ ./beeline
!connect jdbc:hive2://hadoop-node1:11000
show databases;
32.png

訪(fǎng)問(wèn)HDFS WEB UI:http://hadoop-node1:8088/cluster/apps

33.png

34.png

35.png

八、Spark Streaming

Spark Streaming與其他大數(shù)據(jù)框架Storm、Flink一樣,Spark Streaming是基于Spark Core基礎(chǔ)之上用于處理實(shí)時(shí)計(jì)算業(yè)務(wù)的框架。其實(shí)現(xiàn)就是把輸入的流數(shù)據(jù)進(jìn)行按時(shí)間切分,切分的數(shù)據(jù)塊用離線(xiàn)批處理的方式進(jìn)行并行計(jì)算處理。原理如下圖:

36.png

支持多種數(shù)據(jù)源獲取數(shù)據(jù):


37.png

Spark處理的是批量的數(shù)據(jù)(離線(xiàn)數(shù)據(jù)),Spark Streaming實(shí)際上處理并不是像Strom一樣來(lái)一條處理一條數(shù)據(jù),而是將接收到的實(shí)時(shí)流數(shù)據(jù),按照一定時(shí)間間隔,對(duì)數(shù)據(jù)進(jìn)行拆分,交給Spark Engine引擎,最終得到一批批的結(jié)果。

38.png

由于考慮到本篇文章篇幅太長(zhǎng),所以這里只是稍微提了一下,如果有時(shí)間會(huì)繼續(xù)補(bǔ)充Spark Streaming相關(guān)的知識(shí)點(diǎn),請(qǐng)耐心等待……

官方文檔:https://spark.apache.org/docs/3.2.0/streaming-programming-guide.html

39.png

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容