检查RDD中是否存在值
我已经在python中编写了一个可正常工作的Spark程序。检查RDD中是否存在值
但是,它在内存消耗方面效率低下&我正试图优化它。我在AWS EMR上运行它,EMR正在消耗太多内存的工作。
Lost executor 11 on ip-*****: Container killed by YARN for exceeding memory limits. 11.4 GB of 10.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
我相信这个内存问题是由于这样的事实,我在几种情况下收集我的RDDS((),即使用.collect),因为在后期阶段,我需要测试,如果在列表中存在一定的价值是否由这些RDD制成。
所以,目前我的代码看起来像这样:
myrdd = data.map(lambda word: (word,1)) \
.reduceByKey(lambda a,b: a+b) \
.filter(lambda (a, b): b >= 5) \
.map(lambda (a,b) : a) \
.collect()
,并在代码
if word in myrdd:
mylist.append(word)
myrdd2 = data2.map(lambda word: (word,1)) \
.reduceByKey(lambda a,b: a+b) \
.filter(lambda (a, b): b >= 5) \
.map(lambda (a,b) : a) \
.collect()
if word in myrdd2:
mylist2.append(word)
,然后我重复这种模式多次晚些时候。
有没有办法做到操作
if word in myrdd:
do something
不先收集RDD?
是否有像rdd.contains()这样的函数?
P.S:我没有在内存中缓存任何东西。我的火花背景是这样的:从纱线
jobName = "wordcount"
sc = SparkContext(appName = jobName)
......
......
sc.stop()
错误消息指出collect
不是一个问题,因为你的执行者(而不是驱动器)有记忆问题。
首先,尝试按照错误消息建议并提升spark.yarn.executor.memoryOverhead
- 在YARN上运行pyspark时,您可以告诉YARN为python工作进程内存分配更大的容器。
接下来,看看执行者需要大量内存的操作。你使用reduceByKey
,也许你可以增加分区的数量,使它们在内存使用方面更小。看看numPartitions
参数:http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduceByKey
最后,如果你想检查是否RDD包含了一些值,那么仅仅通过这个值过滤和检查,使用count
或first
,例如:
looking_for = "....."
contains = rdd.filter(lambda a: a == looking_for).count() > 0
谢谢。有很多RDD会对执行者造成压力吗?例如。如果我做了像myrddalias = myrdd之类的东西,它是否会给内存带来额外的压力,或者那样好吗? – Piyush
它只是复制引用,rdds本身不会被克隆 – Mariusz
问题是,looking_for是一个RDD,当我在另一个RDD上执行过滤器时,它向我显示一个错误,说我不能将一个转换放在另一个RDD中。 Looking_for是一个列表,我希望根据存在于look_for rdd中的某个值来修剪我的rdd。确切的错误 - 例外:看起来您正试图广播RDD或引用操作或转换中的RDD。 RDD转换和操作只能由驱动程序调用,而不能在其他转换中调用; – Piyush
不使用.collect ()它会把所有的数据传给驱动程序,如果你有更大的数据集,会产生一个问题。使用myrdd2.foreachRDD并检查值是否存在 – Backtrack
word = sc.broadcast([w1,w2,w3]) valuepresent = myrdd.filter {lambda x:x in word}这样做也是一种解决方法,我会认为 – Backtrack