干货高能!ClickHouse性能优化,看这一篇就够了

  |   0 评论   |   0 浏览

概述

查询的本质是什么?

所谓查询,说白了,就是在一堆有序或者无序的数据中,按照一定的条件,筛选出我们期望的数据集。

ClickHouse以快著称。它到底有多快?它又为什么快?说到底,还是绕不开它的底层存储逻辑。本文以CllickHouse使用最为广泛的MergeTree引擎为例,来揭开ClickHouse性能的神秘面纱。

我们知道,查有序的数据一定比查无序的数据速度要快。不论是从磁盘里查,还是直接从内存里查,均是同理。

写数据则相反,无序的写比有序的写,速度要更快。但是为了查询有序,我们就不得不在插入的时候对数据进行排序。那么怎么排序才合理?特别是海量数据的时候,如何才能保证这种排序性能是幂等的?尤其是当数据一批一批的插入的时候,有没有一种比较稳定的排序算法来完成这一工作?其实我们很快就能想到类似归并排序的思想。我们先对插入的每一批数据进行排序,然后再将不同批次插入的有序数据进行merge,并保证merge之后的数据有序就行了。那么一下子我们就联系到了MergeTree的底层核心思想。

MergeTree存储原理

为什么在聊性能优化之前,我们会先讨论MergeTree的存储原理。因为一切的所谓调优,都是建立在存储原理之上的。只有搞明白了数据存储的原理,它在内存里是如何排布的,才能有的放驶,做出针对性的优化。而不是在网上随便找到一篇文章就奉为圭臬,无脑使用,如果运用场景不对,不仅没有任何优化效果,相反会带来可怕的负优化。

MergeTree是在merge什么

MergeTree的核心是Merge,合并什么?它是将同一个partition内的多个part进行合并。很多初学者可能弄不清part和partition之间的区别。

part是ClickHouse为了加速插入,引入的一个概念。在ClickHouse中,每个线程的每次插入操作,都会产生一个part,这样如果我们开启了多线程来插入(可以通过max_insert_threads来设置),正常情况下,每次insert动作,肯定是处于不同的线程中的,因此每次插入,至少都会产生一个part。这样并发写入,且各个part之间的数据互不影响,所以可以做到快速的插入。

那么这样的快速插入 必然会带来一个问题:查询怎么办?

有可能我第一次插入的和第二次插入的数据有相近的,甚至数据完全相同都是有可能的。这时候如果直接去查part,那么必然需要扫描所有的part,这时候必然会成为性能的拖累(即使part已经是有序的)。我们可以简单举一个例子如下:

上面有三个part,现在我的诉求是查询出其中>=3且<=5的所有数据,虽然在各个part内部,数据本身是有序的,但是这三个part很不凑巧地都包含了3~5的范围,因此必须每个part都扫描一遍,而实际上,part3是没有任何符合条件的数据的,所以这个扫描其实是无用的操作。如果part数量特别多呢?如果查询的条件再苛刻些呢?那么对整个查询的性能影响已经到了无法忽略的地步。

这时候就体现出merge的作用了。所谓merge,他就是将各个满足同一分区范围的数据合并成一个更大的part。假设上述三个part都处于同一个partition内,那么最终效果如下:

合并完之后,数据成了上面的样子 ,这时候查询就非常简单了。直接二分查找就可以。

那么接下来人们肯定关心的一个话题是,什么时候去merge?怎么去merge?

首先,merge肯定是在后台进行的,而不是数据一插入就立即merge。一种比较普遍的说法是数据插入后8-10分钟后会自行merge。这种说法大抵正确,但不全面。

merge的时机的确是不确定的。它既不能每插一批数据数据就进行一次merge,那样势必会拖慢插入的速度。也不能一直不merge,那样就是前面说的问题,查询开销太大。为了寻找这个平衡点,在ClickHouse里,Merge的算法有一个核心参数,叫做base,它决定了part要不要参与merge。为了讲清楚这个base参数,我们同样来举一个例子。

下面我们以base = 3来举例。

如上图所示, base = 3,代表每3个part 合并成更高一级的part, 我们叫它Level1, 然后3个Level1进一步合并成Level2, 从逻辑层面看上去,就像一棵树。这也就是MergeTree的Tree的由来。

那么ClickHouse里base是如何取值的呢?在ClickHouse里,base初始取值为5,但并不总是一成不变的,如果最小的part长时间得不到合并,那就说明base取值过大,它会适当调小;如果参与merge的part的总大小很小,那么base也会适当减小;当然,part总数本来就比较少,base也会适当减小。

为什么base取值和part的大小还有关系呢?因为并不是part数达到了这个base就会合并,它只是一个准入条件,而不是一个充分条件。他真正merge的时机是要判断 总part大小 / 最大part大小 >= base 才会去合并。

所以在某些情况下,可能存在有些part永远得不到merge的情况。(思考下,什么场景会出现?又该怎么解决?)

partition

上面讲了半天的merge,主要还是针对part而言的。那么parttition又是什么呢?它又是由什么决定的?

partition是一个逻辑上的概念,不像part,是有实物可以看得见摸得着的。我们在建表的时候,可以指定partition by的字段的。由于是使用者自己指定,那么,就有很大的可操作空间。可以说,partition是否合理,很可能会影响到查询和写入的性能。

