Pyspark:如何在另一个数据框中的UDF中引用数据框?
如何在另一个数据帧上执行UDF时引用pyspark数据框?Pyspark:如何在另一个数据框中的UDF中引用数据框?
这是一个虚拟的例子。我创建了两个数据帧scores
和lastnames
,并且在每个数据帧中存在两个数据帧相同的列。在应用于scores
的UDF中,我想过滤lastnames
并返回在lastname
中找到的字符串。
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sc = SparkContext("local")
sqlCtx = SQLContext(sc)
# Generate Random Data
import itertools
import random
student_ids = ['student1', 'student2', 'student3']
subjects = ['Math', 'Biology', 'Chemistry', 'Physics']
random.seed(1)
data = []
for (student_id, subject) in itertools.product(student_ids, subjects):
data.append((student_id, subject, random.randint(0, 100)))
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
StructField("student_id", StringType(), nullable=False),
StructField("subject", StringType(), nullable=False),
StructField("score", IntegerType(), nullable=False)
])
# Create DataFrame
rdd = sc.parallelize(data)
scores = sqlCtx.createDataFrame(rdd, schema)
# create another dataframe
last_name = ["Granger", "Weasley", "Potter"]
data2 = []
for i in range(len(student_ids)):
data2.append((student_ids[i], last_name[i]))
schema = StructType([
StructField("student_id", StringType(), nullable=False),
StructField("last_name", StringType(), nullable=False)
])
rdd = sc.parallelize(data2)
lastnames = sqlCtx.createDataFrame(rdd, schema)
scores.show()
lastnames.show()
from pyspark.sql.functions import udf
def getLastName(sid):
tmp_df = lastnames.filter(lastnames.student_id == sid)
return tmp_df.last_name
getLastName_udf = udf(getLastName, StringType())
scores.withColumn("last_name", getLastName_udf("student_id")).show(10)
而下面是跟踪的最后一部分:
Py4JError: An error occurred while calling o114.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:344)
at py4j.Gateway.invoke(Gateway.java:252)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
更改配对字典而不是创造rdd
并使得人们df
的名字
data2 = {}
for i in range(len(student_ids)):
data2[student_ids[i]] = last_name[i]
容易查找创建广播变量
//rdd = sc.parallelize(data2)
//lastnames = sqlCtx.createDataFrame(rdd, schema)
lastnames = sc.broadcast(data2)
现在通过广播变量(lastnames
)在udf上使用values
attr来访问。
from pyspark.sql.functions import udf
def getLastName(sid):
return lastnames.value[sid]
我用**广播变量**修改了你的实现。尽量让你的UDF尽可能多的纯功能,太多的外部依赖可能会降低性能。 – mrsrinivas
我试过了你的代码片段 - 当我看到'lastnames.value'时,我得到'[('student1','Granger'),('student2','Weasley'),('student3','Potter') ]',这意味着'lastnames.value.filter'不会工作了吗?看起来好像是 – tohweizhong
在udf中尝试'return lastnames.value [“sid”]'并创建一个字典(变量'data2'),其中'sid'作为键和值作为'lastname'。 – mrsrinivas
您不能直接从UDF内引用数据帧(或RDD)。 DataFrame对象是驱动程序上的一个句柄,用于表示将在群集上发生的数据和操作。在您选择Spark时,UDF中的代码将在群集上运行。 Spark通过对该代码进行序列化并将闭包中包含的任何变量的副本发送给每个工作人员来完成此操作。
你想做什么,是使用Spark提供的API来加入/组合两个DataFrame。如果其中一个数据集很小,则可以手动发送广播变量中的数据,然后从UDF访问它。否则,您可以像创建两个数据框一样创建两个数据框,然后使用连接操作来合并它们。像这样的东西应该工作:
joined = scores.withColumnRenamed("student_id", "join_id")
joined = joined.join(lastnames, joined.join_id == lastnames.student_id)\
.drop("join_id")
joined.show()
+---------+-----+----------+---------+
| subject|score|student_id|last_name|
+---------+-----+----------+---------+
| Math| 13| student1| Granger|
| Biology| 85| student1| Granger|
|Chemistry| 77| student1| Granger|
| Physics| 25| student1| Granger|
| Math| 50| student2| Weasley|
| Biology| 45| student2| Weasley|
|Chemistry| 65| student2| Weasley|
| Physics| 79| student2| Weasley|
| Math| 9| student3| Potter|
| Biology| 2| student3| Potter|
|Chemistry| 84| student3| Potter|
| Physics| 43| student3| Potter|
+---------+-----+----------+---------+
另外值得一提的,是引擎盖下星火DataFrames有一个优化,其中一个数据帧是的加入可以被转换成广播变量,以避免洗牌如果是部分够小。因此,如果您执行上面列出的联接方法,您应该获得最佳性能,而不会牺牲处理大型数据集的能力。
您不能在UDF内部访问'df',因为它将在执行程序中处理,'df' ref只能从驱动程序访问。你可以使用广播变量作为'lastnames'。让我知道是否需要任何帮助。 – mrsrinivas
但是考虑将'lastnames'加入'scores'而不是从UDF中加入。 – mrsrinivas
嗨@mrsrinivas,谢谢你的回复。首先我不能加入,因为即使这个虚拟示例可以使用连接来解决,在我的实际实现中,我需要在UDF中执行更多的处理。其次,是的!我如何在这种情况下使用广播变量? – tohweizhong