Skip to content
Snippets Groups Projects
Commit 41207917 authored by Meiqi Guo
Browse files

Upload new file

parent 20a7d741
Branches
No related tags found
No related merge requests found
package ecp.BDPA.assignment2;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Preprocessing MapReduce job: tokenizes each input line, removes stop words
 * and words absent from a global word-frequency table, and writes each line's
 * unique surviving words (sorted by ascending global frequency, joined by
 * commas) keyed by a running line number.
 */
public class Preprocess extends Configured implements Tool {

    /** Counts input lines that produced at least one output record. */
    private enum FinalLineNumCounter {
        Final_NUM
    }

    /** Counts every input line seen by the mapper (including skipped ones). */
    private enum LineNumCounter {
        NUM
    }

    /**
     * Entry point.
     *
     * Expected arguments:
     *   args[0] - input path
     *   args[1] - output path
     *   args[2] - HDFS path of the stop-words file (one word per line)
     *   args[3] - HDFS path of the word-frequency file ("word<ws>count" per line)
     */
    public static void main(String[] args) throws Exception {
        System.out.println(Arrays.toString(args));
        // Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 4) {
            System.err.println("Usage: Preprocess <input> <output> <stopWordsFile> <wordFreqFile>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        conf.set("StopWordsFileName", args[2]);
        conf.set("WordFreqFileName", args[3]);
        conf.set("mapreduce.map.output.compress", "true");
        int res = ToolRunner.run(conf, new Preprocess(), args);
        System.exit(res);
    }

    /**
     * Configures and runs the job.
     *
     * @return 0 if the job succeeded, 1 if it failed (the original always
     *         returned 0, hiding job failures from the caller)
     */
    @Override
    public int run(String[] args) throws Exception {
        System.out.println(Arrays.toString(args));
        // Job.getInstance(...) replaces the deprecated Job(Configuration, String) constructor.
        Job job = Job.getInstance(getConf(), "Preprocess");
        job.setJarByClass(Preprocess.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        // A single reducer keeps the mapper's line-counter values usable as unique output keys.
        job.setNumReduceTasks(1);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Mapper: for each line, keeps the unique tokens that are neither stop
     * words nor absent from the word-frequency table, sorts them by ascending
     * global frequency, and emits them comma-joined under the running line
     * number from the {@link LineNumCounter}.
     */
    public static class Map extends Mapper<LongWritable, Text, LongWritable, Text> {
        private Text word = new Text();
        private Text words = new Text();
        // Stop words loaded from HDFS in setup(); type kept public List<Text>
        // to preserve the original interface.
        public List<Text> stopWords = new ArrayList<Text>();
        // Global word -> frequency table loaded from HDFS in setup().
        public HashMap<String, Integer> wordFreq = new HashMap<String, Integer>();

        /**
         * Loads the word-frequency table from HDFS.
         * Each line must contain "word<whitespace>count"; malformed lines are
         * skipped with a warning.
         *
         * @throws IOException if the file cannot be opened or read
         */
        public void loadWordFreq(String filename) throws IOException {
            Path pt = new Path(filename); // location of the file in HDFS
            FileSystem fs = FileSystem.get(new Configuration());
            // try-with-resources closes the reader even if parsing throws;
            // explicit UTF-8 avoids platform-default charset surprises.
            try (BufferedReader br =
                    new BufferedReader(new InputStreamReader(fs.open(pt), StandardCharsets.UTF_8))) {
                String sCurrentLine;
                while ((sCurrentLine = br.readLine()) != null) {
                    String[] wordfreq = sCurrentLine.split("[\\s]+");
                    if (wordfreq.length != 2) {
                        // Previously this only warned and still indexed wordfreq[1],
                        // risking ArrayIndexOutOfBoundsException; skip the line instead.
                        System.out.println("WARNING: WRONG INPUT FORMAT");
                        continue;
                    }
                    this.wordFreq.put(wordfreq[0], Integer.valueOf(wordfreq[1]));
                }
            }
        }

        /**
         * Loads the stop-words list from HDFS; each line is stripped of every
         * non-alphabetic character before being stored.
         *
         * @throws IOException if the file cannot be opened or read
         */
        public void loadStopWords(String filename) throws IOException {
            Path pt = new Path(filename); // location of the file in HDFS
            FileSystem fs = FileSystem.get(new Configuration());
            try (BufferedReader br =
                    new BufferedReader(new InputStreamReader(fs.open(pt), StandardCharsets.UTF_8))) {
                String sCurrentLine;
                while ((sCurrentLine = br.readLine()) != null) {
                    String stopWord = sCurrentLine.replaceAll("[^A-Za-z]+", "");
                    Text t = new Text();
                    t.set(stopWord);
                    this.stopWords.add(t);
                }
            }
        }

        /** Loads both side files (stop words, word frequencies) before the first map() call. */
        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            loadStopWords(context.getConfiguration().get("StopWordsFileName"));
            loadWordFreq(context.getConfiguration().get("WordFreqFileName"));
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            Counter counter = context.getCounter(LineNumCounter.NUM);
            Counter counterFinal = context.getCounter(FinalLineNumCounter.Final_NUM);
            // Incremented for every line (even empty ones) so output keys track input lines.
            counter.increment(1);
            if (value.toString().isEmpty()) {
                return;
            }
            Set<String> wordSet = new HashSet<String>();
            // Split on whitespace or runs of two-plus hyphens, then strip each
            // token down to lowercase alphanumerics.
            for (String token : value.toString().split("\\s+|-{2,}+")) {
                word.set(token.replaceAll("[^A-Za-z0-9]+", "").toLowerCase());
                if (word.toString().length() == 0) {
                    continue;
                } else if (stopWords.contains(word)) {
                    continue;
                } else if (!wordFreq.containsKey(word.toString())) {
                    // Token absent from the global frequency table: warn and drop it,
                    // so the frequency-based sort below never sees a missing key.
                    System.out.println("WARN: HASHTABLE DON'T HAVE WORD:");
                    System.out.println(word);
                } else {
                    wordSet.add(word.toString());
                }
            }
            if (wordSet.isEmpty()) {
                return;
            }
            counterFinal.increment(1);
            // Sort the line's unique words by ascending global frequency.
            List<String> wordList = new ArrayList<String>(wordSet);
            Collections.sort(wordList, new Comparator<String>() {
                @Override
                public int compare(String s1, String s2) {
                    return wordFreq.get(s1).compareTo(wordFreq.get(s2));
                }
            });
            words.set(StringUtils.join(wordList, ","));
            context.write(new LongWritable(counter.getValue()), words);
        }
    }

    /**
     * Near-identity reducer: with a single reduce task and line-counter keys,
     * each key normally carries exactly one value. Should a key ever carry
     * several values, the last one read wins (original behavior preserved).
     */
    public static class Reduce extends Reducer<LongWritable, Text, LongWritable, Text> {
        @Override
        public void reduce(LongWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Text words = new Text();
            for (Text v : values) {
                words.set(v);
            }
            context.write(key, words);
        }
    }
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment.