From 447d3950bec9972c349534a81c37e21ff405e379 Mon Sep 17 00:00:00 2001
From: Meiqi Guo <mei-qi.guo@student.ecp.fr>
Date: Wed, 15 Feb 2017 12:09:32 +0100
Subject: [PATCH] Add new file

---
 1 | 185 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 185 insertions(+)
 create mode 100644 1

diff --git a/1 b/1
new file mode 100644
index 0000000..88afccc
--- /dev/null
+++ b/1
@@ -0,0 +1,185 @@
+package ecp.BDPA.assignment1;
+import java.io.BufferedReader;
+import java.io.DataOutput;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.security.KeyStore.LoadStoreParameter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+public class InvertIndex extends Configured implements Tool {	
+   
+   private enum ONLY_WORD_COUNTER {
+		  PG100,
+		  PG31100,
+		  PG3200
+		 }
+	
+   public static void main(String[] args) throws Exception {
+      System.out.println(Arrays.toString(args));
+      
+      Configuration conf = new Configuration();
+      conf.set("StopWordsFileName", args[2]);
+      int res = ToolRunner.run(conf, new InvertIndex(), args);
+	  
+      
+      System.exit(res);
+   }
+   @Override
+   public int run(String[] args) throws Exception {
+      System.out.println(Arrays.toString(args));
+      Job job = new Job(getConf(), "InvertIndex");
+      job.setJarByClass(InvertIndex.class);
+      job.setOutputKeyClass(Text.class);
+      job.setOutputValueClass(Text[].class);
+      job.setMapperClass(Map.class);
+      job.setReducerClass(Reduce.class);
+//      job.setCombinerClass(Reduce.class);
+      job.setNumReduceTasks(10);
+      
+      job.setInputFormatClass(TextInputFormat.class);
+      job.setOutputFormatClass(TextOutputFormat.class);
+      job.setMapOutputKeyClass(Text.class);
+      job.setMapOutputValueClass(Text.class);
+      
+      FileInputFormat.addInputPath(job, new Path(args[0]));
+      FileOutputFormat.setOutputPath(job, new Path(args[1]));
+      job.waitForCompletion(true);
+      
+      Integer i;
+      PrintWriter writer = new PrintWriter(args[3], "UTF-8");
+      i = (int) job.getCounters().findCounter(ONLY_WORD_COUNTER.PG100).getValue();
+      writer.println("PG100: "+i.toString()+"\n");
+      i = (int) job.getCounters().findCounter(ONLY_WORD_COUNTER.PG31100).getValue();
+      writer.println("PG31100: "+i.toString()+"\n");
+      i = (int) job.getCounters().findCounter(ONLY_WORD_COUNTER.PG3200).getValue();
+      writer.println("PG3200: "+i.toString()+"\n");
+      writer.close();
+      
+      return 0;
+   }
+   
+   public static class Map extends Mapper<LongWritable, Text, Text, Text> {
+      private Text word = new Text();
+      private Text filename = new Text();
+      public List<Text> stopWords = new ArrayList<Text>(); 
+      
+      public void loadStopWords(String filename) throws IOException{
+    	  Path pt=new Path(filename);//Location of file in HDFS
+          FileSystem fs = FileSystem.get(new Configuration());
+    	  BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt)));
+    		  
+    	  String sCurrentLine;
+    	      
+          while ((sCurrentLine = br.readLine()) != null) {
+        	  String stopWord = sCurrentLine.replaceAll("[^A-Za-z]+", "");
+        	  Text t = new Text();
+        	  t.set(stopWord);
+        	  stopWords.add(t);
+    	  }
+          
+          br.close();
+          
+          return;
+       }
+      
+      public void setup(Context context) throws IOException,InterruptedException { 
+    	 super.setup(context);
+    	 String filename = context.getConfiguration().get("StopWordsFileName");
+    	 loadStopWords(filename);
+      }
+      @Override
+      public void map(LongWritable key, Text value, Context context)
+              throws IOException, InterruptedException {
+    	 FileSplit fileSplit = (FileSplit)context.getInputSplit();
+    	 String name = fileSplit.getPath().getName();
+    	 filename.set(name);
+         for (String token: value.toString().split("\\s+|-{2,}+")) {
+            word.set(token.replaceAll("[^A-Za-z]+", "").toLowerCase());
+            if (!stopWords.contains(word)){
+               context.write(word, filename);            	
+            }
+         }
+      }
+   }
+   public static class Reduce extends Reducer<Text, Text, Text, TextArray> {
+      @Override
+      public void reduce(Text key, Iterable<Text> values, Context context)
+              throws IOException, InterruptedException {
+         ArrayList<Text> res = new ArrayList<Text>();
+         for (Text val : values) {
+        	if (!res.contains(val)){
+        		res.add(new Text(val));
+        	}
+         }
+         if (res.size()==1){
+        	 String filename = res.get(0).toString();
+        	 switch (filename){
+	        	 case "pg100.txt":
+	        		 context.getCounter(ONLY_WORD_COUNTER.PG100).increment(1);
+	        		 break;
+	        	 case "pg31100.txt":
+	        		 context.getCounter(ONLY_WORD_COUNTER.PG31100).increment(1);
+	        		 break;
+	        	 case "pg3200.txt":
+	    		     context.getCounter(ONLY_WORD_COUNTER.PG3200).increment(1);
+	    		     break;
+	        	 }
+        	 }
+         Text[] arr = new Text[res.size()];
+         arr = res.toArray(arr);
+         TextArray output = new TextArray(arr);
+         output.set(arr);
+         context.write(key, output);
+      }
+   }
+   
+   public static class TextArray extends ArrayWritable {
+	    public TextArray(Text[] arr) {
+	        super(Text.class);
+	    }
+	    @Override
+	    public Text[] get() {
+	        return (Text[]) super.get();
+	    }
+	    @Override
+	    public void write(DataOutput arg0) throws IOException {
+	        for(Text data : get()){
+	            data.write(arg0);
+	        }
+	    }
+	    
+	    @Override
+	    public String toString() {
+	        Text[] values = get();
+	        String output = new String();
+	        for (Text t: values){
+		        output += t.toString();
+		        output += ",";
+	        }
+	        output = output.substring(0, output.length()-1);
+	        return output;
+	    }
+	}   
+}
+
-- 
GitLab