From cb4f46b3bfed03acb1b930334cfb91bd4971096a Mon Sep 17 00:00:00 2001
From: Meiqi Guo <mei-qi.guo@student.ecp.fr>
Date: Wed, 15 Feb 2017 16:19:18 +0100
Subject: [PATCH] Add new file

---
 InvertIndex2 | 152 +++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 152 insertions(+)
 create mode 100644 InvertIndex2

diff --git a/InvertIndex2 b/InvertIndex2
new file mode 100644
index 0000000..6b489e9
--- /dev/null
+++ b/InvertIndex2
@@ -0,0 +1,152 @@
+package ecp.BDPA.assignment1;
+
+import java.io.BufferedReader;
+import java.io.DataOutput;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.security.KeyStore.LoadStoreParameter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class InvertIndex extends Configured implements Tool {
+
+    // Counter for words that appear in only a single document.
+    private enum ONLY_WORD_COUNTER {
+        Single_Document;
+    }
+
+    public static void main(String[] args) throws Exception {
+        System.out.println(Arrays.toString(args));
+
+        Configuration conf = new Configuration();
+        conf.set("StopWordsFileName", args[2]);
+        // conf.set("mapreduce.map.output.compress", "true");
+        int res = ToolRunner.run(conf, new InvertIndex(), args);
+
+        System.exit(res);
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+        System.out.println(Arrays.toString(args));
+        Job job = new Job(getConf(), "InvertIndex");
+        job.setJarByClass(InvertIndex.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(Text.class);
+        job.setMapperClass(Map.class);
+        job.setReducerClass(Reduce.class);
+        // job.setCombinerClass(Reduce.class);
+        job.setNumReduceTasks(10);
+
+        job.setInputFormatClass(TextInputFormat.class);
+        job.setOutputFormatClass(TextOutputFormat.class);
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(Text.class);
+
+        FileInputFormat.addInputPath(job, new Path(args[0]));
+        FileOutputFormat.setOutputPath(job, new Path(args[1]));
+        job.waitForCompletion(true);
+
+        // Write the value of the single-document counter to the report file given as args[3].
+        Integer i;
+        PrintWriter writer = new PrintWriter(args[3], "UTF-8");
+        i = (int) job.getCounters().findCounter(ONLY_WORD_COUNTER.Single_Document).getValue();
+        writer.println("Num Of Words Only In Single Document " + i.toString() + "\n");
+        writer.close();
+        return 0;
+    }
+
+    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
+        private Text word = new Text();
+        private Text filename = new Text();
+        public List<Text> stopWords = new ArrayList<Text>();
+
+        // Load the stop-word list from HDFS; called from setup() before any map() call.
+        public void loadStopWords(String filename) throws IOException {
+            Path pt = new Path(filename); // Location of the stop-words file in HDFS
+            FileSystem fs = FileSystem.get(new Configuration());
+            BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt)));
+
+            String sCurrentLine;
+
+            while ((sCurrentLine = br.readLine()) != null) {
+                // Keep only the letters from each stop-word line.
+                String stopWord = sCurrentLine.replaceAll("[^A-Za-z]+", "");
+                Text t = new Text();
+                t.set(stopWord);
+                stopWords.add(t);
+            }
+
+            br.close();
+
+            return;
+        }
+
+        public void setup(Context context) throws IOException, InterruptedException {
+            super.setup(context);
+            String filename = context.getConfiguration().get("StopWordsFileName");
+            loadStopWords(filename);
+        }
+
+        @Override
+        public void map(LongWritable key, Text value, Context context)
+                throws IOException, InterruptedException {
+            // The name of the input file is emitted as the posting for every word in this split.
+            FileSplit fileSplit = (FileSplit) context.getInputSplit();
+            String name = fileSplit.getPath().getName();
+            filename.set(name);
+            // Split on whitespace and on runs of two or more hyphens, then keep letters only and lowercase.
+            for (String token : value.toString().split("\\s+|-{2,}+")) {
+                word.set(token.replaceAll("[^A-Za-z]+", "").toLowerCase());
+                // Skip tokens that are empty after cleaning, as well as stop words.
+                if (word.getLength() > 0 && !stopWords.contains(word)) {
+                    context.write(word, filename);
+                }
+            }
+        }
+    }
+
+    public static class Reduce extends Reducer<Text, Text, Text, Text> {
+        @Override
+        public void reduce(Text key, Iterable<Text> values, Context context)
+                throws IOException, InterruptedException {
+            // Copy each value: Hadoop reuses the Text object handed out by the iterator.
+            ArrayList<Text> list = new ArrayList<Text>();
+            for (Text value : values) {
+                list.add(new Text(value));
+            }
+
+            // Deduplicate the document names for this word.
+            Set<Text> set = new HashSet<Text>(list);
+
+            // The word occurs in exactly one document.
+            if (set.size() == 1) {
+                context.getCounter(ONLY_WORD_COUNTER.Single_Document).increment(1);
+            }
+
+            String output = new String();
+            for (Text u : set) {
+                output += u.toString();
+                // output += u.toString() + '#' + Collections.frequency(list, u);
+                output += ',';
+            }
+            output = output.substring(0, output.length() - 1);
+            Text val = new Text();
+            val.set(output);
+            context.write(key, val);
+        }
+    }
+}
--
GitLab
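
Usage note: as read in main() and run() above, the job takes four arguments: the HDFS input directory (args[0]), the HDFS output directory (args[1]), the stop-words file to load from HDFS (args[2]), and a local file to which the single-document word count is written (args[3]). A minimal invocation sketch, assuming the class is packaged into a runnable job jar; every name in angle brackets is a placeholder, not part of the patch:

    hadoop jar <job jar> ecp.BDPA.assignment1.InvertIndex <input dir> <output dir> <stop-words file> <report file>

Each reducer output line pairs a word with the comma-separated list of documents containing it, separated by the default TextOutputFormat tab, e.g. word<TAB>doc1.txt,doc2.txt (document names here are illustrative only).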