星火需要0.5秒平均100号
我已经约70百万行的数据集的用户的位置和日期时间CSV,并写下了下面的代码,以平均最高100个用户的点数:星火需要0.5秒平均100号
val spark = org.apache.spark.sql.SparkSession.builder
.appName("Test")
.getOrCreate
import spark.implicits._
val watch = new Stopwatch()
watch.start()
val schema = new StructType().add("user_id", StringType).add("datetime", LongType)
val df = spark.read.format("csv").option("header", "true").schema(schema).csv(inputFile)
df.createOrReplaceTempView("paths")
val pathDs = spark.sql("select user_id, min(datetime) as started, max(datetime) as finished, " +
"count(*) as total, max(datetime) - min(datetime) as timeDelta " +
"from paths group by user_id order by total desc limit 100")
pathDs.cache()
pathDs.collect.foreach(println)
println(watch.elapsedTime(TimeUnit.MILLISECONDS))
val avgPoints = pathDs.select(avg("total")).as[Double].head()
println(avgPoints)
println(watch.stop())
这里发生的事情是我花费了数百万/亿的记录(最终可能会花费数TB),并将它们汇总为100列5列的记录。问题不在于这部分需要多长时间,或者我们如何加快速度,而在于我们处理最终的100条记录时会发生什么。
还有一种更简单的方法可以直接通过SQL完成此操作,但我还需要pathDS以便稍后进行更多处理。该代码工作正常,但我注意到pathDs.select(avg("total")).as[Double].head()
开始做了大量的工作,并最终花费了大约半秒,即使pathDS只包含100行。
你知道为什么它需要这么长时间,我怎么能加快这个速度,特别是在这个只有100行的小数据集上运行?我专门做了.cache和.collect,以便在进行任何进一步聚合之前在本地创建所有100条记录(而且我现在正在本地运行此操作)。
我在本地使用Scala 2.11上的Spark 2.2。
Spark针对大数据集进行优化。这意味着通常会有一些开销对于大数据集可以忽略不计,但对于小数据集则不会忽略不计。
考虑当您运行计算avgPoints会发生什么:
- 星火计算“转型”,即它定义需要什么样的计算做(这是选择的部分,平均等)。
- 你称之为“头部”动作,它使得火花把你制作的表达树变成一个物理计划。这包括优化以及比较几种可能的解决方案。请注意,该表达式还包含计算缓存部分的表达式。在实践中,这些步骤将被跳过(你可以在Spark UI中看到这个),但是它们仍然被认为是在某些情况下可能会决定重新计算一些缓存数据(在这种情况下几乎肯定不会)。
- Spark使用整个阶段代码生成将物理计划编译成代码,序列化此代码并将其发送给所有相关的执行者。
- 当spark创建计划时,它将数据分区(可能是200个分区,因为这是groupby的默认分区)。这意味着你在执行者之间分配了200个任务。大多数分区将有0或1个元素,因此他们所做的任务几乎立即执行,但火花必须启动200个任务。
- Spark将200个任务中的每个任务的结果发送到缓冲区,并将它们全部发送给单个执行程序以完成最终聚合。在所有任务完成并发送其数据之前,最终的聚合任务不会开始。
- 一旦最终的聚合完成,结果将被发送回驱动程序。
正如你可以看到这里有很多阶段,包括网络传输和开始/结束任务(需要管理)。即使没有真实数据,这里的总体开销也很容易达到半秒。
如果将限制更改为1000,即使处理10倍数据,也可能看到总体时间变化很小。
这是一个常见的使用情况,使用火花来减少问题的大小,即你有大量的数据,做一些聚合,并获得更少的元素(在你的情况下为100),然后你会收集它们到驱动程序,并直接采取行动,而不是使用火花,以避免开销(例如,在你的情况下保存收集的结果,而不是用println做foreach,总结一下)。
你可以做的一件事情是在计算pathDs时使用coalesce(1)。这意味着你只有一个分区(所有的连接都将成为第一阶段的一部分)。这与使用收集结果并没有太大区别,只是如果要将限制更改为较大的大小,那么合并为较小但不是1的值可能很有用(例如,您可以设置10000的限制,然后将其合并为4仍然有一些平行)。
更新
基础上的评论限制的结果是当前1分区,以便凝聚不会帮助(这也意味着没有真正的理由不这样做,除非收集您要使用数据帧功能结果)。上述过程仍然正确,只是使用了一个分区而不是多个分区。
我建议删除最后一段。这完全没用,特别是当你考虑时,'pathDs'将总是有1个分区。 – zero323
@ zero323是否始终如此?分区数量不会取决于实际的限制数量和每个分区的密钥数量? –
所以,在我的代码,我专门做了 pathDs.cache(的println) 计算平均值,以为这实际上将拉低结果到驱动程序之前 pathDs.collect.foreach - 但它似乎并不像这就是发生了什么事。什么是收集这里的正确方法? 请注意,如果我收集并操作收集的数据集,结果会更快,因为我只是在本地Scala对象上工作,但如果我收集,打印,然后在pathDS上执行其他操作(例如在我的代码中),它仍然是0.5秒。有没有更好的方法来做缓存/收集? – kozyr
优化它的一种方法是使用函数collect
将整个数据集放入内存中,然后使用规范的Scala操作,您可以在1-2 ms内完成该操作?但这首先反驳了使用Spark的原因。
Spark的优势在于在不同机器上的多个节点间高效地执行分布式计算。小数据集上的操作在不通过Spark传递的情况下总是更高效。你试验它的时机相当于747飞行100米。现在你想知道为什么747这么慢,当每个人都说飞行让你如此之快的时候。
在使用RDDs在Spark中执行工作的旧方式中,在版本1.2 - > 1.6左右,可以使用像mapPartitionsWithIndex
这样的函数对分区数据执行正常的scala操作,以避免火花消耗。这当然意味着在这个函数中,所有的数据都已经在火花节点级别被隔离了。使用这种方法,您将获得两个世界的好处。
- 测量不
cache
你丢掉时加载数据时到cache
本身。可能会更快。 - 您能否将
input data
转换为parquet
并将其加载到相同群集中的存储器存储器中,例如alluxio
?如果是,partition
它由user_id
。理想情况下,设计体系结构使新的输入数据被推送到kafka
,一个structured streaming
作业将其附加到alluxio
或cassandra
,另一个在所选范围内进行聚合。另外,请使用flink
或者batch
或stream
,因为它通常更快。
,如果你不能控制输入数据结构,然后重点给予2DN阶段,并尝试使用typed aggregates
如:
groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K,
T]
,你将需要创建数据csv
设置为:
case class Input(userId: String, time: DateTime)
val ds = spark.read.format("csv").option("header",
"true").schema(schema).csv(inputFile).as[Input]
ds.groupByKey(_.userId).avg(_.time).show
由于键入的性能好处,对于大数据集肯定会更快,但对于较小的数据集可能不会更快
我在这里没有处理小数据 - 完整的数据集将会有数百GB/TB。但是我特别想看看为什么这个特定的例子很慢,因为我认为做.cache()应该让我的100行数据集保留在内存中,并且它的结果应该立即出现。 – kozyr
我建议检查一下SparkUI中的一些高级指标,找出为什么它需要这么长时间。我还建议通过此代码运行至少一个数据集,例如1000个值,这样您可以更好地推断常量开销与实际(O(n))的计算时间。 –