基于ClickHouse的海量数据高效即席查询方案

一.背景介绍

ClickHouse 是俄罗斯Yandex在2016年年开源的⼀一个⾼高性能分析型SQL数 据库,主要⾯面向OLAP场景。开源之后,凭借优异的查询性能,受到业界的青睐。

优点:

1)为了高效的使用CPU,数据不仅仅按列存储,同时还按向量进行处理;

2)数据压缩空间大,减少io;处理单查询高吞吐量每台服务器每秒最多数十亿行;

3)索引非B树结构,不需要满足最左原则;只要过滤条件在索引列中包含即可;即使在使用的数据不在索引中,由于各种并行处理机制ClickHouse全表扫描的速度也很快;

4)写入速度非常快,50-200M/s,对于大量的数据更新非常适用;

ClickHouse并非万能的,正因为ClickHouse处理速度快,所以也是需要为“快”付出代价。选择ClickHouse需要有下面注意以下几点:

1)不支持事务,不支持真正的删除/更新;

2)不支持高并发,官方建议qps为100,可以通过修改配置文件增加连接数,但是在服务器足够好的情况下;

3)sql满足日常使用80%以上的语法,join写法比较特殊;最新版已支持类似sql的join,但性能不好;

4)尽量做1000条以上批量的写入,避免逐行insert或小批量的insert,update,delete操作,因为ClickHouse底层会不断的做异步的数据合并,会影响查询性能,这个在做实时数据写入的时候要尽量避开;

5)Clickhouse快是因为采用了并行处理机制,即使一个查询,也会用服务器一半的cpu去执行,所以ClickHouse不能支持高并发的使用场景,默认单查询使用cpu核数为服务器核数的一半,安装时会自动识别服务器核数,可以通过配置文件修改该参数;

 

二.支持应用场景

1.数据摄入

ClickHouse可以覆盖实时与离线两种场景:

实时:数据流可以通过kafka/flink/sparkstreaming实时处理后,通过JDBC的方式批量导入到ClickHouse中。

离线:数据落地HDFS ODS层,离线通过Spark,MR或其他方式的batch形式导入到ClickHouse中。

基于ClickHouse的海量数据高效即席查询方案

2. 数据存储

存储上使用多磁盘结构,可以充分利用物理机多磁盘特性,增加存储量和磁盘IO吞吐。

ClickHouse可以通过配置文件的形式配置不同的存储策略,可以将数据按照表维度进行完全的物理资源隔离。

针对不同的业务和优先级进行表级别的抽象,这样的好处在于数据的摄入和查询不会互相影响,同时需要在不同的shard之前有路由层,突破单点数据导入瓶颈和路由不同的table查询到指定的shard上。

基于ClickHouse的海量数据高效即席查询方案

3. 数据扩容

  Ck在扩容方面也比较平滑,进行如下步骤可以完成扩容:

(1)安装新部署的shard分片机器

(2)批量修改当前集群的配置文件增加新的分片

(3)在新shard上创建表结构

(4)名字服务添加节点

4. 数据查询

   数据有冷热之分,绝大多数OLAP场景需要查询最近一段时间的数据,(主要有:过去三天,过去一周,过去一个月等)

针对热数据,建议通过clickhouse来进行查询,保证查询速度,特点:数据量较小,查询速度快。

针对冷数据,可以通过直接通过Spark/MR来直接查询HDFS数据,特点:数据量巨大,查询速度较慢。

 

三.ClickHouse有极高的性能的原因:

1. 基于cpp编写

2. 基于列式存储,使用向量化的执行引擎,利用SIMD指令(单指令操作多条数据)进行处理加速,同时使用LLVM加速快函数编译执行。

3. 稀疏索引  条件过滤的时候不需要扫描很多块

4. 存储与执行解耦

 

四.Clickhouse是如何进行查询与插入的

1. ck的表分为本地表与分布式表,本地表存储数据,而分布式表只是逻辑表,不存储任何数据只是做一个路由使用,一般使用的时候都是直接使用分布式表,分布式引擎将我们查询的请求路由到本地表进行查询,最后汇总返回给用户。Ck也支持列级别的稀疏索引,索引默认粒度为8192,即每8192条数据进行一次记录,很好的节省空间,条件查询扫描的时候也不需要全表扫描。

2. Ck是基于Batch插入的,他不能够像传统关系型数据库一样频繁插入,ck写入的时候是直接落磁盘的,在落盘之前还会对数据进行排序及必要的拆分。

 

五.最佳实践

(1)实时写入本地表,不要使用分布式表

分布式表引擎会帮助我们将数据自动路由到健康的数据表进行数据的存储,所以使用分布式表相对简单,对于Producer不需要太多的考虑,但是分布式对于数据一致性方面有着致命的缺点

(2)推荐使用(*)MergeTree引擎,该引擎是ck的最核心的组件,数据有保证,查询有保证,升级无感知。

(3)谨慎使用On cluster的sql

 

六.最常见参数配置推荐

1)max_concurrent_queries

 

最大并发处理的请求数(包含select,insert等),默认值100,推荐150(不够再加),在我们的集群中出现过”max concurrent queries”的问题。

 

2)max_bytes_before_external_sort

 

