spark写入mysql关于commit的问题

用scala写,数据插入mysql,想几个sql(不同的数据dataframe更新不同的表)后再commit,数据为dataframe插入,现在用的foreachpartition,但会出现序列化问题(task not ...)
需要代码demo

把你的代码demo发出来看下怎么修改吧



class MysqlDemo {

  var driver="com.mysql.jdbc.Driver"
  var url="jdbc:mysql://192.168.153.135:3306/mybatisdb"
  var user="root"
  var pwd="123456"
   //构造函数
  def this(driver:String,url:String,user:String,pwd:String){
    this()
    this.driver=driver
    this.url=url
    this.user=user
    this.pwd=pwd
  }
}

object MysqlDemo{
  def main(args: Array[String]): Unit = {
    val demo = new MysqlDemo()
    val connection = demo.conn()
//    println(connection)
//
//    demo.select()

//    val num=demo.insert(Student("kb16",15,"男"))
//    println(num)

//    val bool=demo.delete(Student("li",18,null))
//    println(bool)

    val bool=demo.update(Student("河东",2,"女"))
    println(bool)
  }
}

object MysqlUtil {
  case class Student(name:String,age:Int,gender:String)
  implicit class mysqlOperation(mysqlDemo: MysqlDemo){
    private var connection:Connection=_

    def conn():Connection={
      Class.forName(mysqlDemo.driver)
       connection= DriverManager.getConnection(mysqlDemo.url,mysqlDemo.user,mysqlDemo.pwd)
      connection
    }

    def select():Unit={
      var sqlString="select id,name,age,gender from student"
      val rs = conn().createStatement().executeQuery(sqlString)

      while (rs.next()){
        val id=rs.getInt(1)
        val name=rs.getString(2)
        val age=rs.getString(3)
        val gender=rs.getString(4)
        println(id,name,age,gender)
      }
    }

    def insert(student: Student):Int={
      println("执行插入的方法",student.name,student.age,student.gender)
      var sqlString="insert into student(name,age,gender) values(?,?,?)"
      var prep:PreparedStatement=conn().prepareStatement(sqlString)
      prep.setString(1,student.name)
      prep.setInt(2,student.age)
      prep.setString(3,student.gender)

      var i = prep.executeUpdate()
      i
    }

    def delete(student: Student):Boolean={
      println("执行删除的方法",student.name,student.age,student.gender)
      var sqlString="delete from student where name =? "
      val prep = conn().prepareStatement(sqlString)
      prep.setString(1,student.name)
      val bool = prep.execute()
      bool
    }

    def update(student: Student):Boolean={
      println("执行更新的方法",student.name,student.age,student.gender)
      var sqlString="update student set age=?,gender=?  where name=?"
//      var ps=conn().prepareStatement(sqlString)

      var ps=conn().prepareStatement(sqlString)
      ps.setInt(1,student.age)
      ps.setString(2,student.gender)
      ps.setString(3,student.name)

      val bool = ps.execute()
      bool
    }
  }
}

我之前写的 你可以借鉴下