MapReduce in Hadoop: Word Count Example


Hello friends, today we will learn Hadoop MapReduce theory as well as its practical application by running a simple word count program on the Hadoop platform.

Pre-requisites:
  • Hadoop should be installed on your Ubuntu OS. If not, install it from here.
  • Basic knowledge of the Java programming language.
  • Eclipse IDE.
MapReduce Theory:
         
        The term MapReduce represents two separate and distinct tasks:
         
            1. Map Job - It takes data sets as input and processes them to produce key-value pairs.
         
            2. Reduce Job - It takes the output of the Map job, i.e. the key-value pairs, and aggregates them to produce the desired result.
     
        The input and output of the map and reduce jobs are stored in HDFS (Hadoop Distributed File System).

Let's take the example "Deer Bear River" to understand how MapReduce works.
         
   For simplicity, let's consider a few words of a text document. We want to find the number of occurrences of each word. First, the input is split to distribute the work among all the map nodes as shown in the figure. Then each word is identified and mapped to the number one. Thus the pairs, also called tuples, are created. In the first mapper node the three words Deer, Bear and River are passed. Thus the output of that node will be three key-value pairs with three distinct keys and the value set to one. The mapping process remains the same on all the nodes. These tuples are then passed to the reduce nodes. A partitioner comes into action and carries out shuffling so that all the tuples with the same key are sent to the same node.



The reducer node processes all the tuples such that all the pairs with the same key are counted and the count is updated as the value of that specific key. In the example there are two pairs with the key 'Bear', which are then reduced to a single tuple with the value equal to the count. All the output tuples are then collected and written to the output file.


The data goes through the following phases:
Input Splits:
Input to a MapReduce job is divided into fixed-size pieces called input splits. An input split is a chunk of the input that is consumed by a single map. Here the three sentences, i.e. Deer Bear River, Car Car River and Deer Car Bear, are split as shown in the figure.
Mapping:
This is the very first phase in the execution of a map-reduce program. In this phase the data in each split is passed to a mapping function to produce output values. In our example, the job of the mapping phase is to count the number of occurrences of each word from the input splits, i.e. every word is assigned a value, for example (Deer, 1), (Bear, 1) and so on.
Shuffling:
This phase consumes the output of the Mapping phase. Its task is to collect the records with the same key from the Mapping phase output. In our example, the same words are clubbed together along with their respective frequency, i.e. (Bear, (1,1)) and likewise for the other words.
Reducing:
In this phase, output values from the Shuffling phase are aggregated. This phase combines the values from the Shuffling phase and returns a single output value for each key. In short, the reducer sums up the ones for each word, i.e. counts how many times that word was repeated, and writes the result to the output.
Output:
Finally, the split data is combined again and displayed.
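
Putting the phases together for the three example sentences from the figure (Deer Bear River, Car Car River, Deer Car Bear), the data flows roughly like this:

    Input Splits : "Deer Bear River" | "Car Car River" | "Deer Car Bear"
    Mapping      : (Deer,1) (Bear,1) (River,1) | (Car,1) (Car,1) (River,1) | (Deer,1) (Car,1) (Bear,1)
    Shuffling    : (Bear,(1,1))  (Car,(1,1,1))  (Deer,(1,1))  (River,(1,1))
    Reducing     : (Bear,2)  (Car,3)  (Deer,2)  (River,2)
    Output       : Bear 2, Car 3, Deer 2, River 2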
WORD COUNT PROGRAM AND IMPLEMENTATION:

 1. Open Eclipse and create a new Java project; name it wordcount.


 2. Now we have to add two jar files to our wordcount project. Go to Computer -> usr -> local -> hadoop -> share -> hadoop -> mapreduce.

     

3. Copy hadoop-mapreduce-client-core-2.9.0.jar to Desktop.

4. Go to Computer -> usr -> local -> hadoop -> share -> hadoop -> common.



5. Copy hadoop-common-2.9.0.jar to Desktop.

6. Right-click on the wordcount project, go to Build Path -> Configure Build Path -> Libraries -> Add External JARs -> Desktop.



7. Add both the jars located on the Desktop.


CODE:

1. First, we have to import the required packages:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

2. Now we create the Mapper class inside our main class, i.e. wordcount:
         
public class wordcount {

    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // Convert the incoming line to a String and split it into words
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                // Reuse 'value' to hold the current word and emit (word, 1)
                value.set(tokenizer.nextToken());
                context.write(value, new IntWritable(1));
            }
        }
    }
The Mapper class takes four type parameters, i.e. <Input Key, Input Value, Output Key, Output Value>.

Context is used, much like System.out.println, to print or write values; hence we pass a Context into the map function.
We take a variable named line of String type to convert the value into a String.
StringTokenizer is used to extract the words on the basis of spaces. For doing so we create an object named tokenizer and pass it the variable "line". We iterate over it using a while loop until there are no more tokens; it works as a splitter.
Finally, we assign the value '1' to each word using context.write; here 'value' contains the actual word.
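
If you want to see on its own how StringTokenizer breaks a line into words, here is a tiny standalone sketch (plain Java, no Hadoop) that uses one line from our example input and simply prints what the mapper would emit for it:

import java.util.StringTokenizer;

public class TokenizerSketch {
    public static void main(String[] args) {
        String line = "Deer Bear River";                  // one line from our example input
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            // In the real mapper, each token is written as a (word, 1) pair
            System.out.println(tokenizer.nextToken() + "\t1");
        }
    }
}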

3.  Reducer class

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {

            // Sum up all the 1s emitted by the mappers for this word
            int sum = 0;
            for (IntWritable x : values) {
                sum += x.get();
            }
            // Emit the word together with its total count
            context.write(key, new IntWritable(sum));
        }
    }

The Reducer class also takes four type parameters, i.e. <Input Key, Input Value, Output Key, Output Value>. The input key and value here are the output key and value of the map function.
We initialize sum as 0 and run a for loop in which each value is taken into x and added to sum; the loop runs until the end of the values.
Finally, we write the key and the corresponding sum, for example: Bear, 2.
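
To see the aggregation step in isolation, here is a small plain-Java sketch (no Hadoop types; a List stands in for the Iterable the framework passes in) that mirrors what reduce does for the key 'Bear' with the values (1, 1):

import java.util.Arrays;
import java.util.List;

public class ReduceSketch {
    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 1);   // values grouped under the key "Bear"
        int sum = 0;
        for (int x : values) {
            sum += x;                                 // same accumulation as in reduce()
        }
        System.out.println("Bear\t" + sum);           // prints: Bear    2
    }
}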

4. Main Class :
          
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "wordcount");

        job.setJarByClass(wordcount.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        Path outputPath = new Path(args[1]);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);

        // Delete the output directory if it already exists, so re-runs do not fail
        outputPath.getFileSystem(conf).delete(outputPath, true);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

We create an object conf of type Configuration; it holds the configuration for our word count (or any other Hadoop) job.
To run the word count we create a Job instance, passing conf and the job name.
Then we set the jar by class and register our Mapper and Reducer classes.
Further, we set the output key class and output value class, which are of Text and IntWritable type respectively.
The outputPath.getFileSystem(conf).delete(outputPath, true) line removes the output directory if it already exists, so the job does not fail on a re-run.
Finally, we set the input path, which we pass from the command line as args[0].
Similarly, the output path is passed from the command line as args[1].
Example: to run the code we will give the command below
                   hadoop jar wordcount.jar /input /output
Here /input is Path(args[0]) and /output is Path(args[1]).

Save the program; now we are going to export it as a ".jar" file. Right-click on wordcount and click on Export.


Then go to Java, select JAR file and click Next.


Select the two classes and give the destination of the jar file (I recommend giving the Desktop path), then click Next two times.



On the final page don't forget to select the main class, i.e. click Browse beside the Main class field, select the class and then press Finish.


Now your jar file will be there on the Desktop.

One last thing to do before running our program: create a blank text document and type the input.
You can type anything you want; the following image is an example of it.

Save the document with the name "tinput".
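
For instance, you could simply reuse the sentences from the theory section, so that tinput contains:

    Deer Bear River
    Car Car River
    Deer Car Bear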
  • Finally, it's time to run our program on the generated input.

1. Open Terminal.

2. Log in as hduser
       su - hduser

3.   Start dfs and yarn by:
       start-dfs.sh
       start-yarn.sh


4. Add hduser to the sudo group (so it can use sudo) and make a local directory for the input:
       sudo adduser hduser sudo
       sudo mkdir ~/tinput


We will now copy our input file into the "tinput" directory which we created:
         sudo cp /home/tanmay/Desktop/tinput ~/tinput
    
 Now we will put this directory on HDFS:
   /usr/local/hadoop/bin/hdfs dfs -put ~/tinput /


5. To run our program on the input file, the general command is:
        hadoop jar <path to jar> /inputfile /outputfile

    Hence for our program we will use:
       hadoop jar /home/tanmay/Desktop/wordcount.jar /tinput /deerbear


First the mapper will run, then the reducer, and we will get the required output.

6. To view our output written to HDFS:
      Open a browser and enter the URL http://localhost:50070/dfshealth.html#tab-overview

       Go to Utilities and click "Browse the file system". We have given deerbear as the output directory name; select it and download part-r-00000.
       



We get our required output as shown in the image.
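
If you used the three sample sentences suggested above as input, the downloaded part-r-00000 file would contain something like:

    Bear	2
    Car	3
    Deer	2
    River	2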

  

The full code is uploaded at the following GitHub link.
  





  
  











