在SIP項目設計的過程中,對于它龐大的日志在開始時就考慮使用任務分解的多線程處理模式來分析統計,在我從前寫的文章《Tiger Concurrent Practice --日志分析并行分解設計與實現》中有所提到。但是由于統計的內容暫時還是十分簡單,所以就采用Memcache作為計數器,結合MySQL就完成了訪問 控制以及統計的工作。然而未來,對于海量日志分析的工作,還是需要有所準備。現在最火的技術詞匯莫過于“云計算”,在Open API日益盛行的今天,互聯網應用的數據將會越來越有價值,如何去分析這些數據,挖掘其內在價值,就需要分布式計算來支撐海量數據的分析工作。

回過頭來看,早先那種多線程,多任務分解的日志分析設計,其實是分布式計算的一個單機版縮略,如何將這種單機的工作進行分拆,變成協同工作的集群, 其實就是分布式計算框架設計所涉及的。在去年參加BEA大會的時候,BEA和VMWare合作采用虛擬機來構建集群,無非就是希望使得計算機硬件能夠類似 于應用程序中資源池的資源,使用者無需關心資源的分配情況,從而最大化了硬件資源的使用價值。分布式計算也是如此,具體的計算任務交由哪一臺機器執行,執 行后由誰來匯總,這都由分布式框架的Master來抉擇,而使用者只需簡單地將待分析內容提供給分布式計算系統作為輸入,就可以得到分布式計算后的結果。

Hadoop是Apache開源組織的一個分布式計算開源框架,在很多大型網站上都已經得到了應用,如亞馬遜、Facebook和Yahoo等等。 對于我來說,最近的一個使用點就是服務集成平臺的日志分析。服務集成平臺的日志量將會很大,而這也正好符合了分布式計算的適用場景(日志分析和索引建立就 是兩大應用場景)。

當前沒有正式確定使用,所以也是自己業余摸索,后續所寫的相關內容,都是一個新手的學習過程,難免會有一些錯誤,只是希望記錄下來可以分享給更多志同道合的朋友。

什么是Hadoop?

搞什么東西之前,第一步是要知道What(是什么),然后是Why(為什么),最后才是How(怎么做)。但很多開發的朋友在做了多年項目以后,都習慣是先How,然后What,最后才是Why,這樣只會讓自己變得浮躁,同時往往會將技術誤用于不適合的場景。

Hadoop框架中最核心的設計就是:MapReduce和HDFS。MapReduce的思想是由Google的一篇論文所提及而被廣為流傳的, 簡單的一句話解釋MapReduce就是“任務的分解與結果的匯總”。HDFS是Hadoop分布式文件系統(Hadoop Distributed File System)的縮寫,為分布式計算存儲提供了底層支持。

MapReduce從它名字上來看就大致可以看出個緣由,兩個動詞Map和Reduce,“Map(展開)”就是將一個任務分解成為多個任 務,“Reduce”就是將分解后多任務處理的結果匯總起來,得出最后的分析結果。這不是什么新思想,其實在前面提到的多線程,多任務的設計就可以找到這 種思想的影子。不論是現實社會,還是在程序設計中,一項工作往往可以被拆分成為多個任務,任務之間的關系可以分為兩種:一種是不相關的任務,可以并行執 行;另一種是任務之間有相互的依賴,先后順序不能夠顛倒,這類任務是無法并行處理的。回到大學時期,教授上課時讓大家去分析關鍵路徑,無非就是找最省時的 任務分解執行方式。在分布式系統中,機器集群就可以看作硬件資源池,將并行的任務拆分,然后交由每一個空閑機器資源去處理,能夠極大地提高計算效率,同時 這種資源無關性,對于計算集群的擴展無疑提供了最好的設計保證。(其實我一直認為Hadoop的卡通圖標不應該是一個小象,應該是螞蟻,分布式計算就好比 螞蟻吃大象,廉價的機器群可以匹敵任何高性能的計算機,縱向擴展的曲線始終敵不過橫向擴展的斜線)。任務分解處理以后,那就需要將處理以后的結果再匯總起 來,這就是Reduce要做的工作。


圖1:MapReduce結構示意圖

