Skip to content
Snippets Groups Projects
Commit 405ac761 authored by Meiqi Guo's avatar Meiqi Guo
Browse files

Add new file

parents
No related branches found
No related tags found
No related merge requests found
package ecp.BDPA.assignment1;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ToolRunner;
public class MyWordCount extends Configured implements Tool {
public static void main(String[] arg0) throws Exception {
System.out.println(Arrays.toString(arg0));
Configuration conf = new Configuration();
conf.set("mapreduce.map.output.compress", "true");
int res = ToolRunner.run(conf, new MyWordCount(), arg0);
System.exit(res);
}
@Override
public int run(String[] arg0) throws Exception {
System.out.println(Arrays.toString(arg0));
Job job = new Job(getConf(), "MyWordCount");
job.setJarByClass(MyWordCount.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class);
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
job.setNumReduceTasks(50);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(arg0[0]));
FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
job.waitForCompletion(true);
return 0;
}
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable ONE = new IntWritable(1);
private Text word = new Text();
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
for (String token: value.toString().split("\\s+|-{2,}+")) {
word.set(token.replaceAll("[^A-Za-z]+", "").toLowerCase());
context.write(word, ONE);
}
}
}
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
if (sum>4000){
context.write(key, new IntWritable(sum));
}
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment