Data flows from Kafka through Flink processing, but it never lands in Hive

Symptoms and background

My data comes from Kafka as complex JSON that contains a list. I process it with the DataStream API in Flink, register the result as a temporary view, and then try to insert it into Hive through the SQL API. The program throws no errors, but the data never shows up in Hive. Versions: Flink 1.14.4, Hive 3.1.2, Hadoop 3.3.2. I'm new to Flink.

Code related to the problem (please do not paste screenshots)

package table.and.sql.demo;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.util.Collector;
import com.alibaba.fastjson.JSON;
import kafka.bean.CarRecord;
import kafka.bean.CarRow;

public class KafkaInput {

public static void main(String[] args) throws Exception {
    
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
     EnvironmentSettings settings = EnvironmentSettings.newInstance()
             .inStreamingMode()
             .build();
    
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,settings);
    String name            = "myhive";
    String defaultDatabase = "default";
    String hiveConfDir     = "/home/dell/flink/hive-conf";
    String version = "3.1.2";
    HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir,version);            
    tableEnv.registerCatalog("myhive", hive);
    
    tableEnv.useCatalog("myhive");
    
    tableEnv.useDatabase("default");
    
    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("localhost:9092")
            .setTopics("mallplusbak_ssc_goods_car")
            .setGroupId("test")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new SimpleStringSchema())                
            .build();

    DataStream<String> inputSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
    
    DataStream<CarRow> inputCarRowSource = inputSource.flatMap(new Splitter());
    inputCarRowSource.print();

    tableEnv.createTemporaryView("ssc_goods_car_input", inputCarRowSource);
    
                                                    
    
    String sink = "insert into sink_ssc_goods_car select car_id,member_id,goods_info_id,qty,create_by,create_time,create_date from ssc_goods_car_input";
    
    String dropTable = "DROP TABLE IF EXISTS sink_ssc_goods_car";
    
    
    String createTable = "create table sink_ssc_goods_car"
            + " (car_id bigint,member_id bigint,goods_info_id BIGINT,qty INT,create_by string,create_time string,create_date string)"
            + "STORED AS PARQUET\n" +
            "TBLPROPERTIES (\n" +
            "  'sink.partition-commit.policy.kind' = 'metastore,success-file'\n)" ;
    
    
    tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
    
    tableEnv.executeSql(dropTable);
    
    tableEnv.executeSql(createTable);
    
    tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
    
    tableEnv.executeSql(sink);
            
    
}

public static class Splitter implements FlatMapFunction<String, CarRow> {
    private static final long serialVersionUID = 1L;

    @Override
    public void flatMap(String sentence, Collector<CarRow> out){
        
        try {
            CarRecord r = JSON.parseObject(sentence, CarRecord.class);
            for(CarRow bean : r.getData()) {
                bean.printPara();
                
                out.collect(bean);
            }
        }catch(Exception e) {
            e.printStackTrace();
        }
    
               
    }
}

}
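The kafka.bean.CarRecord and kafka.bean.CarRow classes are not included in the question. A minimal sketch that is consistent with how they are used above (CarRecord.getData() returning the nested list, CarRow carrying the seven columns selected in the INSERT) could look like the following; the field names, types and JSON layout are assumptions derived from the DDL and the SELECT, not the original classes.

// CarRecord.java -- assumed envelope, e.g. {"data":[{"car_id":1,...},{"car_id":2,...}]}
package kafka.bean;

import java.util.List;

public class CarRecord {
    private List<CarRow> data;

    public List<CarRow> getData() { return data; }
    public void setData(List<CarRow> data) { this.data = data; }
}

// CarRow.java -- public fields named after the view's columns, so Flink's POJO
// type extraction exposes them as columns and fastjson can populate them directly
package kafka.bean;

public class CarRow {
    public Long car_id;
    public Long member_id;
    public Long goods_info_id;
    public Integer qty;
    public String create_by;
    public String create_time;
    public String create_date;

    public CarRow() { }  // no-arg constructor required by fastjson and by Flink

    // debug helper called from the Splitter
    public void printPara() {
        System.out.println(car_id + "," + member_id + "," + goods_info_id + "," + qty
                + "," + create_by + "," + create_time + "," + create_date);
    }
}

With field names matching the columns used in the SELECT, tableEnv.createTemporaryView() maps each public field to a column of ssc_goods_car_input.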

Run results and error output
My approach and what I've tried

Later I modified the program to insert test data generated inside the program itself into Hive, and that worked:
// (imports as in KafkaInput above, plus org.apache.flink.types.Row,
//  org.apache.flink.table.api.Table and org.apache.flink.table.api.TableResult)
public class WriteHive {

public static void main(String[] args) throws Exception {
    
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
     EnvironmentSettings settings = EnvironmentSettings.newInstance()
             .inStreamingMode()
             .build();
    
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,settings);
    String name            = "myhive";
    String defaultDatabase = "default";
    String hiveConfDir     = "/home/dell/flink/hive-conf";
    String version = "3.1.2";
    HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir,version);
    
    
    
    tableEnv.registerCatalog("myhive", hive);
    
    tableEnv.useCatalog("myhive");
    
    tableEnv.useDatabase("default");
    
    DataStream<Row> inputCarRowSource = env.fromElements(
            Row.of(123,123,1234,12,"create_by","create_time","create_date" ),
            Row.of(4444,123,1234,12,"create_by","create_time","create_date"),
            Row.of(5555,123,1234,12,"create_by","create_time","create_date")
            );

        // interpret the insert-only DataStream as a Table
        Table inputTable = tableEnv.fromDataStream(inputCarRowSource).as("car_id","member_id","goods_info_id","qty","create_by","create_time","create_date");
                                                            
     tableEnv.createTemporaryView("ssc_goods_car_input", inputTable);
                                                            
    String sink = "insert into sink_ssc_goods_car select car_id,member_id,goods_info_id,qty,create_by,create_time,create_date from ssc_goods_car_input";
    
    //String sink = "insert into sink_ssc_goods_car select 123,123,1234,12,'create_by','create_time','create_date' ";
    
    String dropTable = "DROP TABLE IF EXISTS sink_ssc_goods_car";
    
    
    String createTable = "create table  if not exists  sink_ssc_goods_car"
            + " (car_id bigint,member_id bigint,goods_info_id BIGINT,qty INT,create_by string,create_time string,create_date string)"
            + "STORED AS PARQUET\n" +
            "TBLPROPERTIES (\n" +
            "'sink.partition-commit.trigger'='process-time',\n" +
            "'sink.partition-commit.delay'='0s',\n" +
            "'sink.partition-commit.policy.kind' = 'metastore,success-file'\n)" ;
    
            
    
    tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
    
    //tableEnv.executeSql(dropTable);
    
    //tableEnv.executeSql(createTable);
    
    tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
    
    System.out.println("=====================");
    
    //tableEnv.sqlUpdate(sink);
    
    //env.execute("kafkasink");
    TableResult tableResult1 = tableEnv.executeSql(sink);
    System.out.println(tableResult1.getJobClient().get().getJobStatus().get()); // getJobStatus() returns a CompletableFuture
    
    //env.execute("kafkaSource");
}
}

Also, I wrote the temporary view to a print sink and it does show data:
tableEnv.createTemporaryView("ssc_goods_car_input", inputCarRowSource);

    String PRINT_SINK_SQL = "create table if not exists sink_print ( \n" +
            " car_id BIGINT," +
            " member_id BIGINT ," +
            " goods_info_id BIGINT," +
            " qty INT ," +
            " create_by STRING," +
            " create_time STRING, " +
            " create_date STRING" +
            ") with ('connector' = 'print' )";
    
    String sink_print="insert into sink_print select car_id,member_id,goods_info_id,qty,create_by,create_time,create_date from ssc_goods_car_input";
    
    tableEnv.executeSql(PRINT_SINK_SQL);
        
    tableEnv.executeSql(sink_print);
The result I want to achieve

Data arriving from Kafka should be inserted into Hive continuously, as it arrives.

At the end you are only printing to the console; you never sink to Hive.

tableEnv.executeSql(sink).print();
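For reference, executeSql() submits the INSERT job asynchronously and returns a TableResult right away, so a failure in the running job is not rethrown in main() and may only appear in the logs. One way to make it visible from the client (a sketch reusing the tableEnv and sink variables from the code above) is to block on the result:

    TableResult insertResult = tableEnv.executeSql(sink);
    // For an unbounded streaming INSERT, await() returns only when the job
    // terminates, so a job failure surfaces here as an exception instead of
    // being visible only in the TaskManager logs.
    insertResult.await();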

There is a problem with how you are using this: even when you create the table through the catalog you still need to specify the WITH options. Have you configured logging? There may be an error that simply isn't being printed. Add a log4j2.properties under resources and set:

rootLogger.level = DEBUG
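A minimal log4j2.properties along those lines (console output only; the layout pattern is just an example) could be:

rootLogger.level = DEBUG
rootLogger.appenderRef.console.ref = ConsoleAppender

appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n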