Skip to content
Snippets Groups Projects
Select Git revision
  • 722dcc4892d064c21c64f9769f766fc919a6a5a8
  • master default
2 results

Stopword_iii.java

Blame
  • Stopword_iii.java 3.15 KiB
    package Question1;
    
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.Locale;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.BZip2Codec;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * MapReduce job that counts whitespace-separated tokens (lower-cased) across
     * the input files and emits only those tokens whose total count exceeds
     * {@link #MIN_COUNT} — i.e. candidate stopwords.
     *
     * <p>Usage: {@code Stopword_iii <input>... <output>} — every argument except
     * the last is an input path; the last is the output directory.
     */
    public class Stopword_iii extends Configured implements Tool {

       /** Minimum total occurrences a token must exceed to be emitted. */
       private static final int MIN_COUNT = 4000;

       public static void main(String[] args) throws Exception {
          System.out.println(Arrays.toString(args));
          int res = ToolRunner.run(new Configuration(), new Stopword_iii(), args);
          System.exit(res);
       }

       /**
        * Configures and runs the counting job.
        *
        * @param args all but the last element are input paths; the last is the output path
        * @return 0 if the job succeeds, 1 on failure or bad usage
        * @throws Exception if job submission or execution fails
        */
       @Override
       public int run(String[] args) throws Exception {
          System.out.println(Arrays.toString(args));
          if (args.length < 2) {
             System.err.println("Usage: Stopword_iii <input>... <output>");
             return 1;
          }

          // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
          Job job = Job.getInstance(getConf(), "Stopword_iii");
          job.setJarByClass(Stopword_iii.class);
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(IntWritable.class);

          job.setMapperClass(Map.class);
          job.setReducerClass(Reduce.class);
          // The combiner must NOT apply the MIN_COUNT filter: it only sees
          // per-mapper partial sums, and dropping partials below the threshold
          // would lose counts that cross it after the final merge. Use the
          // plain summing Sum combiner instead of Reduce.
          job.setCombinerClass(Sum.class);

          // Compress intermediate map output with bzip2. The mapred.* key names
          // are deprecated; use the modern mapreduce.* equivalents.
          job.getConfiguration().setBoolean("mapreduce.map.output.compress", true);
          job.getConfiguration().setClass("mapreduce.map.output.compress.codec",
                BZip2Codec.class, CompressionCodec.class);

          job.setInputFormatClass(TextInputFormat.class);
          job.setOutputFormatClass(TextOutputFormat.class);

          job.setNumReduceTasks(10);

          // Every argument except the last is an input path (generalizes the
          // original hard-coded three inputs; a 4-argument invocation behaves
          // exactly as before).
          for (int i = 0; i < args.length - 1; i++) {
             FileInputFormat.addInputPath(job, new Path(args[i]));
          }
          FileOutputFormat.setOutputPath(job, new Path(args[args.length - 1]));

          // Propagate job success/failure to the caller; the original always
          // returned 0 even when the job failed.
          return job.waitForCompletion(true) ? 0 : 1;
       }

       /** Splits each line on whitespace and emits (lower-cased token, 1). */
       public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
          private static final IntWritable ONE = new IntWritable(1);
          private final Text word = new Text();

          @Override
          public void map(LongWritable key, Text value, Context context)
                  throws IOException, InterruptedException {
             for (String token : value.toString().split("\\s+")) {
                // split("\\s+") yields a leading "" for lines that start with
                // whitespace; skip it rather than count empty tokens.
                if (token.isEmpty()) {
                   continue;
                }
                // Locale.ROOT: locale-independent lower-casing so results do not
                // vary with the task JVM's default locale.
                word.set(token.toLowerCase(Locale.ROOT));
                context.write(word, ONE);
             }
          }
       }

       /** Combiner: sums partial counts WITHOUT the threshold filter (see run()). */
       public static class Sum extends Reducer<Text, IntWritable, Text, IntWritable> {
          @Override
          public void reduce(Text key, Iterable<IntWritable> values, Context context)
                  throws IOException, InterruptedException {
             int sum = 0;
             for (IntWritable val : values) {
                sum += val.get();
             }
             context.write(key, new IntWritable(sum));
          }
       }

       /** Final reducer: sums counts and emits only tokens seen more than MIN_COUNT times. */
       public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
          @Override
          public void reduce(Text key, Iterable<IntWritable> values, Context context)
                  throws IOException, InterruptedException {
             int sum = 0;
             for (IntWritable val : values) {
                sum += val.get();
             }
             if (sum > MIN_COUNT) {
                context.write(key, new IntWritable(sum));
             }
          }
       }
    }