上圖就是MapReduce大致的結構圖,在Map前還可能會對輸入的數據有Split(分割)的過程,保證任務并行效率,在Map之后還會有Shuffle(混合)的過程,對于提高Reduce的效率以及減小數據傳輸的壓力有很大的幫助。后面會具體提及這些部分的細節。

HDFS是分布式計算的存儲基石,Hadoop的分布式文件系統和其他分布式文件系統有很多類似的特質。分布式文件系統基本的幾個特點:

  1. 對于整個集群有單一的命名空間。
  2. 數據一致性。適合一次寫入多次讀取的模型,客戶端在文件沒有被成功創建之前無法看到文件存在。
  3. 文件會被分割成多個文件塊,每個文件塊被分配存儲到數據節點上,而且根據配置會由復制文件塊來保證數據的安全性。

圖2:HDFS結構示意圖

上圖中展現了整個HDFS三個重要角色:NameNode、DataNode和Client。NameNode可以看作是分布式文件系統中的管理 者,主要負責管理文件系統的命名空間、集群配置信息和存儲塊的復制等。NameNode會將文件系統的Meta-data存儲在內存中,這些信息主要包括 了文件信息、每一個文件對應的文件塊的信息和每一個文件塊在DataNode的信息等。DataNode是文件存儲的基本單元,它將Block存儲在本地 文件系統中,保存了Block的Meta-data,同時周期性地將所有存在的Block信息發送給NameNode。Client就是需要獲取分布式文 件系統文件的應用程序。這里通過三個操作來說明他們之間的交互關系。

文件寫入:

  1. Client向NameNode發起文件寫入的請求。
  2. NameNode根據文件大小和文件塊配置情況,返回給Client它所管理部分DataNode的信息。
  3. Client將文件劃分為多個Block,根據DataNode的地址信息,按順序寫入到每一個DataNode塊中。

文件讀取:

  1. Client向NameNode發起文件讀取的請求。
  2. NameNode返回文件存儲的DataNode的信息。
  3. Client讀取文件信息。

文件Block復制:

  1. NameNode發現部分文件的Block不符合最小復制數或者部分DataNode失效。
  2. 通知DataNode相互復制Block。
  3. DataNode開始直接相互復制。

最后再說一下HDFS的幾個設計特點(對于框架設計值得借鑒):

  1. Block的放置:默認不配置。一個Block會有三份備份,一份放在NameNode指定的DataNode,另一份放在與指定 DataNode非同一Rack上的DataNode,最后一份放在與指定DataNode同一Rack上的DataNode上。備份無非就是為了數據安 全,考慮同一Rack的失敗情況以及不同Rack之間數據拷貝性能問題就采用這種配置方式。
  2. 心跳檢測DataNode的健康狀況,如果發現問題就采取數據備份的方式來保證數據的安全性。
  3. 數據復制(場景為DataNode失敗、需要平衡DataNode的存儲利用率和需要平衡DataNode數據交互壓力等情況):這里先 說一下,使用HDFS的balancer命令,可以配置一個Threshold來平衡每一個DataNode磁盤利用率。例如設置了Threshold為 10%,那么執行balancer命令的時候,首先統計所有DataNode的磁盤利用率的均值,然后判斷如果某一個DataNode的磁盤利用率超過這 個均值Threshold以上,那么將會把這個DataNode的block轉移到磁盤利用率低的DataNode,這對于新節點的加入來說十分有用。
  4. 數據交驗:采用CRC32作數據交驗。在文件Block寫入的時候除了寫入數據還會寫入交驗信息,在讀取的時候需要交驗后再讀入。
  5. NameNode是單點:如果失敗的話,任務處理信息將會紀錄在本地文件系統和遠端的文件系統中。
  6. 數據管道性的寫入:當客戶端要寫入文件到DataNode上,首先客戶端讀取一個Block然后寫到第一個DataNode上,然后由第 一個DataNode傳遞到備份的DataNode上,一直到所有需要寫入這個Block的NataNode都成功寫入,客戶端才會繼續開始寫下一個 Block。
  7. 安全模式:在分布式文件系統啟動的時候,開始的時候會有安全模式,當分布式文件系統處于安全模式的情況下,文件系統中的內容不允許修改也 不允許刪除,直到安全模式結束。安全模式主要是為了系統啟動的時候檢查各個DataNode上數據塊的有效性,同時根據策略必要的復制或者刪除部分數據 塊。運行期通過命令也可以進入安全模式。在實踐過程中,系統啟動的時候去修改和刪除文件也會有安全模式不允許修改的出錯提示,只需要等待一會兒即可。