对于ClickHouse来说,每个part,实际上都是一个文件夹,该文件夹下存储了具体的列存数据,列越多,那么文件也就越多。

part越多,意味着文件越多,也就意味着merge需要更多次才能完成。从上图可以看到,有些合并甚至达到了571次。合并本身是需要消耗CPU和内存的,资源被merge占掉了一部分,那么查询和写入的性能就会受到影响。而且文件数目过多,对文件系统本身来说也是沉重的负担。最重要的,merge不是时时刻刻都在发生的,所以当一个查询语句过来,虽然有部分数据可以在合并后的part中找,但未来得及合并部分的数据,还是要在小part里找。那么part数太多,查询自然就会越慢。

以上是part数太多的坏处。

那么怎样会导致part数太多呢?一个最显而易见的场景就是小批量频繁插入,每次插入都会产生一个part。所以clickhouse的写入,是提倡大批次插入的。测试表明,单批次插入数据上万甚至上百万都是没有问题的。

但是单批次插入数据过多的话,也需要注意一些问题。

首先就是实时性,如果一定要等数据量达到某一个数值才插入,那么当数据源生产较慢时,肯定会带来延迟。但是这个有解决方案,那就是设定一个超时时间,如果超时时间到了,即使没有积攒够一个批次,也插入到数据库中。

第二个问题是大批量插入必然会带来大内存消耗。

第三个问题是如果数据本身不规范,那么大批次数据插入很容易带来"TOO MANY PARTITIONS"的报错。它的意思是说单批次插入的数据中包含了过多的partition(一般是100个)。这个限制也是出于性能考虑,因为即使是在一次插入,它的数据处于不同的partition,那么仍然是落在不同的文件夹里的,如果同时写入的文件夹数过多,那么必然会带来性能的损耗。也就失去了先写part后merge的意义了。

前面提到过,partition中的数据实际上是有序的,那么也就是说,如果查询的语句中带有partition by的字段,可以非常快捷地根据排序字段做二分查找。

如下的例子,现在有三个partition,分别为20231101,20231102,和20231103, 我现在需要查2023-11-03 12:00:00 到 2023-11-03 13:00:00这一个小时的数据,那么前面两个partition其实是可以看都不看,直接跳过的,然后在20231103的分区内做二分查找,就能很快定位到要查询的数据的分布。这样就不需要进行全表扫描了。

sorting key 和 primary key

很多人可能都不知道,MergeTree是有主键的。不过它所谓的主键并不是代表数据唯一,而是作为一个排序的手段。

primary key可以显式指定,也可以隐式指定。

隐式指定就是不声明primary key的字段,那么clickhouse会默认拿order by的字段作为primary key。

但是!

primary key和sorting key还是有区别的。

从概念上来说,primary key是作为索引的,sorting key是用来排序的。

primary key一定是sorting key,但sorting key不一定是primary key。

在排序的时候,primary key一定要放在靠前的位置。

那么,这二者在使用上有什么区别呢?

正常情况下我们是不需要考虑这二者的区别的。但是对于聚合引擎,比如SummingMergeTree和AggregateingMergetree, 分开设置primary key和order by将会有助于优化。为了比较具象的说清楚这二者的关系,下面举个例子:

查询条件: WHERE A
聚合条件: GROUP BY (A, B, C)

此时,这样指定主键和排序字段会比较高效:

PRIMARY KEY(A)
ORDER BY (A, B, C)

排序键是为了上下文一致性,和最大化的压缩比例。而且group by的维度字段在物理上靠的更近的话,查询起来也更高效,这是肯定的。

主键是需要占磁盘空间的。而且基于主键的索引在查询时会被加载到内存当中,如果主键过多,那么内存效率相对来说会非常低下。因此,我们只需要把过滤的列放到主键索引里就可以了,它主要用来快速筛选,缩小数据范围。

还有一点比较重要。现在我们order by (A, B, C )三个维度,假设现在需要新插入一个维度D,那么这个时候,我们想要修改表,变成 order by (A, B, C, D)是非常轻量级的。因为A,B,C本身就是排好序的,所以D一开始肯定是没有数据的,那么更新的成本其实是0。如果我们把主键索引和排序键设置成一样的会怎么样?我们需要修改主键索引的文件,相当于重新构建整个索引文件了,这个成本之大,其实是不可想象的。

所以将这二者分开的最大意义,是为了给修改order by腾出一定的空间。

思考: ReplacingMergeTree去重,根据的是order by,还是primary key?

注意: ReplacingMergeTree去重是根据order by,而不是primary key。

稀疏索引

我们前面提到的primary key, 和我们通常关系型数据库中见到的主键索引有所不同,它是属于稀疏索引。稀疏索引是ClickHouse中一个非常重要的概念。它有别于MySQL这些OLTP数据库的行级索引。行级索引就是一行数据一条索引,每个索引都可以精确定位到一条数据。所以行级索引的跳数通常等于该索引字段去重后的个数。

但是稀疏索引不同,它的跳数由索引粒度决定,即多条数据才会去创建一个索引。MergeTree引擎在建表的时候,默认的索引粒度是8192,也就是说8192条数据才会创建一个索引(当然不是绝对的,它这个粒度大小是自适应的,可以动态调整大小)。

