hive原理及执行任务流程优化
- Hive架构
-
- Hive主要有QL,MetaStore和Serde三大核心组件构成。
- QL:是编译器也是Hive中最核心的部分。
- Driver模块的工作是将HQL语句转化为MapReduce调用.
- 包括主要的三个阶段:
- 编译:Compile,生成执行计划
- 优化:Optimize,优化执行计划(当前的Hive实现是在执行前做一次唯一的优化,没有反馈的过程,这使得优化工作只能是rule-based,做不到cost-based)。
- 执行:Execute,将执行计划提交给Hadoop。
- Serde:是Serializer和Deserializer的缩写,用于序列化和反序列化数据,即读写数据。
- MetaStore :对外暴露Thrift API,用于元数据的修改。比如表的增删改查,分区的增删改查,表的属性的修改,分区的属性的修改等。
- QL:是编译器也是Hive中最核心的部分。
- Hive的数据模型
-
- Hive的数据存储在HDFS上,基本存储单位是表或者分区。
- Hive内部把表或者分区称作SD,即Storage Descriptor。一个SD通常是一个HDFS路径,或者其它文件系统路径。SD的元数据信息存储在Hive MetaStore中,如文件路径,文件格式,列,数据类型,分隔符。Hive默认的分格符有三种,分别是^A、^B和^C,即ASCii码的1、2和3,分别用于分隔列,分隔列中的数组元素,和元素Key-Value对中的Key和Value。
- MetaStore:对外Thrift API,用于元数据的修改
- Hive的核心是Driver,Driver的核心是SemanticAnalyzer。 Hive实际上是一个SQL到Hadoop作业的编译器。
- Hive目前支持MapReduce, Tez, Spark执行引擎
-
- Hive的编译流程:
-
-
- (1)ParseDriver内部词法分析,生成AST(抽象语法树)。
- (2)SemanticAnalyzer分析AST树,生成 QB查询块(QueryBlock)。
- (3)从metastore中获取表的信息,SemanticAnalyzer.getMetaData完成。
- (4)生成逻辑执行计划,SemanticAnalyzer.genPlan完成。
- (5)优化逻辑执行计划,Optimizer完成,ParseContext作为上下文信息进行传递。
- (6)生成物理执行计划,SemanticAnalyzer.genMapRedTasks完成。
- (7)物理计划优化,PhysicalOptimizer完成,PhysicalContext作为上下文信息进行传递。
- (8)执行生成的物理计划,获得结果。
- (1)~(7)在Driver的compile中完成,(8)在Driver的execute中完成,在执行阶段一个一个Task运行,不会改变物理计划。
-
- Group By的执行任务:
- 例如一条SQL语句:
- INSERT INTO TABLE pageid_age_sum
- SELECT pageid, age, count(1)
- FROM pv_users
- GROUP BY pageid, age;
-
- 将每个网页的阅读数按年龄进行分组统计。MapReduce就是一个Group By的过程,这个SQL翻译成MapReduce就是相对简单的。
-
- 首先在Map端,每一个Map读取一部分表的数据,通常是64M或者256M,然后按需要Group By的Key分发到Reduce端。经过Shuffle Sort,每一个Key再在Reduce端进行聚合(这里是Count),然后就输出了最终的结果。
- distinct的执行任务:
- Distinct在实现原理上与Group By类似。
- 例如:SELECT pageid, COUNT(DISTINCT userid) FROM page_view GROUP BY pageid
-
- Hive 实现成MapReduce的原理如下:
-
- 也就是说Map分发到Reduce的时候,会使用pageid和userid作为联合分发键,再去聚合(Count),输出结果。
- 从原理上可以看出,当遇到Group By的查询时,会按Group By 键进行分发?如果键很多,很可能会撑爆了机器硬盘,Impala或Spark,为了快,key在内存中,内存爆是经常的。
- join的执行任务:
- 例如一条SQL语句:
- INSERT INTO TABLE pv_users
- SELECT pv.pageid, u.age
- FROM page_view pv JOIN user u ON (pv.userid = u.userid);
-
- 把访问和用户表进行关联,生成访问用户表,Hive的Join也是通过MapReduce来完成的。
-
- 上面的查询,在MapReduce的Join的实现过程如下:
- Map端会分别读入各个表的一部分数据,把这部分数据进行打标,如pv表打标1,user表打标2.
- Map读取是分布式进行的,标完标记后分发到Reduce端,Reduce 端根据Join Key,也就是关联键进行分组。然后按打的标记进行排序,如图上的Shuffle Sort。
- 在每一个Reduce分组中,相同的Key为111的在一起,也就是一台机器上。同时,pv表的数据在这台机器的上端,user表的数据在这台机器的下端。
- 这时,Reduce把pv表的数据读入到内存里,然后逐条与硬盘上user表的数据做Join就可以了。
- 从这个实现可以看出,我们在写Hive Join的时候,应该尽可能把小表(分布均匀的表)写在左边,大表(或倾斜表)写在右边。这样可以有效利用内存和硬盘的关系,提高Hive的处理能力。
- 同时由于使用Join Key进行分发, Hive也只支持等值Join,不支持非等值Join。由于Join和Group By一样存在分发,所以也同样存在着倾斜的问题,所以Join也要对抗倾斜数据,提升查询执行性能。
- 例如一条SQL语句:
- Map join的执行任务:
- 有一种执行非常快的Join叫Map Join 。
- 手动的Map Join SQL如下(pv是小表):
- INSERT INTO TABLE pv_users
- SELECT /*+ MAPJOIN(pv) */ pv.pageid, u.age
- FROM page_view pv JOIN user u
- ON (pv.userid = u.userid);
- 同上边例子,用Map Join执行
-
- Map Join通常只适用于一个大表和一个小表做关联的场景,例如事实表和维表的关联。
- 原理如上图,用户可以手动指定哪个表是小表,然后在客户端把小表打成一个哈希表序列化文件的压缩包,通过分布式缓存均匀分发到作业执行的每一个结点上。然后在结点上进行解压,在内存中完成关联。
- Map Join全过程不会使用Reduce,非常均匀,不会存在数据倾斜问题。默认情况下,小表不应该超过25M。在实际使用过程中,手动判断是不是应该用Map Join太麻烦了,而且小表可能来自于子查询的结果。
- Hive有一种稍微复杂一点的机制,叫Auto Map Join
-
- 图上左边是优化前的,右边是优化后的,Hive原理中提到的物理优化器(Physical Optimizer)它其中一个功能就是把Join优化成Auto Map Join
- 优化过程是把Join作业前面加上一个条件选择器ConditionalTask和一个分支。左边的分支是MapJoin,右边的分支是Common Join(Reduce Join)
- 这个时候,我们在执行的时候,就由这个Conditional Task 进行实时路径选择,遇到小于25M走左边,大于25M走右边。
- 在比较新版的Hive中,Auto Mapjoin是默认开启的。如果没有开启,可以使用一个开关, set hive.auto.convert.join=true开启。
- 当然,Join也会遇到和上面的Group By一样的倾斜问题。