When using Flink to consume Kafka data and write it into Iceberg, an InstanceAlreadyExistsException is logged. How can this be resolved?
[org.apache.kafka.common.utils.AppInfoParser] - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.admin.client:type=app-info,id=test-enumerator-admin-client
WARN [org.apache.kafka.common.utils.AppInfoParser] - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=test-1
The job code is as follows:
package bug.DWD;

import bug.MyUtil.DateUtil;
import bug.PoJo.DwdInfoPoJo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.formats.json.JsonSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.Objects;

import static bug.MyUtil.ConfigUtil.KAFKA_BROKERS;

public class ODSDataToDWD {

    private static String kafkaBrokers = KAFKA_BROKERS;
    private static String sql;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.enableCheckpointing(5000);

        // Register the Iceberg Hadoop catalog.
        tableEnv.executeSql(
                "CREATE CATALOG hadoop_iceberg WITH (\n" +
                        " 'type'='iceberg',\n" +
                        " 'catalog-type'='hadoop',\n" +
                        " 'warehouse'='hdfs://hadoop102/user/',\n" +
                        " 'default-database'='lakehouse'\n" +
                        ")"
        );

        // Kafka source table over the ODS topic.
        tableEnv.executeSql(
                "create table kafka_ods_tbl(\n" +
                        " iceberg_ods_tbl_name string,\n" +
                        " kafka_dwd_topic string,\n" +
                        " data string\n" +
                        ") with (\n" +
                        " 'connector' = 'kafka',\n" +
                        " 'topic' = 'KAFKA-ODS-TOPIC',\n" +
                        " 'properties.bootstrap.servers'='hadoop102:9092,hadoop103:9092,hadoop104:9092',\n" +
                        " 'scan.startup.mode'='earliest-offset',\n" +
                        " 'properties.group.id' = 'test',\n" +
                        " 'properties.enable.auto.commit' = 'false',\n" +
                        " 'format' = 'json'\n" +
                        ")"
        );

        Table odsTbl = tableEnv.sqlQuery("select iceberg_ods_tbl_name,data,kafka_dwd_topic from kafka_ods_tbl");
        // odsTbl.execute().print();
        DataStream<Row> odsDs = tableEnv.toDataStream(odsTbl);

        OutputTag<JSONObject> kafkaDataTag = new OutputTag<JSONObject>("kafka_data") {
        };

        // Clean the ODS rows, convert the table into a DataStream, and prepare the records for Iceberg.
        SingleOutputStreamOperator<DwdInfoPoJo> dwdDs = odsDs.filter(new FilterFunction<Row>() {
                    @Override
                    public boolean filter(Row row) throws Exception {
                        return row.getField(0) != null && row.getField(1) != null && row.getField(2) != null;
                    }
                })
                .process(new ProcessFunction<Row, DwdInfoPoJo>() {
                    @Override
                    public void processElement(Row value, ProcessFunction<Row, DwdInfoPoJo>.Context ctx, Collector<DwdInfoPoJo> out) throws Exception {
                        String iceberg_ods_tbl_name = value.getField(0).toString();
                        String data = value.getField(1).toString();
                        String kafka_dwd_topic = value.getField(2).toString();
                        JSONObject jsonObj = JSON.parseObject(data);

                        // Normalize the timestamp fields.
                        jsonObj.put("logTime", DateUtil.getDateYYYYMMDDHHMMSS(jsonObj.getString("logTime")));
                        jsonObj.put("login_tm", DateUtil.getDateYYYYMMDDHHMMSS(jsonObj.getString("login_tm")));
                        jsonObj.put("logout_tm", DateUtil.getDateYYYYMMDDHHMMSS(jsonObj.getString("logout_tm")));

                        String browse_product_code = jsonObj.getString("browseProductCode");
                        String browse_product_tpCode = jsonObj.getString("browseProductTpCode");
                        String user_ip = jsonObj.getString("userIp");
                        String obtain_points = jsonObj.getString("obtainPoints");
                        String user_id1 = jsonObj.getString("user_id");
                        String user_id2 = jsonObj.getString("userId");
                        String front_product_url = jsonObj.getString("frontProductUrl");
                        String log_time = jsonObj.getString("logTime");
                        String browse_product_url = jsonObj.getString("browseProductUrl");
                        String id = jsonObj.getString("id");
                        String ip = jsonObj.getString("ip");
                        String login_tm = jsonObj.getString("login_tm");
                        String logout_tm = jsonObj.getString("logout_tm");

                        jsonObj.put("kafka_dwd_topic", kafka_dwd_topic);
                        ctx.output(kafkaDataTag, jsonObj);

                        DwdInfoPoJo dwdInfoPoJo = new DwdInfoPoJo(iceberg_ods_tbl_name, kafka_dwd_topic, browse_product_code, browse_product_tpCode, user_ip, obtain_points, user_id1, user_id2, front_product_url, log_time, browse_product_url, id, ip, login_tm, logout_tm);
                        if (dwdInfoPoJo.kafka_dwd_topic.equals("KAFKA-DWD-BROWSE-LOG-TOPIC")) {
                            System.out.println("dwdInfoPoJo=======" + dwdInfoPoJo);
                        }
                        out.collect(dwdInfoPoJo);
                    }
                });

        Schema schema = Schema.newBuilder()
                .column("iceberg_ods_tbl_name", DataTypes.STRING())
                .column("kafka_dwd_topic", DataTypes.STRING())
                .column("browse_product_code", DataTypes.STRING())
                .column("browse_product_tpcode", DataTypes.STRING())
                .column("user_ip", DataTypes.STRING())
                .column("obtain_points", DataTypes.STRING())
                .column("user_id1", DataTypes.STRING())
                .column("user_id2", DataTypes.STRING())
                .column("front_product_url", DataTypes.STRING())
                .column("log_time", DataTypes.STRING())
                .column("browse_product_url", DataTypes.STRING())
                .column("id", DataTypes.STRING())
                .column("ip", DataTypes.STRING())
                .column("login_tm", DataTypes.STRING())
                .column("logout_tm", DataTypes.STRING())
                .build();
        Table table = tableEnv.fromDataStream(dwdDs, schema);

        // Insert the user-login records into the Iceberg DWD table DWD_USER_LOGIN.
        sql = "insert into hadoop_iceberg.lakehouse.DWD_USER_LOGIN\n" +
                "select\n" +
                " id,user_id1,ip,login_tm,logout_tm\n" +
                " from " + table + " where iceberg_ods_tbl_name = 'ODS_USER_LOGIN'";
        tableEnv.executeSql(sql);

        // dwdDs.getSideOutput(kafkaDataTag).print("kafkaDataTag=====>");
        // dwdDs.getSideOutput(kafkaDataTag).sinkTo(
        //         KafkaSink.<JSONObject>builder()
        //                 .setBootstrapServers(kafkaBrokers)
        //                 .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        //                 .setProperty("transaction.timeout.ms", 1000 * 60 * 15 + "")
        //                 .setRecordSerializer(
        //                         KafkaRecordSerializationSchema.<JSONObject>builder()
        //                                 .setTopicSelector(jsonObject -> jsonObject.getString("kafka_dwd_topic"))
        //                                 .setValueSerializationSchema(new JsonSerializationSchema<JSONObject>())
        //                                 .build()
        //                 ).build()
        // );

        env.execute("ODSToDWD Job");
    }
}
This exception is usually caused by trying to create a table in Iceberg that already exists. Check that the table name is correct and that the table does not already exist. If the name is correct and the table does not exist, the problem may come from sharing a single Iceberg Catalog instance across multiple tasks; in that case, make sure each task gets its own Catalog instance, or use a thread-safe Catalog implementation.
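If you want to rule out the "table already exists" case described above, one option is to make the DWD table creation idempotent. This is only a minimal sketch, assuming `DWD_USER_LOGIN` needs exactly the five columns that the job's INSERT selects; the real schema may differ:

```java
// Hypothetical idempotent DDL, run once before the INSERT in ODSDataToDWD.main().
// The column list is an assumption based on "id,user_id1,ip,login_tm,logout_tm";
// adjust it to the actual DWD_USER_LOGIN schema.
tableEnv.executeSql(
        "CREATE TABLE IF NOT EXISTS hadoop_iceberg.lakehouse.DWD_USER_LOGIN (\n" +
        "  id STRING,\n" +
        "  user_id STRING,\n" +
        "  ip STRING,\n" +
        "  login_tm STRING,\n" +
        "  logout_tm STRING\n" +
        ")"
);
```

With `IF NOT EXISTS`, re-submitting the job does not fail even when the table was already created by an earlier run.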
An InstanceAlreadyExistsException usually means that writing the data conflicted with an instance that already exists. It is typically caused by one of the following:

1. The Kafka instance already exists: when writing Kafka data into Iceberg, if the Kafka instance already exists, the new data cannot be written, so a new Kafka instance needs to be created before writing to Kafka.
2. The Flink instance already exists: likewise, if the Flink instance already exists when writing data through Flink, the new data cannot be written, so a new Flink instance needs to be created first.

If the Kafka or Flink instance has already been created, check that it is ready and correctly configured to accept data. For Kafka, you can create the topic and then read it back, for example:

```
bin/kafka-topics.sh --create --bootstrap-server=localhost:9092 --topic test-topic
bin/kafka-console-consumer.sh --bootstrap-server=localhost:9092 --topic test-topic --from-beginning
```

For Flink, check that the job is actually running, for example in the Flink web UI. If both instances exist and appear to be configured correctly but data still cannot be written, recheck the Kafka and Flink configurations. If the problem persists, try restarting the Kafka or Flink instance; this usually clears the conflict.
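Separately from the answers above (this suggestion is not taken from either of them): the WARN from `AppInfoParser` is logged when two Kafka clients inside the same JVM try to register a JMX MBean under the same client id, here `test-enumerator-admin-client` and `test-1`, both derived from the consumer group `test`; it generally only affects the JMX app-info metrics of the second client. Assuming your flink-connector-kafka version forwards the `client.id.prefix` option through the `properties.*` settings of the Kafka SQL connector (verify this against your connector version before relying on it), giving the source a distinct prefix per submission avoids the name collision. A rough sketch of the changed DDL:

```java
// Hypothetical variant of the kafka_ods_tbl DDL from the job above.
// 'properties.client.id.prefix' is an assumption about the connector version in use;
// all other options are unchanged from the original job.
tableEnv.executeSql(
        "create table kafka_ods_tbl(\n" +
        " iceberg_ods_tbl_name string,\n" +
        " kafka_dwd_topic string,\n" +
        " data string\n" +
        ") with (\n" +
        " 'connector' = 'kafka',\n" +
        " 'topic' = 'KAFKA-ODS-TOPIC',\n" +
        " 'properties.bootstrap.servers'='hadoop102:9092,hadoop103:9092,hadoop104:9092',\n" +
        " 'scan.startup.mode'='earliest-offset',\n" +
        " 'properties.group.id' = 'test',\n" +
        // unique prefix per submission so MBean names do not collide
        " 'properties.client.id.prefix' = 'ods-to-dwd-" + System.currentTimeMillis() + "',\n" +
        " 'properties.enable.auto.commit' = 'false',\n" +
        " 'format' = 'json'\n" +
        ")"
);
```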