diff --git a/src/StubDriver.java b/src/StubDriver.java new file mode 100644 index 0000000000000000000000000000000000000000..bb3c50326b8dc0662079a8d822a2e62f4e3fcc6a --- /dev/null +++ b/src/StubDriver.java @@ -0,0 +1,45 @@ +import org.apache.hadoop.mapreduce.Job; + +public class StubDriver { + + public static void main(String[] args) throws Exception { + + /* + * Validate that two arguments were passed from the command line. + */ + if (args.length != 2) { + System.out.printf("Usage: StubDriver <input dir> <output dir>\n"); + System.exit(-1); + } + + /* + * Instantiate a Job object for your job's configuration. + */ + Job job = new Job(); + + /* + * Specify the jar file that contains your driver, mapper, and reducer. + * Hadoop will transfer this jar file to nodes in your cluster running + * mapper and reducer tasks. + */ + job.setJarByClass(StubDriver.class); + + /* + * Specify an easily-decipherable name for the job. + * This job name will appear in reports and logs. + */ + job.setJobName("Stub Driver"); + + /* + * TODO implement + */ + + /* + * Start the MapReduce job and wait for it to finish. + * If it finishes successfully, return 0. If not, return 1. + */ + boolean success = job.waitForCompletion(true); + System.exit(success ? 0 : 1); + } +} + diff --git a/src/StubMapper.java b/src/StubMapper.java new file mode 100644 index 0000000000000000000000000000000000000000..e77872e2bb58eba1365abe3aa49cae69625e6044 --- /dev/null +++ b/src/StubMapper.java @@ -0,0 +1,19 @@ +import java.io.IOException; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; + +public class StubMapper extends Mapper<LongWritable, Text, Text, IntWritable> { + + @Override + public void map(LongWritable key, Text value, Context context) + throws IOException, InterruptedException { + + /* + * TODO implement + */ + + } +} diff --git a/src/StubReducer.java b/src/StubReducer.java new file mode 100644 index 0000000000000000000000000000000000000000..13515db746a58773ed1fc8d4a1137d41f6c7638b --- /dev/null +++ b/src/StubReducer.java @@ -0,0 +1,19 @@ +import java.io.IOException; + +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Reducer; + +public class StubReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> { + + @Override + public void reduce(Text key, Iterable<IntWritable> values, Context context) + throws IOException, InterruptedException { + + /* + * TODO implement + */ + + } +} \ No newline at end of file diff --git a/src/StubTest.java b/src/StubTest.java new file mode 100644 index 0000000000000000000000000000000000000000..8b6dab6d1692cd1157e5baf5c0fdd0701c476aea --- /dev/null +++ b/src/StubTest.java @@ -0,0 +1,94 @@ +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mrunit.mapreduce.MapDriver; +import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver; +import org.apache.hadoop.mrunit.mapreduce.ReduceDriver; +import static org.junit.Assert.fail; +import org.junit.Before; +import org.junit.Test; + +public class StubTest { + + /* + * Declare harnesses that let you test a mapper, a reducer, and + * a mapper and a reducer working together. 
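   * If the stubs above are completed as a word-count style job (an assumption
   * based on the generic types; adjust to the real semantics), these harnesses
   * can drive them along the following lines in testMapper() and testReducer()
   * below, with the expected values taken from the comments in those tests:
   *
   *     mapDriver.withInput(new LongWritable(1), new Text("1 cat cat dog"))
   *              .withOutput(new Text("cat"), new IntWritable(1))
   *              .withOutput(new Text("cat"), new IntWritable(1))
   *              .withOutput(new Text("dog"), new IntWritable(1))
   *              .runTest();
   *
   *     reduceDriver.withInput(new Text("cat"),
   *                      Arrays.asList(new IntWritable(1), new IntWritable(1)))
   *                 .withOutput(new Text("cat"), new DoubleWritable(2))
   *                 .runTest();
   *
   * (java.util.Arrays would need to be imported for the reducer sketch.)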
+ */ + MapDriver<LongWritable, Text, Text, IntWritable> mapDriver; + ReduceDriver<Text, IntWritable, Text, DoubleWritable> reduceDriver; + MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, DoubleWritable> mapReduceDriver; + + /* + * Set up the test. This method will be called before every test. + */ + @Before + public void setUp() { + + /* + * Set up the mapper test harness. + */ + StubMapper mapper = new StubMapper(); + mapDriver = new MapDriver<LongWritable, Text, Text, IntWritable>(); + mapDriver.setMapper(mapper); + + /* + * Set up the reducer test harness. + */ + StubReducer reducer = new StubReducer(); + reduceDriver = new ReduceDriver<Text, IntWritable, Text, DoubleWritable>(); + reduceDriver.setReducer(reducer); + + /* + * Set up the mapper/reducer test harness. + */ + mapReduceDriver = new MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, DoubleWritable>(); + mapReduceDriver.setMapper(mapper); + mapReduceDriver.setReducer(reducer); + } + + /* + * Test the mapper. + */ + @Test + public void testMapper() { + + /* + * For this test, the mapper's input will be "1 cat cat dog" + * TODO: implement + */ + fail("Please implement test."); + + } + + /* + * Test the reducer. + */ + @Test + public void testReducer() { + + /* + * For this test, the reducer's input will be "cat 1 1". + * The expected output is "cat 2". + * TODO: implement + */ + fail("Please implement test."); + + } + + + /* + * Test the mapper and reducer working together. + */ + @Test + public void testMapReduce() { + + /* + * For this test, the mapper's input will be "1 cat cat dog" + * The expected output (from the reducer) is "cat 2", "dog 1". + * TODO: implement + */ + fail("Please implement test."); + + } +} diff --git a/src/similarity/LongPair.java b/src/similarity/LongPair.java new file mode 100644 index 0000000000000000000000000000000000000000..74e59af283b38c444d45d6944402614e46dacc2b --- /dev/null +++ b/src/similarity/LongPair.java @@ -0,0 +1,119 @@ +package similarity; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +class LongPair implements WritableComparable<LongPair> { + + private LongWritable first; + private LongWritable second; + + public LongPair() { + this.set(new LongWritable(0), new LongWritable(0)); + } + + public LongPair(LongWritable first, LongWritable second) { + this.set(first, second); + } + + public LongPair(Long first, Long second) { + this.set(new LongWritable(first), new LongWritable(second)); + } + + public LongPair(String first, String second) { + this.set(new LongWritable( new Long(first)), new LongWritable( new Long(second))); + } + + public LongWritable getFirst() { + return first; + } + + public LongWritable getSecond() { + return second; + } + + public void set(LongWritable first, LongWritable second) { + this.first = first; + this.second = second; + } + + public void setFirst(LongWritable first){ + this.first = first; + } + + public void setFirst(Long first){ + this.first = new LongWritable(first); + } + + public void setSecond(LongWritable second){ + this.second = second; + } + + public void setSecond(Long second){ + this.second = new LongWritable(second); + } + + public long getSum(){ + return this.first.get()+this.second.get(); + } + + public long getDiff(){ + return Math.abs(this.first.get()-this.second.get()); + } + + public LongPair inverse(){ + return new LongPair(second, first); + } + + 
@Override + public boolean equals(Object o) { + if (o instanceof LongPair) { + LongPair p1 = (LongPair) o; + boolean b1 = first.equals(p1.first) && second.equals(p1.second); + LongPair p2 = p1.inverse(); + boolean b2 = first.equals(p2.first) && second.equals(p2.second); + return b1 || b2; + } + return false; + } + + @Override + public int compareTo(LongPair other) { + long cmp = this.getSum()-other.getSum(); + long cmp_alter = this.getDiff() - other.getDiff(); + if(cmp<0){ + return 1; + }else if(cmp>0){ + return -1; + }else if(cmp_alter<0){ + return 1; + }else if(cmp_alter>0){ + return -1; + } + return 0; + } + + + @Override + public void readFields(DataInput in) throws IOException { + first.readFields(in); + second.readFields(in); + } + + @Override + public void write(DataOutput out) throws IOException { + first.write(out); + second.write(out); + } + + @Override + public String toString() { + return first.toString() + "," + second.toString(); + } + +} \ No newline at end of file diff --git a/src/similarity/NaiveApproach.java b/src/similarity/NaiveApproach.java new file mode 100644 index 0000000000000000000000000000000000000000..32dfaf4b77923466df88459dceb2b409772193ed --- /dev/null +++ b/src/similarity/NaiveApproach.java @@ -0,0 +1,181 @@ +package similarity; + +import org.apache.commons.lang.StringUtils; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +public class NaiveApproach extends Configured implements Tool { + private enum CompCounter{ + NUM + } + public static void main(String[] args) throws Exception { + System.out.println(Arrays.toString(args)); + Configuration conf = new Configuration(); + Long line = loadFileLength(args[2]); + conf.set("InputFileLength", line.toString()); + conf.set("mapreduce.map.output.compress", "true"); + conf.set("mapreduce.input.fileinputformat.split.maxsize", String.valueOf(500)); + int res = ToolRunner.run(conf, new NaiveApproach(), args); + + System.exit(res); + } + + public static Long loadFileLength(String filename) throws IOException{ + Path pt=new Path(filename);//Location of file in HDFS + FileSystem fs = FileSystem.get(new Configuration()); + BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt))); + + Long l = Long.parseLong(br.readLine()); + + br.close(); + + return l; + } + + @Override + public int run(String[] args) throws Exception { + 
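		/*
		 * Note: Reduce.similarity() below divides two int set sizes, so the
		 * Jaccard score truncates to 0 for any pair whose word sets are not
		 * identical, and the s >= 0.8 filter then only ever passes exact
		 * duplicates. A floating-point version of that return statement would
		 * be:
		 *
		 *     return (double) intersection.size() / union.size();
		 *
		 * PrefilteringApproach.Reduce.similarity() is the same helper and has
		 * the same caveat. Separately, new Job(conf, name) is deprecated in
		 * Hadoop 2.x; Job.getInstance(getConf(), "NaiveApproach") is the
		 * equivalent, non-deprecated construction.
		 */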
System.out.println(Arrays.toString(args)); + Job job = new Job(getConf(), "NaiveApproach"); + job.setJarByClass(NaiveApproach.class); + + job.setMapOutputKeyClass(LongPair.class); + job.setMapOutputValueClass(Text.class); + job.setInputFormatClass(KeyValueTextInputFormat.class); + job.setOutputFormatClass(TextOutputFormat.class); + + job.setMapperClass(Map.class); + job.setReducerClass(Reduce.class); + + job.setNumReduceTasks(1); + + FileInputFormat.addInputPath(job, new Path(args[0])); + FileOutputFormat.setOutputPath(job, new Path(args[1])); + + job.waitForCompletion(true); + + return 0; + } + + public static class Map extends Mapper<Text, Text, LongPair, Text> { + private Text words = new Text(); + private LongPair keyPair = new LongPair(); + private Long fileLength; + + public void setup(Context context) throws IOException,InterruptedException { + super.setup(context); + this.fileLength = Long.parseLong(context.getConfiguration().get("InputFileLength")); + } + + @Override + public void map(Text key, Text value, Context context) + throws IOException, InterruptedException { + + if (value.toString().isEmpty()){ + return; + } + + String keyOut = key.toString(); + + if (!StringUtils.isNumeric(keyOut)){ + System.out.println("WARN: Bas input id"); + System.out.println(keyOut); + return; + } + + this.words = value; + this.keyPair.setFirst(Long.parseLong(keyOut)); + + long counter = 1; + + while(counter<=this.fileLength){ + this.keyPair.setSecond(counter); + + if (this.keyPair.getDiff()==0){ + counter += 1; + continue; + } + + context.write(keyPair,words); + counter += 1; + } + } + } + public static class Reduce extends Reducer<LongPair, Text, Text, Text> { + + private Set<String> text2Set(String s){ + return new HashSet<String>(Arrays.asList(s.split(","))); + } + + public double similarity(String t1, String t2) { + + Set<String> s1 = text2Set(t1); + Set<String> s2 = text2Set(t2); + + Set<String> union = new HashSet<String>(s1); + union.addAll(s2); + + Set<String> intersection = new HashSet<String>(s1); + intersection.retainAll(s2); + + if (union.size()==0){ + return 0; + } + + return intersection.size()/union.size(); + } + + @Override + public void reduce(LongPair key, Iterable<Text> values, Context context) + throws IOException, InterruptedException { + int counter = 0; + String[] strings = new String[2]; + for (Text v : values){ + strings[counter] = v.toString(); + counter += 1; + } + + if (counter!=2){ // document id not in input file + return; + } + + double s = similarity(strings[0], strings[1]); + context.getCounter(CompCounter.NUM).increment(1); + + if (s>=0.8){ + context.write(new Text(key.toString()), new Text(String.valueOf(s))); + } + } + } +} + diff --git a/src/similarity/PrefilteringApproach.java b/src/similarity/PrefilteringApproach.java new file mode 100644 index 0000000000000000000000000000000000000000..baf42c52cb958f232dd460b18a792c65a74604ac --- /dev/null +++ b/src/similarity/PrefilteringApproach.java @@ -0,0 +1,183 @@ +package similarity; + +import org.apache.commons.lang.StringUtils; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import javax.swing.text.Document; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import 
org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.Mapper.Context; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +public class PrefilteringApproach extends Configured implements Tool { + private enum CompCounter{ + NUM + } + public static void main(String[] args) throws Exception { + System.out.println(Arrays.toString(args)); + Configuration conf = new Configuration(); + conf.set("mapreduce.map.output.compress", "true"); + conf.set("Document",args[0]); +// conf.set("mapreduce.input.fileinputformat.split.maxsize", String.valueOf(500)); + int res = ToolRunner.run(conf, new PrefilteringApproach(), args); + + System.exit(res); + } + + @Override + public int run(String[] args) throws Exception { + System.out.println(Arrays.toString(args)); + Job job = new Job(getConf(), "PrefilteringApproach"); + job.setJarByClass(PrefilteringApproach.class); + + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(Text.class); + job.setInputFormatClass(KeyValueTextInputFormat.class); + job.setOutputFormatClass(TextOutputFormat.class); + + job.setMapperClass(Map.class); + job.setReducerClass(Reduce.class); + + job.setNumReduceTasks(1); + + FileInputFormat.addInputPath(job, new Path(args[0])); + FileOutputFormat.setOutputPath(job, new Path(args[1])); + + job.waitForCompletion(true); + + return 0; + } + + public static class Map extends Mapper<Text, Text, Text, Text> { + private Text word = new Text(); + + + public void setup(Context context) throws IOException,InterruptedException { + super.setup(context); + } + + @Override + public void map(Text key, Text value, Context context) + throws IOException, InterruptedException { + + if (value.toString().isEmpty()){ + return; + } + + String[] document = value.toString().split(","); + int window = document.length - (int)Math.floor(document.length*0.8); +// System.out.println(document[0]); + int counter = 0; + + while(counter<window){ + word.set(document[counter]); + context.write(word,key); + counter += 1; + } + } + } + public static class Reduce extends Reducer<Text, Text, Text, Text> { + public HashMap<Text,Text> document = new HashMap<Text,Text>(); + + private Set<String> text2Set(String s){ + return new HashSet<String>(Arrays.asList(s.split(","))); + } + + @Override + public void setup(Context context) throws IOException,InterruptedException { + super.setup(context); + String filename = context.getConfiguration().get("Document"); + loadDocument(filename); + } + + public void loadDocument(String filename) throws IOException{ + Path pt=new Path(filename);//Location of file in HDFS + FileSystem fs = FileSystem.get(new Configuration()); + BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt))); + + String sCurrentLine; + + while ((sCurrentLine = br.readLine()) != null) { + String[] d = 
sCurrentLine.split("[\\s]+"); + if (d.length != 2){ + System.out.println("WARNING: WRONG INPUT FORMAT"); + } + document.put(new Text(d[0]), new Text(d[1])); + } + } + + public double similarity(String t1, String t2) { + + Set<String> s1 = text2Set(t1); + Set<String> s2 = text2Set(t2); + + Set<String> union = new HashSet<String>(s1); + union.addAll(s2); + + Set<String> intersection = new HashSet<String>(s1); + intersection.retainAll(s2); + + if (union.size()==0){ + return 0; + } + + return intersection.size()/union.size(); + } + + @Override + public void reduce(Text key, Iterable<Text> values, Context context) + throws IOException, InterruptedException { + List<Text> val = new ArrayList<Text>(); + for (Text v :values){ + val.add(new Text(v)); + } + for (Text v1 : val){ + for (Text v2: val){ + if (v1.equals(v2)){ + continue; + } + + String s1 = this.document.get(v1).toString(); + String s2 = this.document.get(v2).toString(); + + context.getCounter(CompCounter.NUM).increment(1); + Double s = similarity(s1, s2); + + if (s>=0.8){ + context.write(new Text(v1.toString()+','+v2.toString()), new Text(String.valueOf(s))); + } + } + } + } + } +} + diff --git a/src/similarity/WordCount.java b/src/similarity/WordCount.java new file mode 100644 index 0000000000000000000000000000000000000000..f0ad1f1a1bf6e6850ba345c493566ea7977d6ff1 --- /dev/null +++ b/src/similarity/WordCount.java @@ -0,0 +1,85 @@ +package similarity; + +import java.io.IOException; +import java.util.Arrays; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +public class WordCount extends Configured implements Tool { + public static void main(String[] args) throws Exception { + System.out.println(Arrays.toString(args)); + Configuration conf = new Configuration(); + conf.set("mapreduce.map.output.compress", "true"); + int res = ToolRunner.run(conf, new WordCount(), args); + + System.exit(res); + } + + @Override + public int run(String[] args) throws Exception { + System.out.println(Arrays.toString(args)); + Job job = new Job(getConf(), "WordCount"); + job.setJarByClass(WordCount.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + job.setMapperClass(Map.class); + job.setCombinerClass(Reduce.class); + job.setReducerClass(Reduce.class); + + job.setNumReduceTasks(1); + + job.setInputFormatClass(TextInputFormat.class); + job.setOutputFormatClass(TextOutputFormat.class); + + FileInputFormat.addInputPath(job, new Path(args[0])); + FileOutputFormat.setOutputPath(job, new Path(args[1])); + + job.waitForCompletion(true); + + return 0; + } + + public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> { + private final static IntWritable ONE = new IntWritable(1); + private Text word = new Text(); + + @Override + public void map(LongWritable key, Text value, Context context) + throws IOException, InterruptedException { + for (String token: 
value.toString().split("\\s+|-{2,}+")) { + word.set(token.replaceAll("[^A-Za-z0-9]+", "")); + context.write(word, ONE); + } + } + } + + public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> { + @Override + public void reduce(Text key, Iterable<IntWritable> values, Context context) + throws IOException, InterruptedException { + int sum = 0; + if (key.toString().isEmpty()) { + return ; + } + for (IntWritable val : values) { + sum += val.get(); + } + context.write(key, new IntWritable(sum)); + } + } +} + diff --git a/src/similarity/WordSort.java b/src/similarity/WordSort.java new file mode 100644 index 0000000000000000000000000000000000000000..291bb3e26347d699dee321d0cede3cd8b5e3650b --- /dev/null +++ b/src/similarity/WordSort.java @@ -0,0 +1,170 @@ +package similarity; + +import org.apache.commons.lang.StringUtils; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +public class WordSort extends Configured implements Tool { + private enum DocLineCounter{ + NUM + } + public static void main(String[] args) throws Exception { + System.out.println(Arrays.toString(args)); + Configuration conf = new Configuration(); + conf.set("StopWordsFileName", args[2]); + conf.set("WordFreqFileName", args[3]); + conf.set("mapreduce.map.output.compress", "true"); + int res = ToolRunner.run(conf, new WordSort(), args); + + System.exit(res); + } + + @Override + public int run(String[] args) throws Exception { + System.out.println(Arrays.toString(args)); + Job job = new Job(getConf(), "WordSort"); + job.setJarByClass(WordSort.class); + job.setOutputKeyClass(LongWritable.class); + job.setOutputValueClass(Text.class); + + job.setMapperClass(Map.class); + job.setReducerClass(Reduce.class); + + job.setNumReduceTasks(1); + + job.setInputFormatClass(TextInputFormat.class); + job.setOutputFormatClass(TextOutputFormat.class); + + FileInputFormat.addInputPath(job, new Path(args[0])); + FileOutputFormat.setOutputPath(job, new Path(args[1])); + + job.waitForCompletion(true); + + return 0; + } + + public static class Map extends Mapper<LongWritable, Text, LongWritable, Text> { + private Text words = new Text(); + public List<Text> stopWords = new ArrayList<Text>(); + public HashMap<String,Integer> wordFreq = new HashMap<String,Integer>(); + + public void loadWordFreq(String filename) throws IOException{ + Path pt=new Path(filename);//Location of file in HDFS + FileSystem fs = FileSystem.get(new Configuration()); + BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt))); + + 
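			/*
			 * Caveat for map() below: a token that passes the stop-word filter
			 * but is missing from wordFreq only triggers a console warning and
			 * is still added to wordSet, so the sort comparator's
			 * wordFreq.get(s1).compareTo(...) can throw a NullPointerException.
			 * One defensive sketch for the comparator body:
			 *
			 *     Integer f1 = wordFreq.getOrDefault(s1, 0);
			 *     Integer f2 = wordFreq.getOrDefault(s2, 0);
			 *     return f1.compareTo(f2);
			 *
			 * (getOrDefault needs Java 8; on older JDKs, containsKey checks
			 * achieve the same fallback.)
			 */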
String sCurrentLine; + + while ((sCurrentLine = br.readLine()) != null) { + String[] wordfreq = sCurrentLine.split("[\\s]+"); + if (wordfreq.length != 2){ + System.out.println("WARNING: WRONG INPUT FORMAT"); + } + this.wordFreq.put(wordfreq[0], new Integer(wordfreq[1])); + } + + br.close(); + + return; + } + + public void loadStopWords(String filename) throws IOException{ + Path pt=new Path(filename);//Location of file in HDFS + FileSystem fs = FileSystem.get(new Configuration()); + BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt))); + + String sCurrentLine; + + while ((sCurrentLine = br.readLine()) != null) { + String stopWord = sCurrentLine.replaceAll("[^A-Za-z0-9]+", ""); + Text t = new Text(); + t.set(stopWord); + this.stopWords.add(t); + } + + br.close(); + + return; + } + + public void setup(Context context) throws IOException,InterruptedException { + super.setup(context); + String filename = context.getConfiguration().get("StopWordsFileName"); + loadStopWords(filename); + loadWordFreq(context.getConfiguration().get("WordFreqFileName")); + + } + + @Override + public void map(LongWritable key, Text value, Context context) + throws IOException, InterruptedException { + Counter counter = context.getCounter(DocLineCounter.NUM); + counter.increment(1); + Set<String> wordSet = new HashSet<String>(); + if (value.toString().isEmpty()){ + return; + } + for (String token: value.toString().split("\\s+|-{2,}+")) { + String s = token.replaceAll("[^A-Za-z0-9]+", ""); + if (stopWords.contains(s)||(s.isEmpty())){ + continue; + }else if(!wordFreq.containsKey(s)){ + System.out.println("WARN: HASHTABLE DON'T HAVE WORD:"); + System.out.println(s); + } + wordSet.add(s); + } + List<String> wordList = new ArrayList<String>(wordSet); + + Collections.sort(wordList, new Comparator<String>() { + @Override + public int compare(String s1, String s2) + { + return wordFreq.get(s1).compareTo(wordFreq.get(s2)); + } + }); + words.set(StringUtils.join(wordList,",")); + context.write(new LongWritable(counter.getValue()), words); + } + } + + public static class Reduce extends Reducer<LongWritable, Text, LongWritable, Text> { + @Override + public void reduce(LongWritable key, Iterable<Text> values, Context context) + throws IOException, InterruptedException { + Text words = new Text(); + for (Text v : values){ + words.set(v); + } + context.write(key, words); + } + } +} + diff --git a/src/wordcount/InvertTable.java b/src/wordcount/InvertTable.java new file mode 100644 index 0000000000000000000000000000000000000000..27c01eb812276ad7f5e5206745e1d100fccc06c0 --- /dev/null +++ b/src/wordcount/InvertTable.java @@ -0,0 +1,199 @@ +package wordcount; + +import java.io.BufferedReader; +import java.io.DataOutput; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.security.KeyStore.LoadStoreParameter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import 
org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +public class InvertTable extends Configured implements Tool { + + private enum ONLY_WORD_COUNTER { + PG100, + PG31100, + PG3200 + } + + public static void main(String[] args) throws Exception { + System.out.println(Arrays.toString(args)); + + Configuration conf = new Configuration(); + conf.set("StopWordsFileName", args[2]); +// conf.set("mapreduce.map.output.compress", "true"); + int res = ToolRunner.run(conf, new InvertTable(), args); + + + System.exit(res); + } + + @Override + public int run(String[] args) throws Exception { + System.out.println(Arrays.toString(args)); + Job job = new Job(getConf(), "InvertTable"); + job.setJarByClass(InvertTable.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text[].class); + + job.setMapperClass(Map.class); + job.setReducerClass(Reduce.class); + job.setCombinerClass(Reduce.class); + + job.setNumReduceTasks(10); + + job.setInputFormatClass(TextInputFormat.class); + job.setOutputFormatClass(TextOutputFormat.class); + + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(Text.class); + + FileInputFormat.addInputPath(job, new Path(args[0])); + FileOutputFormat.setOutputPath(job, new Path(args[1])); + + job.waitForCompletion(true); + + Integer i; + PrintWriter writer = new PrintWriter(args[3], "UTF-8"); + i = (int) job.getCounters().findCounter(ONLY_WORD_COUNTER.PG100).getValue(); + writer.println("PG100: "+i.toString()+"\n"); + i = (int) job.getCounters().findCounter(ONLY_WORD_COUNTER.PG31100).getValue(); + writer.println("PG31100: "+i.toString()+"\n"); + i = (int) job.getCounters().findCounter(ONLY_WORD_COUNTER.PG3200).getValue(); + writer.println("PG3200: "+i.toString()+"\n"); + writer.close(); + + return 0; + } + + public static class Map extends Mapper<LongWritable, Text, Text, Text> { + private Text word = new Text(); + private Text filename = new Text(); + public List<Text> stopWords = new ArrayList<Text>(); + + public void loadStopWords(String filename) throws IOException{ + Path pt=new Path(filename);//Location of file in HDFS + FileSystem fs = FileSystem.get(new Configuration()); + BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt))); + + String sCurrentLine; + + while ((sCurrentLine = br.readLine()) != null) { + String stopWord = sCurrentLine.replaceAll("[^A-Za-z0-9]+", ""); + Text t = new Text(); + t.set(stopWord); + stopWords.add(t); + } + + br.close(); + + return; + } + + public void setup(Context context) throws IOException,InterruptedException { + super.setup(context); + String filename = context.getConfiguration().get("StopWordsFileName"); + loadStopWords(filename); + } + + @Override + public void map(LongWritable key, Text value, Context context) + throws IOException, InterruptedException { + FileSplit fileSplit = (FileSplit)context.getInputSplit(); + String name = fileSplit.getPath().getName(); + filename.set(name); + for (String token: value.toString().split("\\s+|-{2,}+")) { + word.set(token.replaceAll("[^A-Za-z]+", "").toLowerCase()); + if (!stopWords.contains(word)){ + context.write(word, filename); + } + } + } + } + + public static class Reduce extends Reducer<Text, Text, Text, TextArray> { + @Override + public void reduce(Text key, Iterable<Text> values, 
Context context) + throws IOException, InterruptedException { + ArrayList<Text> res = new ArrayList<Text>(); + for (Text val : values) { + if (!res.contains(val)){ + res.add(new Text(val)); + } + } + if (res.size()==1){ + String filename = res.get(0).toString(); + switch (filename){ + case "pg100.txt": + context.getCounter(ONLY_WORD_COUNTER.PG100).increment(1); + break; + case "pg31100.txt": + context.getCounter(ONLY_WORD_COUNTER.PG31100).increment(1); + break; + case "pg3200.txt": + context.getCounter(ONLY_WORD_COUNTER.PG3200).increment(1); + break; + } + } + Text[] arr = new Text[res.size()]; + arr = res.toArray(arr); + TextArray output = new TextArray(arr); + output.set(arr); + context.write(key, output); + } + } + + public static class TextArray extends ArrayWritable { + public TextArray(Text[] arr) { + super(Text.class); + } + + @Override + public Text[] get() { + return (Text[]) super.get(); + } + + @Override + public void write(DataOutput arg0) throws IOException { + for(Text data : get()){ + data.write(arg0); + } + } + + @Override + public String toString() { + Text[] values = get(); + String output = new String(); + for (Text t: values){ + output += t.toString(); + output += ","; + } + output = output.substring(0, output.length()-1); + return output; + } + } +} + + diff --git a/src/wordcount/InvertTableExtension.java b/src/wordcount/InvertTableExtension.java new file mode 100644 index 0000000000000000000000000000000000000000..ac305ee9cc555468c88acf2f9088179d17923cc0 --- /dev/null +++ b/src/wordcount/InvertTableExtension.java @@ -0,0 +1,177 @@ +package wordcount; + +import java.io.BufferedReader; +import java.io.DataOutput; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.security.KeyStore.LoadStoreParameter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +public class InvertTableExtension extends Configured implements Tool { + + + public static void main(String[] args) throws Exception { + System.out.println(Arrays.toString(args)); + + Configuration conf = new Configuration(); + conf.set("StopWordsFileName", args[2]); +// conf.set("mapreduce.map.output.compress", "true"); + int res = ToolRunner.run(conf, new InvertTableExtension(), args); + + + System.exit(res); + } + + @Override + public int run(String[] args) throws Exception { + System.out.println(Arrays.toString(args)); + Job job = new Job(getConf(), "InvertTableExtension"); + job.setJarByClass(InvertTableExtension.class); + job.setOutputKeyClass(Text.class); + 
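		/*
		 * Combiner caveat for both inverted-index jobs: Reduce is also
		 * registered via setCombinerClass(Reduce.class), but a combiner must
		 * emit the declared map output types (Text, Text) and values the
		 * reducer can still merge. InvertTable.Reduce emits TextArray values,
		 * which is incompatible with the map output value class, and this
		 * class's Reduce emits already formatted "file#count" strings, which
		 * the real reduce pass would then re-count as opaque values. A
		 * conservative sketch is to drop the combiner:
		 *
		 *     job.setMapperClass(Map.class);
		 *     job.setReducerClass(Reduce.class);
		 *     // no job.setCombinerClass(...)
		 *
		 * InvertTable also declares job.setOutputValueClass(Text[].class);
		 * TextArray.class is the Writable type its reducer actually writes.
		 */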
job.setOutputValueClass(Text.class); + + job.setMapperClass(Map.class); + job.setReducerClass(Reduce.class); + job.setCombinerClass(Reduce.class); + + job.setNumReduceTasks(10); + + job.setInputFormatClass(TextInputFormat.class); + job.setOutputFormatClass(TextOutputFormat.class); + + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(Text.class); + + FileInputFormat.addInputPath(job, new Path(args[0])); + FileOutputFormat.setOutputPath(job, new Path(args[1])); + + job.waitForCompletion(true); + + return 0; + } + + public static class Map extends Mapper<LongWritable, Text, Text, Text> { + private Text word = new Text(); + private Text filename = new Text(); + public List<Text> stopWords = new ArrayList<Text>(); + + public void loadStopWords(String filename) throws IOException{ + Path pt=new Path(filename);//Location of file in HDFS + FileSystem fs = FileSystem.get(new Configuration()); + BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt))); + + String sCurrentLine; + + while ((sCurrentLine = br.readLine()) != null) { + String stopWord = sCurrentLine.replaceAll("[^A-Za-z]+", ""); + Text t = new Text(); + t.set(stopWord); + stopWords.add(t); + } + + br.close(); + + return; + } + + public void setup(Context context) throws IOException,InterruptedException { + super.setup(context); + String filename = context.getConfiguration().get("StopWordsFileName"); + loadStopWords(filename); + } + + @Override + public void map(LongWritable key, Text value, Context context) + throws IOException, InterruptedException { + FileSplit fileSplit = (FileSplit)context.getInputSplit(); + String name = fileSplit.getPath().getName(); + filename.set(name); + for (String token: value.toString().split("\\s+|-{2,}+")) { + word.set(token.replaceAll("[^A-Za-z]+", "").toLowerCase()); + if (!stopWords.contains(word)){ + context.write(word, filename); + } + } + } + } + + public static class Reduce extends Reducer<Text, Text, Text, Text> { + @Override + public void reduce(Text key, Iterable<Text> values, Context context) + throws IOException, InterruptedException { + List<Text> res = new ArrayList<Text>(); + for (Text val : values) { + res.add(new Text(val)); + } + Set<Text> unique = new HashSet<Text>(res); + String output = new String(); + for (Text u : unique){ + output += u.toString()+'#'+Collections.frequency(res, u); + output += ','; + } + output = output.substring(0, output.length()-1); + Text value = new Text(); + value.set(output); + context.write(key, value); + } + } + public static class TextArray extends ArrayWritable { + public TextArray(Text[] arr) { + super(Text.class); + } + + @Override + public Text[] get() { + return (Text[]) super.get(); + } + + @Override + public void write(DataOutput arg0) throws IOException { + for(Text data : get()){ + data.write(arg0); + } + } + + @Override + public String toString() { + Text[] values = get(); + String output = new String(); + for (Text t: values){ + output += t.toString(); + output += ","; + } + output = output.substring(0, output.length()-1); + return output; + } + } +} + + + diff --git a/src/wordcount/WordCount.java b/src/wordcount/WordCount.java new file mode 100644 index 0000000000000000000000000000000000000000..92f4223041353640794bd4334a91047ef6f8a4da --- /dev/null +++ b/src/wordcount/WordCount.java @@ -0,0 +1,84 @@ +package wordcount; + +import java.io.IOException; +import java.util.Arrays; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import 
org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +public class WordCount extends Configured implements Tool { + public static void main(String[] args) throws Exception { + System.out.println(Arrays.toString(args)); + Configuration conf = new Configuration(); + conf.set("mapreduce.map.output.compress", "true"); + int res = ToolRunner.run(conf, new WordCount(), args); + + System.exit(res); + } + + @Override + public int run(String[] args) throws Exception { + System.out.println(Arrays.toString(args)); + Job job = new Job(getConf(), "WordCount"); + job.setJarByClass(WordCount.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + job.setMapperClass(Map.class); + job.setCombinerClass(Reduce.class); + job.setReducerClass(Reduce.class); + +// job.setNumReduceTasks(50); + + job.setInputFormatClass(TextInputFormat.class); + job.setOutputFormatClass(TextOutputFormat.class); + + FileInputFormat.addInputPath(job, new Path(args[0])); + FileOutputFormat.setOutputPath(job, new Path(args[1])); + + job.waitForCompletion(true); + + return 0; + } + + public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> { + private final static IntWritable ONE = new IntWritable(1); + private Text word = new Text(); + + @Override + public void map(LongWritable key, Text value, Context context) + throws IOException, InterruptedException { + for (String token: value.toString().split("\\s+|-{2,}+")) { + word.set(token.replaceAll("[^A-Za-z]+", "").toLowerCase()); + context.write(word, ONE); + } + } + } + + public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> { + @Override + public void reduce(Text key, Iterable<IntWritable> values, Context context) + throws IOException, InterruptedException { + int sum = 0; + for (IntWritable val : values) { + sum += val.get(); + } + if (sum>4000){ + context.write(key, new IntWritable(sum)); + } + } + } +} +
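A caveat on wordcount.WordCount above: Reduce doubles as the combiner, but it only emits keys whose local sum exceeds 4000, so partial counts of 4000 or less are dropped on the map side and words that only cross the threshold globally can be lost. A minimal sketch of separating the two roles, using the class's existing imports (SumCombiner is an illustrative name, not part of this patch):

	public static class SumCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
		@Override
		public void reduce(Text key, Iterable<IntWritable> values, Context context)
				throws IOException, InterruptedException {
			// Pre-aggregate only; the sum > 4000 filter stays in Reduce.
			int sum = 0;
			for (IntWritable val : values) {
				sum += val.get();
			}
			context.write(key, new IntWritable(sum));
		}
	}

In run() the wiring would then be job.setCombinerClass(SumCombiner.class) with job.setReducerClass(Reduce.class) unchanged. The combiner in similarity.WordCount is unaffected, since that reducer only sums.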