I urgently need a working, runnable Spark ALS collaborative-filtering recommendation algorithm.
import org.apache.log4j.{ Level, Logger }
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating
object ALS01 {
  def main(args: Array[String]): Unit = {
    // Set up the runtime environment
    // (backslashes in the Windows jar path must be escaped inside a Scala string)
    val conf = new SparkConf().setAppName("ALS 01")
      .setMaster("spark://master:7077")
      .setJars(Seq("E:\\Intellij\\Projects\\MachineLearning\\MachineLearning.jar"))
    val sc = new SparkContext(conf)
    Logger.getRootLogger.setLevel(Level.WARN)
    // Read the sample data and parse each "user,item,rating" line into a Rating
    val dataRDD = sc.textFile("hdfs://master:9000/ml/data/test.data")
    val ratingRDD = dataRDD.map(_.split(',') match {
      case Array(user, item, rate) =>
        Rating(user.toInt, item.toInt, rate.toDouble)
    })
    // Split into training (80%) and test (20%) sets
    val dataParts = ratingRDD.randomSplit(Array(0.8, 0.2))
    val trainingRDD = dataParts(0)
    val testRDD = dataParts(1)
    // Build and train the ALS (alternating least squares) model;
    // rank is the number of latent factors, 0.01 is the regularization parameter lambda
    val rank = 10
    val numIterations = 10
    val alsModel = ALS.train(trainingRDD, rank, numIterations, 0.01)
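    // Note: for implicit-feedback data (clicks or views rather than explicit
    // ratings), MLlib provides ALS.trainImplicit, which additionally takes a
    // confidence parameter alpha, e.g.
    // ALS.trainImplicit(trainingRDD, rank, numIterations, 0.01, 1.0)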
    // Predict ratings for the (user, product) pairs in the held-out test set
    val userProducts = testRDD.map {
      case Rating(user, product, rate) =>
        (user, product)
    }
    val predictions = alsModel.predict(userProducts).map {
      case Rating(user, product, rate) =>
        ((user, product), rate)
    }
    // Join the actual test ratings with the predictions, keyed by (user, product)
    val ratesAndPredictions = testRDD.map {
      case Rating(user, product, rate) =>
        ((user, product), rate)
    }.join(predictions)
    // Mean squared error between actual and predicted ratings
    val MSE = ratesAndPredictions.map {
      case ((user, product), (r1, r2)) =>
        val err = r1 - r2
        err * err
    }.mean()
    println("Mean Squared Error = " + MSE)
println("User" + "\t" + "Products" + "\t" + "Rate" + "\t" + "Prediction")
ratesAndPredictions.collect.foreach(
rating => {
println(rating._1._1 + "\t" + rating._1._2 + "\t" + rating._2._1 + "\t" + rating._2._2)
}
)
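    // To produce actual top-N recommendations rather than just rating
    // predictions, MatrixFactorizationModel offers recommendProducts.
    // A minimal sketch; the user ID 1 below is purely illustrative,
    // substitute an ID that exists in your data:
    val topN = alsModel.recommendProducts(1, 10)
    topN.foreach {
      case Rating(user, product, rate) =>
        println("Recommend product " + product + " to user " + user + " (score " + rate + ")")
    }
    // The trained model can also be persisted and reloaded (the HDFS path is
    // illustrative; loading requires importing
    // org.apache.spark.mllib.recommendation.MatrixFactorizationModel):
    // alsModel.save(sc, "hdfs://master:9000/ml/model/als")
    // val sameModel = MatrixFactorizationModel.load(sc, "hdfs://master:9000/ml/model/als")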
    sc.stop()
  }
}
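For reference, the parser above expects every line of test.data to be a comma-separated user,item,rating triple. The values below are purely illustrative:

1,101,5.0
1,102,3.0
2,101,2.0
2,103,4.5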