Skip to content
Snippets Groups Projects
Commit 39da8455 authored by Meiqi Guo's avatar Meiqi Guo
Browse files

Upload new file

parent 4672e2b1
No related branches found
No related tags found
No related merge requests found
package ecp.BDPA.assignment2;
import org.apache.hadoop.conf.Configured;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.commons.lang.StringUtils;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.Tool;
/**
 * An unordered pair of document ids used as the map-output key.
 *
 * <p>The pair is order-insensitive: {@code (a, b)} and {@code (b, a)} compare,
 * hash, and test equal the same way, so symmetric pairs are grouped together
 * regardless of emission order.
 *
 * <p>BUG FIX: the original {@code equals}/{@code hashCode} were order-SENSITIVE
 * while {@code compareTo} was order-insensitive, violating the contract that
 * {@code compareTo(x) == 0} should imply {@code equals(x)}. Under Hadoop's
 * default hash partitioner that would send (a,b) and (b,a) to different
 * reducers if more than one reducer were configured. Both are now symmetric,
 * consistent with {@code compareTo}.
 */
class TextPair implements WritableComparable<TextPair> {

    private Text first;
    private Text second;

    /** Required no-arg constructor for Writable deserialization. */
    public TextPair() {
        set(new Text(), new Text());
    }

    public TextPair(Text first, Text second) {
        set(first, second);
    }

    public TextPair(String first, String second) {
        set(new Text(first), new Text(second));
    }

    public void set(Text first, Text second) {
        this.first = first;
        this.second = second;
    }

    public Text getFirst() {
        return first;
    }

    public Text getSecond() {
        return second;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    @Override
    public String toString() {
        return first + " " + second;
    }

    /**
     * Order-insensitive comparison: each pair is normalized to
     * (smaller, larger) and those are compared lexicographically.
     * Returns 0 for {@code (a,b)} vs {@code (b,a)}.
     */
    @Override
    public int compareTo(TextPair other) {
        Text thisLo, thisHi, otherLo, otherHi;
        if (first.compareTo(second) < 0) {
            thisLo = first;
            thisHi = second;
        } else {
            thisLo = second;
            thisHi = first;
        }
        if (other.first.compareTo(other.second) < 0) {
            otherLo = other.first;
            otherHi = other.second;
        } else {
            otherLo = other.second;
            otherHi = other.first;
        }
        int cmp = thisLo.compareTo(otherLo);
        return (cmp != 0) ? cmp : thisHi.compareTo(otherHi);
    }

    /** Symmetric hash (addition commutes) so (a,b) and (b,a) hash alike. */
    @Override
    public int hashCode() {
        return first.hashCode() + second.hashCode();
    }

    /** Order-insensitive equality, consistent with {@link #compareTo}. */
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TextPair)) {
            return false;
        }
        TextPair tp = (TextPair) o;
        return (first.equals(tp.first) && second.equals(tp.second))
            || (first.equals(tp.second) && second.equals(tp.first));
    }
}
/**
 * Naive all-pairs document similarity join.
 *
 * <p>The mapper pairs every document id with every other id (O(n^2) candidate
 * pairs); the single reducer computes Jaccard similarity on the two documents'
 * word sets and emits pairs whose similarity is at least {@link #SIMILARITY_THRESHOLD}.
 *
 * <p>Usage: {@code NaiveApproach <input> <output> <lengthFile>} where
 * {@code lengthFile} is an HDFS file whose first line is the number of input
 * documents.
 */
public class NaiveApproach extends Configured implements Tool {

    /** Jaccard similarity threshold above which a pair is emitted. */
    private static final double SIMILARITY_THRESHOLD = 0.8;

    /** Counts the number of pairwise similarity computations performed. */
    private enum CompCounter {
        NUM
    }

    public static void main(String[] args) throws Exception {
        System.out.println(Arrays.toString(args));
        Configuration conf = new Configuration();
        // args[2]: HDFS file whose first line holds the document count.
        Long line = loadFileLength(args[2]);
        conf.set("InputFileLength", line.toString());
        conf.set("mapreduce.map.output.compress", "true");
        conf.set("mapreduce.input.fileinputformat.split.maxsize", String.valueOf(500));
        int res = ToolRunner.run(conf, new NaiveApproach(), args);
        System.exit(res);
    }

