Spark Operators in Practice: Transformations (Scala)
- map operator: multiply every element in a collection by 2
package cn.spark.study.core

import org.apache.spark.{SparkConf, SparkContext}

object TransformationOperation {

  def main(args: Array[String]): Unit = {
    map()
  }

  // map: apply a function to every element, producing a new RDD
  def map(): Unit = {
    val conf = new SparkConf().setAppName("map").setMaster("local")
    val sc = new SparkContext(conf)
    val numbers = Array(1, 2, 3, 4, 5)
    val numberRDD = sc.parallelize(numbers, 1)
    val doubled = numberRDD.map(x => x * 2)
    // foreach returns Unit, so there is no result worth assigning
    doubled.foreach(x => println(x))
  }
}
Result:
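With a single partition the input order is preserved, so the run should print:

2
4
6
8
10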
- filter operator: keep only the even numbers in a collection
package cn.spark.study.core

import org.apache.spark.{SparkConf, SparkContext}

object TransformationOperation {

  def main(args: Array[String]): Unit = {
    // map()
    filter()
  }

  // filter: keep only the elements for which the predicate returns true
  def filter(): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("filter")
    val sc = new SparkContext(conf)
    val number = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
    val numRDD = sc.parallelize(number)
    val evenNum = numRDD.filter(x => x % 2 == 0)
    evenNum.foreach(x => println(x))
  }
}
Test:
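Only the even numbers survive the predicate, so the run should print:

2
4
6
8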
- flatMap operator: split lines of text into words
// flatMap: map each element to zero or more elements, then flatten the results
def flatMap(): Unit = {
  val conf = new SparkConf().setAppName("flatMap").setMaster("local")
  val sc = new SparkContext(conf)
  val strs = Array("hello world", "ni hao", "ha ni shi")
  val strRDD = sc.parallelize(strs)
  val words = strRDD.flatMap(x => x.split(" "))
  words.foreach(x => println(x))
}
Test:
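Each input line is split on spaces and the word arrays are flattened into one RDD, so the run should print:

hello
world
ni
hao
ha
ni
shi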
- groupByKey operator: group the scores by class
// groupByKey: collect all values that share a key into one Iterable per key
def groupByKey(): Unit = {
  val conf = new SparkConf().setMaster("local").setAppName("groupByKey")
  val sc = new SparkContext(conf)
  val scoreList = Array(Tuple2("jike1", 99), Tuple2("ruangong1", 34),
    Tuple2("jike1", 80), Tuple2("ruangong1", 78))
  val scoreRDD = sc.parallelize(scoreList)
  val grouped = scoreRDD.groupByKey()
  grouped.foreach(group => {
    println(group._1)
    group._2.foreach(score => println(score))
    println("==========================")
  })
}
Test:
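Each key is printed followed by all of its grouped scores (key order may vary between runs):

jike1
99
80
==========================
ruangong1
34
78
==========================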
- reduceByKey operator: total the scores for each class
// reduceByKey: merge all values of each key with the given function
def reduceByKey(): Unit = {
  val conf = new SparkConf().setMaster("local").setAppName("reduceByKey")
  val sc = new SparkContext(conf)
  val scoreList = Array(Tuple2("jike1", 99), Tuple2("ruangong1", 34),
    Tuple2("jike1", 80), Tuple2("ruangong1", 78))
  val sRDD = sc.parallelize(scoreList)
  val totals = sRDD.reduceByKey(_ + _)
  totals.foreach(x => println(x._1 + " : " + x._2))
}
Test:
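The per-class sums are 99 + 80 = 179 and 34 + 78 = 112, so the run should print (key order may vary):

jike1 : 179
ruangong1 : 112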
- sortByKey operator: sort students by score
// sortByKey: sort a pair RDD by its keys
def sortByKey(): Unit = {
  val conf = new SparkConf()
    .setAppName("sortByKey")
    .setMaster("local")
  val sc = new SparkContext(conf)
  val scoreList = Array(Tuple2(65, "leo"), Tuple2(50, "tom"),
    Tuple2(100, "marry"), Tuple2(85, "jack"))
  val scores = sc.parallelize(scoreList, 1)
  val sortedScores = scores.sortByKey(false) // false = descending order
  sortedScores.foreach(studentScore => println(studentScore._1 + ": " + studentScore._2))
}
Test:
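With sortByKey(false) the pairs come out in descending key order:

100: marry
85: jack
65: leo
50: tom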
- join operator: print each student's score
- cogroup operator: print each student's scores (see the sketch after the join example below)
// join: inner-join two pair RDDs on their keys
def join(): Unit = {
  val conf = new SparkConf()
    .setAppName("join")
    .setMaster("local")
  val sc = new SparkContext(conf)
  val studentList = Array(
    Tuple2(1, "leo"),
    Tuple2(2, "jack"),
    Tuple2(3, "tom"))
  val scoreList = Array(
    Tuple2(1, 100),
    Tuple2(2, 90),
    Tuple2(3, 60))
  val students = sc.parallelize(studentList)
  val scores = sc.parallelize(scoreList)
  // each element is (id, (name, score))
  val studentScores = students.join(scores)
  studentScores.foreach(studentScore => {
    println("student id: " + studentScore._1)
    println("student name: " + studentScore._2._1)
    println("student score: " + studentScore._2._2)
    println("=======================================")
  })
}
Test:
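The join pairs each id with a (name, score) tuple, so the run should print one block per student (order may vary):

student id: 1
student name: leo
student score: 100
=======================================

...and likewise for jack (90) and tom (60).

The cogroup bullet above has no code of its own, so here is a minimal sketch in the same style, written as another method of the same object. Unlike join, cogroup returns for every key the full Iterable of values from each RDD, which matters when a key appears more than once; the extra Tuple2(1, 70) score below is not in the original data and is added purely to illustrate that.

// cogroup: for each key, pair the Iterable of values from this RDD
// with the Iterable of values from the other RDD
def cogroup(): Unit = {
  val conf = new SparkConf()
    .setAppName("cogroup")
    .setMaster("local")
  val sc = new SparkContext(conf)
  val studentList = Array(
    Tuple2(1, "leo"),
    Tuple2(2, "jack"),
    Tuple2(3, "tom"))
  // student 1 has two scores, to show that cogroup keeps all values per key
  val scoreList = Array(
    Tuple2(1, 100),
    Tuple2(2, 90),
    Tuple2(3, 60),
    Tuple2(1, 70))
  val students = sc.parallelize(studentList)
  val scores = sc.parallelize(scoreList)
  // each element is (id, (Iterable of names, Iterable of scores))
  val studentScores = students.cogroup(scores)
  studentScores.foreach(studentScore => {
    println("student id: " + studentScore._1)
    println("student name(s): " + studentScore._2._1.mkString(", "))
    println("student score(s): " + studentScore._2._2.mkString(", "))
    println("=======================================")
  })
}

For student 1 this prints "student score(s): 100, 70" in one block, whereas join would have emitted two separate (name, score) rows for the same id.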