SparkSQL,创建表,查询数据,加载文件,处理文件,存储文件

实验思路:

  1. 在Linux上,创建/data/sparkshell目录,用于存储实验所需的数据。
  2. 切换目录到/data/sparkshell下,并从指定网址下载buyer_favorite文件。
  3. 使用jps查看Hadoop以及Spark的相关进程是否已经启动,若未启动则执行启动命令。
  4. 将Linux本地/data/sparkshell/buyer_favorite文件,上传到HDFS上的/mysparkshell目录下。
  5. 启动spark-shell
  6. 加载数据,创建RDD
  7. 对RDD进行统计并将结果输出至hdfs上的/mysparkshell
  8. 将hdfs中的结果文件导入到本地/data/ans20/ans20_ans1.txt文件中

 

 

SparkSQL,创建表,查询数据

任务内容

某电商平台,需要对订单数据进行分析,已知订单数据包括两个文件,分别为订单数据orders和订单明细数据order_items,orders记录了用户购买商品的订单ID,订单号,用户ID及下单日期。order_items记录了商品ID,订单ID以及明细ID。它们的结构与关系如下图所示:

 

orders表:(order_id,order_number,buyer_id,create_dt)

view plain copy

  1. 订单ID   订单号          用户ID    下单日期  
  2. 52304    111215052630    176474    2011-12-15 04:58:21  
  3. 52303    111215052629    178350    2011-12-15 04:45:31  
  4. 52302    111215052628    172296    2011-12-15 03:12:23  
  5. 52301    111215052627    178348    2011-12-15 02:37:32  
  6. 52300    111215052626    174893    2011-12-15 02:18:56  
  7. 52299    111215052625    169471    2011-12-15 01:33:46  
  8. 52298    111215052624    178345    2011-12-15 01:04:41  
  9. 52297    111215052623    176369    2011-12-15 01:02:20  
  10. 52296    111215052622    178343    2011-12-15 00:38:02  
  11. 52295    111215052621    178342    2011-12-15 00:18:43  

order_items表:(item_id,order_id,goods_id )

view plain copy

  1. 明细ID  订单ID   商品ID  
  2. 252578    52293    1016840  
  3. 252579    52293    1014040  
  4. 252580    52294    1014200  
  5. 252581    52294    1001012  
  6. 252582    52294    1022245  
  7. 252583    52294    1014724  
  8. 252584    52294    1010731  
  9. 252586    52295    1023399  
  10. 252587    52295    1016840  
  11. 252592    52296    1021134  
  12. 252593    52296    1021133  
  13. 252585    52295    1021840  
  14. 252588    52295    1014040  
  15. 252589    52296    1014040  
  16. 252590    52296    1019043  

创建orders表和order_items表,并统计该电商网站都有哪些用户购买了什么商品。

任务步骤

1.首先检查Hadoop相关进程,是否已经启动。若未启动,切换到/apps/hadoop/sbin目录下,启动Hadoop。

view plain copy

  1. jps  
  2. cd /apps/hadoop/sbin  
  3. ./start-all.sh  

2.在Linux本地新建/data/spark5目录。

view plain copy

  1. mkdir -p /data/spark5  

3.切换到/data/spark5目录下,使用wget命令,下载http://192.168.1.100:60000/allfiles/spark5中的orders和order_items。

view plain copy

  1. cd /data/spark5  
  2. wget http://192.168.1.100:60000/allfiles/spark5/orders  
  3. wget http://192.168.1.100:60000/allfiles/spark5/order_items  

4.首先,在HDFS上新建/myspark5目录,然后将/data/spark5目录下的orders与order_items文件,上传到HDFS的/myspark5目录下。

view plain copy

  1. hadoop fs -mkdir /myspark5  
  2. hadoop fs -put /data/spark5/orders /myspark5  
  3. hadoop fs -put /data/spark5/order_items /myspark5  

5.启动Spark Shell。

view plain copy

  1. spark-shell  

6.在spark-shell下,使用case class方式定义RDD,创建orders表。

view plain copy

  1. val sqlContext = new org.apache.spark.sql.SQLContext(sc)  
  2. import sqlContext.implicits._  
  3. case class Orders(order_id:String,order_number:String,buyer_id:String,create_dt:String)  
  4. val dforders = sc.textFile("/myspark5/orders").map(_.split('\t')).map(line=>Orders(line(0),line(1),line(2),line(3))).toDF()  
  5. dforders.registerTempTable("orders")  

 

验证创建的表是否成功。

view plain copy

  1. sqlContext.sql("show tables").map(t=>"tableName is:"+t(0)).collect().foreach(println)  
  2. sqlContext.sql("select order_id,buyer_id from orders").collect  

 