上图描述了根据主键稀疏索引去查找数据的过程,在每个part目录下,会有一个primary.idx文件,存储的是索引信息,根据稀疏索引,可以在col.mrk2文件中找到实际数据在bin文件中的偏移量,然后根据偏移量就可以定位到数据处于哪个颗粒中。当然这个地方画得比较简略,实际逻辑比这个要复杂。

颗粒,是clickhouse查询的最小单位。

稀疏索引的前提是数据必须是有序的。稀疏索引的存在,使得索引的存储成本极小,8192万行数据,只需要1万个索引,这对存储海量的数据是一个巨大的优化。又由于MergeTree引擎的特性,数据本身就是有序存储的,所以查询的效率自然而然就非常快了。

正常情况下,主键索引的查找逻辑是使用二分查找。但也不能一概而论,对于联合主键索引来说,需要看联合索引的顺序。

通过trace日志查看查询执行的逻辑,可以知道,第一关键字列使用的是二分查找算法,而非第一关键字列,使用的则是通用排除搜索算法。通用排除算法的性能要比二分查找低很多,因为它进行的是全表扫描。也就是说,非第一关键字列放在主键索引里,对该字段并不能起到优化作用。

所以,联合主键索引的创建顺序,是和查询场景息息相关的。

那么这种情况应该怎么解决呢?这里先留个悬念,大家也可以先思考一下。

建表优化

合理分区

通过测试发现,单表的分区数量最好不要超过1000个,否则会对写入性能有很大影响。

分区的选择可参考性有很多。假如数据存储的时间较长,比如存三年甚至更久,那么按天分区或者按星期分区或许比较合理。但是如果查询的场景大部分是根据天去查询,那么按天分区也许会比按星期会更好一些。

当然,还要考虑到数据规模因素,如果单日数据增量太大,比如APM的span数据,有些规模比较大的企业,可以达到秒增百万,此时如果选择按天分区的话,那么单表的单个分区可能就会达到数十TB。而实际上的查询场景可能比较频繁的还是查询15分钟,或者1个 小时的数据,那么,这时候就可以考虑按小时进行分区了。

合理选择shardingkey

单节点的clickhouse服务是没有shardingkey的概念的。我们谈论shardingkey的时候,其对象一定是clickhouse集群。

起初,设计shardingkey的意图,仅仅是为了负载均衡,尽量使得数据均匀地分布在把不同的shard内。通过这种方式,充分利用服务器的资源,从而使查询效率最大化。

上例中,分片1有10TB数据,分片2有100G数据,分片3则是800G数据,整体分布是不均匀的。

假设此时一个分布式查询语句过来,真正落到各个节点上执行的,查询的还是本地表,那么由于数据规模的不同,需要消耗的内存和CPU也是不同的。这在服务器资源充裕的情况下可能影响不大,可一旦系统资源本身就吃紧,那么MPP架构的短板效应就暴露无疑。性能最差的那个节点的性能,势必会拖累整个查询的性能。因此,我们需要通过shardingkey来尽可能地使数据均匀地落在不同的分片。

为什么需要shardingkey来做均衡,而不是通过round-robin的方式来写?

我们知道,clickhouse数据的插入是以batch为单位的,只要每个batch相对平均,那么round-robin之后最终得到的每个分片上的数据,始终是相对均衡的。

但是这种方式无法保证业务上的逻辑,它很可能会把有业务关联的数据分拆到了不同的shard里,这对于查询,往往是非常致命的。

接下来我们来考虑下面这一种场景:

我们现在有两张表,并要对这两张表进行关联查询,虽然从数据量上来看,这两张表在shard1和shard2上面分布是均匀的,但是要做join查询,如果本地join,那么得不到我们想要的数据结果,所以只能用global join去查分布式表。这也就意味着,需要先将所有数据先查出来,然后在汇总节点上去做join,这对于查询来说,开销是巨大的。

而如果这时候,我们使用id作为shardingkey, 算法如下:

shardNum = id % clickhouse_shards + 1

当计算出来的shardNum和分片编号相等时,将数据存入对应的分片,那么上述数据就可以重新排布如下:

那么这时候去查询,实际上使用本地join得到结果,再汇总出去,性能提升将会是巨大的。

当然,这种选择shardingkey的方式,也是有些需要去考虑的点的:

  • 如果集群内的某一个分片挂了怎么办?按照这种shardingkey的逻辑,这部分数据不可能写到其他的分片,那么会导致本来该写到该分片的数据永远block住,无法写入,除非集群被修复。
  • 如果集群扩容或者缩容了,那么由于clickhouse_shards发生了变化,计算出来的shardNum和之前的算法算出来是不一致的,那么就需要一个rehash的过程,否则数据就会乱掉。
  • 如果shardingkey选取不合理,也可能造成数据分布不均匀的情况。
  • clickhouse_sinker提供了shardingkey的选择方案,如果指定的shardingkey本身就是一个数值类型,那么直接取余计算。如果指定的shardingkey是一个字符串类型,那么会先通过xxhash64得到一个值,再拿该值去取余计算。
  • ckman提供了按照shardingkey做rebalance的功能,它可以在集群扩缩容之后对已有数据进行rehash,从而保证与后写入的数据保持hash算法一致。

