Commit 0aaec851 authored by cloudera_vm

All pairs preprocessing AND change formatting of preprocessing (we add key)

parent 121fae99
Showing 12094 additions and 2599 deletions
12 files added
Two diffs are collapsed.
Preprocessing_1.java:
@@ -56,6 +56,7 @@ public class Preprocessing_1 extends Configured implements Tool {
 job.setMapperClass(Map.class);
 job.setReducerClass(Reduce.class);
+job.getConfiguration().set("mapreduce.output.textoutputformat.separator", ",");
 job.setInputFormatClass(TextInputFormat.class);
 job.setOutputFormatClass(TextOutputFormat.class);
@@ -108,7 +109,7 @@ public class Preprocessing_1 extends Configured implements Tool {
 }
 }
-public static class Reduce extends Reducer<LongWritable, Text, NullWritable, Text> {
+public static class Reduce extends Reducer<LongWritable, Text, LongWritable, Text> {
 private static HashMap<String,Integer> word_freq = new HashMap<String,Integer>();
@@ -172,7 +173,7 @@ public class Preprocessing_1 extends Configured implements Tool {
 if(!concat_words.toString().isEmpty()){
 context.getCounter(COUNTS.COUNT_LINES).increment(1);
-context.write(NullWritable.get(), new Text(concat_words.toString()));
+context.write(key, new Text(concat_words.toString()));
 }
 }
Qa.java:
package SetSimilarityJoins;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
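// Qa: word-count job over the input corpus that skips any token found in a local stopwords file.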
public class Qa extends Configured implements Tool {
public static void main(String[] args) throws Exception {
System.out.println(Arrays.toString(args));
int res = ToolRunner.run(new Configuration(), new Qa(), args);
System.exit(res);
}
@Override
public int run(String[] args) throws Exception {
System.out.println(Arrays.toString(args));
Job job = new Job(getConf(), "Qa_ALL_PAIRS");
job.setJarByClass(Qa.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
Path outputFilePath = new Path(args[1]);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, outputFilePath);
FileSystem fs = FileSystem.newInstance(getConf());
if (fs.exists(outputFilePath)) {
fs.delete(outputFilePath, true);
}
job.waitForCompletion(true);
return 0;
}
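// Mapper: replaces non-alphanumeric characters with spaces, splits each line on whitespace,
// and emits (token, 1) for every token not found in the stopwords string
// (note: String.contains is a substring match, so tokens occurring inside any stopword are dropped too).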
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable ONE = new IntWritable(1);
private Text word = new Text();
String stopwords_file = "/home/cloudera/workspace/bpa/Assign2/stopwords/stopwords";
String stopwords = new String(Files.readAllBytes(Paths.get(stopwords_file)));
public Map() throws IOException{
System.out.println(stopwords);
}
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
for (String token: value.toString().replaceAll("[^A-Za-z0-9 ]", " ").split("\\s+")) {
if (!stopwords.contains(token)) {
word.set(token);
context.write(word, ONE);
}
}
}
}
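// Reducer: sums the partial counts emitted for each word.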
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
}
\ No newline at end of file
Qa_ALL_PAIRS.java:
package SetSimilarityJoins;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.google.common.collect.Sets;
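// Qa_ALL_PAIRS: brute-force set-similarity join; every pair of documents is compared and pairs
// with Jaccard similarity >= 0.8 are written out, while a counter tracks the number of comparisons.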
public class Qa_ALL_PAIRS extends Configured implements Tool {
public static void main(String[] args) throws Exception {
System.out.println(Arrays.toString(args));
int res = ToolRunner.run(new Configuration(), new Qa_ALL_PAIRS(), args);
System.exit(res);
}
public static enum COUNTS {COUNT_COMP};
@Override
public int run(String[] args) throws Exception {
System.out.println(Arrays.toString(args));
Job job = new Job(getConf(), "Qa_ALL_PAIRS");
job.setJarByClass(Qa_ALL_PAIRS.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.getConfiguration().set("mapreduce.output.textoutputformat.separator", ",");
job.getConfiguration().set("mapreduce.input.keyvaluelinerecordreader.key.value.separator",",");
Path outputFilePath = new Path(args[1]);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, outputFilePath);
FileSystem fs = FileSystem.newInstance(getConf());
if (fs.exists(outputFilePath)) {
fs.delete(outputFilePath, true);
}
job.waitForCompletion(true);
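// Write the total number of comparisons (COUNT_COMP) to a file named "nb_comp" alongside the
// job output; note that FileWriter writes to the local filesystem, not to HDFS.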
long counter = job.getCounters().findCounter(COUNTS.COUNT_COMP).getValue();
Path countFile = new Path(new Path(args[1]),"nb_comp");
File file = new File(countFile.toString());
FileWriter fileWriter = new FileWriter(file);
fileWriter.write(String.valueOf(counter));
fileWriter.flush();
fileWriter.close();
return 0;
}
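// Mapper: loads the preprocessed corpus (one "id,content" line per document) into memory, then,
// for each input document, emits one record per document with a larger id, keyed
// "<smallerId>--<largerId>" and valued with the current document's lower-cased content.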
public static class Map extends Mapper<LongWritable, Text, Text, Text> {
String doc_path = "/home/cloudera/workspace/bpa/Assign2/Preprocessing_1/processed_doc";
String doc = new String(Files.readAllBytes(Paths.get(doc_path)));
HashMap<String, String> id_doc = new HashMap<String, String>();
public Map() throws IOException{
for (String line : doc.split("\n")){
id_doc.put(line.split(",")[0],
line.split(",")[1]);
}
}
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
int id_current_doc = Integer.valueOf(
value.toString().split(",")[0]);
for (String other_doc : id_doc.keySet()) {
int id_other_doc = Integer.valueOf(other_doc);
if (id_current_doc < id_other_doc){
StringBuilder pair = new StringBuilder();
pair.append(id_current_doc);
pair.append("--");
pair.append(id_other_doc);
context.write(new Text(pair.toString()),
new Text(
value.toString().split(",")[1].toLowerCase()));
}
}
}
}
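// Reducer: ignores the streamed values, looks both documents up in the in-memory map,
// computes their Jaccard similarity, writes the pair when the similarity is at least 0.8,
// and increments the comparison counter for every pair it reduces.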
public static class Reduce extends Reducer<Text, Text, Text, DoubleWritable> {
String doc_path = "/home/cloudera/workspace/bpa/Assign2/Preprocessing_1/processed_doc";
String doc = new String(Files.readAllBytes(Paths.get(doc_path)));
HashMap<String, String> id_doc = new HashMap<String, String>();
public Reduce() throws IOException{
for (String line : doc.split("\n")){
id_doc.put(line.split(",")[0],
line.split(",")[1]);
}
}
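// Jaccard similarity of two token arrays: |intersection| / |union|.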
public static double Jaccard(String[] A, String[] B){
Set<String> A_set = new HashSet<String>(Arrays.asList(A));
Set<String> B_set = new HashSet<String>(Arrays.asList(B));
Set<String> union = Sets.union(A_set, B_set);
Set<String> intersection = Sets.intersection(A_set, B_set);
return (double)intersection.size()/(double)union.size();
}
@Override
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
String[] ids = key.toString().split("--");
String content_1 = id_doc.get(ids[0]).toLowerCase();
String content_2 = id_doc.get(ids[1]).toLowerCase();
double jaccsim = Jaccard(content_1.split(" "),
content_2.split(" "));
if (jaccsim >=0.8){
context.write(key,new DoubleWritable(jaccsim));
}
context.getCounter(COUNTS.COUNT_COMP).increment(1);
}
}
}
Qa_ALL_PAIRS_1000.java:
package SetSimilarityJoins;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.google.common.collect.Sets;
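// Qa_ALL_PAIRS_1000: same brute-force join as Qa_ALL_PAIRS, but reading
// "processed_doc_1000" (presumably a 1000-document subset of the preprocessed corpus).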
public class Qa_ALL_PAIRS_1000 extends Configured implements Tool {
public static void main(String[] args) throws Exception {
System.out.println(Arrays.toString(args));
int res = ToolRunner.run(new Configuration(), new Qa_ALL_PAIRS_1000(), args);
System.exit(res);
}
public static enum COUNTS {COUNT_COMP};
@Override
public int run(String[] args) throws Exception {
System.out.println(Arrays.toString(args));
Job job = new Job(getConf(), "Qa_ALL_PAIRS_1000");
job.setJarByClass(Qa_ALL_PAIRS_1000.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.getConfiguration().set("mapreduce.input.keyvaluelinerecordreader.key.value.separator",",");
Path outputFilePath = new Path(args[1]);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, outputFilePath);
FileSystem fs = FileSystem.newInstance(getConf());
if (fs.exists(outputFilePath)) {
fs.delete(outputFilePath, true);
}
job.waitForCompletion(true);
long counter = job.getCounters().findCounter(COUNTS.COUNT_COMP).getValue();
Path countFile = new Path(new Path(args[1]),"nb_comp");
File file = new File(countFile.toString());
FileWriter fileWriter = new FileWriter(file);
fileWriter.write(String.valueOf(counter));
fileWriter.flush();
fileWriter.close();
return 0;
}
public static class Map extends Mapper<LongWritable, Text, Text, Text> {
String doc_path = "/home/cloudera/workspace/bpa/Assign2/Preprocessing_1/processed_doc_1000";
String doc = new String(Files.readAllBytes(Paths.get(doc_path)));
HashMap<String, String> id_doc = new HashMap<String, String>();
public Map() throws IOException{
for (String line : doc.split("\n")){
id_doc.put(line.split(",")[0],
line.split(",")[1]);
}
}
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
int id_current_doc = Integer.valueOf(
value.toString().split(",")[0]);
for (String other_doc : id_doc.keySet()) {
int id_other_doc = Integer.valueOf(other_doc);
if (id_current_doc < id_other_doc){
StringBuilder pair = new StringBuilder();
pair.append(id_current_doc);
pair.append("--");
pair.append(id_other_doc);
context.write(new Text(pair.toString()),
new Text(
value.toString().split(",")[1].toLowerCase()));
}
}
}
}
public static class Reduce extends Reducer<Text, Text, Text, DoubleWritable> {
String doc_path = "/home/cloudera/workspace/bpa/Assign2/Preprocessing_1/processed_doc_1000";
String doc = new String(Files.readAllBytes(Paths.get(doc_path)));
HashMap<String, String> id_doc = new HashMap<String, String>();
public Reduce() throws IOException{
for (String line : doc.split("\n")){
id_doc.put(line.split(",")[0],
line.split(",")[1]);
}
}
public static double Jaccard(String[] A, String[] B){
Set<String> A_set = new HashSet<String>(Arrays.asList(A));
Set<String> B_set = new HashSet<String>(Arrays.asList(B));
Set<String> union = Sets.union(A_set, B_set);
Set<String> intersection = Sets.intersection(A_set, B_set);
return (double)intersection.size()/(double)union.size();
}
@Override
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
String[] ids = key.toString().split("--");
String content_1 = id_doc.get(ids[0]).toLowerCase();
String content_2 = id_doc.get(ids[1]).toLowerCase();
double jaccsim = Jaccard(content_1.split(" "),
content_2.split(" "));
if (jaccsim >=0.8){
context.write(key,new DoubleWritable(jaccsim));
}
context.getCounter(COUNTS.COUNT_COMP).increment(1);
}
}
}