Spark入门学习——要点1
1、collect() 函数
RDD 还有一个collect() 函数,可以用来获取整个RDD 中的数据。如果你的程序把RDD 筛选到一个很小的规模,并且你想在本地处理这些数据时,就可以使用它。记住,只有当你的整个数据集能在单台机器的内存中放得下时,才能使用collect(),因此,collect() 不能用在大规模数据集上。使用collect() 使得RDD 的值与预期结果之间的对比变得很容易。由于需要将数据复制到驱动器进程中,collect() 要求所有数据都必须能一同放入单台机器的内存中。
2、Java标准函数接口
函数名 | 实现的方法 | 用途 |
Function<T,R> | R call(T) | 接收一个输入值并返回一个输出值,用于类似map() 和filter() 等操作中 |
Function2<T1, T2, R> | R call(T1, T2) | 接收两个输入值并返回一个输出值,用于类似aggregate()和fold() 等操作中 |
FlatMapFunction<T, R> | Iterable<R> call(T) | 接收一个输入值并返回任意个输出,用于类似flatMap()这样的操作中 |
3、常见的转化操作和action操作
对一个数据为{1,2,3,3}的RDD进行基本的RDD转化操作
函数名 | 目的 | 示例 | 结果 |
map() | 将函数应用于Rdd中每个元素,将返回值构成新的RDD | rdd.map(x->x+1) | {2, 3, 4, 4} |
flatMap() | 将函数应用于RDD 中的每个元 素,将返回的迭代器的所有内 容构成新的RDD。通常用来切 分单词 |
rdd.flatMap(x => x.to(3)) | {1, 2, 3, 2, 3, 3, 3} |
filter() | 返回一个由通过传给filter() 的函数的元素组成的RDD |
rdd.filter(x => x != 1) | {2, 3, 3} |
distinct() | 去重 | rdd.distinct() | {1, 2, 3} |
sample(withReplacement, fraction, [seed]) |
对RDD 采样,以及是否替 | rdd.sample(false, 0.5) | 非确定的 |
对数据分别为{1,2,3} 和{3,4,5}的RDD进行针对两个RDD的转化操作
函数名 | 目的 | 示例 | 结果 |
union() | 生成一个包含两个RDD 中所有元 素的RDD |
rdd.union(other) | {1, 2, 3, 3, 4, 5} |
intersection() | 求两个RDD 共同的元素的RDD | rdd.intersection(other) | {3} |
subtract() | 移除一个RDD 中的内容(例如移 除训练数据) |
rdd.subtract(other) | {1, 2} |
cartesian() | 与另一个RDD 的笛卡儿积 | rdd.cartesian(other) | {(1, 3), (1, 4), ... (3, 5)} |
action操作
行动 | 涵义 |
---|---|
reduce(func) | 使用传入的函数参数 func 对数据集中的元素进行汇聚操作 (两两合并). 该函数应该具有可交换与可结合的性质, 以便于能够正确地进行并行计算. |
collect() | 在 driver program 上将数据集中的元素作为一个数组返回. 这在执行一个 filter 或是其他返回一个足够小的子数据集操作后十分有用. |
count() | 返回数据集中的元素个数 |
first() | 返回数据集中的第一个元素 (与 take(1) 类似) |
take(n) | 返回数据集中的前 n 个元素 |
takeSample(withReplacement, num, [seed]) | 以数组的形式返回数据集中随机采样的 num 个元素. |
takeOrdered(n, [ordering]) | 以其自然序或使用自定义的比较器返回 RDD 的前 n 元素 |
saveAsTextFile(path) | 将数据集中的元素写入到指定目录下的一个或多个文本文件中, 该目录可以存在于本地文件系统, HDFS 或其他 Hadoop 支持的文件系统. Spark 将会对每个元素调用 toString 将其转换为文件的一行文本. |
saveAsSequenceFile(path)(Java and Scala) | 对于本地文件系统, HDFS 或其他任何 Hadoop 支持的文件系统上的一个指定路径, 将数据集中的元素写为一个 Hadoop SequenceFile. 仅适用于实现了 Hadoop Writable 接口的 kay-value pair 的 RDD. 在 Scala 中, 同样适用于能够被隐式转换成 Writable 的类型上 (Spark 包含了对于 Int, Double, String 等基本类型的转换). |
saveAsObjectFile(path)(Java and Scala) | 使用 Java 序列化将数据集中的元素简单写为格式化数据, 可以通过 SparkContext.objectFile() 进行加载. |
countByKey() | 仅适用于 (K, V) 类型的 RDD. 返回每个 key 的 value 数的一个 hashmap (K, int) pair. |
foreach(func | 对数据集中的每个元素执行函数 func. 这通常用于更新一个 Accumulator 或与外部存储系统交互时的副作用. 注意: 修改 foreach() 外的非 Accumulator 变量可能导致未定义的行为. 更多细节请查看 Understanding closures. |
4、distinct()函数
我们的RDD 中最常缺失的集合属性是元素的唯一性,因为常常有重复的元素。如果只要唯一的元素,我们可以使用RDD.distinct() 转化操作来生成一个只包含不同元素的新RDD。不过需要注意,distinct() 操作的开销很大,因为它需要将所有数据通过网络进行混洗(shuffle),以确保每个元素都只有一份。
5、Spark RDD缓存/持久化策略
为了避免多次计算同一个RDD,可以让Spark 对数据进行持久化。
出于不同的目的,我们可以为RDD 选择不同的持久化级别(如表3-6 所示)。在Scala(见 例3-40)和Java 中,默认情况下persist() 会把数据以序列化的形式缓存在JVM 的堆空间中。在Python 中,我们会始终序列化要持久化存储的数据,所以持久化级别默认值就是以序列化后的对象存储在JVM 堆空间中。当我们把数据写到磁盘或者堆外存储上时,也总是使用序列化后的数据。
表3-6:org.apache.spark.storage.StorageLevel和pyspark.StorageLevel中的持久化级别;如有必要,可以通过在存储级别的末尾加上“_2”来把持久化数据存为两份
如果在调用RDD的action的操作之前调用了persist()操作,并不会马上出发强制求值机制,仍然是懒加载的机制。
如果缓存的数据过多,内存中放不下,Spark会自动利用最近最少使用(LRU)缓存策略进行数据分区移除。对于仅把数据存放在内存中的缓存级别,下一次要用到已经被移除的分区,这些分区需要重新计算。但是对于内存+磁盘模式的缓存级别的分区来说,被移除的分区都会被写入磁盘。
RDD还有一个方法叫做UNpersist(),调用该方法可以手动把持久化的RDD从缓存中删除。
转载于:https://my.oschina.net/guanhe/blog/2051525