spark的统计单词实例运行、架构、原理、DAG、stage、宽窄依赖(1)
今天我们要做的就是简单编写一个统计单词出现数量的项目!!!
目录
1.搭建Spark开发环境
先安装scala,我这里是直接安装运行已经集成好的eclipse软件,包括jdk的配置
jdk你可以去官网下载,1.8以上的,最好是我这个版本的(如果出错就可以使用下图版本):
还需要去配置jdk环境:(不懂去百度一下)
eclipse软件资源:
链接:https://pan.baidu.com/s/1D6Bp1KRmH-ZZM4s_0Esx7A
提取码:dujm
复制这段内容后打开百度网盘手机App,操作更方便哦
1.1 完成wordcount示例
打开eclipse软件,新建scala项目,可以直接在窗口的右上角创建
新建好后,窗体左边就会出现下图项目文件列表,但是这里需要改一个版本:右击出现选项--->点击Properties选项
然后就是导入需要用到的jar包:
jar的网盘资源:(全选jar导入)
链接:https://pan.baidu.com/s/1jZHbd5e2r01CiuoJtx3WwQ
提取码:liom
复制这段内容后打开百度网盘手机App,操作更方便哦
最后就是编写代码了,代码如下:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
object numberWord {
def main(args: Array[String]):Unit = {
System.setProperty("hadoop.home.dir", "E:\\U\\互联网大数据\\0509\\hadoop-2.6.5");
//这里是你的hadoop的安装包的路径
//spark处理---并发迭代大数据
//RDD.map word->(w,1)
//rdd.reduce->w->sum 1
//1.sc
var conf = new SparkConf()
conf.setMaster("local[*]").setAppName("")
var sc = new SparkContext(conf)
sc.setLogLevel("WARN");
//2.sc.textFile ->RDD
var path = "src/data/txt" //这里是你的数据路径
var filedata_rdd = sc.textFile(path,2)
var words_rdd=filedata_rdd
.flatMap(_.split("\\W+"))
.map(x=>(x,1))
.reduceByKey(_+_)
.map(_.swap)
.sortByKey(false)
// words_rdd.foreach(println)
println(filedata_rdd.count())
words_rdd.foreach{line=>
println("word="+line._1+" ,num="+line._2)
}
println("end....")
}
}
我就直接在项目里面新建一个文件,来存放测试数据
数据内容为:
运行结果为:
2.Spark架构理解
并行化是将工作负载分在不同线程或不同节点上执行的子任务.
Spark的工作负载的划分由RDD分区决定。
(1).任务调度
架构示意图:
3.Spark工作原理
(1).Spark工作流程
编写程序提交到Master上,
Master是由四大部分组成(RDD Graph,Scheduler,Block Tracker以及Shuffle Tracker)
启动RDD Graph就是DAG,它会提交给Task Scheduler任务调度器等待调度执行
具体执行时,Task Scheduler会把任务提交到Worker节点上
Block Tracker用于记录计算数据在Worker节点上的块信息
Shuffle Blocker用于记录RDD在计算过程中遇到Shuffle过程时会进行物化,Shuffle Tracker用于记录这些物化的RDD的存放信息
Spark有如下优势:
Spark提供了一个全面、统一的框架用于管理各种有着不同性质(文本数据、图表数据等)的数据集和数据源(批量数据或实时的流数据)的大数据处理的需求
官方资料介绍Spark可以将Hadoop集群中的应用在内存中的运行速度提升100倍,甚至能够将应用在磁盘上的运行速度提升10倍
4.DAG、Stage、宽窄依赖
(1).DAG:有向无环图:有方向,无闭环,代表着数据的流向,这个DAG的边界则是Action方法的执行;原始的RDD通过一系列的转换就形成
(2).Stage概念
Spark任务会根据RDD之间的依赖关系,形成一个DAG有向无环图,DAG会提交给DAG Scheduler(调度),DAG Scheduler会把DAG划分相互依赖的多个stage,划分stage的依据就是RDD之间的宽窄依赖。遇到宽依赖就划分stage,每个stage包含一个或多个task任务。然后将这些task以taskSet的形式提交给TaskScheduler运行。stage是由一组并行的task组成。
stage划分
一个Job会被拆分为多组Task,每组任务被称为一个Stage就像Map Stage, Reduce Stage。Stage的划分,简单的说是以shuffle和result这两种类型来划分。
spark划分stage的整体思路是:从后往前推,遇到宽依赖就断开,划分为一个stage;遇到窄依赖就将这个RDD加入该stage中。
DAG Scheduler:负责分析用户提交的应用,并根据计算任务的依赖关系建立DAG,且将DAG划分为不同的Stage,每个Stage可并发执行一组task。
TaskScheduler:DAGScheduler将划分完成的Task提交到TaskScheduler,TaskScheduler通过Cluster Manager在集群中的某个Worker的Executor上启动任务,实现类TaskSchedulerImpl。
(3).RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)以及宽依赖(wide dependency).
窄依赖
父RDD和子RDD partition之间的关系是一对一的。或者父RDD一个partition只对应一个子RDD的partition情况下的父RDD和子RDD partition关系是多对一的。不会有shuffle的产生。父RDD的一个分区去到子RDD的一个分区。
宽依赖
父RDD与子RDD partition之间的关系是一对多。会有shuffle的产生。父RDD的一个分区的数据去到子RDD的不同分区里面。
图片来自博客:https://blog.****.net/qq_16681169/article/details/82432841