Error:(11, 32) not enough arguments for method dataAnalysis: (answer: org.apache.spark.streaming.dstream.DStream[ort.niit.bean.Answer]): Unit.
Unspecified value parameter answer.
weaDataService.dataAnalysis()
Error:(9, 31) not enough arguments for method dispatch: (dataStream: org.apache.spark.streaming.dstream.DStream[ort.niit.bean.Answer]): Unit.
Unspecified value parameter dataStream.
weaDataController.dispatch()
package ort.niit.service

import org.apache.spark.streaming.dstream.DStream
import ort.niit.bean.Answer

class WEADataService {

  def dataAnalysis(answer: DStream[Answer]): Unit = {
    cityMosttemperatureTop10(answer)
    cityRelativehumidityTop3(answer)
  }

  // Requirement 5: real-time top-10 temperatures per city
  private def cityMosttemperatureTop10(answer: DStream[Answer]): Unit = {
    val mapDS = answer.map(data => ((data.location, data.temperature), 1))
    val reduceDS = mapDS.reduceByKey(_ + _)
    val newmapDS = reduceDS.map {
      case ((location, temperature), count) => (location, (temperature, count))
    }
    val groupDS = newmapDS.groupByKey()
    groupDS.foreachRDD(rdd => {
      val value = rdd.mapValues(iter => {
        val sort = iter.toList.sortBy(_._2).reverse
        sort.take(10)
      })
      println("------- Real-time top-10 temperatures per city ---------")
      value.collect().foreach(println)
    })
  }

  // Requirement 2: real-time top-3 relative humidity per city
  private def cityRelativehumidityTop3(answer: DStream[Answer]): Unit = {
    val mapDS = answer.map(data => ((data.location, data.humidity), 1))
    val reduceDS = mapDS.reduceByKey(_ + _)
    val newmapDS = reduceDS.map {
      case ((location, humidity), count) => (location, (humidity, count))
    }
    val groupDS = newmapDS.groupByKey()
    groupDS.foreachRDD(rdd => {
      val value = rdd.mapValues(iter => {
        val sort = iter.toList.sortBy(_._2).reverse
        sort.take(3)
      })
      println("------- Real-time top-3 relative humidity per city --------")
      value.collect().foreach(println)
    })
  }
}
package ort.niit.controller

import org.apache.spark.streaming.dstream.DStream
import ort.niit.bean.Answer
import ort.niit.service.WEADataService

class WEADataController {
  private val weaDataService = new WEADataService()

  def dispatch(dataStream: DStream[Answer]): Unit = {
    weaDataService.dataAnalysis() // compile error: dataAnalysis expects a DStream[Answer] argument
  }
}
package ort.niit.app

import ort.niit.common.TApp
import ort.niit.controller.WEADataController

object WEADataApp extends App with TApp {
  start("local[*]", "weaData") {
    val weaDataController = new WEADataController()
    weaDataController.dispatch() // compile error: dispatch expects a DStream[Answer] argument
  }
}
package ort.niit.common

import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.niit.util.SparkUtil

trait TApp {
  def start(master: String = "local[*]", appName: String = "application")(op: => Unit): Unit = {
    val sparkConf = new SparkConf().setMaster(master).setAppName(appName)
    val sc = SparkUtil.CreateSpark(sparkConf, Seconds(5))
    val spark = SparkUtil.takeSpark()
    val ssc = SparkUtil.takeSSC()
    sc.setLogLevel("ERROR")
    try {
      op
    } catch {
      case ex => ex.printStackTrace()
    }
    spark.stop()
    ssc.stop(true, true)
    sc.stop()
    SparkUtil.clear()
  }
}
package ort.niit.bean

import java.sql.Timestamp

case class Answer(location: String,
                  weatherType: String,
                  temperature: Double,
                  humidity: Int,
                  pressure: Int,
                  weatherTime: String,
                  timestamp: Timestamp) extends Serializable
The errors mean that weaDataController.dispatch() and weaDataService.dataAnalysis() are being called without the arguments they require. dispatch() takes a DStream[Answer] parameter, and dataAnalysis() likewise takes a DStream[Answer] parameter. You have to pass the corresponding stream when calling each of these methods before the code will compile and run.
In short: the methods are being called without their required parameters.
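A minimal sketch of the fix is shown below. It assumes the DStream[Answer] gets built inside WEADataApp; the socket source and the parseAnswer helper are hypothetical placeholders, since the posted code never shows where the stream actually comes from (replace them with your real Kafka/socket/file source and parsing logic).

// WEADataController: forward the stream received by dispatch to the service
def dispatch(dataStream: DStream[Answer]): Unit = {
  weaDataService.dataAnalysis(dataStream)
}

// WEADataApp: build a DStream[Answer] and hand it to dispatch
// (also import org.apache.spark.streaming.dstream.DStream, ort.niit.bean.Answer,
//  and org.niit.util.SparkUtil at the top of the file)
object WEADataApp extends App with TApp {
  start("local[*]", "weaData") {
    val ssc = SparkUtil.takeSSC() // same helper the TApp trait already uses
    val answerStream: DStream[Answer] = ssc
      .socketTextStream("localhost", 9999) // hypothetical source
      .map(parseAnswer)                    // hypothetical String => Answer parser
    val weaDataController = new WEADataController()
    weaDataController.dispatch(answerStream)
  }
}

With both call sites supplying a DStream[Answer], the "not enough arguments" errors go away.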