本文为您介绍如何通过Java语言编写UDAF。
UDAF代码结构
您可以通过IntelliJ IDEA(Maven)或MaxCompute Studio工具使用Java语言编写UDAF代码,代码中需要包含如下信息:
Java包(Package):可选。
您可以将定义的Java类打包,为后续查找和使用类提供方便。
继承UDAF类:必选。
必须携带的UDAF类为
com.aliyun.odps.udf.Aggregator
和com.aliyun.odps.udf.annotation.Resolve
(对应@Resolve
注解)。com.aliyun.odps.udf.UDFException
(可选,对应实现Java类初始化和结束的方法)。当您需要使用其他UDAF类或者需要用到复杂数据类型时,请根据MaxCompute UDF概述添加需要的类。@Resolve
注解:必选。格式为
@Resolve(<signature>)
。signature
为函数签名,用于定义函数的输入参数和返回值的数据类型。UDAF无法通过反射分析获取函数签名,只能通过@Resolve
注解方式获取函数签名,例如@Resolve("smallint->varchar(10)")
。更多@Resolve
注解信息,请参见@Resolve注解。自定义Java类:必选。
UDAF代码的组织单位,定义了实现业务需求的变量及方法。
实现Java类的方法:必选。
实现Java类需要继承
com.aliyun.odps.udf.Aggregator
类并实现如下方法。import com.aliyun.odps.udf.ContextFunction; import com.aliyun.odps.udf.ExecutionContext; import com.aliyun.odps.udf.UDFException; public abstract class Aggregator implements ContextFunction { //初始化方法。 @Override public void setup(ExecutionContext ctx) throws UDFException { } //结束方法。 @Override public void close() throws UDFException { } //创建聚合Buffer。 abstract public Writable newBuffer(); //iterate方法。 //buffer为聚合buffer,是指一个阶段性的汇总数据,即在不同的Map任务中,group by后得出的数据(可理解为一个集合),每行执行一次。 //Writable[]表示一行数据,在代码中指代传入的列。例如writable[0]表示第一列,writable[1]表示第二列。 //args为SQL中调用UDAF时指定的参数,不能为NULL,但是args里面的元素可以为NULL,代表对应的输入数据是NULL。 abstract public void iterate(Writable buffer, Writable[] args) throws UDFException; //terminate方法。 abstract public Writable terminate(Writable buffer) throws UDFException; //merge方法。 abstract public void merge(Writable buffer, Writable partial) throws UDFException; }
其中:
iterate
、merge
和terminate
是最重要的三个方法,UDAF的主要逻辑依赖于这三个方法的实现。此外,还需要您实现自定义的Writable buffer。Writable buffer将内存中的对象转换成字节序列(或其他数据传输协议)以便于储存到磁盘(持久化)和网络传输。因为MaxCompute使用分布式计算的方式来处理聚合函数,因此需要知道如何序列化和反序列化数据,以便于数据在不同的设备之间进行传输。
编写Java UDAF时可以使用Java Type或Java Writable Type,MaxCompute项目支持处理的数据类型与Java数据类型的详细映射关系,请参见数据类型。
UDAF代码示例如下。
//将定义的Java类组织在org.alidata.odps.udaf.examples包中。
package org.alidata.odps.udaf.examples;
//继承UDAF类。
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import com.aliyun.odps.io.DoubleWritable;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.udf.Aggregator;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.annotation.Resolve;
//自定义Java类。
//@Resolve注解。
@Resolve("double->double")
public class AggrAvg extends Aggregator {
//实现Java类的方法。
private static class AvgBuffer implements Writable {
private double sum = 0;
private long count = 0;
@Override
public void write(DataOutput out) throws IOException {
out.writeDouble(sum);
out.writeLong(count);
}
@Override
public void readFields(DataInput in) throws IOException {
sum = in.readDouble();
count = in.readLong();
}
}
private DoubleWritable ret = new DoubleWritable();
@Override
public Writable newBuffer() {
return new AvgBuffer();
}
@Override
public void iterate(Writable buffer, Writable[] args) throws UDFException {
DoubleWritable arg = (DoubleWritable) args[0];
AvgBuffer buf = (AvgBuffer) buffer;
if (arg != null) {
buf.count += 1;
buf.sum += arg.get();
}
}
@Override
public Writable terminate(Writable buffer) throws UDFException {
AvgBuffer buf = (AvgBuffer) buffer;
if (buf.count == 0) {
ret.set(0);
} else {
ret.set(buf.sum / buf.count);
}
return ret;
}
@Override
public void merge(Writable buffer, Writable partial) throws UDFException {
AvgBuffer buf = (AvgBuffer) buffer;
AvgBuffer p = (AvgBuffer) partial;
buf.sum += p.sum;
buf.count += p.count;
}
}
上述UDAF代码示例中,iterate
和merge
方法里的buffer为复用buffer,用于将输入的各行数据通过用户自定义实现的方式聚合到该buffer中。
使用限制
访问外网
MaxCompute默认不支持通过自定义函数访问外网。如果您需要通过自定义函数访问外网,请根据业务情况填写并提交网络连接申请表单,MaxCompute技术支持团队会及时联系您完成网络开通操作。表单填写指导,请参见网络开通流程。
访问VPC网络
MaxCompute默认不支持通过UDF访问VPC网络。如果您的UDF涉及访问VPC网络中的资源时,需要先创建MaxCompute与目标VPC网络间的网络连接,才可以直接通过UDF访问VPC网络中的资源,操作详情请参见通过UDF访问VPC网络资源。
读取表数据
目前版本不支持使用UDF/UDAF/UDTF读取以下场景的表数据:
做过表结构修改(Schema Evolution)的表数据。
包含复杂数据类型的表数据。
包含JSON数据类型的表数据。
Transactional表的表数据。
注意事项
在编写Java UDAF时,您需要注意:
不同UDAF JAR包中不建议存在类名相同但实现逻辑不一样的类。例如UDAF1、UDAF2分别对应资源JAR包udaf1.jar、udaf2.jar,两个JAR包里都包含名称为
com.aliyun.UserFunction.class
的类但实现逻辑不一样,当同一条SQL语句中同时调用UDAF1和UDAF2时,MaxCompute会随机加载其中一个类,此时会导致UDAF执行结果不符合预期甚至编译失败。Java UDAF中输入或返回值的数据类型是对象,数据类型首字母必须大写,例如String。
SQL中的NULL值通过Java中的NULL表示。Java Primitive Type无法表示SQL中的NULL值,不允许使用。
@Resolve注解
@Resolve
注解格式如下。
@Resolve(<signature>)
signature
为字符串,用于标识输入参数和返回值的数据类型。执行UDAF时,UDAF函数的输入参数和返回值类型要与函数签名指定的类型一致。查询语义解析阶段会检查不符合函数签名定义的用法,检查到类型不匹配时会报错。具体格式如下。
'arg_type_list -> type'
其中:
arg_type_list
:表示输入参数的数据类型。输入参数可以为多个,用英文逗号(,)分隔。支持的数据类型为BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、CHAR、VARCHAR、复杂数据类型(ARRAY、MAP、STRUCT)或复杂数据类型嵌套。arg_type_list
还支持星号(*)或为空(''):当
arg_type_list
为星号(*)时,表示输入参数为任意个数。当
arg_type_list
为空('')时,表示无输入参数。
type
:表示返回值的数据类型。UDAF只返回一列。支持的数据类型为:BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、复杂数据类型(ARRAY、MAP、STRUCT)或复杂数据类型嵌套。
在编写UDAF代码过程中,您可以根据MaxCompute项目的数据类型版本选取合适的数据类型,更多数据类型版本及各版本支持的数据类型信息,请参见数据类型版本说明。
合法@Resolve
注解示例如下。
@Resolve注解示例 | 说明 |
| 输入参数类型为BIGINT、DOUBLE,返回值类型为STRING。 |
| 输入任意个参数,返回值类型为STRING。 |
| 无输入参数,返回值类型为DOUBLE。 |
| 输入参数类型为ARRAY<BIGINT>,返回值类型为STRUCT<x:STRING, y:INT>。 |
数据类型
在MaxCompute中不同数据类型版本支持的数据类型不同。从MaxCompute 2.0版本开始,扩展了更多的新数据类型,同时还支持ARRAY、MAP、STRUCT等复杂类型。更多MaxCompute数据类型版本信息,请参见数据类型版本说明。
为确保编写Java UDAF过程中使用的数据类型与MaxCompute支持的数据类型保持一致,您需要关注二者间的数据类型映射关系。具体映射关系如下。
MaxCompute Type | Java Type | Java Writable Type |
TINYINT | java.lang.Byte | ByteWritable |
SMALLINT | java.lang.Short | ShortWritable |
INT | java.lang.Integer | IntWritable |
BIGINT | java.lang.Long | LongWritable |
FLOAT | java.lang.Float | FloatWritable |
DOUBLE | java.lang.Double | DoubleWritable |
DECIMAL | java.math.BigDecimal | BigDecimalWritable |
BOOLEAN | java.lang.Boolean | BooleanWritable |
STRING | java.lang.String | Text |
VARCHAR | com.aliyun.odps.data.Varchar | VarcharWritable |
BINARY | com.aliyun.odps.data.Binary | BytesWritable |
DATE | java.sql.Date | DateWritable |
DATETIME | java.util.Date | DatetimeWritable |
TIMESTAMP | java.sql.Timestamp | TimestampWritable |
INTERVAL_YEAR_MONTH | 不涉及 | IntervalYearMonthWritable |
INTERVAL_DAY_TIME | 不涉及 | IntervalDayTimeWritable |
ARRAY | java.util.List | 不涉及 |
MAP | java.util.Map | 不涉及 |
STRUCT | com.aliyun.odps.data.Struct | 不涉及 |
当MaxCompute项目采用MaxCompute 2.0数据类型版本时,UDAF的输入或返回值才可以使用Java Writable Type。
使用说明
按照开发流程,完成Java UDAF开发后,您即可通过MaxCompute SQL调用Java UDAF。调用方法如下:
在归属MaxCompute项目中使用自定义函数:使用方法与内建函数类似,您可以参照内建函数的使用方法使用自定义函数。
跨项目使用自定义函数:即在项目A中使用项目B的自定义函数,跨项目分享语句示例:
select B:udf_in_other_project(arg0, arg1) as res from table_t;
。更多跨项目分享信息,请参见基于Package跨项目访问资源。
使用MaxCompute Studio完整开发及调用Java UDAF的操作,请参见使用示例。
使用示例
以通过MaxCompute Studio开发计算平均值的UDAF函数AggrAvg
为例,实现逻辑如下。
输入数据分片:MaxCompute会按照MapReduce处理流程对输入数据按照一定的大小进行分片,每片的大小适合一个Worker在适当的时间内完成。
分片大小需要您通过
odps.stage.mapper.split.size
参数进行配置。分片逻辑请参见MapReduce流程说明。计算平均值第一阶段:每个Worker统计分片内数据的个数及汇总值。您可以将每个分片内的数据个数及汇总值视为一个中间结果。
计算平均值第二阶段:汇总第一阶段中每个分片内的信息。
最终输出:
r.sum/r.count
即是所有输入数据的平均值。
开发并调用Java UDAF的操作步骤如下:
准备工作。
使用MaxCompute Studio开发调试UDF时,您需要先安装MaxCompute Studio并连接MaxCompute项目,做好UDF开发前准备工作。操作详情请参见:
编写UDAF代码。
在Project区域,右键单击Module的源码目录(即
),选择 。在Create new MaxCompute java class对话框,单击UDAF并填写Name后,按Enter键。例如Java Class名称为AggrAvg。
Name为创建的MaxCompute Java Class名称。如果还没有创建Package,在此处填写packagename.classname,会自动生成Package。
在代码编写区域写入如下代码。UDAF代码示例如下。
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import com.aliyun.odps.io.DoubleWritable; import com.aliyun.odps.io.Writable; import com.aliyun.odps.udf.Aggregator; import com.aliyun.odps.udf.UDFException; import com.aliyun.odps.udf.annotation.Resolve; @Resolve("double->double") public class AggrAvg extends Aggregator { private static class AvgBuffer implements Writable { private double sum = 0; private long count = 0; @Override public void write(DataOutput out) throws IOException { out.writeDouble(sum); out.writeLong(count); } @Override public void readFields(DataInput in) throws IOException { sum = in.readDouble(); count = in.readLong(); } } private DoubleWritable ret = new DoubleWritable(); @Override public Writable newBuffer() { return new AvgBuffer(); } @Override public void iterate(Writable buffer, Writable[] args) throws UDFException { DoubleWritable arg = (DoubleWritable) args[0]; AvgBuffer buf = (AvgBuffer) buffer; if (arg != null) { buf.count += 1; buf.sum += arg.get(); } } @Override public Writable terminate(Writable buffer) throws UDFException { AvgBuffer buf = (AvgBuffer) buffer; if (buf.count == 0) { ret.set(0); } else { ret.set(buf.sum / buf.count); } return ret; } @Override public void merge(Writable buffer, Writable partial) throws UDFException { AvgBuffer buf = (AvgBuffer) buffer; AvgBuffer p = (AvgBuffer) partial; buf.sum += p.sum; buf.count += p.count; } }
在本地运行调试UDAF,确保代码可以运行成功。
更多调试操作,请参见通过本地运行调试UDF。
说明运行参数可参照图示数据填写。
将创建的UDAF打包为JAR包,上传至MaxCompute项目并注册函数。例如函数名称为
user_udaf
。更多打包操作,请参见操作步骤。
在MaxCompute Studio的左侧导航栏,单击Project Explorer,在目标MaxCompute项目上单击右键,启动MaxCompute客户端,并执行SQL命令调用新创建的UDAF。
假设待查询目标表my_table的数据结构如下。
+------------+------------+ | col0 | col1 | +------------+------------+ | 1.2 | 2.0 | | 1.6 | 2.1 | +------------+------------+
执行如下SQL命令调用UDAF。
select user_udaf(col0) as c0 from my_table;
返回结果如下。
+----+ | c0 | +----+ | 1.4| +----+