Hive數(shù)據(jù)源實(shí)戰(zhàn)

Spark SQL支持對(duì)Hive中存儲(chǔ)的數(shù)據(jù)進(jìn)行讀寫。操作Hive中的數(shù)據(jù)時(shí),必須創(chuàng)建HiveContext,而不是SQLContext。HiveContext繼承自SQLContext,但是增加了在Hive元數(shù)據(jù)庫中查找表,以及用HiveQL語法編寫SQL的功能。除了sql()方法,HiveContext還提供了hql()方法,從而用Hive語法來編譯sql。

使用HiveContext,可以執(zhí)行Hive的大部分功能,包括創(chuàng)建表、往表里導(dǎo)入數(shù)據(jù)以及用SQL語句查詢表中的數(shù)據(jù)。查詢出來的數(shù)據(jù)是一個(gè)Row數(shù)組。

將hive-site.xml拷貝到spark/conf目錄下,將mysql connector拷貝到spark/lib目錄下

HiveContext sqlContext = new HiveContext(sc); 
sqlContext.sql("CREATE TABLE IF NOT EXISTS students (name STRING, age INT)");

sqlContext.sql("LOAD DATA LOCAL INPATH '/usr/local/spark-study/resources/students.txt' INTO TABLE students");

Row[] teenagers = sqlContext.sql("SELECT name, age FROM students WHERE age<=18").collect();

將數(shù)據(jù)保存到表中

Spark SQL還允許將數(shù)據(jù)保存到Hive表中。調(diào)用DataFrame的saveAsTable命令,即可將DataFrame中的數(shù)據(jù)保存到Hive表中。與registerTempTable不同,saveAsTable是會(huì)將DataFrame中的數(shù)據(jù)物化到Hive表中的,而且還會(huì)在Hive元數(shù)據(jù)庫中創(chuàng)建表的元數(shù)據(jù)。

默認(rèn)情況下,saveAsTable會(huì)創(chuàng)建一張Hive Managed Table,也就是說,數(shù)據(jù)的位置都是由元數(shù)據(jù)庫中的信息控制的。當(dāng)Managed Table被刪除時(shí),表中的數(shù)據(jù)也會(huì)一并被物理刪除。

registerTempTable只是注冊(cè)一個(gè)臨時(shí)的表,只要Spark Application重啟或者停止了,那么表就沒了。而saveAsTable創(chuàng)建的是物化的表,無論Spark Application重啟或者停止,表都會(huì)一直存在。

調(diào)用HiveContext.table()方法,還可以直接針對(duì)Hive中的表,創(chuàng)建一個(gè)DataFrame。

案例:查詢分?jǐn)?shù)大于80分的學(xué)生的完整信息

創(chuàng)建一個(gè)student_infos.txt

leo18

marry17

jack19

創(chuàng)建一個(gè)student_scores.txt

leo88

marry99

jack76

package cn.spark.study.sql;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.sql.DataFrame;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.hive.HiveContext;

/**

* Hive數(shù)據(jù)源

* @author Administrator

*

*/

