My data comes from Kafka as complex JSON that contains a list. I process it with the DataStream API in Flink, register the result as a temporary view, and then try to insert it into Hive through the SQL API. The program reports no errors, but no data ever shows up in Hive. Versions: Flink 1.14.4, Hive 3.1.2, Hadoop 3.3.2. I am new to Flink.
package table.and.sql.demo;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.util.Collector;
import com.alibaba.fastjson.JSON;
import kafka.bean.CarRecord;
import kafka.bean.CarRow;
public class KafkaInput {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,settings);
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/home/dell/flink/hive-conf";
String version = "3.1.2";
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir,version);
tableEnv.registerCatalog("myhive", hive);
tableEnv.useCatalog("myhive");
tableEnv.useDatabase("default");
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("mallplusbak_ssc_goods_car")
.setGroupId("test")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> inputSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
DataStream<CarRow> inputCarRowSource = inputSource.flatMap(new Splitter());
inputCarRowSource.print();
tableEnv.createTemporaryView("ssc_goods_car_input", inputCarRowSource);
String sink = "insert into sink_ssc_goods_car select car_id,member_id,goods_info_id,qty,create_by,create_time,create_date from ssc_goods_car_input";
String dropTable = "DROP TABLE IF EXISTS sink_ssc_goods_car";
String createTable = "create table sink_ssc_goods_car"
+ " (car_id bigint,member_id bigint,goods_info_id BIGINT,qty INT,create_by string,create_time string,create_date string)"
+ "STORED AS PARQUET\n" +
"TBLPROPERTIES (\n" +
" 'sink.partition-commit.policy.kind' = 'metastore,success-file'\n)" ;
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql(dropTable);
tableEnv.executeSql(createTable);
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tableEnv.executeSql(sink);
}
public static class Splitter implements FlatMapFunction<String, CarRow> {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String sentence, Collector<CarRow> out) {
try {
// parse the Kafka message and emit one CarRow per element of the "data" list
CarRecord r = JSON.parseObject(sentence, CarRecord.class);
for (CarRow bean : r.getData()) {
bean.printPara();
out.collect(bean);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
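The CarRecord and CarRow beans are not shown in the post; for the flatMap above to compile they would need roughly this shape (a sketch with field names inferred from the Hive columns; getters/setters trimmed):
package kafka.bean;
import java.util.List;
// assumed payload shape: {"data":[{"car_id":1,"member_id":2,"goods_info_id":3,"qty":4,
//                                  "create_by":"...","create_time":"...","create_date":"..."}]}
public class CarRecord {
    public List<CarRow> data;
    public List<CarRow> getData() { return data; }
}
// in a separate file, also in package kafka.bean
public class CarRow {
    public Long car_id;
    public Long member_id;
    public Long goods_info_id;
    public Integer qty;
    public String create_by;
    public String create_time;
    public String create_date;
    // printPara() just logs the fields for debugging
    public void printPara() { System.out.println(car_id + "," + goods_info_id + "," + qty); }
}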
Later I modified the program to insert test data generated inside the program into Hive instead, and that works:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;
public class WriteHive {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,settings);
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/home/dell/flink/hive-conf";
String version = "3.1.2";
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir,version);
tableEnv.registerCatalog("myhive", hive);
tableEnv.useCatalog("myhive");
tableEnv.useDatabase("default");
DataStream<Row> inputCarRowSource = env.fromElements(
Row.of(123,123,1234,12,"create_by","create_time","create_date" ),
Row.of(4444,123,1234,12,"create_by","create_time","create_date"),
Row.of(5555,123,1234,12,"create_by","create_time","create_date")
);
// interpret the insert-only DataStream as a Table
Table inputTable = tableEnv.fromDataStream(inputCarRowSource).as("car_id","member_id","goods_info_id","qty","create_by","create_time","create_date");
tableEnv.createTemporaryView("ssc_goods_car_input", inputTable);
String sink = "insert into sink_ssc_goods_car select car_id,member_id,goods_info_id,qty,create_by,create_time,create_date from ssc_goods_car_input";
//String sink = "insert into sink_ssc_goods_car select 123,123,1234,12,'create_by','create_time','create_date' ";
String dropTable = "DROP TABLE IF EXISTS sink_ssc_goods_car";
String createTable = "create table if not exists sink_ssc_goods_car"
+ " (car_id bigint,member_id bigint,goods_info_id BIGINT,qty INT,create_by string,create_time string,create_date string)"
+ "STORED AS PARQUET\n" +
"TBLPROPERTIES (\n" +
"'sink.partition-commit.trigger'='process-time',\n" +
"'sink.partition-commit.delay'='0s',\n" +
"'sink.partition-commit.policy.kind' = 'metastore,success-file'\n)" ;
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
//tableEnv.executeSql(dropTable);
//tableEnv.executeSql(createTable);
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
System.out.println("=====================");
//tableEnv.sqlUpdate(sink);
//env.execute("kafkasink");
TableResult tableResult1 = tableEnv.executeSql(sink);
// getJobStatus() returns a CompletableFuture<JobStatus>, so get() is needed to print the actual status
System.out.println(tableResult1.getJobClient().get().getJobStatus().get());
//env.execute("kafkaSource");
}
}
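One detail worth noting about this test: executeSql submits the INSERT job asynchronously and returns immediately. Because the fromElements source is bounded, the submitted job can also be waited on before main() exits, roughly like this (with the unbounded Kafka source this call would never return):
// blocks until the bounded insert job has finished; throws if the job failed
tableEnv.executeSql(sink).await();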
I also checked the temporary view with a print sink, and it does contain data:
tableEnv.createTemporaryView("ssc_goods_car_input", inputCarRowSource);
String PRINT_SINK_SQL = "create table if not exists sink_print ( \n" +
" car_id BIGINT," +
" member_id BIGINT ," +
" goods_info_id BIGINT," +
" qty INT ," +
" create_by STRING," +
" create_time STRING, " +
" create_date STRING" +
") with ('connector' = 'print' )";
String sink_print="insert into sink_print select car_id,member_id,goods_info_id,qty,create_by,create_time,create_date from ssc_goods_car_input";
tableEnv.executeSql(PRINT_SINK_SQL);
tableEnv.executeSql(sink_print);
Data comes in from Kafka and should be inserted into Hive as it arrives.
At the end you are only printing to the console; you never actually sink to Hive.
tableEnv.executeSql(sink).print();
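If the goal is to have the print sink and the Hive sink fed from the same view in one job, the two INSERTs can also be grouped into a statement set instead of calling executeSql twice; a rough sketch reusing the SQL strings from the snippets above:
// needs import org.apache.flink.table.api.StatementSet
StatementSet stmtSet = tableEnv.createStatementSet();
stmtSet.addInsertSql(sink_print); // insert into sink_print ...
stmtSet.addInsertSql(sink);       // insert into sink_ssc_goods_car ...
stmtSet.execute();                // submits a single job that writes to both sinks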
There is a problem with how you are using this: even when you create the table through the catalog you still need to specify WITH. Also, have you configured logging? Maybe an error was thrown but simply never printed. Add a log4j2.properties under resources and set
rootLogger.level = debug
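A minimal log4j2.properties along those lines could look like this (a generic console setup, not taken from the original post):
rootLogger.level = debug
rootLogger.appenderRef.console.ref = ConsoleAppender
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n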