如何将火花DataFrame保存为磁盘上的csv?

问题描述:

对本如何将火花DataFrame保存为磁盘上的csv?

df.filter("project = 'en'").select("title","count").groupBy("title").sum() 

这将返回一个数组示例结果。

如何将火花DataFrame保存为磁盘上的csv?

+1

btw这不会返回一个数组,而是一个DataFrame! [参考这里(http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.GroupedData) – eliasah

+0

如果给出的答案解决您的问题,请接受它, up-vote,所以我们可以将这个问题归类为已解决! – eliasah

Apache Spark不支持磁盘上的本机CSV输出。

你虽然有四个可用的解决方案:

  1. 您可以将数据帧转换成RDD:

    def convertToReadableString(r : Row) = ??? 
    df.rdd.map{ convertToReadableString }.saveAsTextFile(filepath) 
    

    这将创建一个文件夹中的文件路径。根据该文件的路径,你会发现分区上的文件(例如零件-000 *)

    如果我想所有的分区追加到一个大的CSV我最常做的是

    cat filePath/part* > mycsvfile.csv 
    

    有些人会使用coalesce(1,false)从RDD创建一个分区。它通常是一个坏习惯,因为它可能会通过将您收集的所有数据拉到驱动程序而使驱动程序不堪重负。请注意,df.rdd将返回RDD[Row]

  2. 可以使用Databricks火花CSV library

    • 星火1.4+:

      df.write.format("com.databricks.spark.csv").save(filepath) 
      
    • 星火1.3:

      df.save(filepath,"com.databricks.spark.csv") 
      
  3. 威特h Spark 2.xspark-csv包不需要,因为它包含在Spark中。

    df.write.format("csv").save(filepath) 
    
  4. 可以转换为本地熊猫数据帧,并使用to_csv方法(PySpark只)。

注:解决方案1,2和3将导致由底层的Hadoop API火花调用当你调用save生成CSV格式文件(part-*)。每个分区将有一个part-文件。

+1

我认为'spark-csv'是首选的解决方案。从零开始创建正确的csv行并不容易。所有的方言和适当的逃避可能会非常棘手。 – zero323

+0

我完全同意 – eliasah

+1

在PySpark你也可以小表转换为大熊猫并在本地保存。但它可能是一个斯卡拉的问题。 – zero323

我有类似的问题。当我以客户端模式连接到集群时,我需要在驱动程序上写下csv文件。

我想重用与Apache Spark相同的CSV解析代码以避免潜在的错误。

我检查了spark-csv代码,发现代码负责将数据帧转换为原始csv RDD[String]com.databricks.spark.csv.CsvSchemaRDD

可悲它是硬编码与sc.textFile和相关方法的结束。

我复制粘贴代码,并删除最后一行与sc.textFile并返回RDD直接代替。

我的代码:

/* 
    This is copypasta from com.databricks.spark.csv.CsvSchemaRDD 
    Spark's code has perfect method converting Dataframe -> raw csv RDD[String] 
    But in last lines of that method it's hardcoded against writing as text file - 
    for our case we need RDD. 
*/ 
object DataframeToRawCsvRDD { 

    val defaultCsvFormat = com.databricks.spark.csv.defaultCsvFormat 

    def apply(dataFrame: DataFrame, parameters: Map[String, String] = Map()) 
      (implicit ctx: ExecutionContext): RDD[String] = { 
    val delimiter = parameters.getOrElse("delimiter", ",") 
    val delimiterChar = if (delimiter.length == 1) { 
     delimiter.charAt(0) 
    } else { 
     throw new Exception("Delimiter cannot be more than one character.") 
    } 

    val escape = parameters.getOrElse("escape", null) 
    val escapeChar: Character = if (escape == null) { 
     null 
    } else if (escape.length == 1) { 
     escape.charAt(0) 
    } else { 
     throw new Exception("Escape character cannot be more than one character.") 
    } 

    val quote = parameters.getOrElse("quote", "\"") 
    val quoteChar: Character = if (quote == null) { 
     null 
    } else if (quote.length == 1) { 
     quote.charAt(0) 
    } else { 
     throw new Exception("Quotation cannot be more than one character.") 
    } 

    val quoteModeString = parameters.getOrElse("quoteMode", "MINIMAL") 
    val quoteMode: QuoteMode = if (quoteModeString == null) { 
     null 
    } else { 
     QuoteMode.valueOf(quoteModeString.toUpperCase) 
    } 

    val nullValue = parameters.getOrElse("nullValue", "null") 

    val csvFormat = defaultCsvFormat 
     .withDelimiter(delimiterChar) 
     .withQuote(quoteChar) 
     .withEscape(escapeChar) 
     .withQuoteMode(quoteMode) 
     .withSkipHeaderRecord(false) 
     .withNullString(nullValue) 

    val generateHeader = parameters.getOrElse("header", "false").toBoolean 
    val headerRdd = if (generateHeader) { 
     ctx.sparkContext.parallelize(Seq(
     csvFormat.format(dataFrame.columns.map(_.asInstanceOf[AnyRef]): _*) 
    )) 
    } else { 
     ctx.sparkContext.emptyRDD[String] 
    } 

    val rowsRdd = dataFrame.rdd.map(row => { 
     csvFormat.format(row.toSeq.map(_.asInstanceOf[AnyRef]): _*) 
    }) 

    headerRdd union rowsRdd 
    } 

} 

我有类似的问题,我必须数据框的内容保存到我定义名称的CSV文件。 df.write("csv").save("<my-path>")正在创建目录而不是文件。所以必须拿出以下解决方案。 大部分代码从以下dataframe-to-csv采取小的修改的逻辑。

def saveDfToCsv(df: DataFrame, tsvOutput: String, sep: String = ",", header: Boolean = false): Unit = { 
    val tmpParquetDir = "Posts.tmp.parquet" 

    df.repartition(1).write. 
     format("com.databricks.spark.csv"). 
     option("header", header.toString). 
     option("delimiter", sep). 
     save(tmpParquetDir) 

    val dir = new File(tmpParquetDir) 
    val newFileRgex = tmpParquetDir + File.separatorChar + ".part-00000.*.csv" 
    val tmpTsfFile = dir.listFiles.filter(_.toPath.toString.matches(newFileRgex))(0).toString 
    (new File(tmpTsvFile)).renameTo(new File(tsvOutput)) 

    dir.listFiles.foreach(f => f.delete) 
    dir.delete 
    }