public class HiveDataSource {

?@SuppressWarnings("deprecation")
public static void main(String[] args) {

??// 首先還是創(chuàng)建SparkConf

??SparkConf conf = new SparkConf().setAppName("HiveDataSource");

??// 創(chuàng)建JavaSparkContext

??JavaSparkContext sc = new JavaSparkContext(conf);

??// 創(chuàng)建HiveContext,注意,這里,它接收的是SparkContext作為參數(shù),不是JavaSparkContext

HiveContext hiveContext = new HiveContext(sc.sc());

// 第一個(gè)功能,使用HiveContext的sql()方法,可以執(zhí)行Hive中能夠執(zhí)行的HiveQL語句
??// 判斷是否存在student_infos表,如果存在則刪除
??hiveContext.sql("DROP TABLE IF EXISTS student_infos");

// 判斷student_infos表是否不存在,如果不存在,則創(chuàng)建該表
hiveContext.sql("CREATE TABLE IF NOT EXISTS student_infos (name STRING, age INT)");

??// 將學(xué)生基本信息數(shù)據(jù)導(dǎo)入student_infos表

??hiveContext.sql("LOAD DATA " + "LOCAL INPATH '/usr/local/spark-study/resources/student_infos.txt' " ????+ "INTO TABLE student_infos");

??// 用同樣的方式給student_scores導(dǎo)入數(shù)據(jù)

??hiveContext.sql("DROP TABLE IF EXISTS student_scores"); 

??hiveContext.sql("CREATE TABLE IF NOT EXISTS student_scores (name STRING, score INT)");  

hiveContext.sql("LOAD DATA " ????+ "LOCAL INPATH '/usr/local/spark-study/resources/student_scores.txt' " ????+ "INTO TABLE student_scores");

??// 第二個(gè)功能,執(zhí)行sql還可以返回DataFrame,用于查詢
??// 執(zhí)行sql查詢,關(guān)聯(lián)兩張表,查詢成績大于80分的學(xué)生
DataFrame goodStudentsDF = hiveContext.sql("SELECT si.name, si.age, ss.score " ????+ "FROM student_infos si " ????+ "JOIN student_scores ss ON si.name=ss.name " ????+ "WHERE ss.score>=80");

// 第三個(gè)功能,可以將DataFrame中的數(shù)據(jù),理論上來說,DataFrame對(duì)應(yīng)的RDD的元素,是Row即可

??// 將DataFrame中的數(shù)據(jù)保存到hive表中

??// 接著將DataFrame中的數(shù)據(jù)保存到good_student_infos表中

??hiveContext.sql("DROP TABLE IF EXISTS good_student_infos");  

??goodStudentsDF.saveAsTable("good_student_infos");  

??// 第四個(gè)功能,可以用table()方法,針對(duì)hive表,直接創(chuàng)建DataFrame
??// 然后針對(duì)good_student_infos表,直接創(chuàng)建DataFrame

??Row[] goodStudentRows = hiveContext.table("good_student_infos").collect();  

??for(Row goodStudentRow : goodStudentRows) {

???System.out.println(goodStudentRow);  
}
sc.close();
?}
}

測(cè)試:

hive

show tables;

select * from table1

Scala版本

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

import org.apache.spark.sql.hive.HiveContext

/**

* @author Administrator

*/

object HiveDataSource {

 def main(args: Array[String]): Unit = {

val conf = new SparkConf().setAppName("HiveDataSource");

val sc = new SparkContext(conf);

val hiveContext = new HiveContext(sc);

hiveContext.sql("DROP TABLE IF EXISTS student_infos");

hiveContext.sql("CREATE TABLE IF NOT EXISTS student_infos (name STRING, age INT)");

hiveContext.sql("LOAD DATA "

   + "LOCAL INPATH '/usr/local/study/sql/student_infos.txt' "

   + "INTO TABLE student_infos");

hiveContext.sql("DROP TABLE IF EXISTS student_scores"); 

hiveContext.sql("CREATE TABLE IF NOT EXISTS student_scores (name STRING, score INT)");  

hiveContext.sql("LOAD DATA "

   + "LOCAL INPATH '/usr/local/study/sql/student_scores.txt' "

   + "INTO TABLE student_scores");

val goodStudentsDF = hiveContext.sql("SELECT si.name, si.age, ss.score "

   + "FROM student_infos si "

   + "JOIN student_scores ss ON si.name=ss.name "

   + "WHERE ss.score>=80");

hiveContext.sql("DROP TABLE IF EXISTS good_student_infos");  

goodStudentsDF.saveAsTable("good_student_infos");  

val goodStudentRows = hiveContext.table("good_student_infos").collect();  

for(goodStudentRow <- goodStudentRows) {

 println(goodStudentRow);  
}
}
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容