已知 Kafka 中所有 topic 的数据都是固定格式的 JSON。现在想用 Flink 消费所有 topic 的数据,处理后写入第二个 Kafka 集群,并且 sink 的 topic 名称与 source 的 topic 名称保持一致,应该如何实现?
DEMO
/** Demo job: reads strings from one Kafka topic, passes them through a
  * placeholder transform, and writes them to another Kafka topic.
  *
  * The source topic is "sensor" and the sink topic is "mym-sink"; both use
  * the broker at localhost:9092 because the same `Properties` object is
  * handed to the consumer and the producer.
  *
  * NOTE(review): this demo uses a single fixed source topic and a single
  * fixed sink topic. Routing each record to a sink topic named after its
  * source topic would require carrying the topic name with each record
  * (topic-aware deserialization/serialization schemas) — not implemented here.
  *
  * @param args command-line arguments (unused)
  */
def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  // Kafka client settings shared by the consumer and the producer below.
  val properties = new Properties()
  properties.setProperty("bootstrap.servers", "localhost:9092")
  properties.setProperty("group.id", "consumer-group")
  properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  properties.setProperty("auto.offset.reset", "latest")

  // Source: string records from the "sensor" topic.
  val inputStream = env.addSource(
    new FlinkKafkaConsumer010[String]("sensor", new SimpleStringSchema(), properties))

  // Transform step (placeholder). The original referenced an undefined
  // `sourceStream` here; it must operate on the stream created above.
  val dataStream = inputStream.map(data => data.toString)

  // Sink: write every record to the "mym-sink" topic and also print it.
  dataStream.addSink(
    new FlinkKafkaProducer010[String]("mym-sink", new SimpleStringSchema(), properties))
  dataStream.print("send to kafka")

  env.execute("kafka data process")
}