package ecp.BDPA.assignment1;
import java.io.BufferedReader;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
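/**
 * Builds an inverted index (word -> list of source files) over a set of
 * text files, filtering out stop words read from HDFS. Counters track how
 * many words appear in exactly one of the three Project Gutenberg inputs
 * (pg100.txt, pg31100.txt, pg3200.txt).
 */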
public class InvertIndex extends Configured implements Tool {
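// Counters for words that appear in exactly one of the three input files.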
private enum ONLY_WORD_COUNTER {
PG100,
PG31100,
PG3200
}
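/**
 * Entry point. Expects four arguments: the input directory, the output
 * directory, the stop-words file on HDFS, and a local path for the
 * counter report.
 */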
public static void main(String[] args) throws Exception {
System.out.println(Arrays.toString(args));
Configuration conf = new Configuration();
conf.set("StopWordsFileName", args[2]);
int res = ToolRunner.run(conf, new InvertIndex(), args);
System.exit(res);
}
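/**
 * Configures and runs the MapReduce job, then writes the three
 * unique-word counters to the report file given as args[3].
 */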
@Override
public int run(String[] args) throws Exception {
System.out.println(Arrays.toString(args));
Job job = Job.getInstance(getConf(), "InvertIndex");
job.setJarByClass(InvertIndex.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(TextArray.class); // must match the Reducer's output value type
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
// job.setCombinerClass(Reduce.class);
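// Reduce cannot double as a combiner: it emits TextArray values, while the map output values are Text.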
job.setNumReduceTasks(10);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean success = job.waitForCompletion(true);
// Write the per-file unique-word counts to a local report file.
PrintWriter writer = new PrintWriter(args[3], "UTF-8");
long count = job.getCounters().findCounter(ONLY_WORD_COUNTER.PG100).getValue();
writer.println("PG100: " + count);
count = job.getCounters().findCounter(ONLY_WORD_COUNTER.PG31100).getValue();
writer.println("PG31100: " + count);
count = job.getCounters().findCounter(ONLY_WORD_COUNTER.PG3200).getValue();
writer.println("PG3200: " + count);
writer.close();
return success ? 0 : 1;
}
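/**
 * Mapper: tokenizes each input line, strips non-letter characters,
 * lowercases, and emits (word, source filename) for every non-stop-word.
 */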
public static class Map extends Mapper<LongWritable, Text, Text, Text> {
private Text word = new Text();
private Text filename = new Text();
// A HashSet gives constant-time stop-word lookups (a List scan is linear per token).
public Set<Text> stopWords = new HashSet<Text>();
// Reads the stop-words file from HDFS, keeping only the letters of each line.
public void loadStopWords(String filename, Configuration conf) throws IOException{
Path pt = new Path(filename); // location of the file in HDFS
FileSystem fs = FileSystem.get(conf);
BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt)));
String sCurrentLine;
while ((sCurrentLine = br.readLine()) != null) {
String stopWord = sCurrentLine.replaceAll("[^A-Za-z]+", "");
if (!stopWord.isEmpty()) {
stopWords.add(new Text(stopWord));
}
}
br.close();
}
@Override
public void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
String filename = context.getConfiguration().get("StopWordsFileName");
loadStopWords(filename, context.getConfiguration());
}
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
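// The input split's filename identifies which book this record came from.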
FileSplit fileSplit = (FileSplit)context.getInputSplit();
String name = fileSplit.getPath().getName();
filename.set(name);
// Split on whitespace or runs of two or more hyphens, then strip
// non-letter characters and lowercase; skip empty tokens and stop words.
for (String token: value.toString().split("\\s+|-{2,}")) {
word.set(token.replaceAll("[^A-Za-z]+", "").toLowerCase());
if (word.getLength() > 0 && !stopWords.contains(word)){
context.write(word, filename);
}
}
}
}
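/**
 * Reducer: deduplicates the filenames observed for each word and emits the
 * word with its comma-separated file list; a word found in exactly one
 * file increments that file's counter.
 */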
public static class Reduce extends Reducer<Text, Text, Text, TextArray> {
@Override
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
ArrayList<Text> res = new ArrayList<Text>();
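// Hadoop reuses the same Text instance across the values iterator, so copy each value before storing it.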
for (Text val : values) {
if (!res.contains(val)){
res.add(new Text(val));
}
}
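// A word that occurs in only one file counts toward that file's total.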
if (res.size()==1){
String filename = res.get(0).toString();
switch (filename){
case "pg100.txt":
context.getCounter(ONLY_WORD_COUNTER.PG100).increment(1);
break;
case "pg31100.txt":
context.getCounter(ONLY_WORD_COUNTER.PG31100).increment(1);
break;
case "pg3200.txt":
context.getCounter(ONLY_WORD_COUNTER.PG3200).increment(1);
break;
}
}
// Emit the word with its deduplicated list of source files.
Text[] arr = res.toArray(new Text[res.size()]);
context.write(key, new TextArray(arr));
}
}
public static class TextArray extends ArrayWritable {
public TextArray(Text[] arr) {
super(Text.class, arr); // store the values so get() and toString() see them
}
@Override
public Text[] get() {
return (Text[]) super.get();
}
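// Writes only the raw elements (no length prefix), so this format is
// one-way: readFields is not overridden to parse it back. It is only
// meant for text output via toString().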
@Override
public void write(DataOutput arg0) throws IOException {
for(Text data : get()){
data.write(arg0);
}
}
@Override
public String toString() {
// Join the filenames with commas; an empty array yields an empty string.
StringBuilder output = new StringBuilder();
for (Text t : get()) {
if (output.length() > 0) {
output.append(",");
}
output.append(t.toString());
}
return output.toString();
}
}
}
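/*
 * Example invocation (a sketch: the jar name and the HDFS paths below are
 * assumptions for illustration, not part of this repository):
 *
 *   hadoop jar bdpa-assignment1.jar ecp.BDPA.assignment1.InvertIndex \
 *       /input/gutenberg /output/inverted-index \
 *       /input/stopwords.txt counters.txt
 *
 * args[0] = input directory, args[1] = output directory,
 * args[2] = stop-words file on HDFS, args[3] = local counter report.
 */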