Spark本地worldcount详细讲解(Scala版本)以及流程
主要介绍Scala来写spark代码的流程
package com.xlucas
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/**
* Created by Xlucas on 2018/12/16.
*/
object WordCount {
def main(args: Array[String]): Unit = {
/**
第一步:创建spark的配置对象sparkconf,设置spark程序的运行时的配置信息,例如说通过setMaster来设置程序
链接spark集群的master的URL,如果设置为local,则代表spark程序在本地运行,
*/
val conf=new SparkConf();//创建SparkConf对象
conf.setAppName("WordCount")//设置应用程序的名称,在程序运行的监控界面可以看到这个名字
conf.setMaster("local")//此时,程序在本地执行,不需要安装spark集群
//conf.setMaster("spark://192.168.18.140:7077")//指定spark运行是集群模式 一般我们不在代码中指定,我们在提交的时候指定
/*
第二步:创建SparkContext对象,
SparkContext是spark程序所有功能的唯一入口,无论是采用Scala,Java,Python,R等都必须有一个SparkContext
SparkContext核心作用:初始化spark应用程序运行时候所需要的核心组件,包括DAGScheduler,TaskScheduler,SchedulerBackend
同时还会负责Spark程序往Master注册程序等
SparkContext是整个spark应用程序中最为至关重要的一个对象
*/
val sc=new SparkContext(conf)//创建SparkContext对象,通过传入SparkContext实例来定制Spark运行的具体参数和配置信息
/*
第3步:根据具体的数据来源 (HDFS,HBase,Local等)通过SparkContext来创建RDD
RDD的创建有3种方式,外部的数据来源,根据scala集合,由其他的RDD操作
数据会被RDD划分成为一系列的Partitions,分配到每个Partition的数据属于一个Task的处理范畴
*/
val line=sc.textFile("E:\\server.log",1) //读取本地的一个文件并且设置为1个partition
// val line =sc.textFile("hdfs://192.168.18.140:9000/input/LICENSE.txt") //指定HDFS的路径,这个也可以到时候在参数传入
/*
第4步:对初始的RDD进行Transformation级别的处理,例如Map、filter等高阶函数等的编程来进行具体的数据计算
在对每一行的字符串拆分成单个单词
在单词的拆分的基础上对每个单词实例计算为1,也就是word=>(word,1)
在对每个单词实例计数为1基础上统计每个单词在文件中出现的总次数
*/
val words=line.flatMap(_.split(" "))
val pairs=words.map(word=>(word,1))
val wordcounts=pairs.reduceByKey(_+_).map(pairs=>(pairs._2,pairs._1)).sortByKey(false,1).map(pairs=>(pairs._2,pairs._1))
//wordcounts.foreach(wordNum=>println(wordNum._1+":"+wordNum._2)) 本地模式用这个打印,
wordcounts.foreach(wordNum=>println(wordNum._1+":"+wordNum._2))
sc.stop()
}
}
worldcout的流程