hive是怎么轉化hql為MR程序的?

image.png
總的來說,Hive是通過給用戶提供的一系列交互接口,接收到用戶的指令(SQL),使用自己的Driver,結合元數據(MetaStore),將這些指令翻譯成MapReduce,提交到Hadoop中執(zhí)行,最后,將執(zhí)行返回的結果輸出到用戶交互接口。
- 用戶接口:Client
CLI(hiveshell)、JDBC/ODBC(java訪問hive)、WEBUI(瀏覽器訪問hive) - 元數據:Metastore
元數據包括:表名、表所屬的數據庫(默認是default)、表的擁有者、列/分區(qū)字段、表的類型(是否是外部表)、表的數據所在目錄等;
默認存儲在自帶的derby數據庫中,推薦使用MySQL存儲Metastore - Hadoop
使用HDFS進行存儲,使用MapReduce進行計算。 - 驅動器:Driver
(1)解析器(SQL Parser):將SQL字符串轉換成抽象語法樹AST,這一步一般都用第三方工具庫完成,比如antlr;對AST進行語法分析,比如表是否存在、字段是否存在、SQL語義是否有誤。
(2)編譯器(Physical Plan):將AST編譯生成邏輯執(zhí)行計劃。
(3)優(yōu)化器(Query Optimizer):對邏輯執(zhí)行計劃進行優(yōu)化。
(4)執(zhí)行器(Execution):把邏輯執(zhí)行計劃轉換成可以運行的物理計劃。對于Hive來說,就是MR/Spark。
-
執(zhí)行流程:
- ANTLR將用戶提供的語法文件進行分析,轉換成語法樹,包含各種符號(token)和字面值;--- TOK_QUERY、TOK_FROM、TOK_SELECT等
- 遍歷語法樹,抽象出查詢的基本單元塊,QueryBlock,包含輸入源、計算過程、輸出。可以理解為子查詢
- 遍歷QueryBlock生成操作樹,包含 TableScanOperator、SelectOperator等
- 優(yōu)化器優(yōu)化操作樹,變換、減少MR任務數、Shuffle階段數量等
- 轉換為最終的MR程序提交作業(yè)。
例:
explain select * from sqoop where id > 0;
Stage-0
Fetch Operator
limit:-1
Select Operator [SEL_2]
outputColumnNames:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col14","_col15","_col16","_col17","_col18","_col19","_col20","_col21","_col22","_col23","_col24","_col25","_col26","_col27","_col28","_col29","_col30","_col31"]
Filter Operator [FIL_4]
predicate:(id > 0) (type: boolean)
TableScan [TS_0]
alias:sqoop

image.png
package com.test;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* @author phil.zhang
* @date 2019/4/3
*/
// SELECT pageid, age, count(1) FROM TABLE GROUP BY pageid,age
public class Hive2MR {
static class PageMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String data = value.toString();
context.write(new Text(data), new IntWritable(1));
}
}
static class PageReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int total=0;
for (IntWritable value : values) {
total=total+value.get();
}
context.write(key, new IntWritable(total));
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
System.setProperty("hadoop.home.dir","c:\\hadoop\\2.7.3");
Job job = Job.getInstance(new Configuration());
job.setJarByClass(Hive2MR.class);
job.setMapperClass(PageMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setReducerClass(PageReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job,new Path("C:\\zf\\pageAge.txt"));
FileOutputFormat.setOutputPath(job, new Path("C:\\zf\\result"));
boolean b = job.waitForCompletion(true);
}
}
package com.test;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.rdd.RDD;
import scala.Tuple2;
/**
* @author phil.zhang
* @date 2019/4/3
*/
// SELECT pv.pageid, u.age FROM page_view pv JOIN user u ON (pv.userid = u.userid);
// pgid, uid , time
// uid, age , gender
public class Hive2Spark {
public static void main(String[] args) {
System.setProperty("hadoop.home.dir","c:\\hadoop\\2.7.3");
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("hive");
SparkContext context = new SparkContext(conf);
RDD<String> page = context.textFile("c:/zf/page.txt", 1);
RDD<String> user = context.textFile("c:/zf/user.txt", 1);
JavaPairRDD<String, String> pagePair = page.toJavaRDD()
.map(str -> str.split(",")).mapToPair(strs -> new Tuple2<>(strs[1], strs[0]));
for (Tuple2<String, String> tuple2 : pagePair.collect()) {
System.out.println(tuple2._1 + ":" + tuple2._2);
}
JavaPairRDD<String, String> userPair = user.toJavaRDD()
.map(str -> str.split(",")).mapToPair(strs -> new Tuple2<>(strs[0], strs[1]));
for (Tuple2<String, String> tuple2 : userPair.collect()) {
System.out.println(tuple2._1 + ":" + tuple2._2);
}
JavaPairRDD<String, Tuple2<String, String>> pairRDD = pagePair.join(userPair);
for (Tuple2<String, Tuple2<String, String>> tuple2 : pairRDD.collect()) {
System.out.println(tuple2._1 + ":" + tuple2._2()._1() +"," + tuple2._2()._2());
}
JavaRDD<String> result = pairRDD.map(pair -> pair._2()._1 + "," + pair._2()._2());
for (String s : result.collect()) {
System.out.println(s);
}
}
}