Skip to content
Snippets Groups Projects
Select Git revision
  • 0e92823dd4638b43b616cebe1ffbf3a39f9c8ca9
  • master default
2 results

InvertIndex

Blame
  • InvertIndex 5.85 KiB
    package ecp.BDPA.assignment1;
    import java.io.BufferedReader;
    import java.io.DataOutput;
    import java.io.FileReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.PrintWriter;
    import java.security.KeyStore.LoadStoreParameter;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Locale;
    import java.util.Set;
    import java.util.TreeMap;
    import java.util.regex.Pattern;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.ArrayWritable;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    public class InvertIndex extends Configured implements Tool {	
       
       /**
        * Job counters, one per input file name handled by the reducer.
        *
        * <p>The reducer increments the matching counter when a key arrives with
        * exactly one value and that value is the corresponding file name.
        */
       private enum ONLY_WORD_COUNTER {
          /** Incremented for the file name {@code pg100.txt}. */
          PG100,
          /** Incremented for the file name {@code pg31100.txt}. */
          PG31100,
          /** Incremented for the file name {@code pg3200.txt}. */
          PG3200
       }
    	
       /**
        * Command-line entry point.
        *
        * <p>Expected arguments, in order: input path, output path, stop-words file,
        * counters file. The third argument is stored in the job configuration under
        * {@code "StopWordsFileName"}; the process exits with the status returned by
        * {@link ToolRunner#run}.
        *
        * @param args command-line arguments
        * @throws Exception if the job cannot be configured or executed
        */
       public static void main(String[] args) throws Exception {
          System.out.println(Arrays.toString(args));

          // args[2] is read below and args[3] inside run(); report a usage error
          // instead of failing with an ArrayIndexOutOfBoundsException.
          if (args.length < 4) {
             System.err.println(
                   "Usage: InvertIndex <input path> <output path> <stop words file> <counters file>");
             System.exit(2);
             return;
          }

          Configuration conf = new Configuration();
          conf.set("StopWordsFileName", args[2]);
          int res = ToolRunner.run(conf, new InvertIndex(), args);

          System.exit(res);
       }
       /**
        * Configures and runs the inverted-index job, then writes the value of every
        * {@code ONLY_WORD_COUNTER} to a file.
        *
        * @param args {@code args[0]} input path, {@code args[1]} output path,
        *             {@code args[2]} stop-words file, {@code args[3]} name of the
        *             file that receives the counter values (written through
        *             {@link PrintWriter}, UTF-8)
        * @return 0 when the job succeeded and the counters were written, 1 when the
        *         job failed, 2 when fewer than four arguments were supplied
        * @throws Exception if job setup, execution or writing the counters file fails
        */
       @Override
       public int run(String[] args) throws Exception {
          System.out.println(Arrays.toString(args));

          if (args.length < 4) {
             System.err.println(
                   "Usage: InvertIndex <input path> <output path> <stop words file> <counters file>");
             return 2;
          }

          // ToolRunner removes generic Hadoop options before calling run(), so this
          // array is the one that lines up with the input/output paths used below.
          // Must happen before the Job is created, because the Job copies the
          // configuration.
          getConf().set("StopWordsFileName", args[2]);

          Job job = new Job(getConf(), "InvertIndex");
          job.setJarByClass(InvertIndex.class);

          job.setMapperClass(Map.class);
          job.setReducerClass(Reduce.class);
          // No combiner: Reduce emits "file#count" strings, while it expects plain
          // file names as input values.
          job.setNumReduceTasks(10);

          job.setInputFormatClass(TextInputFormat.class);
          job.setOutputFormatClass(TextOutputFormat.class);
          job.setMapOutputKeyClass(Text.class);
          job.setMapOutputValueClass(Text.class);
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(Text.class);

          FileInputFormat.addInputPath(job, new Path(args[0]));
          FileOutputFormat.setOutputPath(job, new Path(args[1]));

          // Propagate job failure to the caller instead of always reporting success.
          if (!job.waitForCompletion(true)) {
             return 1;
          }

          // try-with-resources closes the writer even if reading a counter throws.
          try (PrintWriter writer = new PrintWriter(args[3], "UTF-8")) {
             for (ONLY_WORD_COUNTER counter : ONLY_WORD_COUNTER.values()) {
                // Counter values are long; keep them as long to avoid truncation.
                long value = job.getCounters().findCounter(counter).getValue();
                // The trailing "\n" plus println leaves a blank line after each entry.
                writer.println(counter.name() + ": " + value + "\n");
             }
          }
          return 0;
       }
       
       /**
        * Mapper that emits one {@code (word, file name)} pair for every token of an
        * input line that is not a stop word.
        *
        * <p>Tokens are separated by whitespace or by runs of two or more hyphens,
        * stripped of everything except ASCII letters and lower-cased. Tokens that
        * are empty after stripping are dropped. The file name is taken from the
        * input split, which is cast to {@link FileSplit}.
        */
       public static class Map extends Mapper<LongWritable, Text, Text, Text> {
          /** Token separator: whitespace, or two or more consecutive hyphens. */
          private static final Pattern TOKEN_SEPARATOR = Pattern.compile("\\s+|-{2,}+");
          /** Matches every run of characters that are not ASCII letters. */
          private static final Pattern NON_LETTERS = Pattern.compile("[^A-Za-z]+");

          private final Text word = new Text();
          private final Text filename = new Text();

          /** Loaded stop words (lower-cased, letters only, no duplicates). */
          public List<Text> stopWords = new ArrayList<Text>();
          /** Same words as {@link #stopWords}, kept in a set for constant-time lookup. */
          private final Set<String> stopWordLookup = new HashSet<String>();

          /**
           * Loads stop words from the given file using a default {@link Configuration}.
           *
           * @param filename path of the stop-words file, one word per line
           * @throws IOException if the file cannot be opened or read
           */
          public void loadStopWords(String filename) throws IOException {
             loadStopWords(filename, new Configuration());
          }

          /**
           * Loads stop words from the given file, resolving the file system from the
           * path and the supplied configuration.
           *
           * <p>Each line is reduced to its ASCII letters and lower-cased so that it is
           * normalised the same way as the tokens in {@link #map}. Lines that are
           * empty after normalisation are skipped.
           *
           * @param filename path of the stop-words file, one word per line
           * @param conf     configuration used to resolve the file system
           * @throws IOException if the file cannot be opened or read
           */
          private void loadStopWords(String filename, Configuration conf) throws IOException {
             Path pt = new Path(filename);
             // Resolve through the path so an explicit scheme in the path is honoured.
             FileSystem fs = pt.getFileSystem(conf);

             // try-with-resources closes the stream even if a read fails.
             try (BufferedReader br =
                   new BufferedReader(new InputStreamReader(fs.open(pt), "UTF-8"))) {
                String line;
                while ((line = br.readLine()) != null) {
                   String stopWord =
                         NON_LETTERS.matcher(line).replaceAll("").toLowerCase(Locale.ROOT);
                   if (stopWord.isEmpty()) {
                      continue;
                   }
                   if (stopWordLookup.add(stopWord)) {
                      stopWords.add(new Text(stopWord));
                   }
                }
             }
          }

          /**
           * Reads the stop-words file named by the {@code "StopWordsFileName"}
           * configuration entry.
           *
           * @param context task context supplying the job configuration
           * @throws IOException if the entry is missing or the file cannot be read
           * @throws InterruptedException propagated from the superclass
           */
          @Override
          public void setup(Context context) throws IOException, InterruptedException {
             super.setup(context);
             Configuration conf = context.getConfiguration();
             String stopWordsFile = conf.get("StopWordsFileName");
             if (stopWordsFile == null) {
                throw new IOException(
                      "Job configuration does not define \"StopWordsFileName\"");
             }
             loadStopWords(stopWordsFile, conf);
          }

          /**
           * Emits {@code (word, file name)} for every non-stop-word token of the line.
           *
           * @param key     input key supplied by the input format (unused)
           * @param value   one line of input text
           * @param context task context used to read the split and write output
           * @throws IOException if writing to the context fails
           * @throws InterruptedException if writing to the context is interrupted
           */
          @Override
          public void map(LongWritable key, Text value, Context context)
                  throws IOException, InterruptedException {
             FileSplit fileSplit = (FileSplit) context.getInputSplit();
             filename.set(fileSplit.getPath().getName());

             for (String token : TOKEN_SEPARATOR.split(value.toString())) {
                // Locale.ROOT keeps lower-casing independent of the JVM's locale.
                String cleaned =
                      NON_LETTERS.matcher(token).replaceAll("").toLowerCase(Locale.ROOT);
                // Skip tokens with no letters (numbers, punctuation, the empty
                // leading piece split() can produce) as well as stop words.
                if (cleaned.isEmpty() || stopWordLookup.contains(cleaned)) {
                   continue;
                }
                word.set(cleaned);
                context.write(word, filename);
             }
          }
       }
       /**
        * Reducer that collapses all file names received for a word into a single
        * posting line of the form {@code file#count,file#count,...}, ordered by file
        * name.
        */
       public static class Reduce extends Reducer<Text, Text, Text, Text> {
          /**
           * Counts how often each file name occurs among {@code values}, updates the
           * per-file {@code ONLY_WORD_COUNTER} when the key arrived with exactly one
           * value, and writes the posting line for the key.
           *
           * @param key     the word
           * @param values  file names, one per occurrence of the word
           * @param context task context used for counters and output
           * @throws IOException if writing to the context fails
           * @throws InterruptedException if writing to the context is interrupted
           */
          @Override
          public void reduce(Text key, Iterable<Text> values, Context context)
                  throws IOException, InterruptedException {
             // Count in a single pass; only one entry per distinct file is kept in
             // memory. The sorted map makes the posting order deterministic.
             TreeMap<String, Integer> countsByFile = new TreeMap<String, Integer>();
             long occurrences = 0;
             for (Text value : values) {
                String file = value.toString();
                Integer seen = countsByFile.get(file);
                countsByFile.put(file, seen == null ? 1 : seen + 1);
                occurrences++;
             }

             if (occurrences == 0) {
                // Nothing to report for this key.
                return;
             }

             // NOTE(review): this fires only when the word occurred exactly once in
             // total. A word repeated several times inside a single file is not
             // counted. If the counters are meant to track "words found in one file
             // only", the condition should be countsByFile.size() == 1 -- confirm.
             if (occurrences == 1) {
                switch (countsByFile.firstKey()) {
                   case "pg100.txt":
                      context.getCounter(ONLY_WORD_COUNTER.PG100).increment(1);
                      break;
                   case "pg31100.txt":
                      context.getCounter(ONLY_WORD_COUNTER.PG31100).increment(1);
                      break;
                   case "pg3200.txt":
                      context.getCounter(ONLY_WORD_COUNTER.PG3200).increment(1);
                      break;
                   default:
                      // Other file names have no dedicated counter.
                      break;
                }
             }

             StringBuilder postings = new StringBuilder();
             // java.util.Map is spelled out because the simple name Map refers to
             // the mapper class nested in the same outer class.
             for (java.util.Map.Entry<String, Integer> entry : countsByFile.entrySet()) {
                if (postings.length() > 0) {
                   postings.append(',');
                }
                postings.append(entry.getKey()).append('#').append(entry.getValue());
             }
             context.write(key, new Text(postings.toString()));
          }
       }
    }