What you have here computes the number of times each word appears. For example, for a hypothetical input line "hello world hello", the expected output is hello 2 and world 1.
Write the Mapper class
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();                        // read one line of input
        StringTokenizer st = new StringTokenizer(line);        // split on whitespace (default delimiters " \t\n\r\f")
        while (st.hasMoreTokens()) {
            String word = st.nextToken();                      // one word
            context.write(new Text(word), new IntWritable(1)); // emit (word, 1)
        }
    }
}
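For instance, given a hypothetical input line "hello world hello", this mapper emits the pairs (hello, 1), (world, 1), (hello, 1); the framework then groups the pairs by key before handing them to the reducer.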
Write the Reducer class
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;                              // running total for this word
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum)); // emit (word, count)
    }
}
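Because the summation here is associative and commutative, this same class can also be reused as a combiner to cut down shuffle traffic. A minimal optional sketch of the extra line you could add in the driver below:

job.setCombinerClass(IntSumReducer.class); // run a local reduce on each map output before the shuffle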
Driver code for running in local mode
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();             // create the configuration object
        conf.set("mapreduce.framework.name", "local");
        // Run the MR job in local mode; the input/output data can live locally or on HDFS
        //conf.set("fs.defaultFS","hdfs://hcmaster:9000");
        conf.set("fs.defaultFS", "file:///");
        Job job = Job.getInstance(conf, "word count");        // create the job object
        job.setJarByClass(WordCount.class);                   // class that drives the job
        job.setMapperClass(TokenizerMapper.class);            // set the Mapper class
        job.setReducerClass(IntSumReducer.class);             // set the Reducer class
        job.setOutputKeyClass(Text.class);                    // reduce output key type
        job.setOutputValueClass(IntWritable.class);           // reduce output value type
        FileInputFormat.addInputPath(job, new Path(args[0])); // input path
        Path op1 = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(op1)) {
            fs.delete(op1, true);
            System.out.println("Output path already exists; it has been deleted!");
        }
        FileOutputFormat.setOutputPath(job, op1);             // output path
        boolean b = job.waitForCompletion(true);              // submit the job and wait
        System.exit(b ? 0 : 1);
    }
}
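A usage sketch, assuming the three classes above are packaged into a jar named wordcount.jar (the jar name and paths are hypothetical); with the local settings above, the two arguments can simply be local directories:

hadoop jar wordcount.jar WordCount /tmp/wc/input /tmp/wc/output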
This is pretty much the same as yours; it's already quite detailed, so take a look. Apart from a small difference in the mapper, which I've noted separately below, everything else is identical.
// The linked code counts once per line, whereas your problem statement splits each line further by the delimiters " \t\n\r\f" and counts every token as 1, i.e. use a StringTokenizer in the mapper:
StringTokenizer itr = new StringTokenizer("ggggggg"); // the string literal here is only a placeholder for the actual line text
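A minimal sketch of that difference, using a hypothetical sample line: StringTokenizer's default delimiter set is " \t\n\r\f" and it never produces empty tokens, while split(" ") only splits on single spaces.

import java.util.StringTokenizer;

public class TokenizerVsSplit {
    public static void main(String[] args) {
        String line = "a\tb  c";                        // hypothetical sample line
        StringTokenizer st = new StringTokenizer(line); // default delimiters " \t\n\r\f"
        while (st.hasMoreTokens()) {
            System.out.println(st.nextToken());         // prints a, b, c (three tokens)
        }
        String[] parts = line.split(" ");               // splits only on single spaces
        System.out.println(parts.length);               // prints 3: "a\tb", "" and "c"
    }
}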
package com.cgh.test;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
    public static void main(String[] args) throws Exception {
        // Create the Configuration object
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // Set the class that locates the jar
        job.setJarByClass(WordCount.class);
        // Set the mapper and reducer classes
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);
        // Specify the map task output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Specify the reduce task output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // Specify the input and output paths of this MapReduce program
        FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
        FileOutputFormat.setOutputPath(job, new Path("/wordcount/output"));
        // Finally, submit the job and wait for it to finish
        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
    private static class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            // Split the incoming line
            String[] words = value.toString().split(" ");
            for (String word : words) {
                // Send each token on to the reduce phase
                context.write(new Text(word), new LongWritable(1));
            }
        }
    }
    private static class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values,
                Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            // Count the incoming values by adding them up
            for (LongWritable v : values) {
                sum += v.get();
            }
            // Write the final result (to HDFS in this setup)
            context.write(key, new LongWritable(sum));
        }
    }
}
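One caveat with the hard-coded paths: the job fails if /wordcount/output already exists. A minimal sketch, mirroring the check in the first driver, that could be added before calling waitForCompletion (it also needs an extra import of org.apache.hadoop.fs.FileSystem):

FileSystem fs = FileSystem.get(configuration);
Path output = new Path("/wordcount/output");
if (fs.exists(output)) {
    fs.delete(output, true); // remove a stale output directory so the job can run again
}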