?一、介紹
? ?Shark是SparkSQL(其完全脫離了 Hive 的限制)的前身,Shark的性能比 MapReduce 的 Hive 普遍快 2 倍以上,當數據全部 load 在內存的話,將快 10 倍以上,因此 Shark 可以作為交互式查詢應用服務來使用。
? ? SparkSQL具有很多特性(官網):
????1.Integrated?
????Seamlessly mix SQL and?Spark programs,Apply functions to results of SQL queries.我的理解是SQL能和RDD完美結合使用,可視為RDD一樣去操作,各種算子可以直接使用。
????2.Uniform Data Access -- Connect to any data source the same way.
????整合各種數據比較方便,讀取json的數據注冊成表,讀取hive的表,兩張表可以直接join,其實底層都是轉換成RDD去操作。
?????DataFrames and SQL provide a common way to access a variety of data sources, including Hive, Avro, Parquet, ORC, JSON, and JDBC. You can even join data across these sources.
????常見的是數據源Hive(ETL后的數據),JSON,JDBC(mysql等存放分析參數),還可以讀取hdfs,S3(Amazon),Postgresql(gp、hawq直接讀取表?還是先放在hdfs中,有空調研下)。
????3.Hive Integration 和hive兼容性很好,很好配置就可以讀取hive metaStore
????Spark core的算子拼接的操作,可以用Sparksql去實現(xiàn)。。有時候會sql實現(xiàn)會更簡單一些。
? ? DataFrame存儲是列式存儲,不需要的字段不加載到內存中,這樣查詢、聚合速度快。
二、創(chuàng)建 DataFrame
? ? 1.動態(tài)創(chuàng)建 schema (scala/python)
--scala
????val sc = new SparkContext(conf)
? ? val sqlContext = new SQLContext(sc)
? ? val people = sc.textFile("scores.txt")
? ? val schemaString = "cla:String sc:Integer"
? ? //如果schema中制定了除String以外別的類型? 在構建rowRDD的時候要注意指定類型? ? 例如: p(2).toInt
? ? val rowRDD = people.map(_.split("\t")).map(p => Row(p(0), p(1).toInt))
? ? val schema =
? ? ? StructType(schemaString.split(" ").map(fieldName => StructField(fieldName.split(":")(0), if (fieldName.split(":")(1).equals("String")) StringType else IntegerType, true)))
//? ? val structFields = Array(StructField("clazz",StringType,true),StructField("score",IntegerType))
//? ? val schema = StructType(structFields)
? ? //? val arr = Array(StructField("name",StringType,true),StructField("age",IntegerType,true))
? ? //? val schema = StructType.apply(arr)
? ? val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
? ? peopleDataFrame.printSchema()
? ? peopleDataFrame.show()
--python
sqlContext = SQLContext(sc)
#讀取hdfs
data = sc.textFile("/user/nixm/czrk_data.txt")
#切分字符串
data_noheader = data.map(lambda x:x.split(";"))
#創(chuàng)建schema
schema = StructType([StructField("hh",StringType(),True),
StructField("sfzjhm",StringType(),True),
StructField("yhzgx",StringType(),True)
])
#創(chuàng)建DF
df = sqlContext.createDataFrame(data_noheader,schema)
#DF注冊成臨時表
df.registerTempTable('test')
#執(zhí)行sql(返回DF)并持久化
df_dis = sqlContext.sql(sqlQuery="select hh,sfzjhm from test where yhzgx='戶主' group by hh,sfzjhm").cache()
#將讀取的結果(DF)注冊成臨時表
df_dis.registerTempTable('df_dis')
#執(zhí)行sql并持久化
df_all= sqlContext.sql(sqlQuery="select * from test where yhzgx!='戶主'").cache()
#注冊成臨時表
df_all.registerTempTable('df_all')
#表與表的join
result=sqlContext.sql(sqlQuery="select a.sfzjhm as a_sfzjhm, a.yhzgx, b.sfzjhm as b_sfzjhm from df_all a inner join df_dis b on a.hh=b.hh ").cache()
#對dataFrame使用map算子后,返回類型是RDD
hdfsRDD=result.rdd.map(lambda p:p. a_sfzjhm+";"+p.yhzgx+";"+p.b_sfzjhm)
#重新分區(qū),將DF最終存放到hdfs
hdfsRDD.repartition(1).saveAsTextFile("/user/nixm/jlout/czrk.txt")
????2.通過反射將RDD轉換成DF[比較死板]
peoples.txt? ? ?1,lucy,18? ? /n? ??2,jim,11
主要代碼
case class Person(name:String, age: Int)
def main(args: Array[String]): Unit = {
val conf =new SparkConf()//創(chuàng)建sparkConf對象
conf.setAppName("Spark App")//設置應用程序的名稱
conf.setMaster("local")
val sqlContext =new SQLContext(sc)
import sqlContext.implicits._ //隱式轉換
//使用反射方法將RDD轉換成DF
val people = sc.textFile("peoples.txt").map(_.split(",")).map(p =>Person(p(1), p(2).trim.toInt)).toDF()
people.registerTempTable("people") //注冊成臨時表
//執(zhí)行sql查詢
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 6 AND age <= 19")
//對dataFrame使用map算子后,返回類型是RDD
teenagers.map(t =>"Name: " + t(0)).foreach(println)
// or by field name:
teenagers.map(t =>"Name: " + t.getAs[String]("name")).foreach(println)