Wednesday, 24 June 2015

How Map Reduce processes Big Data


Map Reduce is one of the YARN-supported applications for processing Big Data on a Hadoop 2.x cluster. Mapper and Reducer are two separate Java classes: the Mapper is mainly used to collect the data from the input source, and the Reducer applies the aggregation logic.

Hadoop also supports streaming, through which we can develop Map Reduce programs in Java, Python, Perl, Ruby, Bash, etc. Any scripting language that can read from stdin, write to stdout and parse tab and newline characters can be used for Hadoop streaming. Hadoop streaming simply pipes the string representations of key value pairs, concatenated with a tab, to an arbitrary program that must be executable on each node manager node.
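For illustration, a streaming job could be submitted roughly as below. This is only a sketch: the mapper and reducer script names are placeholders, the input/output paths are examples, and the streaming jar path varies with the Hadoop distribution.

>> hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
       -input /input/data.txt -output /streaming-out \
       -mapper max_temp_mapper.py -reducer max_temp_reducer.py \
       -file max_temp_mapper.py -file max_temp_reducer.py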

A Map Reduce program understands data as a record-by-record reference of key value pairs.
So what are key value pairs?
  • Key is the unique representation of the data.
  • Value is the list of values associated with a specific key.

Any data can be represented in the form of key value pairs. For example, each line of a text file can be referenced as (byte offset of the line, content of the line), and a temperature record as (year, temperature).


Let us look at one problem statement: find the maximum temperature recorded for each year in the given input file [Temp.dat]. Let us assume that the input file size is 256 MB.

  • In order to process data with a Map Reduce program, the input data should be placed in HDFS. This is an important prerequisite. Map Reduce takes its input from HDFS and also puts the output/processed data into HDFS.
  • Copy the input file[Temp.dat] into HDFS by using below command:
    • hdfs dfs -put Temp.dat /Temp.dat

  • When we copy a file to HDFS, the client library (part of the Hadoop framework) receives the file, divides it into multiple 128 MB blocks and distributes them across the machines in the cluster as per the name node's instructions.

  • Block-1 - data - 128 MB:
    1990 43
    1990 23
    1990 54
    1991 31

    Block-2 - data - 128 MB:
    1991 25
    1991 37
    1991 51

    Now we will be executing our MR program as below:
    >> hadoop jar Maximum_temperature.jar /Temp.dat /Temp-out

    Maximum_temperature.jar - Map reduce program JAR file name.
    /Temp.dat    -  Input File Path in HDFS which is Args[0]
    /Temp-out    -  Output file path in HDFS, which is Args[1]. This must be a non-existent directory.

    • In order to turn a physical block (128 MB) into a logical record-by-record reference of key value pairs, an InputFormat (Java class) is used.
    • Different InputFormat classes are available, such as:
      • TextInputFormat
      • KeyValueTextInputFormat
      • NLineInputFormat
    • The developer needs to define the specific input format class in the MR driver (main method).
    • In our program, we are going to use TextInputFormat, in which:
      • Byte offset [starting position of the line] - key
      • Entire line                                 - value
    • The client library prepares the input split (the logical record-by-record reference of key value pairs) by calling the InputFormat defined in our MR program.
    Input Split for Block -1:[Key, Value]
    0, 1990 43
    8, 1990 23
    16, 1990 54
    24, 1991 31

    Input Split for Block -2:[Key, Value]
    0, 1991 25
    8, 1991 37
    16, 1991 51

    Data flow or process flow of any map reduce program is given below.



    Mapper-1 gets the records of input split 1 as below:
    0, 1990 43
    8, 1990 23
    16, 1990 54
    24, 1991 31

    Mapper-2 gets the records of input split 2 as below:
    0, 1991 25
    8, 1991 37
    16, 1991 51

    Mapper logic:

    1. Read the value.
    2. Based on delimiter, split into key and value fields.
    3. Write Map output for each record as (key, value)

    Mapper -1 Output:
    1990, 43
    1990, 23
    1990, 54
    1991, 31

    Mapper -2 Output:
    1991, 25
    1991, 37
    1991, 51

    Hadoop sort & shuffle process:

    1. The Hadoop framework collects and consolidates the output of all mappers.
    2. Then it sorts the records by key and prepares a list of values for each unique key.
    Sort & shuffle Output:
    1990, [43, 23, 54]
    1991, [31, 25, 37, 51]

    Reducer Logic:
    1. Read the values for each key, one record at a time, from the sort & shuffle output.
    2. Find the MAX among all the values of each unique key.
    3. Write the output as (key, MAX).

    Reducer output:
    1990, 54
    1991, 51

    Map Reduce program for the above problem statement:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Maximum_temperature {

    // Mapper: converts each input line into a (year, temperature) pair
    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {

        // Reusable Text object that holds the output key (the year)
        private Text key1 = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Convert the record (a single line) to a String
            String line = value.toString();
            // StringTokenizer breaks the record (line) on white space
            StringTokenizer tokenizer = new StringTokenizer(line, " ");
            // Iterate through the tokens and form the key value pair
            while (tokenizer.hasMoreTokens()) {
                // The first token is the year
                String year = tokenizer.nextToken();
                key1.set(year);
                // The next token is the temperature; trim the white space around it
                String temp = tokenizer.nextToken().trim();
                // Convert the temperature string into an integer
                int value1 = Integer.parseInt(temp);
                // Emit (year, temperature); the framework passes it on to the reducer
                context.write(key1, new IntWritable(value1));
            }
        }
    }

    // Reducer: finds the maximum temperature for each year
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Track the maximum temperature seen so far for this key
            // (initialised to MIN_VALUE so negative temperatures are also handled)
            int temp_max = Integer.MIN_VALUE;
            for (IntWritable it : values) {
                int temperature = it.get();
                if (temp_max < temperature) {
                    temp_max = temperature;
                }
            }
            // Finally the output is written as the year and its maximum temperature
            context.write(key, new IntWritable(temp_max));
        }
    }

    // Driver
    public static void main(String[] args) throws Exception {
        // Reads the default configuration of the cluster from the configuration XML files
        Configuration conf = new Configuration();
        // Initializing the job with the default configuration of the cluster
        Job job = Job.getInstance(conf, "Maximum_temperature");
        // Assigning the driver class name
        job.setJarByClass(Maximum_temperature.class);
        // Defining the mapper class name
        job.setMapperClass(Map.class);
        // Defining the reducer class name
        job.setReducerClass(Reduce.class);
        // Defining the output key class for the final output i.e. from the reducer
        job.setOutputKeyClass(Text.class);
        // Defining the output value class for the final output i.e. from the reducer
        job.setOutputValueClass(IntWritable.class);
        // Defining the input format class which parses the dataset into key value pairs
        job.setInputFormatClass(TextInputFormat.class);
        // Defining the output format class which writes the final key value output
        // from the MR framework to a text file on HDFS
        job.setOutputFormatClass(TextOutputFormat.class);
        // Configuring the input/output paths from the arguments into the job
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Exit with 0 on success, 1 on failure
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
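
Once the classes above are compiled and packaged into Maximum_temperature.jar, the job can be launched as shown earlier and the result read back from HDFS. This is a sketch: part-r-00000 is the default name of the first reducer's output file, and the class name must be passed after the jar name if the jar's manifest does not define a main class.

>> hadoop jar Maximum_temperature.jar Maximum_temperature /Temp.dat /Temp-out
>> hdfs dfs -cat /Temp-out/part-r-00000
1990    54
1991    51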

    Monday, 22 June 2015

    Steps to Install and Configure Cloudera Hadoop Platform - CDH 5.4.x



Cloudera provides QuickStart VMs which contain a single-node Apache Hadoop cluster with example data, queries, scripts and Cloudera Manager to manage the cluster. The VMs run CentOS 6.4 and are available for VMware, VirtualBox, and KVM. All require a 64-bit host OS.

Let's see the steps to install and configure CDH 5.4.x on VMware Player.

    Step - 1 [Download and install the VMware player]
    Use the below link to download and install the VMware player.




Step - 2 [Download and extract the Cloudera setup file]
    Use the below link to download the Cloudera 5.4.x setup file



    Step - 3 [Start VMware Player and Click Open a Virtual Machine]




It will take a few minutes to start.
    Use the below mentioned Login credentials:

    Machine Login credentials are:
    • User name  -- cloudera
    • Password    -- cloudera
    Cloudera Manager credentials are:
    • User name   -- admin
    • Password     -- admin
Open the terminal window as shown below:



    Type the command "sudo jps" to check running processes.

    >> sudo jps


    2472 NodeManager
    2235 SecondaryNameNode
    6329 Jps
    1854 QuorumPeerMain
    3254 RunJar
    2036 JournalNode
    4433 Bootstrap
    3326 RunJar
    1950 DataNode
    4654 
    3772 Master
    4687 
    3671 HistoryServer
    3180 ThriftServer
    2354 Bootstrap
    3067 RESTServer
    2115 NameNode
    2396 JobHistoryServer
    2670 ResourceManager
    3655 Bootstrap

    4635 Bootstrap
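
If the daemons above are running, one quick way to confirm that the cluster can actually execute jobs is to list HDFS and run one of the bundled example jobs. This is only a sketch; the examples jar path below assumes the QuickStart VM's default package-based layout.

>> hdfs dfs -ls /
>> hadoop jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar pi 2 10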


    Steps to Configure HCatalog


HCatalog is a sub-module of HIVE and, by default, is shipped as part of the HIVE binaries. In order to work with HCatalog, a remote metastore must be configured to hold the HIVE metadata definitions.

    Step:1
First, update the environment variables related to PIG, HIVE and HCatalog in the .bashrc file as below.

    export HIVE_HOME=/usr/lib/hive-0.13.1-bin
    export HCAT_HOME=/usr/lib/hive-0.13.1-bin/hcatalog
    export PIG_HOME=/usr/lib/pig-0.12.0
    export PATH=$PATH:$HCAT_HOME/bin
    export PATH=$PATH:$PIG_HOME/bin
    export PIG_CLASSPATH=$HCAT_HOME/share/hcatalog/hive-hcatalog-core-0.13.1.jar:\
    $HCAT_HOME/share/hcatalog/hive-hcatalog-pig-adapter-0.13.1.jar:\
    $HIVE_HOME/lib/hive-metastore-*.jar:$HIVE_HOME/lib/libthrift-*.jar:\
    $HIVE_HOME/lib/hive-exec-*.jar:$HIVE_HOME/lib/libfb303-*.jar:\
    $HIVE_HOME/lib/jdo-api-*.jar:$HIVE_HOME/conf:$HADOOP_HOME/conf

    Step 2

The Hive metastore must be running in remote mode so that metastore clients can find it; the metastore location is configured in $HIVE_HOME/conf/hive-site.xml.

Add or edit the hive.metastore.uris property in the hive-site.xml file as follows.

    <property>
      <name>hive.metastore.uris</name>
      <value>thrift://<hostname>:9083</value>
    </property>

    And, run

    >> hive --service metastore &

and check that it is listening on the metastore port:

    >> netstat -an | grep 9083

    Step 3

    # Create a table using hcatalog

    >> hcat -e "create table hcatemptest(Name string, Salary int, Location string) row format delimited fields terminated by ',' stored as textfile"


    # Get the schema for a table

    >> hcat -e "desc hcatemptest"
    OK
    Name      string
    Salary     int
    Location  string


    Insight about HCATALOG


Data analysts use multiple tools for processing Big Data, such as Map Reduce (Java), PIG, HIVE, etc. In a real-world environment, Map Reduce output can be processed by PIG and HIVE, or vice versa. Before processing, the analyst should know the data location (file path), format and schema. They also use different data formats, such as CSV, JSON, AVRO, ORC, HDFS files and HIVE tables, as part of data processing.

Hive uses the metastore to read the data location, format and schema. PIG defines them as part of the script, and Map Reduce encodes them as part of the application code. For Map Reduce and PIG applications, it is very difficult to maintain this metadata. HCatalog aims to provide a solution for this scenario.

    HCatalog is a metadata and table management system for the HADOOP environment. It is a sub-module of HIVE.

    Let us see how it works.

Upload an employee file to HDFS

    >> hdfs dfs -put employee /data

    swetha,250000,Chennai
    anamika,200000,Kanyakumari
    tarun,300000,Pondi
    anita,250000,Salem

Let's create a schema for this input file in HIVE.

    >> create external table employee(Name string, Salary int, Location string) 
    ROW FORMAT DELIMITED FIELDS TERMINATED By ','
    STORED AS TEXTFILE
    LOCATION '/data/employee';

Save the DDL above in a file (for example, employee.hcatalog) and register the data set with HCatalog by running the below command.

    >>  hcat -f employee.hcatalog

    Once it is added, we can verify as below.

    >> hcat -e "describe employee"
    OK
    Name     string     None
    Salary    int          None
    Location string      None

Now HCatalog is maintaining our "employee" data set's location, schema and format, so it can be accessed through the HCatalog interfaces from PIG and Map Reduce applications.



    HCatalog Interface for PIG:

    >> result = LOAD 'employee' USING org.apache.hcatalog.pig.HCatLoader();
    >> DUMP result;

    It can also be used as part of PIG Script execution as below:

    >> pig -useHCatalog Test.pig


There are two PIG interfaces available:

    1. HCatLoader - To read data from a data set.
    2. HCatStorer  - To write data to a data set.

    >> result = LOAD 'employee' using HCatLoader();
    >> STORE result into 'emp_result' using HCatStorer('date=20150622');

HCatStorer also allows providing the partition keys, as mentioned above. It is possible to write to a single partition or to multiple partitions.
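If the partition value is not known up front, HCatStorer can also be called without arguments so that the partition values are taken from the data itself (dynamic partitioning), for example:

>> STORE result INTO 'emp_result' USING HCatStorer();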

    HCatalog Interface for Map Reduce:

    It consists of below mentioned two interfaces.

    1. HCatInputFormat - It accepts a data set to read.
    2. HCatOutputFormat - It accepts a data set to write.
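
A minimal driver sketch for these two interfaces is given below. It is only an illustration under a few assumptions: the HCatalog jars from the configuration steps above are on the job classpath, the "employee" and "emp_result" tables exist in the default database, and the classes are taken from the newer org.apache.hive.hcatalog.mapreduce package (the older org.apache.hcatalog package used in the Pig example ships equivalent classes).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;

public class HCatExampleDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "hcatalog-example");
        job.setJarByClass(HCatExampleDriver.class);
        // Read the records of the "employee" table from the default database
        job.setInputFormatClass(HCatInputFormat.class);
        HCatInputFormat.setInput(job, "default", "employee");
        // Write the job output into the "emp_result" table (null = no static partition values)
        job.setOutputFormatClass(HCatOutputFormat.class);
        HCatOutputFormat.setOutput(job, OutputJobInfo.create("default", "emp_result", null));
        // The Mapper/Reducer classes that consume and emit HCatRecord objects,
        // and the output schema, would be configured here.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}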

    HCatalog Interface for HIVE:

There are no Hive-specific interfaces for HCatalog, as it is a sub-module of HIVE.
    Hive can read information from HCatalog directly.

    HCatalog Notifications:

HCatalog provides notifications, which can be consumed by Oozie or custom Java code, so that a data set can be processed as soon as it becomes available.

    WebHCat Server:

It provides a REST-like web API for HCatalog. Through it, clients can send requests to get information about data sets and to run PIG or HIVE scripts.

    >> curl -s 'http://hostname:port/templeton/v1/ddl/database/db_name/table/table-name?user.name=hcatuser'


    Friday, 19 June 2015

    Steps to Install and Configure SQOOP


SQOOP is used to import and export data between structured data stores (RDBMS) and a Hadoop cluster. It internally uses a JDBC driver to establish the connection between source and destination.

    Let us see steps to install and configure SQOOP on Hadoop environment. 

    Step-1 [Download and extract the SQOOP binaries from the below link for the stable version]


    >> tar -xvf sqoop-1.4.6.bin_hadoop-2.0.4-alpha.tar.gz
    >> su
    >> mv sqoop-1.4.6.bin_hadoop-2.0.4-alpha /usr/lib/sqoop

    Step-2 [Configure bashrc]

    Append the below lines to ~/.bashrc file.

    export SQOOP_HOME=/usr/lib/sqoop
    export PATH=$PATH:$SQOOP_HOME/bin

    Execute the ~/.bashrc file

    >> source ~/.bashrc

    Step:3 [Configure sqoop-env.sh]

    >> cd $SQOOP_HOME/conf
    >> mv sqoop-env-template.sh sqoop-env.sh

    >> gedit sqoop-env.sh

    export HADOOP_COMMON_HOME=/usr/lib/hadoop
    export HADOOP_MAPRED_HOME=/usr/lib/hadoop

    Step-4 [Download and configure mysql-connector-java]

    >> tar -zxf mysql-connector-java-5.1.30.tar.gz
    >> su
    >> cd mysql-connector-java-5.1.30
    >> mv mysql-connector-java-5.1.30-bin.jar /usr/lib/sqoop/lib

    Step-5 [Download and configure MSSQL-connector-java]


    >> wget http://www.microsoft.com/en-us/download/confirmation.aspx?id=21599

    >> tar -zxf sqljdbc_3.0.1301.101_enu.tar.gz
    >> su
>> cd sqljdbc_3.0.1301.101_enu/sqljdbc_3.0/enu
    >> mv sqljdbc4.jar /usr/lib/sqoop/lib

    Step-6 [Verify the SQOOP]

    >> cd $SQOOP_HOME/bin
    >> sqoop-version
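
Once the version banner appears, a quick import can be used as a smoke test. The connection string, credentials, table name and target directory below are only placeholders for your own environment.

>> sqoop import --connect jdbc:mysql://localhost/testdb \
       --username root --password root \
       --table employee --target-dir /sqoop/employee -m 1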


    Steps to Install and Configure HBase on Apache Hadoop Cluster 2.0


HBase is a NoSQL database which stores data in the form of column-oriented key value pairs. Through HBase, we can perform real-time data analytics. HFile (which replaced the earlier Map file format) is the underlying data storage format for HBase.

Let us see the steps to install and configure HBase. The Hadoop components should already be installed and configured prior to the HBase setup.

    Step:1 [Download and extract the Hbase stable version]

    >> tar -zxvf hbase-1.0.1.1-bin.tar.gz

    Shift to super user mode and move the HBase folder to /usr/lib as shown below.

    >> su
>> mkdir /usr/lib/Hbase
>> mv hbase-1.0.1.1/* /usr/lib/Hbase/

    Step:2 [Configure hbase-env.sh]

    Set the JAVA_HOME environment variable here.

    >> cd /usr/lib/Hbase/conf
    >> gedit hbase-env.sh

    export JAVA_HOME=/usr/lib/jvm/java-1.7.0

    Step:3 [Configure hbase-site.xml]

    >> cd /usr/lib/Hbase/conf
    >> gedit hbase-site.xml

<configuration>
   <!-- Path where HBase stores its files. The local file: path below applies to
        standalone mode only; in (pseudo-)distributed mode use the hdfs:// value
        further down instead, since hbase.rootdir must be defined only once. -->
   <property>
      <name>hbase.rootdir</name>
      <value>file:/home/hadoop/Hbase/HFiles</value>
   </property>
   <!-- Path where HBase stores its built-in ZooKeeper files. -->
   <property>
      <name>hbase.zookeeper.property.dataDir</name>
      <value>/home/hadoop/zookeeper</value>
   </property>

   <!-- Mode in which HBase should run (true = distributed). -->
   <property>
     <name>hbase.cluster.distributed</name>
     <value>true</value>
   </property>

   <!-- HDFS instance address, using the hdfs:// URI syntax. The host and port must
        match fs.defaultFS in core-site.xml; here HDFS is assumed to be running on
        localhost at port 8030. -->
   <property>
     <name>hbase.rootdir</name>
     <value>hdfs://localhost:8030/hbase</value>
   </property>
</configuration>

    Step:4 [Setting up Java Environment]

HBase also provides Java API libraries to manage HBase tables from applications, so we need to add the HBase libraries (the lib folder in the HBase installation) to the classpath as shown below.

    >> gedit ~/.bashrc

export CLASSPATH=$CLASSPATH:/usr/lib/Hbase/lib/*
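
A minimal client sketch using the HBase 1.0 Java API is shown below. The table name "employee" and the column family "cf" are only assumptions; the table must already exist (it can be created from the HBase shell in Step 5).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseClientExample {
    public static void main(String[] args) throws Exception {
        // Reads hbase-site.xml from the classpath configured above
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("employee"))) {
            // Insert one cell: row key "row1", column family "cf", qualifier "name"
            Put put = new Put(Bytes.toBytes("row1"));
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes("swetha"));
            table.put(put);
            // Read the cell back and print its value
            Result result = table.get(new Get(Bytes.toBytes("row1")));
            System.out.println(Bytes.toString(
                    result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("name"))));
        }
    }
}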

    Step:5 [Start hbase services and verify the status]

    Start HBase as below

    >> cd /usr/lib/Hbase
>> ./bin/start-hbase.sh

    Start Hbase Master Server as below.

>> ./bin/local-master-backup.sh start 2
• The number signifies the specific backup master instance (its offset).
• Using "local-master-backup.sh" we can start up to 10 backup masters, e.g. ./bin/local-master-backup.sh start 2 4
• To kill a backup master, we need its process id, which is stored in a file named "/tmp/hbase-USER-X-master.pid".
• The below command kills the backup master:
    >> cat /tmp/hbase-user-1-master.pid |xargs kill -9


    Start Region Server as below

>> ./bin/local-regionservers.sh start 3

    Start HBase shell as below.

    >> cd bin
>> ./hbase shell
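
Inside the shell, the setup can be quickly verified by creating and scanning a small table; the table and column family names below are just examples.

>> create 'employee', 'cf'
>> put 'employee', 'row1', 'cf:name', 'swetha'
>> scan 'employee'
>> list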

To access the HBase web interface:

    http://localhost:60010


    Steps to install and configure Apache HIVE


Apache Hive is a data warehouse environment built on top of Hadoop. It was developed by Facebook and released as open source to the community. Hive uses the SQL-like HiveQL (Hive Query Language) for Big Data processing.

During execution, HiveQL is converted into a series of Map Reduce jobs which are executed on the Hadoop cluster, as in the example below.
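
For instance, an aggregate query like the following (assuming an employee table with Salary and Location columns, such as the one used in the HCatalog post above) is compiled into one or more Map Reduce jobs behind the scenes:

>> select Location, max(Salary) from employee group by Location;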

    Let us go through steps to install and configure Apache HIVE. Hadoop should be installed & configured before HIVE setup.

    Step:1 [Download and extract the HIVE tar file]

    >> wget -c http://archive.apache.org/dist/hive/stable/apache-hive-1.2.0-bin.tar.gz

>> tar -xzvf apache-hive-1.2.0-bin.tar.gz
>> mv apache-hive-1.2.0-bin /usr/lib/


    Step:2 [Edit the .bashrc file for environment variables]

    Add the following at the end of the file:
    export HIVE_HOME=/usr/lib/apache-hive-1.2.0-bin
    export PATH=$PATH:$HIVE_HOME/bin

Step:3 [Create and configure HIVE directories within HDFS]
>> hadoop fs -mkdir -p /user/hive/warehouse
The directory "warehouse" is the location to store the tables and data related to Hive.
>> hadoop fs -mkdir /temp
The temporary directory "temp" is the location to store the intermediate results of processing.
Set read/write permissions for the HIVE directories.
In these commands we are giving write permission to the group:
>> hadoop fs -chmod g+w /user/hive/warehouse
>> hadoop fs -chmod g+w /temp

    Step:4 [Update Hadoop path in hive config files]
>> sudo gedit $HIVE_HOME/bin/hive-config.sh
    export HIVE_CONF_DIR=$HIVE_CONF_DIR
    export HIVE_AUX_JARS_PATH=$HIVE_AUX_JARS_PATH
    export HADOOP_HOME=<Your Hadoop Home dir>
    >> cd $HIVE_HOME/conf
    >> cp hive-env.sh.template hive-env.sh
    >> sudo gedit hive-env.sh
    #Append the below line.
    export HADOOP_HOME=<Your Hadoop Home dir>
The basic Hive configuration is now complete. If an external database server is required for the metastore, we can use the Apache Derby database as shown below.

    Step:5 [Install and Configure Apache Derby]
    >> tar zxvf db-derby-10.4.2.0-bin.tar.gz
    >> mv db-derby-10.4.2.0-bin /usr/lib/derby
    Use "su -" command in case if requires super user for copying files.

    Step:6 [Setup environment variable for Derby]
Append the below lines to the .bashrc file.
export DERBY_HOME=/usr/lib/derby
export PATH=$PATH:$DERBY_HOME/bin
export CLASSPATH=$CLASSPATH:$DERBY_HOME/lib/derby.jar:$DERBY_HOME/lib/derbytools.jar
    Execute the ~/.bashrc file
    >> source ~/.bashrc

    Step:7 [Create directory to store Meta store]
    >> mkdir $DERBY_HOME/data
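
The connection URL used in the next step points at Derby running in network server mode, so the Derby network server has to be started before Hive is launched. A minimal way to do this, assuming $DERBY_HOME/bin is on the PATH (Derby's default port is 1527; -p is passed here only to match the port used in the URL below):

>> nohup startNetworkServer -h 0.0.0.0 -p 1433 &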

    Step:8 [Configuring Meta store of HIVE]
    Specify to hive where the database is stored. In order to do this, edit the hive-site.xml, which is in the $HIVE_HOME/conf directory.
    First, copy the template file using following command.
    >> cd $HIVE_HOME/conf
    >> cp hive-default.xml.template hive-site.xml
    Append the below lines between <configuration> and </configuration> at hive-site.xml
    <property>
       <name>javax.jdo.option.ConnectionURL</name>
       <value>jdbc:derby://localhost:1433/metastore_db;create=true </value>
       <description>JDBC connect string for a JDBC metastore </description>
    </property>

    Create a file named jpox.properties and include the below lines in it.
javax.jdo.PersistenceManagerFactoryClass = org.jpox.PersistenceManagerFactoryImpl
    org.jpox.autoCreateSchema = false
    org.jpox.validateTables = false
    org.jpox.validateColumns = false
    org.jpox.validateConstraints = false
    org.jpox.storeManagerType = rdbms
    org.jpox.autoCreateSchema = true
    org.jpox.autoStartMechanismMode = checked
    org.jpox.transactionIsolation = read_committed
    javax.jdo.option.DetachAllOnCommit = true
    javax.jdo.option.NontransactionalRead = true
    javax.jdo.option.ConnectionDriverName = org.apache.derby.jdbc.ClientDriver
    javax.jdo.option.ConnectionURL = jdbc:derby://hadoop1:1433/metastore_db;create = true
    javax.jdo.option.ConnectionUserName = APP
    javax.jdo.option.ConnectionPassword = mine

    Step:9 [Verifying Hive installation]
Use the below commands to get into the Hive CLI prompt and to check the available databases and tables.
    >> hive
    >> show tables;
    >> show databases;
    >> quit;