7.在Spark Shell下,使用applyScheme方式定义RDD,创建order_items表。

view plain copy

  1. import org.apache.spark.sql._  
  2. import org.apache.spark.sql.types._  
  3. val rddorder_items = sc.textFile("/myspark5/order_items")  
  4. val roworder_items = rddorder_items.map(_.split("\t")).map( p=>Row(p(0),p(1),p(2) ) )  
  5. val schemaorder_items = "item_id order_id goods_id"  
  6. val schema = StructType(schemaorder_items.split(" ").map(fieldName=>StructField(fieldName,StringType,true)) )  
  7. val dforder_items = sqlContext.applySchema(roworder_items, schema)  
  8. dforder_items.registerTempTable("order_items")  

 

验证创建表是否成功。

view plain copy

  1. sqlContext.sql("show tables").map(t=>"tableName is:"+t(0)).collect().foreach(println)  
  2. sqlContext.sql("select order_id,goods_id from order_items ").collect  

 

8.将order表及order_items表进行join操作,统计该电商网站,都有哪些用户购买了什么商品。

view plain copy

  1. sqlContext.sql("select orders.buyer_id, order_items.goods_id from order_items  join orders on order_items.order_id=orders.order_id ").collect  

 

9.退出Spark shell模式

view plain copy

  1. exit  

下面演示Spark SQL模式

10.启动Spark SQL。

view plain copy

  1. spark-sql  

11.创建表orders及表order_items。

view plain copy

  1. create table orders (order_id string,order_number string,buyer_id string,create_dt string)  
  2. row format delimited fields terminated by '\t'  stored as textfile;  

view plain copy

  1. create table order_items(item_id string,order_id string,goods_id string)  
  2. row format delimited fields terminated by '\t'  stored as textfile;  

12.查看已创建的表。

view plain copy

  1. show tables;  

 

表名后的false意思是该表不是临时表。

13.将HDFS中/myspark5下的orders表和order_items表中数据加载进刚创建的两个表中。

view plain copy

  1. load data inpath '/myspark5/orders' into table orders;  
  2. load data inpath '/myspark5/order_items' into table order_items;  

 

14.验证数据是否加载成功。

view plain copy

  1. select * from orders;  
  2. select * from order_items;  

 

 

15.处理文件,将order表及order_items表进行join操作,统计该电商网站,都有哪些用户购买了什么商品。

view plain copy

  1. select orders.buyer_id, order_items.goods_id from order_items join orders on order_items.order_id=orders.order_id;  

 

 

SparkSQL加载文件,处理文件,存储文件

1.首先检查Hadoop相关进程,是否已经启动。若未启动,切换到/apps/hadoop/sbin目录下,启动Hadoop。

view plain copy

  1. jps  
  2. cd /apps/hadoop/sbin  
  3. ./start-all.sh  

2.在Linux本地新建/data/spark6目录。

view plain copy

  1. mkdir -p /data/spark6  

切换到/data/spark6目录下,使用wget命令,下载http://192.168.1.100:60000/allfiles/spark6中的文件。

view plain copy

  1. cd /data/spark6  
  2. wget http://192.168.1.100:60000/allfiles/spark6/goods_visit.json  

3.将文件goods_visit.json,上传到HDFS的/myspark6目录下,若目录不存在则需提前创建。

view plain copy

  1. hadoop fs -mkdir /myspark6  
  2. hadoop fs -put /data/spark6/goods_visit.json /myspark6  

4.启动Spark Shell。

view plain copy

  1. spark-shell  

5.读取HDFS中/myspark6的goods_visit.json文件。

view plain copy

  1. val df=sqlContext.read.json("hdfs://localhost:9000/myspark6/goods_visit.json")  

 

6.查看goods_visit.json中的所有数据。

view plain copy

  1. df.show()  

 

7.查看goods_visit.json的表结构。

view plain copy

  1. df.printSchema()  

 

8.只查看商品ID(goods_id)。

view plain copy

  1. df.select("goods_id").show()  

 

9.统计文件行数。

view plain copy

  1. df.count  

 

10.条件查询,查询点击次数超过500商品。(show是返回字段和表数据,collect是返回集合)

view plain copy

  1. df.filter(df("click_num")>500).show  

 

 

11.统计点击次数的最值、总和及平均数。

view plain copy

  1. df.agg(max("click_num"),sum("click_num"),min("click_num"),avg("click_num")).show  

 

12.过滤点击次数小于200的商品。

view plain copy

  1. df.filter(df("click_num") < 200).show()  

 

13.按点击次数进行分组统计。

view plain copy

  1. df.groupBy("click_num").count().show()  

 

