MapReduce code, need detailed comments explaining it

Can someone skilled add comments to this MapReduce code? I need detailed comments for every step, it's urgent.

Your code counts the number of occurrences of each word.
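
For example (made-up input), if the input file contains the two lines

hello world
hello hadoop

then the job's final output is

hadoop	1
hello	2
world	1

(one word per line with its count; the reduce output comes out sorted by key).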

Writing the Mapper class
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();                        // read one line of input
        StringTokenizer st = new StringTokenizer(line);        // split it on whitespace
        while (st.hasMoreTokens()) {
            String word = st.nextToken();                      // next word
            context.write(new Text(word), new IntWritable(1)); // emit (word, 1)
        }
    }
}
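
To see what the map step emits, here is a minimal JDK-only sketch (no Hadoop needed; the input line and class name are made up) that applies the same tokenize-and-emit logic and prints each (word, 1) pair:

import java.util.StringTokenizer;

public class MapStepDemo {
    public static void main(String[] args) {
        String line = "hello world hello";              // hypothetical input line
        StringTokenizer st = new StringTokenizer(line); // default delimiters: " \t\n\r\f"
        while (st.hasMoreTokens()) {
            // the real mapper would call context.write(new Text(word), new IntWritable(1))
            System.out.println(st.nextToken() + "\t1");
        }
    }
}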
Writing the Reducer class

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;                              // running total for this word
        for (IntWritable val : values) {
            sum += val.get();                     // add each 1 emitted by the mappers
        }
        context.write(key, new IntWritable(sum)); // emit (word, count)
    }
}
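
Between map and reduce, Hadoop groups the emitted pairs by key, so the reducer for the word "hello" receives something like ("hello", [1, 1, 1]). A plain-Java sketch of the same summation (class name and values are made up):

import java.util.Arrays;
import java.util.List;

public class ReduceStepDemo {
    public static void main(String[] args) {
        // hypothetical grouped values: the shuffle delivered three 1s for "hello"
        List<Integer> values = Arrays.asList(1, 1, 1);
        int sum = 0;
        for (int v : values) {
            sum += v;                        // same accumulation as IntSumReducer
        }
        System.out.println("hello\t" + sum); // prints: hello	3
    }
}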

Driver code for running in local mode
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
    public static void main(String[] args) throws Exception {
        Configuration cfg = new Configuration();      // create the configuration object
        cfg.set("mapreduce.framework.name", "local"); // run MR in local mode
        // In local mode the input/output data can live on the local file system or on HDFS:
        // cfg.set("fs.defaultFS", "hdfs://hcmaster:9000");
        cfg.set("fs.defaultFS", "file:///");

        Job job = Job.getInstance(cfg, "word count"); // create the job object

        job.setJarByClass(WordCount.class);           // class used to locate the job's jar

        job.setMapperClass(TokenizerMapper.class);    // set the Mapper class
        job.setReducerClass(IntSumReducer.class);     // set the Reducer class

        job.setOutputKeyClass(Text.class);            // key type of the reduce output
        job.setOutputValueClass(IntWritable.class);   // value type of the reduce output

        FileInputFormat.addInputPath(job, new Path(args[0])); // set the input path

        Path op1 = new Path(args[1]);
        FileSystem fs = FileSystem.get(cfg);

        // Hadoop refuses to run if the output path already exists, so delete it first
        if (fs.exists(op1)) {
            fs.delete(op1, true);
            System.out.println("Output path already existed and has been deleted!");
        }

        FileOutputFormat.setOutputPath(job, op1);     // set the output path
        boolean b = job.waitForCompletion(true);      // submit the job and wait
        System.exit(b ? 0 : 1);
    }
}
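
To try it locally, run WordCount with two program arguments: args[0] is the input file or directory and args[1] is the output directory, e.g. (hypothetical paths) file:///tmp/wc/in and file:///tmp/wc/out; since fs.defaultFS is set to file:///, plain paths like /tmp/wc/in work as well. After the job finishes, the output directory contains a part-r-00000 file with the word counts.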

This is pretty much the same as yours, and it's already well commented; take a look. Apart from a small difference in the mapper, which I've noted separately, everything else is the same.

        // The linked version counts each line once; your problem statement splits each
        // line further on the delimiters " \t\n\r\f" and counts every resulting token once
        StringTokenizer itr = new StringTokenizer("ggggggg");
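
To make the difference concrete, here is a small plain-Java comparison (the sample line and class name are made up): split(" ") breaks only on single spaces, while StringTokenizer's default delimiters are " \t\n\r\f".

import java.util.Arrays;
import java.util.StringTokenizer;

public class SplitVsTokenizer {
    public static void main(String[] args) {
        String line = "hello\tworld  hadoop"; // hypothetical line with a tab and a double space

        // split(" ") keeps the tab inside a token and the double space yields an empty token
        System.out.println(Arrays.toString(line.split(" ")));
        // -> [hello	world, , hadoop]

        // StringTokenizer's default whitespace delimiters yield clean words
        StringTokenizer st = new StringTokenizer(line);
        while (st.hasMoreTokens()) {
            System.out.println(st.nextToken()); // hello, world, hadoop
        }
    }
}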

Here is a reference:


package com.cgh.test;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    public static void main(String[] args) throws Exception {
        // create the Configuration object
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // set the class used to locate the jar
        job.setJarByClass(WordCount.class);

        // set the mapper and reducer classes
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);
        // declare the map task's output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // declare the reduce task's output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // set this MapReduce program's input and output paths
        FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
        FileOutputFormat.setOutputPath(job, new Path("/wordcount/output"));

        // finally submit the job and wait for it to finish
        // (waitForCompletion already submits, so no separate job.submit() call is needed)
        boolean waitForCompletion = job.waitForCompletion(true);
        System.exit(waitForCompletion ? 0 : 1);
    }

    private static class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // split the incoming line of data
            String[] words = value.toString().split(" ");
            for (String word : words) {
                // send each token to the reduce phase as (word, 1)
                context.write(new Text(word), new LongWritable(1));
            }
        }
    }

    private static class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            // count: add up all the 1s received for this word
            long sum = 0;
            for (LongWritable v : values) {
                sum += v.get();
            }
            // write the final result (to HDFS when running on a cluster)
            context.write(key, new LongWritable(sum));
        }
    }

}