在《ASM 与 Presto 动态代码生成简介》这篇文章中,我们简单介绍了 Presto 动态代码生成的原理以及 Presto 在计算表达式的地方会使用到动态代码生成技术。为了加深理解,本文将以两个例子介绍 Presto 里面动态代码生成的使用。
EmbedVersion
我们往 Presto 提交 SQL 查询以及 TaskExecutor 启动 TaskRunner 执行 Task 的时候都会使用到 EmbedVersion 类里面的 embedVersion 方法。embedVersion 方法其实就是初始化一个 Runnable 实例,比如启动 TaskRunner 的代码片段如下:
executor.execute(embedVersion.embedVersion(new TaskRunner()));
其中 TaskRunner 就是实现 Runnable 接口的。EmbedVersion 的 embedVersion 方法实现如下:
public Runnable embedVersion(Runnable runnable) { requireNonNull(runnable, "runnable is null"); try { return (Runnable) runnableConstructor.invoke(runnable); } catch (Throwable throwable) { throwIfUnchecked(throwable); throw new RuntimeException(throwable); } }
其中 runnableConstructor 就是使用 ASM 进行代码生成的类,实现如下:
// 这里定义了一个类,类名大概为 Presto_null__testversion____20211011_105831_1, // 它的父类是 Object,并实现了 Runnable 接口。 ClassDefinition classDefinition = new ClassDefinition( a(PUBLIC, FINAL), makeClassName(baseClassName(serverConfig)), type(Object.class), type(Runnable.class)); // 定义了一个名为 runnable 的局部变量,类型为 Runnable FieldDefinition field = classDefinition.declareField(a(PRIVATE), "runnable", Runnable.class); Parameter parameter = arg("runnable", type(Runnable.class)); // 定义了这个类的构造函数,参数为 runnable,参数类型为 Runnable MethodDefinition constructor = classDefinition.declareConstructor(a(PUBLIC), parameter); // 构造方法里面其实就是把参数 runnable 的值赋值给局部变量 runnable constructor.getBody() .comment("super(runnable);") .append(constructor.getThis()) .invokeConstructor(Object.class) .append(constructor.getThis()) .append(parameter) .putField(field) .ret(); // 定义了一个名为 run 的方法,事实上就是实现 Runnable 接口里面的 run 方法 MethodDefinition run = classDefinition.declareMethod(a(PUBLIC), "run", type(void.class)); // run 里面其实就是调用局部变量 runnable 的 run 方法 run.getBody() .comment("runnable.run();") .append(run.getThis()) .getField(field) .invokeInterface(Runnable.class, "run", void.class) .ret(); // 定义这个类,并加载到 ClassLoader 中 Class<? extends Runnable> generatedClass = defineClass(classDefinition, Runnable.class, ImmutableMap.of(), getClass().getClassLoader()); this.runnableConstructor = constructorMethodHandle(generatedClass, Runnable.class);
上面是 Presto 操作 Java 字节码并动态生成了一个类,其生成的类大概如下面所示:
package com.facebook.presto.$gen; public final class Presto_null__testversion____20211011_105831_1 implements Runnable { private Runnable runnable; public Presto_null__testversion____20211011_105831_1(Runnable runnable) { this.runnable = runnable; } public void run() { this.runnable.run(); } }
看起来内容其实很简单。EmbedVersion 类算是 Presto 里面动态代码生成最简单的例子了。
concat 函数实现
下面我们来看下稍微复杂的,也就是 Presto 里面内置函数的实现。Presto 的内置函数的实现很多也是用到代码生成技术,比如 map_filter、transform_keys 以及 transform_values 等。我们这里也举一个比较简单的例子,也就是 concat 函数的实现。比如下面的 SQL 查询:
select concat(o_orderstatus, o_orderpriority) from orders limit 10;
在 Presto 里面,concat 函数的实现就是通过代码生成进行的,其实现代码可以参见 com.facebook.presto.operator.scalar.ConcatFunction。Presto 接收到上面的 SQL 查询后,会在 Coordinator 端进行解析,并生成相应的 Tasks,提交给 Worker 执行。在 Worker 端,执行 Task 的时候,会调用 LocalExecutionPlanner 的 plan 方法生成 LocalExecutionPlan 其实就是本地可执行的计划,在 plan 方法里面会调用 com.facebook.presto.sql.planner.LocalExecutionPlanner.Visitor 对 Coordinator 传过来的 PlanNode 进行变量生成 PhysicalOperation。在我们的例子中,会在 com.facebook.presto.sql.planner.LocalExecutionPlanner.Visitor#visitScanFilterAndProject 里面对 concat(o_orderstatus, o_orderpriority) 进行代码生成,最终调用到 com.facebook.presto.operator.scalar.ConcatFunction 的 generateConcat 方法,其就是 Presto 的 concat 函数实现逻辑,如下:
// arity 代表 Concat 函数输入参数的个数 private static Class<?> generateConcat(TypeSignature type, int arity) { checkCondition(arity <= 254, NOT_SUPPORTED, "Too many arguments for string concatenation"); // 定义动态代码生成的类名,生成的类名大概是 varchar_concat2ScalarFunction_20211011_062900_3 样子的 ClassDefinition definition = new ClassDefinition( a(PUBLIC, FINAL), makeClassName(type.getBase() + "_concat" + arity + "ScalarFunction"), type(Object.class)); // 生成类的构造函数,这里是使用 private 修饰的 // Generate constructor definition.declareDefaultConstructor(a(PRIVATE)); // Generate concat() // 定义 concat 函数的参数,比如 arg0、arg1;类型是 Slice List<Parameter> parameters = IntStream.range(0, arity) .mapToObj(i -> arg("arg" + i, Slice.class)) .collect(toImmutableList()); // 定义一个名为 concat 的函数,它的修饰符是 public static, // 返回类型是 Slice,输入参数是上面定义的 arg0、arg1 等。 MethodDefinition method = definition.declareMethod(a(PUBLIC, STATIC), "concat", type(Slice.class), parameters); Scope scope = method.getScope(); BytecodeBlock body = method.getBody(); // 定义一个名为 length 的局部变量,类型为 int Variable length = scope.declareVariable(int.class, "length"); // length 变量初始化为0 body.append(length.set(constantInt(0))); // 下面是计算 concat 函数每个参数的长度(其实就是调用 string 的 length 方法) // 然后再把得到的字符串长度加到 length 里面,并赋值给 length for (int i = 0; i < arity; ++i) { body.append(length.set(generateCheckedAdd(length, parameters.get(i).invoke("length", int.class)))); } // 定义一个名为 result 的局部变量,类型为 Slice Variable result = scope.declareVariable(Slice.class, "result"); // 调用 Slices 的 allocate 方法分配出长度为 length 空间的 Slice 对象,并赋值给 result body.append(result.set(invokeStatic(Slices.class, "allocate", Slice.class, length))); // 定义一个名为 position 的局部变量,类型为 int,赋值为 0 Variable position = scope.declareVariable(int.class, "position"); body.append(position.set(constantInt(0))); // 下面是循环调用 result 的 setBytes 方法,并分别把 arg0、arg1 里面的内容放到 result 里面去 // 最后计算 arg0 或 arg1 字符串长度再加上 position 的值,结果再赋值给 position for (int i = 0; i < arity; ++i) { body.append(result.invoke("setBytes", void.class, position, parameters.get(i))); body.append(position.set(add(position, parameters.get(i).invoke("length", int.class)))); } // 返回 result body.getVariable(result) .retObject(); // 定义生成的类,并把它加载打破 DynamicClassLoader 里面去 return defineClass(definition, Object.class, ImmutableMap.of(), new DynamicClassLoader(ConcatFunction.class.getClassLoader())); } private static BytecodeExpression generateCheckedAdd(BytecodeExpression x, BytecodeExpression y) { // 调用 ConcatFunction 类里面的 checkedAdd 静态方法 return invokeStatic(ConcatFunction.class, "checkedAdd", int.class, x, y); } @UsedByGeneratedCode public static int checkedAdd(int x, int y) { try { return addExact(x, y); } catch (ArithmeticException e) { throw new PrestoException(INVALID_FUNCTION_ARGUMENT, "Concatenated string is too large"); } }
为了方便理解,我对 generateConcat 方法的实现进行了注释,应该很好理解。为了性能问题,最终生成的函数会进行缓存,下一次再调用 concat 函数,只要函数签名一样,就不用再一次进行 concat 代码的生成。比如我们前面的例子是对两个字符串进行合并(函数签名为 presto.default.concat(varchar,varchar):varchar ),如果下一次还是调用这个函数就不用再进行代码生成了。但是如果下一次是对三个字符串进行合并,还是要进行一次代码生成的。
到这里,大家可能还是不太明白 Presto 代码生成到底生成了什么东西。这里我就进一步介绍一下。如果运行我们上面的 SQL 查询,Presto 生成的 concat 实现大概如下面所示:
package com.facebook.presto.$gen; import com.facebook.presto.operator.scalar.ConcatFunction; import io.airlift.slice.Slice; import io.airlift.slice.Slices; public final class varchar_concat2ScalarFunction_20211011_062900_3 { private varchar_concat2ScalarFunction_20211011_062900_3() { } public static Slice concat(Slice arg0, Slice arg1) { int length = 0; int length = ConcatFunction.checkedAdd(length, arg0.length()); length = ConcatFunction.checkedAdd(length, arg1.length()); Slice result = Slices.allocate(length); int position = 0; result.setBytes(position, arg0); int position = position + arg0.length(); result.setBytes(position, arg1); int var10000 = position + arg1.length(); return result; } }
注意,Presto 里面生成的是 Java 字节码,这里只是为了说明的方便,给出了 Java 源代码。可以看到,最终生成的代码其实很好理解。Presto 里面对两个字符串进行 concat 其实就是执行上面的代码片段。
总结
本文通过两个例子简单的介绍了 Presto 里面动态代码生成的用法,通过上面两个例子,相信大家应该能够大概了解 Presto 里面的动态代码生成的技术和用法。后面有空我会介绍一下表达式这块代码生成和整个 page 处理逻辑是怎么联系到一起的,敬请关注。
本博客文章除特别声明,全部都是原创!原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【动态代码生成技术在 Presto 中使用简介】(https://www.iteblog.com/archives/10047.html)