flink集成drools,哪里错了吗?谢谢。

flink集成drools,哪里错了吗?谢谢。
val kieServices = KieServices.Factory.get(); // 通过这个静态方法去获取一个实例
val kieContainer = kieServices.getKieClasspathContainer();// 默认去读取配置文件

val env = StreamExecutionEnvironment.getExecutionEnvironment
val text1: DataStream[String] = env.socketTextStream("192.168.171.103", 9000, '\n')

val kieSession = kieContainer.newKieSession("all-rules");// 根据这个字符串去获取kieSession
text1.map(f => {
  val arr = f.split(" ")
  val userId = arr(0)
  val ip = arr(1)
  val `type` = arr(2)
  kieSession.insert(new DLoginEvent1(userId, ip, `type`))
  val count = kieSession.fireAllRules()
  println("Fire " + count + " rule(s)!")
})

kieSession.dispose()

env.execute(this.getClass.getSimpleName)