本文来自上周(2020-11-17至2020-11-19)举办的 Data + AI Summit 2020 (原 Spark+AI Summit),主题为《Spark SQL Beyond Official Documentation》的分享,作者 David Vrba,是 Socialbakers 的高级机器学习工程师。
实现高效的 Spark 应用程序并获得最大的性能为目标,通常需要官方文档之外的知识。理解 Spark 的内部流程和特性有助于根据内部优化设计查询,从而在执行期间实现高效率。在这次演讲中,我们将重点讨论 Spark SQL 的一些官方文档中没有很好描述的内部特性,重点在一些基本示例上解释这些特性,同时分享一些性能技巧。
本次分享主要有两大主题:
- Spark SQL 中的统计信息(Statistics)
- 以排序的方式将数据存储在文件中
Spark SQL 中的统计信息(Statistics)
统计信息这块的分享主要包括以下四个方面:
- 我们怎么可以看到统计信息
- 统计信息是如何计算的
- 统计信息是怎么使用
- 有什么注意事项?
在 Spark 中,我们可以通过以上方法来查看表级别的统计信息、,输出的信息里面有个 Statistics 那行,那就是本次分享要提到的统计信息。
如果想查看列级别的统计信息,可以使用 DESCRIBE EXTENDED table_name column_name 进行查看。输出的信息可以看到这列的最大最小值、null 的数量、去重之后的值数量等信息。
如果我们使用的是 Apache Spark 3,我们还可以通过 explain(mode="cost") 来查看统计信息:
统计信息是通过执行计划树的叶子节点计算的,然后从树的最底层往上传递,同时 Spark 也会利用这些统计信息来修改执行树的执行过程。
统计信息的传递主要有两种方法:最简单的方法(Simple way)以及高级方法。
上面就是统计信息的最简单的传递方法,这种方法只传递 sizeInBytes。从右上图可以看出,我们通过 user_id > 0 的过滤条件来过滤数据,其实表中肯定没有用户的 user_id 小于 0 ,但是我们可以通过逻辑计划看出,简单的统计信息传递,Filter 节点的统计信息的 sizeInBytes 和 Relation 节点是一样。
高级传递方式的特点是可以传递 sizeInBytes 信息、行数以及列级别的信息。这种传递方式必须使用 Apache Spark 2.2 版本,而且需要打开 CBO(默认没有打开,需要配置 spark.sql.cbo.enabled=true)。
上面例子可以看出,统计信息里面只有 sizeInBytes 和 rowCount,并没有列级别的统计信息,特别是 user_id 这列的统计信息。所以 Filter 节点和 Relation 节点的统计信息是一样的,因为 Spark 无法利用这些统计信息算出文件里面到底有多少满足 user_id > 0 的数据。
如果我们算出 user_id 的统计信息,可以看出,Filter 利用了这些统计信息,算出过滤后的数据行数为0。
那么,上面提到的统计信息是如何计算的呢?在 Spark 中主要有三种:
- 从 metastore 中获取
- 利用 Hadoop API 计算,仅仅计算 sizeInBytes
- 使用 sizeInBytes 的默认值,通过 spark.sql.defaultSizeInBytes 配置,默认为 Long 的最大值。
上面是计算流程,图中的每个节点是条件,根据不同条件选择不同的路径。
在 Spark 中,统计信息主要在两种情况使用:
- Join 策略选择
- 多表 Join,调节 Join 表的顺序,这个需要打开 spark.sql.cbo.joinReorder.enable。
以上就是 Spark 统计信息相关的知识点。
以排序的方式将数据存储在文件中
以排序的方式将数据存储到文件主要涉及排序的函数有哪些,怎么将数据以排序的形式存储在文件。
上面就是 Spark 中排序的算子:
- orderBy/sort 这个是 DataFrame 的转换算子,其是在不同的作业粒度对数据进行排序,需要 shuffle 来达到全局的有序;
- sortWithinPartitions 这个也是 DataFrame 的转换算子,是分区粒度的排序;
- sortBy 这个通常是在 DataFrameWriter 上调用,并且在 write 算子调用之后调用的,一般和 bucketing 算子一起使用,需要配合使用 saveAsTable
为了有个直观的体验,我们用个例子来介绍。在这个例子中,我们使用 year 这列对数据进行分区;每个分区使用 user_id 来进行排序;每个分区只有一个文件,这个文件是通过 user_id 进行排序的。
很多人会很轻松的写出以上的程序。但是遗憾的是,最后保存在文件系统的文件并没有排序。如果需要保存的数据是有序的,需要保证 partitionColumns 有序, bucketingIdExpression 有序以及 sortColumns 有序的顺序。如果这个没有满足,则 Spark 将忽略之前的排序。
上面例子中因为我们对 year 这列进行分区,但是使用 user_id 进行排序,所以最后保存的文件肯定不会有序。
如果我们将 sortWithinPartitions('user_id') 修改为 sortWithinPartitions('year', 'user_id') 。这样最后算出的文件就是分区内有序的。
到这里我们已经学习了如何使用统计信息来提升我们的 Join 性能,以及如何以有序的方式来存储数据。
好了,今天的分享就到这里,欢迎大家转发+点赞。
本博客文章除特别声明,全部都是原创!原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【这些未在 Spark SQL 文档中说明的优化措施,你知道吗?】(https://www.iteblog.com/archives/9899.html)