FlinkTableApi转DataStreamApi

问题遇到的现象和发生背景

想把FlinkTableApi代码转为DataStreamApi代码

问题相关代码,请勿粘贴截图
package com.cupdata.bigdata.flink

import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

import com.typesafe.config.ConfigFactory
import org.slf4j.LoggerFactory

import java.time.Duration

object tableApiTest {

  def main(args: Array[String]): Unit = {
    //创建日志对象
    val log = LoggerFactory.getLogger("tableApiTest")

    //创建运行环境
    val settings: EnvironmentSettings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()
    //create table env
    val tEnv: TableEnvironment = TableEnvironment.create(settings)
    //enable checkpointing
    tEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
    tEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10))

    //加载配置信息
    val conf = ConfigFactory.load()
    //kafka
    val sourceConnector = conf.getString("kafka.sourceConnector")
    val sourceTopic = conf.getString("kafka.sourceTopic")
    val sourceBootstrapServers = conf.getString("kafka.sourceBootstrapServers")
    val sourceGroupID = conf.getString("kafka.sourceGroupID")
    val sourceStartup = conf.getString("kafka.sourceStartup")
    val sourceAvroSchemaURL = conf.getString("kafka.sourceAvroSchemaURL")
    val sinkConnector = conf.getString("kafka.sinkConnector")
    val sinkTopic = conf.getString("kafka.sinkTopic")
    val sinkBootstrapServers = conf.getString("kafka.sinkBootstrapServers")
    //数据库
    val databaseConnector = conf.getString("database.databaseConnector")
    val url = conf.getString("database.url")
    val username = conf.getString("database.username")
    val password = conf.getString("database.password")
    val cacheMaxRows = conf.getString("database.cacheMaxRows")
    val cacheTTL = conf.getString("database.cacheTTL")


    //创建tbl_achg
    val achgCreateSQL = s"""
                           |CREATE TEMPORARY TABLE tbl_achg (
                           |  BANK decimal(4, 0),
                           |  PROC_TIME as proctime(),
                           |  ENTRY_KEY STRING,
                           |  RID STRING,
                           |  ENTRY_TYPE STRING
                           |) WITH (
                           | 'connector' = '$sourceConnector',
                           | 'topic' = '$sourceTopic',
                           | 'scan.startup.mode' = '$sourceStartup',
                           | 'properties.bootstrap.servers' = '$sourceBootstrapServers',
                           | 'properties.group.id' = '$sourceGroupID',
                           | 'format' = 'debezium-avro-confluent',
                           | 'debezium-avro-confluent.url' = '$sourceAvroSchemaURL'
                           |)
                           |""".stripMargin
    log.info("tbl_achg create SQL: ")
    log.info(achgCreateSQL)
    tEnv.executeSql(achgCreateSQL)

    //创建tbl_custr
    tEnv.executeSql(
      s"""
         |CREATE TEMPORARY TABLE tbl_custr (
         |  CUSTR_NBR STRING,
         |  RACE_CODE STRING,
         |  SURNAME STRING,
         |  ID_ISSDT decimal(8, 0),
         |  ID_DTE decimal(8, 0),
         |  NATION_CD STRING,
         |  GENDER STRING,
         |  OCCU_TYPE STRING,
         |  ISSDEPT_CD STRING,
         |  ISSDEPT_IF STRING,
         |  ETHNIC STRING,
         |  DAY_BIRTH decimal(8, 0),
         |  MAR_STATUS STRING,
         |  EDUCA STRING,
         |  COMP_NAME STRING,
         |  COMP_NAME2 STRING,
         |  OCC_CODE STRING,
         |  TECH_LEVEL STRING,
         |  COMP_DUTY STRING,
         |  INCOME_ANN decimal(5, 1),
         |  YR_IN_COM3 decimal(2, 0),
         |  MO_PHONE STRING,
         |  HMTEL_AREA decimal(4, 0),
         |  HOME_PHONE STRING,
         |  EMAIL_ADDR STRING,
         |  SECUR_NBR STRING
         |) WITH (
         | 'connector' = '$databaseConnector',
         | 'url' = '$url',
         | 'username' = '$username',
         | 'password' = '$password',
         | 'table-name' = 'tbl_custr',
         | 'lookup.cache.max-rows' = '$cacheMaxRows',
         | 'lookup.cache.ttl' = '$cacheTTL'
         |)
         |""".stripMargin)

    //创建tbl_addr
    tEnv.executeSql(
      s"""
         |CREATE TEMPORARY TABLE tbl_addr (
         |  CUSTR_NBR STRING,
         |  ADDR_TYPE STRING,
         |  ADDR_LINE1 STRING,
         |  ADDR_LINE2 STRING,
         |  ADDR_LINE3 STRING,
         |  ADDR_LINE4 STRING,
         |  ADDR_LINE5 STRING,
         |  POST_CODE decimal(8, 0)
         |) WITH (
         | 'connector' = '$databaseConnector',
         | 'url' = '$url',
         | 'username' = '$username',
         | 'password' = '$password',
         | 'table-name' = 'tbl_addr',
         | 'lookup.cache.max-rows' = '$cacheMaxRows',
         | 'lookup.cache.ttl' = '$cacheTTL'
         |)
         |""".stripMargin)

    //创建tbl_cusp
    tEnv.executeSql(
      s"""
         |CREATE TABLE tbl_cusp (
         |  CUSTR_NBR STRING,
         |  SCHLNM_CH STRING
         |) WITH (
         | 'connector' = '$databaseConnector',
         | 'url' = '$url',
         | 'username' = '$username',
         | 'password' = '$password',
         | 'table-name' = 'tbl_cusp',
         | 'lookup.cache.max-rows' = '$cacheMaxRows',
         | 'lookup.cache.ttl' = '$cacheTTL'
         |)
         |""".stripMargin)

    //创建结果表
    tEnv.executeSql(
      s"""
         |CREATE TEMPORARY TABLE tbl_result (
         |  RCN STRING,
         |  APP_ID STRING,
         |  BUSI_TYPE STRING,
         |  VERSION STRING,
         |  FORMAT STRING,
         |  TIME_STAMP STRING,
         |  ORDER_NO STRING,
         |  BANK STRING,
         |  CHARSET STRING,
         |  BUSINESS_DATA STRING,
         |  PRIMARY KEY (RCN) NOT ENFORCED
         |) WITH (
         | 'connector' = '$sinkConnector',
         | 'topic' = '$sinkTopic',
         | 'properties.bootstrap.servers' = '$sinkBootstrapServers',
         | 'key.format' = 'json',
         | 'value.format' = 'json',
         | 'value.fields-include' = 'EXCEPT_KEY'
         |)
         |""".stripMargin)

    //执行查询
    tEnv.executeSql(
      """
        |insert into tbl_result
        |select t1.RID as RCN,
        |'bigdata_platform' as APP_ID,
        |'achg_cfi' as BUSI_TYPE,
        |'1.0' as VERSION,
        |'json' as FORMAT,
        |cast(t1.PROC_TIME as string) as TIME_STAMP,
        |t1.RID as ORDER_NO,
        |'1444' as BANK,
        |'utf-8' as CHARSET,
        |'bank:'||cast(t1.BANK as string)||',CUSTR_NBR:'||t1.ENTRY_KEY||',RACE_CODE:'||
        |t2.RACE_CODE||',SURNAME:'||t2.SURNAME||',ID_ISSDT:'||cast(t2.ID_ISSDT as string)||',ID_DTE:'||
        |cast(t2.ID_DTE as string)||',NATION_CD:'||t2.NATION_CD||',GENDER:'||t2.GENDER||',OCCU_TYPE:'||t2.OCCU_TYPE||
        |(case t1.ENTRY_TYPE
        |when 'CUD7' then ',ISSDEPT_CD:'||t2.ISSDEPT_CD
        |when 'CUJH' then ',ISSDEPT_IF:'||t2.ISSDEPT_IF
        |when 'CUPO' then ',ETHNIC:'||t2.ETHNIC
        |when 'CU06' then ',DAY_BIRTH:'||cast(t2.DAY_BIRTH as string)
        |when 'CU05' then ',MAR_STATUS:'||t2.MAR_STATUS
        |when 'CU33' then ',EDUCA:'||t2.EDUCA
        |when 'CUCH' then ',COMP_NAME:'||t2.COMP_NAME||t2.COMP_NAME2
        |when 'CU09' then ',OCC_CODE:'||t2.OCC_CODE
        |when 'CU74' then ',TECH_LEVEL:'||t2.TECH_LEVEL
        |when 'CU31' then ',COMP_DUTY:'||t2.COMP_DUTY
        |when 'CU16' then ',INCOME_ANN:'||cast(t2.INCOME_ANN as string)
        |when 'CUE5' then ',YR_IN_COM3:'||cast(t2.YR_IN_COM3 as string)
        |when 'AD03' then ',ADDR_TYPE:'||t3.ADDR_TYPE||',ADDR_LINE:'||t3.ADDR_LINE1||t3.ADDR_LINE2||t3.ADDR_LINE3||t3.ADDR_LINE4||t3.ADDR_LINE5||',POST_CODE:'||cast(t3.POST_CODE as string)
        |when 'CU34' then ',MO_PHONE:'||t2.MO_PHONE
        |when 'CU12' then ',HMTEL_AREA:'||cast(t2.HMTEL_AREA as string)
        |when 'CU13' then ',HOME_PHONE:'||t2.HOME_PHONE
        |when 'CU32' then ',EMAIL_ADDR:'||t2.EMAIL_ADDR
        |when 'CUB8' then ',SCHLNM_CH:'||t4.SCHLNM_CH
        |when 'CU35' then ',SECUR_NBR:'||t2.SECUR_NBR
        |else ''
        |end
        |) as BUSINESS_DATA
        |from (
        |select * from tbl_achg
        |where BANK = 1444
        |and ENTRY_TYPE in ('CU26','CU23','CUD6','CUD1','CUD8','CU04','CUP1','CUD7','CUJH','CUP0','CU06','CU05','CU33','CUCH','CU09','CU74','CU31','CU16','CUE5','AD03','CU34','CU12','CU13','CU32','CUB8','CU35')
        |) t1
        |left join tbl_custr FOR SYSTEM_TIME AS OF t1.PROC_TIME t2 on t1.ENTRY_KEY = t2.CUSTR_NBR
        |left join tbl_addr FOR SYSTEM_TIME AS OF t1.PROC_TIME t3 on t1.ENTRY_KEY = t3.CUSTR_NBR
        |left join tbl_cusp FOR SYSTEM_TIME AS OF t1.PROC_TIME t4 on t1.ENTRY_KEY = t4.CUSTR_NBR
        |""".stripMargin)
  }
}

运行结果及报错内容
我的解答思路和尝试过的方法
我想要达到的结果

你好,我是有问必答小助手,非常抱歉,本次您提出的有问必答问题,技术专家团超时未为您做出解答


本次提问扣除的有问必答次数,将会以问答VIP体验卡(1次有问必答机会、商城购买实体图书享受95折优惠)的形式为您补发到账户。


因为有问必答VIP体验卡有效期仅有1天,您在需要使用的时候【私信】联系我,我会为您补发。