合理选择 order by字段

order by的字段怎么选取,是由具体的查询SQL来决定的。

不同的查询场景,使用不同的order by字段,得到的效果可能差别非常大。

这就要求我们在建表之初,就要考虑好具体的查询场景,从而谨慎决定order by的字段(包括顺序)。

前面提到过,如果不显式指定主键字段,那么order by的字段就会被当成主键字段。

前面还提到过,对于联合主键,第一关键列会用到二分查找,而非第一关键列使用的则是全表扫描的通用排除搜索算法。这也就意味着,有些字段即使放在order by里,其实也是不适合作为过滤条件的。

那通用排除算法就毫无用处吗?

其实也不是的,它对于低基数的排序列,还是有很大的加速作用的。

上例中,cardinality_IsRobot就是一个低基数的列,它在800w数据中一共只有4个值。

这种如何理解呢?

在这里例子中,col1具有低基数。虚线框表示的是granule,那么相同的col1可能分布在不同的颗粒上。这时候,如果我们按照 col1 = 'v1' and col2 = 'w3'去做查询。

假设order by 顺序为:

ORDER BY (col1, col2);

此时col1是第一关键列,那么按照col1做二分查找,反而一个颗粒都过滤不掉,到了col2, 需要做全表扫描。我们把整个过程分成两个阶段来看:

第一阶段: col1做二分查找, 4个granule,过滤完,还是4个granule
第二阶段: col2做通用排除搜索,至少得扫描到第三个granule的时候,才能停止(因为col2也是排过序的,所以看到有w4,说明后面肯定不会有w3了)
扫描的granule一共为 4 + 3 = 7个轮次。

如果我们改一下order by的顺序:

ORDER BY (col2, col1);

我们仍然当成两个阶段来分析:

第一阶段,根据col2做二分查找, 4个granule立即缩减到2个(索引2和索引3)
第二阶段,根据col1做通用排除搜索,仍然需要扫描2个granule
扫描的granule一共为 2 + 2 = 4个轮次。

上面通过举例的方式讲解了order by的顺序对查询效率的影响。但实际使用时场景肯定要比这复杂得多,因为不太可能所有设计者一开始就能预料到所有的查询场景,随着业务的不断进化,SQL的查询场景也是越来越复杂,那么这个时候,为了提升性能,势必要修改order by的字段(或者顺序),这对于MergeTree来说,是一个非常重甚至是危险的操作,它可能导致数据在一定的时间内无法对外提供服务。

此时我们有几种解决方案:

  • 额外建一张表,使用新的order by字段

    • 这种方案的弊端在于需要冗余一倍以上的存储空间,而且表的数据需要双写,一旦有哪一个环节出问题,那么得到的结果就不是我们期望的。而且要修改表,需要同步修改两张表
    • 对于用户不透明,在场景A需要查A表,在场景B需要查B表,对于使用者来说,十分不友好
  • 创建物化视图,在物化视图里使用新的排序逻辑

    • 同样需要占用一定的额外空间,但相比于重新建一张表,会好很多,表的schema修改也需要同步进行,好处是数据无需双写,只需要写一份数据就行了
    • 对用户同样不透明,场景A需要查A表,场景B需要查物化视图
  • 创建projection

    • 最轻量级,所占的存储空间也是最少的,projection本身就是属于表的一部分,所以无需双写,也不用考虑同步修改表的问题
    • 对用户透明,projection内部会自动选择最优的版本进行查询。
    • projection的最大问题是,不那么容易被命中,如果WHERE条件中加入了projection中未声明的字段,那么projection并不会走到。

二级索引

所谓二级索引,是MergeTree在主键索引之外提供的另一类索引类型。它是作用在每个granule之上的,所以是先有主键索引,然后才能谈论二级索引的使用。

官方提供了很多中二级索引类型,通过这些索引,我们可以更快速地对整个颗粒做统计、过滤、去重等操作。

下图给出了常见的二级索引在SQL查询加速优化方面的范围列表。

二级索引虽然比较多,但其中使用比较广泛的场景是全文搜索。

首先给出结论,clickhouse的全文搜索能力,与ES相比,还是有很大差距的。

clickhouse使用二级索引做全文检索的核心思想就是:利用布隆过滤器,快速筛选掉大部分不符合条件的颗粒,然后在符合条件的少部分颗粒中进行查询。

布隆过滤器的原理就不多做解释了。它提供了一个长度,hash函数的个数,以及一个种子值。通过布隆过滤器可以判断一个字段值一定不存在,而不能判断一个值一定存在。

因此,对于IN, LIKE的查询有优化效果,但对于 NOT IN, NOT LIKE则无优化效果。

要想减小假阳性产生的概率,那么bitmap的长度要足够大,hash函数的个数决定了计算出来的hash值重复概率,hash函数越多,计算出来的hash值重复的可能性就越小,但是占用的bitmap空间也会越多,当bitmap长度一定,hash函数太多,那么极可能最终bitmap里全是1,那么对于过滤就毫无益处了。

bitmap足够大,hash函数足够多,理论上是可以保证足够小的假阳率的。但是二级索引也是需要占用存储空间的,而且hash函数越多,计算也就越复杂,耗费的CPU也会越多。因此,二级索引说白了,其实就是拿空间换时间。

