我写了一个 WordCount 程序,想用窗口持续进行累加。但是写出来之后,reduce 那里一直报错,请问是哪里错了?
/**
 * Streaming word count over a socket text source.
 *
 * Reads lines from `localhost:6666`, splits each line on single spaces, and
 * counts occurrences of every word inside 10-second windows keyed by the word.
 * The per-window counts are printed to standard output.
 */
object FlinkDemoSource1 {

  /**
   * Builds and runs the word-count pipeline.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    import org.apache.flink.api.scala._

    val conf = new Configuration()
    val env: StreamExecutionEnvironment =
      StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    val input: DataStream[String] = env.socketTextStream("localhost", 6666)

    // The pipeline result must be bound to a name: the original discarded it
    // and then called `count.print()` on an undefined identifier.
    val count: DataStream[(String, Int)] = input
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)
      .timeWindow(Time.seconds(10))
      .reduce(
        // Explicit parameter types keep resolution of the overloaded `reduce`
        // unambiguous regardless of the Scala version in use.
        (x: (String, Int), y: (String, Int)) => (x._1, x._2 + y._2),
        new AssignWindowEndProcessFunction()
      )

    count.print()
    env.execute("wordcount")
  }
}
/**
 * Window function that emits one `(key, total)` pair per key and window.
 *
 * The type parameters declare `(String, Int)` as both the input and the output
 * element type, `String` as the key type, and `TimeWindow` as the window type.
 */
class AssignWindowEndProcessFunction
    extends ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {

  /**
   * Sums the counts of all elements handed to this window invocation and
   * emits the total together with the key.
   *
   * @param key      the word this window pane is keyed by
   * @param context  window context (unused)
   * @param elements the `(word, count)` pairs for this key and window
   * @param out      collector receiving the single `(key, total)` result
   */
  override def process(
      key: String,
      context: Context,
      elements: Iterable[(String, Int)],
      // Must be Collector[(String, Int)] to match the declared output type;
      // Collector[Iterable[(String, Int)]] does not override the base method
      // and makes `out.collect((key, total))` fail to type-check.
      out: Collector[(String, Int)]
  ): Unit = {
    // Sum through the iterator to avoid building an intermediate collection.
    val total: Int = elements.iterator.map(_._2).sum
    out.collect((key, total))
  }
}