下面綜合MapReduce和HDFS來看Hadoop的結構:


圖3:Hadoop結構示意圖

在Hadoop的系統中,會有一臺Master,主要負責NameNode的工作以及JobTracker的工作。JobTracker的主要職責 就是啟動、跟蹤和調度各個Slave的任務執行。還會有多臺Slave,每一臺Slave通常具有DataNode的功能并負責TaskTracker的 工作。TaskTracker根據應用要求來結合本地數據執行Map任務以及Reduce任務。

說到這里,就要提到分布式計算最重要的一個設計點:Moving Computation is Cheaper than Moving Data。就是在分布式處理中,移動數據的代價總是高于轉移計算的代價。簡單來說就是分而治之的工作,需要將數據也分而存儲,本地任務處理本地數據然后歸 總,這樣才會保證分布式計算的高效性。

為什么要選擇Hadoop?

說完了What,簡單地說一下Why。官方網站已經給了很多的說明,這里就大致說一下其優點及使用的場景(沒有不好的工具,只用不適用的工具,因此選擇好場景才能夠真正發揮分布式計算的作用):

  1. 可擴展:不論是存儲的可擴展還是計算的可擴展都是Hadoop的設計根本。
  2. 經濟:框架可以運行在任何普通的PC上。
  3. 可靠:分布式文件系統的備份恢復機制以及MapReduce的任務監控保證了分布式處理的可靠性。
  4. 高效:分布式文件系統的高效數據交互實現以及MapReduce結合Local Data處理的模式,為高效處理海量的信息作了基礎準備。

使用場景:個人覺得最適合的就是海量數據的分析,其實Google最早提出MapReduce也就是為了海量數 據分析。同時HDFS最早是為了搜索引擎實現而開發的,后來才被用于分布式計算框架中。海量數據被分割于多個節點,然后由每一個節點并行計算,將得出的結 果歸并到輸出。同時第一階段的輸出又可以作為下一階段計算的輸入,因此可以想象到一個樹狀結構的分布式計算圖,在不同階段都有不同產出,同時并行和串行結 合的計算也可以很好地在分布式集群的資源下得以高效的處理。


其實參看Hadoop官方文檔已經能夠很容易配置分布式框架運行環境了,不過這里既然寫了就再多寫一點,同時有一些細節需要注意的也說明一下,其實 也就是這些細節會讓人摸索半天。Hadoop可以單機跑,也可以配置集群跑,單機跑就不需要多說了,只需要按照Demo的運行說明直接執行命令即可。這里 主要重點說一下集群配置運行的過程。

環境

7臺普通的機器,操作系統都是Linux。內存和CPU就不說了,反正Hadoop一大特點就是機器在多不在精。JDK必須是1.5以上的,這個切記。7臺機器的機器名務必不同,后續會談到機器名對于MapReduce有很大的影響。

部署考慮

正如上面我描述的,對于Hadoop的集群來說,可以分成兩大類角色:Master和Slave,前者主要配置NameNode和 JobTracker的角色,負責總管分布式數據和分解任務的執行,后者配置DataNode和TaskTracker的角色,負責分布式數據存儲以及任 務的執行。本來我打算看看一臺機器是否可以配置成Master,同時也作為Slave使用,不過發現在NameNode初始化的過程中以及 TaskTracker執行過程中機器名配置好像有沖突(NameNode和TaskTracker對于Hosts的配置有些沖突,究竟是把機器名對應 IP放在配置前面還是把Localhost對應IP放在前面有點問題,不過可能也是我自己的問題吧,這個大家可以根據實施情況給我反饋)。最后反正決定一 臺Master,六臺Slave,后續復雜的應用開發和測試結果的比對會增加機器配置。

