SparkSQL溫習筆記-1

?一、介紹

? ?Shark是SparkSQL(其完全脫離了 Hive 的限制)的前身,Shark的性能比 MapReduce 的 Hive 普遍快 2 倍以上,當數據全部 load 在內存的話,將快 10 倍以上,因此 Shark 可以作為交互式查詢應用服務來使用。

? ? SparkSQL具有很多特性(官網):

????1.Integrated?

????Seamlessly mix SQL and?Spark programs,Apply functions to results of SQL queries.我的理解是SQL能和RDD完美結合使用,可視為RDD一樣去操作,各種算子可以直接使用。

????2.Uniform Data Access -- Connect to any data source the same way.

????整合各種數據比較方便,讀取json的數據注冊成表,讀取hive的表,兩張表可以直接join,其實底層都是轉換成RDD去操作。

?????DataFrames and SQL provide a common way to access a variety of data sources, including Hive, Avro, Parquet, ORC, JSON, and JDBC. You can even join data across these sources.

????常見的是數據源Hive(ETL后的數據),JSON,JDBC(mysql等存放分析參數),還可以讀取hdfs,S3(Amazon),Postgresql(gp、hawq直接讀取表?還是先放在hdfs中,有空調研下)。

????3.Hive Integration 和hive兼容性很好,很好配置就可以讀取hive metaStore

????Spark core的算子拼接的操作,可以用Sparksql去實現(xiàn)。。有時候會sql實現(xiàn)會更簡單一些。

? ? DataFrame存儲是列式存儲,不需要的字段不加載到內存中,這樣查詢、聚合速度快。

二、創(chuàng)建 DataFrame

? ? 1.動態(tài)創(chuàng)建 schema (scala/python)

--scala

????val sc = new SparkContext(conf)

? ? val sqlContext = new SQLContext(sc)

? ? val people = sc.textFile("scores.txt")

? ? val schemaString = "cla:String sc:Integer"

? ? //如果schema中制定了除String以外別的類型? 在構建rowRDD的時候要注意指定類型? ? 例如: p(2).toInt

? ? val rowRDD = people.map(_.split("\t")).map(p => Row(p(0), p(1).toInt))

? ? val schema =

? ? ? StructType(schemaString.split(" ").map(fieldName => StructField(fieldName.split(":")(0), if (fieldName.split(":")(1).equals("String")) StringType else IntegerType, true)))

//? ? val structFields = Array(StructField("clazz",StringType,true),StructField("score",IntegerType))

//? ? val schema = StructType(structFields)

? ? //? val arr = Array(StructField("name",StringType,true),StructField("age",IntegerType,true))

? ? //? val schema = StructType.apply(arr)

? ? val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

? ? peopleDataFrame.printSchema()

? ? peopleDataFrame.show()

--python

sqlContext = SQLContext(sc)

#讀取hdfs

data = sc.textFile("/user/nixm/czrk_data.txt")

#切分字符串

data_noheader = data.map(lambda x:x.split(";"))

#創(chuàng)建schema

schema = StructType([StructField("hh",StringType(),True),

StructField("sfzjhm",StringType(),True),

StructField("yhzgx",StringType(),True)

])

#創(chuàng)建DF

df = sqlContext.createDataFrame(data_noheader,schema)

#DF注冊成臨時表

df.registerTempTable('test')

#執(zhí)行sql(返回DF)并持久化

df_dis = sqlContext.sql(sqlQuery="select hh,sfzjhm from test where yhzgx='戶主' group by hh,sfzjhm").cache()

#將讀取的結果(DF)注冊成臨時表

df_dis.registerTempTable('df_dis')

#執(zhí)行sql并持久化

df_all= sqlContext.sql(sqlQuery="select * from test where yhzgx!='戶主'").cache()

#注冊成臨時表

df_all.registerTempTable('df_all')

#表與表的join

result=sqlContext.sql(sqlQuery="select a.sfzjhm as a_sfzjhm, a.yhzgx, b.sfzjhm as b_sfzjhm from df_all a inner join df_dis b on a.hh=b.hh ").cache()

#對dataFrame使用map算子后,返回類型是RDD

hdfsRDD=result.rdd.map(lambda p:p. a_sfzjhm+";"+p.yhzgx+";"+p.b_sfzjhm)

#重新分區(qū),將DF最終存放到hdfs

hdfsRDD.repartition(1).saveAsTextFile("/user/nixm/jlout/czrk.txt")


????2.通過反射將RDD轉換成DF[比較死板]

peoples.txt? ? ?1,lucy,18? ? /n? ??2,jim,11

主要代碼

case class Person(name:String, age: Int)

def main(args: Array[String]): Unit = {

val conf =new SparkConf()//創(chuàng)建sparkConf對象

conf.setAppName("Spark App")//設置應用程序的名稱

conf.setMaster("local")

val sqlContext =new SQLContext(sc)

import sqlContext.implicits._ //隱式轉換

//使用反射方法將RDD轉換成DF

val people = sc.textFile("peoples.txt").map(_.split(",")).map(p =>Person(p(1), p(2).trim.toInt)).toDF()

people.registerTempTable("people") //注冊成臨時表

//執(zhí)行sql查詢

val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 6 AND age <= 19")

//對dataFrame使用map算子后,返回類型是RDD

teenagers.map(t =>"Name: " + t(0)).foreach(println)

// or by field name:

teenagers.map(t =>"Name: " + t.getAs[String]("name")).foreach(println)

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容