全部产品
Search
文档中心

实时计算Flink版:Java

更新时间:Nov 27, 2024

实时计算Flink版支持在Flink SQL作业中使用Java自定义函数,本文介绍Flink Java自定义函数的分类、参数传递及调用注意事项。

注意事项

  • 为了避免JAR包依赖冲突,在开发自定义函数时您需要注意以下几点:

    • SQL开发页面选择的Flink版本,请和Pom依赖中的Flink版本保持一致。

    • Flink相关依赖,scope请使用provided,即在依赖中添加<scope>provided</scope>

    • 其他第三方依赖请采用Shade方式打包,Shade打包详情请参见Apache Maven Shade Plugin

    Flink依赖冲突问题,详情请参见如何解决Flink依赖冲突问题?

  • 为避免UDF在SQL作业文本里被频繁调用导致超时的情况,推荐您将UDF的JAR包作为依赖文件上传,并且通过CRETATE TEMPORARY FUNCTION语法在作业中声明函数,例如CREATE TEMPORARY FUNCTION 'GetJson' AS 'com.soybean.function.udf.MyGetJsonObjectFunction';

自定义函数分类

Flink支持以下3类自定义函数。

分类

描述

UDSF(User Defined Scalar Function)

用户自定义标量值函数,将0个、1个或多个标量值映射到一个新的标量值。其输入与输出是一对一的关系,即读入一行数据,写出一条输出值。详情请参见自定义标量函数(UDSF)

UDAF(User Defined Aggregation Function)

自定义聚合函数,将多条记录聚合成1条记录。其输入与输出是多对一的关系,即将多条输入记录聚合成一条输出值。详情请参见自定义聚合函数(UDAF)

UDTF(User Defined Table-valued Function)

自定义表值函数,将0个、1个或多个标量值作为输入参数(可以是变长参数)。与自定义的标量函数类似,但与标量函数不同。表值函数可以返回任意数量的行作为输出,而不仅是1个值。返回的行可以由1个或多个列组成。调用一次函数输出多行或多列数据。详情请参见自定义表值函数(UDTF)

自定义函数注册

自定义函数参数传递

您可以在Flink开发控制台配置自定义函数中的参数并在UDF代码中使用。这样,后续可以直接在控制台上修改参数值,实现快速修改UDF参数值的目的。

自定义函数中提供了可选的open(FunctionContext context)方法,FunctionContext具备参数传递功能,自定义配置项通过此对象来传递。具体步骤如下:

  1. 在Flink开发控制台运维中心 > 作业运维页面部署详情页签运行参数配置其他配置中,添加pipeline.global-job-parameters配置项,代码示例如下。

    pipeline.global-job-parameters: | 
      'k1:{hi,hello}',
      'k2:"str:ing,str:ing"',
      'k3:"str""ing,str:ing"'

    FunctionContext#getJobParameter只能获取pipeline.global-job-parameters这一配置项的值。因此需要将UDF用到的所有配置项全部写入到pipeline.global-job-parameters中。pipeline.global-job-parameters配置项填写的具体操作步骤如下。

    步骤

    动作

    具体操作

    示例

    步骤1

    定义key-value。

    将key和value之间通过冒号(:)分隔,并将每一对key-value用单引号(')包围起来。

    说明
    • 如果key或value中含有半角冒号(:),则需要用双引号(")将key或value包围起来。

    • 如果key或value中含有半角冒号(:)和双引号("),则需要通过连写两个双引号("")进行转义。

    • key = k1,value = {hi,hello},则定义为'k1:{hi,hello}'

    • key = k2,value = str:ing,str:ing,则定义为'k2:"str:ing,str:ing"'

    • key = k3,value = str"ing,str:ing,则定义为'k3:"str""ing,str:ing"'

    步骤2

    按照YAML文件的格式,形成最终的pipeline.global-job-parameters

    将不同的key-value放在不同的行里,并将所有key-value用逗号(,)连接。

    说明
    • YAML文件的多行字符串以竖线(| )开始。

    • YAML文件的多行字符串,每一行需要有相同的缩进。

    pipeline.global-job-parameters: | 
      'k1:{hi,hello}',
      'k2:"str:ing,str:ing"',
      'k3:"str""ing,str:ing"'
  2. 在自定义函数代码中,通过FunctionContext#getJobParameter获取map的各项内容。

    代码示例如下。

    context.getJobParameter("k1", null); // 获得字符串 {hi,hello}。
    context.getJobParameter("k2", null); // 获得字符串 str:ing,str:ing。
    context.getJobParameter("k3", null); // 获得字符串 str"ing,str:ing。
    context.getJobParameter("pipeline.global-job-parameters", null); // null,只能获得pipeline.global-job-parameters里定义的内容,而不能获得任意的作业配置项。

命名参数

说明

仅实时计算引擎VVR 8.0.7及以上版本支持使用命名参数来实现自定义函数。

在SQL中调用函数时必须按顺序指定所有参数字段。当参数较多时,容易出现传参个数、顺序错误,而且不能省略非必填参数。通过使用命名参数,可以按需指定所需的参数,减少出错概率,使用起来也更加方便。我们通过一个自定义标量函数(ScalarFunction)的例子来介绍下命名参数的使用。

// 实现一个自定义标量函数,后两个入参为可选参数(isOptional = true)
public class MyFuncWithNamedArgs extends ScalarFunction  {
	private static final long serialVersionUID = 1L;

	public String eval(@ArgumentHint(name = "f1", isOptional = false, type = @DataTypeHint("STRING")) String f1,
			@ArgumentHint(name = "f2", isOptional = true, type = @DataTypeHint("INT")) Integer i2,
			@ArgumentHint(name = "f3", isOptional = true, type = @DataTypeHint("LONG")) Long l3) {

		if (i2 != null) {
			return "i2#" + i2;
		}
		if (l3 != null) {
			return "l3#" + l3;
		}
		return "default#" + f1;
	}
}

在SQL中使用该自定义函数时,您可以只指定第一个必选参数,或选择性指定可选参数,代码示例如下。

CREATE TEMPORARY FUNCTION MyNamedUdf AS 'com.aliyun.example.MyFuncWithNamedArgs';

CREATE temporary TABLE s1 (
    a INT,
    b BIGINT,
    c VARCHAR,
    d VARCHAR,
    PRIMARY KEY(a) NOT ENFORCED
) WITH (
    'connector' = 'datagen',
    'rows-per-second'='1'
);

CREATE temporary TABLE sink (
    a INT,
    b VARCHAR,
    c VARCHAR,
    d VARCHAR
) WITH (
    'connector' = 'print'
);

INSERT INTO sink
SELECT a,
    -- 仅指定第一个必选参数
    MyNamedUdf(f1 => c) arg1_res,
    -- 指定第一个必选参数及第二个可选参数
    MyNamedUdf(f1 => c, f2 => a) arg2_res,
    -- 指定第一个必选参数及第三个可选参数
    MyNamedUdf(f1 => c, f3 => d) arg3_res
FROM s1;

相关文档