Commit 447d3950 authored by Meiqi Guo

Add new file

parent 405ac761
package ecp.BDPA.assignment1;

import java.io.BufferedReader;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class InvertIndex extends Configured implements Tool {

    // Counters for words that appear in exactly one of the three input books.
    private enum ONLY_WORD_COUNTER {
        PG100,
        PG31100,
        PG3200
    }

    public static void main(String[] args) throws Exception {
        System.out.println(Arrays.toString(args));
        Configuration conf = new Configuration();
        // args[2] is the stop-words file; pass its path to the mappers via the job configuration.
        conf.set("StopWordsFileName", args[2]);
        int res = ToolRunner.run(conf, new InvertIndex(), args);
        System.exit(res);
    }
    @Override
    public int run(String[] args) throws Exception {
        System.out.println(Arrays.toString(args));
        Job job = Job.getInstance(getConf(), "InvertIndex");
        job.setJarByClass(InvertIndex.class);
        job.setOutputKeyClass(Text.class);
        // The reducer emits TextArray values, so declare that class here
        // (Text[].class is not a Writable and would fail at runtime).
        job.setOutputValueClass(TextArray.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        // job.setCombinerClass(Reduce.class);
        job.setNumReduceTasks(10);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean success = job.waitForCompletion(true);

        // Write the "words unique to one book" counters to the report file (args[3]).
        PrintWriter writer = new PrintWriter(args[3], "UTF-8");
        writer.println("PG100: " + job.getCounters().findCounter(ONLY_WORD_COUNTER.PG100).getValue() + "\n");
        writer.println("PG31100: " + job.getCounters().findCounter(ONLY_WORD_COUNTER.PG31100).getValue() + "\n");
        writer.println("PG3200: " + job.getCounters().findCounter(ONLY_WORD_COUNTER.PG3200).getValue() + "\n");
        writer.close();
        return success ? 0 : 1;
    }
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        private Text word = new Text();
        private Text filename = new Text();
        private Set<String> stopWords = new HashSet<String>();

        // Load the stop-words file from HDFS into memory, one word per line,
        // normalized the same way as the input tokens (letters only, lower case).
        private void loadStopWords(Configuration conf, String stopWordsFile) throws IOException {
            Path pt = new Path(stopWordsFile); // location of the file in HDFS
            FileSystem fs = FileSystem.get(conf);
            BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt)));
            String currentLine;
            while ((currentLine = br.readLine()) != null) {
                String stopWord = currentLine.replaceAll("[^A-Za-z]+", "").toLowerCase();
                if (!stopWord.isEmpty()) {
                    stopWords.add(stopWord);
                }
            }
            br.close();
        }

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            String stopWordsFile = context.getConfiguration().get("StopWordsFileName");
            loadStopWords(context.getConfiguration(), stopWordsFile);
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // The file this split comes from identifies the book.
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            filename.set(fileSplit.getPath().getName());
            // Split on whitespace or runs of two or more hyphens, strip non-letters, lower-case.
            for (String token : value.toString().split("\\s+|-{2,}+")) {
                String cleaned = token.replaceAll("[^A-Za-z]+", "").toLowerCase();
                if (!cleaned.isEmpty() && !stopWords.contains(cleaned)) {
                    word.set(cleaned);
                    context.write(word, filename);
                }
            }
        }
    }
    public static class Reduce extends Reducer<Text, Text, Text, TextArray> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Collect the distinct filenames this word appears in.
            ArrayList<Text> res = new ArrayList<Text>();
            for (Text val : values) {
                if (!res.contains(val)) {
                    res.add(new Text(val));
                }
            }
            // A word seen in exactly one file is unique to that book: bump its counter.
            if (res.size() == 1) {
                String filename = res.get(0).toString();
                switch (filename) {
                    case "pg100.txt":
                        context.getCounter(ONLY_WORD_COUNTER.PG100).increment(1);
                        break;
                    case "pg31100.txt":
                        context.getCounter(ONLY_WORD_COUNTER.PG31100).increment(1);
                        break;
                    case "pg3200.txt":
                        context.getCounter(ONLY_WORD_COUNTER.PG3200).increment(1);
                        break;
                }
            }
            Text[] arr = res.toArray(new Text[res.size()]);
            context.write(key, new TextArray(arr));
        }
    }
    // Writable wrapper so the reducer can emit an array of filenames.
    // It is only ever used as job output: TextOutputFormat calls toString(),
    // so this class never needs to be read back.
    public static class TextArray extends ArrayWritable {
        public TextArray(Text[] arr) {
            // Hand the array to the parent; the original constructor dropped it.
            super(Text.class, arr);
        }

        @Override
        public Text[] get() {
            return (Text[]) super.get();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            for (Text data : get()) {
                data.write(out);
            }
        }

        @Override
        public String toString() {
            // Comma-separated list of filenames; also safe for an empty array.
            StringBuilder sb = new StringBuilder();
            for (Text t : get()) {
                if (sb.length() > 0) {
                    sb.append(',');
                }
                sb.append(t.toString());
            }
            return sb.toString();
        }
    }
}
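
Usage sketch (the jar name below is an assumption; the argument order follows main() and run() above: input directory, output directory, stop-words file on HDFS, local counter report file):

hadoop jar assignment1.jar ecp.BDPA.assignment1.InvertIndex <input dir> <output dir> <stop words file> <counter report file>

Each output line is a word, a tab, then the comma-separated list of files it appears in (word<TAB>pg100.txt,pg3200.txt); the report file holds the three ONLY_WORD_COUNTER totals.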