Carbondata 在Hulu OLAP引擎的优化与应用
☀ 本文转载自Hulu工程师自建公众号hadoop123,该公众号是最知名的hadoop/spark大数据技术分享基地,分享hadoop/spark技术内幕,hadoop/spark最新技术进展,hadoop/spark行业技术应用,发布hadoop/spark相关职位和求职信息,hadoop/spark技术交流聚会、讲座以及会议等。欢迎各位点击底部阅读原文进行关注。
Anning Luo | Software Developer at Hulu
1. 背景
hulu是一家北美视频行业互联网公司。在hulu,OLAP查询主要针对大量的用户数据,这些数据包括用户的登录、观看、购买等行为。通过对这些行为进行分析筛选,提炼有价值的用户行为数据,能够为扩展市场,改善用户体验,增加广告效益产生巨大的价值。
OLAP查询优化策略,首先应当细分查询场景,然后从存储格式和查询引擎两个方面进行调优。本文针对hulu的OLAP查询场景,介绍在Carbondata和SparkSQL上的一些调优经验。
2. OLAP基础知识简介
OLAP查询的基本概念
维度(dimension):常用于where查询条件,是人们观察数据的特定角度,是考虑问题时的一类属性。
度量(measure):即按照维度标记,用于描述事实本身的数据。
事实(Fact):由measure信息组成并按照维度划分的表称为Fact表
例如下表是一个航班的统计信息,其中measure包括航班的票价,满意度,而dimension则是用于建立索引的列,包括航班的时间,乘客ID等。
表1 航班统计信息
常用的数据模型包括星型模型和雪花型模型,详细的介绍可参考[1].
3. 主流大数据文件格式
3.1 存储方式
3.1.1 按行存储
传统数据库主要以按行存储为主,物理上以行为存储单位,不同的列存储在一起。每张表需要一个distinct主键来标记行。行存对事务ACID操作具有几十年的发展和成熟的支持,但是行存的缺点在于压缩比低,对于列命中较少的OLAP查询,行存格式会读取大量的冗余信息,增加IO。
3.1.2 按列存储
列式存储在早在上世纪70年代便与行存一同出现发展,google在2010年发表的列式存储dremel在OLAP查询中表现出优异的性能。开源的列存储实现有apache parquet和apache orc。
列存储的特点是在一个文件内按行划分为不同的行组,每个行组内物理上按列存储。 parquet的每个行组称为row group,ORC称为stripe。在文件末尾,存放包含索引、字典以及其他原信息的footer。在ORC中,每个Stripe还包含有额外的索引信息。列存储的特点有:
由于单个存储单元存储同一列的数据,其类型相同,能够获得更高的压缩比。
在查询只有少量列命中时,能够减少大量的IO和解压工作。
支持嵌套数据。维度信息表能够以嵌套的方式存储在一行内,其效果相当于预先join。
列存整体上的时间消耗与请求列的数量成正比,随着列数的增加,用于解压和组装的时间成线性增长,当请求列数过大时,将超出行存的读取时间。因此按列存储不适合查询列数过多的场景。Carbondata尝试改进这一点。
Carbondata是华为开发的一种大数据场景下的列式存储格式,目前是apache社区的孵化项目。相比parquet和orc,Carbondata的优势在于:
数据排序上,Carbondata不局限于单个row group(blocklet)内
MDK+invert index实现多列,行级别的索引
全局字典+惰性转换,能够实现在排序,聚合之后进行字典到数据的转换
Carbondata底层的每一个存储单元称为一个blocklet,对应于parquet的一个row group。每个blocklet内按列分为多个Column Chunk。carbondata引入了ColumnGroup Chunk支持行存,在一个ColumnGroup内将多个列按行组织存储。在ColumnGroup外部,整个ColumnGroup视为一列与其他ColumnChunk混合存储在同一Blocklet内部。
表2 存储格式对比
3.2 索引
为了加快查询速度,大部分存储格式在不同层级上添加索引以滤除无效的block。
3.2.1 Parquet
Parquet支持row group level的min/max索引,支持谓词下推并直接根据meta信息过滤row group。在row level上,parquet亦支持谓词下推,但是需要先读取并解析命中的dimension列,然后决定是否过滤此行数据。
3.2.3 ORC
orc支持File level,strip level 和 row group level的min/max索引。File level的索引信息记录在File Footer中,strip level的索引记录在File Meta中。在更细粒度上,orc默认将每10000行作为一个row group(这里与parquet的row group不是同一概念),统计每列的min/max索引并记录在Stripe中。这些信息能够在谓词下推中从不同的粒度上滤除整块数据。
3.2.3 Carbondata
Carbondata支持File level和blocklet level上的min/max索引,以及row level上的invert index索引。在carbondata默认的spark sql引擎上,不同level的索引起作用的方式不同:
File level索引用于在driver进行过滤,减少executor的负载
blocklet level索引在executor的task中起作用
Row level索引(invert index)在executor端读取数据时过滤数据。
相比parquet和orc,carbondata的过滤层次更丰富,粒度也更细,能更有效的过滤数据。
Carbondata采用MDK(multi-dimension keys)对多列进行排序,同时跨多个文件范围内进行排序,以此来提高索引的效率。
MDK计算原理如图3所示,根据create table时声明Column的顺序,先按照首列排序,然后在首列相同值的范围内排序第二列,以此类推。在获得MDK排序后,再计算各个维度的反向索引,并压缩存储。
图3,左:MDK排列后的结果,其中前4列为dimension列编码后的结果,格式为“[编码值|MDK排列前的行号]” ,后两列为measure,右:invert index,列顺序与左图对应,dimension列记录的格式为“编码值(MDK排列前行号范围)”
4. hulu典型OLAP应用场景
在hulu,典型的数据组织方式可以分为三类。
4.1 关系型结构
比如查询某个电视剧在一天内的观看时间变化曲线。这种查询所对应的表需要根据电视剧,用户,观看设备和观看时间建立多张表和索引,由于往往仅仅查询短时间内的数据变化情况,数据量整体不大,但查询比较灵活,需要事务操作。这类数据采用的关系型数据建表方式,采用行式存储(比如Sequence File)或列式存储(比如ORC),一般用hive,presto,impala作为查询引擎。
4.2 嵌套型结构
嵌套型数据用于筛选符合条件的用户,用于市场扩展。这类数据的主要特点是用户有大量的特征,部分特征比如观看行为是repeated的,适用于嵌套逻辑的列式存储。这类OLAP查询受限于大多数查询引擎对嵌套数据缺乏完善的支持,在hulu我们开发了Nesto查询引擎来解决上述问题,Nesto不仅能够在用户层面添加过滤条件,同时可以在repeated列上进行筛选,相当于在join前后添加where条件,而嵌套式存储则相当于预先对数据进行了join和存储优化。以此获得更加优异的查询性能。
4.3 宽表查询
用于获取用户的特征的查询,往往有大量select的列。这类请求主要用于获取某一类用户的大量特征,用于后续对用户分析和挖掘。这类请求往往会输出数百列到上千,大大降低列式存储的查询性能。对于这类请求,一方面需要高效的索引滤除大量数据,另一方面需要将部分输出列组合在一起采用行式存储,在这种场景下,我们尝试用carbondata进行优化。
5 Carbondata在hulu的优化
hulu的部分广告数据的查询以宽表查询为主。数据的特点是:
filter过滤掉95%~98%的数据,只有2%~5%的数据参与aggregation
filter索引一般命中5~10列,大部分列distinct值小于100。
select的列有100+,列式存储组装所花费的时间长。
在上述场景下,由于carbondata比parquet和ORC拥有更细粒度的索引,支持行式存储,我们采用carbondata(版本0.2.0)格式以加快查询速度。carbondata的查询引擎采用sparkSQL(目前carbondata仅支持sparkSQL),在此基础上进一步优化。
5.1 表优化
5.1.1 schema
Carbondata 在create table时的创建语句会影响到MDK索引列的顺序。MDK索引列的排列顺序与create table时的列的声明顺序一致。如下的create语句中,MDK将先按序列a排序,然后按照列b排序,以此类推。默认情况下,除了数字类型外的所有类型会被视为dimension,dimension除数字类型会被加入到MDK中。除此之外可以通过DICTIONARY_INCLUDE或者DICTIONARY_EXCLUDE显示声明列来加入到MDK中。
CREATE TABLE IF NOT EXISTS Table1
( a Int, b Int, c String, d Int, e Int,
STORED BY 'org.apache.carbondata.format'
TBLPROPERTIES("DICTIONARY_INCLUDE”=“a, b, c, d, e”))
表3 以carbondata格式创建table,MDK排列顺序为a,b,c,d,e
列的排列规则应该按照distinct值从小到大的顺序或者过滤数据量由多到少的顺序排列。这样可以减少索引的大小,提升索引效率。
5.1.2 Shuffle
Carbondata每次load会生成一个segment,每个segment可以看做一组carbondata文件的集合。load过程首先启动生成全局字典的job,之后启动计算MDK和写入文件的job,由于MDK和写入文件是在相同的task中完成,因此MDK的排序范围仅限于同一个task上的数据,而非整个segment的数据。因此如图4所示,当原始数据文件中满足过滤条件的目标数据分布不均衡时,经过过滤得到的blocklet包含的目标数据数量也存在较大差异。当这些包含较多目标数据的blocklet被分配到相同的节点上时,任务执行时间会被拉长。当机器数目多时,这种情况常常被观察到,而且即便这种task所占比例极小,对整体时间也会产生严重影响。此时,可以在数据load之前,对原始数据进行一次repartition操作,将原数据文件均匀打散,这样每个task所获得的目标数据基本一致。注意这种方式能够减弱数据的不均衡,但是不能使数据完全均衡,由于blocklet按照固定行数切分,其结果必然造成部分数据块数据包含大小不一的数据,同时不同的MDK策略也会影响到数据的分布。
图4 数据不均衡对查询的影响
5.1.3 Block size / blocklet size 调整
在每个task load数据时,每到固定行数(默认120k)切分一个blocklet,每当写入blocklet的总尺寸超过一个阈值(默认Block size为1GB,阈值为Block size的90%,预留一部分空间写入footer)切分一个文件,并写入block和blocklet的索引信息。查询时,在driver端同一个task生成的文件会构建成一个B+tree,B+tree的叶节点为Block,此时一个Block对应一个文件。通过min/max filter将不满足where条件的文件过滤掉,然后将得到的文件的blocklet重新打散组合,然后拼装为task分发到executor上。在executor端,用blocklet级别的min/max filter对blocklet进一步过滤。
当查询的数据按照MDK排列较为集中时,减小Block size和Blocklet size能够充分利用driver和executor端的filter快速排除大量无效数据。但是过小的Block size和Blocklet size会产生大量的小文件和频繁的IO,拉长IO时间,导致整体性能的下降。另外通过控制load data时node的数量(load 生成数据的job将创建与node数量相同的task),可以控制B+tree的叶节点数量。更多的叶节点会增加B+tree build时间,但同时也能够过滤更多的block。
5.2 查询引擎优化
5.2.1 Speculation
由于OLAP查询的节点是混合使用,因此常常有节点拖尾现象严重。因此在spark上开启推测执行(speculation),将拖尾的task重新发送到其他机器上执行,只需等待其中任一结果返回。speculation设置参数如下:
表4 speculation设置参数
上述speculation设置比较激进,原因是我们观察到在我们的场景下只有少量task时间明显长于中位数。因此尽可能的提前执行Speculation。但是在有较多长task情况下,Speculation有可能过量的消耗计算资源。
5.2.2 GC
在有大量数据被频繁加载时,GC在部分节点上可能会非常严重。如果OLAP机器资源充足,将Young区设置足够大,保证一次查询内不触发GC,每次query后执行GC,可以避免这种情况。但是如果集群资源共享,单个节点拿不到足够的内存和cpu资源,计算过程中的GC时间在部分节点上会很长。OLAP查询一般更关注整体的GC时间消耗,但是单次的GC时间如果过长,有可能导致IO等组件超时,引起重试。因此Young区不易过大,设置在512M~1g,Old区应尽早检测使用量,设置阈值-XX:CMSInitiatingOccupancyFraction=50,即当Old区的使用率超过50%时,触发GC。由于CMS在超过4g时的内存管理性能下降,因此尽量使用Java8的G1GC,效率对于CMS GC有明显提升。
5.3 调优实验
测试环境如下:
数据集:
500G原始文件(每副本)
约300 列(部分列是嵌套的), 3亿多行
查询语句:
查询select命中100~200 列
查询命中嵌套列,如果采用关系型表结构,需要join处理,性能上增加较大开销
95% 以上的数据会被查询条件滤除,因此高效的过滤条件能够带来可观的性能提升
集群环境:
100 executors,5vCore 与其他应用混用同一集群
每个executor分配10g内存(hulu内部这种宽表查询只占一小部分,这些executor会常驻机器而不会被释放,为了节省开销,我们选择较小内存)
不同OLAP引擎下,carbondata的实验结果对比如表3所示:
表4 查询结果对比
第五行的优化中,Load并发10个task,每个文件约256MB,每个task约生成40个文件,在driver端能够过滤30+文件,在executor端,部分不含目标数据的blocklet能够被直接滤除。可以看到,carbondata + SparkSQL对于parquet + Presto/Impala效果明显,得益于在driver端和executor端能对blocklet快速滤除,speculation避免长时task造成的拖尾。对carbondata的参数进一步调整,能够一定程度的降低查询时间,但20s的级别上,当前场景下继续优化的空间已十分困难。
6. 参考链接
[1] https://en.wikipedia.org/wiki/Online_analytical_processing
[2] carbondata wiki: https://cwiki.apache.org/confluence/display/CARBONDATA/CarbonData+Home
[3] carabondata github: https://github.com/apache/incubator-carbondata
[4] orc index document: https://orc.apache.org/docs/spec-index.html
长按二维码扫描关注
敬
请
关
注