Spark SQL支持對(duì)Hive中存儲(chǔ)的數(shù)據(jù)進(jìn)行讀寫。操作Hive中的數(shù)據(jù)時(shí),必須創(chuàng)建HiveContext,而不是SQLContext。HiveContext繼承自SQLContext,但是增加了在Hive元數(shù)據(jù)庫中查找表,以及用HiveQL語法編寫SQL的功能。除了sql()方法,HiveContext還提供了hql()方法,從而用Hive語法來編譯sql。
使用HiveContext,可以執(zhí)行Hive的大部分功能,包括創(chuàng)建表、往表里導(dǎo)入數(shù)據(jù)以及用SQL語句查詢表中的數(shù)據(jù)。查詢出來的數(shù)據(jù)是一個(gè)Row數(shù)組。
將hive-site.xml拷貝到spark/conf目錄下,將mysql connector拷貝到spark/lib目錄下
HiveContext sqlContext = new HiveContext(sc);
sqlContext.sql("CREATE TABLE IF NOT EXISTS students (name STRING, age INT)");
sqlContext.sql("LOAD DATA LOCAL INPATH '/usr/local/spark-study/resources/students.txt' INTO TABLE students");
Row[] teenagers = sqlContext.sql("SELECT name, age FROM students WHERE age<=18").collect();
將數(shù)據(jù)保存到表中
Spark SQL還允許將數(shù)據(jù)保存到Hive表中。調(diào)用DataFrame的saveAsTable命令,即可將DataFrame中的數(shù)據(jù)保存到Hive表中。與registerTempTable不同,saveAsTable是會(huì)將DataFrame中的數(shù)據(jù)物化到Hive表中的,而且還會(huì)在Hive元數(shù)據(jù)庫中創(chuàng)建表的元數(shù)據(jù)。
默認(rèn)情況下,saveAsTable會(huì)創(chuàng)建一張Hive Managed Table,也就是說,數(shù)據(jù)的位置都是由元數(shù)據(jù)庫中的信息控制的。當(dāng)Managed Table被刪除時(shí),表中的數(shù)據(jù)也會(huì)一并被物理刪除。
registerTempTable只是注冊(cè)一個(gè)臨時(shí)的表,只要Spark Application重啟或者停止了,那么表就沒了。而saveAsTable創(chuàng)建的是物化的表,無論Spark Application重啟或者停止,表都會(huì)一直存在。
調(diào)用HiveContext.table()方法,還可以直接針對(duì)Hive中的表,創(chuàng)建一個(gè)DataFrame。
案例:查詢分?jǐn)?shù)大于80分的學(xué)生的完整信息
創(chuàng)建一個(gè)student_infos.txt
leo18
marry17
jack19
創(chuàng)建一個(gè)student_scores.txt
leo88
marry99
jack76
package cn.spark.study.sql;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;
/**
* Hive數(shù)據(jù)源
* @author Administrator
*
*/
public class HiveDataSource {
?@SuppressWarnings("deprecation")
public static void main(String[] args) {
??// 首先還是創(chuàng)建SparkConf
??SparkConf conf = new SparkConf().setAppName("HiveDataSource");
??// 創(chuàng)建JavaSparkContext
??JavaSparkContext sc = new JavaSparkContext(conf);
??// 創(chuàng)建HiveContext,注意,這里,它接收的是SparkContext作為參數(shù),不是JavaSparkContext
HiveContext hiveContext = new HiveContext(sc.sc());
// 第一個(gè)功能,使用HiveContext的sql()方法,可以執(zhí)行Hive中能夠執(zhí)行的HiveQL語句
??// 判斷是否存在student_infos表,如果存在則刪除
??hiveContext.sql("DROP TABLE IF EXISTS student_infos");
// 判斷student_infos表是否不存在,如果不存在,則創(chuàng)建該表
hiveContext.sql("CREATE TABLE IF NOT EXISTS student_infos (name STRING, age INT)");
??// 將學(xué)生基本信息數(shù)據(jù)導(dǎo)入student_infos表
??hiveContext.sql("LOAD DATA " + "LOCAL INPATH '/usr/local/spark-study/resources/student_infos.txt' " ????+ "INTO TABLE student_infos");
??// 用同樣的方式給student_scores導(dǎo)入數(shù)據(jù)
??hiveContext.sql("DROP TABLE IF EXISTS student_scores");
??hiveContext.sql("CREATE TABLE IF NOT EXISTS student_scores (name STRING, score INT)");
hiveContext.sql("LOAD DATA " ????+ "LOCAL INPATH '/usr/local/spark-study/resources/student_scores.txt' " ????+ "INTO TABLE student_scores");
??// 第二個(gè)功能,執(zhí)行sql還可以返回DataFrame,用于查詢
??// 執(zhí)行sql查詢,關(guān)聯(lián)兩張表,查詢成績大于80分的學(xué)生
DataFrame goodStudentsDF = hiveContext.sql("SELECT si.name, si.age, ss.score " ????+ "FROM student_infos si " ????+ "JOIN student_scores ss ON si.name=ss.name " ????+ "WHERE ss.score>=80");
// 第三個(gè)功能,可以將DataFrame中的數(shù)據(jù),理論上來說,DataFrame對(duì)應(yīng)的RDD的元素,是Row即可
??// 將DataFrame中的數(shù)據(jù)保存到hive表中
??// 接著將DataFrame中的數(shù)據(jù)保存到good_student_infos表中
??hiveContext.sql("DROP TABLE IF EXISTS good_student_infos");
??goodStudentsDF.saveAsTable("good_student_infos");
??// 第四個(gè)功能,可以用table()方法,針對(duì)hive表,直接創(chuàng)建DataFrame
??// 然后針對(duì)good_student_infos表,直接創(chuàng)建DataFrame
??Row[] goodStudentRows = hiveContext.table("good_student_infos").collect();
??for(Row goodStudentRow : goodStudentRows) {
???System.out.println(goodStudentRow);
}
sc.close();
?}
}
測(cè)試:
hive
show tables;
select * from table1
Scala版本
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext
/**
* @author Administrator
*/
object HiveDataSource {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("HiveDataSource");
val sc = new SparkContext(conf);
val hiveContext = new HiveContext(sc);
hiveContext.sql("DROP TABLE IF EXISTS student_infos");
hiveContext.sql("CREATE TABLE IF NOT EXISTS student_infos (name STRING, age INT)");
hiveContext.sql("LOAD DATA "
+ "LOCAL INPATH '/usr/local/study/sql/student_infos.txt' "
+ "INTO TABLE student_infos");
hiveContext.sql("DROP TABLE IF EXISTS student_scores");
hiveContext.sql("CREATE TABLE IF NOT EXISTS student_scores (name STRING, score INT)");
hiveContext.sql("LOAD DATA "
+ "LOCAL INPATH '/usr/local/study/sql/student_scores.txt' "
+ "INTO TABLE student_scores");
val goodStudentsDF = hiveContext.sql("SELECT si.name, si.age, ss.score "
+ "FROM student_infos si "
+ "JOIN student_scores ss ON si.name=ss.name "
+ "WHERE ss.score>=80");
hiveContext.sql("DROP TABLE IF EXISTS good_student_infos");
goodStudentsDF.saveAsTable("good_student_infos");
val goodStudentRows = hiveContext.table("good_student_infos").collect();
for(goodStudentRow <- goodStudentRows) {
println(goodStudentRow);
}
}
}