github鏈接
針對(duì)Hive的優(yōu)化主要有以下幾個(gè)方面:
- map
- reduce
- file format
- shuffle & sort
- job as whole
- job chain

Hive map reduce 的過程如下:

Map 階段優(yōu)化
Map 階段的優(yōu)化,主要是控制 map size的大小, map任務(wù)的數(shù)量
Map task num = Total Input size / map size
map size = max{ ${mapred.min.split.size}, // 數(shù)據(jù)的最小分割單元大小, 可調(diào)整,默認(rèn)1B min( ${dfs.block.size}, // hdfs 上數(shù)據(jù)塊大小, 有hdfs配置決定 ${mapred.max.split.size}) // 數(shù)據(jù)最大分隔單元大小, 可調(diào)整,默認(rèn)256MB }
直接調(diào)整mapred.map.tasks這個(gè)參數(shù)是沒有效果
根據(jù)map階段的使用時(shí)間,來調(diào)整數(shù)據(jù)輸入大小
- Map 階段, 小文件對(duì)map任務(wù)數(shù)影響很大, 可以用以下參數(shù)合并小文件
Map輸入合并小文件對(duì)應(yīng)參數(shù):
- set mapred.max.split.size=256000000; // 每個(gè)Map最大輸入大小
- set mapred.min.split.size.per.node=100000000; // 一個(gè)節(jié)點(diǎn)上map最小分割,決定結(jié)點(diǎn)間是否合并
- set mapred.min.split.size.per.rack=100000000; // 一個(gè)機(jī)架下map最小分割,決定機(jī)架間是否合并
- set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
// 執(zhí)行Map前進(jìn)行小文件合并, 合并的大小由`mapred.max.split.size`參數(shù)決定
Map與Reduce之間的優(yōu)化(spill, copy, sort phase)
- map phase和reduce phase之間主要有3道工序
- 首先要把map輸出的結(jié)果進(jìn)行排序后做成中間文件
- 其次這個(gè)中間文件就能分發(fā)到各個(gè)reduce
- 最后reduce端在執(zhí)行reduce phase之前把收集到的排序子文件合并成一個(gè)排序文件
需要強(qiáng)調(diào)的是,雖然這個(gè)部分可以調(diào)的參數(shù)挺多,但是大部分在一般情況下都是不要調(diào)整的,除非能精準(zhǔn)的定位到這個(gè)部分有問題。
Spill 與 Sort
由于內(nèi)存不夠,局部排序的內(nèi)容會(huì)寫入磁盤文件,這個(gè)過程叫做spill.
spill出來的文件再進(jìn)行merge
1. io.sort.mb 控制mapper buffer大小,影響spill的發(fā)生時(shí)機(jī)。 buffer滿的時(shí)候,觸發(fā)spill
2. io.sort.factor 控制merge階段合并的文件大小, 默認(rèn)10(個(gè)文件)
> 調(diào)整參數(shù),由spill的時(shí)間和merge時(shí)間決定。io.sort.mb不能超過map的jvm heap size。
Reduce端的merge也是一樣可以用io.sort.factor。
一般情況下這兩個(gè)參數(shù)很少需要調(diào)整,除非很明確知道這個(gè)地方是瓶頸。比如如果map端的輸出太大, 要么是每個(gè)map讀入了很大的文件(比如不能split的大gz壓縮文件),要么是計(jì)算邏輯導(dǎo)致輸出膨脹了很多倍。
Copy
也就是shuffle,這個(gè)階段把文件從map端copy到reduce端。
mapred.reduce.slowstart.completed.maps map完成多少的時(shí)候,啟動(dòng)copy。默認(rèn)5%
tasktracker.http.threads 作為server端的map用于提供數(shù)據(jù)傳輸服務(wù)的線程
mapred.reduce.parallel.copies 作為client端的reduce同時(shí)從map端拉取數(shù)據(jù)的并行度(一次同時(shí)從多少個(gè)map拉數(shù)據(jù))
mapred.job.shuffle.input.buffer.percent 控制shuffle buffer占reduce task heap size的大小,默認(rèn)0.7(70%)
tasktracker.http.threads與mapred.reduce.parallel.copies需要相互配合。
shuffle階段可能會(huì)出現(xiàn)的OOM。一般認(rèn)為是內(nèi)存分配不合理,GC無法及時(shí)釋放內(nèi)存導(dǎo)致。
可以嘗試調(diào)低mapred.job.shuffle.input.buffer.percent或者增加 reduce 數(shù)量mapred.reduce.tasks
- Map輸出合并
- set hive.merge.mapfiles = true // 在Map-only的任務(wù)結(jié)束時(shí)合并小文件
- set hive.merge.mapredfiles = true // 在Map-Reduce的任務(wù)結(jié)束時(shí)合并小文件
- set hive.merge.size.per.task = 256*1000*1000 // 合并文件的大小
- set hive.merge.smallfiles.avgsize=16000000
// 當(dāng)輸出文件的平均大小小于該值時(shí),啟動(dòng)一個(gè)獨(dú)立的map-reduce任務(wù)進(jìn)行文件merge
Reduce 階段優(yōu)化
reduce phase 優(yōu)化
- 可以直接控制reduce的數(shù)量,
mapred.reduce.tasks參數(shù) - 靈活配置,Hive自動(dòng)計(jì)算reduce數(shù)量, 公式:
num_reduce_tasks = min { ${hive.exec.reducers.max}, // 默認(rèn)999, 上限 ${input.size} / ${ hive.exec.reducers.bytes.per.reducer} }
hive.exec.reducers.bytes.per.reducer // 默認(rèn)1G
文件格式的優(yōu)化
Hive0.9版本有3種,textfile,sequencefile和rcfile??傮w上來說,rcfile的壓縮比例和查詢時(shí)間稍好。
關(guān)于使用方法,可以在建表結(jié)構(gòu)時(shí)可以指定格式,然后指定壓縮插入:
create table rc_file_test( col int ) stored as rcfile;
set hive.exec.compress.output = true;
insert overwrite table rc_file_test select * from source_table;
也可以通過hive.default.fileformat
來設(shè)定輸出格式,適用于create table as select的情況:
set hive.default.fileformat = SequenceFile;
set hive.exec.compress.output = true;
/*對(duì)于sequencefile,有record和block兩種壓縮方式可選,block壓縮比更高*/
set mapred.output.compression.type = BLOCK;
create table seq_file_testas select * from source_table;
上面的文件格式轉(zhuǎn)換,其實(shí)是由hive完成的(也就是插入動(dòng)作)。但是也可以由外部直接導(dǎo)入純文本啟用壓縮(lzo壓縮),或者是由MapReduce Job生成的數(shù)據(jù)。
- Lzo壓縮
lzo 文件支持split, 適合hdfs存儲(chǔ)。 啟用lzo壓縮特別對(duì)于小規(guī)模集群還是很有用的,壓縮比率大概能達(dá)到原始日志大小的1/4左右。
Hadoop原生是支持gzip和bzip2壓縮, 壓縮比率比lzo大,但是不支持split,所以速度很慢。
不過lzo不比gzip和bzip2,不是linux系統(tǒng)原生支持的,需要安裝至少lzo,lzop 以及 hadoop lzo jar.
同時(shí),配置hadoop里面 core-site.xml, mapred-site.xml
core-site.xml
<property>
<name>io.compression.codecs</name>
<value>org.apache.hadoop.io.compress.DefaultCodec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value>
</property>
<property>
<name>io.compression.codec.lzo.class</name>
<value>com.hadoop.compression.lzo.LzoCodec</value>
</property>
使用lzo,lzop,gzip,bzip2壓縮作為io壓縮的編解碼器,并指定lzo的類
mapred-site.xml
<property>
<name>mapred.compress.map.output</name>
<value>true</value>
</property>
<property>
<name>mapred.map.output.compression.codec</name>
<value>com.hadoop.compression.lzo.LzoCodec</value>
</property>
<property>
<name>mapred.child.java.opts</name>
<value>-Djava.library.path=/opt/hadoopgpl/native/Linux-amd64-64</value>
</property>
map結(jié)果采用壓縮輸出,可以降低網(wǎng)絡(luò)帶寬的使用,并指定map輸出所使用的lzo的類。以及指定編解碼器所在位置。
創(chuàng)建lzo索引:
hadoop jar /opt/hadoopgpl/lib/hadoop-lzo.jar \
com.hadoop.compression.lzo.LzoIndexer \
/path/to/lzo/file/or/path
在streaming中使用lzo:
hadoop jar /usr/share/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar \
-file map.py \
-file red.py \
-mapper map.py \
-reducer red.py \
-inputformat com.hadoop.mapred.DeprecatedLzoTextInputFormat \
-input /data/rawlog/test/20130325 -output /tmp/test_20130325
以及在hive中指定壓縮編解碼器:
hadoop集群?jiǎn)⒂昧藟嚎s,就需要在Hive建表的時(shí)候指定壓縮時(shí)所使用的編解碼器,否則Hive無法正確讀取數(shù)據(jù)。
Gzip和Bzip2由于是hadoop默認(rèn)支持的,所以無需指定特殊的編解碼器,只要指定Text類型即可。
CREATE EXTERNAL TABLE text_test_table(
id string,
name string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION
'hdfs://hadoopmaster:9000/data/dw/adpv/20130325'
而LZO是外掛的第三方庫,所以要指定輸入和輸出的編解碼器。
CREATE EXTERNAL TABLE lzo_test_table(
id string,
name string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS INPUTFORMAT
'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://hadoopmaster:9000/data/dw/adpv/20130325'
對(duì)于日志文件,可以本地lzop壓縮好,然后推到hdfs。
另外,對(duì)hadoop的jobconf做一個(gè)指定。這樣就可以做到,輸入是lzo,輸出也可以lzo?;蛘咻斎胧莟ext,輸出是lzo。
-inputformat com.hadoop.mapred.DeprecatedLzoTextInputFormat \
-jobconf mapred.output.compress=true \
-jobconf mapred.output.compression.codec=com.hadoop.compression.lzo.LzopCodec
最后對(duì)HDFS上的日志做Indexer(創(chuàng)建lzo索引),這樣Hive或者M(jìn)R,可以把大的lzo壓縮文件,切分多個(gè)map計(jì)算(splittable)
注意
hive讀取sequencefile,會(huì)忽略key,(比如,lzo文件就是key-value),直接讀value,并且按照指定分隔符分隔value,獲得字段值。
但是如果hive的數(shù)據(jù)來源是從mr生成的,那么寫sequencefile的時(shí)候,key和value都是有意義的,key不能被忽略,而是應(yīng)該當(dāng)成第一個(gè)字段。為了解決這種不匹配的情況,有兩種辦法:
- 一是要求凡是結(jié)果會(huì)給hive用的mr job輸出value的時(shí)候帶上key。但是這樣的話對(duì)于開發(fā)是一個(gè)負(fù)擔(dān),讀寫數(shù)據(jù)的時(shí)候都要注意這個(gè)情況。
- 二是把這個(gè)交給hive解決,寫一個(gè)InputFormat包裝一下,把value輸出加上key即可。以下是核心代碼,修改了RecordReader的next方法:
//注意:這里為了簡(jiǎn)化,假定了key和value都是Text類型,所以MR的輸出的k/v都要是Text類型。
//這個(gè)簡(jiǎn)化還會(huì)造成數(shù)據(jù)為空時(shí),出現(xiàn)org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.io.Text的錯(cuò)誤,因?yàn)槟J(rèn)hive的sequencefile的key是一個(gè)空的ByteWritable。
public synchronized boolean next(K key, V value) throws IOException {
Text tKey = (Text) key;
Text tValue = (Text) value;
if (!super.next(innerKey, innerValue))
return false;
Text inner_key = (Text) innerKey; //在構(gòu)造函數(shù)中用createKey()生成
Text inner_value = (Text) innerValue; //在構(gòu)造函數(shù)中用createValue()生成
tKey.set(inner_key);
tValue.set(inner_key.toString() + '\t' + inner_value.toString()); // 分隔符注意自己定義
return true;
}
- Map 輸入輸出其他壓縮 ( hadoop 2.x)
除了lzo, 常用的還有snappy壓縮等。
比起zlib,snappy壓縮大多數(shù)情況下都會(huì)更快,但是壓縮后大小會(huì)大20%到100%。
snappy跟lzo同屬于Lempel–Ziv 壓縮算法家族,但是優(yōu)于lzo的兩點(diǎn)是:
1)Snappy解壓縮比 LZO 更快, 壓縮速度相當(dāng), meaning thetotal round-trip time is superior.
- Snappy 是 BSD-licensed, 可以集成在 Hadoop。 LZO 是 GPL-licensed, 需要單獨(dú)安裝。
通常集群會(huì)使用lzo做map reduce的中間結(jié)果壓縮,中間結(jié)果是用戶不可見的,一般是mapper寫入磁盤,并供reducer讀取。中間結(jié)果對(duì)壓縮友好,一是key有冗余,而是需要寫入磁盤,壓縮減少IO量。同時(shí),lzo和snappy都不是CPU密集的壓縮算法,不會(huì)造成map,reduce的CPU時(shí)間缺乏。 而且snappy的效率比lzo要高20%。
壓縮算法比較
需要注意的一點(diǎn)是,Snappy旨在與容器格式(例如序列文件或Avro數(shù)據(jù)文件)一起使用,而不是直接在純文本上使用,因?yàn)楹笳卟豢刹鸱?not splittable),無法處理并行使用MapReduce。 這不同于LZO,其中可以索引LZO壓縮文件以確定分裂點(diǎn),使得LZO文件可以在后續(xù)處理中被有效地處理。
如圖,.snappy與.lzo都不是splittable的,lzo可以通過創(chuàng)建index文件彌補(bǔ),snappy適合用在序列文件( Sequence Files)上,比如常用的在map階段的輸出,如下:
使用
core-site.xml:
io.compression.codecs增加org.apache.hadoop.io.compress.SnappyCodec
mapred-site.xml:
<property>
<name>mapred.compress.map.output</name>
<value>true</value>
</property>
<property>
<name>mapred.map.output.compression.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
Job整體優(yōu)化
從job的整體上,看是否有可以優(yōu)化的點(diǎn),以下分別陳述幾個(gè):
Job執(zhí)行模式
hadoop map-reduce 三種模式,local、偽分布式、分布式。
分布式模式,需要啟動(dòng)分布式j(luò)ob,調(diào)度資源,啟動(dòng)時(shí)間比較長(zhǎng)。對(duì)于很小的job可以采用local map-reduce 模式。參數(shù)如下:
`hive.exec.mode.local.auto`=true // 就可以自動(dòng)開啟local模式,同時(shí)需要以下兩個(gè)參數(shù)配合
`hive.exec.mode.local.auto.tasks.max` // 默認(rèn)4
`hive.exec.mode.local.auto.inputbytes.max` // 默認(rèn) 128(M)
默認(rèn)map task數(shù)少于4(參考文章上面提到的map任務(wù)的決定因素),并且總輸入大小不超過128M,則自動(dòng)開啟local模式。
另外,簡(jiǎn)單的select語句,比如select limit 10, 這種取少量sample的方式,那么在hive0.10之后有專門的fetch task優(yōu)化,使用參數(shù)hive.fetch.task.conversion。Hadoop 2.x 應(yīng)該是默認(rèn)開啟的。
JVM重用
正常情況下,MapReduce啟動(dòng)的JVM在完成一個(gè)task之后就退出了,但是如果任務(wù)花費(fèi)時(shí)間很短,又要多次啟動(dòng)JVM的情況下(比如對(duì)很大數(shù)據(jù)量進(jìn)行計(jì)數(shù)操作),JVM的啟動(dòng)時(shí)間就會(huì)變成一個(gè)比較大的overhead。在這種情況下,可以使用jvm重用的參數(shù):
set mapred.job.reuse.jvm.num.tasks=5; // 一個(gè)jvm完成多少個(gè)task之后再退出
適合小任務(wù)較多的場(chǎng)景,對(duì)于大任務(wù)影響不大,而且考慮大任務(wù)gc的時(shí)間,啟動(dòng)新的jvm成本可以忽略。
索引
Hive可以針對(duì)列建立索引,在以下情況需要使用索引:
- 數(shù)據(jù)集很大
- 查詢時(shí)間大大超過預(yù)期
- 需要快速查詢
- 當(dāng)構(gòu)建數(shù)據(jù)模型的時(shí)候
Hive的索引管理在獨(dú)立的表中(類似于lookup table,而不是傳統(tǒng)數(shù)據(jù)庫的B-tree),不影響數(shù)據(jù)表。另一個(gè)優(yōu)勢(shì)是索引同樣可以被partition,決定于數(shù)據(jù)的大小。
- 索引類型
- Compact Indexing
Compact indexing stores the pair of indexed column’s value and its blockid.
- Bitmap Indexing (在hive0.8出行,通常以用于列值比較獨(dú)特的場(chǎng)景(distinct values) )
Bitmap indexing stores the combination of indexed column value and list of rows as a bitmap.
- 創(chuàng)建索引
CREATE INDEX index_name
ON TABLE table_name (columns,....)
AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler' // create a compact index
WITH DEFERRED REBUILD; // This statement means we need to alter the index in later stages
這個(gè)語句會(huì)創(chuàng)建一個(gè)索引,但是要完成創(chuàng)建,還需要完成rebuild語句。通過添加一個(gè)或多個(gè)更改語句,會(huì)開啟一個(gè)map-reudce 任務(wù)來完成索引創(chuàng)建。
ALTER INDEX index_nam on table_name REBUILD; // This ALTER statement will complete our REBUILDED index creation for the table.
- 查看表的索引
show formatted index on table_name;
- 創(chuàng)建bitmap索引
CREATE INDEX index_name
ON TABLE table_name (age)
AS 'BITMAP'
WITH DEFERRED REBUILD;
ALTER INDEX index_name on table_name REBUILD;
- 刪除索引
DROP INDEX IF EXISTS olympic_index ON olympic;
Note:
- 同一個(gè)表,創(chuàng)建不同類型的index在相同的列上,先創(chuàng)建的index會(huì)被使用
- 索引會(huì)加快query執(zhí)行速度
- 一個(gè)表能創(chuàng)建任意數(shù)量的索引
- 使用什么索引,依賴于數(shù)據(jù)。有時(shí)候,bitmap索引要快,有時(shí)候compact索引要快
- 索引應(yīng)該創(chuàng)建在頻繁操作的列上
- 創(chuàng)建更多的索引,同樣也會(huì)降低查詢的性能
- 應(yīng)該基于數(shù)據(jù)的特點(diǎn),創(chuàng)建一種類型索引。如果這個(gè)索引能加快查詢執(zhí)行
- 在集群資源充足的情況下,很可能沒有太大必要考慮索引
join
Hive join實(shí)現(xiàn)有兩種:
- map side join: 實(shí)現(xiàn)方式是replication join,把其中一個(gè)表復(fù)制到所有節(jié)點(diǎn),這樣另一個(gè)表在每個(gè)節(jié)點(diǎn)上面的分片就可以跟這個(gè)完整的表join了
- reduce side join: 實(shí)現(xiàn)方式是repartition join,把兩份數(shù)據(jù)按照join key進(jìn)行hash重分布,讓每個(gè)節(jié)點(diǎn)處理hash值相同的join key數(shù)據(jù),也就是做局部的join。
-
Map join
在hive 0.11之前,使用map join的配置方法有兩種:
一種直接在sql中寫hint,語法是/+MAPJOIN (tbl)/,其中tbl就是你想要做replication的表。
e.g.:
select /*+mapjoin(a) */count(*)from map_join_test ajoin map_join_test b on a.id = b.id;
另一種方法是設(shè)置 hive.auto.convert.join = true,這樣hive會(huì)自動(dòng)判斷當(dāng)前的join操作是否合適做map join,主要是找join的兩個(gè)表中有沒有小表。至于多大的表算小表,則是由hive.smalltable.filesize決定,默認(rèn)25MB。
Before release 0.11, a MAPJOIN could be invoked either through an optimizer hint:
select /*+ MAPJOIN(time_dim) */ count(*) from
store_sales join time_dim on (ss_sold_time_sk = t_time_sk)
or via auto join conversion:
set hive.auto.convert.join=true;
select count(*) from
store_sales join time_dim on (ss_sold_time_sk = t_time_sk)
The default value for hive.auto.convert.join was false in Hive 0.10.0. Hive 0.11.0 changed the default to true (HIVE-3297). Note that hive-default.xml.template incorrectly gives the default as false in Hive 0.11.0 through 0.13.1.
MAPJOINs 把小表加載到內(nèi)存中,建議內(nèi)存的hashmap,然后跟大表進(jìn)行穿透式key匹配。 主要實(shí)現(xiàn)如下的分工方式:
- Local work:
read records via standard table scan (including filters and projections) from source on local machine
build hashtable in memory
write hashtable to local disk
upload hashtable to dfs
-
add hashtable to distributed cache
- Map task:
read hashtable from local disk (distributed cache) into memory
match records' keys against hashtable
-
combine matches and write to output
- No reduce task:
從Hive 0.11開始,map join 鏈會(huì)得到優(yōu)化(hive.auto.convert.join=true 或者 map join hint),e.g.:
select /*+ MAPJOIN(time_dim, date_dim) */ count(*) from
store_sales
join time_dim on (ss_sold_time_sk = t_time_sk)
join date_dim on (ss_sold_date_sk = d_date_sk)
where t_hour = 8 and d_year = 2002
map joins優(yōu)化,會(huì)嘗試合并更多Map Joins。像上例一樣,兩個(gè)表的小維度匹配部分,需要同時(shí)滿足內(nèi)存要求。合并會(huì)指數(shù)的降低query的執(zhí)行時(shí)間,如例中降低兩次讀取和寫入(在job之間通過HDFS通信)為一次讀取。
配置參數(shù):
-
hive.auto.convert.join: 是否允許Hive優(yōu)化普通join為map join,決定與輸入的size. (Hive 0.11 default true) -
hive.auto.convert.join.noconditionaltask: 是否允許Hive優(yōu)化普通join為map join,決定與輸入的size. 如果這個(gè)參數(shù)生效,在n-way join中n-1個(gè)表或者分區(qū)的總大小,小于hive.auto.convert.join.noconditionaltask.size, 則會(huì)直接轉(zhuǎn)化為一個(gè)map join (there is no conditional task). ( default true, Hive 0.11 中添加) -
hive.auto.convert.join.noconditionaltask.size, 如果hive.auto.convert.join.noconditionaltask沒有開啟,這個(gè)參數(shù)不會(huì)生效,如果開啟,n-way join中n-1個(gè)表或者分區(qū)的總大小小于這個(gè)值,則這個(gè)join會(huì)直接轉(zhuǎn)化為一個(gè)map join. (there is no conditional task). (默認(rèn) 10MB).
Note:
同樣的表,即使inner join能轉(zhuǎn)化為map join,outer join也不能轉(zhuǎn)化為map join,因?yàn)閙ap join只能允許一個(gè)table(或分區(qū))是streamed。對(duì)于full outer join是不能轉(zhuǎn)化為map join的。 對(duì)于left 或者right outer join,只有除了需要streamed的表以外的表,能滿足size配置。
Outer joins offer more challenges. Since a map-join operator can only stream one table, the streamed table needs to be the one from which all of the rows are required. For the left outer join, this is the table on the left side of the join; for the right outer join, the table on the right side, etc. This means that even though an inner join can be converted to a map-join, an outer join cannot be converted. An outer join can only be converted if the table(s) apart from the one that needs to be streamed can be fit in the size configuration. A full outer join cannot be converted to a map-join at all since both tables need to be streamed.
- Sort-Merge-Bucket (SMB) Map join
sort-merge-bucket(SMB) join,是定義了sort和bucket表所使用的join方式。這種join方式,通過簡(jiǎn)單的合并已經(jīng)排好序的表,來讓這個(gè)join操作比普通的map join快。這就是map join優(yōu)化。
參數(shù):
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
hive.optimize.bucketmapjoin= true 來控制hive 執(zhí)行bucket map join 了, 需要注意的是你的小表的number_buckets 必須是大表的倍數(shù). 無論多少個(gè)表進(jìn)行連接這個(gè)條件都必須滿足.(其實(shí)如果都按照2的指數(shù)倍來分bucket)。這樣數(shù)據(jù)就會(huì)按照join key做hash bucket。小表依然復(fù)制到所有節(jié)點(diǎn),map join的時(shí)候,小表的每一組bucket加載成hashtable,與對(duì)應(yīng)的一個(gè)大表bucket做局部join,這樣每次只需要加載部分hashtable就可以了。
set hive.optimize.bucketmapjoin.sortedmerge = true,來控制對(duì)join key上都有序的表的join優(yōu)化。同時(shí),需要設(shè)置input format,set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat
-
Left Semi Join
舊版Hive 中沒有in/exist 這樣的子句,所以需要將這種類型的子句轉(zhuǎn)成left semi join。 left semi join 是只傳遞表的join key給map 階段 , 如果key 足夠小還是執(zhí)行map join, 如果不是則還是common join。
數(shù)據(jù)傾斜
數(shù)據(jù)傾斜可能會(huì)發(fā)生在group過程和join過程。
- Group過程
Group過程的數(shù)據(jù)傾斜,set hive.map.aggr=true(默認(rèn)開啟),在map端完成聚合,來優(yōu)化傾斜。也就是在mapper內(nèi)部,做部分的聚合,來輸出更少的行,減少需要排序和分發(fā)到reducer的數(shù)據(jù)量。
Hive在嘗試做此優(yōu)化,不過會(huì)判斷aggregation的效果,如果不能節(jié)省足夠的內(nèi)存,就會(huì)退回標(biāo)準(zhǔn)map過程。也就是在處理了100000 行(hive.groupby.mapaggr.checkinterval控制)后,檢查內(nèi)存中的hash map的項(xiàng),如果超過50%(hive.map.aggr.hash.min.reduction控制),則認(rèn)為聚合會(huì)被終止。
Hive同樣會(huì)估計(jì)hash map中每一項(xiàng)所需要的內(nèi)存,當(dāng)使用的內(nèi)存超過了mapper可用內(nèi)存的50%(hive.map.aggr.hash.percentmemory控制),則會(huì)把flush此hash map到reducers。然而這是對(duì)行數(shù)和每行大小的估計(jì),所以如果實(shí)際值過高,可能導(dǎo)致還沒有flush就out of memory了。
當(dāng)出現(xiàn)這種OOM時(shí),可用減少hive.map.aggr.hash.percentmemory, 但是這個(gè)對(duì)內(nèi)存增長(zhǎng)與行數(shù)無關(guān)的數(shù)據(jù)來說,不一定是有效的。這個(gè)時(shí)候,可以使用關(guān)閉以下方法,- map端聚合
set hive.map.aggr=false, - 給mapper分配更多的內(nèi)存
- 重構(gòu)query查詢。利用子查詢等方法,優(yōu)化查詢語句,e.g.:
select count(distinct v) from tbl 改寫成 select count(1) from (select v from tbl group by v) t.
- map端聚合
Group過程傾斜,還可以開啟hive.groupby.skewindata=true來改善,這個(gè)是讓key隨機(jī)分發(fā)到reducer,而不是同樣的key分發(fā)到同一個(gè)reducer,然后reduce做聚合,做完之后再做一輪map-reduce。這個(gè)是把上面提到的map端聚合放到了reduce端,增加了reducer新的開銷,大多數(shù)情況效果并不好。
- Join過程
-
map join可以解決大表join小表時(shí)候的數(shù)據(jù)傾斜 -
skew join是hive中對(duì)數(shù)據(jù)傾斜的一個(gè)解決方案,set hive.optimize.skewjoin = true;根據(jù)
hive.skewjoin.key(默認(rèn)100000)設(shè)置的數(shù)量hive可以知道超過這個(gè)值的key就是特殊key值。對(duì)于特殊的key,reduce過程直接跳過,最后再啟用新的map-reduce過程來處理。
數(shù)據(jù)傾斜
業(yè)務(wù)數(shù)據(jù)本身的傾斜,可以從業(yè)務(wù)數(shù)據(jù)特點(diǎn)本身出發(fā),通過設(shè)置reduce數(shù)量等方式,來避免傾斜
-
Top N 問題
order by col limit n. hive默認(rèn)的order by實(shí)現(xiàn)只會(huì)用1個(gè)reduce做全局排序,這在數(shù)據(jù)量大的時(shí)候job運(yùn)行效率非常低。hive在0.12版本引入了parallel order by,也就是通過sampling的方式實(shí)現(xiàn)并行(即基于TotalOrderPartitioner)。具體開關(guān)參數(shù)是
hive.optimize.sampling.orderby。但是如果使用這個(gè)參數(shù)還是很可能碰到問題的:
- 首先如果order by字段本身取值范圍過少,會(huì)造成Split points are out of order錯(cuò)誤。這是因?yàn)?,假設(shè)job中reduce數(shù)量為r的話,那么TotalOrderPartitioner需要order by字段的取值至少要有r - 1個(gè)。那么這樣一來還需要關(guān)心reduce數(shù)量,增加了開發(fā)負(fù)擔(dān),而且如果把reduce數(shù)量設(shè)的很小,優(yōu)化的效果就不太明顯了。
- 其次,設(shè)置這個(gè)參數(shù)還可能造成聚會(huì)函數(shù)出錯(cuò),這個(gè)問題只在比較新的hive版本中解決了。
sort by col limit n 可以解決top N問題,sort by保證每個(gè)reduce內(nèi)數(shù)據(jù)有序,這樣就等于是做并行排序。而limit n則保證每個(gè)reduce的輸出記錄數(shù)只是n(reducer內(nèi)top N)。等局部top n完成之后,再起一輪job,用1個(gè)reduce做全局top n,由于數(shù)據(jù)量大大減少單個(gè)reducer也能快速完成。
SQL整體優(yōu)化
Job間并行
對(duì)于子查詢和union等情況,可以并行的執(zhí)行job
set hive.exec.parallel=true, 默認(rèn)的并行度為8(hive.exec.parallel. thread.number 控制),表示最多可以8個(gè)job并行,注意這個(gè)值的大小,避免占用太多資源。
減少Job數(shù)
通過優(yōu)化查詢語句(SQL)來實(shí)現(xiàn)減少job數(shù)目(子查詢數(shù)目)的目的。

