【项目二】项目实时统计

 

1.项目需求

【项目二】项目实时统计

2.项目过程

【项目二】项目实时统计

3.数据格式

ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1449137597979&p_url=http://list.iqiyi.com/www/4/---.html

4.项目开发

4.1 配置maven

[html] view plain copy
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"  
  3.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  4.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
  5.     <modelVersion>4.0.0</modelVersion>  
  6.   
  7.     <groupId>com.aura.spark</groupId>  
  8.     <artifactId>1711categorycount</artifactId>  
  9.     <version>1.0-SNAPSHOT</version>  
  10.   
  11.     <dependencies>  
  12.   
  13.         <dependency>  
  14.             <groupId>org.apache.hadoop</groupId>  
  15.             <artifactId>hadoop-client</artifactId>  
  16.             <version>2.6.5</version>  
  17.         </dependency>  
  18.   
  19.         <dependency>  
  20.             <groupId>org.apache.spark</groupId>  
  21.             <artifactId>spark-streaming_2.11</artifactId>  
  22.             <version>2.3.0</version>  
  23.         </dependency>  
  24.   
  25.   
  26.         <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 -->  
  27.         <dependency>  
  28.             <groupId>org.apache.spark</groupId>  
  29.             <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>  
  30.             <version>2.3.0</version>  
  31.   
  32.         </dependency>  
  33.   
  34.         <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->  
  35.         <dependency>  
  36.             <groupId>org.apache.hbase</groupId>  
  37.             <artifactId>hbase-client</artifactId>  
  38.             <version>0.98.6-hadoop2</version>  
  39.         </dependency>  
  40.   
  41.   
  42.         <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->  
  43.         <dependency>  
  44.             <groupId>org.apache.hbase</groupId>  
  45.             <artifactId>hbase-server</artifactId>  
  46.             <version>0.98.6-hadoop2</version>  
  47.         </dependency>  
  48.   
  49.   
  50.     </dependencies>  
  51.   
  52.   
  53. </project>  

4.2 模拟生成样本数据

[java] view plain copy
  1. package com.jenrey.spark.utils;  
  2.   
  3. import java.io.BufferedWriter;  
  4. import java.io.FileWriter;  
  5. import java.io.IOException;  
  6. import java.util.Random;  
  7.   
  8. /** 
  9.  * 模拟两万条数据并且保存到指定目录 
  10.  */  
  11. public class SimulateData {  
  12.     public static void main(String[] args) {  
  13.         BufferedWriter bw = null;  
  14.         try {  
  15.             bw = new BufferedWriter(new FileWriter("G:\\workspace\\categorycount\\src\\main\\java\\com\\jenrey\\spark\\data.txt"));  
  16.             int i = 0;  
  17.   
  18.             while (i < 20000) {  
  19.                 long time = System.currentTimeMillis();  
  20.                 int categoryid = new Random().nextInt(23);  
  21.                 bw.write("ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=" + time + "&p_url=http://list.iqiyi.com/www/" + categoryid + "/---.html");  
  22.                 bw.newLine();   //换行  
  23.                 i++;  
  24.             }  
  25.         } catch (IOException e) {  
  26.             e.printStackTrace();  
  27.         } finally {  
  28.             try {  
  29.                 bw.close();  
  30.             } catch (IOException e) {  
  31.                 e.printStackTrace();  
  32.             } finally {  
  33.             }  
  34.         }  
  35.     }  
  36. }  

运行代码即可,会生成模拟数据

4.3 使用SparkStreaming从kafka中读取数据

[java] view plain copy
  1. package com.jenrey.spark.category;  
  2.   
  3. import kafka.serializer.StringDecoder;  
  4. import org.apache.spark.SparkConf;  
  5. import org.apache.spark.api.java.JavaSparkContext;  
  6. import org.apache.spark.api.java.function.Function;  
  7. import org.apache.spark.streaming.Durations;  
  8. import org.apache.spark.streaming.api.java.JavaDStream;  
  9. import org.apache.spark.streaming.api.java.JavaStreamingContext;  
  10. import org.apache.spark.streaming.kafka.KafkaUtils;  
  11. import scala.Tuple2;  
  12.   
  13. import java.util.HashMap;  
  14. import java.util.HashSet;  
  15.   
  16. /** 
  17.  * SparkStreaming的数据来源来自于Kafka的topics的aura 
  18.  */  
  19. public class CategoryRealCount {  
  20.     public static void main(String[] args) {  
  21.         //初始化程序入口  
  22.         SparkConf conf = new SparkConf();  
  23.         conf.setMaster("local");  
  24.         conf.setAppName("CategoryRealCount");  
  25.         JavaSparkContext sc = new JavaSparkContext(conf);  
  26.         JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(3));  
        //Durations.seconds(3)批处理间隔为3秒
  27.         /*或者使用下面方法就自动创建SparkContext() 
  28.         JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(3));*/  
  29.   
  30.         //读取数据  
  31.         HashMap<String, String> KafkaParams = new HashMap<>();  
  32.         KafkaParams.put("metadata.broker.list""hadoop04:9092");  
  33.         HashSet<String> topics = new HashSet<>();  //创建topics 主题
  34.         topics.add("aura");  //添加主题 aura
  35.         JavaDStream<String> logDStream = KafkaUtils.createDirectStream(  
  36.                 ssc,  
  37.                 String.class,  //指定key的返回类型
  38.                 String.class,  //指定value的返回类型
  39.                 StringDecoder.class,  //解码器
  40.                 StringDecoder.class,  
  41.                 KafkaParams,  
  42.                 topics  
  43.         ).map(new Function<Tuple2<String, String>, String>() { 
  44.             //Function<Tuple2<String, String>, String>: Function函数类 Tuple2<String, String>传入类型(出就是读取每一行,key-value的形式),String返回类型
  45.             @Override  

  46.             public String call(Tuple2<String, String> tuple2) throws Exception {  
  47.                 //kafka读出来数据是kv的形式[String代表k的数据类型(k可就是偏移位置的信息, String代表v的数据类型(kafka内每一条数据), StringDecoder代表的就是解码器, StringDecoder]  
  48.                 //直接返回的是InputDStream[(String,String)]的KV数据类型,因为偏移位置的信息对我们是没有用的所以我们要.map(_._2)  
  49.                 return tuple2._2;  
  50.             }  
  51.         });  
  52.         //代码的逻辑  
  53.         logDStream.print();  
  54.         //启动应用程序  
  55.         ssc.start();  
  56.         try {  
  57.             ssc.awaitTermination();  
  58.         } catch (InterruptedException e) {  
  59.             e.printStackTrace();  
  60.         }  
  61.         ssc.stop();  
  62.   
  63.     }  
  64. }  

