www.HadoopExam.com

HadoopExam Learning Resources


My Hadoop mapper class is not executed while parsing XML in Hadoop using XMLInputFormat

I am new to Hadoop (using version 2.6.0) and trying to parse a complex XML file. After searching for a while, I learned that XML parsing requires a custom InputFormat, namely Mahout's XmlInputFormat. I also took help from this example.

But when I run my code after passing the XmlInputFormat class, my own Mapper class is never called, and the output file contains 0 bytes of data if I use the XmlInputFormat given in the example.

Surprisingly, if I do not pass my XmlInputFormat class to my job, the mapper works fine and produces output properly. Can anyone help point out what I am missing here?

My job configuration method is:

public static void runParserJob(String inputPath, String outputPath) throws IOException {
    LOGGER.info("-----runParserJob()-----Start");
    Configuration configuration = new Configuration();
    configuration.set("xmlinput.start", Constants.XML_INPUT_START_TAG_PRODUCT);
    configuration.set("xmlinput.end", Constants.XML_INPUT_END_TAG_PRODUCT);
    configuration.set("io.serializations", Constants.XML_IO_SERIALIZATIONS);

    Job job = new Job(configuration, Constants.JOB_TITLE);
    FileInputFormat.setInputPaths(job, inputPath);
    job.setJarByClass(ParserDriver.class);
    job.setMapperClass(XMLMapper.class);
    job.setNumReduceTasks(0);
    job.setInputFormatClass(XmlInputFormat.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);

    Path hdfsOutputPath = new Path(outputPath);
    FileOutputFormat.setOutputPath(job, hdfsOutputPath);
    FileSystem dfs = FileSystem.get(hdfsOutputPath.toUri(), configuration);
    /* If older data already exists at the output location,
     * delete it so the job can write its output to the same path. */
    if (dfs.exists(hdfsOutputPath)) {
        dfs.delete(hdfsOutputPath, true);
    }
    try {
        job.waitForCompletion(true);
    } catch (InterruptedException ie) {
        LOGGER.error("-----Process interrupted in between Exception-----", ie);
    } catch (ClassNotFoundException ce) {
        LOGGER.error("-----Class not found while running the job-----", ce);
    }
}

My XmlInputFormat class is:

public class XmlInputFormat extends TextInputFormat {

    public static final String START_TAG_KEY = "xmlinput.start";
    public static final String END_TAG_KEY = "xmlinput.end";

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit is, TaskAttemptContext tac) {
        return new XmlRecordReader();
    }

    public static class XmlRecordReader extends RecordReader<LongWritable, Text> {

        private byte[] startTag;
        private byte[] endTag;
        private long start;
        private long end;
        private FSDataInputStream fsin;
        private DataOutputBuffer buffer = new DataOutputBuffer();
        private LongWritable key = new LongWritable();
        private Text value = new Text();

        @Override
        public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
                throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) inputSplit;
            startTag = taskAttemptContext.getConfiguration().get(START_TAG_KEY).getBytes("utf-8");
            endTag = taskAttemptContext.getConfiguration().get(END_TAG_KEY).getBytes("utf-8");

            start = fileSplit.getStart();
            end = start + fileSplit.getLength();
            Path file = fileSplit.getPath();
            FileSystem hdfs = file.getFileSystem(taskAttemptContext.getConfiguration());
            fsin = hdfs.open(fileSplit.getPath());
            fsin.seek(start);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (fsin.getPos() < end) {
                if (readUntilMatch(startTag, false)) {
                    try {
                        buffer.write(startTag);
                        if (readUntilMatch(endTag, true)) {
                            value.set(buffer.getData(), 0, buffer.getLength());
                            key.set(fsin.getPos());
                            return true;
                        }
                    } finally {
                        buffer.reset();
                    }
                }
            }
            return false;
        }

        @Override
        public void close() throws IOException {
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return null;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return null;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return 0;
        }

        private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
            int i = 0;
            while (true) {
                int b = fsin.read();
                // If we reach EOF
                if (b == -1) {
                    return false;
                }
                // If not, save the byte into the buffer.
                if (withinBlock) {
                    buffer.write(b);
                }
                // Check if we're matching:
                if (b == match[i]) {
                    i++;
                    if (i >= match.length) return true;
                } else {
                    i = 0;
                }
                // See if we've passed the stop point:
                if (!withinBlock && i == 0 && fsin.getPos() >= end) return false;
            }
        }
    }
}


Can anyone help me out here? Thanks in advance. Please correct me if I am going wrong anywhere.


I am not sure what your XML structure looks like, but suppose, for example, you have an XML structure like this:

<data>
    <product id="101" itemCategory="BER" transaction="PUR">
        <transaction-id>102A5RET</transaction-id>
        <item-name>Blue-Moon-12-PK-BTTLE</item-name>
        <item-purchased>2</item-purchased>
        <item-price>12.99</item-price>
        <time-stamp>2015-04-20 11:12:13 102301</time-stamp>
    </product>
    ...
</data>

Your XMLInputFormat class would need to know which XML node you want to work with:

configuration.set("xmlinput.start", "<product"); // note: only "<product", without the closing '>', so the opening tag still matches when it carries attributes
configuration.set("xmlinput.end", "</product>");
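The effect of those two settings can be checked outside Hadoop. The sketch below is plain Java with no Hadoop dependencies; `extractBlocks` is a hypothetical helper written only for illustration, mirroring what XmlRecordReader does at the byte level: scan for the start tag, buffer everything up to and including the end tag, and emit each match as one record.

```java
import java.util.ArrayList;
import java.util.List;

public class TagExtractor {

    // Mimics XmlRecordReader: find startTag, keep everything up to and
    // including endTag, and emit each matched region as one record.
    static List<String> extractBlocks(String xml, String startTag, String endTag) {
        List<String> blocks = new ArrayList<>();
        int pos = 0;
        while ((pos = xml.indexOf(startTag, pos)) != -1) {
            int end = xml.indexOf(endTag, pos + startTag.length());
            if (end == -1) break;                // EOF before a closing tag
            end += endTag.length();
            blocks.add(xml.substring(pos, end)); // both tags included, like the buffer
            pos = end;
        }
        return blocks;
    }

    public static void main(String[] args) {
        String data = "<data>"
                + "<product id=\"101\"><item-name>Blue-Moon-12-PK-BTTLE</item-name></product>"
                + "<product id=\"102\"><item-name>Pale-Ale-6-PK</item-name></product>"
                + "</data>";
        // Same values as xmlinput.start / xmlinput.end above: "<product"
        // matches the opening tag even though it carries attributes.
        for (String block : extractBlocks(data, "<product", "</product>")) {
            System.out.println(block);
        }
    }
}
```

With `"<product>"` (closing bracket included) as the start tag, nothing would match the attributed opening tags, and the job would produce an empty output file, which is the symptom described above.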

Hope this will help!!
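Once XmlInputFormat hands each `<product>...</product>` block to the mapper as a single Text value, the mapper still has to pull the individual fields out of it. One option (an illustration only, not part of the code above; `parseProduct` is a hypothetical helper) is the JDK's built-in StAX parser:

```java
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

public class ProductParser {

    // Parses one <product> block (as delivered to the mapper) into a
    // field-name -> text map using the JDK's StAX streaming API.
    static Map<String, String> parseProduct(String xml) {
        Map<String, String> fields = new HashMap<>();
        try {
            XMLStreamReader reader = XMLInputFactory.newInstance()
                    .createXMLStreamReader(new StringReader(xml));
            String current = null;
            while (reader.hasNext()) {
                int event = reader.next();
                if (event == XMLStreamConstants.START_ELEMENT) {
                    current = reader.getLocalName();
                } else if (event == XMLStreamConstants.CHARACTERS && current != null) {
                    // concat in case the parser splits one text node into
                    // several CHARACTERS events
                    fields.merge(current, reader.getText(), String::concat);
                } else if (event == XMLStreamConstants.END_ELEMENT) {
                    current = null;
                }
            }
            reader.close();
        } catch (XMLStreamException e) {
            throw new RuntimeException(e);
        }
        return fields;
    }

    public static void main(String[] args) {
        String record = "<product id=\"101\" itemCategory=\"BER\" transaction=\"PUR\">"
                + "<transaction-id>102A5RET</transaction-id>"
                + "<item-name>Blue-Moon-12-PK-BTTLE</item-name>"
                + "<item-price>12.99</item-price>"
                + "</product>";
        System.out.println(parseProduct(record).get("item-name")); // Blue-Moon-12-PK-BTTLE
    }
}
```

This works because each record reader value is a self-contained, well-formed fragment, so a per-record parse inside `map()` is enough; no state needs to be carried between calls.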

