Flink1.11.3 设置水位线报异常

package com.xu.studyApp.watermarkTest

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala._

import java.time.Duration

/**
 * @author MrXu
 * @create 2021-11-21 19:05
 * @desc
 */
object WatermarkTest2 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.enableCheckpointing(60000)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(5000)  // 默认200ms一次水印;
    env.setParallelism(1)

    val socketInput: DataStream[String] = env.socketTextStream("127.0.0.1", 9999)
    val mapInput: DataStream[(String, Long)] = socketInput.map((line: String) => {
      val words: Array[String] = line.split(",")
      (words(0), words(1).toLong)
    })

    // 数据有可能是乱序的,设置数据的水位线提取.
    val dataWithWaterMarkInput: DataStream[(String, Long)] = mapInput.assignTimestampsAndWatermarks(
      WatermarkStrategy
        .forBoundedOutOfOrderness[(String, Long)](Duration.ofSeconds(5))
        .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
          override def extractTimestamp(element: (String, Long), recordTimestamp: Long) = {
            element._2
          }
        })
    )

    dataWithWaterMarkInput
      .keyBy((_: (String, Long))._1)
      .minBy(1)
      .print()

    env.execute("WatermarkTest2")
  }
}


异常:

Static methods in interface require -target:jvm-1.8
        .forBoundedOutOfOrderness[(String, String)](Duration.ofSeconds(5))

灰常不解, 不知道是Idea设置不对还是代码本身有问题, 但是Flink官网文档也是这样写的. JDK都升成了8版本最高版本也是不行, 求帮助!

你好,我是有问必答小助手,非常抱歉,本次您提出的有问必答问题,技术专家团超时未为您做出解答


本次提问扣除的有问必答次数,将会以问答VIP体验卡(1次有问必答机会、商城购买实体图书享受95折优惠)的形式为您补发到账户。


因为有问必答VIP体验卡有效期仅有1天,您在需要使用的时候【私信】联系我,我会为您补发。