关于Flink中窗口计算的Reduce

我写了一个wordcount,想用窗口一直进行累加。但是写出来reduce那里一直报错,请问是哪里错了?

object FlinkDemoSource1 {
  def main(args: Array[String]): Unit = {
    import org.apache.flink.api.scala._
    val conf = new Configuration()
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    val input: DataStream[String] = env.socketTextStream("localhost", 6666)

    input.flatMap(_.split(" ")).map((_, 1)).keyBy(_._1)
      .timeWindow(Time.seconds(10))
      .reduce((x, y) => {
      (x._1,x._2 + y._2)}
    ,new AssignWindowEndProcessFunction())


    count.print()
    env.execute("wordcount")
  }
}

class AssignWindowEndProcessFunction extends ProcessWindowFunction[(String,Int),(String,Int),String,TimeWindow]{
  override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[Iterable[(String, Int)]]): Unit = {

    var tmp:Int  = 0;
    for (i <- elements){
      tmp += i._2
    }
    out.collect((key,tmp))
  }
}

https://blog.csdn.net/IT_BULL/article/details/104201157/