用spark分页查询数据,普通的sql()的不支持分页的sql语句
在网上查资料说可以增加一个序列实现
但是基本都是scala语言,代码如下
// 在原Schema信息的基础上添加一列 “id”信息
val schema: StructType = dataframe.schema.add(StructField("id", LongType))
// DataFrame转RDD 然后调用 zipWithIndex
val dfRDD: RDD[(Row, Long)] = dataframe.rdd.zipWithIndex()
val rowRDD: RDD[Row] = dfRDD.map(tp => Row.merge(tp._1, Row(tp._2)))
// 将添加了索引的RDD 转化为DataFrame
val df2 = spark.createDataFrame(rowRDD, schema)
df2.show()
这段代码如何用java实现呢
或者说我想把dataframe的数据分行获取,有什么其他办法吗
1) 创建 sparkSession ,后续简称 spark ;
2) 使用 spark 创建原始的 RDD ,对RDD里面的数据进行切割处理 ,将切割处理的数据封装到定义的一个样例类(bean对象)里面 ,返回一个新的 RDD ;
3) 创建 DataFrame 的两种方法 :
第一种 : spark 调用 createDataFrame ,将新的RDD 放进去
第二种 : 导入隐式转换(import spark.implicits._) , 然后新的RDD调用 toDF 方法将 RDD 转换成 DataFrame .
注意 : 如果切割处理的数据不封装到 bean对象里面 ,而是直接以 tuple(元组) 的方式返回生成新的RDD ,后续这个RDD转为 DataFrame 之后 ,其 ROW(行)字段的名字就不是元组里面的字段名字 ,框架从tuple元组结构中,对schema的推断,也是成功的,只是字段名是tuple中的数据访问索引。即 row 的描述信息没有被约束
object SparkSqlTest3 {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName(this.getClass.getSimpleName)
.master("local[*]")
.getOrCreate()
--创建RDD
val lines: RDD[String] = spark.sparkContext.parallelize(List("lii,13,90.00", "yuu,14,91.09", "koo,12,90.00"))
val userRDD: RDD[User2] = lines.map(line => {
val fields = line.split(",")
val name = fields(0)
val age = fields(1).toInt
val fv = fields(2).toDouble
User2(name, age, fv)
})
--创建DataFrame-----第一种方法
val userDF: DataFrame = spark.createDataFrame(userRDD)
userDF.printSchema()
/**
* root
* |-- name: string (nullable = true)
* |-- age: integer (nullable = false)
* |-- fv: double (nullable = false)
*/
userDF.show()
/**
* +----+---+-----+
* |name|age| fv|
* +----+---+-----+
* | lii| 13| 90.0|
* | yuu| 14|91.09|
* | koo| 12| 90.0|
* +----+---+-----+
*/
--创建DataFrame的----第二种方法--导入隐式转换
import spark.implicits._
val userDF2: DataFrame = userRDD.toDF
userDF2.show()
/**
* +----+---+-----+
* |name|age| fv|
* +----+---+-----+
* | lii| 13| 90.0|
* | yuu| 14|91.09|
* | koo| 12| 90.0|
* +----+---+-----+
*/
}
}
case class User2(name:String ,age:Int, fv:Double)
利用框架提供的隐式转换可以直接调用toDF创建,并指定字段名(其实就是约束 row 的信息)
object DataFrame03 {
def main(args: Array[String]): Unit = {
--创建sparksession
val session = SparkSession.builder()
.appName(this.getClass.getSimpleName)
.master("local[*]")
.getOrCreate()
--使用 sparksession 创建RDD
val lines: RDD[String] = session.sparkContext.parallelize(List("huu,12,98.00", "lii,13,99.09", "poo,14,98.09"))
val rowRDD = lines.map(line => {
val fields = line.split(",")
val f0 = fields(0)
val f1 = fields(1).toInt
val f2 = fields(2).toDouble
(f0, f1, f2)
})
--创建 DataFrame
val dataFrame: DataFrame = session.createDataFrame(rowRDD)
dataFrame.show() --打印创建的dataF
-- row 字段信息是元组的索引(字段名是tuple中的数据访问索引)
+---+---+-----+
| _1| _2| _3|
+---+---+-----+
|huu| 12| 98.0|
|lii| 13|99.09|
|poo| 14|98.09|
+---+---+-----+
--导入隐式转换
import session.implicits._
val dataFrame1: DataFrame = rowRDD.toDF("name", "age", "fv") --对 row 的信息进行约束
dataFrame1.show()
--结果如下:
+----+---+-----+
|name|age| fv|
+----+---+-----+
| huu| 12| 98.0|
| lii| 13|99.09|
| poo| 14|98.09|
+----+---+-----+
}
}
将切割处理的数据封装到Spark系统自定义的Row实例类里面 ,这样就可以给row指定字段属性了 ,创建的RDD跟跟row约束的字段名进行关联
--创建DataFrame = RDD+CaseClass ,然后调用RDD的toDF
--创建DataFrame = RDD+StructType
object DateFrame01 {
def main(args: Array[String]): Unit = {
--创建sparkSession ,简称 spark
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName)
.master("local[*]")
.getOrCreate()
--创建RDD
val lines: RDD[String] = spark.sparkContext.parallelize(List("HUU,13,98.0","YII,12,98.99","GRR,17,97.08"))
--处理数据 ,这个 Row 是spark系统自定义的实例类
val rowRDD: RDD[Row] = lines.map(line => {
val fields: Array[String] = line.split(",")
val f0 = fields(0)
val f1 = fields(1).toInt
val f2 = fields(2).toDouble
Row(f0, f1, f2)
})
--对Row的描述信息 ,就是所谓的Schema
val structType: StructType = StructType(List(
StructField("name", StringType), --该字段默认可以为空
StructField("age", IntegerType, false), --该字段不可以为空
StructField("fv", DoubleType, false)
))
--对RDD 和Schema 进行关联
val df: DataFrame = spark.createDataFrame(rowRDD, structType)
--创建视图
df.createTempView("v_user")
--查询数据
spark.sql(
"""
|select name,fv from v_user where age >= 13
|""".stripMargin).show()
----结果如下
+----+-----+
|name| fv|
+----+-----+
| HUU| 98.0|
| GRR|97.08|
+----+-----+
}
}