Running a Kafka + Flink integration project from IDEA fails with an error. The code is:
package com.huawei.bigdata;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class KafkaFlinkDemo1 {
    public static void main(String[] args) throws Exception {
        // Get the stream execution environment
        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // Create a Table Environment on top of it
        StreamTableEnvironment sTableEnv = StreamTableEnvironment.create(sEnv);

        // Register a Kafka topic as a table source via the descriptor API
        sTableEnv.connect(new Kafka()
                .version("0.10")
                .topic("topic1")
                .startFromLatest()
                .property("group.id", "group1")
                .property("bootstrap.servers", "172.168.30.105:21005")
        ).withFormat(
                new Json().failOnMissingField(false).deriveSchema()
        ).withSchema(
                new Schema().field("userId", Types.LONG())
                        .field("day", Types.STRING())
                        .field("begintime", Types.LONG())
                        .field("endtime", Types.LONG())
                        .field("data", ObjectArrayTypeInfo.getInfoFor(
                                Row[].class,
                                Types.ROW(new String[]{"package", "activetime"},
                                        new TypeInformation[]{Types.STRING(), Types.LONG()}
                                )
                        ))
        ).inAppendMode().registerTableSource("userlog");

        // Query the registered table and print the resulting append stream
        Table result = sTableEnv.sqlQuery("select userId from userlog");
        DataStream<Row> rowDataStream = sTableEnv.toAppendStream(result, Row.class);
        rowDataStream.print();

        sEnv.execute("KafkaFlinkDemo1");
    }
}
The error output is as follows:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/E:/develop/apache-maven-3.6.0-bin/repository/ch/qos/logback/logback-classic/1.1.3/logback-classic-1.1.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/E:/develop/apache-maven-3.6.0-bin/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
Exception in thread "main" java.lang.AbstractMethodError: org.apache.flink.table.descriptors.ConnectorDescriptor.toConnectorProperties()Ljava/util/Map;
at org.apache.flink.table.descriptors.ConnectorDescriptor.toProperties(ConnectorDescriptor.java:58)
at org.apache.flink.table.descriptors.ConnectTableDescriptor.toProperties(ConnectTableDescriptor.scala:107)
at org.apache.flink.table.descriptors.StreamTableDescriptor.toProperties(StreamTableDescriptor.scala:95)
at org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:39)
at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:46)
at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSourceAndSink(ConnectTableDescriptor.scala:68)
at com.huawei.bigdata.KafkaFlinkDemo1.main(KafkaFlinkDemo1.java:41)
Process finished with exit code 1
Related post: https://blog.csdn.net/xianpanjia4616/article/details/86644800
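In case it helps narrow this down: a java.lang.AbstractMethodError on ConnectorDescriptor.toConnectorProperties() usually means the Flink table, JSON, and Kafka connector jars on the classpath were built against different Flink releases, so one of them lacks a method the others expect. Below is a minimal Maven sketch that pins every Flink module to a single version. The version number (1.7.2), the Scala suffix (2.11), and the artifact list are assumptions matching the pre-1.9 module layout, not values taken from this project; newer Flink releases split and rename the table artifacts.

    <properties>
        <!-- Assumption: one Flink release used for every Flink artifact -->
        <flink.version>1.7.2</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>
    <dependencies>
        <!-- Streaming runtime for the DataStream API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Table API / SQL (single flink-table module before Flink 1.9) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Kafka 0.10 connector, same Flink version as everything else -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- JSON format used by the Json() descriptor -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

Running mvn dependency:tree and checking that every org.apache.flink artifact resolves to the same version is a quick way to confirm whether a version mismatch is the cause.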