想把FlinkTableApi代码转为DataStreamApi代码
package com.cupdata.bigdata.flink
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import com.typesafe.config.ConfigFactory
import org.slf4j.LoggerFactory
import java.time.Duration
object tableApiTest {
def main(args: Array[String]): Unit = {
//创建日志对象
val log = LoggerFactory.getLogger("tableApiTest")
//创建运行环境
val settings: EnvironmentSettings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build()
//create table env
val tEnv: TableEnvironment = TableEnvironment.create(settings)
//enable checkpointing
tEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
tEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10))
//加载配置信息
val conf = ConfigFactory.load()
//kafka
val sourceConnector = conf.getString("kafka.sourceConnector")
val sourceTopic = conf.getString("kafka.sourceTopic")
val sourceBootstrapServers = conf.getString("kafka.sourceBootstrapServers")
val sourceGroupID = conf.getString("kafka.sourceGroupID")
val sourceStartup = conf.getString("kafka.sourceStartup")
val sourceAvroSchemaURL = conf.getString("kafka.sourceAvroSchemaURL")
val sinkConnector = conf.getString("kafka.sinkConnector")
val sinkTopic = conf.getString("kafka.sinkTopic")
val sinkBootstrapServers = conf.getString("kafka.sinkBootstrapServers")
//数据库
val databaseConnector = conf.getString("database.databaseConnector")
val url = conf.getString("database.url")
val username = conf.getString("database.username")
val password = conf.getString("database.password")
val cacheMaxRows = conf.getString("database.cacheMaxRows")
val cacheTTL = conf.getString("database.cacheTTL")
//创建tbl_achg
val achgCreateSQL = s"""
|CREATE TEMPORARY TABLE tbl_achg (
| BANK decimal(4, 0),
| PROC_TIME as proctime(),
| ENTRY_KEY STRING,
| RID STRING,
| ENTRY_TYPE STRING
|) WITH (
| 'connector' = '$sourceConnector',
| 'topic' = '$sourceTopic',
| 'scan.startup.mode' = '$sourceStartup',
| 'properties.bootstrap.servers' = '$sourceBootstrapServers',
| 'properties.group.id' = '$sourceGroupID',
| 'format' = 'debezium-avro-confluent',
| 'debezium-avro-confluent.url' = '$sourceAvroSchemaURL'
|)
|""".stripMargin
log.info("tbl_achg create SQL: ")
log.info(achgCreateSQL)
tEnv.executeSql(achgCreateSQL)
//创建tbl_custr
tEnv.executeSql(
s"""
|CREATE TEMPORARY TABLE tbl_custr (
| CUSTR_NBR STRING,
| RACE_CODE STRING,
| SURNAME STRING,
| ID_ISSDT decimal(8, 0),
| ID_DTE decimal(8, 0),
| NATION_CD STRING,
| GENDER STRING,
| OCCU_TYPE STRING,
| ISSDEPT_CD STRING,
| ISSDEPT_IF STRING,
| ETHNIC STRING,
| DAY_BIRTH decimal(8, 0),
| MAR_STATUS STRING,
| EDUCA STRING,
| COMP_NAME STRING,
| COMP_NAME2 STRING,
| OCC_CODE STRING,
| TECH_LEVEL STRING,
| COMP_DUTY STRING,
| INCOME_ANN decimal(5, 1),
| YR_IN_COM3 decimal(2, 0),
| MO_PHONE STRING,
| HMTEL_AREA decimal(4, 0),
| HOME_PHONE STRING,
| EMAIL_ADDR STRING,
| SECUR_NBR STRING
|) WITH (
| 'connector' = '$databaseConnector',
| 'url' = '$url',
| 'username' = '$username',
| 'password' = '$password',
| 'table-name' = 'tbl_custr',
| 'lookup.cache.max-rows' = '$cacheMaxRows',
| 'lookup.cache.ttl' = '$cacheTTL'
|)
|""".stripMargin)
//创建tbl_addr
tEnv.executeSql(
s"""
|CREATE TEMPORARY TABLE tbl_addr (
| CUSTR_NBR STRING,
| ADDR_TYPE STRING,
| ADDR_LINE1 STRING,
| ADDR_LINE2 STRING,
| ADDR_LINE3 STRING,
| ADDR_LINE4 STRING,
| ADDR_LINE5 STRING,
| POST_CODE decimal(8, 0)
|) WITH (
| 'connector' = '$databaseConnector',
| 'url' = '$url',
| 'username' = '$username',
| 'password' = '$password',
| 'table-name' = 'tbl_addr',
| 'lookup.cache.max-rows' = '$cacheMaxRows',
| 'lookup.cache.ttl' = '$cacheTTL'
|)
|""".stripMargin)
//创建tbl_cusp
tEnv.executeSql(
s"""
|CREATE TABLE tbl_cusp (
| CUSTR_NBR STRING,
| SCHLNM_CH STRING
|) WITH (
| 'connector' = '$databaseConnector',
| 'url' = '$url',
| 'username' = '$username',
| 'password' = '$password',
| 'table-name' = 'tbl_cusp',
| 'lookup.cache.max-rows' = '$cacheMaxRows',
| 'lookup.cache.ttl' = '$cacheTTL'
|)
|""".stripMargin)
//创建结果表
tEnv.executeSql(
s"""
|CREATE TEMPORARY TABLE tbl_result (
| RCN STRING,
| APP_ID STRING,
| BUSI_TYPE STRING,
| VERSION STRING,
| FORMAT STRING,
| TIME_STAMP STRING,
| ORDER_NO STRING,
| BANK STRING,
| CHARSET STRING,
| BUSINESS_DATA STRING,
| PRIMARY KEY (RCN) NOT ENFORCED
|) WITH (
| 'connector' = '$sinkConnector',
| 'topic' = '$sinkTopic',
| 'properties.bootstrap.servers' = '$sinkBootstrapServers',
| 'key.format' = 'json',
| 'value.format' = 'json',
| 'value.fields-include' = 'EXCEPT_KEY'
|)
|""".stripMargin)
//执行查询
tEnv.executeSql(
"""
|insert into tbl_result
|select t1.RID as RCN,
|'bigdata_platform' as APP_ID,
|'achg_cfi' as BUSI_TYPE,
|'1.0' as VERSION,
|'json' as FORMAT,
|cast(t1.PROC_TIME as string) as TIME_STAMP,
|t1.RID as ORDER_NO,
|'1444' as BANK,
|'utf-8' as CHARSET,
|'bank:'||cast(t1.BANK as string)||',CUSTR_NBR:'||t1.ENTRY_KEY||',RACE_CODE:'||
|t2.RACE_CODE||',SURNAME:'||t2.SURNAME||',ID_ISSDT:'||cast(t2.ID_ISSDT as string)||',ID_DTE:'||
|cast(t2.ID_DTE as string)||',NATION_CD:'||t2.NATION_CD||',GENDER:'||t2.GENDER||',OCCU_TYPE:'||t2.OCCU_TYPE||
|(case t1.ENTRY_TYPE
|when 'CUD7' then ',ISSDEPT_CD:'||t2.ISSDEPT_CD
|when 'CUJH' then ',ISSDEPT_IF:'||t2.ISSDEPT_IF
|when 'CUPO' then ',ETHNIC:'||t2.ETHNIC
|when 'CU06' then ',DAY_BIRTH:'||cast(t2.DAY_BIRTH as string)
|when 'CU05' then ',MAR_STATUS:'||t2.MAR_STATUS
|when 'CU33' then ',EDUCA:'||t2.EDUCA
|when 'CUCH' then ',COMP_NAME:'||t2.COMP_NAME||t2.COMP_NAME2
|when 'CU09' then ',OCC_CODE:'||t2.OCC_CODE
|when 'CU74' then ',TECH_LEVEL:'||t2.TECH_LEVEL
|when 'CU31' then ',COMP_DUTY:'||t2.COMP_DUTY
|when 'CU16' then ',INCOME_ANN:'||cast(t2.INCOME_ANN as string)
|when 'CUE5' then ',YR_IN_COM3:'||cast(t2.YR_IN_COM3 as string)
|when 'AD03' then ',ADDR_TYPE:'||t3.ADDR_TYPE||',ADDR_LINE:'||t3.ADDR_LINE1||t3.ADDR_LINE2||t3.ADDR_LINE3||t3.ADDR_LINE4||t3.ADDR_LINE5||',POST_CODE:'||cast(t3.POST_CODE as string)
|when 'CU34' then ',MO_PHONE:'||t2.MO_PHONE
|when 'CU12' then ',HMTEL_AREA:'||cast(t2.HMTEL_AREA as string)
|when 'CU13' then ',HOME_PHONE:'||t2.HOME_PHONE
|when 'CU32' then ',EMAIL_ADDR:'||t2.EMAIL_ADDR
|when 'CUB8' then ',SCHLNM_CH:'||t4.SCHLNM_CH
|when 'CU35' then ',SECUR_NBR:'||t2.SECUR_NBR
|else ''
|end
|) as BUSINESS_DATA
|from (
|select * from tbl_achg
|where BANK = 1444
|and ENTRY_TYPE in ('CU26','CU23','CUD6','CUD1','CUD8','CU04','CUP1','CUD7','CUJH','CUP0','CU06','CU05','CU33','CUCH','CU09','CU74','CU31','CU16','CUE5','AD03','CU34','CU12','CU13','CU32','CUB8','CU35')
|) t1
|left join tbl_custr FOR SYSTEM_TIME AS OF t1.PROC_TIME t2 on t1.ENTRY_KEY = t2.CUSTR_NBR
|left join tbl_addr FOR SYSTEM_TIME AS OF t1.PROC_TIME t3 on t1.ENTRY_KEY = t3.CUSTR_NBR
|left join tbl_cusp FOR SYSTEM_TIME AS OF t1.PROC_TIME t4 on t1.ENTRY_KEY = t4.CUSTR_NBR
|""".stripMargin)
}
}
你好,我是有问必答小助手,非常抱歉,本次您提出的有问必答问题,技术专家团超时未为您做出解答
本次提问扣除的有问必答次数,将会以问答VIP体验卡(1次有问必答机会、商城购买实体图书享受95折优惠)的形式为您补发到账户。
因为有问必答VIP体验卡有效期仅有1天,您在需要使用的时候【私信】联系我,我会为您补发。