【详述】flink的starrockssink,怎么从数据流中获取tableName,根据数据流不同的tableName写入不同starrocks表中
【flink的starrockssink】
StarRocksSink.sink(
StarRocksSinkOptions.builder()
.withProperty(“jdbc-url”, Constant.STARROCKS_JDBC_URL)
.withProperty(“load-url”, Constant.STARROCKS_LOAD_URL)
.withProperty(“username”, username)
.withProperty(“password”, password)
.withProperty(“database-name”, databaseName)
.withProperty(“table-name”, tableName)
.withProperty(“sink.buffer-flush.interval-ms”, “10000”)
.withProperty(“sink.buffer-flush.max-bytes”, 1024102464 + “”)
.withProperty(“sink.max-retries”, “5”)
.withProperty(“sink.properties.format”, “json”)
.withProperty(“sink.properties.strip_outer_array”, “true”)
.build()
);
这里的tableName只能传入常量,不能通过数据流不同的数据获取不同的tableName
【flink的kafkaSink】
Properties props = new Properties();
props.setProperty(“bootstrap.servers”, Constant.KAFKA_BOOTSTRAP_SERVERS);
props.setProperty(“security.protocol”,Constant.KAFKA_SECURITY_PROTOCOL);
props.setProperty(“sasl.mechanism”, Constant.KAFKA_SASL_MECHANISM);
props.setProperty(“sasl.jaas.config”, Constant.KAFKA_SASL_JAAS_CONFIG);
props.setProperty(“transaction.timeout.ms”, Constant.KAFKA_TRANSACTION_TIMEOUT_MS);
return new FlinkKafkaProducer(
“default”,
new KafkaSerializationSchema() {
@Override
public ProducerRecord<byte[], byte[]> serialize(String element,
@Nullable Long timestamp) {
JSONObject obj = JSON.parseObject(element);
JSONObject source = obj.getJSONObject(“source”);
String topic = source.getString(“db”) + “.” + source.getString(“table”);
return new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
}
},
props,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE
);
而kafka的topic却可以从数据流不同的数据获取不同的topic,写入不同的topic
【想要实现的效果】和kafkasink类似,能从数据流不同的数据获取不同的tableName,写入starrocks不同的表中,并且从攒批数量或者时间触发写出sr
两个思路,一个是分流,根据不同的tablename用不同的sink;另一个是重写starrocksink,不使用默认的,在里面允许指定tablename
当你使用Flink的StarRocksSink,希望从数据流中获取不同的tableName并根据不同的tableName将数据写入不同的StarRocks表时,你可以使用Flink的ProcessFunction和状态来实现。以下是一个示例代码,展示了如何根据数据流中的tableName将数据写入不同的StarRocks表:
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
public class StarRocksProcessFunction extends ProcessFunction<String, String> {
private MapState<String, StarRocksSink> tableSinkMapState;
@Override
public void open(Configuration parameters) {
MapStateDescriptor<String, StarRocksSink> stateDescriptor =
new MapStateDescriptor<>("tableSinkMapState", Types.STRING, Types.POJO(StarRocksSink.class));
tableSinkMapState = getRuntimeContext().getMapState(stateDescriptor);
}
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
// 从数据流中解析出tableName,假设数据流中的tableName存储在JSON数据的某个字段中
String tableName = parseTableName(value);
// 根据tableName获取对应的StarRocksSink
StarRocksSink starRocksSink = tableSinkMapState.get(tableName);
if (starRocksSink == null) {
// 如果Map中没有对应的StarRocksSink,则创建一个新的并加入Map中
starRocksSink = createNewStarRocksSink(tableName);
tableSinkMapState.put(tableName, starRocksSink);
}
// 使用对应的StarRocksSink将数据写入StarRocks
starRocksSink.write(value);
// 触发写出StarRocks数据(根据攒批数量或时间等条件)
if (isTriggerWrite()) {
for (StarRocksSink sink : tableSinkMapState.values()) {
sink.flush();
}
}
}
// 解析数据流中的tableName字段
private String parseTableName(String value) {
// 假设value是JSON格式的数据,可以使用JSON解析库来解析
// 这里仅做示例,实际使用时需要根据具体数据格式解析
return JSON.parseObject(value).getString("tableName");
}
// 创建一个新的StarRocksSink
private StarRocksSink createNewStarRocksSink(String tableName) {
// 创建一个新的StarRocksSink,传入对应的tableName等配置
return StarRocksSink.sink(
// 配置StarRocks连接信息等
);
}
// 根据需要实现触发写出StarRocks数据的条件判断
private boolean isTriggerWrite() {
// 这里可以根据攒批数量、时间等条件来触发写出StarRocks数据
// 示例中简化为每条数据都触发写出
return true;
}
}
在上述代码中,使用了ProcessFunction来获取数据流中的tableName,然后使用MapState来管理不同tableName对应的StarRocksSink。如果MapState中没有对应的StarRocksSink,则会创建一个新的,并加入MapState中。然后,使用对应的StarRocksSink将数据写入StarRocks表。