spark编程的例子
问题需求:
表1记录了学生的 ID 、姓名 、 性别 、 学历 , 表2 记录里学生的ID,要求根据表2 中的 ID从表1中筛选出符合要求的学生的信息
思路:
把表2和表1合起来 -> 以ID为键构成(K,V)对 -> 把相同的键对应的值加起来 -> 将每个值拆分并提取特征 -> 选出符号要求的项
表1 - 学生信息
1 张三 男 本科 2 李四 男 专科 3 王五 男 硕士 4 赵六 男 博士 5 杨七 女 本科 6 黄八 女 博士 7 咸九 女 硕士 8 方十 女 专科 9 周天 女 本科 10 陈二 男 博士 11 韩一 女 专科 12 杜岚 女 硕士 13 冯马 男 本科 14 丁一 女 专科 15 熊四 男 本科 16 孙齐 女 专科 17 钱多 女 硕士 18 吴花 男 博士 19 郑钱 女 本科 20 刘海 男 硕士 21 孔一 女 硕士
表2 - 学生ID
2 4 5 8 12 15 17 20
编写Scala程序
import org.apache.log4j.Logger import org.apache.spark.{SparkConf, SparkContext} import org.apache.log4j.Level object Test { def main(args: Array[String]): Unit = { Logger.getLogger("org").setLevel(Level.OFF) //去掉无用的提示信息,便于观察输出 Logger.getLogger("com").setLevel(Level.OFF) System.setProperty("spark.ui.showConsoleProgress","false") Logger.getRootLogger().setLevel(Level.OFF) val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("MyTest")) val textFile1 = sc.textFile("/home/test/1.txt") //导入表1 val textFile2 = sc.textFile("/home/test/2.txt") //导入表2 val textFile3 = textFile2.++(textFile1).map{line=> //我们把表2和表1拟合起来,形成一个RDD val item =line.split(" ") (item(0),line) }.reduceByKey(_+" "+_).filter{line=> //把相同key下的value值加起来,相同key其实就是我们要找的目标学生(同时存在于表1和表2) val ite= line._2.split(" ") if (ite(0).equals(ite(1))) false else true}.map(_._2) //得到的是不符合条件的学生 val textFile4 = textFile1.subtract(textFile3).sortBy(_.split(" ")(0)) //用减法求得符合条件的学生 println("符合查询条件的学生:") textFile4.foreach(println(_)) println("--------------------") println("不符合查询条件的:") textFile3.foreach(println(_)) } }
观察输出
注意:
假如你想编好程序放到spark集群上跑,发现stdout的 中文输出乱码,那么:
<1. > 检查spark的运行环境
先随便跑一个集群,然后webUi中查看http://Master:4040 , 里面查看Environment,查找 file.encoding 和 sun.jnu.encoding 的值看看是否是 UTF8
<2. > 假如你在docker中部署的spark
敲上代码: locale ,会出现该容器的编码格式。如图:
[[email protected] local]# locale LANG= LC_CTYPE="POSIX" LC_NUMERIC="POSIX" LC_TIME="POSIX" LC_COLLATE="POSIX" LC_MONETARY="POSIX" LC_MESSAGES="POSIX" LC_PAPER="POSIX" LC_NAME="POSIX" LC_ADDRESS="POSIX" LC_TELEPHONE="POSIX" LC_MEASUREMENT="POSIX" LC_IDENTIFICATION="POSIX" LC_ALL=
编码格式POSIX不支持中文,所以你需要设置成UTF8格式,我们输入指令 locale -a 看看本地支持什么编码
[[email protected] local]# locale -a C en_AG en_AG.utf8 en_AU ......... en_US.utf8 en_ZA en_ZA.iso88591 en_ZA.utf8 en_ZM en_ZM.utf8 en_ZW en_ZW.iso88591 en_ZW.utf8 POSIX
看到系统支持en_US.utf8格式(你有en_CN.utf8更好,没有就用这个也行),这个是支持中文的。我们可以把该容器临时设置成UTF8格式。步骤是 ,输入指令 vim /etc/profile ,在该文件中追加一句 export LANG="en_US.utf8" ,保存退出,source /etc/profile 即可,现在在启动hdfs,启动集群,再跑文件就输出中文了。
因为是临时改的编码格式,所以你每次重启容器之后,都要source /etc/profile ,否则编码格式还是以前的老样子。