这篇文章本来19年5月份就想写的,最终拖到今天才整理出来😂。
Spark 内置给我们带来了非常丰富的各种优化,这些优化基本可以满足我们日常的需求。但是我们知道,现实场景中会有各种各样的需求,总有一些场景在 Spark 得到的执行计划不是最优的,社区的大佬肯定也知道这个问题,所以从 Spark 1.3.0 开始,Spark 为我们提供了两个主要接口,让我们可以自定义相关优化规则来在逻辑计划处理阶段或者是将逻辑计划(Logical Plan)转换成物理计划(Physical Plan)阶段实现我们业务相关的优化逻辑。
今天我们来介绍一下自定义策略(Strategy),来将逻辑计划转换成我们需要的物理计划。假设我们有以下的业务逻辑处理:
package com.iteblog import org.apache.spark.sql.SparkSession object SparkSQL { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().master("local[2]").appName("iteblog Spark").getOrCreate() val tableA = spark.range(10000000).as('a) val tableB = spark.range(20000000).as('b) import spark.implicits._ val result = tableA.join(tableB, $"a.id" === $"b.id").groupBy().count() result.show() } }
如果我们运行上面的代码,发现打出 result 的结果需要很长时间。在我的电脑上运行了大概9s:
20/08/05 22:22:58 INFO DAGScheduler: Job 0 finished: show at SparkSQL.scala:14, took 8.764567 s
为什么会这么慢?我们使用 result.explain()
看下上面程序执行计划:
== Physical Plan == *(6) HashAggregate(keys=[], functions=[count(1)]) +- Exchange SinglePartition +- *(5) HashAggregate(keys=[], functions=[partial_count(1)]) +- *(5) Project +- *(5) SortMergeJoin [id#0L], [id#3L], Inner :- *(2) Sort [id#0L ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(id#0L, 200) : +- *(1) Range (0, 10000000, step=1, splits=2) +- *(4) Sort [id#3L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(id#3L, 200) +- *(3) Range (0, 20000000, step=1, splits=2)
可以看出,上面程序会执行 SortMergeJoin,它会把两张表的数据分区到200个分区里面,然后进行排序,最后再 Join,而想而知这个笑脸是多慢。如果是上面的业务场景,其实我们是可以对其进行优化的。
上面的 join 其实是取 a 表从 0 到 10000000 与 b 表从 0 到 20000000 之间的交集,也就是 0 到 10000000。也就是下面四种情况下才有结果:
tableA: start1----------------------------end1 tableB: start2------------------end2 tableA: start1---------end1 tableB: start2------------------end2 tableA: start1----------------------------end1 tableB: start2------------------------------end2 tableA: start1----------------------------end1 tableB: start2------------------end2
如果是下面两种情况,其实就是没交集:
tableA: start1-----------------end1 tableB: start2--------end2 tableA: start1---------end1 tableB: start2----------end2
根据上面的分析,我们可以实现自己的策略:
package com.iteblog import org.apache.spark.sql.Strategy import org.apache.spark.sql.catalyst.expressions.{Alias, EqualTo} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Join, Range} import org.apache.spark.sql.catalyst.plans.Inner import org.apache.spark.sql.execution.{ProjectExec, RangeExec, SparkPlan} case object IntervalJoin extends Strategy with Serializable { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case Join(Range(start1, end1, 1, part1, Seq(o1), false), // mathces tableA Range(start2, end2, 1, part2, Seq(o2), false), // matches tableB Inner, Some(EqualTo(e1, e2))) // matches the Join if ((o1 semanticEquals e1) && (o2 semanticEquals e2)) || ((o1 semanticEquals e2) && (o2 semanticEquals e1)) => /// matches cases like: // tableA: start1----------------------------end1 // tableB: ...------------------end2 if (((end2 >= start1) && (end2 <= end1)) || (start1 <= start2) && (start2 <= end1)) { // 交集的起点 val start = math.max(start1, start2) // 交集的终点 val end = math.min(end1, end2) val part = math.max(part1.getOrElse(200), part2.getOrElse(200)) // 创建新的交集范围 val result = RangeExec(Range(start, end, 1, Some(part), o1 :: Nil, isStreaming = false)) val twoColumns = ProjectExec(Alias(o1, o1.name)(exprId = o1.exprId) :: Nil, result) twoColumns :: Nil } else { Nil } case _ => Nil } }
然后 Spark 为我们提供了 spark.experimental.extraStrategies
接口来注册我们自定义的策略:
spark.experimental.extraStrategies = IntervalJoin :: Nil
我们再运行一下上面的程序,发现执行 show 的时间变成 0.5s 了!
20/08/05 22:21:36 INFO DAGScheduler: Job 0 finished: show at SparkSQL.scala:14, took 0.309442 s
新的执行计划没有了 Join 操作:
== Physical Plan == *(2) HashAggregate(keys=[], functions=[count(1)]) +- Exchange SinglePartition +- *(1) HashAggregate(keys=[], functions=[partial_count(1)]) +- *(1) Project +- *(1) Project [id#0L AS id#0L] +- *(1) Range (0, 10000000, step=1, splits=2)
注意,Strategy 是在将逻辑计划转换成物理计划的阶段使用到的,我们可以查看下优化后的逻辑计划(Optimized Logical Plan)和最后的物理计划(Spark Plan):
逻辑计划树:
result.queryExecution.optimizedPlan.treeString 输出: Aggregate [count(1) AS count#15L] +- Project +- Join Inner, (id#0L = id#3L) :- Range (0, 10000000, step=1, splits=Some(2)) +- Range (0, 20000000, step=1, splits=Some(2))
物理计划树:
result.queryExecution.sparkPlan.treeString 输出: HashAggregate(keys=[], functions=[count(1)], output=[count#15L]) +- HashAggregate(keys=[], functions=[partial_count(1)], output=[count#21L]) +- Project +- Project [id#0L AS id#0L] +- Range (0, 10000000, step=1, splits=2)
可以清楚的看到自定义策略是在最终的物理计划才有的。另外需要注意,虽然本文介绍的功能是从 1.3.0 开始引入的,但是直到 Spark 3.0,这个功能还是实验性(Experimental)的方法,所以在未来的 Spark 版本这个用户可能会发生变化,这个需要注意。
最后,明天我们再介绍一下逻辑计划优化阶段自定义逻辑计划规则。
本博客文章除特别声明,全部都是原创!原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Apache Spark 自定义优化规则:Custom Strategy】(https://www.iteblog.com/archives/2571.html)
start 和 end代表什么意思啊? 没太看懂
start 代表两段数据交集的起点,
end 代表两段数据交集的终点。