Combiner function is used as an optimization technique for MapReduce jobs. Combiner class combines/reduce the data generated by Mappers before it gets transferred to the Reducers.

In previous post, you learned about how combiner works in MapReduce programming.

In most of cases you can use Reducer class as Combiner class. But if you would like to implement your own custom combiner functionality then you can do that by extending Reducer class.

In this tutorial, we will use stock market data and will identify number of records for each exchange.

Sample data can be downloaded from https://github.com/protechskills/stock-data.

This sample data of stock market domain has 5 Lakh records. Schema of data is as below: id,exchange,stockname,sector,country,date,open,high,low,close,volume,adj_close

Below is the Java Code to write your custom Combiner.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.fs.Path;

public class CombinerClass {

/**
Mapper class to filter only relevant data.
*/
public static class Map extends Mapper<LongWritable,Text,Text,IntWritable>
{
public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException
{
int i=1;
String S[]=value.toString().split("\t");
// Write output with exchange field as key and count 1 as value.
context.write(new Text(S[1]),new IntWritable(i));
}
}

/**
Combiner class to reduce the data at node level
*/
public static class Combiner extends Reducer<Text,IntWritable,Text,IntWritable>
{
public void reduce(Text key,Iterable<IntWritable> value,Context context) throws IOException, InterruptedException
{
int sum=0;
for(IntWritable v:value)
sum+=v.get();

context.write(key,new IntWritable(sum));
}

}

/**
Reducer class to finally reduce the data and generate final output
*/
public static class Reduce extends Reducer<Text,IntWritable,Text,IntWritable>
{
public void reduce(Text key,Iterable<IntWritable> value,Context context) throws IOException, InterruptedException
{
System.out.println("Combiner called...");
int sum=0;
for(IntWritable v:value)
sum+=v.get();

context.write(key,new IntWritable(sum));
}
}

@SuppressWarnings("deprecation")
public static void main(String[] args) throws Exception
{

Configuration conf= new Configuration();

Job job = new Job(conf,"My Combiner Program");

job.setJarByClass(CombinerClass.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setCombinerClass(Combiner.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

//exiting the job only if the flag value becomes false

System.exit(job.waitForCompletion(true) ? 0 : 1);
}

}

After running the code, below is the expected output

MRJobOutput

Here you can see that as per the output there are only 7 keys and corresponding to these 7 keys there are 80K values. This means here in this case the number the values that are passed to the reducer are very large and so the time taken by the reducer to process the result will be more.

In this case where we can see that the number of values are huge and number of keys are small it is good to go with the combiner because the combiner will ultimately reduce the number of values that will be sent to the reducer and hence improve the overall performance.

See the below screenshots to determine the difference when your code with and without combiner and the number of records processed by reducer in each case

With combiner in action

With Combiner

The above figure shows the input to the combiner and the number of key value pairs it emitted thereby reducing the time taken by the framework in shuffling and sorting the data also time taken by reducers is also less as the key value pairs to process are less

Without Combiner in Action

Without Combiner
You can see that without Combiner all the data is fed to the reducer there by taking more time in sort shuffle phase and reducer phase.

When to use Combiner

Combiner can only be used on functions those are commutative(a.b = b.a) and associative {a.(b.c) = (a.b).c}.

The ideal situation to use a combiner is to look at the data, here as per the output you can see that there are only 7 keys and more than 80K values so using combiner is an ideal choice.
Also in case of implementing a combiner the thing we need to take care about is the action our reducer  is going to perform like in case of word-count, the reducer is doing the sum aggregation so it is suitable to aggregate the values previously before entering the reducer to reduce the load on reducer.

One other example in which a combiner is useful, when you have to find the maximum or the minimum values corresponding to the keys.

About the Author

Anugrih is having over 2 yrs of experience in hadoop ecosystems like Hive, Oozie, Sqoop, Pig and Flume. His hobbies include listening to music and playing puzzle games.


Related Posts

How Combiner works in Hadoop MapReduce

Share this:

Leave a Reply

Your email address will not be published. Required fields are marked *