我们说回全文检索。比较适合做全文搜索的二级索引有三个,分别是tokenbf_v1, ngrambf_v1, 以及inverted。我在ClickHouse利用跳数索引加速模糊查询一文中详细介绍了三者的区别。概括起来就是:

tokenbf_v1ngrambf_v1inverted
分词方式按非字母数字的特殊字符做分词按长度做分词按非字母数字的特殊字符做分词
索引原理布隆过滤器布隆过滤器倒排索引
特殊字符过滤不支持支持不支持
汉字过滤不支持支持不支持
生产可用实验性质,不推荐
占用存储空间适中和n长度有关,n越大,占用空间越少,n越小,占用空间越大

tokenbf_v1:

ngrambf_v1:

inverted:

个人比较推荐使用ngrambf_v1去创建二级索引。但是ngrambf_v1的长度n的选取非常讲究。

如果搜索的关键字的长度小于n,那么这个索引是不会被命中的。比如n = 10, 但是条件为where value like '%test%',那么这个索引边并不会生效。所以这就决定了n的值不是越大越好。测试表明,n的值也不是越小越好。因为当n足够小时,那么拆分出来的token就会越多,填充到bitmap的hash值也就会越多,那么假阳率也就越高,这时候反而什么都过滤不出来。

另有一点需要注意的是,二级索引能够提升查询效率的前提是,主键索引要创建得合理。

假设有如下场景:我们需要根据visitor_id = '1001'来做过滤。

布隆过滤器设置得再好,它也是针对颗粒而言的,在上例中,1001这个数,本身在所有的颗粒中都存在,所以即便是布隆过滤器的假阳率接近0,也是无法过滤掉任何一个颗粒的。但实际上呢,可能1001只是颗粒中8192行里的某一条或几条数据。那么这种场景,二级索引也是没有任何优化作用的。

利用物化视图和投影

前面我们已经提到过物化视图和投影在修改order by上的场景应用。

但物化视图和投影的更大用处是用于聚合查询的预聚合。projection相比物化视图来说更加轻量级,但从效果上来说,毕竟还是要比物化视图稍微差一点。这个很好理解,projection操作的毕竟还是原表,而物化视图把数据完全独立出来了,查询的时候是不需要在原表进行搜索的。

关于物化视图和projection的一些使用技巧。

举个交易串联的场景的例子来说明一下。交易串联场景,就是按照交易ID进行预聚合。我们这里使用物化视图来演示。

先创建本地表和分布式表:

CREATE TABLE IF NOT EXISTS tran_log ON CLUSTER abc (
    ts DateTime64(3),
    tranId String,
    serviceCode String,
    message String
) ENGINE=ReplicatedMergeTree
PARTITION BY toYYYYMMDD(ts)
ORDER BY (ts, tranId, serviceCode);

CREATE TABLE dist_tran_log ON CLUSTER abc AS tran_log ENGINE = Distributed(abc, default, tran_log, rand());

这里提供两种创建物化视图的风格。

风格1:物化视图直接与本地表关联:

CREATE MATERIALIZED VIEW mv_tran ON CLUSTER abc
ENGINE = ReplicatedAggregatingMergeTree() ORDER BY (tranId)
POPULATE AS SELECT tranId, uniqExactState(ts) as logCount FROM tran_log GROUP BY tranId;

CREATE TABLE dist_mv_tran ON CLUSTER abc AS mv_tran ENGINE = Distributed(abc, default, mv_tran, rand());

风格2:创建一张聚合表,物化视图作为聚合表和本地表的桥接:

CREATE TABLE agg_tran ON CLUSTER abc (
 tranId String,
 logCount uniqExactState(DateTime64(3))
) ENGINE=ReplicatedReplacingMergeTree()
ORDER BY transId;

CREATE TABLE dist_agg_tran ON CLUSTER abc AS agg_tran ENGINE = Distributed(abc, default, agg_tran, rand());

CREATE MATERIALIZED VIEW mv_tran ON CLUSTER abc
TO agg_tran AS
SELECT tranId, uniqExactState(ts) as logCount FROM tran_log GROUP BY tranId;

CREATE TABLE dist_mv_tran ON CLUSTER abc AS mv_tran ENGINE = Distributed(abc, default, mv_tran, rand());

风格2相比于风格1,优势在于可以在运行期暂停和重启与原始表的连接关系。

华为云ClickHouse提供的Adaptive MV 提供了类似Projection的透明物化视图。

我们准备一些数据,在各个shard上执行(故意引入一些重复数据):

INSERT INTO tran_log VALUES 
('2000-01-01 00:00:00.000', '001', 'open', 'open browser') 
('2000-01-01 00:00:01.000', '001', 'buy', 'buy a ticket') 
('2000-01-01 00:00:02.000', '001', 'close', 'close browser') 
('2000-01-01 00:00:00.000', '002', 'open', 'open browser')
('2000-01-01 00:00:01.000', '002', 'buy', 'buy a ticket')
('2000-01-01 00:00:02.000', '002', 'close', 'close browser')
('2000-01-01 00:00:00.000', '003', 'open', 'open browser') 
('2000-01-01 00:00:01.000', '003', 'buy', 'buy a ticket') 
('2000-01-01 00:00:02.000', '003', 'close', 'close browser');

