spark mllib库 进行电影聚类分析(Scala语言)
实验镜像:
下载链接:https://pan.baidu.com/s/15Fc1L3iJEcbXo7SVW9mTfg
提取密码:iaom 用户名:c205,密码:一个空格
root密码:一个空格
Spark 机器学习库简介
Spark 机器学习库提供了常用机器学习算法的实现,包括聚类,分类,回归,协同过滤,维度缩减等。使用 Spark 机器学习库来做机器学习工作,可以说是非常的简单,通常只需要在对原始数据进行处理后,然后直接调用相应的 API 就可以实现。但是要想选择合适的算法,高效准确地对数据进行分析,您可能还需要深入了解下算法原理,以及相应 Spark MLlib API 实现的参数的意义。
需要提及的是,Spark 机器学习库从 1.2 版本以后被分为两个包,分别是:
- spark.mllib
Spark MLlib 历史比较长了,1.0 以前的版本中已经包含了,提供的算法实现都是基于原始的 RDD,从学习角度上来讲,其实比较容易上手。如果您已经有机器学习方面的经验,那么您只需要熟悉下 MLlib 的 API 就可以开始数据分析工作了。想要基于这个包提供的工具构建完整并且复杂的机器学习流水线是比较困难的。
- spark.ml
Spark ML Pipeline 从 Spark1.2 版本开始,目前已经从 Alpha 阶段毕业,成为可用并且较为稳定的新的机器学习库。ML Pipeline 弥补了原始 MLlib 库的不足,向用户提供了一个基于 DataFrame 的机器学习工作流式 API 套件,使用 ML Pipeline API,我们可以很方便的把数据处理,特征转换,正则化,以及多个机器学习算法联合起来,构建一个单一完整的机器学习流水线。显然,这种新的方式给我们提供了更灵活的方法,而且这也更符合机器学习过程的特点。
从官方文档来看,Spark ML Pipeline 虽然是被推荐的机器学习方式,但是并不会在短期内替代原始的 MLlib 库,因为 MLlib 已经包含了丰富稳定的算法实现,并且部分 ML Pipeline 实现基于 MLlib。而且就笔者看来,并不是所有的机器学习过程都需要被构建成一个流水线,有时候原始数据格式整齐且完整,而且使用单一的算法就能实现目标,我们就没有必要把事情复杂化,采用最简单且容易理解的方式才是正确的选择。
本文基于 Spark 1.5,向读者展示使用 MLlib API 进行聚类分析的过程。读者将会发现,使用 MLlib API 开发机器学习应用方式是比较简单的,相信本文可以使读者建立起信心并掌握基本方法,以便在后续的学习和工作中事半功倍。
K-means 聚类算法原理
聚类分析是一个无监督学习 (Unsupervised Learning) 过程, 一般是用来对数据对象按照其特征属性进行分组,经常被应用在客户分群,欺诈检测,图像分析等领域。K-means 应该是最有名并且最经常使用的聚类算法了,其原理比较容易理解,并且聚类效果良好,有着广泛的使用。
和诸多机器学习算法一样,K-means 算法也是一个迭代式的算法,其主要步骤如下:
- 第一步,选择 K 个点作为初始聚类中心。
- 第二步,计算其余所有点到聚类中心的距离,并把每个点划分到离它最近的聚类中心所在的聚类中去。在这里,衡量距离一般有多个函数可以选择,最常用的是欧几里得距离 (Euclidean Distance), 也叫欧式距离。公式如下:
其中 C 代表中心点,X 代表任意一个非中心点。
- 第三步,重新计算每个聚类中所有点的平均值,并将其作为新的聚类中心点。
- 最后,重复 (二),(三) 步的过程,直至聚类中心不再发生改变,或者算法达到预定的迭代次数,又或聚类中心的改变小于预先设定的阀值。
在实际应用中,K-means 算法有两个不得不面对并且克服的问题。
- 聚类个数 K 的选择。K 的选择是一个比较有学问和讲究的步骤,我们会在后文专门描述如何使用 Spark 提供的工具选择 K。
- 初始聚类中心点的选择。选择不同的聚类中心可能导致聚类结果的差异。
Spark MLlib K-means 算法的实现在初始聚类点的选择上,借鉴了一个叫 K-means||的类 K-means++ 实现。K-means++ 算法在初始点选择上遵循一个基本原则: 初始聚类中心点相互之间的距离应该尽可能的远。基本步骤如下:
- 第一步,从数据集 X 中随机选择一个点作为第一个初始点。
- 第二步,计算数据集中所有点与最新选择的中心点的距离 D(x)。
- 第三步,选择下一个中心点,使得
最大。
- 第四部,重复 (二),(三) 步过程,直到 K 个初始点选择完成。
K-均值算法试图将一系列样本分割成K个不同的类簇(其中K是模型的输入参数),其形式化的目标函数称为类簇内的方差和(within cluster sum of squared errors,WCSS)。K-均值聚类的目的是最小化所有类簇中的方差之和。标准的K-均值算法初始化K个类中心(为每个类簇中所有样本的平均向量),后面的过程不断重复迭代下面两个步骤。
(1) 将样本分到WCSS最小的类簇中。因为方差之和为欧拉距离的平方,所以最后等价于将每个样本分配到欧拉距离最近的类中心。
(2) 根据第一步类分配情况重新计算每个类簇的类中心。 K-均值迭代算法结束条件为达到最大的迭代次数或者收敛。收敛意味着第一步类分配之后没有改变,因此WCSS的值也没有改变。
数据集地址:https://grouplens.org/datasets/movielens/100k/
实验过程
启动Hadoop集群
start-all.sh
在hdfs中创建文件夹
hadoop fs –mkdir /user/kmeans
hadoop fs –ls /user/
将文件上传到hdfs文件系统中
hadoop fs -put /home/sys-01/ml-100k/u.item /user/kmeans/
hadoop fs -put /home/sys-01/ml-100k/u.genre /user/kmeans/
hadoop fs -put /home/sys-01/ml-100k/u.data /user/kmeans/
数据特征提取
启动spark-shell
读取三个文件
val movies = sc.textFile("hdfs://localhost:9000/user/kmeans/u.item")
val genres = sc.textFile("hdfs://localhost:9000/user/kmeans/u.genre")
val rawData = sc.textFile("hdfs://localhost:9000/user/kmeans/u.data")
提取电影的题材标题,并从u.genre文件中提取题材的映射关系
val genreMap = genres.filter(!_.isEmpty).map(line => line.split("\\|")).map(array => (array(1), array(0))).collectAsMap
println(genreMap)
val titlesAndGenres = movies.map(_.split("\\|")).map { array =>
val genres = array.toSeq.slice(5, array.size)
val genresAssigned = genres.zipWithIndex.filter { case (g, idx) =>
g == "1"
}.map { case (g, idx) =>
genreMap(idx.toString)
}
(array(0).toInt, (array(1), genresAssigned))
}
println(titlesAndGenres.first)
导入依赖包
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
运行ALS模型生成电影和用户因素
val rawRatings = rawData.map(_.split("\t").take(3))
val ratings = rawRatings.map{ case Array(user, movie, rating) => Rating(user.toInt, movie.toInt, rating.toDouble) }
ratings.cache
val alsModel = ALS.train(ratings, 50, 10, 0.1)
提取因子向量
val movieFactors = alsModel.productFeatures.map { case (id, factor) => (id, Vectors.dense(factor)) }
val movieVectors = movieFactors.map(_._2)
val userFactors = alsModel.userFeatures.map { case (id, factor) => (id, Vectors.dense(factor)) }
val userVectors = userFactors.map(_._2)
训练聚类模型
在MLlib中训练K-均值的方法和其他模型类似,只要把包含训练数据的RDD传入KMeans对象的train方法即可。注意,因为聚类不需要标签,所以不用LabeledPoint实例,而是使用特征向量接口,即RDD的Vector数组即可。MLlib的K-均值提供了随机和K-means||两种初始化方法,后者是默认初始化。因为两种方法都是随机选择,所以每次模型训练的结果都不一样。K-均值通常不能收敛到全局最优解,所以实际应用中需要多次训练并选择最优的模型。MLlib提供了完成多次模型训练的方法。经过损失函数的评估,将性能最好的一次训练选定为最终的模型。
代码实现中,首先需要引入必要的模块,设置模型参数: K(numClusters)、最大迭代次数(numIteration)和训练次数(numRuns)。然后,对电影的系数向量运行K-均值算法。最后,在用户相关因素的特征向量上训练K-均值模型
在电影因子向量上运行k均值模型
val numClusters = 5
val numIterations = 10
val numRuns = 3
val movieClusterModel = KMeans.train(movieVectors, numClusters, numIterations, numRuns)
训练用户模型
val userClusterModel = KMeans.train(userVectors, numClusters, numIterations, numRuns)
使用聚类模型进行预测
K-均值最小化的目标函数是样本到其类中心的欧拉距离之和,我们便可以将“最靠近类中心”定义为最小的欧拉距离
定义欧氏距离函数
import breeze.linalg._
import breeze.numerics.pow
def computeDistance(v1: DenseVector[Double], v2: DenseVector[Double]): Double = pow(v1 - v2, 2).sum
利用上面的函数对每个电影计算其特征向量与所属类簇中心向量的距离:
val titlesWithFactors = titlesAndGenres.join(movieFactors)
val moviesAssigned = titlesWithFactors.map { case (id, ((title, genres), vector)) =>
val pred = movieClusterModel.predict(vector)
val clusterCentre = movieClusterModel.clusterCenters(pred)
//求两坐标的距离
val dist = computeDistance(DenseVector(clusterCentre.toArray), DenseVector(vector.toArray))
(id, title, genres.mkString(" "), pred, dist)
}
val clusterAssignments = moviesAssigned.groupBy { case (id, title, genres, cluster, dist) => cluster }.collectAsMap
//pred为预测出的该点所属的聚点
//vector可以理解为该点的坐标向量
//clusterCentre为该pred聚点的坐标向量
我们枚举每个类簇并输出距离类中心最近的前20部电影
for ( (k, v) <- clusterAssignments.toSeq.sortBy(_._1)) {
println(s"Cluster $k:")
val m = v.toSeq.sortBy(_._5)
println(m.take(20).map { case (_, title, genres, _, d) => (title, genres, d) }.mkString("\n"))
println("=====\n")
}
Cluster 0:包含了很多20世纪40年代、50年代和60年代的老电影,和一些近代的戏剧
Cluster 1:主要是一些恐怖电影
Cluster 2:有相当一部分是喜剧和戏剧电影
Cluster 3:和戏剧相关
Cluster 4:主要是动作片、惊悚片和言情片
正如你看到的,我们并不能明显看出每个类簇所表示的内容。但是,也有证据表明聚类过程会提取电影之间的属性或者相似之处,这不是单纯基于电影名称和题材容易看出来的(比如外语片的类簇和传统电影的类簇,等等)。如果我们有更多元数据,比如导演、演员等,便有可能从每个类簇中找到更多特征定义的细节
评估聚类模型的性能
与回归、分类和推荐引擎等模型类似,聚类模型也有很多评价方法用于分析模型性能,以及评估模型样本的拟合度。聚类的评估通常分为两部分:内部评估和外部评估。内部评估表示评估过程使用训练模型时使用的训练数据,外部评估则使用训练数据之外的数据。 内部评价指标WCSS(我们之前提过的K-元件的目标函数),是使类簇内部的样本距离尽可能接近,不同类簇的样本相对较远。
MLlib提供的函数computeCost可以方便地计算出给定输入数据RDD [Vector]的WCSS。下面我们使用这个方法计算电影和用户训练数据的性能
计算电影和用户集群的成本(WCSS)
val movieCost = movieClusterModel.computeCost(movieVectors)
val userCost = userClusterModel.computeCost(userVectors)
println("WCSS for movies: " + movieCost)
println("WCSS for users: " + userCost)
聚类模型参数调优
不同于以往的模型,K-均值模型只有一个可以调的参数,就是K,即类中心数目。
电影集群的交叉验证
val trainTestSplitMovies = movieVectors.randomSplit(Array(0.6, 0.4), 123)
val trainMovies = trainTestSplitMovies(0)
val testMovies = trainTestSplitMovies(1)
val costsMovies = Seq(2, 3, 4, 5, 10, 20).map { k => (k, KMeans.train(trainMovies, numIterations, k, numRuns).computeCost(testMovies)) }
println("Movie clustering cross-validation:")
costsMovies.foreach { case (k, cost) => println(f"WCSS for K=$k id $cost%2.2f") }
用户集群的交叉验证
val trainTestSplitUsers = userVectors.randomSplit(Array(0.6, 0.4), 123)
val trainUsers = trainTestSplitUsers(0)
val testUsers = trainTestSplitUsers(1)
val costsUsers = Seq(2, 3, 4, 5, 10, 20).map { k => (k, KMeans.train(trainUsers, numIterations, k, numRuns).computeCost(testUsers)) }
println("User clustering cross-validation:")
costsUsers.foreach { case (k, cost) => println(f"WCSS for K=$k id $cost%2.2f") }
从结果可以看出,随着类中心数目增加,WCSS值会出现下降,然后又开始增大。另外一个现象,K-均值在交叉验证的情况,WCSS随着K的增大持续减小,但是达到某个值后,下降的速率突然会变得很平缓。这时的K通常为最优的K值(这称为拐点)