diff --git a/IndexApproach.java b/IndexApproach.java
new file mode 100644
index 0000000000000000000000000000000000000000..5f04586a805b68b2c4d4e8bf9874335ddd60f455
--- /dev/null
+++ b/IndexApproach.java
@@ -0,0 +1,177 @@
+package ecp.BDPA.assignment2;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Set-similarity self-join using prefix filtering (the "index" approach):
+ * the mapper emits only the prefix tokens of each document as index keys,
+ * and the reducer computes the exact Jaccard similarity of the candidate
+ * pairs that share at least one prefix token.
+ */
+public class IndexApproach extends Configured implements Tool {
+
+    // Jaccard threshold shared by the mapper (prefix length) and the reducer (final filter).
+    private static final double SIMILARITY_THRESHOLD = 0.8;
+
+    // Counts the number of document pairs actually compared in the reducer.
+    private enum CompCounter2 {
+        NUM
+    }
+
+    public static void main(String[] args) throws Exception {
+        System.out.println(Arrays.toString(args));
+        Configuration conf = new Configuration();
+        conf.set("mapreduce.map.output.compress", "true");
+        // The input file is read again in the reducer to look up the full word set of each document.
+        conf.set("Document", args[0]);
+        // conf.set("mapreduce.input.fileinputformat.split.maxsize", String.valueOf(500));
+        int res = ToolRunner.run(conf, new IndexApproach(), args);
+        System.exit(res);
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+        System.out.println(Arrays.toString(args));
+        Job job = Job.getInstance(getConf(), "IndexApproach");
+        job.setJarByClass(IndexApproach.class);
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(Text.class);
+        job.setInputFormatClass(KeyValueTextInputFormat.class);
+        job.setOutputFormatClass(TextOutputFormat.class);
+
+        job.setMapperClass(Map.class);
+        job.setReducerClass(Reduce.class);
+        job.setNumReduceTasks(1);
+
+        FileInputFormat.addInputPath(job, new Path(args[0]));
+        FileOutputFormat.setOutputPath(job, new Path(args[1]));
+
+        return job.waitForCompletion(true) ? 0 : 1;
+    }
+
+    public static class Map extends Mapper<Text, Text, Text, Text> {
+        private Text word = new Text();
+
+        @Override
+        public void map(Text key, Text value, Context context)
+                throws IOException, InterruptedException {
+
+            if (value.toString().isEmpty()) {
+                return;
+            }
+
+            // Prefix filtering: under a common global token ordering, two documents with
+            // Jaccard similarity >= t must share one of their first |d| - ceil(t*|d|) + 1 tokens.
+            String[] document = value.toString().split(",");
+            int prefixLength = document.length - (int) Math.ceil(document.length * SIMILARITY_THRESHOLD) + 1;
+
+            // Emit (prefix token, document id) so the reducer groups candidate pairs per token.
+            for (int i = 0; i < prefixLength; i++) {
+                word.set(document[i]);
+                context.write(word, key);
+            }
+        }
+    }
+
+    public static class Reduce extends Reducer<Text, Text, Text, Text> {
+        // Maps each document id to its comma-separated word list, loaded from HDFS in setup().
+        public HashMap<Text, Text> document = new HashMap<Text, Text>();
+
+        private Set<String> text2Set(String s) {
+            return new HashSet<String>(Arrays.asList(s.split(",")));
+        }
+
+        @Override
+        public void setup(Context context) throws IOException, InterruptedException {
+            super.setup(context);
+            String filename = context.getConfiguration().get("Document");
+            loadDocument(filename);
+        }
+
+        public void loadDocument(String filename) throws IOException {
+            Path pt = new Path(filename); // location of the file in HDFS
+            FileSystem fs = FileSystem.get(new Configuration());
+            BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt)));
+
+            String sCurrentLine;
+            while ((sCurrentLine = br.readLine()) != null) {
+                String[] d = sCurrentLine.split("[\\s]+");
+                if (d.length != 2) {
+                    System.out.println("WARNING: WRONG INPUT FORMAT");
+                    continue;
+                }
+                document.put(new Text(d[0]), new Text(d[1]));
+            }
+            br.close();
+        }
+
+        // Exact Jaccard similarity: |intersection| / |union| of the two word sets.
+        public double similarity(String t1, String t2) {
+            Set<String> s1 = text2Set(t1);
+            Set<String> s2 = text2Set(t2);
+
+            Set<String> union = new HashSet<String>(s1);
+            union.addAll(s2);
+
+            Set<String> intersection = new HashSet<String>(s1);
+            intersection.retainAll(s2);
+
+            if (union.size() == 0) {
+                return 0;
+            }
+
+            // Cast to double: integer division would round every similarity below 1.0 down to 0.
+            return (double) intersection.size() / union.size();
+        }
+
+        @Override
+        public void reduce(Text key, Iterable<Text> values, Context context)
+                throws IOException, InterruptedException {
+            // All documents whose prefix contains this token are candidates for each other.
+            List<Text> val = new ArrayList<Text>();
+            for (Text v : values) {
+                val.add(new Text(v));
+            }
+            for (int i = 0; i < val.size(); i++) {
+                Text v1 = val.get(i);
+                for (int j = i + 1; j < val.size(); j++) {
+                    Text v2 = val.get(j);
+                    if (v1.equals(v2)) {
+                        continue;
+                    }
+                    String s1 = this.document.get(v1).toString();
+                    String s2 = this.document.get(v2).toString();
+                    context.getCounter(CompCounter2.NUM).increment(1);
+                    double s = similarity(s1, s2);
+
+                    if (s >= SIMILARITY_THRESHOLD) {
+                        context.write(new Text(v1.toString() + ',' + v2.toString()), new Text(String.valueOf(s)));
+                    }
+                }
+            }
+        }
+    }
+}