Spark中常用的算法：

3.2.1 分类算法

import org.apache.spark.SparkContext

import org.apache.spark.mllib.classification.SVMWithSGD

import org.apache.spark.mllib.regression.LabeledPoint

// 加载和解析数据文件

val data = sc.textFile("mllib/data/sample_svm_data.txt")

val parsedData = data.map { line =>

val parts = line.split(' ')

LabeledPoint(parts(0).toDouble, parts.tail.map(x => x.toDouble).toArray)

}

// 设置迭代次数并进行进行训练

val numIterations = 20

val model = SVMWithSGD.train(parsedData, numIterations)

// 统计分类错误的样本比例

val labelAndPreds = parsedData.map { point =>

val prediction = model.predict(point.features)

(point.label, prediction)

}

val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / parsedData.count

println("Training Error = " + trainErr)

3.2.2 回归算法

import org.apache.spark.mllib.regression.LinearRegressionWithSGD

import org.apache.spark.mllib.regression.LabeledPoint

// 加载和解析数据文件

val data = sc.textFile("mllib/data/ridge-data/lpsa.data")

val parsedData = data.map { line =>

val parts = line.split(',')

LabeledPoint(parts(0).toDouble, parts(1).split(' ').map(x => x.toDouble).toArray)

}

//设置迭代次数并进行训练

val numIterations = 20

val model = LinearRegressionWithSGD.train(parsedData, numIterations)

// 统计回归错误的样本比例

val valuesAndPreds = parsedData.map { point =>

val prediction = model.predict(point.features)

(point.label, prediction)

}

val MSE = valuesAndPreds.map{ case(v, p) => math.pow((v - p), 2)}.reduce(_ + _)/valuesAndPreds.count

println("training Mean Squared Error = " + MSE)

3.2.3 聚类算法

import org.apache.spark.mllib.clustering.KMeans

// 加载和解析数据文件

val data = sc.textFile("kmeans_data.txt")

val parsedData = data.map( _.split(' ').map(_.toDouble))

// 设置迭代次数、类簇的个数

val numIterations = 20

val numClusters = 2

// 进行训练

val clusters = KMeans.train(parsedData, numClusters, numIterations)

// 统计聚类错误的样本比例

val WSSSE = clusters.computeCost(parsedData)

println("Within Set Sum of Squared Errors = " + WSSSE)

3.2.4 协同过滤

import org.apache.spark.mllib.recommendation.ALS

import org.apache.spark.mllib.recommendation.Rating

// 加载和解析数据文件

val data = sc.textFile("mllib/data/als/test.data")

val ratings = data.map(_.split(',') match {

case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toDouble)

})

// 设置迭代次数

val numIterations = 20

val model = ALS.train(ratings, 1, 20, 0.01)

// 对推荐模型进行评分

val usersProducts = ratings.map{ case Rating(user, product, rate) => (user, product)}

val predictions = model.predict(usersProducts).map{

case Rating(user, product, rate) => ((user, product), rate)

}

val ratesAndPreds = ratings.map{

case Rating(user, product, rate) => ((user, product), rate)

}.join(predictions)

val MSE = ratesAndPreds.map{

case ((user, product), (r1, r2)) => math.pow((r1- r2), 2)

}.reduce(_ + _)/ratesAndPreds.count

println("Mean Squared Error = " + MSE)

