Wednesday, 12 August 2015

How to Improve Pig Data Integration Performance by Using Specialized Joins

Pig Latin includes three specialized join types:

1.   Replicated Join
2.   Skewed Join
3.   Merge Join

Replicated Join

-  Replicated join works when one of the data sets is small enough to fit in memory.
-  It is efficient because the smaller data set is copied into the distributed cache and shared with every mapper in the cluster. The join runs in the map phase itself, so the reduce phase is avoided.
-  According to the Pig documentation, a relation of up to 100 MB can be used when the process has 1 GB of memory.
-  A run-time error is raised if there is not enough memory to load the smaller data set.
-  Replicated join supports inner and left outer joins, and it can join more than two tables. The large relation must be listed first, followed by the smaller relations.

Example

-- The large relation is listed first; the small one is loaded into memory.
A_Big = LOAD 'emp.dat' USING PigStorage() AS (f1:int, f2:int, f3:int);
B_Small = LOAD 'salary.dat' USING PigStorage() AS (f1:int, f2:int, f3:int);
C = JOIN A_Big BY f1, B_Small BY f1 USING 'replicated';
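
To make the map-side mechanics concrete, here is a minimal plain-Python sketch of the idea behind a replicated join. It is not Pig's implementation: the function name replicated_join and the sample rows are hypothetical, and a Python list stands in for the copy of the small relation that each mapper would read from the distributed cache.

```python
from collections import defaultdict

def replicated_join(big_split, small_relation, big_key=0, small_key=0):
    """Join one split of the big relation against an in-memory small relation."""
    # Build a hash table from the small relation. This stands in for the
    # copy that every mapper would load from the distributed cache.
    lookup = defaultdict(list)
    for row in small_relation:
        lookup[row[small_key]].append(row)

    # Stream the big split and probe the table: no shuffle, no reducer.
    joined = []
    for row in big_split:
        for match in lookup.get(row[big_key], []):
            joined.append(row + match)
    return joined

# Hypothetical sample rows shaped like (f1, f2, f3).
emp_split = [(1, 10, 100), (2, 20, 200), (2, 21, 201), (4, 40, 400)]
salary = [(1, 5, 50), (2, 6, 60), (3, 7, 70)]

result = replicated_join(emp_split, salary)
for row in result:
    print(row)
```

The whole small relation has to sit in the hash table, which is why the join fails at run time when that relation does not fit in memory.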

Skewed Join

-  A parallel join suffers when a few keys carry most of the data: rows are not evenly distributed across the reducers, and one reducer gets stuck processing the bulk of them. Skewed join handles this case efficiently.
-  Skewed join computes a histogram of the key space and uses it to allocate reducers for a given key.
-  Skewed join supports both inner and outer joins, but currently it can join only two tables.
-  The pig.skewedjoin.reduce.memusage Java parameter specifies the fraction of the heap available to each reducer for performing this join. A low value means more reducers are used, but the cost of copying data across them increases.
-  The Pig developers report good performance with values between 0.1 and 0.4.

Example

A_Big = LOAD 'emp.dat' USING PigStorage() AS (f1:int, f2:int, f3:int);
B_massive = LOAD 'salary.dat' USING PigStorage() AS (f1:int, f2:int, f3:int);
C = JOIN A_Big BY f1, B_massive BY f1 USING 'skewed';
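
The histogram step can be illustrated with a small plain-Python sketch. It is a simplification under stated assumptions, not Pig's code: the function allocate_reducers and the rows_per_reducer budget are hypothetical, and every key is counted here rather than working from a sample of the input. In Pig the per-reducer budget would follow from the reducer heap and pig.skewedjoin.reduce.memusage.

```python
import math
from collections import Counter

def allocate_reducers(keys, rows_per_reducer):
    """Build a key histogram and decide how many reducers each key gets."""
    histogram = Counter(keys)
    return {key: max(1, math.ceil(count / rows_per_reducer))
            for key, count in histogram.items()}

# Hypothetical join-key column of the skewed relation: key 7 dominates.
keys = [7] * 10 + [1, 2, 3]

# Assumption: one reducer can comfortably hold 4 rows of a single key.
plan = allocate_reducers(keys, rows_per_reducer=4)
print(plan)

# A smaller budget per reducer spreads the hot key over more reducers.
tight_plan = allocate_reducers(keys, rows_per_reducer=2)
print(tight_plan[7])
```

Shrinking the budget mirrors lowering the memory fraction: the hot key is split across more reducers, and the matching rows from the other relation have to be copied to each of them.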

Merge Join

-  Merge join works when both data sets are already sorted in ascending order on the join key.
-  It improves performance because the join runs in the map phase itself, skipping both the sort-and-shuffle phase and the reduce phase.
-  Pig implements the merge join algorithm by selecting the left input of the join to be the input file for the map phase, and the right input of the join to be the side file. It then samples records from the right input to build an index that contains, for each sampled record, the key(s), the filename, and the offset into the file at which the record begins. This sampling is done in an initial map-only job. A second MapReduce job, which performs the actual join, is then started with the left input as its input. Each map uses the index to seek to the appropriate record in the right input and begins the join from there.
-  Merge join supports inner joins only.

Example

-- A and B must already be sorted in ascending order on f1.
C = JOIN A BY f1, B BY f1 USING 'merge';
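
The index-and-seek procedure described above can be sketched in plain Python. This is an illustrative approximation, not Pig's implementation: build_index and merge_join_split are hypothetical names, the data is made up, and a list position stands in for the filename and byte offset that the real index would store.

```python
from bisect import bisect_left

def build_index(right, step):
    """Sample every `step`-th record of the sorted right input as (key, offset)."""
    return [(right[i][0], i) for i in range(0, len(right), step)]

def merge_join_split(left_split, right, index):
    """Join one sorted split of the left input against the sorted right input."""
    if not left_split:
        return []
    index_keys = [key for key, _ in index]
    # Seek to the last sampled record whose key is strictly below the
    # first left key, so no matching right record is skipped.
    pos = bisect_left(index_keys, left_split[0][0]) - 1
    r = index[pos][1] if pos >= 0 else 0

    joined = []
    for row in left_split:
        # Advance past right records with smaller keys.
        while r < len(right) and right[r][0] < row[0]:
            r += 1
        # Emit every right record sharing the key without moving r, so the
        # next left row with the same key sees the same records.
        s = r
        while s < len(right) and right[s][0] == row[0]:
            joined.append(row + right[s])
            s += 1
    return joined

# Hypothetical inputs, both sorted in ascending order on the first field.
left = [(1, 'a'), (3, 'b'), (3, 'c'), (5, 'd'), (8, 'e')]
right = [(1, 'p'), (2, 'q'), (3, 'r'), (3, 's'), (6, 't'), (8, 'u')]

index = build_index(right, step=2)
# Two "map tasks", each handling one split of the left input.
result = merge_join_split(left[:2], right, index) + merge_join_split(left[2:], right, index)
for row in result:
    print(row)
```

Each split only scans forward from its seek position, which is why both inputs must already be sorted on the join key.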