如何将GroupedDataset保存为实木复合地板或将其转换为DF
问题描述:
我正在使用spark 1.6.1。如何将GroupedDataset保存为实木复合地板或将其转换为DF
是否有任何API可用于将GroupDataset保存为实木复合地板文件。 或将其转换为DataFrame。
E.g.我有一个自定义的对象“过程”,我已经将Dataframe转换为过程对象。 之后,我正在通过patientID进行分组。 我想分组为文件或将其作为数据框传递给其他函数。 我没有获得任何存储API或将其转换为Dataframe。
val procedureDs: Dataset[Procedure] = joinDf.select("patientid", "patientprocedureid", "procedurecode").as[Procedure]
val groupedDs:GroupedDataset[Long, Procedure] = procedureDs.groupBy{ x => x.patientid }
应用mapGroups后
val a = groupedDs.mapGroups{ case (k,vs) => { (k, vs.toSeq)}}
它提供了以下错误:
Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for com.....PatientDiagnosis
- array element class: "com....PatientDiagnosis"
- field (class: "scala.collection.Seq", name: "_2")
- root class: "scala.Tuple2"
我曾试图给出明确的编码器
val a = groupedDigDs.mapGroups((k,vs) => (k, vs.toSeq))(org.apache.spark.sql.Encoders.bean(classOf[(Long, Seq[com....PatientDiagnosis])]))
然后错误更改为:
java.lang.UnsupportedOperationException: Cannot infer type for class scala.Tuple2 because it is not bean-compliant
答
同GroupedData
(RelationalGroupedDataset
在火花2.X),GroupedDataset
(在火花2.X KeyValueGroupedDataset
)具有被聚合,然后才能保存。
如果你的目标是到另一个groupByKey
您可以使用mapGroups
:
val groupedDs: GroupedDataset[K, V] = ???
// ... { case (k, xs) => (k, xs.toSeq) } to preserve key as well
groupedDs.mapGroups { case (_, xs) => xs.toSeq }
并将结果写入。