    /**
     * Reads the first line of {@code filename} on HDFS and parses it as a long.
     *
     * @throws IOException if the file cannot be opened or read
     */
    public static Long loadFileLength(String filename) throws IOException {
        Path pt = new Path(filename); // location of the file in HDFS
        FileSystem fs = FileSystem.get(new Configuration());
        // BUG FIX: try-with-resources — the original leaked the reader when
        // parseLong threw before reaching br.close().
        try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt)))) {
            return Long.parseLong(br.readLine());
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        System.out.println(Arrays.toString(args));
        // Job.getInstance replaces the deprecated Job(Configuration, String) ctor.
        Job job = Job.getInstance(getConf(), "NaiveApproach");
        job.setJarByClass(NaiveApproach.class);
        job.setMapOutputKeyClass(TextPair.class);
        job.setMapOutputValueClass(Text.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setNumReduceTasks(1);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
        return 0;
    }

    /**
     * Emits one (pair, words) record per candidate pair: for each document id
     * {@code k} it writes ((k, j), words) for every j in [1, fileLength], j != k.
     */
    public static class Map extends Mapper<Text, Text, TextPair, Text> {

        private Text words = new Text();
        private TextPair keyPair = new TextPair();
        private Long fileLength; // total number of documents, from the job conf

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            this.fileLength = Long.parseLong(context.getConfiguration().get("InputFileLength"));
        }

        @Override
        public void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            if (value.toString().isEmpty()) {
                return; // skip empty documents
            }
            String keyOut = key.toString();
            if (!StringUtils.isNumeric(keyOut)) {
                System.out.println("WARN: Bad input id"); // typo fix: was "Bas"
                System.out.println(keyOut);
                return;
            }
            this.words = value;
            for (long counter = 1; counter <= this.fileLength; counter++) {
                String counterstr = Long.toString(counter);
                // BUG FIX: the original compared a Text to a String
                // (key.equals(counterstr)), which is always false, so self-pairs
                // (i, i) were emitted and reported with similarity 1.0.
                if (keyOut.equals(counterstr)) {
                    continue;
                }
                this.keyPair.set(key, new Text(counterstr));
                context.write(keyPair, words);
            }
        }
    }

    /**
     * Receives the two documents of each candidate pair, computes their Jaccard
     * similarity, and emits the pair when it meets the threshold.
     */
    public static class Reduce extends Reducer<TextPair, Text, Text, Text> {

        /** Splits a comma-separated word list into a set of words. */
        private Set<String> text2Set(String s) {
            return new HashSet<String>(Arrays.asList(s.split(",")));
        }

        /**
         * Jaccard similarity |s1 ∩ s2| / |s1 ∪ s2| of two comma-separated
         * word lists; 0 when both are empty.
         */
        public double similarity(String t1, String t2) {
            Set<String> s1 = text2Set(t1);
            Set<String> s2 = text2Set(t2);
            Set<String> union = new HashSet<String>(s1);
            union.addAll(s2);
            Set<String> intersection = new HashSet<String>(s1);
            intersection.retainAll(s2);
            if (union.size() == 0) {
                return 0;
            }
            // BUG FIX: the original used integer division
            // (intersection.size() / union.size()), which truncates every
            // non-identical pair to 0 — the >= 0.8 filter could only ever
            // match identical sets.
            return (double) intersection.size() / union.size();
        }

        @Override
        public void reduce(TextPair key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int counter = 0;
            String[] strings = new String[2];
            for (Text v : values) {
                // Guard: never index past the array even if more than two
                // values arrive for a key (the original threw
                // ArrayIndexOutOfBoundsException in that case).
                if (counter < 2) {
                    strings[counter] = v.toString();
                }
                counter += 1;
            }
            if (counter != 2) { // document id not present in the input file
                return;
            }
            double s = similarity(strings[0], strings[1]);
            context.getCounter(CompCounter.NUM).increment(1);
            if (s >= SIMILARITY_THRESHOLD) {
                context.write(new Text(key.toString()), new Text(String.valueOf(s)));
            }
        }
    }
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment