Spark WordCount简单案例(java,scala版)

Spark 是什么?

官方文档解释:Apache Spark™ is a fast and general engine for large-scale data processing.

通俗的理解:Spark是基于内存计算的大数据并行计算框架。Spark基于内存计算,提高了在   大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark   部署在大量廉价硬件之上,形成集群。

扩展了MapReduce计算模型;相比与MapReduce编程模型,Spark提供了更加灵活的DAG(Directed Acyclic Graph) 编程模型, 不仅包含传统的map、reduce接口, 还增加了filter、flatMap、union等操作接口,使得编写Spark程序更加灵活方便。

高效支持多种计算模式;Spark  不仅可以做离线运算,还可以做流式运算以及迭代式运算。

Spark 组成 大一统软件栈

 Spark WordCount简单案例(java,scala版)

接下来直接上代码

Java版

Spark WordCount简单案例(java,scala版)

Spark WordCount简单案例(java,scala版)

Spark WordCount简单案例(java,scala版)

Spark WordCount简单案例(java,scala版)

Spark WordCount简单案例(java,scala版)

Spark WordCount简单案例(java,scala版)

package com.spark.wordcount;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;

public class SparkWordCount {
    public static void main(String[] args) {
        //参数检查
        if (args.length < 2) {
            System.err.println("Usage: SparkWordCount <input> <output> ");
            System.exit(1);
        }
        //获取参数
        String input = args[0];   //输入参数
        String output = args[1];   //输出参数
        /**
         * 1、创建SparkConf对象,设置Spark应用程序的配置信息
         */
        SparkConf conf = new SparkConf()
                //设置Spark应用程序的名称
                .setAppName("WordCount");

// conf.set("spark.testing.memory", "2147480000");

// 因为jvm无法获得足够的资源 这串数字大于512
        /**
         * 2、创建SparkContext对象,Java开发使用JavaSparkContext;
         * Scala开发使用SparkContext
         * 在Spark中,SparkContext负责连接Spark集群,创建RDD、累积量和广播量等。
         * Master参数是为了创建TaskSchedule(较低级的调度器,高层次的调度器为DAGSchedule),如下:
         *         如果setMaster("local")则创建LocalSchedule;
         *         如果setMaster("spark")则创建SparkDeploySchedulerBackend。
         *         在SparkDeploySchedulerBackend的start函数,会启动一个Client对象,连接到Spark集群。
         */
        //创建java版本的SparkContext,创建上下文对象
        JavaSparkContext sc = new JavaSparkContext(conf);

        /**
         * 3、sc中提供了textFile方法是SparkContext中定义的,如下:
         *         def textFile(path: String): JavaRDD[String] = sc.textFile(path)
         * 用来读取HDFS上的文本文件、集群中节点的本地文本文件或任何支持Hadoop的文件系统上的文本文件,
         * 它的返回值是JavaRDD[String],是文本文件每一行
         */
        //读取数据
        JavaRDD<String> lines = sc.textFile(input);
        /**
         * 4、将行文本内容拆分为多个单词
         * lines调用flatMap这个transformation算子(参数类型是FlatMapFunction接口实现类)返回每一行的每个单词
         *
         *  flatmap与map类似,区别是原RDD中的元素经map处理后只能生成一个元素,
         *  而原RDD中的元素经flatmap处理后可生成多个元素来构建新RDD。
         *  返回的是一个迭代
         */
        ////进行相关计算
        //----用各种Transformation算子对RDD进行操作-----------------------------------------
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            public Iterable<String> call(String line) throws Exception {
                // TODO Auto-generated method stub
                return Arrays.asList(line.split(" "));
            }
        });

        /**
         * 2.2.0版本   Iterator不是 Iterable
         */
