package ecp.BDPA.assignment1;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Builds an inverted index (word -> comma-separated list of source documents)
 * over a set of text files, filtering out the stop words listed in a separate
 * HDFS file. A custom counter tracks how many words appear in only a single
 * document; its value is written to a local report file after the job finishes.
 */
public class InvertIndex extends Configured implements Tool {

    /** Counter for words that occur in exactly one document. */
    private enum ONLY_WORD_COUNTER {
        Single_Document;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(Arrays.toString(args));
        Configuration conf = new Configuration();
        // args[2] is the HDFS path of the stop-words file, passed to the mappers via the configuration.
        conf.set("StopWordsFileName", args[2]);
        int res = ToolRunner.run(conf, new InvertIndex(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        System.out.println(Arrays.toString(args));
        Job job = Job.getInstance(getConf(), "InvertIndex");
        job.setJarByClass(InvertIndex.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        // job.setCombinerClass(Reduce.class);
        job.setNumReduceTasks(10);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean success = job.waitForCompletion(true);

        // args[3] is a local file where the custom counter value is reported.
        long singleDocWords =
                job.getCounters().findCounter(ONLY_WORD_COUNTER.Single_Document).getValue();
        PrintWriter writer = new PrintWriter(args[3], "UTF-8");
        writer.println("Num Of Words Only In Single Document " + singleDocWords + "\n");
        writer.close();

        return success ? 0 : 1;
    }

    /**
     * Emits (word, filename) pairs for every non-stop-word token of the input,
     * where the filename is taken from the current input split.
     */
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        private Text word = new Text();
        private Text filename = new Text();
        // HashSet gives constant-time membership tests for the stop-word filter.
        private Set<Text> stopWords = new HashSet<Text>();

        /** Loads the stop-words file from HDFS, one word per line, normalized like the input tokens. */
        private void loadStopWords(String filename, Configuration conf) throws IOException {
            Path pt = new Path(filename); // location of the stop-words file in HDFS
            FileSystem fs = FileSystem.get(conf);
            BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt)));
            String currentLine;
            while ((currentLine = br.readLine()) != null) {
                String stopWord = currentLine.replaceAll("[^A-Za-z]+", "").toLowerCase();
                if (!stopWord.isEmpty()) {
                    stopWords.add(new Text(stopWord));
                }
            }
            br.close();
        }

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            String filename = context.getConfiguration().get("StopWordsFileName");
            loadStopWords(filename, context.getConfiguration());
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            filename.set(fileSplit.getPath().getName());

            // Split on whitespace and on runs of two or more dashes, then keep
            // only the alphabetic characters of each token, lowercased.
            for (String token : value.toString().split("\\s+|-{2,}+")) {
                String cleaned = token.replaceAll("[^A-Za-z]+", "").toLowerCase();
                if (cleaned.isEmpty()) {
                    continue; // skip tokens that contained no letters
                }
                word.set(cleaned);
                if (!stopWords.contains(word)) {
                    context.write(word, filename);
                }
            }
        }
    }

    /**
     * Collects the set of documents in which each word appears and writes it as a
     * comma-separated list. Words that appear in only one document are counted.
     */
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Deduplicate filenames: a word may occur many times in the same document.
            Set<Text> documents = new HashSet<Text>();
            for (Text value : values) {
                documents.add(new Text(value));
            }

            // Count words found in exactly one distinct document, matching the report
            // written in run(); counting raw occurrences would miss words that appear
            // several times in the same file.
            if (documents.size() == 1) {
                context.getCounter(ONLY_WORD_COUNTER.Single_Document).increment(1);
            }

            StringBuilder output = new StringBuilder();
            for (Text doc : documents) {
                if (output.length() > 0) {
                    output.append(',');
                }
                output.append(doc.toString());
                // To also report per-document frequencies, the raw values could be kept
                // in a list and Collections.frequency(list, doc) appended here.
            }

            context.write(key, new Text(output.toString()));
        }
    }
}
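
// Example invocation (a sketch; the jar name and the input/output paths below are
// hypothetical and only illustrate the argument order expected by main()/run()):
//
//   hadoop jar assignment1.jar ecp.BDPA.assignment1.InvertIndex \
//       input/ output/ stopwords.txt single_doc_words.txt
//
//   args[0] = HDFS input directory of documents to index
//   args[1] = HDFS output directory for the inverted index
//   args[2] = HDFS file listing stop words, one per line
//   args[3] = local file where the Single_Document counter value is written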