實施步驟

  1. 在所有的機器上都建立相同的目錄,也可以就建立相同的用戶,以該用戶的home路徑來做hadoop的安裝路徑。例如我在所有的機器上都建立了/home/wenchu
  2. 下載Hadoop,先解壓到Master上。這里我是下載的0.17.1的版本。此時Hadoop的安裝路徑就是/home/wenchu/hadoop-0.17.1
  3. 解壓后進入conf目錄,主要需要修改以下文件:hadoop-env.shhadoop-site.xmlmastersslaves

    Hadoop的基礎配置文件是hadoop-default.xml,看Hadoop的代碼可以知道,默認建立一個Job的時候會建立Job的Config,Config首先讀入hadoop-default.xml的配置,然后再讀入hadoop-site.xml的配置(這個文件初始的時候配置為空),hadoop-site.xml中主要配置你需要覆蓋的hadoop-default.xml的系統級配置,以及你需要在你的MapReduce過程中使用的自定義配置(具體的一些使用例如final等參考文檔)。

    以下是一個簡單的hadoop-site.xml的配置:

    <?xml version="1.0"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <!-- Put site-specific property overrides in this file. -->
    <configuration>
    <property>
       <name>fs.default.name</name>//你的namenode的配置,機器名加端口
       <value>hdfs://10.2.224.46:54310/</value>
    </property>
    <property>
       <name>mapred.job.tracker</name>//你的JobTracker的配置,機器名加端口
       <value>hdfs://10.2.224.46:54311/</value>
    </property>
    <property>
       <name>dfs.replication</name>//數據需要備份的數量,默認是三
       <value>1</value>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>//Hadoop的默認臨時路徑,這個最好配置,如果在新增節點或者其他情況下莫名其妙的DataNode啟動不了,就刪除此文件中的tmp目錄即可。不過如果刪除了NameNode機器的此目錄,那么就需要重新執行NameNode格式化的命令。
        <value>/home/wenchu/hadoop/tmp/</value>
    </property>
    <property>
       <name>mapred.child.java.opts</name>//java虛擬機的一些參數可以參照配置
       <value>-Xmx512m</value>
    </property>
    <property>
      <name>dfs.block.size</name>//block的大小,單位字節,后面會提到用處,必須是512的倍數,因為采用crc作文件完整性校驗,默認配置512是checksum的最小單元。
      <value>5120000</value>
      <description>The default block size for new files.</description>
    </property>
    </configuration>

    hadoop-env.sh文件只需要修改一個參數:

    # The java implementation to use.  Required.
    export JAVA_HOME=/usr/ali/jdk1.5.0_10

    配置你的Java路徑,記住一定要1.5版本以上,免得莫名其妙出現問題。

    Masters中配置Masters的IP或者機器名,如果是機器名那么需要在/etc/hosts中有所設置。Slaves中配置的是Slaves的IP或者機器名,同樣如果是機器名需要在/etc/hosts中有所設置。范例如下,我這里配置的都是IP:

    Masters:
    10.2.224.46

    Slaves:
    10.2.226.40
    10.2.226.39
    10.2.226.38
    10.2.226.37
    10.2.226.41
    10.2.224.36
  4. 建立Master到每一臺Slave的SSH受信證書。由于Master將會通過SSH啟動所有Slave的Hadoop,所以需要建立單向或者雙向證書保證命令執行時不需要再輸入密碼。在Master和所有的Slave機器上執行:ssh-keygen -t rsa。執行此命令的時候,看到提示只需要回車。然后就會在/root/.ssh/下面產生id_rsa.pub的證書文件,通過scp將Master機器上的這個文件拷貝到Slave上(記得修改名稱),例如:scp root@masterIP:/root/.ssh/id_rsa.pub /root/.ssh/46_rsa.pub,然后執行cat /root/.ssh/46_rsa.pub >>/root/.ssh/authorized_keys,建立authorized_keys文 件即可,可以打開這個文件看看,也就是rsa的公鑰作為key,user@IP作為value。此時可以試驗一下,從master ssh到slave已經不需要密碼了。由slave反向建立也是同樣。為什么要反向呢?其實如果一直都是Master啟動和關閉的話那么沒有必要建立反 向,只是如果想在Slave也可以關閉Hadoop就需要建立反向。
  5. 將Master上的Hadoop通過scp拷貝到每一個Slave相同的目錄下,根據每一個Slave的Java_HOME的不同修改其hadoop-env.sh
  6. 修改Master上/etc/profile:
    新增以下內容:(具體的內容根據你的安裝路徑修改,這步只是為了方便使用)
    export HADOOP_HOME=/home/wenchu/hadoop-0.17.1
    export PATH=$PATH:$HADOOP_HOME/bin
    修改完畢后,執行source /etc/profile來使其生效。
  7. 在Master上執行Hadoop namenode –format,這是第一需要做的初始化,可以看作格式化吧,以后除了在上面我提到過刪除了Master上的hadoop.tmp.dir目錄,否則是不需要再次執行的。
  8. 然后執行Master上的start-all.sh,這個命令可以直接執行,因為在6中已經添加到了path路徑,這個命令是啟動hdfs和mapreduce兩部分,當然你也可以分開單獨啟動hdfs和mapreduce,分別是bin目錄下的start-dfs.shstart-mapred.sh
  9. 檢查Master的logs目錄,看看Namenode日志以及JobTracker日志是否正常啟動。
  10. 檢查Slave的logs目錄看看Datanode日志以及TaskTracker日志是否正常。
  11. 如果需要關閉,那么就直接執行stop-all.sh即可。