5.启动kafka的HA集群

注意:先启动zookeeper

[[email protected] kafka_2.11-1.1.0]$ bin/kafka-server-start.sh config/server.properties

[[email protected] kafka_2.11-1.1.0]$ bin/kafka-server-start.sh config/server.properties

[[email protected] kafka_2.11-1.1.0]$ bin/kafka-server-start.sh config/server.properties

[[email protected] kafka_2.11-1.1.0]$ bin/kafka-server-start.sh config/server.properties


创建topics  主题aura

[[email protected] kafka_2.11-1.1.0]$  bin/kafka-topics.sh --create --zookeeper hadoop02:2181 --replication-factor 3 --partitions 3 --topic aura

【项目二】项目实时统计

查看topics副本

[[email protected] kafka_2.11-1.1.0]$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic aura

【项目二】项目实时统计

删除topics

[[email protected] kafka_2.11-1.1.0]$ bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic aura

那么此时的删除并不是真正的删除,而是把topic标记为:marked for deletion

进入zk

zkCli.sh

找到topic所在的目录:ls /brokers/topics

到要删除的topic,执行命令:rmr /brokers/topics/【topic name】即可,此时topic被彻底删除。


6.模拟数据实时产生

[[email protected] ~]$ cat data.txt | while read line  (从data.txt文件中按读取每一行)

> do     (开始)
> echo "$line" >> data.log  (读取每一行后 写入data.log)
> sleep 0.5  (睡眠时间 0.5)

> done  (结束)

【项目二】项目实时统计

data.logy存放目录下

【项目二】项目实时统计

(注意:F与f的区别:  F 是追踪文件    f   是追踪文件内容


7.编写并运行flume

[[email protected] ~]$ touch file2kafka.properties

[[email protected] ~]$ vim file2kafka.properties

[html] view plain copy
  1. #监控指定的目录,如果有新的文件产生,那么将文件的内容显示到控制台
    #list the sources, sinks and channels for the agent
    #agent的名称可以自定义 a1 sources,sinks,channels的名称也可以自定义

  2. a1.sources = r1  
  3. a1.sinks = k1  
  4. a1.channels = c1  
  5.   
  6. # 配置source
  7. a1.sources.r1.type = exec  
  8. # 监控指定的文件
  9. a1.sources.r1.command = tail -F /home/hadoop/data.log  
  10.   
  11. # 配置channel  
  12. a1.channels.c1.type = memory  
  13.   
  14. #配置sink  
  15. a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink  
  16. #写入 aura 主题参数
  17. a1.sinks.k1.topic = aura  
  18. a1.sinks.k1.brokerList = hadoop02:9092  
  19. #flume 与kafka 是否要确认  1代表确认   -1代表不要确认
  20. a1.sinks.k1.requiredAcks = 1  
  21. #flume 每次给kafka发送几条数据
  22. a1.sinks.k1.batchSize = 5  
  23.   
  24. #source -> channel -> sink  
  25. # set channel for sink
    # 为sink指定他的channel
  26. a1.sources.r1.channels = c1  
  27. a1.sinks.k1.channel = c1  

启动flume:

[[email protected] ~]$ flume-ng agent --conf conf --conf-file file2kafka.properties --name a1 -Dflume.hadoop.logger=INFO,console 

【项目二】项目实时统计

或者是下图的日志:

【项目二】项目实时统计


查看kafka消费者能否消费的到数据:

[[email protected] kafka_2.11-1.1.0]$ bin/kafka-console-consumer.sh --zookeeper hadoop02:2181 --topic aura --from-beginning

【项目二】项目实时统计


运行categorycount.java程序后的效果图:

【项目二】项目实时统计

【项目二】项目实时统计

出现上图即代表:flume采集日志存到kafka中,然后SparkStreaming从kafka消费数据成功