SparkSQL,创建表,查询数据,加载文件,处理文件,存储文件
实验思路:
- 在Linux上,创建/data/sparkshell目录,用于存储实验所需的数据。
- 切换目录到/data/sparkshell下,并从指定网址下载buyer_favorite文件。
- 使用jps查看Hadoop以及Spark的相关进程是否已经启动,若未启动则执行启动命令。
- 将Linux本地/data/sparkshell/buyer_favorite文件,上传到HDFS上的/mysparkshell目录下。
- 启动spark-shell
- 加载数据,创建RDD
- 对RDD进行统计,并将结果输出至hdfs上的/mysparkshell中。
- 将hdfs中的结果文件导入到本地/data/ans20/ans20_ans1.txt文件中
SparkSQL,创建表,查询数据
任务内容
某电商平台,需要对订单数据进行分析,已知订单数据包括两个文件,分别为订单数据orders和订单明细数据order_items,orders记录了用户购买商品的订单ID,订单号,用户ID及下单日期。order_items记录了商品ID,订单ID以及明细ID。它们的结构与关系如下图所示:
orders表:(order_id,order_number,buyer_id,create_dt)
- 订单ID 订单号 用户ID 下单日期
- 52304 111215052630 176474 2011-12-15 04:58:21
- 52303 111215052629 178350 2011-12-15 04:45:31
- 52302 111215052628 172296 2011-12-15 03:12:23
- 52301 111215052627 178348 2011-12-15 02:37:32
- 52300 111215052626 174893 2011-12-15 02:18:56
- 52299 111215052625 169471 2011-12-15 01:33:46
- 52298 111215052624 178345 2011-12-15 01:04:41
- 52297 111215052623 176369 2011-12-15 01:02:20
- 52296 111215052622 178343 2011-12-15 00:38:02
- 52295 111215052621 178342 2011-12-15 00:18:43
order_items表:(item_id,order_id,goods_id )
- 明细ID 订单ID 商品ID
- 252578 52293 1016840
- 252579 52293 1014040
- 252580 52294 1014200
- 252581 52294 1001012
- 252582 52294 1022245
- 252583 52294 1014724
- 252584 52294 1010731
- 252586 52295 1023399
- 252587 52295 1016840
- 252592 52296 1021134
- 252593 52296 1021133
- 252585 52295 1021840
- 252588 52295 1014040
- 252589 52296 1014040
- 252590 52296 1019043
创建orders表和order_items表,并统计该电商网站都有哪些用户购买了什么商品。
任务步骤
1.首先检查Hadoop相关进程,是否已经启动。若未启动,切换到/apps/hadoop/sbin目录下,启动Hadoop。
- jps
- cd /apps/hadoop/sbin
- ./start-all.sh
2.在Linux本地新建/data/spark5目录。
- mkdir -p /data/spark5
3.切换到/data/spark5目录下,使用wget命令,下载http://192.168.1.100:60000/allfiles/spark5中的orders和order_items。
- cd /data/spark5
- wget http://192.168.1.100:60000/allfiles/spark5/orders
- wget http://192.168.1.100:60000/allfiles/spark5/order_items
4.首先,在HDFS上新建/myspark5目录,然后将/data/spark5目录下的orders与order_items文件,上传到HDFS的/myspark5目录下。
- hadoop fs -mkdir /myspark5
- hadoop fs -put /data/spark5/orders /myspark5
- hadoop fs -put /data/spark5/order_items /myspark5
5.启动Spark Shell。
- spark-shell
6.在spark-shell下,使用case class方式定义RDD,创建orders表。
- val sqlContext = new org.apache.spark.sql.SQLContext(sc)
- import sqlContext.implicits._
- case class Orders(order_id:String,order_number:String,buyer_id:String,create_dt:String)
- val dforders = sc.textFile("/myspark5/orders").map(_.split('\t')).map(line=>Orders(line(0),line(1),line(2),line(3))).toDF()
- dforders.registerTempTable("orders")
验证创建的表是否成功。
- sqlContext.sql("show tables").map(t=>"tableName is:"+t(0)).collect().foreach(println)
- sqlContext.sql("select order_id,buyer_id from orders").collect
7.在Spark Shell下,使用applyScheme方式定义RDD,创建order_items表。
- import org.apache.spark.sql._
- import org.apache.spark.sql.types._
- val rddorder_items = sc.textFile("/myspark5/order_items")
- val roworder_items = rddorder_items.map(_.split("\t")).map( p=>Row(p(0),p(1),p(2) ) )
- val schemaorder_items = "item_id order_id goods_id"
- val schema = StructType(schemaorder_items.split(" ").map(fieldName=>StructField(fieldName,StringType,true)) )
- val dforder_items = sqlContext.applySchema(roworder_items, schema)
- dforder_items.registerTempTable("order_items")
验证创建表是否成功。
- sqlContext.sql("show tables").map(t=>"tableName is:"+t(0)).collect().foreach(println)
- sqlContext.sql("select order_id,goods_id from order_items ").collect
8.将order表及order_items表进行join操作,统计该电商网站,都有哪些用户购买了什么商品。
- sqlContext.sql("select orders.buyer_id, order_items.goods_id from order_items join orders on order_items.order_id=orders.order_id ").collect
9.退出Spark shell模式
- exit
下面演示Spark SQL模式
10.启动Spark SQL。
- spark-sql
11.创建表orders及表order_items。
- create table orders (order_id string,order_number string,buyer_id string,create_dt string)
- row format delimited fields terminated by '\t' stored as textfile;
- create table order_items(item_id string,order_id string,goods_id string)
- row format delimited fields terminated by '\t' stored as textfile;
12.查看已创建的表。
- show tables;
表名后的false意思是该表不是临时表。
13.将HDFS中/myspark5下的orders表和order_items表中数据加载进刚创建的两个表中。
- load data inpath '/myspark5/orders' into table orders;
- load data inpath '/myspark5/order_items' into table order_items;
14.验证数据是否加载成功。
- select * from orders;
- select * from order_items;
15.处理文件,将order表及order_items表进行join操作,统计该电商网站,都有哪些用户购买了什么商品。
- select orders.buyer_id, order_items.goods_id from order_items join orders on order_items.order_id=orders.order_id;
SparkSQL加载文件,处理文件,存储文件
1.首先检查Hadoop相关进程,是否已经启动。若未启动,切换到/apps/hadoop/sbin目录下,启动Hadoop。
- jps
- cd /apps/hadoop/sbin
- ./start-all.sh
2.在Linux本地新建/data/spark6目录。
- mkdir -p /data/spark6
切换到/data/spark6目录下,使用wget命令,下载http://192.168.1.100:60000/allfiles/spark6中的文件。
- cd /data/spark6
- wget http://192.168.1.100:60000/allfiles/spark6/goods_visit.json
3.将文件goods_visit.json,上传到HDFS的/myspark6目录下,若目录不存在则需提前创建。
- hadoop fs -mkdir /myspark6
- hadoop fs -put /data/spark6/goods_visit.json /myspark6
4.启动Spark Shell。
- spark-shell
5.读取HDFS中/myspark6的goods_visit.json文件。
- val df=sqlContext.read.json("hdfs://localhost:9000/myspark6/goods_visit.json")
6.查看goods_visit.json中的所有数据。
- df.show()
7.查看goods_visit.json的表结构。
- df.printSchema()
8.只查看商品ID(goods_id)。
- df.select("goods_id").show()
9.统计文件行数。
- df.count
10.条件查询,查询点击次数超过500商品。(show是返回字段和表数据,collect是返回集合)
- df.filter(df("click_num")>500).show
11.统计点击次数的最值、总和及平均数。
- df.agg(max("click_num"),sum("click_num"),min("click_num"),avg("click_num")).show
12.过滤点击次数小于200的商品。
- df.filter(df("click_num") < 200).show()
13.按点击次数进行分组统计。
- df.groupBy("click_num").count().show()
14.读取goods_visit.json文件,保存为parquet格式。
- val df = sqlContext.read.format("json").load("hdfs://localhost:9000/myspark6/goods_visit.json")
- df.select("goods_id", "click_num").write.format("parquet").save("goods_visit.parquet")
15.查看保存的goods_visit.parquet文件。
- hadoop fs -ls /user/zhangyu
1.在Linux上,创建/data/sparksql目录,由于存储实验所需的数据。
2.切换目录到/data/sparksql下,并从指定网址下载goods_visit.json文件。
3.使用jps查看Hadoop的相关进程是否已经启动。若未启动则执行启动命令。
4.将本地/data/sparksql/goods_visit.json文件,上传到HDFS上的/mysparksql目录下。
5.启动spark-shell
6.使用spark sql命令,读取HDFS中/mysparksql下的goods_visit.json文件。
7.使用spark sql命令,查看商品ID(goods_id)列数据,并将商品ID(goods_id)列数据保存到/data/ans22/ans22_ans1.txt文件中。
8.使用spark sql命令,查询点击次数超过300商品,将查询结果保存至/data/ans22/ans22_ans2.txt文件中。
9.使用spark sql命令,统计点击次数的最大值、最小值、总和、平均数,将查询结果保存至/data/ans22/ans22_ans3.txt文件中。
10.使用spark sql命令,增加一列数据,列名为“clicksum”,数值为click_num的100倍。
11.读取goods_visit.json文件并将文件格式保存为parquet格式,存放至/data/ans22目录下。
12.查看保存的goods_visit.parquet文件。
[试题1-代码-截图]
1.在Linux上,创建/data/sparksql目录 ,并切换目录到/data/sparksql下
2.从指定网址下载goods_visit.json文件到本地/data/sparksql目录下。
3.使用jps查看Hadoop的相关进程是否已经启动。若未启动则执行启动命令。
4.将本地/data/sparksql/goods_visit.json文件,上传到HDFS上的/mysparksql目录下。
5.启动spark-shell
6.使用spark sql命令,读取HDFS中/mysparksql下的goods_visit.json文件。
7.使用spark sql命令,查看商品ID(goods_id)列数据,并将商品ID(goods_id)列数据保存到/data/ans22/ans22_ans1.txt文件中。
(1)查看商品ID(goods_id)列数据
(2)将商品ID(goods_id)列数据保存到/data/ans22/ans22_ans1.txt文件中
8.使用spark sql命令,查询点击次数超过300商品,将查询结果保存至/data/ans22/ans22_ans2.txt文件中。
(1)查询点击次数超过300商品
(2)将查询结果保存至/data/ans22/ans22_ans2.txt文件中
9.使用spark sql命令,统计点击次数的最大值、最小值、总和、平均数,将查询结果保存至/data/ans22/ans22_ans3.txt文件中。
(1)统计点击次数的最大值、最小值、总和、平均数
10.使用spark sql命令,增加一列数据,列名为“clicksum”,数值为click_num的100倍。
11.读取goods_visit.json文件并将文件格式保存为parquet格式,存放至/data/ans22目录下。
(1)读取goods_visit.json文件并将文件格式保存为parquet格式
(2)存放至/data/ans22目录下
12.查看保存的goods_visit.parquet文件。