/**
 * Mapper that emits {@code (word, "1")} for every whitespace-separated token of an input
 * line, except tokens listed in the cache file named {@code dir}.
 */
static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
    /** Name of the cache file whose words are loaded by {@link #setup}. */
    private static final String FILTER_FILE_NAME = "dir";

    /**
     * Words read from the cache file; tokens contained here are not emitted.
     * Kept per mapper instance (not static) so instances never share or re-append state,
     * and as a hash set so the per-token lookup does not scan a list.
     */
    private final java.util.Set<String> filteredWords = new java.util.HashSet<String>();

    /**
     * Loads the filtered words from every localized cache file named {@code dir}.
     *
     * @param context task context supplying the job configuration
     * @throws IOException if a matching cache file cannot be read
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Local paths of the cache files; guarded because the lookup may yield null
        // when no cache file has been registered for the job.
        Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
        if (paths == null) {
            return;
        }
        for (Path p : paths) {
            if (FILTER_FILE_NAME.equals(p.getName())) {
                loadFilteredWords(p);
            }
        }
    }

    /** Reads one cache file and adds each space-separated word of each line to the filter set. */
    private void loadFilteredWords(Path localFile) throws IOException {
        // Decode as UTF-8 so the words compare equal to tokens obtained from Text.toString(),
        // independent of the platform default charset. try-with-resources closes the reader
        // even when readLine() fails.
        try (BufferedReader reader = new BufferedReader(new java.io.InputStreamReader(
                new java.io.FileInputStream(localFile.toString()),
                java.nio.charset.StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                for (String word : line.split(" ")) {
                    filteredWords.add(word);
                }
            }
        }
    }

    /**
     * Splits the line into tokens and writes {@code (token, "1")} for each token that is
     * not in the filter set.
     *
     * @param key     input key (unused)
     * @param value   one line of input text
     * @param context sink for the emitted pairs
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer tokens = new StringTokenizer(value.toString());
        while (tokens.hasMoreTokens()) {
            String word = tokens.nextToken();
            // Skip words that appear in the filter file.
            if (!filteredWords.contains(word)) {
                context.write(new Text(word), new Text("1"));
            }
        }
    }

    /** No resources are held after {@link #setup}, so there is nothing to release. */
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
    }
}
/**
 * Reducer that sums the numeric text values received for a key and writes
 * {@code (key, total)} with the total rendered as decimal text.
 */
static class MyReducer extends Reducer<Text, Text, Text, Text> {
    /** No per-task initialisation is required. */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
    }

    /**
     * Adds up all values of one key and emits the sum.
     *
     * @param key     the word being counted
     * @param value   the partial counts for that word, each a decimal integer in text form
     * @param context sink for the {@code (key, total)} pair
     * @throws NumberFormatException if a value is not a decimal integer
     */
    @Override
    protected void reduce(Text key, Iterable<Text> value, Context context)
            throws IOException, InterruptedException {
        // Accumulate in a long: an int total wraps around silently past 2^31-1.
        long total = 0L;
        for (Text t : value) {
            total += Long.parseLong(t.toString());
        }
        context.write(key, new Text(Long.toString(total)));
    }

    /** No resources to release. */
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
    }
}
/** Configuration last passed to {@link #setConf}; created lazily by {@link #getConf} if absent. */
private Configuration jobConf;

/**
 * Stores the given configuration for later use by {@link #getConf} and points its
 * default file system at the cluster's HDFS namenode.
 *
 * @param conf configuration to adopt; a {@code null} argument is ignored
 */
@Override
public void setConf(Configuration conf) {
    if (conf == null) {
        return;
    }
    conf.set("fs.defaultFS", "hdfs://192.168.68.131:9000");
    // Keep the instance so that getConf() returns the same, already-populated object
    // instead of a fresh one that has lost every setting applied to it.
    this.jobConf = conf;
}

/**
 * Returns the configuration stored by {@link #setConf}, creating and storing a new one
 * (with the same default file system setting) when none has been set yet.
 *
 * @return the stored configuration, never {@code null}
 */
@Override
public Configuration getConf() {
    if (jobConf == null) {
        setConf(new Configuration());
    }
    return jobConf;
}
/**
 * Driver method: configures the job (mapper, reducer, cache file, input and output paths),
 * removes an existing output directory, submits the job and waits for it to finish.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
 * @return 0 if the job succeeded, 1 if it failed, 2 if fewer than two arguments were given
 * @throws Exception if the job cannot be set up or submitted
 */
@Override
public int run(String[] args) throws Exception {
    // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
    if (args == null || args.length < 2) {
        System.err.println("Usage: Xiaochu02 <input path> <output path>");
        return 2;
    }
    Path inputPath = new Path(args[0]);
    Path outputPath = new Path(args[1]);

    // 1. Obtain the configuration
    Configuration conf = getConf();
    // 2. Create the job
    Job job = Job.getInstance(conf, "Xiaochu02");
    // 3. Set the class whose jar carries the job
    job.setJarByClass(Xiaochu02.class);
    // 4. Map-side settings
    job.setMapperClass(MyMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, inputPath);
    // Register the distributed cache file read by the mapper
    job.addCacheFile(new URI("hdfs://192.168.68.131:9000/1603data/dir"));
    // 5. Reduce-side settings
    job.setReducerClass(MyReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    // Delete the output directory if it already exists. The file system is resolved from
    // the output path itself, so a path with an explicit scheme is checked on that file
    // system rather than on the configured default.
    FileSystem fs = outputPath.getFileSystem(conf);
    if (fs.exists(outputPath)) {
        fs.delete(outputPath, true);
    }
    FileOutputFormat.setOutputPath(job, outputPath);
    // 6. Submit the job and wait for completion
    return job.waitForCompletion(true) ? 0 : 1;
}
/**
 * Command-line entry point: runs the tool through {@code ToolRunner} and terminates the JVM
 * with the tool's return value, or with 1 if the run threw an exception.
 *
 * @param args command-line arguments, passed unmodified to {@code ToolRunner}
 */
public static void main(String[] args) {
    int exitCode;
    try {
        // Hand the raw arguments to ToolRunner, which parses the generic options itself
        // and applies them to the tool's configuration. Stripping them beforehand against
        // a throwaway Configuration would discard them.
        exitCode = ToolRunner.run(new Xiaochu02(), args);
    } catch (Exception e) {
        e.printStackTrace();
        // Report the failure to the caller instead of falling through to a normal exit.
        exitCode = 1;
    }
    System.exit(exitCode);
}
}
![img](https://img-mid.csdnimg.cn/release/static/image/mid/ask/253829770046169.png "#left")
可以运行就可以了。