我们使用分布式表聚合查询,和直接查询物化视图,得到的结果均一致:

SELECT tranId, uniqExact(ts) AS cnt FROM dist_tran_log GROUP BY tranId;
SELECT tranId, uniqExactMerge(logCount) AS cnt FROM dist_mv_tran GROUP BY tranId;
┌─tranId─┬─cnt─┐
│ 003    │   3 │
│ 002    │   3 │
│ 001    │   3 │
└────────┴─────┘

注意上述物化视图的使用,创建物化视图时,使用了uniqExactState, 而查询时,使用了uniqueExactMerge, 这种用法并不特殊,ClickHouse所有的聚合函数几乎都可以这样使用。

首先我们要知道,不管是物化视图,还是projection,都是针对单个part而言的。

这也就意味着,如果需要聚合折叠的数据位于不同的part,那么查询的结果很有可能是不准的。

如上图所示,由于uniqExact只能在part内去重,所以得到的最终结果,仍然包含有重复的数据,并不是我们所期望的结果。

而uniqExactState存储的并不是去重后的数据,而是一个去重后的状态,这个状态是不能明文读取的。只有当使用uniqExactMerge去查询时,它得到的才是一个part合并之后的数据。

查询SQL优化

常规查询优化

prewhere

prewhere就是先只读取执行prewhere表达式所需要的列,然后再补全读取select所需要的其他列。它的作用就是在查询之前,提前过滤掉一部分的数据。

现在prewhere已经不需要显式指定了。clickhouse优化器会默认将where条件移到prewhere中去执行,所以写where和prewhere效果是一样的。

谓词下推

所谓的谓词下推,就是尽可能地将过滤条件对数据源操作,使参与查询的数据越少越好。

用在SQL上,就是先过滤,后聚合。

举一个例子:

SELLECT count() FROM A JOIN B ON A.id = B.id WHERE A.a > 10 AND B.b < 100;

上面这个查询,它会先join,然后从结果集里去执行WHERE条件的过滤。

这时候如果我们将WHERE条件放到子查询中,先过滤,再JOIN:

SELECT count() FROM 
(SELECT * FROM A WHERE A.a > 10) AS A1
JOIN
(SELECT * FROM B WHERE B.b < 100) AS B1
ON A1.id = B1.id

这样参与join的数据就会一下子少很多。

聚合外推

这个比较好理解,就是先聚合,再统一计算。

如 sum(money * 2), 那么需要计算N次,再进行N次sum; 而sum(money) * 2, 只需要N次sum后计算1次。

count优化

使用count()代替具体的count(col_name), 原因是count(col_name)需要实际去计算一遍,但是count()的话,底层文件是有一个count.txt文件的,直接读取文件数据即可。

大表join

clickhouse的join查询在OLAP场景似乎是不可避免的。尤其是clickhouse这种MPP去中心化架构,分布式join也越来越成为很多企业绕不过去的一个话题。那么如何才能高效的做到大表join?

首先,我们要知道clickhouse里表join的原理。

单节点join

clickhouse单机join默认采用的是hash join算法,也就是说,先将右表全量读取到内存,然后构建hashmap,然后从左表分批读取数据,到hashmap中去匹配,如果命中,那么就作为join之后的结果输出。

因为右表是要全量加载到内存的,这就要求要足够小。但是实际上右表大于TB级是很常见的事情,这时候就很容易出现OOM。

为了解决这个问题,有一种解决思路是将右表构建成大宽表,将维度拍平,使行尽量少,这样右表只需要加载少量的列进内存,从而缓解这一情况。

一个典型的落地实施案例就是clickhouse_sinker存储Prometheus指标的方案。它将指标分拆成两张表,一张metric表,用来存储时序指标值,一张metric_series表,用来存储具体的指标。两张表通过series_id进行关联。

在metric表中,每个时间点的每个值,都对应着一个series_id,我们通过这个series_id,反查metric_series表,就可以找到这个series_id对应的指标以及label。

分布式join

clickhouse的分布式join有两种玩法,一种是带global,一种是不带global。

普通join

  • 汇总节点将左表替换为本地表
  • 将左表的本地表分发到每个节点
  • 在每个节点执行本地join
  • 将结果汇总回汇总节点

这种做法有一个非常严重的问题。在第三步,每个节点执行分布式join的时候,如果右表也是分布式表,那么集群中的每个节点都要去执行分布式查询,那也就是说,如果集群有N个节点,右表查询就会在集群中执行N*N次,这就是读放大现象。

正是因为这种问题的存在,在比较新的clickhouse版本中,分布式join如果不加global,已经会从语法层面报错了。这说明官方已经禁止了这种写法的存在。

global join

  • 汇总节点将右表改成子查询,先在汇总节点将右表的数据结果集查询出来
  • 将右表的结果集广播给各个节点,与各个节点的左表本地表进行join查询
  • 各个节点将查询结果发送给汇总节点

由于右表的结果已经在汇总节点计算出来了,那么也就不需要在其他节点重复计算,从而避免了读放大的问题。

