Hive的一般學習者和培訓者在談性能優化的時候一般都會從語法和參數這些雕蟲小技的角度談優化,而不會革命性的優化Hive的性能,產生這種現象的原因有:1,歷史原因和思維定勢:大家學習SQL的時候一般都是就單機DB,這個時候你的性能優化技巧確實主要是SQL語法和參數調優;2,Hive的核心的性能問題往往是產生在超過規模數據集,例如說100億條級別的數據集,以及每天處理上千上萬個Hive作業的情況下產生的;上面的第二點是我們現在Hive性能調優部分要徹底解決的內容;要從根本上解決和顯著的解決實際企業中Hive真正的性能優化問題,必須考慮到底什么是Hive性能的限制,我們按照優先級來說:第一重要的是:戰略性架構 解決海量數據下大量Job過于頻繁的IO問題,而這個問題實質上涉及了架構方面的分表 數據復用 以及分區表等調優的方式; 補充:1,海量的數據中有些數據是高頻使用的數據,而有些是很少使用的,如果能夠分離成為不同的表,會極大的提升效率;很多的作業可能會有共同點,抽離出來先進行計算并保留計算結果,后面的作業都可以復用;同時,底層的基礎功能也可以先計算,在上層應用的時候直接拿數據結果,而不是每次都重復計算; 2,合理從用靜態分區表和動態分區表,可以避免數據全局掃描及計算資源更合理的利用; 3,數據傾斜的一站式解決方案;第二重要的是:引擎和物理層面,很多內容都是普通Hive使用這不知道的! 從Hive語法和Job內部的角度去進行優化,這要求MapReduce以及Hive如何被翻譯成為MapReduce要非常精通;第三重要的是:一些關鍵的參數;歸根到底,Hive的性能優化主要考慮的是如何最大化和最有效的使用CPU Memory IO;
Hive背后的Mapper調優:
1,Mapper數過大,會產生大量小文件,由于Mapper是基于虛擬機的,過多的Mapper創建和初始化及關閉虛擬機都會消耗大量的硬件資源;
Mapper數太小,并發度過小,Job執行時間過長,無法充分利用分布式硬件資源;
2,Mapper數據由什么決定呢?
輸入文件數目;
輸入文件的大小;
配置參數;
默認情況下:例如一個文件800M,BLock大小是128M,那么Mapper數目就是7個,6個Mapper處理的數據是 128M, 1個Mapper處理的數據是32M;再例如,一個目錄下有三個文件分別大小問5M 10M 150M
此時會產生4個Mapper,處理的數據分別是5M 10M 128M 22M;
減少Mapper的個數,就要合并小文件,這種小文件有可能是直接來自于數據源的小文件,也可能是Reducer產生的小文件;
set hive.input.format=org.apache.Hadoop.hive.ql.io.CombineHiveInputFormat;
set hive.merge.mapFiles=true;
set hive.merge.mapredFiles=true;
set hive.merge.size.per.task=256000000
set mapred.max.split.size=256000000
set mapred.min.split.size.per.node=128000000
增加Mapper的個數,一般是通過控制Hive SQL中上一個Job的Reducer個數來控制的,例如在Join操作的時候會把多個表分解為多個Job;
set mapred.map.tasks=2;
set hive.merge.mapFiles=true;
set hive.merge.mapredFiles=true;
set hive.merge.size.per.task=256000000
例如我們有5個300M的文件;按照上面的配置會產生10個Mapper,5個Mapper處理的都是256M的數據,另外5個Mapper處理的都是44M的數據,問題是:大的Mapper會數據傾斜
如何解決,設置set mapred.map.tasks=6,此時根據MapRed的運行機制,會劃分6個Mapper,每個Mapper的處理數據的大小是250M, min(1500M/6, 256M) =250M
Hive背后的Reducer調優:
1,Reducer數目過大的話,會產生很多小文件,每個Reducer都會產生一個文件,如果這些小文件是下一個JOB的輸入,則會需要對小文件進行合并;同樣啟動 初始化和銷毀Reducer的虛擬機也需要消耗大量的硬件;
Reducer數據過小的話,Reduce的時間會比較長,也可能會出現數據傾斜;
2,如何控制Reducer的個數呢?
set hive.exec.reducers.byte.per.reducer=1G
set hive.exec.reducers.max=999
Reducer個數=min(999, Reducer的數據輸入總量/1G);
set mapred.reduce.tasks = 10, 默認是1; 如果說當前的Reducer的結果很大,且被接下來多個Job使用其結果,我們該如何設置參數呢?一般都需要調大該參數;
什么情況下只有一個Reducer?如果不進行Group by但卻需要匯總,或者說Order by,當然如果最后Reducer的數據小于默認的1G的話,也會只有一個Reducer;
1,Hive在分布式運行的時候最害怕的是數據傾斜,這是由于分布式系統的特性決定的,因為分布式系統之所以很快是由于作業平均分配給了不同的節點,不同節點同心協力,從而達到更快處理完作業的目的;
順便說明一下,處理數據傾斜的能力是hadoop和Spark工程師最核心的競爭力之一;
2,Hive中數據傾斜的原因:
數據在分布式節點上分布不平衡;
join時某些key可能特別大;
groupBy的時候某個Key可能特別多;
count(distinct)有可能出現數據傾斜,因為其內部首先會進行groupBy操作;
3,join,我們希望join時候key是分散,如果一個key的數據量特別大,有可能會出現數據傾斜和OOM,一個核心點是:小表join大表,在reduce階段左側的小表會加載進內存,減少OOM的風險;
4,大表join大表的情況:數據傾斜,例如null值,解決辦法一般是要打散null值,例如說使用隨機數等,如果數據傾斜比較嚴重,采用這種方式可以提升至少一倍的速度;
5,mapJoin:小表join(超)大表的時候,可以采用mapJoin的方式把小表全部加載到Mapper端的內存中/*+MAPJOIN(table_name)*/;
6,小表join(超)大表的時候,是否會自動進行mapJoin,想進行mapJoin,需要設置:set hive.auto.convert.join=true,Hive在進行join的時候會判斷左表的大小來決定是否進行mapJoin:
set hive.mapjoin.smalltable.filesize=128000000;
set hive.mapjoin.cache.numrows=100000;
上述參數可以根據實際的硬件機器的內存進行調整,對性能有至關重要的影響,因為沒有了Shuffle;
對于mapJoin我們能夠使用Mapper端JVM中多大的內存呢?
set hive.mapjoin.followby.gby.localtask.max.momery.usage = 0.8
set hive.mapjoin.localtask.max.memory.uage=0.9
7,groupBy,我們可以設置在Mapper端進行部分聚合,最后在Reducer端進行全局聚合
set hive.map.aggr=true;
set hive.groupby.mapaggr.checkinterval=100000
set hive.groupby.skewindata = true 內部會產生兩個Job,第一個Job會通過自己的算法打散傾斜的Key并進行聚合操作且保留結果,第二個Job會完成全部的groupBy操作,會產生Mapper-Reducer-Reducer的結構
8, count(distinct),如果某個字段特別多,容易產生數據傾斜,解決思路:
在查詢語句中例如對null進行過濾,在結果中加1
9, 笛卡爾積:join時候沒有on條件,或者on條件無效,這個時候會使用Reducer進行笛卡爾積的操作;