以上步驟就可以啟動Hadoop的分布式環境,然后在Master的機器進入Master的安裝目錄,執行hadoop jar hadoop-0.17.1-examples.jar wordcount輸入路徑和輸出路徑,就可以看到字數統計的效果了。此處的輸入路徑和輸出路徑都指的是HDFS中的路徑,因此你可以首先通過拷貝本地文件系統中的目錄到HDFS中的方式來建立HDFS中的輸入路徑:

hadoop dfs -copyFromLocal /home/wenchu/test-in test-in。其中/home/wenchu/test-in是本地路徑,test-in是將會建立在HDFS中的路徑,執行完畢以后可以通過hadoop dfs –ls看到test-in目錄已經存在,同時可以通過hadoop dfs –ls test-in查看里面的內容。輸出路徑要求是在HDFS中不存在的,當執行完那個demo以后,就可以通過hadoop dfs –ls 輸出路徑看到其中的內容,具體文件的內容可以通過hadoop dfs –cat文件名稱來查看。

經驗總結和注意事項(這部分是我在使用過程中花了一些時間走的彎路):

  1. Master和Slave上的幾個conf配置文件不需要全部同步,如果確定都是通過Master去啟動和關閉,那么Slave機器上的配置不需要去維護。但如果希望在任意一臺機器都可以啟動和關閉Hadoop,那么就需要全部保持一致了。
  2. Master和Slave機器上的/etc/hosts中必須把集群中機器都配置上去,就算在各個配置文件中 使用的是IP。這個吃過不少苦頭,原來以為如果配成IP就不需要去配置Host,結果發現在執行Reduce的時候總是卡住,在拷貝的時候就無法繼續下 去,不斷重試。另外如果集群中如果有兩臺機器的機器名如果重復也會出現問題。
  3. 如果在新增了節點或者刪除節點的時候出現了問題,首先就去刪除Slave的hadoop.tmp.dir,然后重新啟動試試看,如果還是不行那就干脆把Master的hadoop.tmp.dir刪除(意味著dfs上的數據也會丟失),如果刪除了Master的hadoop.tmp.dir,那么就需要重新namenode –format
  4. Map任務個數以及Reduce任務個數配置。前面分布式文件系統設計提到一個文件被放入到分布式文件系統中,會被分割成多個block放置到每一個的DataNode上,默認dfs.block.size應該是64M,也就是說如果你放置到HDFS上的數據小于64,那么將只有一個Block,此時會被放置到某一個DataNode中,這個可以通過使用命令:hadoop dfsadmin –report就可以看到各個節點存儲的情況。也可以直接去某一個DataNode查看目錄:hadoop.tmp.dir/dfs/data/current就 可以看到那些block了。Block的數量將會直接影響到Map的個數。當然可以通過配置來設定Map和Reduce的任務個數。Map的個數通常默認 和HDFS需要處理的blocks相同。也可以通過配置Map的數量或者配置minimum split size來設定,實際的個數為:max(min(block_size,data/#maps),min_split_size)。Reduce可以通過這個公式計算:0.95*num_nodes*mapred.tasktracker.tasks.maximum

總的來說出了問題或者啟動的時候最好去看看日志,這樣心里有底。

Hadoop中的命令(Command)總結

這部分內容其實可以通過命令的Help以及介紹了解,我主要側重于介紹一下我用的比較多的幾個命令。Hadoop dfs 這個命令后面加參數就是對于HDFS的操作,和Linux操作系統的命令很類似,例如:

  • Hadoop dfs –ls就是查看/usr/root目錄下的內容,默認如果不填路徑這就是當前用戶路徑;
  • Hadoop dfs –rmr xxx就是刪除目錄,還有很多命令看看就很容易上手;
  • Hadoop dfsadmin –report這個命令可以全局的查看DataNode的情況;
  • Hadoop job后面增加參數是對于當前運行的Job的操作,例如list,kill等;
  • Hadoop balancer就是前面提到的均衡磁盤負載的命令。

其他就不詳細介紹了。


Hadoop基本流程

一個圖片太大了,只好分割成為兩部分。根據流程圖來說一下具體一個任務執行的情況。

  1. 在分布式環境中客戶端創建任務并提交。
  2. InputFormat做Map前的預處理,主要負責以下工作:
    1. 驗證輸入的格式是否符合JobConfig的輸入定義,這個在實現Map和構建Conf的時候就會知道,不定義可以是Writable的任意子類。
    2. 將input的文件切分為邏輯上的輸入InputSplit,其實這就是在上面提到的在分布式文件系統中blocksize是有大小限制的,因此大文件會被劃分為多個block。
    3. 通過RecordReader來再次處理inputsplit為一組records,輸出給Map。(inputsplit只是邏輯切分的第一步,但是如何根據文件中的信息來切分還需要RecordReader來實現,例如最簡單的默認方式就是回車換行的切分)
  3. RecordReader處理后的結果作為Map的輸入,Map執行定義的Map邏輯,輸出處理后的key和value對應到臨時中間文件。
  4. Combiner可選擇配置,主要作用是在每一個Map執行完分析以后,在本地優先作Reduce的工作,減少在Reduce過程中的數據傳輸量。
  5. Partitioner可選擇配置,主要作用是在多個Reduce的情況下,指定Map的結果由某一個Reduce處理,每一個Reduce都會有單獨的輸出文件。(后面的代碼實例中有介紹使用場景)
  6. Reduce執行具體的業務邏輯,并且將處理結果輸出給OutputFormat。
  7. OutputFormat的職責是,驗證輸出目錄是否已經存在,同時驗證輸出結果類型是否如Config中配置,最后輸出Reduce匯總后的結果。

業務場景和代碼范例

業務場景描述:可設定輸入和輸出路徑(操作系統的路徑非HDFS路徑),根據訪問日志分析某一個應用訪問某一個API的總次數和總流量,統計后分別輸出到兩個文件中。這里僅僅為了測試,沒有去細分很多類,將所有的類都歸并于一個類便于說明問題。


測試代碼類圖

LogAnalysiser就是主類,主要負責創建、提交任務,并且輸出部分信息。內部的幾個子類用途可以參看流程中提到的角色職責。具體地看看幾個類和方法的代碼片斷:

LogAnalysiser::MapClass

    public static class MapClass extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, LongWritable>
    {
        public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output, Reporter reporter)
                throws IOException
        {   
            String line = value.toString();//沒有配置RecordReader,所以默認采用line的實現,key就是行號,value就是行內容
            if (line == null || line.equals(""))
                return;
            String[] words = line.split(",");
            if (words == null || words.length < 8)
                return;
            String appid = words[1];
            String apiName = words[2];
            LongWritable recbytes = new LongWritable(Long.parseLong(words[7]));
            Text record = new Text();
            record.set(new StringBuffer("flow::").append(appid)
                            .append("::").append(apiName).toString());
            reporter.progress();
            output.collect(record, recbytes);//輸出流量的統計結果,通過flow::作為前綴來標示。
            record.clear();
            record.set(new StringBuffer("count::").append(appid).append("::").append(apiName).toString());
            output.collect(record, new LongWritable(1));//輸出次數的統計結果,通過count::作為前綴來標示
        }   
    }

LogAnalysiser:: PartitionerClass

    public static class PartitionerClass implements Partitioner<Text, LongWritable>
    {
        public int getPartition(Text key, LongWritable value, int numPartitions)
        {
            if (numPartitions >= 2)//Reduce 個數,判斷流量還是次數的統計分配到不同的Reduce
                if (key.toString().startsWith("flow::"))
                    return 0;
                else
                    return 1;
            else
                return 0;
        }
        public void configure(JobConf job){}   
}

LogAnalysiser:: CombinerClass

參看ReduceClass,通常兩者可以使用一個,不過這里有些不同的處理就分成了兩個。在ReduceClass中藍色的行表示在CombinerClass中不存在。

LogAnalysiser:: ReduceClass

    public static class ReduceClass extends MapReduceBase
        implements Reducer<Text, LongWritable,Text, LongWritable>
    {
        public void reduce(Text key, Iterator<LongWritable> values,
                OutputCollector<Text, LongWritable> output, Reporter reporter)throws IOException
        {
            Text newkey = new Text();
            newkey.set(key.toString().substring(key.toString().indexOf("::")+2));
            LongWritable result = new LongWritable();
            long tmp = 0;
            int counter = 0;
            while(values.hasNext())//累加同一個key的統計結果
            {
                tmp = tmp + values.next().get();
               
                counter = counter +1;//擔心處理太久,JobTracker長時間沒有收到報告會認為TaskTracker已經失效,因此定時報告一下
                if (counter == 1000)
                {
                    counter = 0;
                    reporter.progress();
                }
            }
            result.set(tmp);
            output.collect(newkey, result);//輸出最后的匯總結果
        }   
    }

LogAnalysiser

	public static void main(String[] args)
{
try
{
run(args);
} catch (Exception e)
{
e.printStackTrace();
}
}
public static void run(String[] args) throws Exception
{
if (args == null || args.length <2)
{
System.out.println("need inputpath and outputpath");
return;
}
String inputpath = args[0];
String outputpath = args[1];
String shortin = args[0];
String shortout = args[1];
if (shortin.indexOf(File.separator) >= 0)
shortin = shortin.substring(shortin.lastIndexOf(File.separator));
if (shortout.indexOf(File.separator) >= 0)
shortout = shortout.substring(shortout.lastIndexOf(File.separator));
SimpleDateFormat formater = new SimpleDateFormat("yyyy.MM.dd");
shortout = new StringBuffer(shortout).append("-")
.append(formater.format(new Date())).toString();


if (!shortin.startsWith("/"))
shortin = "/" + shortin;
if (!shortout.startsWith("/"))
shortout = "/" + shortout;
shortin = "/user/root" + shortin;
shortout = "/user/root" + shortout;
File inputdir = new File(inputpath);
File outputdir = new File(outputpath);
if (!inputdir.exists() || !inputdir.isDirectory())
{
System.out.println("inputpath not exist or isn't dir!");
return;
}
if (!outputdir.exists())
{
new File(outputpath).mkdirs();
}

JobConf conf = new JobConf(new Configuration(),LogAnalysiser.class);//構建Config
FileSystem fileSys = FileSystem.get(conf);
fileSys.copyFromLocalFile(new Path(inputpath), new Path(shortin));//將本地文件系統的文件拷貝到HDFS中

conf.setJobName("analysisjob");
conf.setOutputKeyClass(Text.class);//輸出的key類型,在OutputFormat會檢查
conf.setOutputValueClass(LongWritable.class); //輸出的value類型,在OutputFormat會檢查
conf.setMapperClass(MapClass.class);
conf.setCombinerClass(CombinerClass.class);
conf.setReducerClass(ReduceClass.class);
conf.setPartitionerClass(PartitionerClass.class);
conf.set("mapred.reduce.tasks", "2");//強制需要有兩個Reduce來分別處理流量和次數的統計
FileInputFormat.setInputPaths(conf, shortin);//hdfs中的輸入路徑
FileOutputFormat.setOutputPath(conf, new Path(shortout));//hdfs中輸出路徑

Date startTime = new Date();
System.out.println("Job started: " + startTime);
JobClient.runJob(conf);
Date end_time = new Date();
System.out.println("Job ended: " + end_time);
System.out.println("The job took " + (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
//刪除輸入和輸出的臨時文件
fileSys.copyToLocalFile(new Path(shortout),new Path(outputpath));
fileSys.delete(new Path(shortin),true);
fileSys.delete(new Path(shortout),true);
}

以上的代碼就完成了所有的邏輯性代碼,然后還需要一個注冊驅動類來注冊業務Class為一個可標示的命令,讓hadoop jar可以執行。

public class ExampleDriver {
  public static void main(String argv[]){
    ProgramDriver pgd = new ProgramDriver();
    try {
      pgd.addClass("analysislog", LogAnalysiser.class, "A map/reduce program that analysis log .");
      pgd.driver(argv);
    }
    catch(Throwable e){
      e.printStackTrace();
    }
  }
}

將代碼打成jar,并且設置jar的mainClass為ExampleDriver這個類。在分布式環境啟動以后執行如下語句:

hadoop jar analysiser.jar analysislog /home/wenchu/test-in /home/wenchu/test-out

在/home/wenchu/test-in中是需要分析的日志文件,執行后就會看見整個執行過程,包括了Map和Reduce的進度。執行完畢會 在/home/wenchu/test-out下看到輸出的內容。有兩個文件:part-00000和part-00001分別記錄了統計后的結果。 如果需要看執行的具體情況,可以看在輸出目錄下的_logs/history/xxxx_analysisjob,里面羅列了所有的Map,Reduce 的創建情況以及執行情況。在運行期也可以通過瀏覽器來查看Map,Reduce的情況:http://MasterIP:50030 /jobtracker.jsp

Hadoop集群測試

首先這里使用上面的范例作為測試,也沒有做太多的優化配置,這個測試結果只是為了看看集群的效果,以及一些參數配置的影響。

文件復制數為1,blocksize 5M

Slave數 處理記錄數(萬條) 執行時間(秒)
2 95 38
2 950 337
4 95 24
4 950 178
6 95 21
6 950 114

Blocksize 5M

Slave數 處理記錄數(萬條) 執行時間(秒)
2(文件復制數為1) 950 337
2(文件復制數為3) 950 339
6(文件復制數為1) 950 114
6(文件復制數為3) 950 117

文件復制數為1

Slave數 處理記錄數(萬條) 執行時間(秒)
6(blocksize 5M) 95 21
6(blocksize 77M) 95 26
4(blocksize 5M) 950 178
4(blocksize 50M) 950 54
6(blocksize 5M) 950 114
6(blocksize 50M) 950 44
6(blocksize 77M) 950 74

測試的數據結果很穩定,基本測幾次同樣條件下都是一樣。通過測試結果可以看出以下幾點:

  1. 機器數對于性能還是有幫助的(等于沒說^_^)。
  2. 文件復制數的增加只對安全性有幫助,但是對于性能沒有太多幫助。而且現在采取的是將操作系統文件拷貝到HDFS中,所以備份多了,準備的時間很長。
  3. blocksize對于性能影響很大,首先如果將block劃分的太小,那么將會增加job的數量,同時也增加了協作的代價,降低了性能,但是配置的太大也會讓job不能最大化并行處理。所以這個值的配置需要根據數據處理的量來考慮。
  4. 最后就是除了這個表里面列出來的結果,應該去仔細看輸出目錄中的_logs/history中的xxx_analysisjob這個文件,里面記錄了全部的執行過程以及讀寫情況。這個可以更加清楚地了解哪里可能會更加耗時。

隨想

“云計算”熱的燙手,就和SAAS、Web2及SNS等一樣,往往都是在搞概念,只有真正踏踏實實的大型互聯網公司,才會投入人力物力去研究符合自 己的分布式計算。其實當你的數據量沒有那么大的時候,這種分布式計算也就僅僅只是一個玩具而已,只有在真正解決問題的過程中,它深層次的問題才會被挖掘出 來。

這三篇文章(分布式計算開源框架Hadoop介紹,Hadoop中的集群配置和使用技巧)僅僅是為了給對分布式計算有興趣的朋友拋個磚,要想真的掘到金 子,那么就踏踏實實的去用、去想、去分析。或者自己也會更進一步地去研究框架中的實現機制,在解決自己問題的同時,也能夠貢獻一些什么。

前幾日看到有人跪求成為架構師的方式,看了有些可悲,有些可笑,其實有多少架構師知道什么叫做架構?架構師的職責是什么?與其追求這么一個名號,還不如踏踏實實地做塊石頭沉到水底。要知道,積累和沉淀的過程就是一種成長。