//        lines.flatMap(new FlatMapFunction<String, Object>() {
//            public Iterator call(String lines){
//                return Arrays.asList(lines.split(""));
//            }
//        });

        /**
         * 5、将每个单词的初始数量都标记为1个
         * words调用mapToPair这个transformation算子(参数类型是PairFunction接口实现类,
         * PairFunction<String, String, Integer>的三个参数是<输入单词, Tuple2的key, Tuple2的value>),
         * 返回一个新的RDD,即JavaPairRDD
         *
         * Spark为包含键值对类型的RDD提供了一些专有的操作。这些RDD被称为PairRDD
          mapToPair函数会对一个RDD中的每个元素调用f函数,其中原来RDD中的每一个元素都是T类型的,
         调用f函数后会进行一定的操作把每个元素都转换成一个<K2,V2>类型的对象
          Tuple2多元组
         */
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {

            private static final long serialVersionUID = 1L;

            public Tuple2<String, Integer> call(String word) throws Exception {
                // TODO Auto-generated method stub
                // return new Tuple2(word, 1);  //也可以
                return new Tuple2<String, Integer>(word, 1);
            }
        });
        /**
         * 6、计算每个相同单词出现的次数
         * pairs调用reduceByKey这个transformation算子(参数是Function2接口实现类)对每个key的value进行reduce操作,
         *
         * 返回一个JavaPairRDD,这个JavaPairRDD中的每一个Tuple的key是单词、value则是相同单词次数的和
         * reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行reduce,
         *因此,Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。
         */
        JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            public Integer call(Integer v1, Integer v2) throws Exception {
                // TODO Auto-generated method stub
                return v1 + v2;
            }
        });

        /**
         * 7、使用foreach这个action算子提交Spark应用程序
         * 在Spark中,每个应用程序都需要transformation算子计算,最终由action算子触发作业提交
         */
        //----------打印信息-----------------------------------
        wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {

            public void call(Tuple2<String, Integer> wordCount) throws Exception {
                // TODO Auto-generated method stub
                System.out.println("====" + wordCount._1 + " appeared " + wordCount._2 + " times");
            }
        });
        /**
         * 8、将计算结果文件输出到文件系统
         *         HDFS:
         *    使用新版API(org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;)
         *     wordCount.saveAsNewAPIHadoopFile("hdfs://ns1/spark/wordcount", Text.class, IntWritable.class,
         *     TextOutputFormat.class, new Configuration());
         *     使用旧版API(org.apache.hadoop.mapred.JobConf;org.apache.hadoop.mapred.OutputFormat;)
         *      wordCount.saveAsHadoopFile("hdfs://ns1/spark/wordcount", Text.class, IntWritable.class,
         *      OutputFormat.class, new JobConf(new Configuration()));
         *      使用默认TextOutputFile写入到HDFS(注意写入HDFS权限,如无权限则执行:hdfs dfs -chmod -R 777 /spark)
         *    wordCount.saveAsTextFile("hdfs://soy1:9000/spark/wordCount");
         */
        wordCounts.saveAsTextFile("G:/test/3");
        //将结果直接写入到dfs上
        //wordCounts.saveAsTextFile("hdfs://ip:8020/file/" + System.currentTimeMillis());
        System.out.println("单词计数完成!!!!!!!!!!!!!!");
        /**
         * 9、关闭SparkContext容器,结束本次作业
         */
        sc.close();
    }
}

Scala版

Spark WordCount简单案例(java,scala版)

Spark WordCount简单案例(java,scala版)

Spark WordCount简单案例(java,scala版)

package com.spark.wordcount
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
  def main(args: Array[String]): Unit = {
    /**
      * 第1步;创建Spark的配置对象SparkConf,设置Spark程序运行时的配置信息
      * 例如 setAppName用来设置应用程序的名称,在程序运行的监控界面可以看到该名称,
      * setMaster设置程序运行在本地还是运行在集群中,运行在本地可是使用local参数,也可以使用local[K]/local[*],
      * 可以去spark官网查看它们不同的意义。

 如果要运行在集群中,以Standalone模式运行的话,需要使用spark:

//HOST:PORT的形式指定master的IP和端口号,默认是7077
      */
    val conf = new SparkConf().setAppName("WordCount").setMaster("local")
    //  val conf = new SparkConf().setAppName("WordCount").setMaster("spark://master:7077")                     // 运行在集群中

    /**
      * 第2步:创建SparkContext 对象
      * SparkContext是Spark程序所有功能的唯一入口
      * SparkContext核心作用: 初始化Spark应用程序运行所需要的核心组件,包括DAGScheduler、TaskScheduler、SchedulerBackend
      * 同时还会负责Spark程序往Master注册程序
      *
      * 通过传入SparkConf实例来定制Spark运行的具体参数和配置信息
      */
    val sc = new SparkContext(conf)

    /**
      * 第3步: 根据具体的数据来源(HDFS HBase、Local FS、DB S3等)通过SparkContext来创建RDD
      * RDD 的创建基本有三种方式: 根据外部的数据来源(例如HDFS)、根据Scala集合使用SparkContext的parallelize方法、
      * 由其他的RDD操作产生
      * 数据会被RDD划分成为一系列的Partitions,分配到每个Partition的数据属于一个Task的处理范畴
      */

    val lines = sc.textFile("D:/resources/README.md") // 读取本地文件
    // 读取HDFS文件,并切分成不同的Partition
    //  val lines = sc.textFile("/library/wordcount/input")
    // 或者明确指明是从HDFS上获取数据
    //  val lines = sc.textFile("hdfs://master:9000/libarary/wordcount/input")

    /**
      * 第4步: 对初始的RDD进行Transformation级别的处理,例如 map、filter等高阶函数来进行具体的数据计算
      */
    // 拆分单词,并过滤掉空格,当然还可以继续进行过滤,如去掉标点符号
    val words = lines.flatMap(_.split(" ")).filter(word => word != " ")

    // 在单词拆分的基础上对每个单词实例计数为1, 也就是 word => (word, 1)
    val pairs = words.map(word => (word, 1))

    // 在每个单词实例计数为1的基础之上统计每个单词在文件中出现的总次数, 即key相同的value相加
    val wordscount = pairs.reduceByKey(_ + _)
    //  val wordscount = pairs.reduceByKey((v1, v2) => v1 + v2)  // 等同于

    // 打印结果,使用collect会将集群中的数据收集到当前运行drive的机器上,需要保证单台机器能放得下所有数据
    wordscount.collect.foreach(println)

    sc.stop() // 释放资源

  }
}