Spark本地worldcount详细讲解(Scala版本)以及流程

主要介绍Scala来写spark代码的流程

package com.xlucas
    
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    /**
      * Created by Xlucas on 2018/12/16.
      */
    object WordCount {
      def main(args: Array[String]): Unit = {
    
        /**
        第一步:创建spark的配置对象sparkconf,设置spark程序的运行时的配置信息,例如说通过setMaster来设置程序
          链接spark集群的master的URL,如果设置为local,则代表spark程序在本地运行,
          */
        val conf=new SparkConf();//创建SparkConf对象
        conf.setAppName("WordCount")//设置应用程序的名称,在程序运行的监控界面可以看到这个名字
        conf.setMaster("local")//此时,程序在本地执行,不需要安装spark集群
        //conf.setMaster("spark://192.168.18.140:7077")//指定spark运行是集群模式 一般我们不在代码中指定,我们在提交的时候指定
        /*
        第二步:创建SparkContext对象,
        SparkContext是spark程序所有功能的唯一入口,无论是采用Scala,Java,Python,R等都必须有一个SparkContext
        SparkContext核心作用:初始化spark应用程序运行时候所需要的核心组件,包括DAGScheduler,TaskScheduler,SchedulerBackend
        同时还会负责Spark程序往Master注册程序等
        SparkContext是整个spark应用程序中最为至关重要的一个对象
         */
        val sc=new SparkContext(conf)//创建SparkContext对象,通过传入SparkContext实例来定制Spark运行的具体参数和配置信息
        /*
        第3步:根据具体的数据来源 (HDFS,HBase,Local等)通过SparkContext来创建RDD
        RDD的创建有3种方式,外部的数据来源,根据scala集合,由其他的RDD操作
        数据会被RDD划分成为一系列的Partitions,分配到每个Partition的数据属于一个Task的处理范畴
         */
        val line=sc.textFile("E:\\server.log",1) //读取本地的一个文件并且设置为1个partition
        // val line =sc.textFile("hdfs://192.168.18.140:9000/input/LICENSE.txt") //指定HDFS的路径,这个也可以到时候在参数传入
        /*
        第4步:对初始的RDD进行Transformation级别的处理,例如Map、filter等高阶函数等的编程来进行具体的数据计算
         在对每一行的字符串拆分成单个单词
         在单词的拆分的基础上对每个单词实例计算为1,也就是word=>(word,1)
         在对每个单词实例计数为1基础上统计每个单词在文件中出现的总次数
         */
        val words=line.flatMap(_.split(" "))
        val pairs=words.map(word=>(word,1))
        val wordcounts=pairs.reduceByKey(_+_).map(pairs=>(pairs._2,pairs._1)).sortByKey(false,1).map(pairs=>(pairs._2,pairs._1))
        //wordcounts.foreach(wordNum=>println(wordNum._1+":"+wordNum._2)) 本地模式用这个打印,
        wordcounts.foreach(wordNum=>println(wordNum._1+":"+wordNum._2))
        sc.stop()
      }
    }

worldcout的流程
Spark本地worldcount详细讲解(Scala版本)以及流程