摘要:本文整理自快手实时计算团队技术专家张静、张芒在 Flink Forward Asia 2021 的分享。主要内容包括:
Flink SQL 在快手
功能扩展
性能优化
稳定性提升
未来展望
一、Flink SQL 在快手
经过一年多的推广,快手内部用户对 Flink SQL 的认可度逐渐提高,今年新增的 Flink 作业中,SQL 作业达到了 60%,与去年相比有了一倍的提升,峰值吞吐达到了 6 亿条/秒。
二、功能扩展
为了支持内部的业务需求,快手做了很多功能扩展,本文重点分享其中的两个围绕窗口的扩展,一个是 Group Window Aggregate 扩展,一个是在 Flip-145 里提出的 Window Table-valued Function 扩展。
解释一下以上两者的区别和联系:
Group Window Aggregate 是在 Flink 1.12 和更早的版本里用来做窗口聚合的,它有两个局限性,第一个是它的语法不符合 SQL 标准,要借助特殊的窗口函数,还要配合窗口辅助函数来完成作业聚合。另外它还限制了窗口函数只能出现在 group by 的子句里面,所以只能用于聚合;
因此 Flink 在 Flip-145 里提出了 Window TVF,它是基于 2017 年的 SQL 标准里提出的多态表函数的语法,另外它除了可以在窗口上做聚合,还可以做窗口关联,TopN 和去重等操作。
2.1 Group Window Aggregate 扩展
大家可能会问,既然已经有 Window TVF 了,为什么还要在 Group Window Aggregate 上做功能扩展呢?因为快手是在今年下半年才开始进行版本 1.10 到 1.13 的升级,大部分业务还是在使用 1.10 版本。
在 Group Window Aggregate 上快手做了两个扩展,一个是支持多维聚合,一个是引入高阶窗口函数。
■ 2.1.1 支持多维分析
Flink SQL 很早就支持无限流上的多维聚合,快手在 Group Window Aggregate 上也增加了多维分析的功能,支持标准的 Grouping Sets、Rollup 和 CUBE 子句,另外还支持各种窗口类型,比如滚动、滑动、会话窗口等。
比如上图实例,需要统计主题维度和总维度下的累计 UV,SQL 的 group by 子句里包含两部分:一个是 CUMULATE 窗口函数,一个是 Grouping Sets 子句。括号里有两个元素:一个表示总维度,一个表示主题维度。
■ 2.1.2 引入高阶窗口函数
数据分析的开发者经常会遇到这样的需求,绘制一条曲线,每个点的含义是当天 0 点到当前时间点的累计指标,横坐标表示时间,纵坐标是表示累计的指标。对这样的需求可以有两个解决方案:
第一个方案是使用无限流聚合,把时间归一到分钟粒度以后作为 group key 的一列,但是业务上要求输出到屏幕上的曲线不再变化,而无限流聚合的输出结果是一个更新流,所以不符合要求;
第二个方案是使用一天的滚动窗口函数。为了提前输出结果,还是要设置提前触发,时间点选用当前机器的时间或者是历史输入数据里最大的时间戳。这个方案的缺点,首先是每个点的纵坐标的值并不是这个时间点上的累计值。这会导致几个异常现象,比如作业失败重启或者是业务主动回溯历史的时候,不能完全还原当时的历史曲线。而且各个点上分维度的累计值加起来也不等于总维度的值。还有一个缺点,统计 UV 的时候,我们经常会使用两阶段的聚合来避免 distinct key 的倾斜,但是使用这个方案的时候,原本自身的曲线上可能会出现凹坑。
上图是方案二导致的一些异常曲线:
第一个曲线是进行历史回溯, lag 消除以后曲线才开始正常,在没有完全消除 lag 的时候,曲线是不平滑的,而且不能还原历史曲线;
第二个曲线是自增曲线上出现凹坑。
因为第一级聚合的输出流是一个更新流,Flink 目前的更新机制是发送撤回和更新两条独立的消息,而不是一个原子消息,所以第二个聚合可能会先收到上游多个并发上发下来的撤回消息,这就会导致累计值先下降再上升,所以形成了凹坑。
我们引入 CUMULATE 窗口来解决这些问题。
这个窗口函数和 Flip-145 里提出的 CUMULATE 窗口是不谋而合的,只是语法上在 Group Window Aggregate 上引入这个窗口类型。它有三个必选参数:时间属性列、窗口的步长和 max size,还有一个可选参数,用来指定窗口开始的偏移量。
关于 CUMULATE 窗口的划分逻辑,假设 CUMULATE 窗口的步长是一分钟,max size 是三分钟,window1 的区间是 0~1 分,window2 是 0~2 分,window3 是 0~3 分,window4 开始是 3~4 分,window5 是 3~5 分,以此类推,而一条时间戳是 0 分 30 秒的数据,会被划分到 window1、window2 和 window3 三个窗口里。
比如需要绘制一条数据曲线,一分钟打一个点,每个点表示各个子页面当天累计 UV。查询语句采用事件时间,CUMULATE 窗口函数的步长是一分钟,max size 是一天,业务的 group key 是子页面的 ID,时间戳是窗口的结束时间。
上图可以看到,使用 CUMULATE 方案绘制出来的曲线不管是正常消费还是历史回溯都很平滑。
CUMULATE 窗口的优点
第一个优点是使用窗口的结束时间作为每个点的横坐标,曲线上每个点的纵坐标就是在横坐标对应时间点上的累计值,所以无论在回溯历史或者是作业发生 failover 的情况下,曲线都可以完全还原,而且各个时间点上分维度的值加起来总是等于总维度的值。
第二个优点是使用两阶段聚合,能够防止 distinct key 倾斜。由于数据是在窗口结束的时候才发送,所以不存在撤回,输出的是 append 流,因此自增曲线上也不会有凹坑。
Dynamic cumulate window
Dynamic cumulate window 也是为了解决曲线类的需求,比如计算直播间自开播以来的累计指标,与前面需求的不同点是每个直播间的开播关播能持续多久都是不确定的,但它也是一种计算累计指标。它有两个必选参数:时间属性列和窗口的步长,还有一个可选参数窗口的 gap,用来定义窗口多久没有输入数据就认为它已经结束了。这里需要注意,一个窗口结束会触发窗口的结果输出,而且会清理掉状态,如果又来了一条相同 key 的数据,迟到的数据会被丢弃,没有迟到的数据会被划分到新的窗口去,累计值也会从零开始。
如上图案例,需要绘制一个曲线,每个点表示每个直播间开播以来的累计 UV,如果一个直播间连续一个小时没有数据流入,则认为关播。窗口函数使用 Dynamic cumulate window,步长是 1 分钟,gap 是 60 分钟,Group key 是直播间 ID,时间戳依然使用窗口的结束时间。
2.2 Window Table-valued Function 扩展
■ 2.2.1 丰富 Window TVF 算子
社区在 Flip-145 中提出的 Window Table-valued Function (window tvf) 语法,并且实现了窗口聚合。在这个基础上我们丰富了窗口算子,包括 TopN、关联和去重,还支持了一个单独的 window Table-valued Function 的查询语句,这些功能都已经陆续推到社区的各个版本里。有了这些窗口的算子,用户就可以用 Flink SQL 实现更复杂的业务逻辑。
如上图,需要统计当天最热销的 100 件商品的销售额和买家数,以及这些爆品所属主播的销售情况。首先做一个窗口聚合,得到 0 点以来每个商品的销售额和买家数,再做一个窗口聚合,得到每个主播所有宝贝的买家数,两个结果做窗口关联,最后做窗口上的 TopN,得到前 100 名爆品以及它的各项累计指标。
■ 2.2.2 支持 Window Offset
window offset 主要用来调整窗口的划分逻辑,它是个可选参数,默认值是 0,表示 unix 时间的零点作为窗口划分的起始时间,它的值可以是一个正数,表示向右偏移,也可以是一个负数,表示向左偏移。但是它只会影响如何划分窗口,不会影响 watermark。另外,相同的窗口,不同的 offset 可能会产生相同的偏移效果,比如对一个 10 分钟的滚动窗口,把起点向左偏移 4 分钟或者向右偏移 6 分钟,对窗口划分产生的影响是一样的。
如上图实例,需要绘制一个数据曲线,每分钟打一个点表示每个页面本周以来的累积 UV。可以使用 CUMULATE 窗口函数,采用事件时间,步长是 1 分钟,max size 是 7 天。因为 unix time 零点那天是在周四,假设用默认的 offset,窗口划分就是从本周四到下周四,所以要设置 offset 为 4 天,表示向右偏移 4 天,这样就是从本周一到下周一。
■ 2.2.3 支持 Batch Mode
另外我们还增加了对批模式的支持。原理是引入一个 windows 算子,给输入数据附上所属的窗口属性后发给下游,而下游的算子复用批上已经存在的算子,比如说聚合上是用 HashAgg 或者 SortAgg,关联上是 HashJoin 或者 SortMergeJoin,这些批上的算子和流模式下的算子相比,不需要状态,所以吞吐上也有更好的表现。
三、性能优化
本文主要介绍两个方面的优化,一个是聚合上的状态优化,一个是维表关联上的攒批优化。
3.1 聚合上的状态优化
先通过一个例子来了解一下聚合场景下 distinct states 的状态复用。需要统计应用下每个子频道的 UV,该用例有两个特点,频道是可枚举的以及每个频道访客的重合度很高的。
最原始的查询语句如上图,group key 是一个频道,用一个 count distinct 来计算各个频道的 UV。设备集合在状态中首先是存在一个 map state,假设频道的枚举只有三个,A、B 和 other,group key 是频道 ID, map state 的 key 设备 ID, value 是一个 64 bit 的 long 类型的值,每个 bit 表示这个频道下该设备是否出现,在简单的场景下这个 Value 值就是 1。
上图 A 频道下有两个设备,ID 分别是 1 和 2,ID 为 1 的设备同时访问了 B 频道,id 为 2 的设备同时访问了 other 频道。可以发现,不同频道的 map 可以有大量的重合,想要复用这些 key,可以用社区提供的方法来手动改写 SQL。
首先做个行转列的操作,把三个频道值拍到三个 count distinct 聚合函数的 filter 条件,在输出之前再用一个自定义的表函数来做列转行。
改写后的查询语句、设备集合的状态和存储如上图。Group key 是 empty,map state key 是设备 ID,map state value 是一个 64bit 的 long 类型,每个 bit 表示各频道下此设备是否出现,比如 ID 为 1 的设备 value 是 110,表示这个设备访问了频道 A 和 B,ID 为 3 的设备访问了频道 B 和 other。
这个方案大大减少了状态,但也存在两个缺点。第一是需要手动改写 SQL,如果一个维度有多个值或有多个可枚举的维度,那么手动改写的 SQL 会很长,另外一个缺点是需要用自定义的表函数进行列转行转换。
我们提出一种简化的 SQL 表达方式,既能达到状态上的收益,又能减轻数据开发人员的负担。用户只需要在查询语句里,通过一个方式告诉优化器 group key 的枚举值,优化器就会自动改写,进行转列和列转行,改写后就可以复用 distinct map state。改写后等价下的查询语句,只需要在过滤条件里指定枚举值就可以,用 in 或 or 的表达方式都可以。
上述性能优化可以用在无限流聚合和窗口聚合,并且一个可枚举维度或多个可枚举维度都是可以的,可以用在简单的聚合查询,也可以用在多维聚合。
但它的限制条件是 group key 里面至少有一个 key 是可枚举的,而且枚举值必须是静态的,能够明确写在过滤条件里。另外每个维度下的 distinct key 得有重合才能达到节约状态的效果。如果需要统计每个省份的 UV,基本上可以认为不同省份的访客是没有交集的,这个时候复用 distinct key 是没有收益的。另外在窗口聚合的时候,窗口函数必须具有行语义,不可以是集合语义。对于行语义的窗口,当前这个数据属于哪个窗口取决于数据本身;但是对于集合语义的窗口,当前这条数据属于哪个窗口,不仅取决于数据本身,还取决于这个窗口收到过的历史数据集合。这个优化调整聚合算子的 group key,会影响每个窗口收到的数据集合,所以不适用于集合语义的窗口。
最后可能有用户会问,为什么语法上不采用 Calcite 提供 pivot/unpivot 来显式地表达行转列和列转行。首先是条件不具备,因为 calcite 的 1.26 版本才引入 pivot,1.27 才引入 unpivot,而 Flink 从 1.12 版本至今都是依赖 Calcite 1.26。第二个原因是如果用 pivot/unpivot 的语法,SQL 会比现在表达方式长很多。
3.2 维表关联的攒批优化
维表关联的攒批优化是为了减少 RPC 的调用次数。原理是攒一批数据以后,调用维表的批量查询接口,语法上我们引入通用的 Mini-Batch hint,它有两个参数:一个表示多长时间攒一批,一个表示多少条数据攒一批。一个合法的 Mini-Batch hint 的至少包含一个参数。我们将 hint 设计得很通用,希望它不仅可以用于维表关联,还可以用于聚合的攒批优化。
再看一个例子,需要打宽订单表,关联订单的客户信息。查询语句在 customers 维表后面跟一个 hint 表示 5 秒攒一批或 1 万条数据攒一批,这个优化在底层算子和设计的实现上,远比 SQL 语法的表达要复杂得多。
四、稳定性提升
稳定性方面主要介绍对 Group Window Aggregate 解决数据倾斜和 Flink SQL 聚合指标调整之后的状态兼容这两部分快手做的一些优化和改进。
4.1 Group Window Aggregate 的数据倾斜
window 计算在快手内部的应用非常广泛,快手的业务场景比较容易遇到数据倾斜,比如大主播的直播、一些重大活动等。实时计算如果遇到数据倾斜,轻则指标延迟,重则数据事故,所以我们在 Tumble window 上支持了 Mini-Batch、Local-Global、Split Distinct 的优化,其他常用的 window 上也支持了类似的优化。这些优化上线之后,不仅能够帮助业务规避数据倾斜,同时还可以带来不错的性能收益。
4.2 Aggregate State 兼容
首先来看 Flink SQL 上 Aggregate state 兼容的业务场景。随着业务的发展,日常运行的任务可能要新增指标或者删除不需要的指标。重大活动过程中,业务要新增天级累计的指标或者活动周期持续累积的指标。
如果是天级指标的变更,开发者只能丢弃状态,在 0 点之后升级任务,然后再指定任务从零点的数据开始消费,从而保证指标的连续。如果是活动持续累积的指标,为了避免对原有指标的影响,只能选择新增一个任务来单独计算新增指标,但是这样会导致资源的冗余。
之所以需要这么复杂的操作,是因为 Flink SQL 判断 state 是否兼容的策略比较简单,只看引擎需要的 state 和 Savepoint 里保存的 state 的数据类型是否完全一致,完全一致就是兼容,否则不兼容。这种判断方式存在一个漏洞,State 的类型没变但是 SQL 中的聚合函数变了,这种情况 Flink 也会认为状态是兼容的。
基于这个背景,我们提出了 aggregate state 的兼容,目标是使用户学习使用 state 兼容方案的成本极低 (或 0 成本),用户可以随时升级任务,不需要再卡零点操作,支持对聚合函数的新增和删除操作。
aggregate state 兼容的操作场景只能在聚合函数尾部新增聚合函数,允许删除任意位置的聚合函数,不允许修改聚合函数的顺序,也不允许一次升级同时有新增和删除两种操作,需要分为两次升级完成。
上图右表格是指标标识和 state 类型的映射关系。为了方便判断 state 是否兼容,我们把指标标识和 state 类型的映射关系保存到 state 的 meta 中,因为有的聚合函数可能有不止一个 state,比如 avg 函数,它就需要通过 sum 和 count 两个 state 来辅助计算,所以这个映射关系很重要。
在新增聚合函数的时候,需要对新增的 state 做初始值填充,不同函数对应的初始值是不一样的,比如 count 的初始值是 0,但是 sum 的初始值必须是 null。
window 的 early-fire 和 late-fire 场景会引入 Retract 消息,这样就多一个 state 来记录已经下发给下游的数据。它比 window 原有的 state 多了时间字段,在做判断和状态迁移的时候需要做一下处理。
前面提到了我们把指标标识和 state 类型的映射关系保存到了 state 的 meta 信息里,这会带来 state 向前兼容的问题,新版本的程序不能正确读取之前版本的 Savepoint。为了解决这个问题,需要修改 meta 的 version 信息,利用 version 信息来区分新老版本的 state,从而做到 state 的向前兼容。
在 aggregate 的场景下,用户可能会通过设置 state TTL 来控制无效 state 的清理,aggregate state 兼容也要对这个场景做处理,保证迁移之后的状态,TTL 的时间戳要和原来的数据保持一致,不能做任何改变。
Aggregate state 兼容的方案,优点是用户学习使用的成本很低,几乎无感知、不依赖任何外部的服务架构,不足是对 Flink 源码有侵入,增加了未来升级 Flink 版本的成本,而且目前只能支持聚合类计算场景。
最后介绍一下快手正在做的状态兼容的终极方案。用户可以在 Savepoint 的任意位置增加、删除 state、甚至是修改 state 中的内容;还可以自定义一份完整的 state,比如 Flink on hudi 任务的 state 初始化。
终极方案的优点是不侵入 Flink 的源码,方便 Flink 版本升级,用户可以在平台界面操作,不需要开发代码以及支持全场景的 state 兼容,不再局限于具体的场景。不足是对于用户来说学习成本比较高,需要了解 Operator State 和 keyedState 这些比较专业的知识点,而且还要知道 Operator 里面是否包含 state。
五、未来展望
未来,快手会在 Stream SQL 方向持续扩展功能,提升性能,达到降本增效的目的,以及探索更多场景下的状态兼容;流批一提方面,快手将会完善 Flink Batch SQL 的能力,增加推测执行、自适应查询等优化,提升 Batch SQL 的稳定性和性能,继续拓宽业务应用场景;在数据湖和实时数仓方面,会继续推动它们在更多业务场景下的落地。
本博客文章除特别声明,全部都是原创!原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Flink SQL 在快手的扩展与实践】(https://www.iteblog.com/archives/10132.html)