Reduce side join

The reduce side join is the simplest form of join available in the MapReduce framework, and nearly any type of SQL join, such as inner, left outer, full outer, and so on, can be performed with it. The only drawback is that nearly all of the data is shuffled across the network to reach the reducers. Two or more datasets are joined using a common key, and multiple large datasets can be joined by a foreign key. Remember that you should go with a map side join if one of the datasets can fit into memory; a reduce side join should be used when neither dataset fits into memory.

MapReduce can read data from multiple inputs in different formats within the same MapReduce program, and it also allows a different mapper to be used for each input. The following configuration needs to be added to the Driver class so that the MapReduce program reads input from multiple paths and routes each one to its specific mapper, for example:

MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, UserMapper.class);

MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, PurchaseReportMapper.class);
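A minimal Driver class could look as follows. This is only a sketch: the class names, the job name, and the use of the third argument as the output path are assumptions made for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class UserPurchaseJoinDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "reduce side join");
        job.setJarByClass(UserPurchaseJoinDriver.class);

        // One input path and one mapper per dataset
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, UserMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, PurchaseReportMapper.class);

        // Both mappers emit Text key/value pairs, which are joined in the reducer
        job.setReducerClass(UserPurchaseJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Output path assumed to be the third argument
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}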

Let's look at some sample code for the reduce side join and see how it works. The mappers emit records with the userId as the key and the whole record, prefixed with an identifier, as the value. The X is prefixed to the record so that, on the reducer side, we can easily identify which mapper the record came from. The UserMapper class will look as follows:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class UserMapper extends Mapper<Object, Text, Text, Text> {
    private Text outputkey = new Text();
    private Text outputvalue = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        // The user ID is the first field of the comma-separated user record
        String[] userRecord = value.toString().split(",");
        String userId = userRecord[0];
        outputkey.set(userId);
        // Prefix X so the reducer knows this value came from the user dataset
        outputvalue.set("X" + value.toString());
        context.write(outputkey, outputvalue);
    }
}
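For example, assuming a hypothetical user record such as 1,John,Delhi, this mapper would emit the key 1 with the value X1,John,Delhi.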

Similarly, the second mapper processes the purchase history of the users and emits the IDs of the users who purchased the goods, prefixing Y to the value as the identifier, as follows:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class PurchaseReportMapper extends Mapper<Object, Text, Text, Text> {
    private Text outputkey = new Text();
    private Text outputvalue = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        // The user ID is the second field of the comma-separated purchase record
        String[] purchaseRecord = value.toString().split(",");
        String userId = purchaseRecord[1];
        outputkey.set(userId);
        // Prefix Y so the reducer knows this value came from the purchase dataset
        outputvalue.set("Y" + value.toString());
        context.write(outputkey, outputvalue);
    }
}

On the reducer side, the idea is simply to keep two lists, add user records to one list and purchase records to the other, and then perform the join based on the condition. The sample Reducer code will look as follows:


import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;

public class UserPurchaseJoinReducer extends Reducer<Text, Text, Text, Text> {
    private ArrayList<Text> userList = new ArrayList<Text>();
    private ArrayList<Text> purchaseList = new ArrayList<Text>();

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        userList.clear();
        purchaseList.clear();

        // Separate the incoming values by their identifier prefix
        for (Text tmp : values) {
            if (tmp.charAt(0) == 'X') {
                userList.add(new Text(tmp.toString().substring(1)));
            } else if (tmp.charAt(0) == 'Y') {
                purchaseList.add(new Text(tmp.toString().substring(1)));
            }
        }

        /* Joining both datasets */
        if (!userList.isEmpty() && !purchaseList.isEmpty()) {
            for (Text user : userList) {
                for (Text purchase : purchaseList) {
                    context.write(user, purchase);
                }
            }
        }
    }
}
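Note that, because output is emitted only when both lists are non-empty, this reducer performs an inner join; for a left outer join, for instance, you would also emit user records whose purchase list is empty.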

Joining is a costly operation that requires shuffling data over the network. If there is scope, the data should be filtered on the mapper side to avoid unnecessary data movement.
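
As a minimal sketch of such mapper side filtering, the purchase mapper could skip records before emitting them. The column positions and the minimum-amount filter used here are assumptions made purely for illustration:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FilteredPurchaseReportMapper extends Mapper<Object, Text, Text, Text> {
    private Text outputkey = new Text();
    private Text outputvalue = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        String[] purchaseRecord = value.toString().split(",");

        // Drop malformed records, and, as a hypothetical filter, purchases below
        // a minimum amount assumed to be in the third column; only the remaining
        // records are shuffled to the reducers
        if (purchaseRecord.length < 3) {
            return;
        }
        double amount = Double.parseDouble(purchaseRecord[2]);
        if (amount < 100.0) {
            return;
        }

        outputkey.set(purchaseRecord[1]);
        outputvalue.set("Y" + value.toString());
        context.write(outputkey, outputvalue);
    }
}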