Skip to content
Snippets Groups Projects
Commit 713f2087 authored by Meiqi Guo's avatar Meiqi Guo
Browse files

Delete IndexApproach.java

parent 8d5c7384
No related branches found
No related tags found
No related merge requests found
package ecp.BDPA.assignment2;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.commons.lang.StringUtils;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.swing.text.Document;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class IndexApproach extends Configured implements Tool {
private enum CompCounter{
NUM
}
public static void main(String[] args) throws Exception {
System.out.println(Arrays.toString(args));
Configuration conf = new Configuration();
conf.set("mapreduce.map.output.compress", "true");
conf.set("Document",args[0]);
// conf.set("mapreduce.input.fileinputformat.split.maxsize", String.valueOf(500));
int res = ToolRunner.run(conf, new IndexApproach(), args);
System.exit(res);
}
@Override
public int run(String[] args) throws Exception {
System.out.println(Arrays.toString(args));
Job job = new Job(getConf(), "IndexApproach");
job.setJarByClass(IndexApproach.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setNumReduceTasks(1);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
return 0;
}
public static class Map extends Mapper<Text, Text, Text, Text> {
private Text word = new Text();
public void setup(Context context) throws IOException,InterruptedException {
super.setup(context);
}
@Override
public void map(Text key, Text value, Context context)
throws IOException, InterruptedException {
if (value.toString().isEmpty()){
return;
}
String[] document = value.toString().split(",");
int filter = document.length - (int)Math.floor(document.length*0.8 + 1);
int counter = 0;
while(counter<filter){
word.set(document[counter]);
context.write(word,key);
counter += 1;
}
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text> {
public HashMap<Text,Text> document = new HashMap<Text,Text>();
private Set<String> text2Set(String s){
return new HashSet<String>(Arrays.asList(s.split(",")));
}
@Override
public void setup(Context context) throws IOException,InterruptedException {
super.setup(context);
String filename = context.getConfiguration().get("Document");
loadDocument(filename);
}
public void loadDocument(String filename) throws IOException{
Path pt=new Path(filename);//Location of file in HDFS
FileSystem fs = FileSystem.get(new Configuration());
BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt)));
String sCurrentLine;
while ((sCurrentLine = br.readLine()) != null) {
String[] d = sCurrentLine.split("[\\s]+");
if (d.length != 2){
System.out.println("WARNING: WRONG INPUT FORMAT");
}
document.put(new Text(d[0]), new Text(d[1]));
}
}
public double similarity(String t1, String t2) {
Set<String> s1 = text2Set(t1);
Set<String> s2 = text2Set(t2);
Set<String> union = new HashSet<String>(s1);
union.addAll(s2);
Set<String> intersection = new HashSet<String>(s1);
intersection.retainAll(s2);
if (union.size()==0){
return 0;
}
return intersection.size()/union.size();
}
@Override
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
List<Text> val = new ArrayList<Text>();
for (Text v :values){
val.add(new Text(v));
}
for (int i=0; i<val.size(); i++){
Text v1 = val.get(i);
for (int j=i+1; j<val.size(); j++){
Text v2 = val.get(j);
if (v1.equals(v2)){
continue;
}
String s1 = this.document.get(v1).toString();
String s2 = this.document.get(v2).toString();
context.getCounter(CompCounter.NUM).increment(1);
Double s = similarity(s1, s2);
if (s>=0.8){
context.write(new Text(v1.toString()+','+v2.toString()), new Text(String.valueOf(s)));
}
}
}
}
}
}
0% — Loading, or an error occurred.
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment