2017年7月30日
#
一、POI概述
Apache POI是Apache軟件基金會的開放源碼函式庫,POI提供API給Java程序對Microsoft Office格式檔案讀和寫的功能。
結構:
HSSF - 提供讀寫Microsoft Excel格式檔案的功能。
XSSF - 提供讀寫Microsoft Excel OOXML格式檔案的功能。
HWPF - 提供讀寫Microsoft Word格式檔案的功能。
HSLF - 提供讀寫Microsoft PowerPoint格式檔案的功能。
HDGF - 提供讀寫Microsoft Visio格式檔案的功能。
使用必須引入依賴
org.apache.poi
poi
3.17
注:3.17版本是支持jdk6的最后版本
二、HSSF概況
HSSF 是Horrible SpreadSheet Format的縮寫,通過HSSF,你可以用純Java代碼來讀取、寫入、修改Excel文件。HSSF 為讀取操作提供了兩類API:usermodel和eventusermodel,即“用戶模型”和“事件-用戶模型”。
三、 POI EXCEL文檔結構類
HSSFWorkbook excel文檔對象
HSSFSheet excel的sheet
HSSFRow excel的行
HSSFCell excel的單元格
HSSFFont excel字體
HSSFName 名稱
HSSFDataFormat 日期格式
HSSFHeader sheet頭
HSSFFooter sheet尾
HSSFCellStyle cell樣式
HSSFDateUtil 日期
HSSFPrintSetup 打印
HSSFErrorConstants 錯誤信息表
四、EXCEL的讀寫操作
1、讀取“區域數據.xls”并儲存于list集合中,“區域數據.xls”如下圖
public List
importXLS(){
ArrayList
list = new ArrayList<>();
try {
//1、獲取文件輸入流
InputStream inputStream = new FileInputStream("/Users/Shared/區域數據.xls");
//2、獲取Excel工作簿對象
HSSFWorkbook workbook = new HSSFWorkbook(inputStream);
//3、得到Excel工作表對象
HSSFSheet sheetAt = workbook.getSheetAt(0);
//4、循環讀取表格數據
for (Row row : sheetAt) {
//首行(即表頭)不讀取
if (row.getRowNum() == 0) {
continue;
}
//讀取當前行中單元格數據,索引從0開始
String areaNum = row.getCell(0).getStringCellValue();
String province = row.getCell(1).getStringCellValue();
String city = row.getCell(2).getStringCellValue();
String district = row.getCell(3).getStringCellValue();
String postcode = row.getCell(4).getStringCellValue();
Area area = new Area();
area.setCity(city);
area.setDistrict(district);
area.setProvince(province);
area.setPostCode(postcode);
list.add(area);
}
//5、關閉流
workbook.close();
} catch (IOException e) {
e.printStackTrace();
}
return list;
}
2、導出數據到“區域數據.xls”文件中,頁面數據如下圖:
public void exportExcel() throws IOException {
Page
page = areaService.pageQuery(null);
List
list = page.getContent();
//1.在內存中創建一個excel文件
HSSFWorkbook hssfWorkbook = new HSSFWorkbook();
//2.創建工作簿
HSSFSheet sheet = hssfWorkbook.createSheet();
//3.創建標題行
HSSFRow titlerRow = sheet.createRow(0);
titlerRow.createCell(0).setCellValue("省");
titlerRow.createCell(1).setCellValue("市");
titlerRow.createCell(2).setCellValue("區");
titlerRow.createCell(3).setCellValue("郵編");
titlerRow.createCell(4).setCellValue("簡碼");
titlerRow.createCell(5).setCellValue("城市編碼");
//4.遍歷數據,創建數據行
for (Area area : list) {
//獲取最后一行的行號
int lastRowNum = sheet.getLastRowNum();
HSSFRow dataRow = sheet.createRow(lastRowNum + 1);
dataRow.createCell(0).setCellValue(area.getProvince());
dataRow.createCell(1).setCellValue(area.getCity());
dataRow.createCell(2).setCellValue(area.getDistrict());
dataRow.createCell(3).setCellValue(area.getPostcode());
dataRow.createCell(4).setCellValue(area.getShortcode());
dataRow.createCell(5).setCellValue(area.getCitycode());
}
//5.創建文件名
String fileName = "區域數據統計.xls";
//6.獲取輸出流對象
HttpServletResponse response = ServletActionContext.getResponse();
ServletOutputStream outputStream = response.getOutputStream();
//7.獲取mimeType
ServletContext servletContext = ServletActionContext.getServletContext();
String mimeType = servletContext.getMimeType(fileName);
//8.獲取瀏覽器信息,對文件名進行重新編碼
HttpServletRequest request = ServletActionContext.getRequest();
fileName = FileUtils.filenameEncoding(fileName, request);
//9.設置信息頭
response.setContentType(mimeType);
response.setHeader("Content-Disposition","attachment;filename="+fileName);
//10.寫出文件,關閉流
hssfWorkbook.write(outputStream);
hssfWorkbook.close();
}
工具類
public class FileUtils {
public static String filenameEncoding(String filename, HttpServletRequest request) throws IOException {
String agent = request.getHeader("User-Agent"); //獲取瀏覽器
if (agent.contains("Firefox")) {
BASE64Encoder base64Encoder = new BASE64Encoder();
filename = "=?utf-8?B?"
+ base64Encoder.encode(filename.getBytes("utf-8"))
+ "?=";
} else if(agent.contains("MSIE")) {
filename = URLEncoder.encode(filename, "utf-8");
} else if(agent.contains ("Safari")) {
filename = new String (filename.getBytes ("utf-8"),"ISO8859-1");
} else {
filename = URLEncoder.encode(filename, "utf-8");
}
return filename;
}
}
寫出xls文件:
五、 EXCEL常用操作方法
1、 得到Excel常用對象
POIFSFileSystem fs=newPOIFSFileSystem(new FileInputStream("d:/test.xls"));
//得到Excel工作簿對象
HSSFWorkbook wb = new HSSFWorkbook(fs);
//得到Excel工作表對象
HSSFSheet sheet = wb.getSheetAt(0);
//得到Excel工作表的行
HSSFRow row = sheet.getRow(i);
//得到Excel工作表指定行的單元格
HSSFCell cell = row.getCell((short) j);
cellStyle = cell.getCellStyle();//得到單元格樣式
2、建立Excel常用對象
HSSFWorkbook wb = new HSSFWorkbook();//創建Excel工作簿對象
HSSFSheet sheet = wb.createSheet("new sheet");//創建Excel工作表對象
HSSFRow row = sheet.createRow((short)0); //創建Excel工作表的行
cellStyle = wb.createCellStyle();//創建單元格樣式
row.createCell((short)0).setCellStyle(cellStyle); //創建Excel工作表指定行的單元格
row.createCell((short)0).setCellValue(1); //設置Excel工作表的值
3、設置sheet名稱和單元格內容
wb.setSheetName(1, "第一張工作表",HSSFCell.ENCODING_UTF_16);
cell.setEncoding((short) 1);
cell.setCellValue("單元格內容");
4、取得sheet的數目
wb.getNumberOfSheets()
5、 根據index取得sheet對象
HSSFSheet sheet = wb.getSheetAt(0);
6、取得有效的行數
int rowcount = sheet.getLastRowNum();
7、取得一行的有效單元格個數
row.getLastCellNum();
8、單元格值類型讀寫
cell.setCellType(HSSFCell.CELL_TYPE_STRING); //設置單元格為STRING類型
cell.getNumericCellValue();//讀取為數值類型的單元格內容
9、設置列寬、行高
sheet.setColumnWidth((short)column,(short)width);
row.setHeight((short)height);
10、添加區域,合并單元格
Region region = new Region((short)rowFrom,(short)columnFrom,(short)rowTo
,(short)columnTo);//合并從第rowFrom行columnFrom列
sheet.addMergedRegion(region);// 到rowTo行columnTo的區域
//得到所有區域
sheet.getNumMergedRegions()
11、保存Excel文件
FileOutputStream fileOut = new FileOutputStream(path);
wb.write(fileOut);
12、根據單元格不同屬性返回字符串數值
public String getCellStringValue(HSSFCell cell) {
String cellValue = "";
switch (cell.getCellType()) {
case HSSFCell.CELL_TYPE_STRING://字符串類型
cellValue = cell.getStringCellValue();
if(cellValue.trim().equals("")||cellValue.trim().length()<=0)
cellValue=" ";
break;
case HSSFCell.CELL_TYPE_NUMERIC: //數值類型
cellValue = String.valueOf(cell.getNumericCellValue());
break;
case HSSFCell.CELL_TYPE_FORMULA: //公式
cell.setCellType(HSSFCell.CELL_TYPE_NUMERIC);
cellValue = String.valueOf(cell.getNumericCellValue());
break;
case HSSFCell.CELL_TYPE_BLANK:
cellValue=" ";
break;
case HSSFCell.CELL_TYPE_BOOLEAN:
break;
case HSSFCell.CELL_TYPE_ERROR:
break;
default:
break;
}
return cellValue;
}
13、常用單元格邊框格式
HSSFCellStyle style = wb.createCellStyle();
style.setBorderBottom(HSSFCellStyle.BORDER_DOTTED);//下邊框
style.setBorderLeft(HSSFCellStyle.BORDER_DOTTED);//左邊框
style.setBorderRight(HSSFCellStyle.BORDER_THIN);//右邊框
style.setBorderTop(HSSFCellStyle.BORDER_THIN);//上邊框
14、設置字體和內容位置
HSSFFont f = wb.createFont();
f.setFontHeightInPoints((short) 11);//字號
f.setBoldweight(HSSFFont.BOLDWEIGHT_NORMAL);//加粗
style.setFont(f);
style.setAlignment(HSSFCellStyle.ALIGN_CENTER);//左右居中
style.setVerticalAlignment(HSSFCellStyle.VERTICAL_CENTER);//上下居中
style.setRotation(short rotation);//單元格內容的旋轉的角度
HSSFDataFormat df = wb.createDataFormat();
style1.setDataFormat(df.getFormat("0.00%"));//設置單元格數據格式
cell.setCellFormula(string);//給單元格設公式
style.setRotation(short rotation);//單元格內容的旋轉的角度
15、插入圖片
//先把讀進來的圖片放到一個ByteArrayOutputStream中,以便產生ByteArray
ByteArrayOutputStream byteArrayOut = new ByteArrayOutputStream();
BufferedImage bufferImg = ImageIO.read(new File("ok.jpg"));
ImageIO.write(bufferImg,"jpg",byteArrayOut);
//讀進一個excel模版
FileInputStream fos = new FileInputStream(filePathName+"/stencil.xlt");
fs = new POIFSFileSystem(fos);
//創建一個工作薄
HSSFWorkbook wb = new HSSFWorkbook(fs);
HSSFSheet sheet = wb.getSheetAt(0);
HSSFPatriarch patriarch = sheet.createDrawingPatriarch();
HSSFClientAnchor anchor = new HSSFClientAnchor(0,0,1023,255,(short) 0,0,(short)10,10);
patriarch.createPicture(anchor , wb.addPicture(byteArrayOut.toByteArray(),HSSFWorkbook.PICTURE_TYPE_JPEG));
16、調整工作表位置
HSSFWorkbook wb = new HSSFWorkbook();
HSSFSheet sheet = wb.createSheet("format sheet");
HSSFPrintSetup ps = sheet.getPrintSetup();
sheet.setAutobreaks(true);
ps.setFitHeight((short)1);
ps.setFitWidth((short)1);
1、在學習從文件讀取數據中,寫了個示例代碼,讀取不在同一個目錄的file.txt,運行后報這個Python OSError: [Errno 22] Invalid argument:錯誤:
(1)、首先,在F盤的python_stu中新增了一個file.txt,同時在F盤的python_stu文件目錄底下新增一個file文件夾,里面有個file_reader.py來讀取python_stu文件目錄底下的file.txt,代碼分別如下:
file.txt:
測試
測試2
測試3
file_reader.py:
with open('F:\python_stu\file.txt') as file_obj:
contents = file_obj.read();
print(contents.rstrip());
(2)、運行后報錯:
(3)、出現這種錯誤的原因是由于讀取不到這個文件,看Traceback報的錯誤,最后一行,很明顯讀取不到file.txt,前面的F:\\python_stu沒錯,后面的名稱怎么變了,還是x0cile.txt。
(4)、解決辦法,可修改上述第一行代碼為:
with open('F:\python_stu/file.txt') as file_obj:
或者:
with open('F:/python_stu/file.txt') as file_obj:
或者:
with open('F://python_stu//file.txt') as file_obj:
又或者:
with open('F:\\python_stu\\file.txt') as file_obj:
還有一些我就不附上了,上面第一種方式不統一,最好不要用,用統一的方式,而且有時候還有注意一些轉義字符,比如 \t,\n也會導致報錯。
前面學習了使用命令hdfs haadmin -failover手動進行故障轉移,在該模式下,即使現役NameNode已經失效,系統也不會自動從現役NameNode轉移到待機NameNode,下面學習如何配置部署HA自動進行故障轉移。自動故障轉移為HDFS部署增加了兩個新組件:ZooKeeper和ZKFailoverController(ZKFC)進程。ZooKeeper是維護少量協調數據,通知客戶端這些數據的改變和監視客戶端故障的高可用服務。HA的自動故障轉移依賴于ZooKeeper的以下功能:
- 故障檢測:集群中的每個NameNode在ZooKeeper中維護了一個持久會話,如果機器崩潰,ZooKeeper中的會話將終止,ZooKeeper通知另一個NameNode需要觸發故障轉移。
- 現役NameNode選擇:ZooKeeper提供了一個簡單的機制用于唯一的選擇一個節點為active狀態。如果目前現役NameNode崩潰,另一個節點可能從ZooKeeper獲得特殊的排外鎖以表明它應該成為現役NameNode。
ZKFC是自動故障轉移中的另一個新組件,是ZooKeeper的客戶端,也監視和管理NameNode的狀態。每個運行NameNode的主機也運行了一個ZKFC進程,ZKFC負責:
- 健康監測:ZKFC使用一個健康檢查命令定期地ping與之在相同主機的NameNode,只要該NameNode及時地回復健康狀態,ZKFC認為該節點是健康的。如果該節點崩潰,凍結或進入不健康狀態,健康監測器標識該節點為非健康的。
- ZooKeeper會話管理:當本地NameNode是健康的,ZKFC保持一個在ZooKeeper中打開的會話。如果本地NameNode處于active狀態,ZKFC也保持一個特殊的znode鎖,該鎖使用了ZooKeeper對短暫節點的支持,如果會話終止,鎖節點將自動刪除。
- 基于ZooKeeper的選擇:如果本地NameNode是健康的,且ZKFC發現沒有其它的節點當前持有znode鎖,它將為自己獲取該鎖。如果成功,則它已經贏得了選擇,并負責運行故障轉移進程以使它的本地NameNode為active。故障轉移進城與前面描述的手動故障轉移相似,首先如果必要保護之前的現役NameNode,然后本地NameNode轉換為active狀態。
在典型部署中,ZooKeeper守護進程運行在三個或者五個節點上,但由于ZooKeeper本身需要較少的資源,所以將ZooKeeper部署在與現役NameNode和待機NameNode相同的主機上,還可以將ZooKeeper部署到與YARN的ResourceManager相同的節點上。建議配置ZooKeeper將數據存儲在與HDFS元數據不同的硬盤上以得到最好的性能和隔離性。在配置自動故障轉移之前需要先停掉集群,目前在集群運行時還不可能將手動故障轉移的安裝轉換為自動故障轉移的安裝。接下來看看如何配置HA的自動故障轉移。首先在hdfs-site.xml中添加下面的參數,該參數的值默認為false:
- <property>
- <name>dfs.ha.automatic-failover.enabled</name>
- <value>true</value>
- </property>
在core-site.xml文件中添加下面的參數,該參數的值為ZooKeeper服務器的地址,ZKFC將使用該地址。
- <property>
- <name>ha.zookeeper.quorum</name> <value>zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181</value>
- </property>
在HA或者HDFS聯盟中,上面的兩個參數還需要以NameServiceID為后綴,比如dfs.ha.automatic-failover.enabled.mycluster。除了上面的兩個參數外,還有其它幾個參數用于自動故障轉移,比如ha.zookeeper.session-timeout.ms,但對于大多數安裝來說都不是必須的。
在添加了上述的配置參數后,下一步就是在ZooKeeper中初始化要求的狀態,可以在任一NameNode中運行下面的命令實現該目的,該命令將在ZooKeeper中創建znode:
在啟用自動故障轉移的集群中,start-dfs.sh腳本將在任何運行NameNode的主機上自動啟動ZKFC守護進程,一旦ZKFC啟動完畢,它們將自動選擇一個NameNode為現役NameNode。如果手動管理集群中的服務,需要在每臺運行NameNode的主機上手動啟動ZKFC,命令為:
- hadoop-daemon.sh start zkfc
- hdfs zkfc
如果正在運行一個安全的集群,可能想確保存儲在ZooKeeper中的信息也是安全的,這將阻止惡意的客戶端修改ZooKeeper中的元數據或者潛在地觸發一個錯誤的故障轉移。為了保護ZooKeeper中的信息,首先在core-site.xml中添加下面的參數:
- <property>
- <name>ha.zookeeper.auth</name>
- <value>@/path/to/zk-auth.txt</value>
- </property>
- <property>
- <name>ha.zookeeper.acl</name>
- <value>@/path/to/zk-acl.txt</value>
- </property>
參數值中的@字符表示參數值保存在@后的硬盤文件中。第一個配置文件指定了ZooKeeper的認證列表,其格式與ZK CLI使用的相同,例如:digest:hdfs-zkfcs:mypassword,其中hdfs-zkfcs為ZooKeeper的用戶名,mypassword為密碼。其次使用下面的命令為該認證生成一個ZooKeeper訪問控制列表:
- $ java -cp $ZK_HOME/lib/*:$ZK_HOME/zookeeper-3.4.2.jar org.apache.zookeeper.server.auth.DigestAuthenticationProvider hdfs-zkfcs:mypassword
- output: hdfs-zkfcs:mypassword->hdfs-zkfcs:P/OQvnYyU/nF/mGYvB/xurX8dYs=
拷貝->之后的字符串并添加digest:前綴,然后粘貼到zk-acls.txt中,例如:digest:hdfs-zkfcs:vlUvLnd8MlacsE80rDuu6ONESbM=:rwcda。要想使ACLs生效,需要再次運行zkfc –formatZK。最后可能像下面這樣在ZK CLI中驗證ACLs:
- [zk: localhost:2181(CONNECTED) 1] getAcl /hadoop-ha
- 'digest,'hdfs-zkfcs:vlUvLnd8MlacsE80rDuu6ONESbM=
- : cdrwa
在安裝完成自動故障轉移后,或許需要測試一下。首先定位現役NameNode,可以通過訪問NameNode的web頁面來確定哪個NameNode是active狀態的。一旦確定了處于active狀態的NameNode,就需要在該節點上制造點故障,比如使用命令kill -9 <pid of NN>模擬JVM崩潰,或重啟主機或拔掉網線來模擬不同的中斷。一旦觸發了自動故障轉移,另一個NameNode應該自動在幾秒鐘內變為active狀態。檢測到故障并觸發故障轉移由參數ha.zookeeper.session-timeout.ms控制,該參數為與core-site.xml中,默認為5秒。如果測試不成功,可能是配置問題,檢查ZKFC和NameNode進程的日志以進一步診斷問題,通常錯誤都是很明顯的。
理想情況下,我們應用對Yarn資源的請求應該立刻得到滿足,但現實情況資源往往是有限的,特別是在一個很繁忙的集群,一個應用資源的請求經常需要等待一段時間才能的到相應的資源。在Yarn中,負責給應用分配資源的就是Scheduler。其實調度本身就是一個難題,很難找到一個完美的策略可以解決所有的應用場景。為此,Yarn提供了多種調度器和可配置的策略供我們選擇。
一、調度器的選擇
在Yarn中有三種調度器可以選擇:FIFO Scheduler
,Capacity Scheduler
,FairS cheduler
。
FIFO Scheduler
把應用按提交的順序排成一個隊列,這是一個先進先出隊列,在進行資源分配的時候,先給隊列中最頭上的應用進行分配資源,待最頭上的應用需求滿足后再給下一個分配,以此類推。
FIFO Scheduler
是最簡單也是最容易理解的調度器,也不需要任何配置,但它并不適用于共享集群。大的應用可能會占用所有集群資源,這就導致其它應用被阻塞。在共享集群中,更適合采用Capacity Scheduler
或Fair Scheduler
,這兩個調度器都允許大任務和小任務在提交的同時獲得一定的系統資源。
下面“Yarn調度器對比圖”展示了這幾個調度器的區別,從圖中可以看出,在FIFO 調度器中,小任務會被大任務阻塞。
而對于Capacity調度器,有一個專門的隊列用來運行小任務,但是為小任務專門設置一個隊列會預先占用一定的集群資源,這就導致大任務的執行時間會落后于使用FIFO調度器時的時間。
在Fair調度器中,我們不需要預先占用一定的系統資源,Fair調度器會為所有運行的job動態的調整系統資源。如下圖所示,當第一個大job提交時,只有這一個job在運行,此時它獲得了所有集群資源;當第二個小任務提交后,Fair調度器會分配一半資源給這個小任務,讓這兩個任務公平的共享集群資源。
需要注意的是,在下圖Fair調度器中,從第二個任務提交到獲得資源會有一定的延遲,因為它需要等待第一個任務釋放占用的Container。小任務執行完成之后也會釋放自己占用的資源,大任務又獲得了全部的系統資源。最終的效果就是Fair調度器即得到了高的資源利用率又能保證小任務及時完成。
Yarn調度器對比圖:

二、Capacity Scheduler(容器調度器)的配置
2.1 容器調度介紹
Capacity 調度器允許多個組織共享整個集群,每個組織可以獲得集群的一部分計算能力。通過為每個組織分配專門的隊列,然后再為每個隊列分配一定的集群資源,這樣整個集群就可以通過設置多個隊列的方式給多個組織提供服務了。除此之外,隊列內部又可以垂直劃分,這樣一個組織內部的多個成員就可以共享這個隊列資源了,在一個隊列內部,資源的調度是采用的是先進先出(FIFO)策略。
通過上面那幅圖,我們已經知道一個job可能使用不了整個隊列的資源。然而如果這個隊列中運行多個job,如果這個隊列的資源夠用,那么就分配給這些job,如果這個隊列的資源不夠用了呢?其實Capacity調度器仍可能分配額外的資源給這個隊列,這就是“彈性隊列”(queue elasticity)的概念。
在正常的操作中,Capacity調度器不會強制釋放Container,當一個隊列資源不夠用時,這個隊列只能獲得其它隊列釋放后的Container資源。當然,我們可以為隊列設置一個最大資源使用量,以免這個隊列過多的占用空閑資源,導致其它隊列無法使用這些空閑資源,這就是”彈性隊列”需要權衡的地方。
2.2 容器調度的配置
假設我們有如下層次的隊列:
root ├── prod └── dev ├── eng └── science
下面是一個簡單的Capacity調度器的配置文件,文件名為capacity-scheduler.xml
。在這個配置中,在root隊列下面定義了兩個子隊列prod
和dev
,分別占40%和60%的容量。需要注意,一個隊列的配置是通過屬性yarn.sheduler.capacity.<queue-path>.<sub-property>
指定的,<queue-path>
代表的是隊列的繼承樹,如root.prod
隊列,<sub-property>
一般指capacity
和maximum-capacity
。

我們可以看到,dev
隊列又被分成了eng
和science
兩個相同容量的子隊列。dev
的maximum-capacity
屬性被設置成了75%,所以即使prod
隊列完全空閑dev
也不會占用全部集群資源,也就是說,prod
隊列仍有25%的可用資源用來應急。我們注意到,eng
和science
兩個隊列沒有設置maximum-capacity
屬性,也就是說eng
或science
隊列中的job可能會用到整個dev
隊列的所有資源(最多為集群的75%)。而類似的,prod
由于沒有設置maximum-capacity屬性,它有可能會占用集群全部資源。
Capacity容器除了可以配置隊列及其容量外,我們還可以配置一個用戶或應用可以分配的最大資源數量、可以同時運行多少應用、隊列的ACL認證等。
2.3 隊列的設置
關于隊列的設置,這取決于我們具體的應用。比如,在MapReduce中,我們可以通過mapreduce.job.queuename
屬性指定要用的隊列。如果隊列不存在,我們在提交任務時就會收到錯誤。如果我們沒有定義任何隊列,所有的應用將會放在一個default
隊列中。
注意:對于Capacity調度器,我們的隊列名必須是隊列樹中的最后一部分,如果我們使用隊列樹則不會被識別。比如,在上面配置中,我們使用prod
和eng
作為隊列名是可以的,但是如果我們用root.dev.eng
或者dev.eng
是無效的。
三、Fair Scheduler(公平調度器)的配置
3.1 公平調度
Fair調度器的設計目標是為所有的應用分配公平的資源(對公平的定義可以通過參數來設置)。在上面的“Yarn調度器對比圖”展示了一個隊列中兩個應用的公平調度;當然,公平調度在也可以在多個隊列間工作。舉個例子,假設有兩個用戶A和B,他們分別擁有一個隊列。當A啟動一個job而B沒有任務時,A會獲得全部集群資源;當B啟動一個job后,A的job會繼續運行,不過一會兒之后兩個任務會各自獲得一半的集群資源。如果此時B再啟動第二個job并且其它job還在運行,則它將會和B的第一個job共享B這個隊列的資源,也就是B的兩個job會用于四分之一的集群資源,而A的job仍然用于集群一半的資源,結果就是資源最終在兩個用戶之間平等的共享。過程如下圖所示:

3.2 啟用Fair Scheduler
調度器的使用是通過yarn-site.xml
配置文件中的yarn.resourcemanager.scheduler.class
參數進行配置的,默認采用Capacity Scheduler調度器。如果我們要使用Fair調度器,需要在這個參數上配置FairScheduler類的全限定名: org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler
。
3.3 隊列的配置
Fair調度器的配置文件位于類路徑下的fair-scheduler.xml
文件中,這個路徑可以通過yarn.scheduler.fair.allocation.file
屬性進行修改。若沒有這個配置文件,Fair調度器采用的分配策略,這個策略和3.1節介紹的類似:調度器會在用戶提交第一個應用時為其自動創建一個隊列,隊列的名字就是用戶名,所有的應用都會被分配到相應的用戶隊列中。
我們可以在配置文件中配置每一個隊列,并且可以像Capacity 調度器一樣分層次配置隊列。比如,參考capacity-scheduler.xml
來配置fair-scheduler:

隊列的層次是通過嵌套<queue>
元素實現的。所有的隊列都是root
隊列的孩子,即使我們沒有配到<root>
元素里。在這個配置中,我們把dev
隊列有分成了eng
和science
兩個隊列。
Fair調度器中的隊列有一個權重屬性(這個權重就是對公平的定義),并把這個屬性作為公平調度的依據。在這個例子中,當調度器分配集群40:60
資源給prod
和dev
時便視作公平,eng
和science
隊列沒有定義權重,則會被平均分配。這里的權重并不是百分比,我們把上面的40和60分別替換成2和3,效果也是一樣的。注意,對于在沒有配置文件時按用戶自動創建的隊列,它們仍有權重并且權重值為1。
每個隊列內部仍可以有不同的調度策略。隊列的默認調度策略可以通過頂級元素<defaultQueueSchedulingPolicy>
進行配置,如果沒有配置,默認采用公平調度。
盡管是Fair調度器,其仍支持在隊列級別進行FIFO調度。每個隊列的調度策略可以被其內部的<schedulingPolicy>
元素覆蓋,在上面這個例子中,prod
隊列就被指定采用FIFO進行調度,所以,對于提交到prod
隊列的任務就可以按照FIFO規則順序的執行了。需要注意,prod
和dev
之間的調度仍然是公平調度,同樣eng
和science
也是公平調度。
盡管上面的配置中沒有展示,每個隊列仍可配置最大、最小資源占用數和最大可運行的應用的數量。
3.4 隊列的設置
Fair調度器采用了一套基于規則的系統來確定應用應該放到哪個隊列。在上面的例子中,<queuePlacementPolicy>
元素定義了一個規則列表,其中的每個規則會被逐個嘗試直到匹配成功。例如,上例第一個規則specified
,則會把應用放到它指定的隊列中,若這個應用沒有指定隊列名或隊列名不存在,則說明不匹配這個規則,然后嘗試下一個規則。primaryGroup
規則會嘗試把應用放在以用戶所在的Unix組名命名的隊列中,如果沒有這個隊列,不創建隊列轉而嘗試下一個規則。當前面所有規則不滿足時,則觸發default
規則,把應用放在dev.eng
隊列中。
當然,我們可以不配置queuePlacementPolicy
規則,調度器則默認采用如下規則:
<queuePlacementPolicy> <rule name="specified" /> <rule name="user" /> </queuePlacementPolicy>
上面規則可以歸結成一句話,除非隊列被準確的定義,否則會以用戶名為隊列名創建隊列。
還有一個簡單的配置策略可以使得所有的應用放入同一個隊列(default),這樣就可以讓所有應用之間平等共享集群而不是在用戶之間。這個配置的定義如下:
<queuePlacementPolicy> <rule name="default" /> </queuePlacementPolicy>
實現上面功能我們還可以不使用配置文件,直接設置yarn.scheduler.fair.user-as-default-queue=false
,這樣應用便會被放入default 隊列,而不是各個用戶名隊列。另外,我們還可以設置yarn.scheduler.fair.allow-undeclared-pools=false
,這樣用戶就無法創建隊列了。
3.5 搶占(Preemption)
當一個job提交到一個繁忙集群中的空隊列時,job并不會馬上執行,而是阻塞直到正在運行的job釋放系統資源。為了使提交job的執行時間更具預測性(可以設置等待的超時時間),Fair調度器支持搶占。
搶占就是允許調度器殺掉占用超過其應占份額資源隊列的containers,這些containers資源便可被分配到應該享有這些份額資源的隊列中。需要注意搶占會降低集群的執行效率,因為被終止的containers需要被重新執行。
可以通過設置一個全局的參數yarn.scheduler.fair.preemption=true
來啟用搶占功能。此外,還有兩個參數用來控制搶占的過期時間(這兩個參數默認沒有配置,需要至少配置一個來允許搶占Container):
- minimum share preemption timeout - fair share preemption timeout
如果隊列在minimum share preemption timeout
指定的時間內未獲得最小的資源保障,調度器就會搶占containers。我們可以通過配置文件中的頂級元素<defaultMinSharePreemptionTimeout>
為所有隊列配置這個超時時間;我們還可以在<queue>
元素內配置<minSharePreemptionTimeout>
元素來為某個隊列指定超時時間。
與之類似,如果隊列在fair share preemption timeout
指定時間內未獲得平等的資源的一半(這個比例可以配置),調度器則會進行搶占containers。這個超時時間可以通過頂級元素<defaultFairSharePreemptionTimeout>
和元素級元素<fairSharePreemptionTimeout>
分別配置所有隊列和某個隊列的超時時間。上面提到的比例可以通過<defaultFairSharePreemptionThreshold>
(配置所有隊列)和<fairSharePreemptionThreshold>
(配置某個隊列)進行配置,默認是0.5。
在做Shuffle階段的優化過程中,遇到了數據傾斜的問題,造成了對一些情況下優化效果不明顯。主要是因為在Job完成后的所得到的Counters是整個Job的總和,優化是基于這些Counters得出的平均值,而由于數據傾斜的原因造成map處理數據量的差異過大,使得這些平均值能代表的價值降低。Hive的執行是分階段的,map處理數據量的差異取決于上一個stage的reduce輸出,所以如何將數據均勻的分配到各個reduce中,就是解決數據傾斜的根本所在。規避錯誤來更好的運行比解決錯誤更高效。在查看了一些資料后,總結如下。
1數據傾斜的原因
1.1操作:
關鍵詞 | 情形 | 后果 |
Join | 其中一個表較小, 但是key集中 | 分發到某一個或幾個Reduce上的數據遠高于平均值 |
大表與大表,但是分桶的判斷字段0值或空值過多 | 這些空值都由一個reduce處理,灰常慢 |
group by | group by 維度過小, 某值的數量過多 | 處理某值的reduce灰常耗時 |
Count Distinct | 某特殊值過多 | 處理此特殊值的reduce耗時 |
1.2原因:
1)、key分布不均勻
2)、業務數據本身的特性
3)、建表時考慮不周
4)、某些SQL語句本身就有數據傾斜
1.3表現:
任務進度長時間維持在99%(或100%),查看任務監控頁面,發現只有少量(1個或幾個)reduce子任務未完成。因為其處理的數據量和其他reduce差異過大。
單一reduce的記錄數與平均記錄數差異過大,通常可能達到3倍甚至更多。 最長時長遠大于平均時長。
2數據傾斜的解決方案
2.1參數調節:
hive.map.aggr=true
Map 端部分聚合,相當于Combiner
hive.groupby.skewindata=true
有數據傾斜的時候進行負載均衡,當選項設定為 true,生成的查詢計劃會有兩個 MR Job。第一個 MR Job 中,Map 的輸出結果集合會隨機分布到 Reduce 中,每個 Reduce 做部分聚合操作,并輸出結果,這樣處理的結果是相同的 Group By Key 有可能被分發到不同的 Reduce 中,從而達到負載均衡的目的;第二個 MR Job 再根據預處理的數據結果按照 Group By Key 分布到 Reduce 中(這個過程可以保證相同的 Group By Key 被分布到同一個 Reduce 中),最后完成最終的聚合操作。
2.2 SQL語句調節:
如何Join:
關于驅動表的選取,選用join key分布最均勻的表作為驅動表
做好列裁剪和filter操作,以達到兩表做join的時候,數據量相對變小的效果。
大小表Join:
使用map join讓小的維度表(1000條以下的記錄條數) 先進內存。在map端完成reduce.
大表Join大表:
把空值的key變成一個字符串加上隨機數,把傾斜的數據分到不同的reduce上,由于null值關聯不上,處理后并不影響最終結果。
count distinct大量相同特殊值
count distinct時,將值為空的情況單獨處理,如果是計算count distinct,可以不用處理,直接過濾,在最后結果中加1。如果還有其他計算,需要進行group by,可以先將值為空的記錄單獨處理,再和其他計算結果進行union。
group by維度過小:
采用sum() group by的方式來替換count(distinct)完成計算。
特殊情況特殊處理:
在業務邏輯優化效果的不大情況下,有些時候是可以將傾斜的數據單獨拿出來處理。最后union回去。
3典型的業務場景
3.1空值產生的數據傾斜
場景:如日志中,常會有信息丟失的問題,比如日志中的 user_id,如果取其中的 user_id 和 用戶表中的user_id 關聯,會碰到數據傾斜的問題。
解決方法1: user_id為空的不參與關聯(紅色字體為修改后)
select * from log a join users b on a.user_id is not null and a.user_id = b.user_id union all select * from log a where a.user_id is null;
解決方法2 :賦與空值分新的key值
select * from log a left outer join users b on case when a.user_id is null then concat(‘hive’,rand() ) else a.user_id end = b.user_id;
結論:方法2比方法1效率更好,不但io少了,而且作業數也少了。解決方法1中 log讀取兩次,jobs是2。解決方法2 job數是1 。這個優化適合無效 id (比如 -99 , ’’, null 等) 產生的傾斜問題。把空值的 key 變成一個字符串加上隨機數,就能把傾斜的數據分到不同的reduce上 ,解決數據傾斜問題。
3.2不同數據類型關聯產生數據傾斜
場景:用戶表中user_id字段為int,log表中user_id字段既有string類型也有int類型。當按照user_id進行兩個表的Join操作時,默認的Hash操作會按int型的id來進行分配,這樣會導致所有string類型id的記錄都分配到一個Reducer中。
解決方法:把數字類型轉換成字符串類型
select * from users a left outer join logs b on a.usr_id = cast(b.user_id as string)
3.3小表不小不大,怎么用 map join 解決傾斜問題
使用 map join 解決小表(記錄數少)關聯大表的數據傾斜問題,這個方法使用的頻率非常高,但如果小表很大,大到map join會出現bug或異常,這時就需要特別的處理。 以下例子:
select * from log a left outer join users b on a.user_id = b.user_id;
users 表有 600w+ 的記錄,把 users 分發到所有的 map 上也是個不小的開銷,而且 map join 不支持這么大的小表。如果用普通的 join,又會碰到數據傾斜的問題。
解決方法:
select /*+mapjoin(x)*/* from log a
left outer join (
select /*+mapjoin(c)*/d.* from (
select distinct user_id from log ) c join users d
on c.user_id = d.user_id ) x
on a.user_id = b.user_id;
假如,log里user_id有上百萬個,這就又回到原來map join問題。所幸,每日的會員uv不會太多,有交易的會員不會太多,有點擊的會員不會太多,有傭金的會員不會太多等等。所以這個方法能解決很多場景下的數據傾斜問題。
4總結
使map的輸出數據更均勻的分布到reduce中去,是我們的最終目標。由于Hash算法的局限性,按key Hash會或多或少的造成數據傾斜。大量經驗表明數據傾斜的原因是人為的建表疏忽或業務邏輯可以規避的。在此給出較為通用的步驟:
1、采樣log表,哪些user_id比較傾斜,得到一個結果表tmp1。由于對計算框架來說,所有的數據過來,他都是不知道數據分布情況的,所以采樣是并不可少的。
2、數據的分布符合社會學統計規則,貧富不均。傾斜的key不會太多,就像一個社會的富人不多,奇特的人不多一樣。所以tmp1記錄數會很少。把tmp1和users做map join生成tmp2,把tmp2讀到distribute file cache。這是一個map過程。
3、map讀入users和log,假如記錄來自log,則檢查user_id是否在tmp2里,如果是,輸出到本地文件a,否則生成<user_id,value>的key,value對,假如記錄來自member,生成<user_id,value>的key,value對,進入reduce階段。
4、最終把a文件,把Stage3 reduce階段輸出的文件合并起寫到hdfs。
如果確認業務需要這樣傾斜的邏輯,考慮以下的優化方案:
1、對于join,在判斷小表不大于1G的情況下,使用map join
2、對于group by或distinct,設定 hive.groupby.skewindata=true
3、盡量使用上述的SQL語句調節進行優化
Hive的一般學習者和培訓者在談性能優化的時候一般都會從語法和參數這些雕蟲小技的角度談優化,而不會革命性的優化Hive的性能,產生這種現象的原因有:1,歷史原因和思維定勢:大家學習SQL的時候一般都是就單機DB,這個時候你的性能優化技巧確實主要是SQL語法和參數調優;2,Hive的核心的性能問題往往是產生在超過規模數據集,例如說100億條級別的數據集,以及每天處理上千上萬個Hive作業的情況下產生的;上面的第二點是我們現在Hive性能調優部分要徹底解決的內容;要從根本上解決和顯著的解決實際企業中Hive真正的性能優化問題,必須考慮到底什么是Hive性能的限制,我們按照優先級來說:第一重要的是:戰略性架構 解決海量數據下大量Job過于頻繁的IO問題,而這個問題實質上涉及了架構方面的分表 數據復用 以及分區表等調優的方式; 補充:1,海量的數據中有些數據是高頻使用的數據,而有些是很少使用的,如果能夠分離成為不同的表,會極大的提升效率;很多的作業可能會有共同點,抽離出來先進行計算并保留計算結果,后面的作業都可以復用;同時,底層的基礎功能也可以先計算,在上層應用的時候直接拿數據結果,而不是每次都重復計算; 2,合理從用靜態分區表和動態分區表,可以避免數據全局掃描及計算資源更合理的利用; 3,數據傾斜的一站式解決方案;第二重要的是:引擎和物理層面,很多內容都是普通Hive使用這不知道的! 從Hive語法和Job內部的角度去進行優化,這要求MapReduce以及Hive如何被翻譯成為MapReduce要非常精通;第三重要的是:一些關鍵的參數;歸根到底,Hive的性能優化主要考慮的是如何最大化和最有效的使用CPU Memory IO;
Hive背后的Mapper調優:
1,Mapper數過大,會產生大量小文件,由于Mapper是基于虛擬機的,過多的Mapper創建和初始化及關閉虛擬機都會消耗大量的硬件資源;
Mapper數太小,并發度過小,Job執行時間過長,無法充分利用分布式硬件資源;
2,Mapper數據由什么決定呢?
輸入文件數目;
輸入文件的大小;
配置參數;
默認情況下:例如一個文件800M,BLock大小是128M,那么Mapper數目就是7個,6個Mapper處理的數據是 128M, 1個Mapper處理的數據是32M;再例如,一個目錄下有三個文件分別大小問5M 10M 150M
此時會產生4個Mapper,處理的數據分別是5M 10M 128M 22M;
減少Mapper的個數,就要合并小文件,這種小文件有可能是直接來自于數據源的小文件,也可能是Reducer產生的小文件;
set hive.input.format=org.apache.Hadoop.hive.ql.io.CombineHiveInputFormat;
set hive.merge.mapFiles=true;
set hive.merge.mapredFiles=true;
set hive.merge.size.per.task=256000000
set mapred.max.split.size=256000000
set mapred.min.split.size.per.node=128000000
增加Mapper的個數,一般是通過控制Hive SQL中上一個Job的Reducer個數來控制的,例如在Join操作的時候會把多個表分解為多個Job;
set mapred.map.tasks=2;
set hive.merge.mapFiles=true;
set hive.merge.mapredFiles=true;
set hive.merge.size.per.task=256000000
例如我們有5個300M的文件;按照上面的配置會產生10個Mapper,5個Mapper處理的都是256M的數據,另外5個Mapper處理的都是44M的數據,問題是:大的Mapper會數據傾斜
如何解決,設置set mapred.map.tasks=6,此時根據MapRed的運行機制,會劃分6個Mapper,每個Mapper的處理數據的大小是250M, min(1500M/6, 256M) =250M
Hive背后的Reducer調優:
1,Reducer數目過大的話,會產生很多小文件,每個Reducer都會產生一個文件,如果這些小文件是下一個JOB的輸入,則會需要對小文件進行合并;同樣啟動 初始化和銷毀Reducer的虛擬機也需要消耗大量的硬件;
Reducer數據過小的話,Reduce的時間會比較長,也可能會出現數據傾斜;
2,如何控制Reducer的個數呢?
set hive.exec.reducers.byte.per.reducer=1G
set hive.exec.reducers.max=999
Reducer個數=min(999, Reducer的數據輸入總量/1G);
set mapred.reduce.tasks = 10, 默認是1; 如果說當前的Reducer的結果很大,且被接下來多個Job使用其結果,我們該如何設置參數呢?一般都需要調大該參數;
什么情況下只有一個Reducer?如果不進行Group by但卻需要匯總,或者說Order by,當然如果最后Reducer的數據小于默認的1G的話,也會只有一個Reducer;
1,Hive在分布式運行的時候最害怕的是數據傾斜,這是由于分布式系統的特性決定的,因為分布式系統之所以很快是由于作業平均分配給了不同的節點,不同節點同心協力,從而達到更快處理完作業的目的;
順便說明一下,處理數據傾斜的能力是hadoop和Spark工程師最核心的競爭力之一;
2,Hive中數據傾斜的原因:
數據在分布式節點上分布不平衡;
join時某些key可能特別大;
groupBy的時候某個Key可能特別多;
count(distinct)有可能出現數據傾斜,因為其內部首先會進行groupBy操作;
3,join,我們希望join時候key是分散,如果一個key的數據量特別大,有可能會出現數據傾斜和OOM,一個核心點是:小表join大表,在reduce階段左側的小表會加載進內存,減少OOM的風險;
4,大表join大表的情況:數據傾斜,例如null值,解決辦法一般是要打散null值,例如說使用隨機數等,如果數據傾斜比較嚴重,采用這種方式可以提升至少一倍的速度;
5,mapJoin:小表join(超)大表的時候,可以采用mapJoin的方式把小表全部加載到Mapper端的內存中/*+MAPJOIN(table_name)*/;
6,小表join(超)大表的時候,是否會自動進行mapJoin,想進行mapJoin,需要設置:set hive.auto.convert.join=true,Hive在進行join的時候會判斷左表的大小來決定是否進行mapJoin:
set hive.mapjoin.smalltable.filesize=128000000;
set hive.mapjoin.cache.numrows=100000;
上述參數可以根據實際的硬件機器的內存進行調整,對性能有至關重要的影響,因為沒有了Shuffle;
對于mapJoin我們能夠使用Mapper端JVM中多大的內存呢?
set hive.mapjoin.followby.gby.localtask.max.momery.usage = 0.8
set hive.mapjoin.localtask.max.memory.uage=0.9
7,groupBy,我們可以設置在Mapper端進行部分聚合,最后在Reducer端進行全局聚合
set hive.map.aggr=true;
set hive.groupby.mapaggr.checkinterval=100000
set hive.groupby.skewindata = true 內部會產生兩個Job,第一個Job會通過自己的算法打散傾斜的Key并進行聚合操作且保留結果,第二個Job會完成全部的groupBy操作,會產生Mapper-Reducer-Reducer的結構
8, count(distinct),如果某個字段特別多,容易產生數據傾斜,解決思路:
在查詢語句中例如對null進行過濾,在結果中加1
9, 笛卡爾積:join時候沒有on條件,或者on條件無效,這個時候會使用Reducer進行笛卡爾積的操作;