当order by已使用max_bytes_before_external_sort内存就进行溢写磁盘(基于磁盘排序),如果不设置该值,那么当内存不够时直接抛错,设置了该值order by可以正常完成,但是速度相对存内存来说肯定要慢点(实测慢的非常多,无法接受)。

 

3)background_pool_size

 

后台线程池的大小,merge线程就是在该线程池中执行,当然该线程池不仅仅是给merge线程用的,默认值16,推荐32提升merge的速度(CPU允许的前提下)。

 

4)max_memory_usage

 

单个SQL在单台机器最大内存使用量,该值可以设置的比较大,这样可以提升集群查询的上限。

 

5)max_memory_usage_for_all_queries

 

单机最大的内存使用量可以设置略小于机器的物理内存(留一点内操作系统)。

 

6)max_bytes_before_external_group_by

 

在进行group by的时候,内存使用量已经达到了max_bytes_before_external_group_by的时候就进行写磁盘(基于磁盘的group by相对于基于磁盘的order by性能损耗要好很多的),一般max_bytes_before_external_group_by设置为max_memory_usage / 2,原因是在clickhouse中聚合分两个阶段:

 

 

查询并且建立中间数据;

 

合并中间数据 写磁盘在第一个阶段,如果无须写磁盘,clickhouse在第一个和第二个阶段需要使用相同的内存。

 

这些内存参数强烈推荐配置上,增强集群的稳定性避免在使用过程中出现莫名其妙的异常。

 

七.表引擎分析

   基于ClickHouse的海量数据高效即席查询方案

1. Log引擎系列

    TinyLog是Log系列引擎中功能简单,性能较低的引擎,它的存储结果由数据文件和元数据两部分组成。其中,数据文件是按列独立存储的,也就是说每一个列字段都对应一个文件。不支持并发数据读取。适合一次写入,多次读取的场景。对于处理小批数据的中间表可以使用该引擎。不支持alter操作。

    StripLog支持并发读取数据文件,当读取数据时,ck会使用多线程进行读取,每个线程处理一个单独的数据块。StripLog将所有列存储在同一个文件中,减少了文件的使用数量。

    Log支持并发读取数据文件,当读取数据时,ck会使用多线程进行读取,每个线程处理一个单独的数据库。Log引擎会将每个列数据单独存储在一个独立文件中。

2. MergeTree引擎系列

MergeTree在写入一批数据时,数据总会以数据片段的形式写入磁盘,且数据片段不可修改。为了避免片段过多,ClickHouse会通过后台线程,定期合并这些数据片段,属于相同分区的数据片段会被合成一个新的片段。这种数据片段往复合并的特点,也正是合并树名称的由来。

MergeTree作为家族系列最基础的表引擎,主要有以下特点:

存储的数据按照主键排序:允许创建稀疏索引,从而加快数据查询速度

支持分区,可以通过PRIMARY KEY语句指定分区字段。

支持数据副本

支持数据采样

 

八.常用语句

 

-- 创建表 create table temp (`EventDate` DateTime, `UserId` UInt32) engine = MergeTree() partition by toYYYYMM(EventDate) order by EventDate;  

-- 创建分布式复制表 create table temp (`EventDate` DateTime, `UserId` UInt32) engine = ReplicatedMergeTree('/clickhouse/tables/{shard}/temp', '{replica}') partition by toYYYYMM(EventDate) order by EventDate;

-- 物化视图 -- 数据会根据原始表更新 create materialized view temp_v (`EventDate` DateTime, `UserId` UInt32) engine = MergeTree() partition by toYYYYMM(EventDate) order by UserId populate as select * from temp;

-- 可以从 MySQL 中导入数据并建表 -- 注意要指定 order by create table from_mysql ENGINE = MergeTree order by id as select * from mysql('127.0.0.1', 'db', 'table', 'user', 'password');

create table temp_test (`id` UInt32, `creaetd_at` UInt32) ENGINE = MergeTree order by id partition by toYYYYMM(toDateTime(creaetd_at)); insert into temp_test select id, created_at from mysql('127.0.0.1', 'db', 'table', 'user', 'password');

-- 可以从 ClickHouse 其他表导入数据 insert into temp_test select * from remote('127.0.0.1:9000', db, table); insert into temp_test select * from remote('127.0.0.1:9000', db, table, 'user', 'password');

-- 删除/更新数据

-- 该命令是异步的,并不会马上执行

alter table tb_name delete where id > 0;

alter table tb_name update col1=val1, col2=val2 where id > 0;

-- 查看任务进度 select * from system.mutations; kill mutation where mutation_id = 'trx_id';

-- 获取表的分区数据 select partition, name, active from system.parts where table = 'temp';

-- 查看表占用大小 select database, table, formatReadableSize(sum(bytes)) as size from system.parts group by database, table order by database, table;

-- 查看进程等 show processlist; select query_id, user, address, elapsed, query from system.processes order by query_id; kill query where query_id='query_id';

-- 查看连接数量 select * from system.metrics where metric like '%Connection%';

-- 查看集群信息 select * from system.clusters;

-- 优化表分区 optimize table test [PARTITION partition] [FINAL]

-- 系统配置 select * from system.settings; set send_logs_level = 'debug'; -- 修改日志级别,如 trace|debug 等等 set insert_deduplicate = 0; -- 关闭重复数据自动删除,测试数据时关闭会比较好用