Data Join
package com.briup.nhb.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.BasicConfigurator;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
public class MapSiteJoinMR extends Configured implements Tool {
    public static class MSJMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // map() handles the order data normally;
            // here we load the cached fruit file via the context
            //URI[] cacheFiles = context.getCacheFiles();
            Path[] cacheFiles = context.getLocalCacheFiles();
            String fruit_path = cacheFiles[0].toString();
            // plain local-file IO
            BufferedReader br = new BufferedReader(
                    new InputStreamReader(
                            new FileInputStream(fruit_path)
                    )
            );
            // when the computation runs on a cluster, the HDFS file data would be
            // read through an FSDataInputStream instead (a sketch follows after the class)
            // iterate over the text lines
            String fileLine = null;
            while ((fileLine = br.readLine()) != null) {
                String[] fruit_price = fileLine.split("\t");
                // forward each fruit/price pair to the reducer
                if (fruit_price.length >= 2) {
                    context.write(new Text(fruit_price[0]), new Text(fruit_price[1]));
                }
            }
        }
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // read the order data and forward it to the reducer
            String line = value.toString();
            String[] fruit_order = line.split("\t");
            if (fruit_order.length >= 2) {
                context.write(new Text(fruit_order[0]), new Text(fruit_order[1]));
            }
        }
    }
    // once the map output reaches the reducer, all values that share a fruit key
    // (the price from the cache file plus the order records) arrive together
    // and are concatenated into a single output line
    public static class MSJReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            for (Text value : values) {
                sb.append(value.toString());
            }
            context.write(key, new Text(sb.toString()));
        }
    }
    @Override
    public int run(String[] strings) throws Exception {
        BasicConfigurator.configure();
        Configuration conf = getConf();
        conf.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache");
        String inpath = "src/main/resources/order.txt";
        String outpath = "src/main/resources/out1";
        Job job = Job.getInstance(conf, "MSJ");
        job.setJarByClass(this.getClass());
        // register fruit.txt as a cache file
        URI uri = new URI("file:///E:/代码/workspace/hadoop/src/main/resources/fruit.txt");
        job.addCacheFile(uri);
        job.setMapperClass(MSJMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(MSJReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path(inpath));
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(outpath));
        return job.waitForCompletion(true) ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new MapSiteJoinMR(), args);
    }
}
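As the comment in setup() notes, when the same join runs on a real cluster the file would be read through the HDFS API rather than java.io. A minimal sketch of that variant, assuming the default FileSystem from the job configuration (the fruitPath variable name here is illustrative, not from the code above):

FileSystem fs = FileSystem.get(context.getConfiguration());
FSDataInputStream in = fs.open(new Path(fruitPath)); // fruitPath: hypothetical HDFS path string
BufferedReader br = new BufferedReader(new InputStreamReader(in));

(This would need the extra imports org.apache.hadoop.fs.FileSystem and org.apache.hadoop.fs.FSDataInputStream.) Running the job as posted, however, fails with the following exception: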
java.lang.Exception: java.io.FileNotFoundException: file:\E:\hdfs_tmp_cache\mapred\local\1648889316891\fruit.txt (The filename, directory name, or volume label syntax is incorrect.)
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:492)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:552)
Caused by: java.io.FileNotFoundException: file:\E:\hdfs_tmp_cache\mapred\local\1648889316891\fruit.txt (The filename, directory name, or volume label syntax is incorrect.)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at java.io.FileInputStream.<init>(FileInputStream.java:93)
at com.briup.nhb.mr.MapSiteJoinMR$MSJMapper.setup(MapSiteJoinMR.java:33)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:143)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:799)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:347)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:271)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Hoping someone can help me resolve this.
E:\tmp\mapred\local\1648889316891\fruit.txt
Can this file actually be found on the local disk?
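A note on the likely cause, judging from the trace: getLocalCacheFiles() returns the localized path as a Path that still carries the file: scheme, so cacheFiles[0].toString() hands FileInputStream the string file:/E:/..., which Windows rejects with exactly this "volume label syntax" error. A minimal sketch of a setup() rework that avoids raw java.io on the URI string by opening the file through Hadoop's local FileSystem (a suggested change, not the original poster's code):

// open the localized cache file through Hadoop's LocalFileSystem, which
// understands the file: scheme that java.io.FileInputStream rejects
FileSystem localFs = FileSystem.getLocal(context.getConfiguration());
BufferedReader br = new BufferedReader(
        new InputStreamReader(localFs.open(cacheFiles[0])));

This requires importing org.apache.hadoop.fs.FileSystem. Stripping the scheme by hand (for example via cacheFiles[0].toUri().getPath()) may also work, but letting a Hadoop FileSystem resolve the file: URI sidesteps Windows path quirks entirely.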