如何将分组的Spark RDD内容平铺为单独的行,然后保存到文件

问题描述:

我有一个按键(index:Int)分组的RDD [(Int,Iterable [Coordinates])]]。坐标与成员等级:如何将分组的Spark RDD内容平铺为单独的行,然后保存到文件

latitude: Double, longitude: Double

我想创建打印或创建一个CSV文件,这将是以下形式(每个数据点的行):

index,latitude,longitude 

随着非分组RDD [(智力,坐标),它的工作是这样的:

val textOutputRDD = initialRDD.map(
    f => f._1.toString() + "," + f._2.latitude.toString() + "," + f._2.longitude.toString()) 
textOutputRDD.saveAsTextFile("TextOutput") 

如何管理这样做,在这种情况下?

+0

您使用的是哪个版本的Apache Spark? – stefanobaghino

+0

@stefanobaghino 2.1.0 – ilvo

一个简单的嵌套循环会做。这里我用一个简单的对双打的大致坐标:

val rdd = 
    sc.parallelize(
    Seq(
     1 -> Seq((4.1, 3.4), (5.6, 6.7), (3.4, 9.0)), 
     2 -> Seq((0.4, -4.1), (-3.4, 6.7), (7.0, 8.9)) 
    ) 
) 

val csvLike = 
    for ((key, coords) <- rdd; (lat, lon) <- coords) yield s"$key,$lat,$lon" 

for (row <- csvLike) println(row) 

此代码将导致下面的输出:

2,0.4,-4.1 
2,-3.4,6.7 
2,7.0,8.9 
1,4.1,3.4 
1,5.6,6.7 
1,3.4,9.0 

编辑

另一种可能的方法是在实际flatMap交换/ map序列编译器将for的理解转变为:

rdd.flatMap { 
    case (key, coords) => 
    coords.map { 
     case (lat, lon) => s"$key,$lat,$lon" 
    } 
} 
+0

尝试了这一点,都用我自己的rdd并复制你的尝试,导致同样的错误:value withFilter不是org.apache.spark.rdd.RDD的成员[(Int,Seq [(双,双)])]。试图找到一个修复,但没有运气。 – ilvo

+1

我在'spark-shell'中成功执行了这个命令,我最好的猜测是你错过了一些'implicit'。你可以尝试交换这一行而不是循环,看看会发生什么?它在语义上是等价的,事实上编译器实际上将循环转换为:'rdd.flatMap {case(key,coords)=> coords.map {case(lat,lon)=> s“$ key,$ lat,$ lon “}}' – stefanobaghino

+0

我在使用'for'变体时看到警告,但没有错误。你偶然编译时启用了'-Xfatal-warnings'编译器标志吗?无论如何,'flatMap' /'map'变体应该解决这个问题。 – stefanobaghino

试试flatmap-

val output = rdd.flatMap(s=>{ 
     val list=List[String]() 
     for (latlon <- s._2) { 
     list.addString(s._1.toString() + "," + latlon.latitude.toString() + "," + latlon.longitude.toString()) 
     } 
     return list 
    }) 
output.save(....) 
+1

无法获得列表来处理此问题,但是打印它,'println(s._1.toString()+“,”+ latlon.latitude +“,”+ latlon.longitude)'按预期工作。另外,addString需要一个StringBuilder作为第一个参数,输出可能应该用'output.saveAsTextFile(....)'保存。感谢您的输入,平面地图工作! – ilvo