但global join 的问题是它需要将整个子查询的结果集发送给各个节点,如果右表的结果集特别大,那么整个过程耗费的网络带宽也将是非常恐怖的。

正确姿势

  • 大表在左,小表在右
  • 数据预分布,实现colocate join。也就是将涉及到join的表按照相同的join key分片,使需要join的数据尽量都能在本地完成。也就是前文提到的shardingkey的用处。
  • 改大宽表

并发查询

并发查询也是clickhouse的一个比较薄弱的领域。

因为对于clickhouse的查询来说,一个大的查询SQL往往会把整个服务器的资源吃满,如果并发查询比较多的话,那么不可避免地造成资源竞争,最终的结果就是谁也快不了,甚至还会出现OOM的情况。

官方默认的最大并发数是100, 这个100是每个节点的最大并发数,而不是整个集群的。它包含了查询的并发和写入的并发。

我们很容易碰到这样的场景:在界面上点击一个查询耗时比较久,等得不耐烦,就多点了几下。事实上,clickhouse并不会因为你在界面上多点了一下鼠标,就取消之前的SQL运行,反而会产生多个SQL在并发执行,如果这种耗时比较久的SQL越积CPU打满,造成的结果就是恶性循环,其他SQL也会越来越慢,最终导致并发的SQL超过了设置的最大并发数,再也无法执行任何查询语句,甚至写入都会受到影响。

我们一般建议将最大查询并发数,设置为最大并发数的90%,预留出10%的并发数量供数据写入使用。这样即使查询并发数打满了,仍然不会影响到数据的写入。

我们可以通过配置来实现这一策略:

<clickhouse>
    <max_concurrent_queries>100</max_concurrent_queries>
    <max_concurrent_select_queries>90</max_concurrent_select_queries>
</clickhouse>

读写分离

读写分离,其实也是资源最大化利用的一个实现方案。

写的节点和读的节点互不干扰,彼此都能完整利用服务器的资源。比如插入的时候插入1,3,5节点,查询的时候查询2,4,6节点。

硬件优化

磁盘选择

推荐做RAID5磁盘阵列。

为什么推荐raid5, 首先是快,其次是有容错能力。raid0虽然快,但无容错能力,一块盘坏,数据全无。raid1虽然有冗余能力了,但是性能下降了。而raid5兼顾了二者的优点。

当然有条件的推荐直接上SSD,那就是快上加快。现在国产存储逐渐展露头角,固态硬盘也不贵,一块1TB的固态硬盘,成本也就三四百块钱。

有人说,我有多块磁盘,不做raid行不行?从技术实现上来说,肯定是可行的。但带来的就是性能的牺牲,做成了raid,可以统一索引,而配置多块盘到default卷,如果要查询的数据跨盘了,那么需要在不同的磁盘上都构建一遍索引。这个性能差异在数据量不大的情况下,可能肉眼感受不出来,如果数据量达到了一定的量级,差异还是有的。而且,冗余能力肯定是没有的,如果其中一块盘坏了,那么数据能不能恢复,就要看人品了。

我个人比较推荐固态硬盘和机械盘搭配使用。固态磁盘做default磁盘,机械磁盘可以做raid5。热数据,经常会被查询的数据(比如近两三天的数据),存储到固态磁盘,这样不论是写入,还是查询都非常快,超过一定时间的数据,可以通过存储策略转移到机械磁盘上,因为查询频率并不怎么高,那么受到的影响就会有限。

磁盘的io决定了数据的读写速度。在大数据量场景,机械磁盘和固态磁盘的查询和写入性能差异还是非常大的。测试环境、POC环境,如果条件有限,可以用机械磁盘,生产环境,特别是对响应要求比较高的生产系统,一定要上固态,最好是一步到位直接使用NVMe。

生命周期管理

前面提到磁盘,那么就不得不提到存储策略。而存储策略和表的生命周期往往是相伴相生的。

生命周期对表的优化体现在两个方面。

第一个是实现数据的冷热分离。就如前面提到的,把时间比较近的,查询比较频繁的数据存在固态磁盘,这样使查询和写入都有一个最佳性能。但固态硬盘的成本毕竟较贵,不可能无限扩容,因此,可以将一些比较查询不那么频繁的温数据存放在机械磁盘中。而那些一个月以上的数据,可能一年也查询不了几次,主要用来做归档用的,就可以存储在HDFS或者S3这类对象存储中,其特点是不利于查询,但是存储空间够大,成本便宜。

但是有些使用场景可能并不需要存储那么长时间的数据,也没有归档的硬性要求,这时,就可以利用生命周期来做删除策略,将某短时间时间之前的数据直接删除掉,这样保持表里永远只存储一定时间段的数据,那么数据规模也将维持在一个固定的范围,不会无休止增长,查询性能相对来说也会比较稳定,不会有太大的波动。

需要注意的是,生命周期的前提是数据的插入是有序的。比如实时的时序数据。
如果插入的数据无序,又设置了生命周期,那么可能出现的一种情况就是数据刚刚插入,生命周期就生效,立即就给删除了,除了徒然增加资源消耗,这部分数据永远无法查询到。
而且这种情况一旦出现,其时间跨度一定很大,必然落在不同的partition里,那么数据的频繁写入、删除,必然给zookeeper带来非常大的压力,很容易造成表出现READ_ONLY的情况。

