Currently stuck at the licensing step.
The Autodesk file version matches the project version, and the antivirus software has been turned off, but the FlexNet window still pops up saying the network license check failed…
Quoting the reply from the 皆我百晓生 mini-program as the answer:
To write Scala code in IntelliJ IDEA that uses Flink to consume data from Kafka and store it in Redis, follow these steps:
Step 1: Set up the project dependencies
Add the following to build.sbt:
name := "FlinkKafkaRedisExample"
version := "1.0"
scalaVersion := "2.12.8"
libraryDependencies ++= Seq(
"org.apache.flink" %% "flink-scala" % "1.10.0",
"org.apache.flink" %% "flink-streaming-scala" % "1.10.0",
"org.apache.flink" %% "flink-connector-kafka" % "1.10.0",
"org.apache.flink" %% "flink-connector-redis" % "1.10.0",
"org.apache.kafka" % "kafka-clients" % "2.4.1",
"redis.clients" % "jedis" % "3.3.0",
"org.apache.flink" %% "flink-json" % "1.10.0"
)
Step 2: Write the Scala code
Create a file named Main.scala under src/main/scala and copy the following code into it:
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object Main {

  case class SensorReading(id: String, timestamp: Long, temperature: Double)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Event time is declared here for completeness; this job has no
    // windowing, so no watermark assigner is required
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(1000L)
    // Checkpoint every 60 s; on failure, retry up to 3 times, 5 s apart
    env.enableCheckpointing(60000L)
    env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L))

    val brokers = "localhost:9092"
    val topic = "sensor_data"
    val kafkaProperties = new java.util.Properties()
    kafkaProperties.setProperty("bootstrap.servers", brokers)
    kafkaProperties.setProperty("group.id", "flink_group")

    // The universal connector (flink-connector-kafka) supplies FlinkKafkaConsumer;
    // FlinkKafkaConsumer010 belongs to the old version-specific connector
    val kafkaConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), kafkaProperties)
    kafkaConsumer.setStartFromEarliest()
    val stream = env.addSource(kafkaConsumer)

    // FlinkJedisConfigBase is abstract and cannot be instantiated directly;
    // build the connection settings with FlinkJedisPoolConfig.Builder
    val redisConfig = new FlinkJedisPoolConfig.Builder()
      .setHost("localhost")
      .setPort(6379)
      .build()

    stream
      .map(data => {
        // Expect CSV records of the form "id,timestamp,temperature"
        val dataArray = data.split(",")
        SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
      })
      .addSink(new RedisSink[SensorReading](redisConfig, new RedisExampleMapper))

    env.execute("Flink Kafka Redis Example")
  }

  class RedisExampleMapper extends RedisMapper[SensorReading] {
    // HSET into the hash "sensor_temperatures": field = sensor id, value = temperature
    override def getCommandDescription: RedisCommandDescription =
      new RedisCommandDescription(RedisCommand.HSET, "sensor_temperatures")

    override def getKeyFromData(data: SensorReading): String = data.id

    override def getValueFromData(data: SensorReading): String = data.temperature.toString
  }
}
Step 3: Run the code
Save the file and run it, either with the Run button in IntelliJ IDEA or with the sbt run command.
The code above reads records from the Kafka topic "sensor_data", parses each one into a SensorReading instance, and stores it in the Redis hash named "sensor_temperatures".
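To sanity-check the parsing and mapper logic without a running cluster, the same transformation can be exercised in a standalone sketch (the sample record below is made up for illustration):

```scala
// Standalone sketch: mirrors the CSV parsing and HSET field/value
// extraction from Main, runnable without Kafka, Flink, or Redis.
object MapperCheck {
  case class SensorReading(id: String, timestamp: Long, temperature: Double)

  // Same split-and-convert logic as the map() step in the job
  def parse(line: String): SensorReading = {
    val a = line.split(",")
    SensorReading(a(0).trim, a(1).trim.toLong, a(2).trim.toDouble)
  }

  def main(args: Array[String]): Unit = {
    val r = parse("sensor_1,1547718199,35.8")
    // The RedisSink would issue: HSET sensor_temperatures <id> <temperature>
    println(s"HSET sensor_temperatures ${r.id} ${r.temperature}")
  }
}
```

Once the job has consumed such a record, the stored value can be inspected from redis-cli with HGET sensor_temperatures sensor_1.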
Make sure Kafka and Redis are running locally and listening on the ports used above.
Hopefully this helps you write Scala code in IntelliJ IDEA that uses Flink to consume data from Kafka and store it in Redis.