本文作者:李精卫
更多技术交流、求职机会,欢迎关注字节跳动数据平台微信公众号,回复【1】进入官方交流群
背景
随着抖音集团内部对流式任务的需求不断增长,Flink SQL作为一种低成本接入手段,已经在内部多个方向上得到大规模应用。目前,流式 SQL 任务的规模已经超过3万,任务资源使用量和分配量也达到了百万core。 在降本增效的大背景下,为了解决资源紧缺的问题,并同时满足业务对更高性能的需求,流式计算团队对 FlinkSQL 进行了深度优化,本文将聚焦这一实践,详解主要优化思路。Engine优化
查询优化
View Reuse
在流式 SQL 中,为增加 SQL 代码的可读性,通常会将通用的计算逻辑放在 view 中。在这里,view 只是一个逻辑概念:在底层实现时,并没有真实存储中的 view 与之对应。 如下图所示,场景一表示任务中存在多个 sink 的表,view 中是窗口聚合运算的逻辑。场景二表示任务需要对两个流进行 union,view 中是普通聚合运算的逻辑。 在这两种场景中,用户会定义一个通用的 view 来进行计算。因为下游不同分支对 view 的查询不同,view 中的计算逻辑会在不同算子中重复计算,由此带来了重复的资源开销。那么问题就在于:为什么 view 没有被复用? 在 Calcite 的原有逻辑中,view 中包含的 Query 会被立即转化成一颗关系表达式树。如果有多条Query 访问了同一个 view,那么就会获得多颗属性完全相同,但分属于不同 Java 对象的 RelNode Tree。因此,后续所有优化都是基于不同的子树对象分别进行的,无法再重新合并成同一棵树。- Multi-sink
- Union all
Remove Redundant Streaming Shuffle
Remove Redundant Streaming Shuffle 可以移除流式场景下不必要的数据分发开销。在批式场景中, shuffle 操作会有落盘的性能开销,这已经在社区中得到了优化。而在流式场景中,shuffle 操作则有序列化和网络传输的开销。 如下图例子所示,在计算不同品类产品价格 Top5 的平均值时,使用了排序和聚合计算。在排序和聚合前对 id 进行了 hash,这说明两个算子有相同的 hash key。数据被 rank 算子 hash 后,就不需要再进行第二次 hash 了,这说明第二个 shuffle 是多余的。 shuffle 是在生成 physical plan 的阶段中产生的。下图展示了 Sql 优化器将 SqlNode 从逻辑节点转换为物理节点的过程,在这个过程中,shuffle 也就是 exchange。转换过程是通过规则进行的,在 relRule.Convert 过程中会遍历每一个逻辑节点,判断当前节点是否满足转换规则,如果存在不满足的情况,就会增加一个 AbstractConvert。 在生成 Exchange 的规则中,会判断当前节点的数据分布特征是否满足需求,如果不满足,就在节点上游增加 Exchange 节点来满足数据分布的特征。最后,PhysicalExchange 会被转换为 hashShuffle,用于数据的分发。 如何移除掉多余的 Streaming Shuffle?针对该问题,主要思路是参考 Batch 对 Shuffle 的优化。在规则转换的过程中,不仅要考虑节点本身,还要考虑输入节点的特征是否满足需求,将问题往上抛。 实现针对 Physical RelNode 的规则判断方法,主要分为以下两种情况:- 对于本身没有数据分布特征的节(如 Calc 和 Correlate Node),判断它们能否满足一个特定数据分布的需求,只需检查自身输入中是否包含 hash key。
- 对于本身有数据分布特征的节点(如 Aggregate 和 Rank nodes),需要确认本身的数据分布特征是否满足给定的 distribution requirements。 如下图所示,首先要检查 aggregate 节点是否满足数据分布特征,这需要查看它的输入,即 rank 节点是否满足要求。如果 rank 节点不满足,则需要在其上游添加 exchange 节点。添加后,rank 算子满足了数据分布特征。由于 rank 和 aggregate 的 hashkey 相同,因此 aggreagte 也满足了。
查询执行优化
Streaming MultipleInput Operator
基于 Remove Streaming Shuffle,在对多余的 hash shuffle 进行优化的前提下,可以在 join+join、 join+agg、join+union 中,对shuffle 进行更深层的优化。 如下图所示,因为 agg1 hash key 和 Join left key 相同;agg2 hash key 和 Join right key 相同,所以可将 Join 前的 hash 变为 forward。 当前的 OperatorChain 策略不支持多input算子的 Chain,无法避免因多余 shuffle 而导致的序列化、反序列化和可能的网络开销。因此,流式计算团队使用 MultipleInput 机制,在 Streaming 场景下,将多个 Input 的算子上下游合并为 MultipleInutOperator,从而进行优化。 具体而言,优化经历了以下几个步骤:-
首先,在 Planner 层构建出 MultipleInputExecNode。
- MultipleInputExecNode 是在 logical physical 计划后,当 plan 被转换为ExecNode DAG时,从 ExecNodeDAG 中推导而出。获得 ExecNodeDAG 后,先从根节点进行广度优先搜索,从而获取图的拓扑排序。构建 MultipleInputExecNode 是在 Covert ExecNode DAG 环节进行的,完成这一系列操作后,它将在 ExecNode Graph 中构建出来。
-
在生成 StreamMultipleInputExecNode 后,被 translate 成 StreamMultipleInput transformation。
- 在 transformation 中,包含了创建 MultipleInput Operator 的一些信息,通过 TableOperatorWrapper 存储 sub-op 信息。
-
生成 Job Graph。这需要满足以下2个条件:
- StreamConfig 需要兼容 Multiple Input 从 two Input 的 TypeSerializer1,2变成 TypeSerializer[],这主要用于 state/key 数据传输。
- Stream Graph 可以添加 MultipleInputOperator 节点,通过方法 addMultipleInputOperator,将 Transformation 对应的 properties 添加到 vertex 中构成 Stream Graph 中的节点。
-
Operator initialization:
- 不只要创建 StreamingMultipleInputOperator,也要创建对应的 sub-op;
- sub-op 本质上是 Abstract StreamOperator,sub-op id = op id + index;
- 在 createAllOperator 创建每个 sub-op 对象,并构建 DAG 的输入输出。
-
ProcessElement :
- 处理数据过程中要保证 key 的传递。
-
State
- MultipleInputStreamOperato 和 sub-op 分享state handler;
- 创建新的 API stateNameContext 来解决状态名字冲突。
-
Timer && Watermark
- MultipleInputStreamOperator和sub-op 分享 timeServiceManager;
- 创建新的 api TimerNameContext来解决状态名字冲突;
- timeServiceManager 以 sub-op粒度管理 timer;
- 使用Combindedwatermark 来保证 Watermark 对齐。
- barrier: 此处无需过多考虑,MultipleInput Operator 内部没有 buffer 中的数据,因此按照拓扑顺序进行 checkpoint 不会丢失数据。但需要注意的是,需要将 prepareSnapshotPreBarrier 从 MultipleInputStreamOperator 传播到所有子算子。
Optimization of Long Sliding Windows
- 长滑动窗口及其底层实现逻辑
- 长滑动窗口优化思路
- 在窗口算子中定义全局状态,存储当前窗口的计算结果;
- 在聚合函数中新增 retractMerge 方法,窗口向后滑动时,移除被划走窗口的数据;
- 触发下一次计算时,合并新增窗口的数据。
数据处理(Format侧)
Native Json Format
目前,抖音集团公司内部约有1.3万个任务使用 Json Format,占用资源近 70 万core。如果按照 5%的占比进行保守估计,线上约有3.5万core用于 Json 的反序列化,因此该部分有较大优化空间。 下图展示了数据从消息队列(MQ)中读取,并最终传递给下游运算符的主要流程。其中,Json 反序列化和将 GeneralRowData 序列化为字节,是两个重要的开销。 针对上述两项重要的资源消耗,主要从以下两个方面进行优化:- 针对 Json 反序列化开销 使用支持向量化编程的 c++ json 解析库, 选择字节内部自研的 sonic-cpp,来提高性能。
- 针对序列化为 binaryRowData 的开销 使用 native 方法直接产出 BinaryRowData 所需要的二进制表示,再使用 BinaryRowData 指向这一部分数据,从而免去序列化对应的开销。
优化实践
为了确保引擎优化能够给业务方带来实际的优化效果,流式计算团队在内部做了大量工作,以确保优化项能够稳定上线,以下将对此展开详细介绍。- 工具层
a. 支持 SQL 任务元信息实时上报;
b. 算子粒度离线数仓,提供算子粒度的任务监控;
c. Commits 粒度 DAG 兼容性检查 :可以提前发现哪些优化项会影响任务状态恢复;
d. 优化项分优先级灰度:可以限制风险暴露范围;
e. 数据准确性链路构建:保证了上线优化项不会导致数据准确性发现问题。
基于上述能力,工具层实现了算子粒度的任务监控,同时保证了任务稳定性和数据准确性- 优化层
- 引擎&平台层