Is it possible to delete files from a Spark Streaming folder?
Problem description:
Spark 2.1: an ETL job converts files from the source file system to parquet and drops the resulting small parquet files into folder1. Spark streaming over folder1 works fine, but the parquet files in folder1 are too small for HDFS. We have to merge the small parquet files into bigger ones, but when I try to delete files from folder1, the streaming query terminates with an exception:
17/07/26 17:16:23 ERROR StreamExecution: Query [id = f29783ea-bdfb-4b59-a6f6-b77f79509a5a, runId = cbcce2b2-7d7b-4e31-a15a-7efed420f974] terminated with error java.io.FileNotFoundException: File does not exist
Is it possible to merge the parquet files in a Spark Streaming folder?
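The merge described here is usually done by a separate batch job rather than by rewriting files inside the streamed folder. A minimal sketch of such a compaction job follows; it is only illustrative (the /folder1_compacted target path and the choice of 8 output files are assumptions, not part of the original post):

// Hypothetical compaction job, run outside the streaming query: read the small
// parquet files, reduce the number of output files, and write the larger files
// to a separate directory so nothing under /folder1/folder2 is deleted in place.
val smallFiles = spark.read.parquet("/folder1/folder2")
smallFiles
  .coalesce(8) // assumed target of 8 larger files; tune for the actual data volume
  .write
  .mode("append")
  .parquet("/folder1_compacted")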
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0.2.6.0.3-8
      /_/
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.
scala> :paste
// Entering paste mode (ctrl-D to finish)
import org.apache.spark.sql.types._
val userSchema = new StructType()
.add("itemId", "string")
.add("tstamp", "integer")
.add("rowtype", "string")
.add("rowordernumber", "integer")
.add("parentrowordernumber", "integer")
.add("fieldname", "string")
.add("valuestr", "string")
val csvDF = spark.readStream.schema(userSchema).parquet("/folder1/folder2")
csvDF.createOrReplaceTempView("tab1")
val aggDF = spark.sql("select distinct count(itemId) as cases_count from tab1")
aggDF
.writeStream
.outputMode("complete")
.format("console")
.start()
aggDF
.writeStream
.queryName("aggregates") // this query name will be the table name
.outputMode("complete")
.format("memory")
.start()
spark.sql("select * from aggregates").show()
// Exiting paste mode, now interpreting.
+-----------+
|cases_count|
+-----------+
+-----------+
import org.apache.spark.sql.types._
userSchema: org.apache.spark.sql.types.StructType = StructType(StructField(itemId,StringType,true), StructField(tstamp,IntegerType,true), StructField(rowtype,StringType,true), StructField(rowordernumber,IntegerType,true), StructField(parentrowordernumber,IntegerType,true), StructField(fieldname,StringType,true), StructField(valuestr,StringType,true))
csvDF: org.apache.spark.sql.DataFrame = [itemId: string, tstamp: int ... 5 more fields]
aggDF: org.apache.spark.sql.DataFrame = [cases_count: bigint]
scala> -------------------------------------------
Batch: 0
-------------------------------------------
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
+-----------+
|cases_count|
+-----------+
| 292086106|
+-----------+
-------------------------------------------
Batch: 1
-------------------------------------------
+-----------+
|cases_count|
+-----------+
| 292086106|
+-----------+
-------------------------------------------
Batch: 2
-------------------------------------------
+-----------+
|cases_count|
+-----------+
| 292086106|
+-----------+
-------------------------------------------
Batch: 3
-------------------------------------------
+-----------+
|cases_count|
+-----------+
| 292086106|
+-----------+
-------------------------------------------
Batch: 4
-------------------------------------------
+-----------+
|cases_count|
+-----------+
| 324016758|
| 292086106|
+-----------+
-------------------------------------------
Batch: 5
-------------------------------------------
+-----------+
|cases_count|
+-----------+
| 355839229|
| 324016758|
| 292086106|
+-----------+
17/07/26 17:16:23 ERROR StreamExecution: Query [id = f29783ea-bdfb-4b59-a6f6-b77f79509a5a, runId = cbcce2b2-7d7b-4e31-a15a-7efed420f974] terminated with error
java.io.FileNotFoundException: File does not exist: /folder1/folder2/P-FMVDBAF-4021-20161107152556-1_006.gz.parquet
Answer:
You can use a wildcard so that the stream only processes the files you need, like this:
val csvDF = spark.readStream.schema(userSchema).parquet("/folder1/folder2/bigger_file*.parquet")
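Building on that, one way to make the glob useful (a sketch only; the compacted/ subdirectory is an assumption, not something the answer specifies) is to have the compaction job write its larger files to a location the pattern matches, while the original small files stay outside it. The source then never lists the small files, so they can be merged or deleted without the FileNotFoundException shown in the question:

// Stream only from the hypothetical compacted output; userSchema is the schema
// defined in the question. Files elsewhere under /folder1/folder2 are not listed
// by this source, so cleaning them up does not affect the query.
val compactedDF = spark.readStream
  .schema(userSchema)
  .parquet("/folder1/folder2/compacted/*.parquet")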
Is this Spark Streaming or Structured Streaming? Care to share some code? It looks like Structured Streaming. Could you also include the whole stack trace? –
I have updated the main post with sample code. Yes, it is Structured Streaming, and I use spark-shell to run the code. – Triffids