MapReduce is one of the YARN-supported applications for processing Big Data on a Hadoop 2.x cluster. The Mapper and the Reducer are two separate Java classes: the Mapper collects data from the input source, and the Reducer applies the aggregation logic.
Hadoop also supports streaming, in which we can develop MapReduce programs in Java, Python, Perl, Ruby, Bash, etc. Any scripting language that can read from stdin, write to stdout, and parse tab and newline characters can be used for Hadoop programming. Hadoop Streaming simply pipes the string representations of key-value pairs, concatenated with a tab, to an arbitrary program that must be executable on each NodeManager node.
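For example, a streaming job could be submitted as shown below. The streaming JAR path is illustrative and varies by distribution, and mapper.py/reducer.py are hypothetical scripts that read from stdin and write tab-separated key-value pairs to stdout:
>> hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
   -files mapper.py,reducer.py \
   -input /Temp.dat -output /Temp-stream-out \
   -mapper mapper.py -reducer reducer.py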
A MapReduce program understands data as a record-by-record stream of key-value pairs.
So what is a key-value pair?
- The key is a unique representation of the data.
- The value is the list of entries associated with a specific key.
Any data can be represented in the form of key-value pairs. For instance, in a temperature dataset the year can be the key, and the temperatures recorded in that year the values.
Let us look at a problem statement: find the maximum temperature recorded for each year in the given input file [Temp.dat]. Let's assume the input file size is 256 MB.
- In order to process data with a MapReduce program, the input data must be placed in HDFS. This is an important prerequisite: MapReduce takes its input from HDFS and also writes the output/processed data back into HDFS.
- Copy the input file [Temp.dat] into HDFS using the command below:
- hdfs dfs -put Temp.dat /Temp.dat
When we copy a file to HDFS, the client library (part of the Hadoop framework) receives the file, divides it into multiple 128 MB blocks, and distributes the blocks across the machines in the cluster as per the NameNode's instructions.
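To check how HDFS actually split and placed the file, the file system checker can be used (a standard HDFS command; the exact report layout varies by Hadoop version):
>> hdfs fsck /Temp.dat -files -blocks -locations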
Block-1 - data - 128 MB:
1990 43
1990 23
1990 54
1991 31
Block-2 - data - 128 MB:
1991 25
1991 37
1991 51
Now we execute our MapReduce program as below:
>> hadoop jar Maximum_temperature.jar /Temp.dat /Temp-out
Maximum_temperature.jar - Map reduce program JAR file name.
/Temp.dat - input file path in HDFS, which is args[0]
/Temp-out - output file path in HDFS, which is args[1]. This must be a non-existent directory; see the note below.
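Note: if the job is re-run, the previous output directory must be removed first, because MapReduce refuses to overwrite an existing output path:
>> hdfs dfs -rm -r /Temp-out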
- In order to turn the physical block (128 MB) into a logical record-by-record reference of key-value pairs, an InputFormat (Java class) is used.
- Different InputFormat classes are available, such as:
- TextInputFormat
- KeyValueTextInputFormat
- NLineInputFormat
- The developer needs to specify the input format class in the MR main method.
- In our program, we are going to use TextInputFormat, in which:
- Byte offset [starting position of the line] - key
- Entire line - value
- The client library prepares the input split (the logical record-by-record reference of key-value pairs) by calling the InputFormat that we defined in our MR program.
Input Split for Block-1: [Key, Value]
0, 1990 43
8, 1990 23
16, 1990 54
24, 1991 31
Input Split for Block-2: [Key, Value]
0, 1991 25
8, 1991 37
16, 1991 51
The data flow or process flow of any MapReduce program is given below.
Mapper-1 gets its list of input splits as below:
0, 1990 43
8, 1990 23
16, 1990 54
24, 1991 31
Mapper-2 gets its list of input splits as below:
0, 1991 25
8, 1991 37
16, 1991 51
Mapper logic:
- Read the value.
- Split it into key and value fields based on the delimiter.
- Write the map output for each record as (key, value).
Mapper-1 Output:
1990, 43
1990, 23
1990, 54
1991, 31
Mapper-2 Output:
1991, 25
1991, 37
1991, 51
Hadoop sort & shuffle process:
1991, 37
1991, 51
Hadoop sort & shuffle process:
- The Hadoop framework collects and consolidates the output of all mappers.
- It then sorts the records by key and prepares a list of values for each unique key.
Sort & shuffle Output:
1990, [43, 23, 54]
1991, [31, 25, 37, 51]
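A note on how the shuffle routes keys: when a job runs with more than one reducer, Hadoop's default HashPartitioner decides which reducer receives each key, so that all values for one key land on the same reducer. Its logic is essentially the sketch below (a simplified illustration of the default behavior, not part of our program):
// All records sharing a key hash to the same partition, i.e. the same reducer.
int partition = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
In our example the job runs with the default single reducer, so both keys (1990 and 1991) go to the same reducer.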
Reducer Logic:
- Read the records one by one from the sort & shuffle output.
- Find the MAX among all the values for each unique key.
- Write the output as (key, MAX).
Reducer output:
1990, 54
1991, 51
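Once the job finishes, the reducer output can be read back from HDFS (part-r-00000 is the standard name of the first reducer's output file; TextOutputFormat separates key and value with a tab):
>> hdfs dfs -cat /Temp-out/part-r-00000
1990	54
1991	51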
MapReduce program for the above problem statement:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class Maximum_temperature {
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable>
{
//Defining a local variable key1 of type Text to hold the output key
Text key1= new Text();
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException
{
//Converting the record (single line) to String and storing it in a String variable line
String line = value.toString();
//StringTokenizer is breaking the record (line) according to the delimiter white spaces
StringTokenizer tokenizer = new StringTokenizer(line," ");
//Iterating through all the tokens and forming the key value pair
while (tokenizer.hasMoreTokens())
{
//The first token is going in year variable of type string
String year= tokenizer.nextToken();
key1.set(year);
//Takes next token and removes all the white spaces around it
//and then stores it in the string variable called temp
String temp= tokenizer.nextToken().trim();
//Converts the string temp into the integer value1
int value1 = Integer.parseInt(temp);
//Sending to output collector which in turn passes the same to reducer
context.write(key1,new IntWritable(value1));
}
}
}
//Reducer
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>
{
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException
{
//Defining a local variable temp_max of type int, initialized to the lowest
//possible value so that negative temperatures are also handled correctly
int temp_max = Integer.MIN_VALUE;
for(IntWritable it : values) {
//Defining a local variable temperature of type INT which is taking all the temperature
int temperature= it.get();
if(temp_max<temperature)
{
temp_max =temperature;
}
}
//Finally the output is collected as the year and maximum temperature corresponding to that year
context.write(key, new IntWritable(temp_max));
}
}
//Driver
public static void main(String[] args) throws Exception
{
//reads the default configuration of cluster from the configuration XML files
Configuration conf = new Configuration();
//Initializing the job with the default configuration of the cluster
Job job = Job.getInstance(conf, "Maximum_temperature");
//Assigning the driver class name
job.setJarByClass(Maximum_temperature.class);
//Defining the mapper class name
job.setMapperClass(Map.class);
//Defining the reducer class name
job.setReducerClass(Reduce.class);
//Defining the output key class for the final output i.e. from reducer
job.setOutputKeyClass(Text.class);
//Defining the output value class for the final output i.e. from reducer
job.setOutputValueClass(IntWritable.class);
//Defining input Format class which is responsible to parse the dataset into a key value pair
job.setInputFormatClass(TextInputFormat.class);
//Defining output Format class which is responsible to parse the final key-value output from MR framework
//to a text file into HDFS Disk.
job.setOutputFormatClass(TextOutputFormat.class);
//setting the second argument as a path in a path variable
Path outputPath = new Path(args[1]);
//Configuring the input/output path from the file system into the job
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, outputPath);
//Exiting with status 0 if the job completes successfully, 1 otherwise
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
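An optional optimization: since taking a maximum is associative and commutative, the same Reduce class can also be registered as a combiner, so partial maxima are computed on the mapper side and less data is moved during sort & shuffle. This is a one-line addition to the driver (an optional tweak, not part of the original program):
job.setCombinerClass(Reduce.class);
With the combiner in place, Mapper-1 would forward only (1990, 54) and (1991, 31) to the shuffle instead of all four records.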