From 39da8455d740a9eb3ecf74741fbcf35a937044f7 Mon Sep 17 00:00:00 2001
From: Meiqi Guo <mei-qi.guo@student.ecp.fr>
Date: Sat, 18 Mar 2017 03:39:44 +0100
Subject: [PATCH] Upload new file

---
 NaiveApproach.java | 289 +++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 289 insertions(+)
 create mode 100644 NaiveApproach.java

diff --git a/NaiveApproach.java b/NaiveApproach.java
new file mode 100644
index 0000000..ea838b1
--- /dev/null
+++ b/NaiveApproach.java
@@ -0,0 +1,289 @@
+package ecp.BDPA.assignment2;
+
+import org.apache.hadoop.conf.Configured;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import org.apache.commons.lang.StringUtils;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Composite key holding an unordered pair of document ids: compareTo treats
+ * (a, b) and (b, a) as equal, so both emissions of a pair reach the same reducer call.
+ */
+class TextPair implements WritableComparable<TextPair> {
+
+    private Text first;
+    private Text second;
+
+    public TextPair(Text first, Text second) {
+        set(first, second);
+    }
+
+    public TextPair() {
+        set(new Text(), new Text());
+    }
+
+    public TextPair(String first, String second) {
+        set(new Text(first), new Text(second));
+    }
+
+    public Text getFirst() {
+        return first;
+    }
+
+    public Text getSecond() {
+        return second;
+    }
+
+    public void set(Text first, Text second) {
+        this.first = first;
+        this.second = second;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        first.readFields(in);
+        second.readFields(in);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        first.write(out);
+        second.write(out);
+    }
+
+    @Override
+    public String toString() {
+        return first + " " + second;
+    }
+
+    @Override
+    public int compareTo(TextPair other) {
+        int cmpFirstFirst = first.compareTo(other.first);
+        int cmpSecondSecond = second.compareTo(other.second);
+        int cmpFirstSecond = first.compareTo(other.second);
+        int cmpSecondFirst = second.compareTo(other.first);
+
+        // The pair is symmetric: (a, b) and (b, a) compare as equal.
+        if ((cmpFirstFirst == 0 && cmpSecondSecond == 0)
+                || (cmpFirstSecond == 0 && cmpSecondFirst == 0)) {
+            return 0;
+        }
+
+        Text thisSmaller;
+        Text otherSmaller;
+
+        Text thisBigger;
+        Text otherBigger;
+
+        if (this.first.compareTo(this.second) < 0) {
+            thisSmaller = this.first;
+            thisBigger = this.second;
+        } else {
+            thisSmaller = this.second;
+            thisBigger = this.first;
+        }
+
+        if (other.first.compareTo(other.second) < 0) {
+            otherSmaller = other.first;
+            otherBigger = other.second;
+        } else {
+            otherSmaller = other.second;
+            otherBigger = other.first;
+        }
+
+        // Order pairs by their smaller element first, then by the bigger one.
+        int cmpThisSmallerOtherSmaller = thisSmaller.compareTo(otherSmaller);
+        int cmpThisBiggerOtherBigger = thisBigger.compareTo(otherBigger);
+
+        if (cmpThisSmallerOtherSmaller == 0) {
+            return cmpThisBiggerOtherBigger;
+        } else {
+            return cmpThisSmallerOtherSmaller;
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return first.hashCode() * 163 + second.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof TextPair) {
+            TextPair tp = (TextPair) o;
+            return first.equals(tp.first) && second.equals(tp.second);
+        }
+        return false;
+    }
+
+}
+
+public class NaiveApproach extends Configured implements Tool {
+
+    // Counts how many pairwise similarity computations are actually performed.
+    private enum CompCounter {
+        NUM
+    }
+
+    public static void main(String[] args) throws Exception {
+        System.out.println(Arrays.toString(args));
+        Configuration conf = new Configuration();
+        Long line = loadFileLength(args[2]);
+        conf.set("InputFileLength", line.toString());
+        conf.set("mapreduce.map.output.compress", "true");
+        conf.set("mapreduce.input.fileinputformat.split.maxsize", String.valueOf(500));
+        int res = ToolRunner.run(conf, new NaiveApproach(), args);
+
+        System.exit(res);
+    }
+
+    // Reads the number of input documents from a one-line file stored in HDFS.
+    public static Long loadFileLength(String filename) throws IOException {
+        Path pt = new Path(filename); // location of the file in HDFS
+        FileSystem fs = FileSystem.get(new Configuration());
+        BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt)));
+
+        Long l = Long.parseLong(br.readLine());
+
+        br.close();
+
+        return l;
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+        System.out.println(Arrays.toString(args));
+        Job job = new Job(getConf(), "NaiveApproach");
+        job.setJarByClass(NaiveApproach.class);
+        job.setMapOutputKeyClass(TextPair.class);
+        job.setMapOutputValueClass(Text.class);
+        job.setInputFormatClass(KeyValueTextInputFormat.class);
+        job.setOutputFormatClass(TextOutputFormat.class);
+
+        job.setMapperClass(Map.class);
+        job.setReducerClass(Reduce.class);
+        job.setNumReduceTasks(1);
+
+        FileInputFormat.addInputPath(job, new Path(args[0]));
+        FileOutputFormat.setOutputPath(job, new Path(args[1]));
+        job.waitForCompletion(true);
+
+        return 0;
+    }
+
+    public static class Map extends Mapper<Text, Text, TextPair, Text> {
+        private Text words = new Text();
+        private TextPair keyPair = new TextPair();
+        private Long fileLength;
+
+        @Override
+        public void setup(Context context) throws IOException, InterruptedException {
+            super.setup(context);
+            this.fileLength = Long.parseLong(context.getConfiguration().get("InputFileLength"));
+        }
+
+        // Emits the current document once for every other document id, so each
+        // unordered pair of documents reaches the reducer with exactly two values.
+        @Override
+        public void map(Text key, Text value, Context context)
+                throws IOException, InterruptedException {
+
+            if (value.toString().isEmpty()) {
+                return;
+            }
+
+            String keyOut = key.toString();
+            if (!StringUtils.isNumeric(keyOut)) {
+                System.out.println("WARN: Bad input id");
+                System.out.println(keyOut);
+                return;
+            }
+
+            this.words = value;
+
+            long counter = 1;
+
+            while (counter <= this.fileLength) {
+                String counterstr = Long.toString(counter);
+                this.keyPair.set(key, new Text(counterstr));
+
+                // Skip the pair of a document with itself; compare the ids as
+                // strings, since Text never equals a String.
+                if (keyOut.equals(counterstr)) {
+                    counter += 1;
+                    continue;
+                }
+
+                context.write(keyPair, words);
+                counter += 1;
+            }
+        }
+    }
+
+    public static class Reduce extends Reducer<TextPair, Text, Text, Text> {
+
+        private Set<String> text2Set(String s) {
+            return new HashSet<String>(Arrays.asList(s.split(",")));
+        }
+
+        // Jaccard similarity between two comma-separated word lists.
+        public double similarity(String t1, String t2) {
+            Set<String> s1 = text2Set(t1);
+            Set<String> s2 = text2Set(t2);
+
+            Set<String> union = new HashSet<String>(s1);
+            union.addAll(s2);
+
+            Set<String> intersection = new HashSet<String>(s1);
+            intersection.retainAll(s2);
+
+            if (union.size() == 0) {
+                return 0;
+            }
+
+            // Cast to double so the ratio is not truncated by integer division.
+            return (double) intersection.size() / union.size();
+        }
+
+        @Override
+        public void reduce(TextPair key, Iterable<Text> values, Context context)
+                throws IOException, InterruptedException {
+            int counter = 0;
+            String[] strings = new String[2];
+            for (Text v : values) {
+                strings[counter] = v.toString();
+                counter += 1;
+            }
+
+            if (counter != 2) { // one of the pair's document ids is not in the input file
+                return;
+            }
+
+            double s = similarity(strings[0], strings[1]);
+            context.getCounter(CompCounter.NUM).increment(1);
+
+            if (s >= 0.8) {
+                context.write(new Text(key.toString()), new Text(String.valueOf(s)));
+            }
+        }
+    }
+}
+
-- 
GitLab