So far in this series of articles we have seen how to create a MapReduce program without writing an explicit mapper or reducer, and in the second part we wrote WordCount with our own custom mapper and reducer.

In this article we will modify our previous WordCount program (the one with our own custom mapper and reducer) by implementing a concept called a custom record reader. Before we attack the problem, let us look at some theory required to understand the topic.

(The InputFormat theory that follows is referenced from the Yahoo! Hadoop tutorial.)

The InputFormat defines how to read data from a file into the Mapper instances. Hadoop comes with several implementations of InputFormat; some work with text files and describe different ways in which the text files can be interpreted. Others, like SequenceFileInputFormat, are purpose-built for reading particular binary file formats.

More powerfully, you can define your own InputFormat implementations to format the input to your programs however you want. For example, the default TextInputFormat reads lines of text files. The key it emits for each record is the byte offset of the line read (as a LongWritable), and the value is the contents of the line up to the terminating '\n' character (as a Text object). If you have multi-line records each separated by a $ character, you could write your own InputFormat that parses files into records split on this character instead.
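For example, given a two-line input file containing

hello world
hadoop

TextInputFormat would hand the mapper two records: (0, "hello world") and (12, "hadoop"), where each key is the byte offset at which the corresponding line begins.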

Another important job of the InputFormat is to divide the input data sources (e.g., input files) into fragments that make up the inputs to individual map tasks. These fragments are called “splits” and are encapsulated in instances of the InputSplit interface. Most files, for example, are split up on the boundaries of the underlying blocks in HDFS, and are represented by instances of the FileSplit class. Other files may be unsplittable, depending on application-specific data. Dividing up other data sources (e.g., tables from a database) into splits would be performed in a different, application-specific fashion. When dividing the data into input splits, it is important that this process be quick and cheap. The data itself should not need to be accessed to perform this process (as it is all done by a single machine at the start of the MapReduce job).

So, in a nutshell, the InputFormat does two tasks:

  1. Divide the data source (the data files) into fragments or blocks that are sent to a mapper. These are called splits.
  2. Divide those splits further into records and provide the records one at a time to the mapper for processing. This is achieved through a class called a RecordReader.

We will concentrate on customizing #2 above; customizing #1 will be left for one of the next articles. By customizing the record reader as in #2, we gain the power to send any kind of record (multi-line records, XML sections, JSON objects, and so on) to the mapper after reading it from the source text files.

Okay. Now that we understand how the mapper is fed data from the source files, let's look at what we will try to achieve in this article's example program.

Problem: We want our mapper to receive 3 records (3 lines) from the source file at a time, instead of the 1 line provided by default by TextInputFormat.

Approach:

  1. We will extend the TextInputFormat class to create our own NLinesInputFormat.
  2. We will also create our own RecordReader class, called NLinesRecordReader, where we will implement the logic of feeding 3 lines/records at a time.
  3. We will make a change in our driver program to use our new NLinesInputFormat class.
  4. To prove that we are really getting 3 lines at a time, instead of actually counting words (which we already know how to do), we will emit the number of lines received in each input value as the key and 1 as the value; after going through the reducer, this gives us the frequency of each distinct number of lines fed to the mappers.

Example:

Step 1: Creating the NLinesInputFormat class as a custom InputFormat class.

This is really straightforward: we will inherit our class from TextInputFormat and override the createRecordReader() function to return our custom record reader class, NLinesRecordReader, which we will write shortly. The source listing follows:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class NLinesInputFormat extends TextInputFormat {
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
        return new NLinesRecordReader();
    }
}

Now that we have our new InputFormat ready, let's look at creating the custom record reader. This is a little more involved; the source code is a modified version of Hadoop's own LineRecordReader.

Step 2: Creating the NLinesRecordReader class as a custom RecordReader class.

We will inherit from the RecordReader class. RecordReader has 6 abstract methods which we will have to implement:

  • close()
  • getCurrentKey()
  • getCurrentValue()
  • getProgress()
  • initialize()
  • nextKeyValue()

The most important ones for our discussion are initialize() and nextKeyValue(), which we will override. The initialize() function is called only once per split, so we do our setup there. The nextKeyValue() function is called to provide each record; this is where we write the logic to pack 3 lines into the value instead of the default 1. Here is the source listing for the class:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public class NLinesRecordReader extends RecordReader<LongWritable, Text> {
    // Number of lines to pack into a single value handed to the mapper.
    private final int NLINESTOPROCESS = 3;
    private LineReader in;
    private LongWritable key;
    private Text value = new Text();
    private long start = 0;
    private long end = 0;
    private long pos = 0;
    private int maxLineLength;

    @Override
    public void close() throws IOException {
        if (in != null) {
            in.close();
        }
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
    }

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSplit split = (FileSplit) genericSplit;
        final Path file = split.getPath();
        Configuration conf = context.getConfiguration();
        this.maxLineLength = conf.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
        FileSystem fs = file.getFileSystem(conf);
        start = split.getStart();
        end = start + split.getLength();
        boolean skipFirstLine = false;
        FSDataInputStream filein = fs.open(split.getPath());

        // If the split does not start at the beginning of the file, the first
        // (possibly partial) line belongs to the previous split, so skip it.
        if (start != 0) {
            skipFirstLine = true;
            --start;
            filein.seek(start);
        }
        in = new LineReader(filein, conf);
        if (skipFirstLine) {
            start += in.readLine(new Text(), 0,
                    (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (key == null) {
            key = new LongWritable();
        }
        key.set(pos);
        if (value == null) {
            value = new Text();
        }
        value.clear();
        final Text endline = new Text("\n");
        int newSize = 0;
        // Read up to NLINESTOPROCESS lines and append them, newline separated,
        // into a single value.
        for (int i = 0; i < NLINESTOPROCESS; i++) {
            Text v = new Text();
            while (pos < end) {
                newSize = in.readLine(v, maxLineLength,
                        Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
                if (newSize == 0) {
                    break; // nothing left to read in this split
                }
                value.append(v.getBytes(), 0, v.getLength());
                value.append(endline.getBytes(), 0, endline.getLength());
                pos += newSize;
                if (newSize < maxLineLength) {
                    break; // a complete line was read
                }
            }
        }
        if (newSize == 0) {
            // The last read returned nothing, so this split is exhausted.
            key = null;
            value = null;
            return false;
        } else {
            return true;
        }
    }
}
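One consequence of this design worth noting: each split is read independently, so the grouping of lines restarts at every split boundary, and the last record produced for a split (or for the whole file, as in the sample run below) may contain fewer than NLINESTOPROCESS lines.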

Step 3: Change the driver to use the new InputFormat

Now that we have the custom record reader ready, let's modify our driver to use the new input format by adding the following line of code:

    job.setInputFormatClass(NLinesInputFormat.class);
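
For context, a minimal driver with this change in place might look like the sketch below. The class names LineGroupCountDriver, LineCountMapper, and SumReducer are illustrative placeholders, not names from the original articles; input and output paths come from the command line as usual.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LineGroupCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // new Job(conf, name) matches the older mapreduce API used in this series.
        Job job = new Job(conf, "line group count");
        job.setJarByClass(LineGroupCountDriver.class);
        job.setMapperClass(LineCountMapper.class);        // the mapper from Step 4 (illustrative name)
        job.setReducerClass(SumReducer.class);            // a standard sum reducer (illustrative name)
        job.setInputFormatClass(NLinesInputFormat.class); // the custom input format from Step 1
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}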

Step 4: Change the mapper to emit the number of lines it gets each time

Here is the listing; it's pretty self-explanatory. I am only including the map() function here:

public void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException {
    // The value now holds up to 3 newline-separated lines; count them and
    // emit the count as the key with 1 as the value.
    String lines = value.toString();
    String[] lineArr = lines.split("\n");
    int lcount = lineArr.length;
    context.write(new Text(Integer.toString(lcount)), new IntWritable(1));
}
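
The reducer needs no changes from the earlier word-count program: for each distinct line count it simply sums the 1s emitted by the mappers. For completeness, here is a minimal sketch of such a sum reducer (the class name SumReducer is illustrative):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Illustrative sum reducer: adds up the 1s emitted for each line-count key.
public class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}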

Sample Data Input:

I have used a sample input file of 10,000 lines in the following format:

Shantanu, Deo
Suruchi, Bhide
Shamika, Deo
Mujtaba, Ahmed

Sample Output from Reducer:

1        1
3        3333

This is because our mapper got 3,333 records of 3 lines each plus 1 final record of a single line (3,333 × 3 + 1 = 10,000).

I hope you found the article useful. If you liked it, please feel free to share it, and leave a comment.

Thanks and have great day!
