如何在kmeans中映射Spark中的MongoDB数据?
我想在Spark中使用MongoDB提供的数据运行k-means。 我有一个工作实施例中,其作用对一个平面文件:如何在kmeans中映射Spark中的MongoDB数据?
sc = SparkContext(appName="KMeansExample") # SparkContext
data = sc.textFile("/home/mhoeller/kmeans_data.txt")
parsedData = data.map(lambda line: array([int(x) for x in line.split(' ')]))
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")
这是平面文件的格式为:
现在我想用MongoDB的更换简单文件:
spark = SparkSession \
.builder \
.appName("myApp") \
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/ycsb.usertable") \
.config("spark.mongodb.output.uri", "mongodb:/127.0.0.1/ycsb.usertable") \
.getOrCreate()
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://127.0.0.1/ycsb.usertable").load()
# <<<< Here I am missing the parsing >>>>>
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")
我喜欢了解如何映射df中的数据,以便它可以用作kmeans的输入。
的数据库的 “布局” 是:
根
| - _id:字符串(可为空=真)
| - field0:二进制(可为空=真)
| - FIELD1:二进制(可空=真)
| - 场2:二进制(可为空=真)
| - 场3:二进制(可为空=真)
| - 字段4:二进制(可为空=真)
| - 字段5:二进制(空值=真)
| - 基尔D6:二进制(可为空=真)
| - 字段7:二进制(可为空=真)
| - 字段8:二元的(可为空=真)
| - 字段9:二元的(可为空=真)
我喜欢了解如何映射df中的数据,以便它可以用作kmeans的输入。
根据您的代码段,我假定您使用PySpark。
如果你看看clustering.KMeans的Python API文档,你可以看到,第一个参数需要使用MongoDB Spark Connector
df = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("uri","mongodb://127.0.0.1/ycsb.usertable")
.load()
从MongoDB的负载数据,你有什么要RDD of Vector or convertible sequence types
后,你下面的代码执行在df
是一个DataFrame,所以我们需要将它转换成可转换为Vector类型的东西。
因为你在你的文本文件例如使用numpy.array,我们可以继续使用这种阵列式学习转变。
根据提供的layout
,首先我们需要删除_id
列,因为它不需要进行聚类训练。有关更多信息,另请参阅Vector数据类型。
通过以上信息,让我们来看看它:
# Drop _id column and get RDD representation of the DataFrame
rowRDD = df.drop("_id").rdd
# Convert RDD of Row into RDD of numpy.array
parsedRdd = rowRDD.map(lambda row: array([int(x) for x in row]))
# Feed into KMeans
clusters = KMeans.train(parsedRdd, 2, maxIterations=10, initializationMode="random")
如果你想保持布尔值(真/假),而不是整数(1/0),那么你可以删除int
部分。如下图所示:
parsedRdd = rowRDD.map(lambda row: array([x for x in row]))
把所有的人都在一起:
from numpy import array
from pyspark.mllib.clustering import KMeans
import org.apache.spark.sql.SparkSession
spark = SparkSession \
.builder \
.appName("myApp") \
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/ycsb.usertable") \
.config("spark.mongodb.output.uri", "mongodb:/127.0.0.1/ycsb.usertable") \
.getOrCreate()
df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").load()
rowRDD = df.drop("_id").rdd
parsedRdd = rowRDD.map(lambda row: array([int(x) for x in row]))
clusters = KMeans.train(parsedRdd, 2, maxIterations=10, initializationMode="random")
clusters.clusterCenters