Hadoop mapper class is not executed when parsing XML with XmlInputFormat
I am new to Hadoop. I am using Hadoop 2.6.0 and trying to parse a complex XML file. After searching for a while, I learned that XML parsing needs a custom InputFormat, namely Mahout's XmlInputFormat. I also took help from this example.
But when I run my code after passing the XmlInputFormat class to the job, my own Mapper class is not called, and the output file is empty if I use the XmlInputFormat given in the example.
Surprisingly, if I do not pass my XmlInputFormat class to my job, my mapper works fine and produces output properly. Can anyone point out what I am missing here?
My job configuration method is:
    public static void runParserJob(String inputPath, String outputPath) throws IOException {
        LOGGER.info("-----runParserJob()-----Start");
        Configuration configuration = new Configuration();
        configuration.set("xmlinput.start", Constants.XML_INPUT_START_TAG_PRODUCT);
        configuration.set("xmlinput.end", Constants.XML_INPUT_END_TAG_PRODUCT);
        configuration.set("io.serializations", Constants.XML_IO_SERIALIZATIONS);

        Job job = new Job(configuration, Constants.JOB_TITLE);
        FileInputFormat.setInputPaths(job, inputPath);
        job.setJarByClass(ParserDriver.class);
        job.setMapperClass(XMLMapper.class);
        job.setNumReduceTasks(0);
        job.setInputFormatClass(XmlInputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        Path hdfsOutputPath = new Path(outputPath);
        FileOutputFormat.setOutputPath(job, hdfsOutputPath);
        FileSystem dfs = FileSystem.get(hdfsOutputPath.toUri(), configuration);
        /** Using this condition it will create output at same location
         * by deleting older data in that location **/
        if (dfs.exists(hdfsOutputPath)) {
            dfs.delete(hdfsOutputPath, true);
        }
        try {
            job.waitForCompletion(true);
        } catch (InterruptedException ie) {
            LOGGER.error("-----Process interrupted in between Exception-----", ie);
        } catch (ClassNotFoundException ce) {
            LOGGER.error("-----Class not found while running the job-----", ce);
        }
    }
My XmlInputFormat class is:
    public class XmlInputFormat extends TextInputFormat {
        public static final String START_TAG_KEY = "xmlinput.start";
        public static final String END_TAG_KEY = "xmlinput.end";

        @Override
        public RecordReader<LongWritable, Text> createRecordReader(InputSplit is, TaskAttemptContext tac) {
            return new XmlRecordReader();
        }

        public static class XmlRecordReader extends RecordReader<LongWritable, Text> {
            private byte[] startTag;
            private byte[] endTag;
            private long start;
            private long end;
            private FSDataInputStream fsin;
            private DataOutputBuffer buffer = new DataOutputBuffer();
            private LongWritable key = new LongWritable();
            private Text value = new Text();

            @Override
            public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
                    throws IOException, InterruptedException {
                FileSplit fileSplit = (FileSplit) inputSplit;
                startTag = taskAttemptContext.getConfiguration().get(START_TAG_KEY).getBytes("utf-8");
                endTag = taskAttemptContext.getConfiguration().get(END_TAG_KEY).getBytes("utf-8");
                start = fileSplit.getStart();
                end = start + fileSplit.getLength();
                Path file = fileSplit.getPath();
                FileSystem hdfs = file.getFileSystem(taskAttemptContext.getConfiguration());
                fsin = hdfs.open(fileSplit.getPath());
                fsin.seek(start);
            }

            @Override
            public boolean nextKeyValue() throws IOException, InterruptedException {
                if (fsin.getPos() < end) {
                    if (readUntilMatch(startTag, false)) {
                        try {
                            buffer.write(startTag);
                            if (readUntilMatch(endTag, true)) {
                                value.set(buffer.getData(), 0, buffer.getLength());
                                key.set(fsin.getPos());
                                return true;
                            }
                        } finally {
                            buffer.reset();
                        }
                    }
                }
                return false;
            }

            @Override
            public void close() throws IOException {
            }

            @Override
            public LongWritable getCurrentKey() throws IOException, InterruptedException {
                return null;
            }

            @Override
            public Text getCurrentValue() throws IOException, InterruptedException {
                return null;
            }

            @Override
            public float getProgress() throws IOException, InterruptedException {
                return 0;
            }

            private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
                int i = 0;
                while (true) {
                    int b = fsin.read();
                    // If reaches to EOF
                    if (b == -1) {
                        return false;
                    }
                    // If not then save into the buffer.
                    if (withinBlock) {
                        buffer.write(b);
                    }
                    // check if we're matching:
                    if (b == match[i]) {
                        i++;
                        if (i >= match.length) return true;
                    } else i = 0;
                    // see if we've passed the stop point:
                    if (!withinBlock && i == 0 && fsin.getPos() >= end) return false;
                }
            }
        }
    }
Can anyone help me out here? Thanks in advance. Please correct me if I am going wrong anywhere.
I am not sure what your XML structure looks like, but suppose, for example, you have an XML structure like this:
    <data>
      <product id="101" itemCategory="BER" transaction="PUR">
        <transaction-id>102A5RET</transaction-id>
        <item-name>Blue-Moon-12-PK-BTTLE</item-name>
        <item-purchased>2</item-purchased>
        <item-price>12.99</item-price>
        <time-stamp>2015-04-2011:12:13102301</time-stamp>
      </product>
      ...
    </data>
Your XMLInputFormat class would need to know which XML node you want to work with:
    configuration.set("xmlinput.start", "<product");   // note: only <product, without the closing >
    configuration.set("xmlinput.end", "</product>");   // note: only </product>
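To see why the start tag is given without the closing `>`, here is a minimal stand-alone sketch of the same byte-matching idea used by `readUntilMatch` in the question. It is only an illustration under assumptions: the class `TagMatchSketch` and its `extract` helper are hypothetical names, it reads from an in-memory string instead of HDFS, and it ignores split boundaries.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not part of Hadoop or Mahout.
public class TagMatchSketch {

    // Reads bytes until `match` has been seen in full. Every byte read is
    // copied into `sink` when `sink` is non-null. Returns false on EOF.
    static boolean readUntilMatch(InputStream in, byte[] match, ByteArrayOutputStream sink)
            throws IOException {
        int i = 0;
        while (true) {
            int b = in.read();
            if (b == -1) {
                return false;              // end of input, no match
            }
            if (sink != null) {
                sink.write(b);             // inside a record: keep the byte
            }
            if (b == match[i]) {
                i++;
                if (i >= match.length) {
                    return true;           // whole tag matched
                }
            } else {
                i = 0;                     // same naive reset as in the question's code
            }
        }
    }

    // Returns every chunk that begins with startTag and ends with endTag.
    public static List<String> extract(String xml, String startTag, String endTag) {
        byte[] start = startTag.getBytes(StandardCharsets.UTF_8);
        byte[] end = endTag.getBytes(StandardCharsets.UTF_8);
        List<String> records = new ArrayList<>();
        try (InputStream in = new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8))) {
            while (readUntilMatch(in, start, null)) {
                ByteArrayOutputStream record = new ByteArrayOutputStream();
                record.write(start);       // the start tag was consumed, so add it back
                if (!readUntilMatch(in, end, record)) {
                    break;                 // unterminated record: stop
                }
                records.add(new String(record.toByteArray(), StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return records;
    }

    public static void main(String[] args) {
        String xml = "<data><product id=\"101\"><item-price>12.99</item-price></product></data>";
        System.out.println(extract(xml, "<product", "</product>").size());   // prints 1
        System.out.println(extract(xml, "<product>", "</product>").size());  // prints 0
    }
}
```

With `<product>` as the start tag, the element `<product id="101" ...>` never matches because of its attributes, so no record is produced. If the value behind `Constants.XML_INPUT_START_TAG_PRODUCT` includes the closing `>`, that could explain the empty output.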
Hope this will help!!