Spark pickling error when accessing an RDD inside a loop
Question:
I have implemented the k-means algorithm in Spark. When I run the code below, I get a pickling error (shown further down). If I modify it and move everything out of the loop, the centroids are computed correctly.
import re
import sys

from pyspark import SparkContext

# shingle_pairs, sig_vector and k_means are this project's own modules;
# a, b, n, p, sig_size, k and max_it are assumed to be defined earlier.
sc = SparkContext(appName="Document Similarity")
lines = sc.wholeTextFiles(sys.argv[1])
articles = lines.flatMap(lambda x: re.split(r' ', x[1]))
shingles = articles.flatMap(shingle_pairs.get_pairs)
sig_vecs = shingles.groupBy(lambda x: x[1]) \
                   .map(lambda x: sig_vector.create_vector(x, a, b, n, p))
centroids = k_means.init_centroids(sig_size, k)

for i in range(max_it):
    # assign documents to closest cluster
    docs = sig_vecs.map(lambda x: k_means.classify_docs(x, centroids))
    # get count by key to use in mean calculation for new clusters
    doc_count = docs.countByKey()
    # recompute cluster centroids
    reduced_docs = docs.reduceByKey(k_means.reducer)
    centroids = reduced_docs.map(lambda x: k_means.mapper(x, doc_count))
The error is as follows:
pickle.PicklingError: Could not serialize object: Exception:
It appears that you are attempting to broadcast an RDD or reference an
RDD from an action or transformation. RDD transformations and actions
can only be invoked by the driver, not inside of other transformations;
for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid
because the values transformation and count action cannot be performed
inside of the rdd1.map transformation. For more information, see SPARK-5063.
Answer
As SPARK-5063 explains, "Spark does not support nested RDDs". You are trying to access centroids (an RDD) inside a map over sig_vecs (also an RDD):

docs = sig_vecs.map(lambda x: k_means.classify_docs(x, centroids))

Converting centroids to a local collection (collect?) and adjusting classify_docs accordingly should solve the problem.
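A minimal sketch of what that fix could look like. The names classify_docs and centroids come from the question; the vector format (plain lists of floats) and the Euclidean distance metric are assumptions, since the question does not show the body of classify_docs:

```python
import math

def classify_docs(doc, centroids):
    """Assign a (doc_id, vector) pair to the index of the nearest centroid.

    `centroids` is a plain Python list here, NOT an RDD, so this function
    is safe to call from inside an RDD transformation.
    """
    doc_id, vec = doc

    def dist(a, b):
        # Euclidean distance (an assumption; the question's metric is not shown)
        return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

    cluster = min(range(len(centroids)), key=lambda i: dist(vec, centroids[i]))
    return (cluster, (doc_id, vec))

# Inside the Spark loop, materialize the centroids on the driver first,
# then close over the local list instead of the RDD:
#
#     local_centroids = centroids.collect()   # RDD -> Python list, on the driver
#     docs = sig_vecs.map(lambda x: classify_docs(x, local_centroids))
```

Since k is typically small, collecting the centroids to the driver on each iteration is cheap; only the document RDD stays distributed.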