有这么一个需求:让rdd1执行map(或其他算子),当出现满足条件的情况时,控制整个spark程序停止,或是保存停止结果到文件(优先保存结果到文件),而不继续执行后续步骤,为了提高速度。
//示例,大概就是这么一个意思:当出现值为5的行,停止程序或是保存结果
val rdd2 = rdd1.map(x =>{
if(x==5){
//整个spark程序停止,或是保存停止结果到文件
}
...........
}
)
rdd2.count()
有没有什么方法可以实现呢?
没怎么看明白这个问题,如果只是(通过某些控制条件,在某一步保存结果到文件)可以filter需要的条件,并将结果保存。
添加了一个breakable块,看看这样能达到优化条件不
import scala.collection.mutable.ArrayBuffer
import scala.util.control.Breaks._
val ab= new ArrayBuffer[String]()
var pp=0
rdd1.foreach(aa=>
{
if(aa.contains("条件")){
pp+=1
ab += aa
breakable{
if (pp == 2) {
println(ab)
break
}}}
})
我已经实现了,建立一个累加器,当出现匹配行时,改变累加器的值,并通过在driver中创建线程持续监听累加器的值,从而捕捉到算子内部的变化以中断程序