首先 本文中的 hadoop join 在實際開發沒有用處!
如果在開發中 請使用
cascading groupby, 進行 hadoop join,
本文只是為探討弄懂 cascading 實現做準備。
當然 如果有有人 hadoop join 過 請聯系我,大家交流下 !
文件可能需要的一些參考:
hadoop jython ( windows )
jython ,jython 編譯以及jar 包
少量 linux shell
本文介紹 hadoop 可能使用到的 join 接口測試 ,已經參考:
使用Hadoop實現Inner Join操作的方法【from淘寶】:http://labs.chinamobile.com/groups/58_547
下面 測試后 ,我這大體上 對 hadoop join 的方式是這樣理解的 (猜想):
數據1 ; 數據2
job1.map( 數據1 ) =(臨時文件1)> 文件標示1+需要join列 數據
job2.map( 數據2 ) =(臨時文件2)> 文件標示2+需要join列 數據
臨時文件
mapred.join.expr 生成
job3.map ->
文件標示1+需要join列 : 數據
文件標示2+需要join列 : 數據
......
job3.Combiner - >
需要join列 : 文件標示1+數據
需要join列 : 文件標示2+數據
job3.Reducer->
需要join列 : 使用 java-list > 生成
文件2-列x [ 數據,數據... ]
文件1-列x [ 數據,數據... ]
然后 你這 left join ,或 inner join 或 xxx join 邏輯 就自己來吧
結果集合
[root@localhost python]# cat /home/megajobs/del/jobs/tools/hadoop-0.18.3/data/090907/1
1
2
3
4
5
[root@localhost python]# cat /home/megajobs/del/jobs/tools/hadoop-0.18.3/data/090907/2
2
4
3
1
修改 ..../hadoop-0.18.3/src/examples/python/compile
#!/usr/bin/env bash
export HADOOP_HOME=/home/xx/del/jobs/tools/hadoop-0.18.3
export CASCADING_HOME=/home/xx/del/jobs/tools/cascading-1.0.16-hadoop-0.18.3
export JYTHON_HOME=/home/xx/del/jobs/tools/jython2.2.1
export CLASSPATH="$HADOOP_HOME/hadoop-0.18.3-core.jar"
# so that filenames w/ spaces are handled correctly in loops below
IFS=
# add libs to CLASSPATH
for f in $HADOOP_HOME/lib/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
for f in $HADOOP_HOME/lib/jetty-ext/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
for f in $CASCADING_HOME/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
for f in $CASCADING_HOME/lib/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
for f in $JYTHON_HOME/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
# restore ordinary behaviour
unset IFS
/home/xx/del/jobs/tools/jython2.2.1/jythonc -p org.apache.hadoop.examples -d -j $1.jar -c $1.py
/home/xx/del/jobs/tools/hadoop-0.18.3/bin/hadoop jar $1.jar $2 $3 $4 $5 $6 $7 $8 $9
簡單
數據 鏈接 :
from org.apache.hadoop.fs import Path
from org.apache.hadoop.io import *
from org.apache.hadoop.mapred.lib import *
from org.apache.hadoop.mapred.join import *
from org.apache.hadoop.mapred import *
import sys
import getopt
class tMap(Mapper, MapReduceBase):
def map(self, key, value, output, reporter):
output.collect( Text( str(key) ) , Text( value.toString() ))
def main(args):
conf = JobConf(tMap)
conf.setJobName("wordcount")
conf.setMapperClass( tMap )
FileInputFormat.setInputPaths(conf,[ Path(sp) for sp in args[1:-1]])
conf.setOutputKeyClass( Text )
conf.setOutputValueClass( Text )
conf.setOutputPath(Path(args[-1]))
JobClient.runJob(conf)
if __name__ == "__main__":main(sys.argv)
運行
./compile test file:///home/xx/del/jobs/tools/hadoop-0.18.3/data/090907/1 file:///home/xx/del/jobs/tools/hadoop-0.18.3/data/090907/2 file:///home/xx/del/jobs/tools/hadoop-0.18.3/tmp/wc78
結果:
[xx@localhost wc78]$ cat ../wc78/part-00000
0 1
0 2
2 4
2 2
4 3
4 3
6 1
6 4
8 5
簡單的數據 join :
from org.apache.hadoop.fs import Path
from org.apache.hadoop.io import *
from org.apache.hadoop.mapred.lib import *
from org.apache.hadoop.mapred.join import *
from org.apache.hadoop.mapred import *
import sys
import getopt
class tMap(Mapper, MapReduceBase):
def map(self, key, value, output, reporter):
output.collect( Text( str(key) ) , Text( value.toString() ))
def main(args):
conf = JobConf(tMap)
conf.setJobName("wordcount")
conf.setMapperClass( tMap )
conf.set("mapred.join.expr", CompositeInputFormat.compose("override",TextInputFormat, args[1:-1] ) )
conf.setOutputKeyClass( Text )
conf.setOutputValueClass( Text )
conf.setInputFormat(CompositeInputFormat)
conf.setOutputPath(Path(args[-1]))
JobClient.runJob(conf)
if __name__ == "__main__":main(sys.argv)
運行結果 ( ) :
./compile test file:///home/xx/del/jobs/tools/hadoop-0.18.3/data/090907/1 file:///home/xx/del/jobs/tools/hadoop-0.18.3/data/090907/2 file:///home/xx/del/jobs/tools/hadoop-0.18.3/tmp/wc79
[xx@localhost wc78]$ cat ../wc79/part-00000
0 2
2 4
4 3
6 1
8 5
整理 m.tkk7.com/Good-Game