一、背景介绍
数据驱动理念已被各行各业所熟知,核心环节包括数据采集、埋点规划、数据建模、数据分析和指标体系构建。在用户行为数据领域,对常见的多维数据模型进行信息提炼和模型整合,可以形成一套常见的数据分析方法来发现用户行为的内在联系,能更好洞察用户的行为习惯和行为规律,帮助企业挖掘用户数据的商业价值。
行业内最早可追溯到Google Analytics埋点分析工具,国内较早开始这方面研究的是百度大数据分析平台;随着15年后国内大数据兴起,神策的用户行为分析平台、GrowthingIO的增长平台等独立数据分析平台公司相继成立;18年后一些发展较快的大厂经过几年数据积累也有了自己的分析平台,例如美团点评的Ocean行为分析平台、字节的火山引擎增长分析平台等等。
只有当数据达到一定规模才更适合用科学化的方法来提升数据分析效率,如前面所述,虽然Google和百度在这块最早探索,但后面一些互联网公司也是过几年才有自己的产品,即数据产品的发展需要与实际数据规模和业务发展相符。B站最早从19年开始关注大数据建设,到现在已经有一套较为成熟的数据产品——北极星,可以实现对用户行为数据进行埋点采集、埋点测试、埋点管理、行为数据分析等功能。行为数据分析平台主要包括下图所列功能模块,本文介绍主要模块原理和相关技术实现。
二、技术方案演进
北极星用户行为分析(User Behavior Analysis, UBA)模块自19年以来主要有三波迭代。
这个阶段主要任务是功能实现,根据用户前端查询参数,提交Spark Jar作业等待返回结果。不同的分析模块对应不同的Spark Jar作业,也对应不同加工好的用户行为模型。数据架构如下图所示:
虽然在一定程度上可以完成功能实现,但存在明显弊端:
-
部分模型化:用户维度信息需要提前加工到模型表中,后面不易变更和运维,且早期分析模型设计不支持私参查询,即明细数据信息只保留了一部分;
-
资源自适应问题:Spark Jar任务每次启动都需要通过YARN单独申请资源,不同查询条件对应的任务计算复杂度不同,但任务资源参数固定,一方面资源的申请和调配就需要花费较长时间,另一方面不能动态适应任务复杂度,就算维护一个常驻内存的 SparkSession供查询任务调用,也没法解决根据查询任务的资源自适应问题;
-
并发受限:同一时间段查询的请求太多,后面请求一直会等待前面请求对应Spark任务释放所占用资源,且资源未隔离,会影响其他正常ADHOC查询。
在实际使用中计算时间太长,单事件分析需要超过3分钟返回结果,漏斗和路径分析需要超过30分钟返回结果,导致产品可用性极低,查询稳定性和成功率不是很高,使用人数不是很多。这个阶段的埋点管理和上报格式未完全规范化,所以重点还是做后者。
ClickHouse是Yandex公司于2016年开源的一个列式数据库管理系统。Yandex的核心产品是搜索引擎,非常依赖流量和在线广告业务,因此ClickHouse天生就适合用户流量分析。B站于2020年开始引入ClickHouse,结合北极星行为分析场景进行重构,如下图所示:
这里直接从原始数据开始消费,通过Flink清洗任务将数据直接洗入ClickHouse生成用户行为明细,可以称作无模型化明细数据。Redis维表被用来做实时用户属性关联,字典服务被用于把String类型的实体ID转成Bigint,利用ClickHouse原生的RoaringBitMap函数对参与计算的行为人群交并差集计算。这一代实现了实时埋点效果查看,上线以来北极星产品周活人数提升了300%以上,相对于前代,性能有较大提升:
-
查询速度极大提升:90%事件分析查询可以在5秒内返回查询结果,90%的漏斗查询可以在30S内返回查询结果,速度提升达到98%以上;
-
实时性查询:可以对当天实时的用户行为数据进行分析,极大的增加了用户获取分析结果的及时性。
但本身这种性能提升是以资源消耗为前提的。以移动端日志为例,Flink消费任务峰值可以达到百万条每秒,对Redis维表关联和字典服务处理挑战很大,计算并发度甚至达到1200core,遇到特殊流量事件往往出现堆积、延迟、断流,对人工运维成本消耗也较大。此外这种Lambda数据流架构,实时和离线清洗逻辑需要保持一致,否则很容易导致数据解释成本提升。另外本身实时+离线维护两套对存储上也是极大浪费,即Kafka、Hive、CK都需要存储同一份数据。到21年底,随着业务发展CK存储几经横向扩充剩下不到10%,而集群的扩展和数据迁移也需要较大精力,本文后面小节会详细介绍。功能方面,直接对明细数据应用原生CK函数查询的跨天留存分析、路径分析需要用时分钟级,体验不是很好。
22年开始公司大力推动降本增效,这就要求以尽可能少的资源最大化行为分析产品效能。整体核心思路是全模型化聚合加速,底层流量数据链路走kappa架构,不会再用北极星应用数据和流量表不一致的情况,数据小时级产出。这次改造实时资源可以节约1400core,节省Redis内存400G、节省Kafka300 Partiton。每天数据量由千亿数据降低为百亿,通过特定的sharding方式配合下推参数,利用分区、主键、索引等手段支持事件分析(平均查询耗时2.77s)、事件合并去重分析(平均查询耗时1.65s)、单用户细查(平均查询耗时16.2s)、漏斗分析(平均查询耗时0.58s),留存分析和路径分析从分钟级查询到10s内相应。数据架构如图所示:
拥有以下特点:
-
全模型聚合:21年中开始我们就设计了一款通用流量聚合模型,可以认为是全信息的hive流量模型结构,除了把时间维度退化外其余信息基本能完整保留,原来千亿级的量级可以压缩为百亿内;
-
BulkLoad出仓:数据按文件批次从HDFS导入到ClickHouse,千亿级别的数据一小时内可以导完,其原理后文会有介绍;
-
字典服务升级:我们通过加强版的snowflake+redis+公司自研rockdbKV存储,大大增强了字典服务性能,压测可支持40万QPS;
-
用户属性现算模式:不再采用预计算模式,而是通过我们另一套基于CK的标签平台所生成的指定用户标签人群跨集群关联现算,这样可以灵活指定想要分析的用户属性。
到22年中,随着数据湖的兴起,我们将hive流量聚合模型迁移到Iceberg上,日常事件查询可以在10s内完成,可以作为CK数据的备用链路。这条链路不光降低了紧急事件运维成本,提升数据可用性保障,还可以支持用户日常流量关联其他业务定制化查询取数。通用的模型结构除了支持流量行为日志外,通过映射管理可以快速接入其他服务端日志,扩展其使用的场景。下图为22年12月份最近一周各功能模块使用情况:
从发展历程来看,用户行为数据分析经历了从强离线引擎驱动到强OLAP驱动,离不开业界大数据技术不断发展和进步,北极星行为数据底层明细后面也会切换到Hudi,可以满足更加实时的数据消费,让专业的工具做专业的事。
三、事件、留存分析
事件分析是指对具体的行为事件进行相关指标统计、属性分组、运算、条件筛选等操作,本质上是分析埋点事件的用户触发情况以及埋点事件的分析统计情况。留存分析可以根据业务场景以及产品阶段的不同,自定义起始行为和后续行为做留存计算,协助分析用户使用产品的粘性,根据留存分析结果有针对性地调整策略,引导用户发现产品价值,留住用户,实现用户真实的增长。
过去北极星分析平台的分析模块大多以B站的千亿明细行为数据为基础,通过ClickHouse查询引擎的指标函数例如uniq(),可以支持单个事件分析、多个事件的对比分析以及多个事件的复合指标运算,支持指定时间内的行为留存分析(参与后续行为的用户占参与初始行为用户的比值),通过筛选、分组等组件满足多样化分析需求。但是过去的北极星事件分析是基于明细数据,B站行为数据每天增量千亿级别,存储日增10T以上资源消耗巨大,明细数据分析查询比较慢,每天用户慢查询平均30s~50s体验较差,而且其功能比较单薄,只能支持30天的查询窗口,用户留存、用户分群等复杂分析模块很难实现。而且海量行为数据分析也面临许多挑战,每天千亿行为数据,高峰期写入QPS百万以上。如何实现既满足时效性又满足海量数据压力的计算方式?如何满足复杂分析场景的同时,压缩存储提升查询效率?如何简化数据链路,模块化插件化降低接入成本,提升扩展性?如何打通标签、ABTest等其他业务系统将北极星的行为分析能力标准化?
北极星事件分析:
为了解决以上痛点和海量数据分析的挑战,新的事件、留存分析通过准实时方式建模分层,用户、事件、时间等粒度的预聚合压缩,不仅统一了离线口径,而且自研拉宽汇聚spark脚本可以承载千亿数据压力,搭配多种聚合模型实现丰富的分析模块。同时释放实时资源离线小时任务保证时效性,维表压力采用join离线维表+属性字典维度服务的方式解决,并且早于平台自研可指定shard的BulkLoad出仓工具,配合下推参数可加速查询,数据链路可扩展易运维。相比较以往的处理千亿明细数据,准实时在DWB层实现了对数据的压缩,将每天千亿数据压缩到每天百亿级别。OLAP层也通过汇总后的数据替代了原先的明细数据,大大缩小存储的同时也提高了查询性能,每天用户慢查询可降到10s以内,时间窗口可扩大到45天甚至更长。并且对高复杂的查询比如用户留存,用户分群等分析场景可以更好的支持。
事件分析数据开发流程:
具体实现包括以下核心部分:
1、流量聚合模型创建。首先准实时清洗DWD层B站千亿明细行为数据,流量数据都是分为私有参数和公有参数,其中公有参数在用户粒度下是不会经常改变的,我们会用一般聚合函数取一定时间内指定设备和行为事件下最新保留的不变公有参数,而将同等粒度下变化比较频繁的私有参数维度名写入Array结构,利用map索引原理,把私参维度值组合通过spark自定义逻辑计数并入map的key中,map的value则用来写入各种公共指标聚合结果,整个过程均通过spark脚本实现,最终写入到Iceberg引擎中。因为Iceberg可以关联其他任何已有hive表,通过快速业务表关联也可以支持到其他多项业务应用,也可以作为不出仓的北极星降级备用方案支持大部分查询分析功能。
流量聚合模型数据方案:
2、流量聚合模型在iceberg下查询。 如下图所示,聚合之后的数据形成DWB层落地到iceberg表(即图中iceberg_bdp.dwb_flow_ubt_app_group_buvid_eventid_v1_l_hr),可以在hive和spark上计算大部分查询维度下的指标。利用Trino基于连接器实现了存储与计算分离,通过map_filter、array_position等trino条件函数和map_values、reduce等trino指标函数可以实现一系列复杂事件分析,当然我们也配套开发了一些简单易用的UDF可以绕开较复杂的trino函数组合供用户查询使用,性能上相差不大。
3、公参和私参筛选器创建。接下来我们利用BulkLoad出仓脚本将iceberg数据导入ClickHouse表(即图中polagrou.polaris_dwb_flow_ubt_group_buvid_eventid_pro_i_d_v1),即保证了时效性又兼容了特殊的数据结构。从ClickHouse表结构设计上支持了SAMPLE BY murmurHash3_64(buvid)的抽样功能,由于buvid(设备id)分shard写入可以保证单节点的数据随机分配,只要在单节点上做抽样配合ReplicatedReplacingMergeTree引擎就可以实现了ck to ck的物化筛选器,直接为北极星分析平台提供公参维度聚合、私参枚举排序的维度筛选功能。整个过程直接在可支持调度的python脚本上实现,可支持到近小时更新。
4、流量聚合模型在ClickHouse下查询。在ClickHouse查询上设计特定的CK-UDF来解析嵌套map结构,保证复杂分析场景的同时用于加速了查询,相比用ClickHouse原生多个函数组合解析要快30%左右,比原先明细模型的查询要快更多。而且通过脚本实现了多维度的ClickHouse小时级别的机器人监控告警,早于平台对此定制化监控告警的支持。
目前北极星分析平台平均查询耗时3.4s,通过通用聚合模型,下游可以对行为人群进行交并计算实现标签画像和人群圈选等转化分析功能,也可以利用Retention函数实现了N日的事件留存分析。最终相比前代方案节省计算资源1400C、节省存储资源40%,提升查询效率60%以上,利用RBM实现了北极星、标签、ABTest等多业务打通。
四、漏斗、路径分析
流量业务分析场景上会查看一群用户在客户端或者网页上的路径流转信息,路径分析将用户在产品中的使用路径用桑吉图呈现,展现用户在页面与页面流转中的流量走向。通过路径分析可以帮助验证产品运营策略,优化产品设计思路。漏斗是用户在产品使用中完成的一系列行为转化。漏斗分析可以帮助了解用户在行为步骤中的转化或流失情况,进而通过优化产品或者开展运营活动提升转化率,达成业务目标。
在业务日益增长的情况下,对用户漏斗、路径精细化分析诉求逐渐增加,为此北极星分析平台增加此类型支持,用于分析一群用户在某一页面、某一模块前后的流量流转变化。漏斗分析业界常见解决此类场景利用ClickHouse提供了一个名叫windowFunnel的函数来实现对明细数据的漏斗分析。而路径分析技术一般分为两种,一种为明细数据结合sequenceCount(pattern)(timestamp, cond1, cond2, ...)做简单的路径分析,而复杂的路径分析又叫智能路径分析可以通过ClickHouse提供的高阶数组函数进行曲线救国。
路径分析背景挑战:
但是过去的流量漏斗、路径分析都是基于明细数据进行的。存储资源消耗大、分析查询慢、功能比较单薄等。为了解决以上痛点,新的漏斗、路径分析通过离线方式的建模分层、用户路径粒度的预聚合、存储引擎ClickHouse的RBM物化视图等技术,将每天千亿数据压缩到每天几十亿。查询效率也从分钟级优化到秒级,更是通过关联标签和人群支持到了各种转化查询分析。大大缩小存储的同时查询性能大大提升,最终实现了关联标签和人群圈选等功能。
路径分析功能页面:
具体实现包括以下核心部分:
1、路径聚合DWB模型创建。首先离线处理B站的千亿明细行为数据,经过维度裁剪变化比较频繁的私有参数,保留用户粒度下的公有参数,并且通过buvid(设备id)粒度进行聚合,将同一个buvid的所有事件根据时间线串联聚合到一个字段中,聚合之后的数据形成DWB层落地到hive表。
路径分析数据方案:
2、路径聚合DWS模型创建。在上一步的基础上,对DWB层的数据进行路径的汇总,将同一个路径的buvid(设备id)汇总聚合到数组结构中,这个过程出现很多干扰事件,比如某些路径会频繁出现,会乱序而干扰真正的用户行为,所以我们会通过去重等手段进行干扰事件过滤路径补位拼接形成桑基图节点,当然我们还引入了RBM数据结构存储聚合后的设备编码,最终落到hive表。整个过程都是通过spark脚本利用代码和算法实现的。
漏斗分析查询方案:
3、路径聚合模型Clickhouse表设计。接下来我们利用平台工具将hive数据出仓到ClickHouse,在ClickHouse表结构设计上,采用了ClickHouse的物化视图技术和RBM数据结构,进一步压缩buvid(设备id)集合为RBM编码,利用数组物化RBM的方式大大压缩了存储,可通过Bitmap交并计算路径相关指标,千亿数据压缩到几十亿做到了秒级查询。
路径分析数据协议:
数据结构形成的树型图:
4、路径聚合模型漏斗分析查询。在功能上漏斗分析通过windowFunnel函数进行计算,将计算周期内每个用户的行为明细按时间顺序聚合为对应事件链,然后搜索滑动时间窗口满足漏斗条件的事件链,并计算从链中发生的最大事件数level,最后对各级level计算uv获得结果。
右侧节点上的数字表示从中心事件e0至自身的路径uv:
在树型图中的对应关系:表示路径e0->e4→e1→e3→e2在窗口期内的总uv为1。左侧同理,方向相反。
5、路径聚合模型路径分析查询。同理路径分析在ClickHouse数据基础上利用数据协议和复杂sql绘制出路径树状图进而拼接出桑基图,可直观的展现用户主流流程,帮助确定转化漏斗中的关键步骤,迅速发现被用户忽略的产品价值点,修正价值点曝光方式并发现用户的流失点,同时通过Bitmap的交并计算实现了标签画像和人群圈选等转化分析功能。
五、标签、人群圈选
B站的北极星行为分析平台、标签画像平台、AB实验人群包都是基于ClickHouse的RBM(RoaringBitMap)实现,此外RBM还有其他多项应用,比如事件分析标签人群圈选、预计算的路径分析、创建用户行为的用户分群等,具体可查看之前文章[1]。
下图是基于北极星CK底层数据生成一个满足指定行为结果的人群包逻辑:
RBM固然好用,但是只支持int或者long类型,如果去重字段不是int或者long怎么办呢?海量数据应用层的维度服务如何做到高可用高并发?依赖的链路出问题如何快速恢复,数据如何保障?
属性字典维度服务就是可解码编码多业务属性、可输出管理多业务维度,具有分布式、高可用、高并发等特性的服务系统,通过属性字典维度服务可实现多维度管理多业务打通,为海量数据应用层定制化提供技术支持。
属性字典维度服务架构设计:
高可用方面Grpc+LoadCache+Redis+公司自研rockdbKV存储,多级缓存分布式架构支持平滑扩容和滚动发布,可做到日常缓存命中率70%以上,底层ID生成算法基于Leaf-SnowFlake快速生成,压测可支持50w以上QPS高并发。所有请求通过公司的日志传输通道可以小时级同步到hive做备份,事故情况下配合BulkLoad读写分离可40分钟内恢复20亿+属性字典。
最终利用属性字典对buvid(设备id)等业务属性编码和解码,对用户标签和AB人群进行创建,并且通过RBM交并计算实现了北极星分析平台、用户画像平台、AB实验平台的多业务打通。
人群圈选sql示例:
六、ClickHouse数据导入方案演进
如上文所述,北极星是基于ClickHouse构建的一套海量UBA技术解决方案,底层ClickHouse集群的稳定性 、读写性能、资源使用率均会影响上层业务的使用体验。与此同时,海量数据如何导入ClickHouse,以及数据导入过程的稳定性、导入效率、资源消耗在很大程度上决定了ClickHouse集群的整体稳定性和使用效率。所以,一个稳定高效的数据导入方案对于一套UBA解决方案来说是必不可少的。
在B站,UBA场景的数据导入方案大致经历了三个阶段的演进:
1、 JDBC写入方案
在B站内部,针对数据写入到各个数据库/引擎主要有两套pipeline:一套是基于Spark的离线导入链路,大部分数据来源于Hive;另一套是基于FLink的实时导入链路,大部分数据源来源于kafka。这两套链路都支持clickhouse作为data sink,UBA场景最开始也是基于这两套链路来做数据导入的,主要使用的是实时导入链路,在历史数据初始导入和故障补数等少数情况下也用到离线导入链路。
如上图所示,离线和实时导入最终都使用ClickHouse JDBC向ClickHouse发送数据,这种写入方式实现起来比较简单,使用开源的ClickHouse JDBC Driver就可以使用标准JDBC接口向ClickHouse写入数据。同时,flink实时写入的数据延迟比较低,端到端延迟可控制在秒级。但这个方案存在以下问题:
ClickHouse Server端的资源消耗比较大(因为数据的排序,索引生成,数据压缩等步骤均是在server端完成),在高峰时会影响查询性能。
实时任务写入频次较高,数据会在写入后触发大量merge操作,造成“写放大”,消耗更多的磁盘IO和CPU资源,可能导致too many parts错误。
实时Flink任务需要长时间占用大量资源,且在故障情况下容易出现数据堆积、延迟、断流等问题,运维成本较高。
以上问题在资源充沛的情况下不会影响业务使用,但当集群资源接近瓶颈时,查询性能受写入影响,写入性能和稳定性受merge影响,最终导致集群整体稳定性下降,影响业务使用。
2、基于中间存储的BulkLoad导入方案
UBA场景的多个分析模块对数据延迟要求不尽相同,大部分数据实时性要求并不高,小时级延迟在大部分模块下是可接受的。因此,为了解决上述JDBC写入方案的问题,我们针对大部分对时效性要求不高的数据导入需求,构建了一套基于中间存储的BulkLoad导入方案:
首先,将clickhouse格式的data part文件的生成过程转移到Spark Application中完成,这样就可以利用Yarn集群的资源来完成数据排序,索引生成,数据压缩等步骤。
data part文件的生成我们借助clickhouse-local工具实现,在Spark Executor中调用clickhouse-local写入数据到本地磁盘,生成clickhouse data part文件。
然后,将Spark Executor生成的data part文件上传到HDFS文件系统的特定目录中。
接着,从Spark Executor端发送 "ALTER TABLE ... FETCH PART/PARTITION" SQL语句到clickhouse server执行。
最后,ClickHouse Server执行 "ALTER TABLE ... FETCH PART/PARTITION",从HDFS拉取data part文件并完成attach操作。其中,我们对ClickHouse代码做了一些改造,使得FETCH语句支持从HDFS拉取文件。
由于Bulkload导入将数据写入data part文件这个过程移到了Spark端执行,大大降低了ClickHouse Server数据写入对资源的消耗。与此同时,由于在Spark端数据批量写入之前已经完成了repartition和攒批,到达ClickHouse Server的data part数量相较JDBC写入要少很多,所以clickhouse的merge压力也大幅降低。该方案上线后,数据写入对clickhouse查询的影响基本消除,集群稳定性得到大幅提升。
但这个方案依然存在一些问题:
以HDFS作为文件传输的中间存储,增加了数据传输的耗时和网络开销,同时会占用HDFS的存储资源。
HDFS的负载情况可能影响ClickHouse Bulkload数据导入的性能与稳定性。
3、直达ClickHouse的BulkLoad导入方案
为了进一步优化数据导入的性能和稳定性,我们参照ClickHouse副本间数据同步的DataExchange服务,开发了ClickHouse的DataReceive服务,以支持Spark Executor直接将data part文件传输到ClickHouse Server,绕开HDFS中间存储。
DataReceive服务允许使用HTTP客户端直接将数据文件发送到ClickHouse,ClickHouse端会进行鉴权、数据校验、流量控制、并发控制、磁盘负载均衡等操作。该方案相较于基于HDFS中间存储的Bulkload方案,大致有一倍的性能提升。
七、ClickHouse数据重平衡
B站每天的用户行为数据量达数千亿行,UBA场景需要分析最近半年以上的历史数据,所以底层ClickHouse需要存储PB级的已压缩数据。同时,随着B站活跃用户日益增长,需要存储的数据量也在不断增长,所以集群扩容的需求是必不可少的。
然而,由于受限于存算一体的架构设计,ClickHouse集群目前无法做到弹性扩容,数据需要在新集群中完成重分配。因此,ClickHouse如何高效稳定地完成数据重平衡(Data Rebalance)是ClickHouse集群管理人员必须面对和解决的的问题。
我们在UBA场景集群扩容的准备和实施过程中,经历了从手动化,到半自动化,再到服务化的演进。在此期间,我们将在海量数据重平衡实践过程中遇到的问题与解决方法转化成为了一套自动化工具服务。下面,我们就来介绍一下这套工具服务的功能与实现原理。
1、平衡度
集群中表的大小差异很大,有些达到几百TB, 有些只有几GB,如何度量数据的平衡程度,筛选出需要平衡的表?我们引入了一些数学公式来解决这个问题。
变异系数:当需要比较两组数据离散程度大小的时候,如果两组数据的测量尺度相差太大,或者数据量纲的不同,直接使用标准差来进行比较不合适,此时就应当消除测量尺度和量纲的影响,而变异系数可以做到这一点,它是原始数据标准差与原始数据平均数的比,取值范围0~1,值越小,离散程度越小。
表的平衡度 = 变异系数(取值范围0~1,值越大,表越不平衡)
举例:表A的平衡度
集群共有4个节点,表A在不同节点的大小分别为4GB, 10GB, 5GB, 3GB
平均值: (4 + 10 + 5 + 3) / 4 = 5.5
方差: (x - 平均值) ^ 2 / 4 = 7.25
标准差: root(方差) = 2.69
变异系数: 标准差 / 平均值 = 0.49
表A的平衡度 = 0.49
2、平衡算法
对于待平衡的表,有些业务期望最大程度的平衡,提升并行度,发挥集群的最大算力,而有些表容量过大,业务期望以最小的迁移成本,快速平衡数据,达到相对较优的平衡。
对于不同的业务需求,提供了两种平衡算法,装箱算法和贪心算法。
期望达到极致的均衡,数据量较小时,推荐使用装箱算法。期望以最小的迁移成本,达到较优的均衡,推荐使用贪心算法。
(1)装箱算法
算法整体采用Best Fit(最优装箱算法) + AVL树的设计。每个ClickHouse节点即为一个Node,每个Node有初始阈值capacity,代表ClickHouse节点的容纳量。将准备平衡的part按照大小顺序排序,并根据Best Fit算法依次填充到Node中,Node根据remain_capacity(剩余容量),左旋右旋组成一棵AVL树,以此提升查询效率,方便快速完成平衡。
设计如下图所示。
装箱算法细节在此不做赘述,感兴趣的读者可参考这里[2]。
(2)贪心算法
算法整体采用不断轮询 + 局部最优的设计。将ClickHouse节点按照大小排序,找出最大和最小的节点,如果将某个part从最大的节点搬迁至最小的节点,迁出的节点仍然大于迁入节点,则搬迁该part,直到最大节点无法迁出。依此类推,继续按照大小排序ClickHouse节点,每次找到最大最小节点,平衡part至局部最优,直到轮询ClickHouse节点结束。
设计如下图所示:
3、平衡计划
根据平衡算法,可以得出集群中节点计划的迁入、迁出情况。平衡单位为表级别,迁移粒度到part,可以理解为表内部part平衡。
如下图所示,可以看到表平衡前后的平衡度,以及节点1计划的迁入、迁出情况。平衡计划生成完成后,可以根据需要选择执行特定的平衡计划。
4、重平衡执行流程
在执行平衡计划的过程中,如何准确、高效地将part迁入和迁出?如何保证原子性,避免数据出现丢失或重复的问题?如何限流,避免因平衡占用过多的资源,影响集群的稳定性?
经过不断的测试、调整,最终制定了一套比较健壮的平衡方案,整体流程为:预判断(是否merge) + fetch(迁入节点) + detach(迁出节点) + attach(迁入节点) + detached(迁出节点) + drop detached(迁出节点)。
平衡期间对于不同阶段的异常,添加了相应的重试和回滚机制,以此来覆盖网络抖动、zookeeper重连接等问题,从而保证了平衡的原子性,数据一致性。
平衡期间通过限流配置(max_replicated_fetches_network_bandwidth),来控制平衡速度,保障了集群的稳定性,避免影响其他业务的正常查询。
整体设计如下图所示。
八、ClickHouse应用优化实践
在支持UBA场景各项功能模块的过程中,我们针对ClickHouse的查询,存储等方面做了大量应用优化工作。下面选取其中几个优化点做简单介绍。
1、查询下推
ClickHouse中的针对分布式表的查询会被改写成对local表的查询并发送到集群各个shard执行,然后将各个shard的中间计算结果收集到查询节点做合并。当中间计算结果很大时,比如countDistinct、 windowFunnel函数等,查询节点的数据收集和数据合并可能成为整个查询的性能瓶颈。
查询下推的思路就是尽量将计算都下推到各个shard执行,查询节点仅收集合并少量的最终计算结果。不过,也不是所有查询都适合做下推优化,满足以下两个条件的查询可以考虑做下推优化:
数据已经按照计算需求做好sharding:比如,UBA场景的数据已按user id做好了sharding,所以针对用户的漏斗分析,UV等计算可以下推到各个shard执行。否则,下推后的计算结果是不准确的。
计算的中间结果较大:sum,count等计算是无需下推的,因为其中间结果很小,合并计算很简单,下推并不能带来性能提升。
下面,我们以上文中提到的漏斗分析为例,阐述一下如何做查询下推。
上图是用windowFunnel函数实现漏斗分析的一个SQL,如图中“执行步骤”所示,该查询需要从各shard收集大量数据并在查询节点完成计算,会产生大量数据传输和单点计算量。
我们先使用配置distributed_group_by_no_merge做了一版下推优化:
优化SQL-V1将windowFunnel的计算下推到各个shard执行,仅在查询节点对windowFunnel的最终结果做聚合计算。在我们的场景下,该版本较上一版本性能提升了5倍以上。
为了更进一步做查询下推,我们利用cluster + view的函数组合,将聚合查询进一步下推:
优化SQL-V2的性能较优化SQL-V1进一步提升30+%.
2、Array和Map的跳数索引支持
UBA场景中的事件数据有很多公共属性和私有属性,公共属性被设计为表的固定字段,而私有属性因为各个事件不尽相同,所以采用Array/Map来存储。最初的设计是采用两个数组分别存储属性名和属性值,ClickHouse支持Map结构后,则在后续模块中采用Map来满足类似需求。无论是Array还是Map,最初都不支持创建跳数索引,所以在其他索引字段过滤效果有限的情况下,针对Array和Map的操作可能会成为查询的性能瓶颈。
针对这个问题,我们给Array和Map加上了Bloom filter等跳数索引支持,针对Map仅对其key构建索引。在某些出现频率较低的私有属性过滤场景下,Array/Map的跳数索引可以收获数倍的性能提升。
3、压缩算法优化
ClickHouse常用的数据压缩方式有三种,分别为LZ4、LZ4HC以及ZSTD。针对不同的数据类型,数据分布方式来使用特定的编码方式可以大大提高数据压缩率,以减少存储成本。
针对UBA场景,我们测试了不同压缩算法的压缩率,写入性能,查询性能。相较默认的LZ4,ZSTD(1)在压缩率上普遍可以节省30%以上的存储空间,查询性能方面未见明显差异,不过写入性能在某些场景下有20%左右的下降。由于UBA场景数据存储压力较大,同时对数据时效性要求不是很高,因此我们最终选择了ZSTD(1)作为主要的压缩方式。
九、下一步工作
1、多业务通用模型支持
UBA场景的泛化形态实际是人+内容+行为,例如用户可以在观看场景产出弹幕行为或者点赞行为,这类数据不同于传统的SDK日志数据具有通用的埋点格式,但我们可以通过抽象映射到通用行为聚合模型上来,来实现对服务端日志的行为分析。目前我们正在对社区服务端日志和其他非埋点规范的业务SDK日志进行泛化支持,尽可能复用已有能力提高用户查询和分析效率。
2、Clickhouse增强多维过滤场景支持
在UBA场景下,同一张表可能在多个模块中使用到,比如,用户行为事件数据在事件分析等分析模块中使用,同时在单用户行为明细查询中会使用到。这两种使用场景下对表的查询是基于不同过滤维度的,但clickhouse目前的主键索引很难同时对多个维度过滤都有较好过滤效果,因此很难同时满足多个场景下的查询性能要求。我们已经完成了ZOrder索引的开发,目前正在开发相应的编码类型,使得UBA场景下的数据可以使用ZOrder index同时支持多个维度的高效查询。