如何将分组的Spark RDD内容平铺为单独的行,然后保存到文件
我有一个按键(index:Int)分组的RDD [(Int,Iterable [Coordinates])]]。坐标与成员等级:如何将分组的Spark RDD内容平铺为单独的行,然后保存到文件
latitude: Double, longitude: Double
我想创建打印或创建一个CSV文件,这将是以下形式(每个数据点的行):
index,latitude,longitude
随着非分组RDD [(智力,坐标),它的工作是这样的:
val textOutputRDD = initialRDD.map(
f => f._1.toString() + "," + f._2.latitude.toString() + "," + f._2.longitude.toString())
textOutputRDD.saveAsTextFile("TextOutput")
如何管理这样做,在这种情况下?
一个简单的嵌套循环会做。这里我用一个简单的对双打的大致坐标:
val rdd =
sc.parallelize(
Seq(
1 -> Seq((4.1, 3.4), (5.6, 6.7), (3.4, 9.0)),
2 -> Seq((0.4, -4.1), (-3.4, 6.7), (7.0, 8.9))
)
)
val csvLike =
for ((key, coords) <- rdd; (lat, lon) <- coords) yield s"$key,$lat,$lon"
for (row <- csvLike) println(row)
此代码将导致下面的输出:
2,0.4,-4.1
2,-3.4,6.7
2,7.0,8.9
1,4.1,3.4
1,5.6,6.7
1,3.4,9.0
编辑
另一种可能的方法是在实际flatMap
交换/ map
序列编译器将for
的理解转变为:
rdd.flatMap {
case (key, coords) =>
coords.map {
case (lat, lon) => s"$key,$lat,$lon"
}
}
尝试了这一点,都用我自己的rdd并复制你的尝试,导致同样的错误:value withFilter不是org.apache.spark.rdd.RDD的成员[(Int,Seq [(双,双)])]。试图找到一个修复,但没有运气。 – ilvo
我在'spark-shell'中成功执行了这个命令,我最好的猜测是你错过了一些'implicit'。你可以尝试交换这一行而不是循环,看看会发生什么?它在语义上是等价的,事实上编译器实际上将循环转换为:'rdd.flatMap {case(key,coords)=> coords.map {case(lat,lon)=> s“$ key,$ lat,$ lon “}}' – stefanobaghino
我在使用'for'变体时看到警告,但没有错误。你偶然编译时启用了'-Xfatal-warnings'编译器标志吗?无论如何,'flatMap' /'map'变体应该解决这个问题。 – stefanobaghino
试试flatmap-
val output = rdd.flatMap(s=>{
val list=List[String]()
for (latlon <- s._2) {
list.addString(s._1.toString() + "," + latlon.latitude.toString() + "," + latlon.longitude.toString())
}
return list
})
output.save(....)
无法获得列表来处理此问题,但是打印它,'println(s._1.toString()+“,”+ latlon.latitude +“,”+ latlon.longitude)'按预期工作。另外,addString需要一个StringBuilder作为第一个参数,输出可能应该用'output.saveAsTextFile(....)'保存。感谢您的输入,平面地图工作! – ilvo
您使用的是哪个版本的Apache Spark? – stefanobaghino
@stefanobaghino 2.1.0 – ilvo