这两天尝试用java实现sparksql连接mysql数据库,经过调试可以成功连接到数据库,但奇怪的是只能够查询出表头和表结构却看不到表里面数据
代码如下
import java.util.Hashtable;
import java.util.Properties;
import javax.swing.JFrame;
import org.apache.avro.hadoop.io.AvroKeyValue.Iterator;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hive.ql.exec.vector.expressions.IsNull;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession.Builder;
import org.apache.spark.sql.jdbc.JdbcDialect;
import org.datanucleus.store.rdbms.identifier.IdentifierFactory;
import antlr.collections.List;
import scala.Enumeration.Val;
public class Demo_Mysql3 {
private static Logger logger = Logger.getLogger(Demo_Mysql3.class);
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("Demo_Mysql3");
sparkConf.setMaster("local[5]");
sparkConf.setSparkHome("F:\DownLoad\spark\spark-2.0.0-bin-hadoop2.7");
sparkConf.set("spark.sql.warehouse.dir","F:\DownLoad\spark\spark-2.0.0-bin-hadoop2.7");
SparkContext sc0=null;
try {
sc0=new SparkContext(sparkConf);
SparkSession sparkSession=new SparkSession(sc0);
SQLContext sqlContext = new SQLContext(sparkSession);
// 一个条件表示一个分区
String[] predicates = new String[] {
"1=1 order by id limit 400000,50000",
"1=1 order by id limit 450000,50000",
"1=1 order by id limit 500000,50000",
"1=1 order by id limit 550000,50000",
"1=1 order by id limit 600000,50000" };
String url = "jdbc:mysql://localhost:3306/clone";
String table = "image";
Properties connectionProperties = new Properties();
connectionProperties.setProperty("dbtable", table);// 设置表
connectionProperties.setProperty("user", "root");// 设置用户名
connectionProperties.setProperty("password", "root");// 设置密码
// 读取数据
DataFrameReader jread = sqlContext.read();
//Dataset jdbcDs=jread.jdbc(url, table, predicates, connectionProperties);
sqlContext.read().jdbc(url, table, predicates, connectionProperties).select("*").show();
} catch (Exception e) {
logger.error("|main|exception error", e);
} finally {
if (sc0 != null) {
sc0.stop();
}
}
}
}
控制台输出如下:
全部输出如下;
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/07/22 10:39:04 INFO SparkContext: Running Spark version 2.0.0
17/07/22 10:39:04 INFO SecurityManager: Changing view acls to: Mr.Zhang
17/07/22 10:39:04 INFO SecurityManager: Changing modify acls to: Mr.Zhang
17/07/22 10:39:04 INFO SecurityManager: Changing view acls groups to:
17/07/22 10:39:04 INFO SecurityManager: Changing modify acls groups to:
17/07/22 10:39:04 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Mr.Zhang); groups with view permissions: Set(); users with modify permissions: Set(Mr.Zhang); groups with modify permissions: Set()
17/07/22 10:39:05 INFO Utils: Successfully started service 'sparkDriver' on port 4469.
17/07/22 10:39:05 INFO SparkEnv: Registering MapOutputTracker
17/07/22 10:39:05 INFO SparkEnv: Registering BlockManagerMaster
17/07/22 10:39:05 INFO DiskBlockManager: Created local directory at C:\Users\Mr.Zhang\AppData\Local\Temp\blockmgr-c055d142-9742-4d45-bc98-f7c337c2beed
17/07/22 10:39:05 INFO MemoryStore: MemoryStore started with capacity 1990.8 MB
17/07/22 10:39:05 INFO SparkEnv: Registering OutputCommitCoordinator
17/07/22 10:39:05 INFO Utils: Successfully started service 'SparkUI' on port 4040.
17/07/22 10:39:05 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://169.254.236.65:4040
17/07/22 10:39:05 INFO Executor: Starting executor ID driver on host localhost
17/07/22 10:39:06 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 4498.
17/07/22 10:39:06 INFO NettyBlockTransferService: Server created on 169.254.236.65:4498
17/07/22 10:39:06 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 169.254.236.65, 4498)
17/07/22 10:39:06 INFO BlockManagerMasterEndpoint: Registering block manager 169.254.236.65:4498 with 1990.8 MB RAM, BlockManagerId(driver, 169.254.236.65, 4498)
17/07/22 10:39:06 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 169.254.236.65, 4498)
17/07/22 10:39:06 INFO SharedState: Warehouse path is 'F:\DownLoad\spark\spark-2.0.0-bin-hadoop2.7'.
17/07/22 10:39:08 INFO CodeGenerator: Code generated in 173.873451 ms
17/07/22 10:39:08 INFO SparkContext: Starting job: show at Demo_Mysql3.java:71
17/07/22 10:39:08 INFO DAGScheduler: Got job 0 (show at Demo_Mysql3.java:71) with 1 output partitions
17/07/22 10:39:08 INFO DAGScheduler: Final stage: ResultStage 0 (show at Demo_Mysql3.java:71)
17/07/22 10:39:08 INFO DAGScheduler: Parents of final stage: List()
17/07/22 10:39:08 INFO DAGScheduler: Missing parents: List()
17/07/22 10:39:08 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[2] at show at Demo_Mysql3.java:71), which has no missing parents
17/07/22 10:39:08 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 9.9 KB, free 1990.8 MB)
17/07/22 10:39:08 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 4.6 KB, free 1990.8 MB)
17/07/22 10:39:08 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 169.254.236.65:4498 (size: 4.6 KB, free: 1990.8 MB)
17/07/22 10:39:08 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1012
17/07/22 10:39:08 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[2] at show at Demo_Mysql3.java:71)
17/07/22 10:39:08 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
17/07/22 10:39:08 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0, PROCESS_LOCAL, 5149 bytes)
17/07/22 10:39:08 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
17/07/22 10:39:08 INFO JDBCRDD: closed connection
17/07/22 10:39:08 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1091 bytes result sent to driver
17/07/22 10:39:08 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 109 ms on localhost (1/1)
17/07/22 10:39:08 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
17/07/22 10:39:08 INFO DAGScheduler: ResultStage 0 (show at Demo_Mysql3.java:71) finished in 0.109 s
17/07/22 10:39:08 INFO DAGScheduler: Job 0 finished: show at Demo_Mysql3.java:71, took 0.320450 s
17/07/22 10:39:09 INFO SparkContext: Starting job: show at Demo_Mysql3.java:71
17/07/22 10:39:09 INFO DAGScheduler: Got job 1 (show at Demo_Mysql3.java:71) with 4 output partitions
17/07/22 10:39:09 INFO DAGScheduler: Final stage: ResultStage 1 (show at Demo_Mysql3.java:71)
17/07/22 10:39:09 INFO DAGScheduler: Parents of final stage: List()
17/07/22 10:39:09 INFO DAGScheduler: Missing parents: List()
17/07/22 10:39:09 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[2] at show at Demo_Mysql3.java:71), which has no missing parents
17/07/22 10:39:09 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 9.9 KB, free 1990.8 MB)
17/07/22 10:39:09 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.6 KB, free 1990.8 MB)
17/07/22 10:39:09 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 169.254.236.65:4498 (size: 4.6 KB, free: 1990.8 MB)
17/07/22 10:39:09 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1012
17/07/22 10:39:09 INFO DAGScheduler: Submitting 4 missing tasks from ResultStage 1 (MapPartitionsRDD[2] at show at Demo_Mysql3.java:71)
17/07/22 10:39:09 INFO TaskSchedulerImpl: Adding task set 1.0 with 4 tasks
17/07/22 10:39:09 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 1, PROCESS_LOCAL, 5149 bytes)
17/07/22 10:39:09 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 2, localhost, partition 2, PROCESS_LOCAL, 5149 bytes)
17/07/22 10:39:09 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 3, localhost, partition 3, PROCESS_LOCAL, 5149 bytes)
17/07/22 10:39:09 INFO TaskSetManager: Starting task 3.0 in stage 1.0 (TID 4, localhost, partition 4, PROCESS_LOCAL, 5149 bytes)
17/07/22 10:39:09 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
17/07/22 10:39:09 INFO Executor: Running task 1.0 in stage 1.0 (TID 2)
17/07/22 10:39:09 INFO Executor: Running task 2.0 in stage 1.0 (TID 3)
17/07/22 10:39:09 INFO Executor: Running task 3.0 in stage 1.0 (TID 4)
17/07/22 10:39:09 INFO JDBCRDD: closed connection
17/07/22 10:39:09 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1091 bytes result sent to driver
17/07/22 10:39:09 INFO JDBCRDD: closed connection
17/07/22 10:39:09 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 23 ms on localhost (1/4)
17/07/22 10:39:09 INFO Executor: Finished task 1.0 in stage 1.0 (TID 2). 1091 bytes result sent to driver
17/07/22 10:39:09 INFO JDBCRDD: closed connection
17/07/22 10:39:09 INFO Executor: Finished task 2.0 in stage 1.0 (TID 3). 1091 bytes result sent to driver
17/07/22 10:39:09 INFO JDBCRDD: closed connection
17/07/22 10:39:09 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 2) in 23 ms on localhost (2/4)
17/07/22 10:39:09 INFO TaskSetManager: Finished task 2.0 in stage 1.0 (TID 3) in 19 ms on localhost (3/4)
17/07/22 10:39:09 INFO Executor: Finished task 3.0 in stage 1.0 (TID 4). 1091 bytes result sent to driver
17/07/22 10:39:09 INFO TaskSetManager: Finished task 3.0 in stage 1.0 (TID 4) in 35 ms on localhost (4/4)
17/07/22 10:39:09 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
17/07/22 10:39:09 INFO DAGScheduler: ResultStage 1 (show at Demo_Mysql3.java:71) finished in 0.039 s
17/07/22 10:39:09 INFO DAGScheduler: Job 1 finished: show at Demo_Mysql3.java:71, took 0.045598 s
+------+-----+----+---------+----+---+---------+---+
|status|count|type|threshold|wkey|url|imagename| id|
+------+-----+----+---------+----+---+---------+---+
+------+-----+----+---------+----+---+---------+---+
17/07/22 10:39:09 INFO SparkUI: Stopped Spark web UI at http://169.254.236.65:4040
17/07/22 10:39:09 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/07/22 10:39:09 INFO MemoryStore: MemoryStore cleared
17/07/22 10:39:09 INFO BlockManager: BlockManager stopped
17/07/22 10:39:09 INFO BlockManagerMaster: BlockManagerMaster stopped
17/07/22 10:39:09 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/07/22 10:39:09 INFO SparkContext: Successfully stopped SparkContext
17/07/22 10:39:09 INFO ShutdownHookManager: Shutdown hook called
17/07/22 10:39:09 INFO ShutdownHookManager: Deleting directory C:\Users\Mr.Zhang\AppData\Local\Temp\spark-37d86494-fe2c-44a6-8bb2-989db283dc8b
数据库表本来有数据吗?
表中本来有数据,正常方式也可以读到
把这段代码
// 读取数据
DataFrameReader jread = sqlContext.read();
//Dataset jdbcDs=jread.jdbc(url, table, predicates, connectionProperties);
sqlContext.read().jdbc(url, table, predicates, connectionProperties).select("*").show();
改为:
sqlContext.read().jdbc(url, table, connectionProperties).select("*").show();
(注意:show() 的返回值是 void,不能赋给变量;另外 Spark 2.0 的 Java API 中没有 DataFrame 类型,若要保留结果应写成 Dataset&lt;Row&gt; jdbcDF = sqlContext.read().jdbc(url, table, connectionProperties); 然后再调用 jdbcDF.show()。)