import java.io.IOException;
import java.net.URI;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple3;
public class Test {
public static void main(String[] args) throws IOException {
// 创建一个配置类SparkConf,然后创建一个SparkContext
SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo");
JavaSparkContext jsc = new JavaSparkContext(conf);
String url = "E://";
JavaRDD<String> data = null;
Configuration config = new Configuration();
FileSystem hoursHDFS = FileSystem.get(URI.create(url),config);
FileStatus[] hoursFile = hoursHDFS.listStatus(new Path(url));
Path[] paths = FileUtil.stat2Paths(hoursFile);
for (int i = 0; i < paths.length; i++) {
Path path = paths[i];
// 读取原文件数据,每一行记录转成RDD里面的一个元素
data = jsc.textFile(path.toString());
String newFile = "E://"+Long.valueOf(new Date().getTime())+".txt";
FileSystem newHDFS = FileSystem.get(URI.create(newFile),config);
// 将每条记录的每列切割出来,生成一个Tuple
JavaRDD<Tuple3<String, String, Integer>> person = data
.map(new Function<String, Tuple3<String, String, Integer>>() {
private static final long serialVersionUID = -2381522520231963249L;
public Tuple3<String, String, Integer> call(String s)
throws Exception {
// 按逗号分割一行数据
String[] tokens = s.split(",");
// 将分割后的三个元素组成一个三元Tuple
Tuple3<String, String, Integer> person = new Tuple3<String, String, Integer>(
tokens[0], tokens[1], Integer
.parseInt(tokens[2]));
return person;
}
});
FSDataOutputStream os = newHDFS.create(new Path(newFile),true);
// 遍历数据写到新文件中
for (Tuple3<String,String, Integer> d : person.collect()) {
StringBuffer sb = new StringBuffer();
sb.append(d._1() + "," + d._2()+","+d._3());
os.write(sb.toString().getBytes("UTF-8"));
}
}
// 将每条记录的每列切割出来,生成一个Tuple
jsc.stop();
jsc.close();
}
}
首先是代码。
目前功能已实现到读取文件夹的文件写到一个新文件上。
但是想以10个文件读取一次写到一个新文件上。没啥思路。
因为我写代码对于算法逻辑最是想不通。
求大神指教,另外代码是用spark处理的。在读文件时也是在hdfs里去读取。目前代码是写的本地路径
int i = 1;
int total = 102;
List list = new ArrayList<>();
while (i <= 102) {
if (i % 10 == 0 || i == total) {
// TODO:操作文件
list.add(i);
System.err.println(list.toString());
list = new ArrayList<>();
}
else {
// TODO:读写
list.add(i);
}
i++;
}
不知道你十个一读接下来要做什么。。
你这样也是一个文件一个文件读,(只是读了十个休息一下,或者是做别的事情)。
File f=new File("E:\")
File[] fd=f.listFile();
再用循环来读取文件
也可以利用线程
先循环读取文件夹中所有的文件列表,然后以10个为单位放到一个个链表中,最后尾数不足的部分再放一个链表,这样你就可以按需要进行处理。