集群规模

ClickHouse是MPP架构。MPP全称叫海量并行处理架构。这种架构的集群,每个节点的地位是相等的,也就是说,每个分片内,各个副本都是主,既可以读,也可以写。它最大的特点就是share nothing,也就是每个节点只访问本地的资源。

也就是说,一个查询语句落到集群里,它是先在各个节点进行查询、聚合、汇总,最后汇聚到汇总节点统一返回。

这就很容易得出结论,当CPU或者内存成为瓶颈时,扩容集群的分片数,是可以有效加速查询效率的。

我曾经给很多客户做过集群资源评估,推荐过集群硬件配置。需要考虑很多因素,比如数据日增量,存储时间,查询场景(如查询多长时间的数据)等。数据日增量决定了建表的分区策略,存储时间和日增量就能计算出数据总规模(当然要考虑压缩),这决定了硬盘容量需要多大。正常情况下需要冗余1.5倍左右的磁盘空间, 因为数据被merge之后,原始part的数据并不是立即被删除的,所以大多时候经常是两份数据共存,那么如果实际数据是1TB(压缩后),那么实际占用的磁盘肯定是不止1TB的。

查询场景决定了CPU和内存资源的占用,如果查询的数据量大,那么对应需要占用的CPU和内存自然就要更多。此时,要么优化查询语句以及建表语句,要么就只能使用更多的资源。

读写性能瓶颈

此处讨论的瓶颈,是指表和SQL都已经做到了最大化的优化,硬件以及第三方原因带来的性能瓶颈。

写性能瓶颈

zookeeper

写数据的瓶颈,绝大部分都是zookeeper引起的。特别是insert分布式表的时候,它的数据会先全部insert到某一个节点上的一个临时目录,然后通过zookeeper按照sharding policy进行分发到各个分片节点,同时副本节点需要从主节点上去拉取数据,走的都是zookeeper。

因此,我们不推荐写分布式表,一来是给单节点带来非常大的负担,二来给zookeeper带来很大的压力。第三就是无法保证数据在物理上和业务逻辑上的均衡。

zookeeper的瓶颈主要在于znode的数量,如果znode的数量过多,会非常影响性能。经测试,当znode数量达到M级别(百万)时,插入性能就已经非常缓慢了,严重的甚至可能造成zookeeper失联,从而使表进入read_only状态。

znode数量与什么东西有关呢?

首先是part的数量。每个part在zookeeper上都有副本数 * part数个znode。通俗点计算,N个节点,M个part,那么znode的数量就会无限接近于M* N。因此,如果插入的batch太小,数据来不及merge,那么就会产生非常多的part。

还有一种可能就是replica_queue过多。这种情况一般是merge任务已经分发到了各副本节点,但是副本节点来不及合并,那么这个任务都阻塞在队列里,队列越来越长,就会导致znode越来越多。

之前笔者就曾碰见过,zookeeper上log_pointer数据意外丢失,造成数据一直没有同步,但是数据又一直在写,导致replica_queue越积越多,znode达到了150w之巨,最终导致了zookeeper的不可用。

磁盘IO

磁盘IO很好理解,固态硬盘的写入速度肯定是远远大于机械硬盘的。如果发现CPU一直没吃满的情况下,写入速度怎么都上不去,那么很有可能就是磁盘IO限制了写入的性能了。这时,就需要考虑更换性能更高的固态硬盘了。

网络带宽

网络带宽的瓶颈一般出现在副本同步数据上。因为数据插入,肯定是插入到分片的某一个副本上的,那么其他副本要同步数据,必然要通过网络传输的方式来同步,那么当写入速度足够快,网络带宽必然会成为数据同步的瓶颈。一般生产环境,推荐使用万兆网卡。

读性能瓶颈

磁盘IO

查询,说白了就是扫描磁盘 。因此读磁盘的快慢就决定了查询的快慢,这点是显而易见的。

网络带宽

分布式查询,需要将数据结果从各个节点汇总到初始节点,这个过程是需要占用非常大的带宽的。如果涉及到分布式join,那么带宽将会占用更多。

内存和CPU

MPP架构决定的,在本地查询,充分利用本地资源。当我们一个查询SQL运行比较慢,通过增加节点能有明显改善的时候,那说明瓶颈就在CPU上。如果内存不足,直接就OOM了,就不会有执行完成的机会。

优化推荐

硬件层面

  • 优先使用固态硬盘
  • 使用raid5磁盘阵列
  • zookeeper单独部署,不要与其他业务混用
  • 为保证单节点资源利用最大化,不建议集群混合部署,一个节点仅部署一个服务

使用层面

  • 建表时合理分区
  • 合理指定主键字段,排序字段
  • 合理创建二级索引(全文检索场景)
  • 合理使用物化视图和projection(预聚合场景)
  • 选择好shardingkey(方便Colocate join)
  • 尽量使用子查询代替join
  • 配置读写分离
  • 配置生命周期,冷热数据分离或清理策略

参考文档

原文链接https://zhuanlan.zhihu.com/p/667086347


标题:干货高能!ClickHouse性能优化,看这一篇就够了
作者:michael
地址:https://blog.junxworks.cn/articles/2024/05/11/1715396614044.html