Wednesday, 24 June 2015

How Map Reduce processes the BIG Data


MapReduce is one of the YARN-supported applications for processing big data on a Hadoop 2.x cluster. The Mapper and the Reducer are two separate Java classes. The Mapper reads records from the input source and emits intermediate key-value pairs, and the Reducer applies the aggregation logic to them.

Hadoop also supports streaming, which lets us develop a MapReduce program in languages other than Java, such as Python, Perl, Ruby or Bash. Any language that can read from stdin, write to stdout, and parse tab and newline characters can be used for Hadoop programming. Hadoop Streaming simply pipes the string representations of key-value pairs, joined by a tab, to an arbitrary program that must be executable on each NodeManager node.
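To make the stdin/stdout contract concrete, here is a minimal sketch of a streaming-style mapper. It is written in Java only to stay consistent with the rest of this post. The class name StreamingMapperSketch, the helper toKeyValueLine, and the assumption that each input line holds exactly two space-separated fields are illustrative choices, not part of Hadoop.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class StreamingMapperSketch {

    // Hypothetical helper: turns a line "key value" into "key<TAB>value".
    // Returns null when the line does not hold exactly two fields.
    public static String toKeyValueLine(String line) {
        String[] fields = line.trim().split("\\s+");
        if (fields.length != 2) {
            return null;
        }
        return fields[0] + "\t" + fields[1];
    }

    public static void main(String[] args) throws IOException {
        // Read records from stdin, one per line
        BufferedReader in = new BufferedReader(
                new InputStreamReader(System.in, StandardCharsets.UTF_8));
        String line;
        while ((line = in.readLine()) != null) {
            String out = toKeyValueLine(line);
            if (out != null) {
                // Write the tab-separated key-value pair to stdout
                System.out.println(out);
            }
        }
    }
}
```

Because the program only touches stdin and stdout, it can be tried locally by piping a text file into it before it is ever run on a cluster.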

A MapReduce program sees its data record by record, as key-value pairs.
So what is a key-value pair?
  • Key: the unique identifier for a piece of data.
  • Value: the data, or the list of data items, associated with a specific key.

Any data can be represented in the form of key value pairs as mentioned below:


Let us look at a problem statement: find the maximum temperature recorded for each year in the given input file [Temp.dat]. Let's assume that the input file size is 256 MB.

  • To process the data with a MapReduce program, our input data must first be placed in HDFS. This is an important prerequisite: MapReduce takes its input from HDFS and also writes its output (the processed data) to HDFS.
  • Copy the input file [Temp.dat] into HDFS by using the command below:
    • hdfs dfs -put Temp.dat /Temp.dat

  • When we copy a file to HDFS, the client library (part of the Hadoop framework) receives the file, divides it into 128 MB blocks, and distributes the blocks across the machines in the cluster as instructed by the NameNode. Our 256 MB file therefore becomes two blocks.

  • Block-1 - data - 128 MB:
    1990 43
    1990 23
    1990 54
    1991 31

    Block-2 - data - 128 MB:
    1991 25
    1991 37
    1991 51
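The number of blocks follows from a ceiling division of the file size by the block size. The sketch below illustrates only that arithmetic; the class and method names are made up for this example, and the 128 MB block size is the one assumed throughout this post.

```java
public class BlockCountSketch {

    // Number of blocks needed to hold a file, using ceiling division.
    public static long blockCount(long fileSizeBytes, long blockSizeBytes) {
        if (fileSizeBytes <= 0) {
            return 0; // an empty file needs no blocks
        }
        return (fileSizeBytes + blockSizeBytes - 1) / blockSizeBytes;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        System.out.println(blockCount(256 * mb, 128 * mb)); // prints 2
        System.out.println(blockCount(300 * mb, 128 * mb)); // prints 3 (the last block holds 44 MB)
    }
}
```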

    Now we execute our MR program as below:
    >> hadoop jar Maximum_temperature.jar /Temp.dat /Temp-out

    Maximum_temperature.jar - MapReduce program JAR file name. (This form assumes the JAR's manifest names Maximum_temperature as its main class; otherwise, pass the class name right after the JAR file name.)
    /Temp.dat    -  Input file path in HDFS, which is args[0]
    /Temp-out    -  Output path in HDFS, which is args[1]. This directory must not already exist.

    • To turn a physical block (128 MB) into logical records of key-value pairs, an InputFormat (a Java class) is used.
    • Several InputFormat classes are available, including:
      • TextInputFormat
      • KeyValueTextInputFormat
      • NLineInputFormat
    • The developer specifies the input format class in the main method (the driver) of the MR program.
    • In our program, we are going to use TextInputFormat, in which
      • Byte offset [starting position of the line] - key
      • Entire line                                  - value
    • The job client computes the input splits by calling the InputFormat that we defined in our MR program, and each map task then reads its split record by record as key-value pairs.
    Input split for Block-1: [Key, Value]
    0, 1990 43
    8, 1990 23
    16, 1990 54
    24, 1991 31

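    Input split for Block-2: [Key, Value]
    0, 1991 25
    8, 1991 37
    16, 1991 51
    (The offsets are shown relative to the start of each block for simplicity. TextInputFormat actually reports each line's offset from the start of the file.)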
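The keys listed for Block-1 can be reproduced by counting bytes: each line such as "1990 43" is 7 bytes long plus 1 byte for the newline. The following is a plain-Java sketch of that bookkeeping, not the actual TextInputFormat code; the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class ByteOffsetSketch {

    // Returns "offset, line" strings, where offset is the byte position
    // at which each line starts within the given text.
    public static List<String> offsetRecords(String text) {
        List<String> records = new ArrayList<>();
        long offset = 0;
        for (String line : text.split("\n")) {
            records.add(offset + ", " + line);
            // +1 accounts for the newline character that ends the line
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        return records;
    }

    public static void main(String[] args) {
        String block1 = "1990 43\n1990 23\n1990 54\n1991 31\n";
        for (String record : offsetRecords(block1)) {
            System.out.println(record);
        }
    }
}
```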

    Data flow or process flow of any map reduce program is given below.



    Mapper-1 receives the records of its input split as below:
    0, 1990 43
    8, 1990 23
    16, 1990 54
    24, 1991 31

    Mapper-2 receives the records of its input split as below:
    0, 1991 25
    8, 1991 37
    16, 1991 51

    Mapper logic:

    1. Read the value (the line); the byte-offset key is not needed.
    2. Split the line on the delimiter (a space) into a key field (year) and a value field (temperature).
    3. Write the map output for each record as (key, value).

    Mapper -1 Output:
    1990, 43
    1990, 23
    1990, 54
    1991, 31

    Mapper -2 Output:
    1991, 25
    1991, 37
    1991, 51
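The three mapper steps can be tried out without a cluster. The sketch below is a plain-Java stand-in for the map step, using a hypothetical mapRecord helper and java.util's SimpleEntry in place of Hadoop's Text and IntWritable types.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class MapStepSketch {

    // Parses one record value such as "1990 43" into a (year, temperature) pair.
    public static Map.Entry<String, Integer> mapRecord(String value) {
        String[] fields = value.trim().split(" ");
        return new SimpleEntry<>(fields[0], Integer.parseInt(fields[1]));
    }

    public static void main(String[] args) {
        // The record values that Mapper-1 receives in the walkthrough
        String[] mapper1Input = {"1990 43", "1990 23", "1990 54", "1991 31"};
        for (String value : mapper1Input) {
            Map.Entry<String, Integer> pair = mapRecord(value);
            System.out.println(pair.getKey() + ", " + pair.getValue());
        }
    }
}
```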

    Hadoop sort & shuffle process:

    1. The Hadoop framework collects and consolidates the output of all the mappers.
    2. It then sorts the pairs by key and prepares the list of values for each unique key.
    Sort & shuffle output:
    1990, [43, 23, 54]
    1991, [31, 25, 37, 51]
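The grouping that sort & shuffle performs can be imitated in memory with a sorted map. This is only a single-process sketch of the idea, with hypothetical names; the real framework does this across machines and does not promise any particular order of values within a key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ShuffleSortSketch {

    // Groups (key, value) pairs by key; TreeMap keeps the keys in sorted order.
    public static Map<String, List<Integer>> groupByKey(String[][] pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String[] pair : pairs) {
            grouped.computeIfAbsent(pair[0], k -> new ArrayList<>())
                   .add(Integer.parseInt(pair[1]));
        }
        return grouped;
    }

    public static void main(String[] args) {
        String[][] mapOutputs = {
            {"1990", "43"}, {"1990", "23"}, {"1990", "54"}, {"1991", "31"}, // Mapper-1
            {"1991", "25"}, {"1991", "37"}, {"1991", "51"}                  // Mapper-2
        };
        // prints {1990=[43, 23, 54], 1991=[31, 25, 37, 51]}
        System.out.println(groupByKey(mapOutputs));
    }
}
```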

    Reducer Logic:
    1. Read the sort & shuffle output one record at a time.
    2. Find the MAX among all the values for each unique key.
    3. Write the output as (key, MAX).

    Reducer output:
    1990, 54
    1991, 51
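The reducer logic boils down to a running maximum over each key's list of values. Here is a minimal plain-Java sketch with a hypothetical maxOf helper. It starts from Integer.MIN_VALUE rather than 0 so that years with only negative temperatures would still give the right answer.

```java
import java.util.Arrays;
import java.util.List;

public class ReduceStepSketch {

    // Returns the largest value in a non-empty list of temperatures.
    public static int maxOf(List<Integer> values) {
        int max = Integer.MIN_VALUE;
        for (int v : values) {
            if (v > max) {
                max = v;
            }
        }
        return max;
    }

    public static void main(String[] args) {
        System.out.println("1990, " + maxOf(Arrays.asList(43, 23, 54)));     // prints 1990, 54
        System.out.println("1991, " + maxOf(Arrays.asList(31, 25, 37, 51))); // prints 1991, 51
    }
}
```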

    Map Reduce program for the above problem statement:

    import java.io.IOException; 

    import java.util.StringTokenizer; 
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path; 
    import org.apache.hadoop.io.IntWritable; 
    import org.apache.hadoop.io.LongWritable; 
    import org.apache.hadoop.io.Text; 
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class Maximum_temperature {

        // Mapper: turns each input line "year temperature" into a (year, temperature) pair
        public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {

            // Reusable output key of type Text
            private final Text key1 = new Text();

            @Override
            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // Convert the record (a single line) to a String
                String line = value.toString();
                // StringTokenizer breaks the record (line) on the space delimiter
                StringTokenizer tokenizer = new StringTokenizer(line, " ");
                // Consume the tokens two at a time, forming a key-value pair from each;
                // a line that does not hold both fields is skipped
                while (tokenizer.countTokens() >= 2) {
                    // The first token is the year, which becomes the output key
                    String year = tokenizer.nextToken();
                    key1.set(year);
                    // The next token is the temperature; trim any white space around it
                    String temp = tokenizer.nextToken().trim();
                    // Convert the temperature string to an int
                    int value1 = Integer.parseInt(temp);
                    // Emit (year, temperature); the framework passes it on to the reducer
                    context.write(key1, new IntWritable(value1));
                }
            }
        }
        // Reducer: finds the maximum temperature for each year
        public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

            @Override
            public void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                // Start from the smallest int so that negative temperatures are handled correctly
                int temp_max = Integer.MIN_VALUE;
                for (IntWritable it : values) {
                    // Current temperature for this year
                    int temperature = it.get();
                    if (temp_max < temperature) {
                        temp_max = temperature;
                    }
                }
                // Emit the year and the maximum temperature recorded for that year
                context.write(key, new IntWritable(temp_max));
            }
        }
        // Driver
        public static void main(String[] args) throws Exception {
            // Read the default configuration of the cluster from the configuration XML files
            Configuration conf = new Configuration();
            // Initialize the job with the default configuration of the cluster
            Job job = Job.getInstance(conf, "Maximum_temperature");
            // Set the driver class, which Hadoop uses to locate the JAR
            job.setJarByClass(Maximum_temperature.class);
            // Set the mapper class
            job.setMapperClass(Map.class);
            // Set the reducer class
            job.setReducerClass(Reduce.class);
            // Set the output key class for the final output, i.e. from the reducer
            job.setOutputKeyClass(Text.class);
            // Set the output value class for the final output, i.e. from the reducer
            job.setOutputValueClass(IntWritable.class);
            // Set the input format class, which parses the dataset into key-value pairs
            job.setInputFormatClass(TextInputFormat.class);
            // Set the output format class, which writes the final key-value output
            // from the MR framework to a text file in HDFS
            job.setOutputFormatClass(TextOutputFormat.class);
            // Configure the input path (args[0]) and output path (args[1]) for the job
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // Wait for the job to finish; exit with 0 on success and 1 on failure
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }