<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    Skynet

    ---------- ---------- 我的新 blog : liukaiyi.cublog.cn ---------- ----------

      BlogJava :: 首頁 :: 聯系 :: 聚合  :: 管理
      112 Posts :: 1 Stories :: 49 Comments :: 0 Trackbacks

    首先 本文中的 hadoop join  在實際開發沒有用處!
    如果在開發中 請使用 cascading  groupby, 進行 hadoop join,
    本文只是為探討弄懂 cascading 實現做準備。

    當然 如果有有人 hadoop join 過 請聯系我,大家交流下 !

    文件可能需要的一些參考:
    hadoop jython ( windows )
    jython ,jython 編譯以及jar 包
    少量 linux shell


    本文介紹 hadoop 可能使用到的 join 接口測試 ,已經參考:
    使用Hadoop實現Inner Join操作的方法【from淘寶】:http://labs.chinamobile.com/groups/58_547

    下面 測試后 ,我這大體上 對 hadoop  join 的方式是這樣理解的 (猜想):
    數據1 ; 數據2
    job1.map( 數據1 ) =(臨時文件1)>  文件標示1+需要join列  數據
    job2.map( 數據2 ) =(臨時文件2)>  文件標示2+需要join列  數據

    臨時文件 mapred.join.expr 生成
    job3.map ->
    文件標示1+需要join列 : 數據
    文件標示2+需要join列 : 數據
    ......
    job3.Combiner - >
    需要join列 : 文件標示1+數據
    需要join列 : 文件標示2+數據
    job3.Reducer->
    需要join列 : 使用 java-list > 生成
      文件2-列x [  數據,數據... ]
      文件1-列x [  數據,數據... ]
    然后 你這 left join ,或 inner join 或 xxx join 邏輯 就自己來吧


    結果集合
    [root@localhost python]# cat /home/megajobs/del/jobs/tools/hadoop-0.18.3/data/090907/1
    1
    2
    3
    4
    5
    [root@localhost python]# cat /home/megajobs/del/jobs/tools/hadoop-0.18.3/data/090907/2
    2
    4
    3
    1

    修改 ..../hadoop-0.18.3/src/examples/python/compile
    #!/usr/bin/env bash

    export HADOOP_HOME
    =/home/xx/del/jobs/tools/hadoop-0.18.3
    export CASCADING_HOME
    =/home/xx/del/jobs/tools/cascading-1.0.16-hadoop-0.18.3
    export JYTHON_HOME
    =/home/xx/del/jobs/tools/jython2.2.1

    export CLASSPATH
    ="$HADOOP_HOME/hadoop-0.18.3-core.jar"                                            

    # so that filenames w/ spaces are handled correctly in loops below
    IFS=

    # add libs to CLASSPATH

    for f in $HADOOP_HOME/lib/*.jar; do                                                               
      CLASSPATH
    =${CLASSPATH}:$f;
    done

    for f in $HADOOP_HOME/lib/jetty-ext/*.jar; do
      CLASSPATH
    =${CLASSPATH}:$f;
    done

    for f in $CASCADING_HOME/*.jar; do
      CLASSPATH
    =${CLASSPATH}:$f;
    done

    for f in $CASCADING_HOME/lib/*.jar; do
      CLASSPATH
    =${CLASSPATH}:$f;
    done


    for f in $JYTHON_HOME/*.jar; do
      CLASSPATH
    =${CLASSPATH}:$f;
    done

    # restore ordinary behaviour
    unset IFS

    /home/xx/del/jobs/tools/jython2.2.1/jythonc -p org.apache.hadoop.examples --j $1.jar  -c $1.py 
    /home/xx/del/jobs/tools/hadoop-0.18.3/bin/hadoop jar $1.jar $2 $3 $4 $5 $6 $7 $8 $9 


    簡單 數據 鏈接 :
    from org.apache.hadoop.fs import Path                                                             
    from org.apache.hadoop.io import *                                                                
    from org.apache.hadoop.mapred.lib import *                                                        
    from org.apache.hadoop.mapred.join  import *                                                      
    from org.apache.hadoop.mapred import *                                                            
    import sys                                                                                        
    import getopt                                                                                     
                                                                                                      
    class tMap(Mapper, MapReduceBase):                                                                
            
    def map(self, key, value, output, reporter):                                              
                    output.collect( Text( str(key) ) , Text( value.toString() ))                      
                                                                                           
                                   
    def main(args):                                                                                   
            conf 
    = JobConf(tMap)                                                                      
            conf.setJobName(
    "wordcount")                                                              
                                                                                                      
            conf.setMapperClass( tMap )                                                               

            FileInputFormat.setInputPaths(conf,[ Path(sp) for sp in args[1:-1]])                      
            conf.setOutputKeyClass( Text )
            conf.setOutputValueClass( Text )                                                         

            conf.setOutputPath(Path(args[
    -1]))                                                        
            
            JobClient.runJob(conf)                                                                    
            
    if __name__ == "__main__":main(sys.argv)     

    運行
    ./compile test file:///home/xx/del/jobs/tools/hadoop-0.18.3/data/090907/1 file:///home/xx/del/jobs/tools/hadoop-0.18.3/data/090907/2   file:///home/xx/del/jobs/tools/hadoop-0.18.3/tmp/wc78
    結果:
    [xx@localhost wc78]$ cat ../wc78/part-00000
    0    1
    0    2
    2    4
    2    2
    4    3
    4    3
    6    1
    6    4
    8    5


    簡單的數據 join :
    from org.apache.hadoop.fs import Path
    from org.apache.hadoop.io import *
    from org.apache.hadoop.mapred.lib import *
    from org.apache.hadoop.mapred.join  import *
    from org.apache.hadoop.mapred import *
    import sys
    import getopt

    class tMap(Mapper, MapReduceBase):
            
    def map(self, key, value, output, reporter):
                    output.collect( Text( str(key) ) , Text( value.toString() ))

    def main(args):
            conf 
    = JobConf(tMap)
            conf.setJobName(
    "wordcount")
            conf.setMapperClass( tMap )

            conf.set("mapred.join.expr", CompositeInputFormat.compose("override",TextInputFormat, args[1:-1] ) )
            conf.setOutputKeyClass( Text )
            conf.setOutputValueClass( Text )

            conf.setInputFormat(CompositeInputFormat)
         
            conf.setOutputPath(Path(args[
    -1]))

            JobClient.runJob(conf)

    if __name__ == "__main__":main(sys.argv)
            

    運行結果 (  ) :
    ./compile test file:///home/xx/del/jobs/tools/hadoop-0.18.3/data/090907/1 file:///home/xx/del/jobs/tools/hadoop-0.18.3/data/090907/2   file:///home/xx/del/jobs/tools/hadoop-0.18.3/tmp/wc79
    [xx@localhost wc78]$ cat ../wc79/part-00000
    0    2
    2    4
    4    3
    6    1
    8    5













    整理 m.tkk7.com/Good-Game
    posted on 2009-09-08 10:39 劉凱毅 閱讀(1660) 評論(2)  編輯  收藏 所屬分類: python數據挖掘

    Feedback

    # re: hadoop jython join ( 1 ) 2009-09-08 11:11 zhong
    大規模數據量時,使用hadoop做join操作還是很有意義的  回復  更多評論
      

    # re: hadoop jython join ( 1 ) 2009-09-08 21:18 kenshin
    感覺做join,用cascading比較合適!!邏輯上比較清晰,而且這兩個不是同一概念的東西!!

    搓見  回復  更多評論
      

    主站蜘蛛池模板: 日韩中文无码有码免费视频| 亚洲乱码精品久久久久..| 污视频网站在线观看免费| 中文字幕精品亚洲无线码一区| 久久成人无码国产免费播放| 精品久久亚洲中文无码| 亚洲国产一级在线观看| 99久久久国产精品免费牛牛| 亚洲AV无码资源在线观看| 亚洲av综合色区| 妞干网手机免费视频| 97在线视频免费公开视频| 亚洲av永久无码嘿嘿嘿| 在线观看亚洲成人| 国产福利在线免费| 中文字幕在线视频免费| 亚洲国产精品嫩草影院| 亚洲AV日韩AV永久无码绿巨人| 国产免费久久精品久久久| 久久精品一本到99热免费| 菠萝菠萝蜜在线免费视频| 亚洲女人初试黑人巨高清| 日日噜噜噜噜夜夜爽亚洲精品 | 黄页网站在线观看免费| 亚洲视频一区在线| 亚洲精品偷拍视频免费观看| 一个人看www在线高清免费看| 久久国产乱子伦精品免费强| 特级aa**毛片免费观看| 国产.亚洲.欧洲在线| 亚洲AV人无码激艳猛片| 国产精品亚洲精品日韩已方| 成人人免费夜夜视频观看| 免费A级毛片无码视频| 久久www免费人成精品香蕉| 国产亚洲一卡2卡3卡4卡新区| 亚洲一区二区三区在线| 亚洲永久永久永久永久永久精品| 久久精品国产精品亚洲艾草网美妙 | 亚洲日本在线电影| 亚洲人成777在线播放|