diff --git a/output/Preprocess.java b/output/Preprocess.java
new file mode 100644
index 0000000000000000000000000000000000000000..de85b0bee7c246f99b48bc59d22fc14b4c2a6578
--- /dev/null
+++ b/output/Preprocess.java
@@ -0,0 +1,172 @@
+package ecp.BDPA.assignment2;
+import org.apache.commons.lang.StringUtils;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+public class Preprocess extends Configured implements Tool {
+   private enum FinalLineNumCounter{
+	   Final_NUM
+   }
+   private enum LineNumCounter{
+	   NUM
+   }
+   
+   public static void main(String[] args) throws Exception {
+      System.out.println(Arrays.toString(args));
+      Configuration conf = new Configuration();
+      conf.set("StopWordsFileName", args[2]);
+      conf.set("WordFreqFileName", args[3]);
+      conf.set("mapreduce.map.output.compress", "true");
+      int res = ToolRunner.run(conf, new Preprocess(), args);
+      
+      System.exit(res);
+   }
+   @Override
+   public int run(String[] args) throws Exception {
+      System.out.println(Arrays.toString(args));
+      Job job = new Job(getConf(), "Preprocess");
+      job.setJarByClass(Preprocess.class);
+      job.setOutputKeyClass(LongWritable.class);
+      job.setOutputValueClass(Text.class);
+      job.setMapperClass(Map.class);
+      job.setReducerClass(Reduce.class);
+      job.setNumReduceTasks(1);
+      
+      job.setInputFormatClass(TextInputFormat.class);
+      job.setOutputFormatClass(TextOutputFormat.class);
+      FileInputFormat.addInputPath(job, new Path(args[0]));
+      FileOutputFormat.setOutputPath(job, new Path(args[1]));
+      job.waitForCompletion(true);
+      
+      return 0;
+   }
+   
+   public static class Map extends Mapper<LongWritable, Text, LongWritable, Text> {
+	  private Text word = new Text();
+      private Text words = new Text();
+      public List<Text> stopWords = new ArrayList<Text>(); 
+      public HashMap<String,Integer> wordFreq = new HashMap<String,Integer>(); 
+      public void loadWordFreq(String filename) throws IOException{
+    	  Path pt=new Path(filename);//Location of file in HDFS
+          FileSystem fs = FileSystem.get(new Configuration());
+    	  BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt)));
+    		  
+    	  String sCurrentLine;
+    	      
+          while ((sCurrentLine = br.readLine()) != null) {
+        	  String[] wordfreq = sCurrentLine.split("[\\s]+");
+        	  if (wordfreq.length != 2){
+        		  System.out.println("WARNING: WRONG INPUT FORMAT");
+        	  }
+        	  this.wordFreq.put(wordfreq[0], new Integer(wordfreq[1]));
+    	  }
+          
+          br.close();
+          
+          return;
+       }
+      public void loadStopWords(String filename) throws IOException{
+    	  Path pt=new Path(filename);//Location of file in HDFS
+          FileSystem fs = FileSystem.get(new Configuration());
+    	  BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt)));
+    		  
+    	  String sCurrentLine;
+    	      
+          while ((sCurrentLine = br.readLine()) != null) {
+        	  String stopWord = sCurrentLine.replaceAll("[^A-Za-z]+", "");
+        	  Text t = new Text();
+        	  t.set(stopWord);
+        	  this.stopWords.add(t);
+    	  }
+          
+          br.close();
+          
+          return;
+       }
+      
+      public void setup(Context context) throws IOException,InterruptedException { 
+    	 super.setup(context);
+    	 String filename = context.getConfiguration().get("StopWordsFileName");
+    	 loadStopWords(filename);
+    	 loadWordFreq(context.getConfiguration().get("WordFreqFileName"));
+    	 
+      }
+      
+      @Override
+      public void map(LongWritable key, Text value, Context context)
+              throws IOException, InterruptedException {
+    	 Counter counter = context.getCounter(LineNumCounter.NUM);
+    	 Counter counter_final = context.getCounter(FinalLineNumCounter.Final_NUM);
+    	 counter.increment(1);
+    	 Set<String> wordSet = new HashSet<String>();
+    	 if (value.toString().isEmpty()){
+    		 return;
+    	 }	
+         for (String token: value.toString().split("\\s+|-{2,}+")) {
+        	 word.set(token.replaceAll("[^A-Za-z0-9]+", "").toLowerCase());
+        	 if(word.toString().length()==0){
+        		 continue;
+        	 }
+        	 else if (stopWords.contains(word)){
+        		 continue;
+        	 }else if(!wordFreq.containsKey(word.toString())){
+        		 System.out.println("WARN: HASHTABLE DON'T HAVE WORD:");
+        		 System.out.println(word);	 
+        	 }else{
+        		 wordSet.add(word.toString());
+        		 }
+         }
+         if (wordSet.isEmpty()){
+        	 return;
+         }else{
+        	 counter_final.increment(1);
+         }
+         List<String> wordList = new ArrayList<String>(wordSet);
+         
+         Collections.sort(wordList, new Comparator<String>() {
+        	 @Override
+        	 public int compare(String s1, String s2)
+        	 {
+        		 return  wordFreq.get(s1).compareTo(wordFreq.get(s2));
+        	 }
+         });
+         words.set(StringUtils.join(wordList,","));
+         context.write(new LongWritable(counter.getValue()), words);
+      }
+   }
+   public static class Reduce extends Reducer<LongWritable, Text, LongWritable, Text> {
+      @Override
+      public void reduce(LongWritable key, Iterable<Text> values, Context context)
+              throws IOException, InterruptedException {
+    	 Text words = new Text();
+    	 for (Text v : values){
+    		 words.set(v);
+    	 }
+         context.write(key, words);
+      }
+   }
+}