在创建flink的流式执行环境时,使用java的api可以配置端口号,但在scala的api中。无法配置端口号等conf配置
java
public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
.map(factory -> factory.createExecutionEnvironment(configuration))
.orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
}
scala
def getExecutionEnvironment: StreamExecutionEnvironment = {
new StreamExecutionEnvironment(JavaEnv.getExecutionEnvironment)
}
scala的换成 createRemoteEnvironment,getExecutionEnvironment只是其中一种方法
你怎么不往下写了?使用scala的话,函数里面不应该这样写吗?
//1.创建执行环境
val environment = StreamExecutionEnvironment.getExecutionEnvironment
//2.获取数据源
val text = environment.readTextFile("hdfs://hadoop10:8020/flink/flink-words")
//3.对获取到的数据进行转换
val result = text.flatMap(line => line.split("\\s+"))
.map(word => (word, 1))
.keyBy(0)
.sum(1)
//4.打印结果
result.print()
//5.执行job
environment.execute("myFlinkJob")