如何使用Python在Spark中添加两个稀疏向量
问题描述:
我到处搜索过,但我找不到如何使用Python添加两个稀疏向量。 我要添加两个稀疏矢量是这样的: -如何使用Python在Spark中添加两个稀疏向量
(1048576, {110522: 0.6931, 521365: 1.0986, 697409: 1.0986, 725041: 0.6931, 749730: 0.6931, 962395: 0.6931})
(1048576, {4471: 1.0986, 725041: 0.6931, 850325: 1.0986, 962395: 0.6931})
答
像这样的东西应该工作:
from pyspark.mllib.linalg import Vectors, SparseVector, DenseVector
import numpy as np
def add(v1, v2):
"""Add two sparse vectors
>>> v1 = Vectors.sparse(3, {0: 1.0, 2: 1.0})
>>> v2 = Vectors.sparse(3, {1: 1.0})
>>> add(v1, v2)
SparseVector(3, {0: 1.0, 1: 1.0, 2: 1.0})
"""
assert isinstance(v1, SparseVector) and isinstance(v2, SparseVector)
assert v1.size == v2.size
# Compute union of indices
indices = set(v1.indices).union(set(v2.indices))
# Not particularly efficient but we are limited by SPARK-10973
# Create index: value dicts
v1d = dict(zip(v1.indices, v1.values))
v2d = dict(zip(v2.indices, v2.values))
zero = np.float64(0)
# Create dictionary index: (v1[index] + v2[index])
values = {i: v1d.get(i, zero) + v2d.get(i, zero)
for i in indices
if v1d.get(i, zero) + v2d.get(i, zero) != zero}
return Vectors.sparse(v1.size, values)
如果你喜欢只单通和不关心推出零您可以在上面修改像这样的代码:
from collections import defaultdict
def add(v1, v2):
assert isinstance(v1, SparseVector) and isinstance(v2, SparseVector)
assert v1.size == v2.size
values = defaultdict(float) # Dictionary with default value 0.0
# Add values from v1
for i in range(v1.indices.size):
values[v1.indices[i]] += v1.values[i]
# Add values from v2
for i in range(v2.indices.size):
values[v2.indices[i]] += v2.values[i]
return Vectors.sparse(v1.size, dict(values))
如果你愿意,你可以尝试猴补丁SparseVector
:
SparseVector.__add__ = add
v1 = Vectors.sparse(5, {0: 1.0, 2: 3.0})
v2 = Vectors.sparse(5, {0: -3.0, 2: -3.0, 4: 10})
v1 + v2
## SparseVector(5, {0: -2.0, 4: 10.0})
或者,您应该可以使用scipy.sparse
。
from scipy.sparse import csc_matrix
from pyspark.mllib.regression import LabeledPoint
m1 = csc_matrix((
v1.values,
(v1.indices, [0] * v1.numNonzeros())),
shape=(v1.size, 1))
m2 = csc_matrix((
v2.values,
(v2.indices, [0] * v2.numNonzeros())),
shape=(v2.size, 1))
LabeledPoint(0, m1 + m2)
答
我有同样的问题,但我没能得到其他的解决方案,完成了不到几个小时一个中等大小的数据集(〜20M的记录,向量大小= 10K)
所以不是我把它完成了在短短几分钟内其他相关的方法:
import numpy as np
def to_sparse(v):
values = {i: e for i,e in enumerate(v) if e != 0}
return Vectors.sparse(v.size, values)
rdd.aggregate(
np.zeros(vector_size),
lambda acc, b: acc + b.toArray(),
lambda acc, b: acc + b
).map(to_sparse)
基本思想是在的减少,只有一次在年底的每一步,以不建稀疏向量,让numpy的做所有的矢量添加工作。即使使用需要混合密集向量的aggregateByKey,它仍然只需要几分钟。
答
所有上述功能都添加了两个相同大小的稀疏矢量。我尝试用不同长度的增加稀疏向量,并已在爪哇发现类似的东西,以我的要求在这里 How to combine or merge two sparse vectors in Spark using Java? 这么写的那个函数在python如下:
def combineSparseVectors(svs):
size = 0
nonzeros = 0
for sv in svs :
size += sv.size
nonzeros += len(sv.indices)
if nonzeros != 0 :
indices = np.empty([nonzeros])
values = np.empty([nonzeros])
pointer_D = 0
totalPt_D = 0
pointer_V = 0
for sv in svs :
indicesSV = sv.indices
for i in indicesSV :
indices[pointer_D] = i + totalPt_D
pointer_D=pointer_D+1
totalPt_D += sv.size
valuesSV = sv.values
for d in valuesSV :
values[pointer_V] = d
pointer_V=pointer_V+1
return SparseVector(size, indices, values)
else :
return null
答
其他的答案抵触的编程概念火花。更简单地说,只需将pyspark.ml.lingalg.SparseVector
(下面代码中的urOldVec)转换为Scipy.sparse.csc_matrix
对象(即列向量),然后使用“+”运算符进行添加。
import scipy.sparse as sps
urNewVec = sps.csc_matrix(urOldVec)
urNewVec + urNewVec
如在该文档为pyspark.ml.linalg
提到的,scipy.sparse
载体可以传递到pyspark代替。
感谢您的回答。有效。你能解释一下第一种方法是如何在那里计算加法 – Nick
它只是创建两个字典{index:value}并添加相应的值来创建输出字典。我已经更新了一个应该更易于阅读的解决方案。 – zero323