Flink SQL 1.14 fails when writing to HBase 2.1

Flink SQL 1.14 fails when writing to HBase 2.1; the connector in use is flink-connector-hbase-2.2_2.11-1.14.4.jar.

[screenshot]

[screenshot]

# Error log:

Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-1.14.4/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p4071.7339474/jars/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
---Starting KERBEROS authentication---
root (auth:SIMPLE)
---KERBEROS authentication passed---
---Finished processing Hive dimension table objects---
---Started processing Hive dimension table objects---
---Finished processing Hive dimension table objects---
---Started Flink SQL stream processing---
---Writing data---
java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
validated type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" rowkey, RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$0) NOT NULL info) NOT NULL
converted type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" rowkey, RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$0) NOT NULL info) NOT NULL
rel:
LogicalProject(rowkey=[$0], info=[ROW($1)])
  LogicalTableScan(table=[[myHiveCatalog, hive_catalog, kafka_dwd_acc_caccn]])

        at org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:467)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:582)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:177)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:169)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1057)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1026)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:301)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:639)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:290)
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
        at com.gyzq.HiveJoinTest.main(HiveJoinTest.java:75)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
        at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)

If you can solve this problem, I am willing to add to the reward.

Quoting a ChatGPT answer: if you are using Flink 1.14 with HBase 2.1, it is recommended to use the latest version of the Flink HBase connector, i.e. flink-connector-hbase_2.12-1.14.4.jar. If you are using flink-connector-hbase-2.2_2.11-1.14.4.jar, you may run into compatibility problems.

If you are already using the latest Flink HBase connector and still hit the problem, it may be caused by an incompatibility between your HBase version and the connector. In that case, try upgrading HBase, or use a connector version that is compatible with your HBase version.

Also, make sure your code correctly defines the HBase table's column families and columns, and correctly specifies the HBase table name and the ZooKeeper quorum address. If you still cannot resolve the problem, please provide more detailed error information and code samples so that we can help you better.
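For reference, with the flink-connector-hbase-2.2 jar on the classpath, the sink table is declared with 'connector' = 'hbase-2.2'. A minimal sketch of such a declaration follows; the table name, column family, qualifier and ZooKeeper quorum are placeholders and not taken from the original post:

    -- Minimal sketch of an HBase 2.x sink table for the hbase-2.2 connector.
    -- Table name, column family/qualifier and ZooKeeper quorum are assumed placeholders.
    CREATE TABLE hbase_sink (
      rowkey STRING,
      info ROW<q1 STRING>,
      PRIMARY KEY (rowkey) NOT ENFORCED
    ) WITH (
      'connector' = 'hbase-2.2',
      'table-name' = 'my_namespace:my_table',
      'zookeeper.quorum' = 'zk1:2181,zk2:2181,zk3:2181'
    );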

Judging from the screenshots, this is a data type problem.
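The AssertionError in the log is thrown while converting the ROW(...) constructor in the INSERT: the validated type carries RecordType:peek_no_expand for the info column family, while the converted type is a plain RecordType. A workaround that is sometimes suggested for this kind of mismatch is to cast the constructed row explicitly to the row type declared on the sink's column family. The sketch below is untested here, and the source/sink column names are guesses inferred from the plan in the error message (source table kafka_dwd_acc_caccn, sink columns rowkey and info):

    -- Presumed failing pattern, reconstructed from LogicalProject(rowkey=[$0], info=[ROW($1)]):
    --   INSERT INTO hbase_sink SELECT rowkey, ROW(some_col) FROM kafka_dwd_acc_caccn;
    --
    -- Hedged workaround: cast the constructed ROW to the exact row type declared on the
    -- sink's 'info' column family so that the validated and converted types line up.
    INSERT INTO hbase_sink
    SELECT
      rowkey,
      CAST(ROW(some_col) AS ROW<q1 STRING>) AS info
    FROM kafka_dwd_acc_caccn;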

  • Here is a very good blog post you can check to see if it helps: flink实战(一) flink-sql关联hbase维度数据处理
  • In addition, the HBase section of this blog post, flink sql connector, may solve your problem. You can read the excerpt below carefully or jump to the original post:
  • pom

    <!-- 1.4.x -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-hbase-1.4_2.11</artifactId>
      <version>1.14.2</version>
    </dependency>
    
    <!-- 2.2.x -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-hbase-2.2_2.11</artifactId>
      <version>1.14.2</version>
    </dependency>
    

    All column families of an HBase table must be declared as ROW types: the field name maps to the column family name, and the nested field names map to the column qualifier names. You only need to declare in the table schema the column families and qualifiers that are actually used in queries. Apart from the ROW-typed columns, the single remaining field of an atomic type (e.g. STRING, BIGINT) is treated as the HBase rowkey; a table can declare only one rowkey. The rowkey field may have any name; if it is a reserved keyword, escape it with backticks.

    -- Register the HBase table "mytable" in Flink SQL
    CREATE TABLE hTable (
     rowkey INT,
     family1 ROW<q1 INT>,
     family2 ROW<q2 STRING, q3 BIGINT>,
     family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
     PRIMARY KEY (rowkey) NOT ENFORCED
    ) WITH (
     'connector' = 'hbase-1.4',
     'table-name' = 'mytable',
     'zookeeper.quorum' = 'localhost:2181'
    );
    
    -- Use the ROW(...) constructor to build the column families and write into the HBase table.
    -- Assume the schema of "T" is [rowkey, f1q1, f2q2, f2q3, f3q4, f3q5, f3q6]
    INSERT INTO hTable
    SELECT rowkey, ROW(f1q1), ROW(f2q2, f2q3), ROW(f3q4, f3q5, f3q6) FROM T;
    
    -- Scan data from the HBase table
    SELECT rowkey, family1, family3.q4, family3.q6 FROM hTable;
    
    -- Temporal join against the HBase table, using it as a dimension table
    SELECT * FROM myTopic
    LEFT JOIN hTable FOR SYSTEM_TIME AS OF myTopic.proctime
    ON myTopic.key = hTable.rowkey;
    
  • You can also review the Flink HBaseSink implementation section of Li Fei's Flink 1.8 real-time data warehouse hands-on course to reinforce the related concepts.
  • The following answer was generated by ChatGPT based on the related blog posts:

    Because the problem description is not clear and specific enough, the exact error cannot be determined and no definite solution can be given. It is recommended to troubleshoot based on the error message, or to provide more details in the problem description so the issue can be resolved more effectively.

Please post a screenshot of your code.