如何将火花DataFrame保存为磁盘上的csv?
df.filter("project = 'en'").select("title","count").groupBy("title").sum()
这将返回一个数组示例结果。
如何将火花DataFrame保存为磁盘上的csv?
Apache Spark不支持磁盘上的本机CSV输出。
你虽然有四个可用的解决方案:
-
您可以将数据帧转换成RDD:
def convertToReadableString(r : Row) = ??? df.rdd.map{ convertToReadableString }.saveAsTextFile(filepath)
这将创建一个文件夹中的文件路径。根据该文件的路径,你会发现分区上的文件(例如零件-000 *)
如果我想所有的分区追加到一个大的CSV我最常做的是
cat filePath/part* > mycsvfile.csv
有些人会使用
coalesce(1,false)
从RDD创建一个分区。它通常是一个坏习惯,因为它可能会通过将您收集的所有数据拉到驱动程序而使驱动程序不堪重负。请注意,df.rdd
将返回RDD[Row]
。 -
可以使用Databricks火花CSV library:
-
星火1.4+:
df.write.format("com.databricks.spark.csv").save(filepath)
-
星火1.3:
df.save(filepath,"com.databricks.spark.csv")
-
-
威特h Spark 2.x
spark-csv
包不需要,因为它包含在Spark中。df.write.format("csv").save(filepath)
可以转换为本地熊猫数据帧,并使用
to_csv
方法(PySpark只)。
注:解决方案1,2和3将导致由底层的Hadoop API火花调用当你调用save
生成CSV格式文件(part-*
)。每个分区将有一个part-
文件。
我有类似的问题。当我以客户端模式连接到集群时,我需要在驱动程序上写下csv文件。
我想重用与Apache Spark相同的CSV解析代码以避免潜在的错误。
我检查了spark-csv代码,发现代码负责将数据帧转换为原始csv RDD[String]
的com.databricks.spark.csv.CsvSchemaRDD
。
可悲它是硬编码与sc.textFile
和相关方法的结束。
我复制粘贴代码,并删除最后一行与sc.textFile
并返回RDD直接代替。
我的代码:
/*
This is copypasta from com.databricks.spark.csv.CsvSchemaRDD
Spark's code has perfect method converting Dataframe -> raw csv RDD[String]
But in last lines of that method it's hardcoded against writing as text file -
for our case we need RDD.
*/
object DataframeToRawCsvRDD {
val defaultCsvFormat = com.databricks.spark.csv.defaultCsvFormat
def apply(dataFrame: DataFrame, parameters: Map[String, String] = Map())
(implicit ctx: ExecutionContext): RDD[String] = {
val delimiter = parameters.getOrElse("delimiter", ",")
val delimiterChar = if (delimiter.length == 1) {
delimiter.charAt(0)
} else {
throw new Exception("Delimiter cannot be more than one character.")
}
val escape = parameters.getOrElse("escape", null)
val escapeChar: Character = if (escape == null) {
null
} else if (escape.length == 1) {
escape.charAt(0)
} else {
throw new Exception("Escape character cannot be more than one character.")
}
val quote = parameters.getOrElse("quote", "\"")
val quoteChar: Character = if (quote == null) {
null
} else if (quote.length == 1) {
quote.charAt(0)
} else {
throw new Exception("Quotation cannot be more than one character.")
}
val quoteModeString = parameters.getOrElse("quoteMode", "MINIMAL")
val quoteMode: QuoteMode = if (quoteModeString == null) {
null
} else {
QuoteMode.valueOf(quoteModeString.toUpperCase)
}
val nullValue = parameters.getOrElse("nullValue", "null")
val csvFormat = defaultCsvFormat
.withDelimiter(delimiterChar)
.withQuote(quoteChar)
.withEscape(escapeChar)
.withQuoteMode(quoteMode)
.withSkipHeaderRecord(false)
.withNullString(nullValue)
val generateHeader = parameters.getOrElse("header", "false").toBoolean
val headerRdd = if (generateHeader) {
ctx.sparkContext.parallelize(Seq(
csvFormat.format(dataFrame.columns.map(_.asInstanceOf[AnyRef]): _*)
))
} else {
ctx.sparkContext.emptyRDD[String]
}
val rowsRdd = dataFrame.rdd.map(row => {
csvFormat.format(row.toSeq.map(_.asInstanceOf[AnyRef]): _*)
})
headerRdd union rowsRdd
}
}
我有类似的问题,我必须数据框的内容保存到我定义名称的CSV文件。 df.write("csv").save("<my-path>")
正在创建目录而不是文件。所以必须拿出以下解决方案。 大部分代码从以下dataframe-to-csv采取小的修改的逻辑。
def saveDfToCsv(df: DataFrame, tsvOutput: String, sep: String = ",", header: Boolean = false): Unit = {
val tmpParquetDir = "Posts.tmp.parquet"
df.repartition(1).write.
format("com.databricks.spark.csv").
option("header", header.toString).
option("delimiter", sep).
save(tmpParquetDir)
val dir = new File(tmpParquetDir)
val newFileRgex = tmpParquetDir + File.separatorChar + ".part-00000.*.csv"
val tmpTsfFile = dir.listFiles.filter(_.toPath.toString.matches(newFileRgex))(0).toString
(new File(tmpTsvFile)).renameTo(new File(tsvOutput))
dir.listFiles.foreach(f => f.delete)
dir.delete
}
btw这不会返回一个数组,而是一个DataFrame! [参考这里(http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.GroupedData) – eliasah
如果给出的答案解决您的问题,请接受它, up-vote,所以我们可以将这个问题归类为已解决! – eliasah