数仓学习总结
1 数仓概念总结
1)数据仓库的输入数据源和输出系统分别是什么?
输入系统:埋点产生的用户行为数据、JavaEE后台产生的业务数据。
输出系统:报表系统、用户画像系统、推荐系统
2 项目需求及架构总结
2.1 集群规模计算
2.2 框架版本选型
1)Apache:运维麻烦,组件间兼容性需要自己调研。(一般大厂使用,技术实力雄厚,有专业的运维人员)
2)CDH:国内使用最多的版本,但CM不开源,但其实对中、小公司使用来说没有影响(建议使用)
3)HDP:开源,可以进行二次开发,但是没有CDH稳定,国内使用较少
2.3 服务器选型
服务器使用物理机还是云主机?
1)机器成本考虑:
(1)物理机:以128G内存,20核物理CPU,40线程,8THDD和2TSSD硬盘,单台报价4W出头,需考虑托管服务器费用。一般物理机寿命5年左右
(2)云主机,以阿里云为例,差不多相同配置,每年5W
2)运维成本考虑:
(1)物理机:需要有专业的运维人员
(2)云主机:很多运维工作都由阿里云已经完成,运维相对较轻松
3 数据采集模块总结
3.1 Linux&Shell相关总结
1)Linux常用命令
序号 命令 命令解释
2)Shell常用工具
awk、sed、cut、sort
3.2 Hadoop相关总结
1)Hadoop默认不支持LZO压缩,如果需要支持LZO压缩,需要添加jar包,并在hadoop的cores-site.xml文件中添加相关压缩配置。
2)Hadoop常用端口号
3)Hadoop配置文件以及简单的Hadoop集群搭建
4)HDFS读流程和写流程
5)MapReduce的Shuffle过程及Hadoop优化(包括:压缩、小文件、集群优化)
6)Yarn的Job提交流程
7)Yarn的默认调度器、调度器分类、以及他们之间的区别
8)HDFS存储多目录
9)Hadoop参数调优
10)项目经验之基准测试
3.3 Zookeeper相关总结
1)选举机制
半数机制
2)常用命令
ls、get、create
3.4 Flume相关总结
1)Flume组成,Put事务,Take事务
Taildir Source:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。
File Channel:数据存储在磁盘,宕机数据可以保存。但是传输速率慢。适合对数据传输可靠性要求高的场景,比如,金融行业。
Memory Channel:数据存储在内存中,宕机数据丢失。传输速率快。适合对数据传输可靠性要求不高的场景,比如,普通的日志数据。
Kafka Channel:减少了Flume的Sink阶段,提高了传输效率。
Source到Channel是Put事务
Channel到Sink是Take事务
2)Flume拦截器
(1)拦截器注意事项
项目中自定义了:ETL拦截器和区分类型拦截器。
采用两个拦截器的优缺点:优点,模块化开发和可移植性;缺点,性能会低一些
(2)自定义拦截器步骤
a)实现 Interceptor
b)重写四个方法
initialize 初始化
public Event intercept(Event event) 处理单个Event
public List intercept(List events) 处理多个Event,在这个方法中调用Event intercept(Event event)
close 方法
c)静态内部类,实现Interceptor.Builder
3)Flume Channel选择器
4)Flume 监控器
Ganglia
5)Flume采集数据会丢失吗?
不会,Channel存储可以存储在File中,数据传输自身有事务。
6)Flume内存
开发中在flume-env.sh中设置JVM heap为4G或更高,部署在单独的服务器上(4核8线程16G内存)
-Xmx与-Xms最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc。
7)FileChannel优化
通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。
官方说明如下:
Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance
checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据
8)Sink:HDFS Sink小文件处理
(1)HDFS存入大量小文件,有什么影响?
元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命
计算层面:默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。
(2)HDFS小文件处理
官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount
基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0,hdfs.roundValue=10,hdfs.roundUnit= second几个参数综合作用,效果如下:
(1)tmp文件在达到128M时会滚动生成正式文件
(2)tmp文件创建超10秒时会滚动生成正式文件
举例:在2018-01-01 05:23的时侯sink接收到数据,那会产生如下tmp文件:
/atguigu/20180101/atguigu.201801010520.tmp
即使文件内容没有达到128M,也会在05:33时滚动生成正式文件
3.5 Kafka相关总结
1)Kafka压测
Kafka官方自带压力测试脚本(kafka-consumer-perf-test.sh、kafka-producer-perf-test.sh)。Kafka压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络IO)。一般都是网络IO达到瓶颈。
2)Kafka的机器数量
Kafka机器数量=2*(峰值生产速度副本数/100)+1
3)Kafka的日志保存时间
7天
4)Kafka的硬盘大小
每天的数据量7天
5)Kafka监控
公司自己开发的监控器;
开源的监控器:KafkaManager、KafkaMonitor
6)Kakfa分区数。
分区数并不是越多越好,一般分区数不要超过集群机器数量。分区数越多占用内存越大(ISR等),一个节点集中的分区也就越多,当它宕机的时候,对系统的影响也就越大。
分区数一般设置为:3-10个
7)副本数设定
一般我们设置成2个或3个,很多企业设置为2个。
8)多少个Topic
通常情况:多少个日志类型就多少个Topic。也有对日志类型进行合并的。
9)Kafka丢不丢数据
Ack=0,相当于异步发送,消息发送完毕即offset增加,继续生产。
Ack=1,leader收到leader replica 对一个消息的接受ack才增加offset,然后继续生产。
Ack=-1,leader收到所有replica 对一个消息的接受ack才增加offset,然后继续生产。
10)Kafka的ISR副本同步队列
ISR(In-Sync Replicas),副本同步队列。ISR中包括Leader和Follower。如果Leader进程挂掉,会在ISR队列中选择一个服务作为新的Leader。有replica.lag.max.messages(延迟条数)和replica.lag.time.max.ms(延迟时间)两个参数决定一台服务是否可以加入ISR副本队列,在0.10版本移除了replica.lag.max.messages参数,防止服务频繁的进去队列。
任意一个维度超过阈值都会把Follower剔除出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的Follower也会先存放在OSR中。
11)Kafka分区分配策略
在 Kafka内部存在两种默认的分区分配策略:Range和 RoundRobin。
Range是默认策略。Range是对每个Topic而言的(即一个Topic一个Topic分),首先对同一个Topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。然后用Partitions分区的个数除以消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。
例如:我们有10个分区,两个消费者(C1,C2),3个消费者线程,10 / 3 = 3而且除不尽。
C1-0 将消费 0, 1, 2, 3 分区
C2-0 将消费 4, 5, 6 分区
C2-1 将消费 7, 8, 9 分区
RoundRobin:前提:同一个Consumer Group里面的所有消费者的num.streams(消费者消费线程数)必须相等;每个消费者订阅的主题必须相同。
第一步:将所有主题分区组成TopicAndPartition列表,然后对TopicAndPartition列表按照hashCode进行排序,最后按照轮询的方式发给每一个消费线程。
12)Kafka中数据量计算
每天总数据量100g,每天产生1亿条日志, 10000万/24/60/60=1150条/每秒钟
平均每秒钟:1150条
低谷每秒钟:400条
高峰每秒钟:1150条*(2-20倍)=2300条-23000条
每条日志大小:0.5k-2k
每秒多少数据量:2.3M-20MB
13) Kafka挂掉
(1)Flume记录
(2)日志有记录
(3)短期没事
14) Kafka消息数据积压,Kafka消费能力不足怎么处理?
(1)如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。(两者缺一不可)
(2)如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间<生产速度),使处理的数据小于生产的数据,也会造成数据积压。
总结
1 用户行为数仓业务总结
1.1 数仓分几层?每层做什么的?
1)ODS层(原始数据层)
存储原始数据,直接加载原始日志、数据,数据保持原貌不做处理。
2)DWD层(明细层)
对ODS层数据进行清洗(去除空值、脏数据,超过极限范围的数据)
3)DWS层(服务数据层)
以DWD层为基础,进行轻度汇总。比如:用户当日、设备当日、商品当日。
4)ADS层(数据应用层)
1.2 Tez引擎优点?
Tez可以将多个有依赖的作业转换为一个作业,这样只需写一次HDFS,且中间节点较少,从而大大提升作业的计算性能。
1.3 在项目中是否自定义过UDF、UDTF函数,以及用他们处理了什么问题?
自定义过。
用UDF函数解析公共字段;用UDTF函数解析事件字段。
1.4 如何分析用户活跃?
在启动日志中统计不同设备id 出现次数。
1.5 如何分析用户新增?
用活跃用户表 left join 用户新增表,用户新增表中mid为空的即为用户新增。
1.6 如何分析用户1天留存?
留存用户=前一天新增 join 今天活跃
用户留存率=留存用户/前一天新增
1.7 如何分析沉默用户?
按照设备id对日活表分组,登录次数为1,且是在一周前登录。
1.8 如何分析本周回流用户?
本周活跃left join本周新增 left join上周活跃,且本周新增id和上周活跃id都为null
1.9 如何分析流失用户?
按照设备id对日活表分组,且七天内没有登录过。
1.10 如何分析最近连续3周活跃用户数?
按照设备id对周活进行分组,统计次数等于3次。
1.11 如何分析最近七天内连续三天活跃用户数?
1)查询出最近7天的活跃用户,并对用户活跃日期进行排名
2)计算用户活跃日期及排名之间的差值
3)对同用户及差值分组,统计差值个数
4)将差值相同个数大于等于3的数据取出,然后去重,即为连续3天及以上活跃的用户
1.12 整个文档中涉及的所有层级及表
2 Hive总结
2.1 Hive的架构
2.2 Hive和数据库比较
Hive 和数据库除了拥有类似的查询语言,再无类似之处。
1)数据存储位置
Hive 存储在 HDFS 。数据库将数据保存在块设备或者本地文件系统中。
2)数据更新
Hive中不建议对数据的改写。而数据库中的数据通常是需要经常进行修改的,
3)执行延迟
Hive 执行延迟较高。数据库的执行延迟较低。当然,这个是有条件的,即数据规模较小,当数据规模大到超过数据库的处理能力的时候,Hive的并行计算显然能体现出优势。
4)数据规模
Hive支持很大规模的数据计算;数据库可以支持的数据规模较小。
2.3 内部表和外部表
1)管理表:当我们删除一个管理表时,Hive也会删除这个表中数据。管理表不适合和其他工具共享数据。
2)外部表:删除该表并不会删除掉原始数据,删除的是表的元数据
2.4 4个By区别
1)Sort By:分区内有序;
2)Order By:全局排序,只有一个Reducer;
3)Distrbute By:类似MR中Partition,进行分区,结合sort by使用。
4) Cluster By:当Distribute by和Sorts by字段相同时,可以使用Cluster by方式。Cluster by除了具有Distribute by的功能外还兼具Sort by的功能。但是排序只能是升序排序,不能指定排序规则为ASC或者DESC。
2.5 窗口函数
1)窗口函数:
(1) OVER():指定分析函数工作的数据窗口大小,这个数据窗口大小可能会随着行的变而变化。常用partition by 分区order by排序。
(2)CURRENT ROW:当前行
(3)n PRECEDING:往前n行数据
(4) n FOLLOWING:往后n行数据
(5)UNBOUNDED:起点,UNBOUNDED PRECEDING 表示从前面的起点, UNBOUNDED FOLLOWING表示到后面的终点
(6) LAG(col,n):往前第n行数据
(7)LEAD(col,n):往后第n行数据
(8) NTILE(n):把有序分区中的行分发到指定数据的组中,各个组有编号,编号从1开始,对于每一行,NTILE返回此行所属的组的编号。注意:n必须为int类型。
2)排序函数:
(1)RANK() 排序相同时会重复,总数不会变
(2)DENSE_RANK() 排序相同时会重复,总数会减少
(3)ROW_NUMBER() 会根据顺序计算
2.6 在项目中是否自定义过UDF、UDTF函数,以及用他们处理了什么问题?
1)自定义过。
2)用UDF函数解析公共字段;用UDTF函数解析事件字段。
3)自定义UDF步骤:定义一个类继承UDF,重写evaluate方法
4)自定义UDTF步骤:定义一个类继承GenericUDTF,重写初始化方法、关闭方法和process方法。
2.7 Hive优化
1)MapJoin
如果不指定MapJoin或者不符合MapJoin的条件,那么Hive解析器会将Join操作转换成Common Join,即:在Reduce阶段完成join。容易发生数据倾斜。可以用MapJoin把小表全部加载到内存在map端进行join,避免reducer处理。
2)行列过滤
列处理:在SELECT中,只拿需要的列,如果有,尽量使用分区过滤,少用SELECT *。
行处理:在分区剪裁中,当使用外关联时,如果将副表的过滤条件写在Where后面,那么就会先全表关联,之后再过滤。
3)采用分桶技术
4)采用分区技术
5)合理设置Map数
(1)通常情况下,作业会通过input的目录产生一个或者多个map任务。
主要的决定因素有:input的文件总个数,input的文件大小,集群设置的文件块大小。
(2)是不是map数越多越好?
答案是否定的。如果一个任务有很多小文件(远远小于块大小128m),则每个小文件也会被当做一个块,用一个map任务来完成,而一个map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的map数是受限的。
(3)是不是保证每个map处理接近128m的文件块,就高枕无忧了?
答案也是不一定。比如有一个127m的文件,正常会用一个map去完成,但这个文件只有一个或者两个小字段,却有几千万的记录,如果map处理的逻辑比较复杂,用一个map任务去做,肯定也比较耗时。
针对上面的问题2和3,我们需要采取两种方式来解决:即减少map数和增加map数;
6)小文件进行合并
在Map执行前合并小文件,减少Map数:CombineHiveInputFormat具有对小文件进行合并的功能(系统默认的格式)。HiveInputFormat没有对小文件合并功能。
7)合理设置Reduce数
Reduce个数并不是越多越好
(1)过多的启动和初始化Reduce也会消耗时间和资源;
(2)另外,有多少个Reduce,就会有多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;
在设置Reduce个数的时候也需要考虑这两个原则:处理大数据量利用合适的Reduce数;使单个Reduce任务处理数据量大小要合适;
8)常用参数
// 输出合并小文件
SET hive.merge.mapfiles = true; – 默认true,在map-only任务结束时合并小文件
SET hive.merge.mapredfiles = true; – 默认false,在map-reduce任务结束时合并小文件
SET hive.merge.size.per.task = 268435456; – 默认256M
SET hive.merge.smallfiles.avgsize = 16777216; – 当输出文件的平均大小小于该值时,启动一个独立的map-reduce任务进行文件merge
项目总结
1 熟悉表的业务字段
2 数仓理论
1)表的分类:实体表、维度表、事务型事实表、周期型事实表
2)表的同步策略:
实体表(全量)
维度表(全量)
事务型事实表(增量)
周期型事实表(新增和变化、拉链表)
3)范式理论:
一范式原则:属性不可切割;
二范式原则:不能存在部分函数依赖;
三范式原则:不能存在传递函数依赖;
4)数仓维度建模模型
星型模型,维度一层;
雪花模型,维度多层;
星座模型,多个事实表;
性能优先选择星型模型,灵活优先选择雪花模型。企业中星型模型多一些。
3 Sqoop参数
/opt/module/sqoop/bin/sqoop import
–connect
–username
–password
–target-dir
–delete-target-dir
–num-mappers
–fields-terminated-by
–query “$2” ’ and KaTeX parse error: Undefined control sequence: \t at position 1201: …terminated-by "\̲t̲" --export-dir …{day}" --staging-table app_cource_study_report_tmp --clear-staging-table --input-null-string ‘\N’
2)场景2:设置map数量为1个(不推荐,面试官想要的答案不只这个)
多个Map任务时,采用–staging-table方式,仍然可以解决数据一致性问题。
3.3 Sqoop底层运行的任务是什么
只有Map阶段,没有Reduce阶段的任务。
3.4 Sqoop数据导出的时候一次执行多长时间
Sqoop任务5分钟-2个小时的都有。取决于数据量。
4 需求指标分析
1)GMV:一段时间内的网站成交金额(包括付款和未付款)
计算:基于用户行为宽表,对订单总金额进行sum。
2)转化率:(先分别计算分子和分母,再相除)
(1)新增用户占活跃用户的比率;
cast(sum( uc.nmc)/sum( uc.dc)*100 as decimal(10,2)) new_m_ratio
(2)下单人数占活跃用户的比率;
sum(if(order_count>0,1,0)) order_count
cast(ua.order_count/uv.day_count*100 as decimal(10,2)) visitor2order_convert_ratio
(3)支付人数占下单人数的比率;
sum(if(payment_count>0,1,0)) payment_count
cast(ua.payment_count/ua.order_count*100 as decimal(10,2)) order2payment_convert_ratio
3)复购率:(先分别计算分子和分母,再相除)
sum(if(mn.order_count>=1,1,0)) buycount,
sum(if(mn.order_count>=2,1,0)) buyTwiceLast,
sum(if(mn.order_count>=2,1,0))/sum( if(mn.order_count>=1,1,0)) buyTwiceLastRatio,
sum(if(mn.order_count>=3,1,0)) buy3timeLast ,
sum(if(mn.order_count>=3,1,0))/sum( if(mn.order_count>=1,1,0)) buy3timeLastRatio ,
5 拉链表
1)通过关系型数据库的create time和operation time获取数据的新增和变化。
2)用临时拉链表解决Hive了中数据不能更新的问题。
6 Azkaban
1)每天集群运行多少job?
2)多个指标(200)6=1200(1000-2000个job)
3)每天集群运行多少个task? 1000(5-8)=5000多个
4)任务挂了怎么办?运行成功或者失败都会发邮件
7 项目中表关系