如何用java实现SparkSQL dataframe添加自增序号列?

用spark分页查询数据,普通的sql()的不支持分页的sql语句
在网上查资料说可以增加一个序列实现
但是基本都是scala语言,代码如下

// 在原Schema信息的基础上添加一列 “id”信息

val schema: StructType = dataframe.schema.add(StructField("id", LongType))

// DataFrame转RDD 然后调用 zipWithIndex

val dfRDD: RDD[(Row, Long)] = dataframe.rdd.zipWithIndex()

val rowRDD: RDD[Row] = dfRDD.map(tp => Row.merge(tp._1, Row(tp._2)))

// 将添加了索引的RDD 转化为DataFrame

val df2 = spark.createDataFrame(rowRDD, schema)

df2.show()

这段代码如何用java实现呢
或者说我想把dataframe的数据分行获取,有什么其他办法吗

不知道你这个问题是否已经解决, 如果还没有解决的话:
  • 看下这篇博客,也许你就懂了,链接:解决sparksql两个DataFrame合并后出现两列相同的情况
  • 除此之外, 这篇博客: SparkSQL之DataFrame 编程(创建DataFrame ,DataFrame数据运算操作 ,输出存储DataFrame)(11)中的 1  从RDD创建DataFrame(从一个已经存在的RDD进行转换) 部分也许能够解决你的问题, 你可以仔细阅读以下内容或者直接跳转源博客中阅读:

    1)  创建 sparkSession ,后续简称 spark ;

    2)  使用 spark 创建原始的 RDD ,对RDD里面的数据进行切割处理 ,将切割处理的数据封装到定义的一个样例类(bean对象)里面 ,返回一个新的 RDD ;

    3)  创建 DataFrame 的两种方法 : 

    第一种 :  spark 调用 createDataFrame ,将新的RDD 放进去

    第二种 : 导入隐式转换(import spark.implicits._) , 然后新的RDD调用 toDF 方法将 RDD 转换成 DataFrame .

    注意 : 如果切割处理的数据不封装到 bean对象里面 ,而是直接以 tuple(元组) 的方式返回生成新的RDD ,后续这个RDD转为 DataFrame 之后 ,其 ROW(行)字段的名字就不是元组里面的字段名字 ,框架从tuple元组结构中,对schema的推断,也是成功的,只是字段名是tuple中的数据访问索引。即 row 的描述信息没有被约束

    object SparkSqlTest3 {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName(this.getClass.getSimpleName)
          .master("local[*]")
          .getOrCreate()
    
        --创建RDD
        val lines: RDD[String] = spark.sparkContext.parallelize(List("lii,13,90.00", "yuu,14,91.09", "koo,12,90.00"))
        val userRDD: RDD[User2] = lines.map(line => {
          val fields = line.split(",")
          val name = fields(0)
          val age = fields(1).toInt
          val fv = fields(2).toDouble
          User2(name, age, fv)
        })
        --创建DataFrame-----第一种方法
        val userDF: DataFrame = spark.createDataFrame(userRDD)
        userDF.printSchema()
        /**
         * root
         * |-- name: string (nullable = true)
         * |-- age: integer (nullable = false)
         * |-- fv: double (nullable = false)
         */
        userDF.show()
        /**
         * +----+---+-----+
         * |name|age|   fv|
         * +----+---+-----+
         * | lii| 13| 90.0|
         * | yuu| 14|91.09|
         * | koo| 12| 90.0|
         * +----+---+-----+
         */
    
        --创建DataFrame的----第二种方法--导入隐式转换
        import spark.implicits._
        val userDF2: DataFrame = userRDD.toDF
        userDF2.show()
        /**
         * +----+---+-----+
         * |name|age|   fv|
         * +----+---+-----+
         * | lii| 13| 90.0|
         * | yuu| 14|91.09|
         * | koo| 12| 90.0|
         * +----+---+-----+
         */
      }
    }
    case class User2(name:String ,age:Int, fv:Double)

    利用框架提供的隐式转换可以直接调用toDF创建,并指定字段名(其实就是约束 row 的信息)

    object DataFrame03 {
      def main(args: Array[String]): Unit = {
        --创建sparksession
        val session = SparkSession.builder()
          .appName(this.getClass.getSimpleName)
          .master("local[*]")
          .getOrCreate()
        --使用 sparksession 创建RDD
        val lines: RDD[String] = session.sparkContext.parallelize(List("huu,12,98.00", "lii,13,99.09", "poo,14,98.09"))
        val rowRDD = lines.map(line => {
          val fields = line.split(",")
          val f0 = fields(0)
          val f1 = fields(1).toInt
          val f2 = fields(2).toDouble
          (f0, f1, f2)
        })
        --创建 DataFrame
        val dataFrame: DataFrame = session.createDataFrame(rowRDD)
        dataFrame.show()    --打印创建的dataF
        --  row 字段信息是元组的索引(字段名是tuple中的数据访问索引)
          +---+---+-----+
          | _1| _2|   _3|
          +---+---+-----+
          |huu| 12| 98.0|
          |lii| 13|99.09|
          |poo| 14|98.09|
          +---+---+-----+     
        --导入隐式转换
        import session.implicits._
        val dataFrame1: DataFrame = rowRDD.toDF("name", "age", "fv")  --对 row 的信息进行约束
        dataFrame1.show()
        --结果如下:
          +----+---+-----+
          |name|age|   fv|
          +----+---+-----+
          | huu| 12| 98.0|
          | lii| 13|99.09|
          | poo| 14|98.09|
          +----+---+-----+    
      }
    }

    将切割处理的数据封装到Spark系统自定义的Row实例类里面 ,这样就可以给row指定字段属性了 ,创建的RDD跟跟row约束的字段名进行关联

    --创建DataFrame = RDD+CaseClass ,然后调用RDD的toDF
    --创建DataFrame = RDD+StructType
    object DateFrame01 {
      def main(args: Array[String]): Unit = {
        --创建sparkSession ,简称 spark
        val spark: SparkSession = SparkSession.builder()
          .appName(this.getClass.getSimpleName)
          .master("local[*]")
          .getOrCreate()
    
        --创建RDD
        val lines: RDD[String] = spark.sparkContext.parallelize(List("HUU,13,98.0","YII,12,98.99","GRR,17,97.08"))
        --处理数据 ,这个 Row 是spark系统自定义的实例类
        val rowRDD: RDD[Row] = lines.map(line => {
          val fields: Array[String] = line.split(",")
          val f0 = fields(0)
          val f1 = fields(1).toInt
          val f2 = fields(2).toDouble
          Row(f0, f1, f2)
        })
        --对Row的描述信息 ,就是所谓的Schema
        val structType: StructType = StructType(List(
          StructField("name", StringType),          --该字段默认可以为空
          StructField("age", IntegerType, false),     --该字段不可以为空
          StructField("fv", DoubleType, false)
        ))
        --对RDD 和Schema 进行关联
        val df: DataFrame = spark.createDataFrame(rowRDD, structType)
        --创建视图
        df.createTempView("v_user")
        --查询数据
        spark.sql(
          """
            |select name,fv from v_user where age >= 13
            |""".stripMargin).show()
       ----结果如下
          +----+-----+
          |name|   fv|
          +----+-----+
          | HUU| 98.0|
          | GRR|97.08|
          +----+-----+
      }
    }

如果你已经解决了该问题, 非常希望你能够分享一下解决方案, 写成博客, 将相关链接放在评论区, 以帮助更多的人 ^-^