想搞一个平台。后端采用Java。实现类似于FlinkSQLClient的功能,即往后台Flink发送一连串的FlinkSQL命令,后台Flink可以直接执行并提交对应的任务。该平台也可以同步管理Flink已有任务。
FlinkSQL命令包含设置参数的命令,DQL,DML,DDL等等。如下所示。
-- 设置参数的语句
set execution.checkpointing.intervat = 10000;
set execution.checkpointingtimout = 10000;
-- DDL建表语句
CREATE TABLE doris_test_sink (
id INT,
name varchar(255),
sex varchar(255)
)
WITH (
'connector' = 'doris',
'fenodes' = 'srvbd59.net.cn:8030',
'table.identifier' = 'cdc_test.user_info',
'sink.batch.size' = '2',
'sink.batch.interval'='1',
'username' = 'root',
'password' = '123456'
)
-- DML语句
insert into ....
1:上面的思路就是提前packet一个BaseJar,放到Linux上。
2:BaseJar中要执行的SQL语句是通过JDBC从数据库中拉取。通过for循环执行。
3:要执行的SQL语句是先通过编辑器编辑,然后存储在数据库中。
4:通过Java代码操作Linux,拼接flink run命令,并且在命令中拼接--SqlKey,通过ParameterTool.fromArgs获取作为键拉取要执行的SQL。
5:按照以上的思路,所有的任务都是基于一个BaseJar包运行的多个任务。
6:以上思路经初步验证是可行的。
tableEnv.executeSql("sql语句")不能执行设置参数的语句,如下:
tableEnv.executeSql("set execution.checkpointing.intervat = 10000;")
但设置属性的参数不能通过tableEnv.executeSql()执行。会报下面的错误。
Exception in thread "main" org.apache.flink.table.api.TableException:
Unsupported SQL query! executeSql() only accepts a single SQL statement of type
CREATE TABLE, DROP TABLE, ALTER TABLE, CREATE DATABASE, DROP DATABASE, ALTER DATABASE, CREATE FUNCTION,
DROP FUNCTION, ALTER FUNCTION, CREATE CATALOG, DROP CATALOG, USE CATALOG, USE [CATALOG.]DATABASE,
SHOW CATALOGS, SHOW DATABASES, SHOW TABLES, SHOW [USER] FUNCTIONS, SHOW PARTITIONSCREATE VIEW,
DROP VIEW, SHOW VIEWS, INSERT, DESCRIBE, LOAD MODULE, UNLOAD MODULE, USE MODULES, SHOW [FULL] MODULES.
关于上面的问题,初步考虑可以通过Java代码解析字符串,然后直接拼接成Configuration。然后再构建环境。
Configuration configuration = new Configuration();
// 设置底层 key-value 选项
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode().withConfiguration(configuration).build();
TableEnvironment tEnv = TableEnvironment.create(settings);
可以参考下一个开源框架Dinky,如果可以看源码的话。个人想寻求代码思路。
我就是思考了一下FlinkSQL平台化的初步思路,但我这个思路总觉得不太正规。想知道Flink有没有提供这样一个API。给它要执行的SQL,他可以自动启动一个任务。或者觉得有其他更好的思路可以探讨一下。
有没有平台化开发的相关经验的人员,后续我的工作要涉及到平台开发。希望可以提供指导。当然,如果提供实质帮助。我也可以有偿。
题主的思路完全正确可行,因为我就是这么做的。也是一开始觉得不是很正规,但确实能解决实际的业务问题,要相信自己。
梳理一下
BaseJar:丢到linux服务器上,用来解析Flink SQL语句
MySQL:存储要执行的Flink SQL语句
Platform:调用BaseJar,传入参数task_id,告诉BaseJar要读取MySQL里的哪一条Flink SQL进行解析,并生成一个Flink SQL任务。
这里有3个细节
1.BaseJar的入口类的Main函数,支持传入参数,这个参数就可以定为task_id
2.建议重新设计MySQL表,既然一行MySQL数据存储一个Flink SQL任务,那么至少要有三个字段
data_source:Flink SQL源表,指定从哪里接入数据,一般是Kafka
data_sink:Flink SQL落地表,指定任务结果写到哪里,一般是Kafka
task_sql:Flink SQL逻辑代码
这样一行MySQL数据就能生成一个完整的Flink SQL任务,且可以根据task_id来获取到这个任务配置解析
3.拼接flink run命令调用BaseJar,但属于本地命令执行,要依赖本地环境,更好的办法是远程提交Flink任务。但远程提交Flink任务现在没有现成的代码,需要自己去撸Flink源码。
回到问题本身:tableEnv.executeSql("sql语句")不能执行设置参数的语句。
当然不能,因为Flink流任务在一开始必须是确定的,但我们可以通过向Main方法传参task_id,来实现一个BaseJar读取不同的Flink SQL语句,生成不同的Flink SQL任务。本身已经想到了用MySQL存Flink SQL,就不要再用Configuration了。
补充:
现在已经有很多这样的开源平台了,如果觉得自己造轮子麻烦,可以直接用开源的。
StreamPark:刚刚被Apache收纳为大数据平台孵化项目,https://github.com/apache/incubator-streampark
Ververica Platform:Flink官方提供的Flink SQL执行平台,还行吧勉强用英文不友好,https://www.ververica.com/getting-started
可以 用 flink sql-client.sh -i -f 命令行提交试试
参考:
https://blog.csdn.net/chanyue123/article/details/125336596
https://blog.csdn.net/qq_45409791/article/details/126091865
http://t.csdn.cn/OHYl9
这个可以参考下