早在2005年,Oracle 数据库就支持比较丰富的 dynamic filtering 功能,而 Spark 和 Presto 在最近版本才开始支持这个功能。本文将介绍 Presto 动态过滤的原理以及具体使用。
Apache Spark 的动态分区裁减
Apache Spark 3.0 给我们带来了许多的新特性用于加速查询性能,其中一个就是动态分区裁减(Dynamic Partition Pruning,DPP),所谓的动态分区裁剪就是基于运行时(run time)推断出来的信息来进一步进行分区裁剪,从而减少不必要的分区数据读取,以此提升查询性能。比如下面维度表 dim_iteblog 和事实表 fact_iteblog 进行 Join,其中 fact_iteblog.partcol 是一个分区字段。
SELECT * FROM fact_iteblog JOIN dim_iteblog ON (dim_iteblog.partcol = fact_iteblog.partcol) WHERE dim_iteblog.othercol > 10
通过 Spark 的动态分区裁减,可以将执行计划修改成如下形式:
可见,在扫描 fact_iteblog 表时,Spark 自动加上了类似于 select partcol from dim_iteblog WHERE dim_iteblog.othercol > 10 的过滤条件,如果 fact_iteblog 表有大量的分区,而 select partcol from dim_iteblog WHERE dim_iteblog.othercol > 10 查询语句只返回少量的分区,这样可以大大提升查询性能。
Presto 动态过滤
Presto 也支持上面的功能,这个功能称为动态过滤(Dynamic Filtering)。事实上,Presto 的动态过滤比 Spark 的动态分区裁减要丰富。因为 Spark 动态分区裁减只能用于分区表,而 Presto 的动态过滤支持分区表和非分区表,Presto 的动态分区包含 Partition Pruning(分区表) 以及 Row filtering(非分区表)。Presto 的动态分区功能最早出现在 Qubole 的一篇名为《SQL Join Optimizations in Qubole Presto》文章上,其在 Qubole 内部的 Presto 分支上实现了动态分区功能,并且在2017年12月把这个功能反馈给社区 #9453。遗憾的是,那个 Patch 并没有合并进社区,直到 Presto 0.241,这个功能正式加入到 master 分支。另外,Trino 是从 317 版本开始支持动态过滤的,要比 PrestoDB 早。
非分区表动态过滤
假设我们有以下的查询语句:
select a.* from lineitem_orc a join orders_orc b on a.l_orderkey = b.o_orderkey and b.o_custkey=66007;
其中 lineitem_orc 和 orders_orc 表均为非分区表,在未启用动态过滤的时候,查询计划如下:
可见,查询 lineitem_orc 表的时候是全表扫描(ScanProject)。如果我们启动动态过滤,执行计划如下:
在扫描 lineitem_orc 表的时候,多了一个 dynamicFilter = {347 -> l_orderkey},这个就是 Presto 运行过程中自动加的过滤条件,相当于在查询 lineitem_orc 表的时候加了一个 l_orderkey in (select o_orderkey from orders_orc where o_custkey=66007)。
注意,目前 PrestoDB 的实现中只有 Hive 数据源支持动态过滤,而且非分区表动态过滤只支持 ORC 数据格式,其他不行。Trino 好像还支持 Memory 数据源;而且我们需要将 enable_dynamic_filtering Session 属性设置成 true,默认为 false,同时需要把 pushdown_filter_enabled 也设置成 true。
分区表动态过滤
分区表动态过滤和 Spark 的 DPP 效果类似。假设我们有以下的查询语句:
select a.* from lineitem_orc_p a join orders_orc_p b on a.dt = b.dt and b.o_orderdate = DATE '1992-11-19';
其中,lineitem_orc_p 和 orders_orc_p 均为分区表。如果没有启用动态过滤,Presto 需要扫描 lineitem_orc_p 表所有分区里面的数据。事实上,o_orderdate = DATE '1992-11-19' 只在 orders_orc_p 的 dt=1992 这个分区里面有数据。如果开启动态过滤,我们只需要扫描 lineitem_orc_p 表 dt=1992 分区里面的数据,而直接忽略掉其他分区。开启动态过滤的执行计划如下:
可见,lineitem_orc_p 表的读取多了一个 dynamicFilter = {347 -> dt} 动态过滤条件,其等价于 lineitem_orc_p.dt in (select dt from orders_orc_p where o_orderdate = DATE '1992-11-19')。
在这个例子里面,Presto 并不是在从元数据里面读取 lineitem_orc_p 分区的时候就把 dynamicFilter 加进去,而是会把 lineitem_orc_p 所有的分区都读出来。动态过滤实际上是在 com.facebook.presto.hive.HivePageSourceProvider#createPageSource 方法里面处理的,如下:
.... if (shouldSkipPartition(typeManager, hiveLayout, hiveStorageTimeZone, hiveSplit, splitContext)) { return new HiveEmptySplitPageSource(); } ...
shouldSkipPartition 的实现如下:
private static boolean shouldSkipPartition(TypeManager typeManager, HiveTableLayoutHandle hiveLayout, DateTimeZone hiveStorageTimeZone, HiveSplit hiveSplit, SplitContext splitContext) { List<HiveColumnHandle> partitionColumns = hiveLayout.getPartitionColumns(); List<Type> partitionTypes = partitionColumns.stream() .map(column -> typeManager.getType(column.getTypeSignature())) .collect(toList()); List<HivePartitionKey> partitionKeys = hiveSplit.getPartitionKeys(); if (!splitContext.getDynamicFilterPredicate().isPresent() || hiveSplit.getPartitionKeys().isEmpty() || partitionColumns.isEmpty() || partitionColumns.size() != partitionKeys.size()) { return false; } TupleDomain<ColumnHandle> dynamicFilter = splitContext.getDynamicFilterPredicate().get(); Map<ColumnHandle, Domain> domains = dynamicFilter.getDomains().get(); for (int i = 0; i < partitionKeys.size(); i++) { Type type = partitionTypes.get(i); HivePartitionKey hivePartitionKey = partitionKeys.get(i); HiveColumnHandle hiveColumnHandle = partitionColumns.get(i); Domain allowedDomain = domains.get(hiveColumnHandle); NullableValue value = parsePartitionValue(hivePartitionKey.getName(), hivePartitionKey.getValue(), type, hiveStorageTimeZone); if (allowedDomain != null && !allowedDomain.includesNullableValue(value.getValue())) { return true; } } return false; }
在我们上面的例子中,allowedDomain 其实就是 dt = 1992,而 value 实际上是 lineitem_orc_p 各个分区的值,比如 dt = 1993、dt = 1992、dt = 1994 等。如果 value 中的值是 dt = 1993,那么 allowedDomain 肯定是不包含的,所以 lineitem_orc_p 中 dt = 1993 的分区直接忽略,也就是返回 HiveEmptySplitPageSource。如果 value 中的值是 dt = 1992,那么 allowedDomain 肯定是包含它的,这时候就不能忽略这个分区,需要读取。通过这个 Presto 实现了分区表的动态过滤。
注意,Presto 中分区表的动态过滤只支持 Hive 数据源。
Benchmarks
下面进行了 TPC-DS 查询测试,准备了五台 Worker node,配置为 r4.8xlarge,数据源为 ORC,其中下面的表是分区表:
catalog_returns
oncr_returned_date_sk
catalog_sales
oncs_sold_date_sk
store_returns
onsr_returned_date_sk
store_sales
onss_sold_date_sk
web_returns
onwr_returned_date_sk
web_sales
onws_sold_date_sk
建表语句参见 这里。下面查询在开启动态过滤时性能提升 20%。
Query | Baseline elapsed | Dynamic partition pruning elapsed | Baseline CPU | Dynamic partition pruning CPU | Baseline data read | Dynamic partition pruning data read |
---|---|---|---|---|---|---|
q01 | 10.96 | 8.50 | 10.2 | 8.9 | 17.91 | 14.53 |
q04 | 21.63 | 10.80 | 23.6 | 16.1 | 34.81 | 12.99 |
q05 | 41.38 | 14.94 | 57.1 | 16.8 | 54.81 | 11.45 |
q07 | 12.35 | 9.26 | 26.4 | 14.6 | 30.28 | 17.31 |
q08 | 10.48 | 6.43 | 11.0 | 4.7 | 10.19 | 3.52 |
q11 | 20.04 | 14.82 | 35.6 | 27.8 | 25.37 | 9.72 |
q17 | 24.05 | 9.87 | 26.4 | 12.0 | 30.18 | 9.75 |
q18 | 13.98 | 6.00 | 17.5 | 7.7 | 20.29 | 8.81 |
q25 | 18.91 | 8.04 | 26.9 | 9.1 | 37.54 | 11.12 |
q27 | 11.98 | 5.58 | 25.1 | 8.6 | 26.69 | 10.12 |
q29 | 24.11 | 15.46 | 30.5 | 18.5 | 30.18 | 13.50 |
q31 | 27.81 | 12.77 | 48.2 | 21.3 | 39.53 | 13.73 |
q32 | 11.51 | 8.15 | 12.7 | 10.3 | 15.05 | 12.76 |
q33 | 15.95 | 4.31 | 24.3 | 5.4 | 31.26 | 6.67 |
q35 | 15.10 | 5.22 | 13.8 | 6.2 | 4.83 | 1.70 |
q36 | 11.68 | 6.43 | 22.4 | 11.4 | 24.28 | 12.78 |
q38 | 21.08 | 16.20 | 39.4 | 31.6 | 5.65 | 3.15 |
q40 | 37.40 | 11.98 | 37.7 | 8.4 | 17.02 | 9.20 |
q46 | 11.57 | 9.06 | 24.4 | 17.3 | 18.51 | 14.19 |
q48 | 20.48 | 12.65 | 42.3 | 22.5 | 20.71 | 11.54 |
q49 | 26.69 | 16.01 | 38.8 | 12.0 | 68.67 | 30.57 |
q50 | 46.90 | 33.22 | 43.4 | 42.5 | 21.30 | 16.77 |
q54 | 43.05 | 11.39 | 27.5 | 14.8 | 17.71 | 11.52 |
q56 | 16.23 | 4.12 | 23.8 | 5.5 | 31.26 | 6.72 |
q60 | 16.39 | 6.02 | 25.1 | 6.6 | 31.26 | 7.42 |
q61 | 17.18 | 5.50 | 33.4 | 7.1 | 42.63 | 9.37 |
q66 | 13.67 | 6.59 | 19.1 | 8.9 | 19.63 | 8.34 |
q69 | 9.89 | 7.46 | 10.5 | 6.1 | 4.83 | 3.16 |
q71 | 17.32 | 6.11 | 23.3 | 6.6 | 31.26 | 8.06 |
q74 | 16.86 | 9.44 | 24.1 | 17.6 | 22.59 | 8.08 |
q75 | 122.04 | 69.45 | 102.7 | 62.9 | 110.86 | 63.91 |
q77 | 23.94 | 7.51 | 29.3 | 6.8 | 49.95 | 12.20 |
q80 | 43.46 | 18.57 | 45.8 | 11.5 | 37.25 | 11.78 |
q85 | 20.97 | 16.54 | 16.9 | 14.7 | 14.65 | 10.52 |
- Q18 查询提高了50%以上的运行时间,同时平均减少了64%的 CPU 使用,数据读取减少66%。
- Q7 查询性能提升了30%到50%,而 CPU 使用率平均降低了47%,数据读取减少54%。
- Q29 查询性能提升了10%到30%,平均减少了20%的 CPU,数据读取减少27%。
总结
Presto 中的动态分区功能在一些场景下能够减少数据的扫描,提升查询性能。PrestoDB 和 Trino 都在最近的分支引入了这个功能,Trino 中的动态过滤目前应该是处于稳定状态;而 PrestoDB 中这个功能还处于实验状态(experimental)。而且目前来看覆盖的数据源有限,Trino 支持 Hive 和 Monery 数据源;而 PrestoDB 支持 Hive 数据源,而且对文件格式有一定的要求。
本博客文章除特别声明,全部都是原创!原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Presto 动态过滤(dynamic filtering)原理与应用】(https://www.iteblog.com/archives/9972.html)