实时计算Flink版支持在Flink SQL作业中使用Java自定义函数,本文介绍Flink Java自定义函数的分类、参数传递及调用注意事项。
注意事项
为了避免JAR包依赖冲突,在开发自定义函数时您需要注意以下几点:
SQL开发页面选择的Flink版本,请和Pom依赖中的Flink版本保持一致。
Flink相关依赖,scope请使用provided,即在依赖中添加
<scope>provided</scope>
。其他第三方依赖请采用Shade方式打包,Shade打包详情请参见Apache Maven Shade Plugin。
Flink依赖冲突问题,详情请参见如何解决Flink依赖冲突问题?
为避免UDF在SQL作业文本里被频繁调用导致超时的情况,推荐您将UDF的JAR包作为依赖文件上传,并且通过
CRETATE TEMPORARY FUNCTION
语法在作业中声明函数,例如CREATE TEMPORARY FUNCTION 'GetJson' AS 'com.soybean.function.udf.MyGetJsonObjectFunction';
自定义函数分类
Flink支持以下3类自定义函数。
分类 | 描述 |
UDSF(User Defined Scalar Function) | 用户自定义标量值函数,将0个、1个或多个标量值映射到一个新的标量值。其输入与输出是一对一的关系,即读入一行数据,写出一条输出值。详情请参见自定义标量函数(UDSF)。 |
UDAF(User Defined Aggregation Function) | 自定义聚合函数,将多条记录聚合成1条记录。其输入与输出是多对一的关系,即将多条输入记录聚合成一条输出值。详情请参见自定义聚合函数(UDAF)。 |
UDTF(User Defined Table-valued Function) | 自定义表值函数,将0个、1个或多个标量值作为输入参数(可以是变长参数)。与自定义的标量函数类似,但与标量函数不同。表值函数可以返回任意数量的行作为输出,而不仅是1个值。返回的行可以由1个或多个列组成。调用一次函数输出多行或多列数据。详情请参见自定义表值函数(UDTF)。 |
自定义函数注册
自定义函数参数传递
您可以在Flink开发控制台配置自定义函数中的参数并在UDF代码中使用。这样,后续可以直接在控制台上修改参数值,实现快速修改UDF参数值的目的。
自定义函数中提供了可选的open(FunctionContext context)方法,FunctionContext具备参数传递功能,自定义配置项通过此对象来传递。具体步骤如下:
在Flink开发控制台pipeline.global-job-parameters配置项,代码示例如下。
页面部署详情页签运行参数配置的其他配置中,添加pipeline.global-job-parameters: | 'k1:{hi,hello}', 'k2:"str:ing,str:ing"', 'k3:"str""ing,str:ing"'
FunctionContext#getJobParameter只能获取pipeline.global-job-parameters这一配置项的值。因此需要将UDF用到的所有配置项全部写入到pipeline.global-job-parameters中。pipeline.global-job-parameters配置项填写的具体操作步骤如下。
步骤
动作
具体操作
示例
步骤1
定义key-value。
将key和value之间通过冒号(:)分隔,并将每一对key-value用单引号(')包围起来。
说明如果key或value中含有半角冒号(:),则需要用双引号(")将key或value包围起来。
如果key或value中含有半角冒号(:)和双引号("),则需要通过连写两个双引号("")进行转义。
当
key = k1,value = {hi,hello}
,则定义为'k1:{hi,hello}'
。当
key = k2,value = str:ing,str:ing
,则定义为'k2:"str:ing,str:ing"'
当
key = k3,value = str"ing,str:ing
,则定义为'k3:"str""ing,str:ing"'
步骤2
按照YAML文件的格式,形成最终的pipeline.global-job-parameters。
将不同的key-value放在不同的行里,并将所有key-value用逗号(,)连接。
说明YAML文件的多行字符串以竖线(| )开始。
YAML文件的多行字符串,每一行需要有相同的缩进。
pipeline.global-job-parameters: | 'k1:{hi,hello}', 'k2:"str:ing,str:ing"', 'k3:"str""ing,str:ing"'
在自定义函数代码中,通过FunctionContext#getJobParameter获取map的各项内容。
代码示例如下。
context.getJobParameter("k1", null); // 获得字符串 {hi,hello}。 context.getJobParameter("k2", null); // 获得字符串 str:ing,str:ing。 context.getJobParameter("k3", null); // 获得字符串 str"ing,str:ing。 context.getJobParameter("pipeline.global-job-parameters", null); // null,只能获得pipeline.global-job-parameters里定义的内容,而不能获得任意的作业配置项。
命名参数
仅实时计算引擎VVR 8.0.7及以上版本支持使用命名参数来实现自定义函数。
在SQL中调用函数时必须按顺序指定所有参数字段。当参数较多时,容易出现传参个数、顺序错误,而且不能省略非必填参数。通过使用命名参数,可以按需指定所需的参数,减少出错概率,使用起来也更加方便。我们通过一个自定义标量函数(ScalarFunction)的例子来介绍下命名参数的使用。
// 实现一个自定义标量函数,后两个入参为可选参数(isOptional = true)
public class MyFuncWithNamedArgs extends ScalarFunction {
private static final long serialVersionUID = 1L;
public String eval(@ArgumentHint(name = "f1", isOptional = false, type = @DataTypeHint("STRING")) String f1,
@ArgumentHint(name = "f2", isOptional = true, type = @DataTypeHint("INT")) Integer i2,
@ArgumentHint(name = "f3", isOptional = true, type = @DataTypeHint("LONG")) Long l3) {
if (i2 != null) {
return "i2#" + i2;
}
if (l3 != null) {
return "l3#" + l3;
}
return "default#" + f1;
}
}
在SQL中使用该自定义函数时,您可以只指定第一个必选参数,或选择性指定可选参数,代码示例如下。
CREATE TEMPORARY FUNCTION MyNamedUdf AS 'com.aliyun.example.MyFuncWithNamedArgs';
CREATE temporary TABLE s1 (
a INT,
b BIGINT,
c VARCHAR,
d VARCHAR,
PRIMARY KEY(a) NOT ENFORCED
) WITH (
'connector' = 'datagen',
'rows-per-second'='1'
);
CREATE temporary TABLE sink (
a INT,
b VARCHAR,
c VARCHAR,
d VARCHAR
) WITH (
'connector' = 'print'
);
INSERT INTO sink
SELECT a,
-- 仅指定第一个必选参数
MyNamedUdf(f1 => c) arg1_res,
-- 指定第一个必选参数及第二个可选参数
MyNamedUdf(f1 => c, f2 => a) arg2_res,
-- 指定第一个必选参数及第三个可选参数
MyNamedUdf(f1 => c, f3 => d) arg3_res
FROM s1;
相关文档
Java自定义函数的开发和使用demo,请参见自定义聚合函数(UDAF)、自定义标量函数(UDSF)和自定义表值函数(UDTF)。
Python自定义函数的调试和调优方法,请参见概述。
Python自定义函数的开发和使用demo,请参见自定义聚合函数(UDAF)、自定义标量函数(UDSF)和自定义表值函数(UDTF)。