[Project 2] Real-Time Statistics
1. Project Requirements
2. Project Workflow
3. Data Format
ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=1449137597979&p_url=http://list.iqiyi.com/www/4/---.html
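Each record is a single line of key=value fields joined by "&", and the category id that the project counts is the path segment of p_url (4 in the sample line above). For reference, here is a minimal parsing sketch; the class LogLineParser and its methods are hypothetical helpers and not part of the original project:

import java.util.HashMap;
import java.util.Map;

public class LogLineParser {

    // Split a log line into its key=value fields.
    public static Map<String, String> parse(String line) {
        Map<String, String> fields = new HashMap<>();
        for (String kv : line.split("&")) {
            int idx = kv.indexOf('=');
            if (idx > 0) {
                fields.put(kv.substring(0, idx), kv.substring(idx + 1));
            }
        }
        return fields;
    }

    // p_url looks like http://list.iqiyi.com/www/<categoryId>/---.html,
    // so the category id is the next-to-last path segment.
    public static String categoryId(String pUrl) {
        String[] parts = pUrl.split("/");
        return parts[parts.length - 2];
    }

    public static void main(String[] args) {
        String line = "ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080"
                + "&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN"
                + "&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263"
                + "&c_time=1449137597979&p_url=http://list.iqiyi.com/www/4/---.html";
        Map<String, String> fields = parse(line);
        System.out.println(fields.get("en"));                 // e_pv
        System.out.println(categoryId(fields.get("p_url")));  // 4
    }
}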
4. Project Development
4.1 Configure Maven
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.aura.spark</groupId>
    <artifactId>1711categorycount</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>0.98.6-hadoop2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>0.98.6-hadoop2</version>
        </dependency>
    </dependencies>
</project>
4.2 Generate Simulated Sample Data
package com.jenrey.spark.utils;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Random;

/**
 * Simulates 20,000 log records and saves them to the specified file.
 */
public class SimulateData {
    public static void main(String[] args) {
        BufferedWriter bw = null;
        try {
            bw = new BufferedWriter(new FileWriter("G:\\workspace\\categorycount\\src\\main\\java\\com\\jenrey\\spark\\data.txt"));
            int i = 0;
            while (i < 20000) {
                long time = System.currentTimeMillis();
                int categoryid = new Random().nextInt(23);
                bw.write("ver=1&en=e_pv&pl=website&sdk=js&b_rst=1920*1080&u_ud=12GH4079-223E-4A57-AC60-C1A04D8F7A2F&l=zh-CN&u_sd=8E9559B3-DA35-44E1-AC98-85EB37D1F263&c_time=" + time + "&p_url=http://list.iqiyi.com/www/" + categoryid + "/---.html");
                bw.newLine(); // one record per line
                i++;
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (bw != null) {
                try {
                    bw.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
Run the code and the simulated data file will be generated.
4.3 Read Data from Kafka with Spark Streaming
package com.jenrey.spark.category;

import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import java.util.HashMap;
import java.util.HashSet;

/**
 * The Spark Streaming input comes from the Kafka topic "aura".
 */
public class CategoryRealCount {
    public static void main(String[] args) {
        // Initialize the program entry point
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("CategoryRealCount");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Durations.seconds(3): the batch interval is 3 seconds
        JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(3));
        /* Alternatively, the following creates the SparkContext automatically:
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(3)); */

        // Read the data
        HashMap<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "hadoop04:9092");
        HashSet<String> topics = new HashSet<>(); // the set of topics to subscribe to
        topics.add("aura");                       // subscribe to the topic "aura"
        JavaDStream<String> logDStream = KafkaUtils.createDirectStream(
                ssc,
                String.class,        // key type
                String.class,        // value type
                StringDecoder.class, // key decoder
                StringDecoder.class, // value decoder
                kafkaParams,
                topics
        ).map(new Function<Tuple2<String, String>, String>() {
            // Function<Tuple2<String, String>, String>: the input is each record as a
            // key-value pair (Tuple2<String, String>), the output is a String.
            @Override
            public String call(Tuple2<String, String> tuple2) throws Exception {
                // Kafka records come back as key-value pairs: the key carries the offset
                // information and the value is the record itself, so createDirectStream
                // returns an InputDStream[(String, String)]. The offset key is not needed
                // here, so only the value (tuple2._2) is kept, as in .map(_._2) in Scala.
                return tuple2._2;
            }
        });

        // Business logic goes here
        logDStream.print();

        // Start the application
        ssc.start();
        try {
            ssc.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        ssc.stop();
    }
}
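In the code above, logDStream.print() stands in for the real business logic. The following is a hedged sketch of what the per-batch category count could look like; it is not from the original post, the helper class name is made up, and persisting the counts to HBase is left out:

import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

public class CategoryCountSketch {

    // Turn the raw log lines into per-batch (categoryId, count) pairs.
    public static JavaPairDStream<String, Long> countByCategory(JavaDStream<String> logDStream) {
        return logDStream
                .mapToPair(new PairFunction<String, String, Long>() {
                    @Override
                    public Tuple2<String, Long> call(String line) throws Exception {
                        // In the simulated records p_url is the last field:
                        // p_url=http://list.iqiyi.com/www/<categoryId>/---.html
                        String pUrl = line.substring(line.indexOf("p_url=") + "p_url=".length());
                        String[] parts = pUrl.split("/");
                        return new Tuple2<>(parts[parts.length - 2], 1L);
                    }
                })
                .reduceByKey(new Function2<Long, Long, Long>() {
                    @Override
                    public Long call(Long a, Long b) throws Exception {
                        return a + b;
                    }
                });
    }
}

In CategoryRealCount, replacing logDStream.print() with CategoryCountSketch.countByCategory(logDStream).print() prints the per-category counts of each 3-second batch; a foreachRDD that writes those counts to HBase would go in the same place.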
5. Start the Kafka HA Cluster
Note: start ZooKeeper first.
[[email protected] kafka_2.11-1.1.0]$ bin/kafka-server-start.sh config/server.properties
[[email protected] kafka_2.11-1.1.0]$ bin/kafka-server-start.sh config/server.properties
[[email protected] kafka_2.11-1.1.0]$ bin/kafka-server-start.sh config/server.properties
[[email protected] kafka_2.11-1.1.0]$ bin/kafka-server-start.sh config/server.properties
Create the topic aura:
[[email protected] kafka_2.11-1.1.0]$ bin/kafka-topics.sh --create --zookeeper hadoop02:2181 --replication-factor 3 --partitions 3 --topic aura
Describe the topic (check its replicas):
[[email protected] kafka_2.11-1.1.0]$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic aura
Delete the topic:
[[email protected] kafka_2.11-1.1.0]$ bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic aura
At this point the topic is not actually removed; it is only marked for deletion ("marked for deletion"), which typically means delete.topic.enable is not set to true on the brokers.
Enter the ZooKeeper CLI:
zkCli.sh
Locate the directory that holds the topics: ls /brokers/topics
Find the topic you want to delete and run rmr /brokers/topics/【topic name】; the topic is then removed completely.
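The CLI commands above are what this walkthrough uses. Purely as an illustrative alternative (not part of the original project), topics can also be managed programmatically with Kafka's AdminClient; the sketch below assumes the kafka-clients 1.1.0 dependency has been added to the POM:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class AuraTopicAdmin {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop02:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Create the topic "aura" with 3 partitions and replication factor 3,
            // mirroring the kafka-topics.sh --create command above.
            NewTopic aura = new NewTopic("aura", 3, (short) 3);
            admin.createTopics(Collections.singleton(aura)).all().get();
            // Roughly equivalent to kafka-topics.sh --describe.
            System.out.println(admin.describeTopics(Collections.singleton("aura")).all().get());
        }
    }
}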
6. Produce the Simulated Data in Real Time
[[email protected] ~]$ cat data.txt | while read line   (read each line from data.txt)
> do                                                    (start of the loop body)
> echo "$line" >> data.log                              (append the line to data.log)
> sleep 0.5                                             (sleep 0.5 seconds between lines)
> done                                                  (end of the loop)
data.log is created in the directory where the loop is run.
(Note the difference between tail -F and tail -f: -F follows the file by name, so it keeps working if the file is removed and recreated, while -f follows the content of the already-opened file.)
7. Write and Run the Flume Agent
[[email protected] ~]$ touch file2kafka.properties
[[email protected] ~]$ vim file2kafka.properties
# Tail the specified log file and forward each new line to Kafka
# List the sources, sinks and channels for the agent
# (the agent name a1 and the source/sink/channel names can be chosen freely)
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.r1.type = exec
# Follow the specified file
a1.sources.r1.command = tail -F /home/hadoop/data.log

# Configure the channel
a1.channels.c1.type = memory

# Configure the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Parameters for writing to the topic "aura"
a1.sinks.k1.topic = aura
a1.sinks.k1.brokerList = hadoop02:9092
# Kafka acknowledgement level: 0 = no ack, 1 = leader ack, -1 = all in-sync replicas ack
a1.sinks.k1.requiredAcks = 1
# Number of events Flume sends to Kafka per batch
a1.sinks.k1.batchSize = 5

# source -> channel -> sink
# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start Flume:
[[email protected] ~]$ flume-ng agent --conf conf --conf-file file2kafka.properties --name a1 -Dflume.root.logger=INFO,console
Check whether a Kafka console consumer can read the data:
[[email protected] kafka_2.11-1.1.0]$ bin/kafka-console-consumer.sh --zookeeper hadoop02:2181 --topic aura --from-beginning
After running the CategoryRealCount program, each 3-second batch prints the consumed log records to the console.