flinksql table

Flinksql 批式任务如何 获取tableEnv.sqlQuery(sqlscript)返回的table中的记录数,执行查询之后是一张table表,我想知道这张table表中的记录个数然后写入日志。

  • 以下回答由chatgpt基于相关博客总结生成:

    可以使用Flink API中的count()方法和log4j框架来实现此需求。具体步骤如下:

    1. 定义Logger

    在代码的开头,需要定义Logger,并在需要打印记录数的位置调用log.info()方法来写入日志。

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class MyJob {
        private static final Logger log = LoggerFactory.getLogger(MyJob.class);
    
        // ...
    }
    
    1. 获取记录数并写入日志

    在执行SQL查询之后,可以使用Flink的count方法获取表中的记录数,并将其写入日志。

    Table resultTable = tableEnv.sqlQuery("select * from sensor");
    long count = resultTable.count();
    log.info("Table has {} records", count);
    
    1. 完整代码示例
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class MyJob {
        private static final Logger log = LoggerFactory.getLogger(MyJob.class);
    
        public static void main(String[] args) {
            // 创建 BatchTableEnvironment
            BatchTableEnvironment tableEnv = ...
    
            // 执行 SQL 查询
            Table resultTable = tableEnv.sqlQuery("select * from sensor");
    
            // 获取表中记录数并写入日志
            long count = resultTable.count();
            log.info("Table has {} records", count);
        }
    }