14.读取goods_visit.json文件,保存为parquet格式。

view plain copy

  1. val df = sqlContext.read.format("json").load("hdfs://localhost:9000/myspark6/goods_visit.json")  
  2. df.select("goods_id""click_num").write.format("parquet").save("goods_visit.parquet")  

 

15.查看保存的goods_visit.parquet文件。

view plain copy

  1. hadoop fs -ls /user/zhangyu  

 

1.在Linux上,创建/data/sparksql目录,由于存储实验所需的数据。

2.切换目录到/data/sparksql下,并从指定网址下载goods_visit.json文件。

3.使用jps查看Hadoop的相关进程是否已经启动。若未启动则执行启动命令。

4.将本地/data/sparksql/goods_visit.json文件,上传到HDFS上的/mysparksql目录下。

5.启动spark-shell

6.使用spark sql命令,读取HDFS中/mysparksql下的goods_visit.json文件。 

7.使用spark sql命令,查看商品ID(goods_id)列数据,并将商品ID(goods_id)列数据保存到/data/ans22/ans22_ans1.txt文件中。

 8.使用spark sql命令,查询点击次数超过300商品,将查询结果保存至/data/ans22/ans22_ans2.txt文件中。

 9.使用spark sql命令,统计点击次数的最大值、最小值、总和、平均数,将查询结果保存至/data/ans22/ans22_ans3.txt文件中。

 10.使用spark sql命令,增加一列数据,列名为“clicksum”,数值为click_num的100倍。

 11.读取goods_visit.json文件并将文件格式保存为parquet格式,存放至/data/ans22目录下。 

12.查看保存的goods_visit.parquet文件。

[试题1-代码-截图]

1.在Linux上,创建/data/sparksql目录 ,并切换目录到/data/sparksql下

 

SparkSQL,创建表,查询数据,加载文件,处理文件,存储文件

 

2.从指定网址下载goods_visit.json文件到本地/data/sparksql目录下。 

 

SparkSQL,创建表,查询数据,加载文件,处理文件,存储文件

 

3.使用jps查看Hadoop的相关进程是否已经启动。若未启动则执行启动命令。

 

SparkSQL,创建表,查询数据,加载文件,处理文件,存储文件

 

4.将本地/data/sparksql/goods_visit.json文件,上传到HDFS上的/mysparksql目录下。 

 

SparkSQL,创建表,查询数据,加载文件,处理文件,存储文件

 

5.启动spark-shell

 

SparkSQL,创建表,查询数据,加载文件,处理文件,存储文件

 

6.使用spark sql命令,读取HDFS中/mysparksql下的goods_visit.json文件。 

 

SparkSQL,创建表,查询数据,加载文件,处理文件,存储文件

 

7.使用spark sql命令,查看商品ID(goods_id)列数据,并将商品ID(goods_id)列数据保存到/data/ans22/ans22_ans1.txt文件中。 

(1)查看商品ID(goods_id)列数据

SparkSQL,创建表,查询数据,加载文件,处理文件,存储文件(2)将商品ID(goods_id)列数据保存到/data/ans22/ans22_ans1.txt文件中

 

SparkSQL,创建表,查询数据,加载文件,处理文件,存储文件

 

8.使用spark sql命令,查询点击次数超过300商品,将查询结果保存至/data/ans22/ans22_ans2.txt文件中。 

(1)查询点击次数超过300商品

SparkSQL,创建表,查询数据,加载文件,处理文件,存储文件

(2)将查询结果保存至/data/ans22/ans22_ans2.txt文件中

SparkSQL,创建表,查询数据,加载文件,处理文件,存储文件

SparkSQL,创建表,查询数据,加载文件,处理文件,存储文件

 9.使用spark sql命令,统计点击次数的最大值、最小值、总和、平均数,将查询结果保存至/data/ans22/ans22_ans3.txt文件中。 

(1)统计点击次数的最大值、最小值、总和、平均数

 

SparkSQL,创建表,查询数据,加载文件,处理文件,存储文件

10.使用spark sql命令,增加一列数据,列名为“clicksum”,数值为click_num的100倍。 

SparkSQL,创建表,查询数据,加载文件,处理文件,存储文件

11.读取goods_visit.json文件并将文件格式保存为parquet格式,存放至/data/ans22目录下。  

(1)读取goods_visit.json文件并将文件格式保存为parquet格式

 

SparkSQL,创建表,查询数据,加载文件,处理文件,存储文件


(2)存放至/data/ans22目录下

 

SparkSQL,创建表,查询数据,加载文件,处理文件,存储文件

12.查看保存的goods_visit.parquet文件。

 

SparkSQL,创建表,查询数据,加载文件,处理文件,存储文件