“戏”说Spark-Spark核心-RDD转换操作算子详解(一)
“戏”说Spark-Spark核心-RDD转换行动类算子详解
算子概述
对于RDD可以有两种计算方式:
转换(返回值还是一个RDD)---懒执行
操作(返回值不是一个RDD)---立即执行
转换(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,Spark在遇到Transformations操作时只会记录需要这样的操作,并不会去执行,需要等到有Actions操作的时候才会真正启动计算过程进行计算。
操作(Actions) (如:count, collect, save等),Actions操作会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。
我们可以形象的使用下图表示Spark的输入、运行转换、输出。
Spark的输入、运行转换、输出。在运行转换中通过算子对RDD进行转换。算子是RDD中定义的函数,可以对RDD中的数据进行转换和操作。
·输入:在Spark程序运行中,数据从外部数据空间(例如,HDFS、Scala集合或数据)输入到Spark,数据就进入了Spark运行时数据空间,会转化为Spark中的数据块,通过BlockManager进行管理。
·运行:在Spark数据输入形成RDD后,便可以通过变换算子fliter等,对数据操作并将RDD转化为新的RDD,通过行动(Action)算子,触发Spark提交作业。如果数据需要复用,可以通过Cache算子,将数据缓存到内存。
·输出:程序运行结束数据会输出Spark运行时空间,存储到分布式存储中(如saveAsTextFile输出到HDFS)或Scala数据或集合中(collect输出到Scala集合,count返回Scala Int型数据)。
Spark的核心数据模型是RDD,但RDD是个抽象类,具体由各子类实现,如MappedRDD、ShuffledRDD等子类。Spark将常用的大数据操作都转化成为RDD的子类。
常用算子总结:
官方文档中常用算子:
翻译:
Action算子:
Transformations算子:
如何区分Transformations算子和Action类算子?
常用的Transformations算子+Action算子案例演示:代码可直接运行
package spark.mySpark.transformationAndaction;
import groovy.lang.Tuple;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hive.ql.parse.HiveParser_IdentifiersParser.sysFuncNames_return;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
//java中的元组使用scala中的Tuple
import scala.Tuple2;
/**
* @author
* Spark算子演示:
* Transformation类算子
* map
* flatMap
* filter
* sortByKey
* reduceByKey
* sample
* Action类算子:
* count
* collect
* foreach
*/
public class transformation_test {
@SuppressWarnings("resource")
public static void main(String[] args) {
//因为java是面向对象的语言,当使用java来写Spark代码的时候,是传递对象,自动的提示生成返回值可以简化开发
//快捷键:Ctrl+1
//Spark应用程序的配置文件对象,可以设置:1:运行模式,2:应用程序Application的名称,3:运行时的资源的需求
SparkConf sparkConf = new SparkConf().setAppName("transformation_test").setMaster("local[3]");
//SparkContext是非常的重要的,它是通往集群的唯一的通道
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
//加载文件生成RDD
JavaRDD<String> textFileRDD= sparkContext.textFile("words.txt");
//==========================filter(function(T,Boolean))=========================//
//filter算子是Transformation类算子,返回一个由通过filter()的函数的元素组成的RDD,结果为true的元素会返回,可以用于过滤
//第一个泛型是textFileRDD里内容的类型,Boolean是返回值类型
JavaRDD<String> filterRDD = textFileRDD.filter(new Function<String, Boolean>() {
/**
* 分布式的程序:对象需要走网络传输
* 添加序列化id
*/
private static final long serialVersionUID = 1L;
public Boolean call(String line) throws Exception {
//过滤掉java
System.out.println("是否执行filter算子");
return !line.contains("java");
}
});
//============================foreach========================================//
//foreach算子是Action类算子,遍历RDD的计算结果
filterRDD.foreach(new VoidFunction<String>() {
private static final long serialVersionUID = 1L;
public void call(String word) throws Exception {
System.out.println(word);
}
});
//================================collect=========================//
//collect算子是Action类算子:将在集群中运行任务的结果拉回Driver端
//注意:当计算结果很大的时候,会导致Driver端OOM
List<String> list = filterRDD.collect();
for(String string:list){
System.out.println(string);
}
//===============================================map=========================//
//map算子是transformation类算子,一般用于改变RDD的内数据的内容格式
//String输入数据的类型,第二个为返回值类型
JavaRDD<Integer> mapRDD = textFileRDD.map(new Function<String, Integer>() {
private static final long serialVersionUID = 1L;
public Integer call(String line) throws Exception {
return line.contains("java")?1:0;
}
});
mapRDD.foreach(new VoidFunction<Integer>() {
private static final long serialVersionUID = 1L;
public void call(Integer num) throws Exception {
System.out.println(num);
}
});
//============================================sample=========================//
//sample算子是一个Transformation类算子,通常用于大数据中的抽样
//withReplacement:是否为放回式的抽样,false为不放会式抽样。fraction为抽样的比例,seed为随机种子:随机抽样算法的初始值
JavaRDD<String> sampleRDD = textFileRDD.sample(true, 0.5);
long count= sampleRDD.count();
System.out.println(count);
//=========================================flatmap=========================//
//flatmap:map+flat,input 1 output *
//map :input 1 output 1
//切分单词
JavaRDD<String> flatMapRDD = textFileRDD.flatMap(new FlatMapFunction<String, String>() {
private static final long serialVersionUID = 1L;
//返回迭代器
public Iterable<String> call(String line) throws Exception {
return Arrays.asList(line.split(" "));
}
});
List<String> collect = flatMapRDD.collect();
for(String string:collect){
System.out.println("word="+string);
}
//===============================sortByKey=========================//
//在java的API中:将RDD转化为(K,V)格式的RDD,需要使用**toPair
//第一个为输入数据的类型,第二个,第三个参数为返回的K,V
List<Tuple2<String,Integer>> temp = flatMapRDD.mapToPair(new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = 1L;
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
// reduceByKey为Transformation类算子
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
public Integer call(Integer v1, Integer v2) throws Exception {
//循环反复将v1+v2的值累加
return v1+v2;
}
//变换(K,V)格式的RDD
}).mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {
private static final long serialVersionUID = 1L;
public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple)
throws Exception {
return new Tuple2<Integer, String>(tuple._2, tuple._1);
}
}).sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {
private static final long serialVersionUID = 1L;
public Tuple2<String, Integer> call(Tuple2<Integer, String> line)
throws Exception {
return new Tuple2<String, Integer>(line._2,line._1);
}
}). collect();;
//注意:当使用本地的local[*>1]的时候,使用foreach遍历数据的时候会出错?
//具体的什么问题我也不是很清楚?
/**
foreach(new VoidFunction<Tuple2<String,Integer>>() {
private static final long serialVersionUID = 1L;
public void call(Tuple2<String, Integer> tuple) throws Exception {
System.out.println(tuple);
}
});
**/
for(Tuple2<String, Integer> list1:temp){
System.out.println(list1);
}
//关闭SparkContext
sparkContext.stop();
}
}
思考:假如有1亿条数据,如何过滤掉出现次数最多的字符串:抽样-wordcount统计-调换(K,V)-排序取Top1-filter过滤即可。
scala版本的思考题代码:
package spark.myspark.functions
import org.apache.spark.{SparkContext, SparkConf}
//思考:假如有1亿条数据,如何动态的统计出出现次数最多的编程语言,然后过滤掉
// 思路:抽样-wordcount统计-调换(K,V)-排序取Top1-filter过滤即可。
/**
* 使用到的算子:
* sample
* flatmap
* map
* reduceByKey
* sortByKey
* take(n)-----Action算子
* first()----源码中即take(1)
* filter
*/
object Sample_test {
def main(args: Array[String]) {
val conf= new SparkConf().setAppName("sample").setMaster("local")
val context =new SparkContext(conf)
val textRDD= context.textFile("words.txt")
//抽样
val sampleRDD=textRDD.sample(false,0.9)
//拿到编程语言---(语言,1)---根据key求和---转换(语言,出现次数)--(次数,语言)---排序---取Top1---取Top1对应的编程语言
val WordRDD= sampleRDD.map(x=>{(x.split(" ")(1),1)}).reduceByKey(_+_)
//first=take(1)----Action类的算子,返回一个非RDD的值
val code= WordRDD.map(x=>{(x._2,x._1)}).sortByKey(false).first()._2
//过滤
textRDD.filter(x=>{!x.contains(code)}).foreach(println)
context.stop()
}
}
注意:有多少个Action类的算子就有多少个Job任务
reduceByKey和sortByKey会产生Shuffle
注意:一个Spark应用程序的编写流程
详细请参考:“戏”说Spark-Spark Stage切分
"戏"说Spark-Spark Shuffle详解
补充:依赖包及源码包:链接:http://pan.baidu.com/s/1nuTS8WT密码:9bf7
1:源码包下载地址(包含依赖包):
技能补充:
如何使用Idea以maven的方式编译源码包
1:下载源码,地址:http://spark.apache.org/downloads.html,选择相应的版本
2:将源码工程import到Idea
3:以maven的方式构建,可能需要等2-3小时,需要下载Spark相关的依赖包
如何使用Eclipse查看源码?
1:下载源码
2:Ctrl需要查看的类,然后选择源码包即可查看
思维导图构建你的知识体系:
参考:
Spark的算子的分类: http://www.cnblogs.com/zlslch/p/5723857.html
http://www.jianshu.com/p/c7eef3eb6225