<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    posts - 495,comments - 227,trackbacks - 0
    http://www.rigongyizu.com/use-multiinputformat-read-different-files-in-one-job/

    hadoop中提供了 MultiOutputFormat 能將結(jié)果數(shù)據(jù)輸出到不同的目錄,也提供了 FileInputFormat 來一次讀取多個目錄的數(shù)據(jù),但是默認(rèn)一個job只能使用 job.setInputFormatClass 設(shè)置使用一個inputfomat處理一種格式的數(shù)據(jù)。如果需要實現(xiàn) 在一個job中同時讀取來自不同目錄的不同格式文件 的功能,就需要自己實現(xiàn)一個 MultiInputFormat 來讀取不同格式的文件了(原來已經(jīng)提供了MultipleInputs)。

    例如:有一個mapreduce job需要同時讀取兩種格式的數(shù)據(jù),一種格式是普通的文本文件,用 LineRecordReader 一行一行讀取;另外一種文件是偽XML文件,用自定義的AJoinRecordReader讀取。

    自己實現(xiàn)了一個簡單的 MultiInputFormat 如下:

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
     
    public class MultiInputFormat extends TextInputFormat {
     
        @Override
        public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
            RecordReader reader = null;
            try {
                String inputfile = ((FileSplit) split).getPath().toString();
                String xmlpath = context.getConfiguration().get("xml_prefix");
                String textpath = context.getConfiguration().get("text_prefix");
     
                if (-1 != inputfile.indexOf(xmlpath)) {
                    reader = new AJoinRecordReader();
                } else if (-1 != inputfile.indexOf(textpath)) {
                    reader = new LineRecordReader();
                } else {
                    reader = new LineRecordReader();
                }
            } catch (IOException e) {
                // do something ...
            }
     
            return reader;
        }
    }

    其實原理很簡單,就是在 createRecordReader 的時候,通過 ((FileSplit) split).getPath().toString() 獲取到當(dāng)前要處理的文件名,然后根據(jù)特征匹配,選取對應(yīng)的 RecordReader 即可。xml_prefix和text_prefix可以在程序啟動時通過 -D 傳給Configuration。

    比如某次執(zhí)行打印的值如下:

    inputfile=hdfs://test042092.sqa.cm4:9000/test/input_xml/common-part-00068
    xmlpath_prefix=hdfs://test042092.sqa.cm4:9000/test/input_xml
    textpath_prefix=hdfs://test042092.sqa.cm4:9000/test/input_txt

    這里只是通過簡單的文件路徑和標(biāo)示符匹配來做,也可以采用更復(fù)雜的方法,比如文件名、文件后綴等。

    接著在map類中,也同樣可以根據(jù)不同的文件名特征進(jìn)行不同的處理:

    @Override
    public void map(LongWritable offset, Text inValue, Context context)
            throws IOException {
     
        String inputfile = ((FileSplit) context.getInputSplit()).getPath()
                .toString();
     
        if (-1 != inputfile.indexOf(textpath)) {
            ......
        } else if (-1 != inputfile.indexOf(xmlpath)) {
            ......
        } else {
            ......
        }
    }

    這種方式太土了,原來hadoop里面已經(jīng)提供了 MultipleInputs 來實現(xiàn)對一個目錄指定一個inputformat和對應(yīng)的map處理類。

    MultipleInputs.addInputPath(conf, new Path("/foo"), TextInputFormat.class,
       MapClass.class);
    MultipleInputs.addInputPath(conf, new Path("/bar"),
       KeyValueTextInputFormat.class, MapClass2.class);
    posted on 2014-09-16 09:27 SIMONE 閱讀(2750) 評論(0)  編輯  收藏 所屬分類: hadoop
    主站蜘蛛池模板: 边摸边吃奶边做爽免费视频99 | 青娱乐免费在线视频| 亚洲一区二区三区在线观看精品中文| 国产国产人免费视频成69大陆| 亚洲日韩区在线电影| 免费网站看av片| 黑人大战亚洲人精品一区| 三年片免费观看大全国语| 国产成人亚洲综合| GOGOGO高清免费看韩国| 久久精品国产精品亚洲精品| 免费成人在线电影| 亚洲卡一卡2卡三卡4卡无卡三| 免费A级毛片无码专区| 亚洲欧洲自拍拍偷综合| 国产桃色在线成免费视频| 香蕉大伊亚洲人在线观看| 99re热免费精品视频观看| 2020天堂在线亚洲精品专区| 好先生在线观看免费播放| 亚洲色大成网站www久久九| 成年女人永久免费观看片| 国产99久久亚洲综合精品| 亚洲日韩在线观看| 免费无码又爽又刺激网站| 亚洲理论精品午夜电影| 24小时日本在线www免费的| 狠狠综合亚洲综合亚洲色| 亚洲综合无码AV一区二区| 久久青草国产免费观看| 亚洲色欲色欲www| 免费成人av电影| 免费精品久久天干天干| 亚洲午夜国产精品无卡| 国产在线观看免费不卡| 两个人日本WWW免费版| 亚洲日韩乱码中文无码蜜桃臀| 免费看美女让人桶尿口| 在线播放国产不卡免费视频| 亚洲乱亚洲乱淫久久| 日本免费人成视频播放 |