MaxCompute基于新一代的SQL引擎推出新功能UDT(User Defined Type)。MaxCompute的UDT功能允许您在SQL中直接调用第三方语言的类使用其方法,或直接使用第三方对象获取其数据内容。
UDT介绍
很多SQL引擎中UDT与MaxCompute的复杂类型STRUCT类似,相比之下,MaxCompute中的UDT与Create Type的概念更类似,Type中包含数据域和方法。MaxCompute不需要用特殊的DDL语法来定义新类型,通过UDT可以在SQL中直接使用新类型。通过如下示例,为您直观地介绍UDT的功能。
例如,在SQL语句中调用Java的java.lang包。您可以使用以下两种方法:
通过UDT功能在SQL语句中直接调用java.lang。
--打开新类型,因为下面的操作会用到INTEGER,即INT类型。 set odps.sql.type.system.odps2=true; SELECT java.lang.Integer.MAX_VALUE;
和Java语言一样,java.lang包可以省略,所以上述示例可以简写为如下语句。
set odps.sql.type.system.odps2=true; SELECT Integer.MAX_VALUE;
输出结果如下。
+-----------+ | max_value | +-----------+ | 2147483647 | +-----------+
使用UDF在SQL语句中调用java.lang。
代码开发(定义一个UDF的类。)
package com.aliyun.odps.test; public class IntegerMaxValue extends com.aliyun.odps.udf.UDF { public Integer evaluate() { return Integer.MAX_VALUE; } }
进行打包、上传及注册操作(将上面的UDF编译,并打成JAR包,然后上传JAR包,并创建Function。)
add jar odps-test.jar; create function integer_max_value as 'com.aliyun.odps.test.IntegerMaxValue' using 'odps-test.jar';
在SQL中调用UDF。
select integer_max_value();
由上例可以看出,UDT简化了上述一系列的过程,方便您使用其它语言扩展SQL的功能。
应用场景
UDT的常用场景如下:
MaxCompute没有提供,但可以使用其它语言简单实现的功能。
例如,只需调用一次Java内置类的方法即可实现,但MaxCompute却没有提供简单的方法实现这个功能。如果使用UDF实现,整个过程会过于繁杂。
SQL中需要调用第三方库实现相关功能。希望能够在SQL中直接调用,而不需要再Wrap一层UDF。
SQL中需要直接调用第三方语言源代码。Select Transform支持把脚本写到SQL语句中,提升可读性和代码易维护性。但是某些语言无法这样使用,例如Java源代码必须经过编译才能执行,通过UDT功能将这些语言也可以直接写入SQL中。
使用限制
目前版本不支持使用UDF/UDAF/UDT读取以下场景的表数据:
做过表结构修改(Schema Evolution)的表数据。
包含复杂数据类型的表数据。
包含JSON数据类型的表数据。
Transactional表的表数据。
实现原理
通过以下示例为您介绍UDT的执行过程。
--示例数据。
@table1 := select * from values ('100000000000000000000') as t(x);
@table2 := select * from values (100L) as t(y);
--代码逻辑。
--new创建对象。
@a := select new java.math.BigInteger(x) x from @table1;
--静态方法调用。
@b := select java.math.BigInteger.valueOf(y) y from @table2;
--实例方法调用。
select /*+mapjoin(b)*/ x.add(y).toString() from @a a join @b b;
--输出结果如下所示。
100000000000000000100
整个示例的运行过程如下图所示。
该UDT共有三个Stage:M1、R2和J3。如果您熟悉MapReduce原理即可知道,由于Join
的存在需要做数据Reshuffle,所以会出现多个Stage。通常,不同的Stage是在不同的进程、不同的物理机器上运行的。
M1只执行new java.math.BigInteger(x)
操作。
J3在不同阶段执行了java.math.BigInteger.valueOf(y)
和x.add(y).toString()
操作。这几个操作不仅分阶段执行,而且在不同的进程、不同的物理机器上执行。UDT把这个过程封装起来,将这个过程变得看起来和在同一个JVM中执行的效果几乎一样。
从上述示例中,您可以看到子查询的结果允许UDT类型的列。例如上面变量a的x列是java.math.BigInteger
类型,而不是内置类型。UDT类型的数据可以被带到下一个Operator中,再调用其它方法,甚至可以参与数据Shuffle。
功能说明
UDT仅支持Java语言,Java SDK的类都是默认可用的。
说明Runtime使用的JDK版本是JDK1.8,可能不支持更新版本的JDK。
UDT支持您上传自己的JAR包,并且直接引用。当前提供了一些Flag方便您的使用。
set odps.sql.session.resources
指定引用的资源,可以指定多个,用逗号隔开,例如set odps.sql.session.resources=foo.sh,bar.txt;
。说明这个Flag和
select transform
中指定资源的Flag相同,所以这个Flag会同时影响两个功能。例如UDT概述中UDF的JAR包,用于UDT使用。set odps.sql.type.system.odps2=true; set odps.sql.session.resources=odps-test.jar; --指定要引用的JAR。这些JAR需要提前上传至Project,并且需要是JAR类型的资源。 select new com.aliyun.odps.test.IntegerMaxValue().evaluate();
odps.sql.session.java.imports
指定默认的Java Package,可以指定多个,用逗号隔开。和Java的import
语句类似,可以提供完整类路径,例如java.math.BigInteger
,也可以使用*
。暂不支持static import
。UDT概述中UDF的JAR包,使用UDT功能还有如下写法。
set odps.sql.type.system.odps2=true; set odps.sql.session.resources=odps-test.jar; set odps.sql.session.java.imports=com.aliyun.odps.test.*; -- 指定默认的Package。 select new IntegerMaxValue().evaluate();
UDT支持资源(Resource)的访问,您可以在SQL中通过
com.aliyun.odps.udf.impl.UDTExecutionContext.get()
静态方法获取ExecutionContext
对象,从而访问当前的ExecutionContext
,进而访问资源(例如文件资源和表格资源)。UDT支持的操作:
实例化对象的
new
操作。实例化数组的
new
操作,包括使用初始化列表创建数组,例如new Integer[] { 1, 2, 3 }
。方法调用,包括静态方法调用。
域访问,包括静态域。
说明仅支持公有方法和公有域的访问。
UDT中的标识符是大小写敏感的,包括Package、类名、方法名和域名。
暂不支持匿名类和Lambda表达式。
暂不支持无返回值的函数调用(因为UDT均出现在Expression中,没有返回值的函数调用无法嵌入到Expression中,这个问题在后续的版本中会有解决方案)。
UDT支持的数据类型:
UDT支持类型转换,支持SQL的风格,例如
cast(1 as java.lang.Object)
。但不支持Java风格的类型转换,例如(Object)1
。UDT内置类型与特定Java类型有一一映射关系,详情请参见Java UDF中的数据类型映射表,这个映射关系对UDT也有效。
内置类型的数据能够直接调用其映射到的Java类型的方法,例如
'123'.length() , 1L.hashCode()
。UDT类型能够直接参与内置函数或者UDF的运算, 例如
chr(Long.valueOf('100'))
,其中Long.valueOf
返回的是java.lang.Long
类型的数据,而内置函数Chr
接受的数据类型是内置类型BIGINT。Java的PRIMITIVE类型可以自动转化为其BOXING类型,并应用前两条规则。
说明部分内置的新数据类型需要先设置
set odps.sql.type.system.odps2=true;
才可使用,否则会报错。UDT扩展了类型转换规则:
UDT对象可以被隐式类型转换为其基类对象。
UDT对象可以被强制类型转换为其基类或子类对象。
没有继承关系的两个对象之间遵守原来的类型转换规则,注意这时可能会导致内容的变化。例如
java.lang.Long
类型的数据是可以强制转换为java.lang.Integer
的,应用的是内置类型的BIGINT强制转换为INT的过程,而这个过程会导致数据内容的变化,甚至可能导致精度的损失。
说明目前除隐式类型转换变成内置类型外,UDT对象不能存储到硬盘,即不能将UDT对象
INSERT
到表中(实际上DDL不支持UDT,不能创建这样的表)。内置类型支持BINARY,即支持自己实现序列化的过程,将byte[]的数据存储到硬盘,下次读出时再还原回来。因此需要您自己调用序列化反序列化方法,将其转换为BINARY数据类型再存储到硬盘。屏显的最终结果不可以是UDT类型。对于屏显的场景,由于所有的Java类都有
toString()
方法,而java.lang.String
类型是合法的。所以Debug时,可以用这种方法观察UDT的内容。您也可以设置
set odps.sql.udt.display.tostring=true;
,MaxCompute会自动帮您把所有以UDT为最终输出的列Wrap上java.util.Objects.toString(...)
,以方便调试。这个Flag只对屏显语句生效,对INSERT
语句不生效,所以它只在调试的过程中使用。UDT支持比较完整的泛型。例如
java.util.Arrays.asList(new java.math.BigInteger('1'))
,编译器可以根据参数类型判断出该方法的返回值是java.util.List<java.math.BigInteger>
类型。说明构造函数需要指定类型参数,否则需要使用
java.lang.Object
,这点和Java保持一致。new java.util.ArrayList(java.util.Arrays.asList('1', '2'))
的结果是java.util.ArrayList<Object>
类型,而new java.util.ArrayList<String>(java.util.Arrays.asList('1', '2'))
的结果是java.util.ArrayList<String>
类型。
所有的运算符都是MaxCompute SQL的语义,不是UDT的语义。例如:
STRING的相加操作:
String.valueOf(1) + String.valueOf(2)
的结果是3(STRING隐式转换为DOUBLE,并且DOUBLE相加) ,而不是12(Java中STRING相加是Concatenate的语义)。=
操作:SQL中的=
不是赋值而是判断相等。而对于Java对象来说,判断相等应该用Equals方法,而非=
操作。
UDT对同一对象的概念是模糊的,这是由数据的Reshuffle导致的。对象有可能会在不同进程、不同物理机器之间传输。在传输过程中,同一个对象可能分别引用了不同的对象(例如对象先被Shuffle到两台机器,然后下次又Shuffle回一起)。 所以在使用UDT时,应该使用
equals
方法判断相等,避免使用=
判断相等。某行某列的对象,其内部包含的各个数据对象的相关性是可以保证的。不同行或者不同列的对象的数据相关性是不保证的。
UDT不能用作Shuffle Key,包括
Join
、Group By
、Distribute By
、Sort By
、Order By
、Cluster By
等结构的Key。UDT可以在Expression中间的任意阶段使用,但不能作为最终输出。例如,不可以使用语句
group by new java.math.BigInteger('123')
,但可以使用语句group by new java.math.BigInteger('123').hashCode()
。因为hashCode
方法的返回值是int.class
类型,可以当做内置类型INT来使用(应用上述内置类型与特定Java类型规则)。UDT不仅可以实现Scalar函数的功能,配合内置函数COLLECT_SET和其他函数,UDT还可以实现Aggregator和Table Function功能。
功能优势
UDT的功能优势:
使用简单,无需定义任何函数。
支持JDK的所有功能,扩展了SQL的能力。
代码可与SQL放于同一文件,便于管理。
可直接使用其它类库,代码重用率高。
可以使用面向对象的思想设计某些功能。
后续待完善功能:
支持无返回值的函数调用,或支持(有返回值但忽略返回值)直接取操作数本身的函数调用。例如,调用List的
add
方法会返回执行完add
操作的List。支持匿名类和Lambda表达式。
支持用作Shuffle Key。
支持Java外的其他语言,例如Python。
性能
因为UDT和UDF的执行过程非常接近,所以UDT与UDF的性能几乎一致。优化后的计算引擎使得UDT在特定场景下的性能更高。
UDT对象只有在跨进程时才需要做序列化和反序列化,因此在执行不需要数据Reshuffle的操作(如
JOIN
或AGGREGATE
)时,UDT可节省序列化和反序列化的开销。因为UDT的Runtime基于Codegen而非反射实现的,所以不存在反射带来的性能损失。在您使用过程中,连续多个UDT操作会合并在一个FunctionCall里一起执行。例如在之前的示例中,
values[x].add(values[y]).divide(java.math.BigInteger.valueOf(2))
实际上只会调用一次UDT。所以,UDT操作的单元虽然较小,却并不会因多次函数调用而造成额外的接口开销。
安全性
在安全控制方面,UDT和UDF完全一样,都会受到Java沙箱Policy的限制。因此如果要使用受限的操作,需要打开沙箱隔离,或者申请沙箱白名单。