I'm using Spark to process data stored in MongoDB, but the job almost never produces a result; it has only succeeded once or twice, and I can't work out the cause from these error messages. This is the first time I've touched this stack, so any help would be appreciated. My driver code and the log output from a failed run are below.
// Imports needed by this snippet:
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.bson.BSONObject;

import com.mongodb.hadoop.MongoInputFormat;

import scala.Tuple2;

SparkConf sparkConf = new SparkConf();
sparkConf.setMaster(SPARK_PATH);
sparkConf.setAppName("Logs_Collect");
// The project has to be exported as a jar first, otherwise the executors throw ClassNotFoundException.
String[] jars = { "F:\\bigdata.jar" };
sparkConf.setJars(jars);
JavaSparkContext ctx = new JavaSparkContext(sparkConf);

// mongo-hadoop connector configuration: source and target collections.
Configuration config = new Configuration();
config.set("mongo.input.uri", MONGODB_URL + ".log");
config.set("mongo.output.uri", MONGODB_URL + ".testcollect");

// Time window to aggregate over; declared final so the anonymous functions below can capture it.
final Date start = DateUtil.getLastNDay(dateRange);
final Date end = DateUtil.getLastNDay(0);

// Read the source collection from MongoDB as (ObjectId, BSONObject) pairs.
JavaPairRDD<Object, BSONObject> mongoRDD = ctx.newAPIHadoopRDD(config, MongoInputFormat.class, Object.class,
        BSONObject.class);

// Keep only documents whose "time" field falls inside the window.
JavaPairRDD<Object, BSONObject> mongoRDD2 = mongoRDD
        .filter(new Function<Tuple2<Object, BSONObject>, Boolean>() {
            @Override
            public Boolean call(Tuple2<Object, BSONObject> arg0) throws Exception {
                Date time = (Date) arg0._2.get("time");
                return time.after(start) && time.before(end);
            }
        });

// Re-key every document by (host, content) so identical log entries from the same host group together.
JavaPairRDD<Map<String, Object>, BSONObject> mongoRDD3 = mongoRDD2
        .mapToPair(new PairFunction<Tuple2<Object, BSONObject>, Map<String, Object>, BSONObject>() {
            @Override
            public Tuple2<Map<String, Object>, BSONObject> call(Tuple2<Object, BSONObject> arg0)
                    throws Exception {
                Map<String, Object> k = new HashMap<String, Object>();
                k.put("host", arg0._2.get("host"));
                k.put("content", arg0._2.get("content"));
                return new Tuple2<Map<String, Object>, BSONObject>(k, arg0._2);
            }
        });

// Replace each document with a count of 1 ...
JavaPairRDD<Map<String, Object>, Integer> mongoRDD4 = mongoRDD3
        .mapToPair(new PairFunction<Tuple2<Map<String, Object>, BSONObject>, Map<String, Object>, Integer>() {
            @Override
            public Tuple2<Map<String, Object>, Integer> call(Tuple2<Map<String, Object>, BSONObject> arg0)
                    throws Exception {
                return new Tuple2<Map<String, Object>, Integer>(arg0._1, 1);
            }
        });

// ... and sum the counts per (host, content) key.
JavaPairRDD<Map<String, Object>, Integer> mongoRDD5 = mongoRDD4
        .reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

// Collect the aggregated counts back to the driver.
Map<Map<String, Object>, Integer> map2 = mongoRDD5.collectAsMap();
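For reference, SPARK_PATH, MONGODB_URL, dateRange and the DateUtil helper are defined elsewhere in the project and not shown above. Roughly, they look like the following sketch; the host names and values are placeholders, and the point is only that mongo.input.uri ends up as a full mongodb:// URI of the form mongodb://host:27017/database.collection:

// Placeholder stand-ins for the constants and helper referenced above (real values differ).
import java.util.Calendar;
import java.util.Date;

public class JobConstants {
    // Spark master URL of the standalone cluster (placeholder).
    public static final String SPARK_PATH = "spark://master:7077";
    // Database part of the MongoDB URI; ".log" / ".testcollect" get appended to it, giving
    // e.g. mongo.input.uri = "mongodb://mongo01:27017/logdb.log" (placeholder host/db).
    public static final String MONGODB_URL = "mongodb://mongo01:27017/logdb";
    // How many days to look back when building the time window (placeholder).
    public static final int dateRange = 7;
}

class DateUtil {
    // Assumed behaviour: returns the instant n days before now.
    static Date getLastNDay(int n) {
        Calendar c = Calendar.getInstance();
        c.add(Calendar.DAY_OF_MONTH, -n);
        return c.getTime();
    }
}

With those in place the snippet compiles as-is; the failure happens once collectAsMap() triggers the first stage, as the log below shows.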
[INFO] com.mongodb.hadoop.splitter.MongoCollectionSplitter - Created split: min={ "_id" : { "$oid" : "563dc85a002e25dc6bfd59cd"}}, max= { "_id" : { "$oid" : "563dc85b002e25dc6bfd7b1b"}}
[INFO] com.mongodb.hadoop.splitter.MongoCollectionSplitter - Created split: min={ "_id" : { "$oid" : "563dc85b002e25dc6bfd7b1b"}}, max= null
[Stage 0:> (0 + 4) / 79][DEBUG] org.spark-project.jetty.http.HttpParser - filled 173/173
[DEBUG] org.spark-project.jetty.server.Server - REQUEST /jars/bigdata.jar on BlockingHttpConnection@3190b6f6,g=HttpGenerator{s=0,h=-1,b=-1,c=-1},p=HttpParser{s=-5,l=10,c=0},r=1
[DEBUG] org.spark-project.jetty.server.Server - RESPONSE /jars/bigdata.jar 200 handled=true
[WARN] org.apache.spark.scheduler.TaskSetManager - Lost task 0.0 in stage 0.0 (TID 0, slave02): java.io.IOException: java.lang.ArrayIndexOutOfBoundsException
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1141)
at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:185)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException
at java.lang.System.arraycopy(Native Method)
at org.bson.BasicBSONDecoder$BSONInput._need(BasicBSONDecoder.java:404)
at org.bson.BasicBSONDecoder$BSONInput.read(BasicBSONDecoder.java:452)
at org.bson.BasicBSONDecoder$BSONInput.readCStr(BasicBSONDecoder.java:492)
at org.bson.BasicBSONDecoder.decodeElement(BasicBSONDecoder.java:197)
at org.bson.BasicBSONDecoder._decode(BasicBSONDecoder.java:153)
at org.bson.BasicBSONDecoder.decode(BasicBSONDecoder.java:121)
at com.mongodb.hadoop.input.MongoInputSplit.readFields(MongoInputSplit.java:185)
at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1138)
... 24 more