Flink 1.16
Problem: when I consume Kafka with the Flink Table API, converting the query result table to a DataStream and then back to a table produces an InstanceAlreadyExistsException:
WARN [org.apache.kafka.common.utils.AppInfoParser] - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=my-group-id-0
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
at org.apache.kafka.clients.consumer.KafkaConsumer.&lt;init&gt;(KafkaConsumer.java:816)
at org.apache.kafka.clients.consumer.KafkaConsumer.&lt;init&gt;(KafkaConsumer.java:666)
at org.apache.kafka.clients.consumer.KafkaConsumer.&lt;init&gt;(KafkaConsumer.java:647)
at org.apache.kafka.clients.consumer.KafkaConsumer.&lt;init&gt;(KafkaConsumer.java:627)
at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.&lt;init&gt;(KafkaPartitionSplitReader.java:88)
at org.apache.flink.connector.kafka.source.KafkaSource.lambda$createReader$1(KafkaSource.java:160)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.createSplitFetcher(SplitFetcherManager.java:161)
at org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.addSplits(SingleThreadFetcherManager.java:84)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.addSplits(SourceReaderBase.java:242)
at org.apache.flink.streaming.api.operators.SourceOperator.handleAddSplitsEvent(SourceOperator.java:548)
at org.apache.flink.streaming.api.operators.SourceOperator.handleOperatorEvent(SourceOperator.java:526)
at org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl.dispatchEventToHandlers(OperatorEventDispatcherImpl.java:72)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.dispatchOperatorEvent(RegularOperatorChain.java:80)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$dispatchOperatorEvent$19(StreamTask.java:1455)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run(Thread.java:748)
Below is the test I wrote:
package com.bug.FlinkDemo;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.junit.Test;

public class tbltest {

    @Test
    public void ts1() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register a Kafka-backed source table.
        tableEnv.executeSql(
                "create table kafka_ods_tbl(\n" +
                "  iceberg_ods_tbl_name string,\n" +
                "  kafka_dwd_topic string,\n" +
                "  data string\n" +
                ") with (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'KAFKA-ODS-TOPIC',\n" +
                "  'properties.bootstrap.servers' = 'hadoop102:9092,hadoop103:9092,hadoop104:9092',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'properties.group.id' = 'my-group-id',\n" +
                "  'format' = 'json'\n" +
                ")"
        );

        // Table -> DataStream -> Table round trip.
        Table table = tableEnv.sqlQuery(
                "select iceberg_ods_tbl_name, data, kafka_dwd_topic from kafka_ods_tbl");
        DataStream<Row> rowDataStream = tableEnv.toDataStream(table);
        tableEnv.createTemporaryView("tmp", rowDataStream);

        // This SELECT submits its own job, which instantiates the Kafka source once.
        tableEnv.executeSql("select * from tmp");

        // The print sink gives env.execute() something to run (without any sink it
        // fails with "No operators defined in streaming topology"); this second job
        // instantiates the Kafka source again, and the warning appears.
        rowDataStream.print("map===>");
        env.execute("test");
    }
}
Not sure whether you have already solved this problem; if not:
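Reading the trace: the repeated KafkaConsumer.&lt;init&gt; frames show a second consumer being constructed in the same JVM with the same JMX id my-group-id-0 (the Kafka source derives the consumer client.id from the group id plus the subtask index). The test submits two jobs that both read kafka_ods_tbl: executeSql("select * from tmp") launches one, and env.execute("test") launches another; in the local MiniCluster both run in one process, so the second MBean registration collides. Below is a minimal sketch of two possible workarounds, assuming that cause; the prefix value is made up for illustration, and the second variant assumes the connector's client.id.prefix option is forwarded through the SQL 'properties.*' keys:

// Workaround 1: submit only one job per JVM. Drop the extra
// executeSql(SELECT) and let env.execute() drive the whole pipeline,
// so the Kafka consumer is instantiated exactly once.
Table table = tableEnv.sqlQuery(
        "select iceberg_ods_tbl_name, data, kafka_dwd_topic from kafka_ods_tbl");
DataStream<Row> rowDataStream = tableEnv.toDataStream(table);
rowDataStream.print("map===>");   // the single sink of the single job
env.execute("test");

// Workaround 2: if two jobs must read the same table in one JVM, give
// each source a distinct client id prefix so the JMX ids no longer clash
// (hypothetical prefix value; add it to the WITH clause of each table):
//   'properties.group.id' = 'my-group-id',
//   'properties.client.id.prefix' = 'my-group-id-job-a',

Either way, the exception itself is harmless: AppInfoParser catches it and only logs the WARN, so the second consumer still reads data; it merely skips registering its app-info MBean again.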