【2018-04-10】【2.1.1】spark sql操作mysql和hdfs

spark 2.X與1.x的區(qū)別

spark sql 2.x以上版本和1.x版本有個(gè)很大的區(qū)別:spark1.x的sqlContext在spark2.0中被整合到sparkSession,故而利用spark-shell客戶(hù)端操作會(huì)有些許不同,具體如下文所述。


載入外部數(shù)據(jù)的load方法

在spark sql中有一個(gè)DataStreamReader封裝了讀取各種格式的外部數(shù)據(jù)的方法,其中,format(str)用于傳數(shù)據(jù)格式,比如csv,json,parquet,jdbc等;load(path)用于傳入數(shù)據(jù)的地址,其中可以傳入本地?cái)?shù)據(jù)路徑也可以是hdfs上的路徑,在官網(wǎng)給的demo中都是傳的本地?cái)?shù)據(jù)路徑:比如:

val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
  • load(path)的源碼:注意:load不能l載入hive的數(shù)據(jù),hive數(shù)據(jù)需要使用table方法來(lái)載入。

    def load(path: String): DataFrame = {
     option("path", path).load()
       }
    
       def load(): DataFrame = {
     if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
       throw new AnalysisException("Hive data source can only be used with tables, you can not " +
         "read files of Hive data source directly.")
     }
    
     val dataSource =
       DataSource(
       sparkSession,
     userSpecifiedSchema = userSpecifiedSchema,
     className = source,
     options = extraOptions.toMap)
     Dataset.ofRows(sparkSession, StreamingRelation(dataSource))
       }
    
  • 【hdfs路徑】寫(xiě)入寫(xiě)出hdfs上的路徑,則需要加入hdfs的完全路徑,如:

    studentDF.write.parquet("hdfs://h4:9000/test/spark/parquet")
    studentDF.write.json("hdfs://h4:9000/test/spark/json")


spark sql與mysql 和hdfs交互的實(shí)戰(zhàn)

  • 1.添加jar包
  1. 正常配置不再贅述,這里如果需要讀取MySQL數(shù)據(jù),則需要在當(dāng)前用戶(hù)下的環(huán)境變量里額外加上JDBC的驅(qū)動(dòng)jar包 例如我的是:mysql-connector-java-5.1.18-bin.jar 存放路徑是$SPARK_HOME/jars 所以需要額外配置環(huán)境變量
    export PATH = $PATH:$SPARK_HOME/jars
  • 2.啟動(dòng)spark-shell

    bin/spark-shell --master=spark://h4:7077 --driver-class-path=./jars/mysql-connector-java-5.1.18-bin.jar -- jars=./jars/mysql-connector-java-5.1.18-bin.jar

  • 3.代碼

spark-sql采用sql方式執(zhí)行操作正常啟動(dòng)之后可以先通過(guò)spark-sql建立數(shù)據(jù)庫(kù)并切換到當(dāng)前新建的數(shù)據(jù)庫(kù)
spark.sql("create database spark")
可以查看下是否新建成功
spark.sql("show databases ").show
創(chuàng)建成功之后切換數(shù)據(jù)庫(kù)
spark.sql("use spark")
現(xiàn)在開(kāi)始讀取遠(yuǎn)程MySQL數(shù)據(jù)
val sql = """CREATE TABLE student USING org.apache.spark.sql.jdbc OPTIONS ( url "jdbc:mysql://worker2:3306/spark", dbtable "student", user "root", password "root" )"""
執(zhí)行:
spark.sql(sql);

等待執(zhí)行完畢之后,將表數(shù)據(jù)存入緩存
spark.sql("cache table student")
此時(shí)即可進(jìn)行操作,例如:val studentDF = spark.sql("select id,name from student")
完成需求查詢(xún)之后,可將結(jié)果以parquet的格式保存到HDFS
studentDF.write.parquet("hdfs://h4:9000/test/spark/parquet")
也可以寫(xiě)成json格式
studentDF.write.json("hdfs://h4:9000/test/spark/json")

  • 4.性能:

集群狀態(tài)下,硬件配置32G內(nèi)存 2T硬盤(pán),spark配了4核,內(nèi)存分配了20G的情況下,測(cè)試速度如下: 2700萬(wàn)條記錄的表導(dǎo)入spark用時(shí)1秒以?xún)?nèi) sparksql將其以json格式存入HDFS用時(shí)288秒,共1.0G,將其以parquet格式存入HDFS用時(shí)207秒,共86.6M,可見(jiàn)parquet的優(yōu)勢(shì)還是比較明顯

參考鏈接: http://blog.51cto.com/10901776/1875371

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容