Converting a Flink table to a stream and back to a table throws InstanceAlreadyExistsException

Flink 1.16
Problem: when I consume Kafka with the Flink Table API, converting the query result table to a DataStream and then back to a table produces an InstanceAlreadyExistsException warning:

```
WARN [org.apache.kafka.common.utils.AppInfoParser] - Error registering AppInfo mbean
  javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=my-group-id-0
    at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
    at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:816)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:647)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:627)
    at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.<init>(KafkaPartitionSplitReader.java:88)
    at org.apache.flink.connector.kafka.source.KafkaSource.lambda$createReader$1(KafkaSource.java:160)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.createSplitFetcher(SplitFetcherManager.java:161)
    at org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.addSplits(SingleThreadFetcherManager.java:84)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.addSplits(SourceReaderBase.java:242)
    at org.apache.flink.streaming.api.operators.SourceOperator.handleAddSplitsEvent(SourceOperator.java:548)
    at org.apache.flink.streaming.api.operators.SourceOperator.handleOperatorEvent(SourceOperator.java:526)
    at org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl.dispatchEventToHandlers(OperatorEventDispatcherImpl.java:72)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.dispatchOperatorEvent(RegularOperatorChain.java:80)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$dispatchOperatorEvent$19(StreamTask.java:1455)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.lang.Thread.run(Thread.java:748)
```
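For context, this warning comes from Kafka's `AppInfoParser` registering a per-client JMX MBean whose `ObjectName` is derived from the consumer's client id (here `my-group-id-0`, i.e. the group id plus a subtask index). When two consumers in the same JVM end up with the same client id, the second registration fails with exactly this exception. A minimal, standalone sketch of the mechanism using only the JDK's `javax.management` API (the class and names below are illustrative, not from Kafka):

```java
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;

public class JmxDupDemo {
    // Minimal standard-MBean pair; JMX requires the interface to be named <Class>MBean.
    public interface AppInfoMBean { String getId(); }
    public static class AppInfo implements AppInfoMBean {
        public String getId() { return "my-group-id-0"; }
    }

    /** Returns true iff registering the same ObjectName a second time throws. */
    public static boolean secondRegistrationFails() throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name =
                new ObjectName("kafka.consumer:type=app-info,id=my-group-id-0");
        server.registerMBean(new AppInfo(), name);     // first registration succeeds
        try {
            server.registerMBean(new AppInfo(), name); // same name again -> exception
            return false;
        } catch (InstanceAlreadyExistsException e) {
            return true;
        } finally {
            server.unregisterMBean(name);              // clean up for repeat runs
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("duplicate rejected: " + secondRegistrationFails());
    }
}
```

Kafka only logs this as a WARN and keeps running, which is why a job with duplicate client ids still works but spams the log; the consumer's JMX metrics under that name are just not registered a second time.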

Here is the test I wrote:

```java
package com.bug.FlinkDemo;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.junit.Test;

public class tbltest {
    @Test
    public void ts1() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // source table backed by the Kafka connector
        tableEnv.executeSql(
                "create table kafka_ods_tbl(\n" +
                        "   iceberg_ods_tbl_name string,\n" +
                        "   kafka_dwd_topic string,\n" +
                        "   data string\n" +
                        ") with (\n" +
                        " 'connector' = 'kafka',\n" +
                        " 'topic' = 'KAFKA-ODS-TOPIC',\n" +
                        " 'properties.bootstrap.servers' = 'hadoop102:9092,hadoop103:9092,hadoop104:9092',\n" +
                        " 'scan.startup.mode' = 'earliest-offset',\n" +
                        " 'properties.group.id' = 'my-group-id',\n" +
                        " 'format' = 'json'\n" +
                        ")"
        );

        Table table = tableEnv.sqlQuery("select iceberg_ods_tbl_name, data, kafka_dwd_topic from kafka_ods_tbl");

        // table -> stream -> table round trip
        DataStream<Row> rowDataStream = tableEnv.toDataStream(table);
        tableEnv.createTemporaryView("tmp", rowDataStream);

        // executeSql on a SELECT submits its own job with its own Kafka source
        tableEnv.executeSql("select * from tmp");

//        rowDataStream.print("map===>\n");
        // env.execute submits a second job that scans the same topic again
        env.execute("test");
    }
}
```
Not sure whether you have solved this problem yet. If not:
  • The following answer is from ChatGPT:

    An `InstanceAlreadyExistsException` usually appears because a table or stream with the same name already exists, so first check the code for duplicate table or stream names. If there really are no duplicates, the following code changes may resolve the problem:

    1. Rename the table. If two tables in the code share a name, renaming one of them can avoid the exception. For example, register the table under the name "new_table":
    ```
    tableEnv.createTemporaryView("new_table", old_table);
    ```
    2. Stop and restart the job. In some cases the exception is caused by stopped-but-uncleaned resources inside the Flink job. If so, cancelling and resubmitting the job cleans them up: in the Flink Web UI, find the running job, cancel it, then submit it again.
    3. Manually delete the existing table or stream. If other code or another job has already created a table or stream with the same name, delete it manually, for example from the "Catalogs" page of the Flink Web UI.
    4. Close the TableEnvironment and open a new one. To avoid resource leaks, a TableEnvironment is normally closed automatically when the job finishes; if the code holds several TableEnvironment instances, close them manually and open a fresh one:
    ```
    // close the TableEnvironment
    tableEnv.close();
    // open a new TableEnvironment
    tableEnv = TableEnvironment.getTableEnvironment(env);
    ```
    In short, InstanceAlreadyExistsException usually comes from duplicate tables or streams, or from uncleaned resources in the Flink job; checking for and fixing those should resolve the exception.
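A different and frequently reported root cause for this exact warning is not a duplicate table name but a duplicate Kafka client id: the test above submits two jobs (the `executeSql("select * from tmp")` query and the pipeline run by `env.execute("test")`), each builds its own Kafka source, and both derive the same client id from `properties.group.id` (hence `my-group-id-0` in the exception). If that is the cause, one possible mitigation is to give each table definition a distinct client id prefix. This is a sketch under an assumption I have not verified against Flink 1.16: that keys under `properties.*` are forwarded to the Kafka source and that `client.id.prefix` overrides the default prefix (the group id); check the Kafka connector documentation for your version before relying on it.

```sql
create table kafka_ods_tbl (
  iceberg_ods_tbl_name string,
  kafka_dwd_topic string,
  data string
) with (
  'connector' = 'kafka',
  'topic' = 'KAFKA-ODS-TOPIC',
  'properties.bootstrap.servers' = 'hadoop102:9092,hadoop103:9092,hadoop104:9092',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'my-group-id',
  -- assumption: this is forwarded to the KafkaSource as client.id.prefix, so each
  -- source registers a unique kafka.consumer:type=app-info JMX name
  'properties.client.id.prefix' = 'kafka-ods-tbl-reader',
  'format' = 'json'
);
```

Note also that the warning itself is benign: Kafka logs it and continues, so the job's results are unaffected either way.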

If you have already solved this problem, it would be great if you could share the solution as a blog post and put the link in the comments, to help more people ^-^