Flink consuming Kafka data and writing to Iceberg: InstanceAlreadyExistsException

When using Flink to consume Kafka data and write it into Iceberg, an InstanceAlreadyExistsException shows up in the logs. How should this be resolved?

[org.apache.kafka.common.utils.AppInfoParser] - Error registering AppInfo mbean
  javax.management.InstanceAlreadyExistsException: kafka.admin.client:type=app-info,id=test-enumerator-admin-client

WARN [org.apache.kafka.common.utils.AppInfoParser] - Error registering AppInfo mbean
  javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=test-1

The job code is as follows:

package bug.DWD;

import bug.MyUtil.DateUtil;
import bug.PoJo.DwdInfoPoJo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.formats.json.JsonSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.Objects;

import static bug.MyUtil.ConfigUtil.KAFKA_BROKERS;

public class ODSDataToDWD {

    private static String kafkaBrokers = KAFKA_BROKERS;
    private static String sql;

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        env.enableCheckpointing(5000);

        tableEnv.executeSql(
                "CREATE CATALOG hadoop_iceberg WITH (\n" +
                        "  'type'='iceberg',\n" +
                        "  'catalog-type'='hadoop',\n" +
                        "  'warehouse'='hdfs://hadoop102/user/',\n" +
                        " 'default-database'='lakehouse'\n" +
                        ")"
        );

        tableEnv.executeSql(
                "create table kafka_ods_tbl(\n" +
                        "   iceberg_ods_tbl_name string,\n" +
                        "   kafka_dwd_topic string,\n" +
                        "   data string\n" +
                        ") with (\n" +
                        " 'connector' = 'kafka',\n" +
                        " 'topic' = 'KAFKA-ODS-TOPIC',\n" +
                        " 'properties.bootstrap.servers'='hadoop102:9092,hadoop103:9092,hadoop104:9092',\n" +
                        " 'scan.startup.mode'='earliest-offset',\n" +
                        " 'properties.group.id' = 'test',\n" +
                        " 'properties.enable.auto.commit' = 'false',\n" +
                        " 'format' = 'json'\n" +
                        ")"
        );

        Table odsTbl = tableEnv.sqlQuery("select iceberg_ods_tbl_name,data,kafka_dwd_topic from kafka_ods_tbl");

//        odsTbl.execute().print();

        DataStream<Row> odsDs = tableEnv.toDataStream(odsTbl);

        OutputTag<JSONObject> kafkaDataTag = new OutputTag<JSONObject>("kafka_data") {
        };


        // Convert the table to a DataStream, clean the ODS data, and write it into Iceberg
        SingleOutputStreamOperator<DwdInfoPoJo> dwdDs = odsDs.filter(new FilterFunction<Row>() {
                    @Override
                    public boolean filter(Row row) throws Exception {
                        return row.getField(0) != null && row.getField(1) != null && row.getField(2) != null;
                    }
                })
                .process(new ProcessFunction<Row, DwdInfoPoJo>() {

                    @Override
                    public void processElement(Row value, ProcessFunction<Row, DwdInfoPoJo>.Context ctx, Collector<DwdInfoPoJo> out) throws Exception {

                        String iceberg_ods_tbl_name = value.getField(0).toString();
                        String data = value.getField(1).toString();
                        String kafka_dwd_topic = value.getField(2).toString();

                        JSONObject jsonObj = JSON.parseObject(data);

                        // Normalize the time fields (logTime, login_tm, logout_tm)
                        jsonObj.put("logTime", DateUtil.getDateYYYYMMDDHHMMSS(jsonObj.getString("logTime")));
                        jsonObj.put("login_tm", DateUtil.getDateYYYYMMDDHHMMSS(jsonObj.getString("login_tm")));
                        jsonObj.put("logout_tm", DateUtil.getDateYYYYMMDDHHMMSS(jsonObj.getString("logout_tm")));

                        String browse_product_code = jsonObj.getString("browseProductCode");
                        String browse_product_tpCode = jsonObj.getString("browseProductTpCode");
                        String user_ip = jsonObj.getString("userIp");
                        String obtain_points = jsonObj.getString("obtainPoints");
                        String user_id1 = jsonObj.getString("user_id");
                        String user_id2 = jsonObj.getString("userId");
                        String front_product_url = jsonObj.getString("frontProductUrl");
                        String log_time = jsonObj.getString("logTime");
                        String browse_product_url = jsonObj.getString("browseProductUrl");
                        String id = jsonObj.getString("id");
                        String ip = jsonObj.getString("ip");
                        String login_tm = jsonObj.getString("login_tm");
                        String logout_tm = jsonObj.getString("logout_tm");

                        jsonObj.put("kafka_dwd_topic", kafka_dwd_topic);
                        ctx.output(kafkaDataTag, jsonObj);

                        DwdInfoPoJo dwdInfoPoJo = new DwdInfoPoJo(iceberg_ods_tbl_name, kafka_dwd_topic, browse_product_code, browse_product_tpCode, user_ip, obtain_points, user_id1, user_id2, front_product_url, log_time, browse_product_url, id, ip, login_tm, logout_tm);

                        if (dwdInfoPoJo.kafka_dwd_topic.equals("KAFKA-DWD-BROWSE-LOG-TOPIC"))
                            System.out.println("dwdInfoPoJo=======" + dwdInfoPoJo);

                        out.collect(dwdInfoPoJo);
                    }
                });

        Schema schema = Schema.newBuilder()
                .column("iceberg_ods_tbl_name", DataTypes.STRING())
                .column("kafka_dwd_topic", DataTypes.STRING())
                .column("browse_product_code", DataTypes.STRING())
                .column("browse_product_tpcode", DataTypes.STRING())
                .column("user_ip", DataTypes.STRING())
                .column("obtain_points", DataTypes.STRING())
                .column("user_id1", DataTypes.STRING())
                .column("user_id2", DataTypes.STRING())
                .column("front_product_url", DataTypes.STRING())
                .column("log_time", DataTypes.STRING())
                .column("browse_product_url", DataTypes.STRING())
                .column("id", DataTypes.STRING())
                .column("ip", DataTypes.STRING())
                .column("login_tm", DataTypes.STRING())
                .column("logout_tm", DataTypes.STRING())
                .build();

        Table table = tableEnv.fromDataStream(dwdDs, schema);
        // Insert user login data into the Iceberg DWD-layer table DWD_USER_LOGIN.
        // Concatenating the Table object registers it as a temporary view and inlines the generated name into the SQL.
        sql = "insert into hadoop_iceberg.lakehouse.DWD_USER_LOGIN\n" +
                "select\n" +
                " id,user_id1,ip,login_tm,logout_tm\n" +
                " from " + table + " where iceberg_ods_tbl_name = 'ODS_USER_LOGIN'";
        // Note: executeSql() submits this INSERT as its own job, separate from env.execute() below.
        tableEnv.executeSql(sql);

//        dwdDs.getSideOutput(kafkaDataTag).print("kafkaDataTag=====>");
//        dwdDs.getSideOutput(kafkaDataTag).sinkTo(
//                KafkaSink.<JSONObject>builder()
//                        .setBootstrapServers(kafkaBrokers)
//                        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
//                        .setProperty("transaction.timeout.ms", 1000 * 60 * 15 + "")
//                        .setRecordSerializer(
//                                KafkaRecordSerializationSchema.<JSONObject>builder()
//                                        .setTopicSelector(jsonObject -> jsonObject.getString("kafka_dwd_topic"))
//                                        .setValueSerializationSchema(new JsonSerializationSchema<JSONObject>())
//                                        .build()
//                        ).build()
//        );
        env.execute("ODSToDWD Job");
    }
}

This warning is not about an Iceberg table that already exists. The `javax.management.InstanceAlreadyExistsException` comes from the Kafka client itself: every consumer and admin client registers a JMX `app-info` MBean named after its `client.id`, and when two clients in the same JVM end up with the same id (here `test-1` and `test-enumerator-admin-client`, both derived from `properties.group.id = 'test'`), the second registration fails and `AppInfoParser` logs this WARN. The clients keep working and only the duplicate metrics MBean is skipped, so the job still reads from Kafka and writes to Iceberg normally; it is safe to ignore.
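If you want the warning gone, one option is to give each Kafka source its own client id prefix so the MBean names no longer collide. The KafkaSource behind the SQL connector derives the consumer client ids (`<prefix>-0`, `<prefix>-1`, ...) and the `<prefix>-enumerator-admin-client` id from `client.id.prefix`, falling back to the group id when it is not set. Below is a minimal sketch of your DDL with one extra property; the prefix value `ods-to-dwd-src` is just an example, and whether `properties.client.id.prefix` is forwarded to the source depends on your flink-connector-kafka version:

```java
// Same table definition as in the question, plus a per-source client id prefix
// so the JMX app-info MBeans (test-0, test-1, test-enumerator-admin-client, ...)
// no longer clash when several Kafka clients live in one JVM.
tableEnv.executeSql(
        "create table kafka_ods_tbl(\n" +
                "   iceberg_ods_tbl_name string,\n" +
                "   kafka_dwd_topic string,\n" +
                "   data string\n" +
                ") with (\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = 'KAFKA-ODS-TOPIC',\n" +
                " 'properties.bootstrap.servers'='hadoop102:9092,hadoop103:9092,hadoop104:9092',\n" +
                " 'scan.startup.mode'='earliest-offset',\n" +
                " 'properties.group.id' = 'test',\n" +
                // assumed to be passed through to the KafkaSource as client.id.prefix
                " 'properties.client.id.prefix' = 'ods-to-dwd-src',\n" +
                " 'properties.enable.auto.commit' = 'false',\n" +
                " 'format' = 'json'\n" +
                ")"
);
```

If you build the source with the DataStream `KafkaSource` API instead, `KafkaSourceBuilder.setClientIdPrefix(...)` sets the same property.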

There is also a concrete reason why two Kafka clients with the same ids can end up in one JVM here: the main method submits two jobs. `tableEnv.executeSql(sql)` launches the Iceberg INSERT immediately, and `env.execute("ODSToDWD Job")` launches the DataStream pipeline (the side-output Kafka sink, once it is uncommented), and both job graphs contain the same Kafka table scan with group id `test`. When both jobs run in the same JVM, for example a local MiniCluster started from the IDE, the second job's consumers and enumerator admin client try to register MBeans under names the first job already took, which is exactly the WARN you see. You can ignore it, give each source a distinct client id prefix as shown above, or merge everything into a single job so the Kafka source is only created once, as sketched below.
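To keep everything in a single job (and a single set of Kafka clients), queue the INSERT in a statement set and attach it to the DataStream topology instead of calling `executeSql` directly. A minimal sketch, assuming Flink 1.14+ where `StreamStatementSet.attachAsDataStream()` is available, and reusing the `tableEnv`, `table`, and `env` variables from the posted job:

```java
// Requires: import org.apache.flink.table.api.bridge.java.StreamStatementSet;

// Queue the Iceberg insert instead of executing it right away.
StreamStatementSet stmtSet = tableEnv.createStatementSet();
stmtSet.addInsertSql(
        "insert into hadoop_iceberg.lakehouse.DWD_USER_LOGIN " +
                "select id, user_id1, ip, login_tm, logout_tm from " + table +
                " where iceberg_ods_tbl_name = 'ODS_USER_LOGIN'");

// Attach the queued insert to the DataStream topology; it becomes part of the
// single job graph submitted by env.execute(), so the Kafka source (and its
// JMX MBeans) is created only once per TaskManager JVM.
stmtSet.attachAsDataStream();

env.execute("ODSToDWD Job");
```

This also keeps the Iceberg write and the (currently commented-out) Kafka side-output sink in one checkpointed pipeline.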