Realtime Compute for Apache Flink allows you to use Java user-defined functions (UDFs) in Flink SQL deployments. This topic describes the types of Java UDFs, how to pass parameters to UDFs, and the precautions to take when you develop UDFs.
Precautions
UDFs may cause dependency conflicts in the packaged JAR file. Take note of the following items when you develop UDFs:
- Make sure that the Realtime Compute for Apache Flink version that you configure for the SQL deployment matches the Apache Flink version that you specify in the pom.xml file.
- Specify <scope>provided</scope> for dependencies related to Apache Flink. Use the Shade plug-in to package other third-party dependencies. For more information, see Apache Maven Shade plug-in.
- For information about how to resolve dependency conflicts, see Console operations.
- Frequent calls of UDFs in a SQL deployment may cause a timeout. We recommend that you upload the JAR file of the UDF as an additional dependency of the deployment and use the CREATE TEMPORARY FUNCTION statement to declare the UDF. Example:

  CREATE TEMPORARY FUNCTION GetJson AS 'com.soybean.function.udf.MyGetJsonObjectFunction';
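For reference, the two packaging rules above might look like the following in a pom.xml file. This is only a sketch: the artifact and version values are placeholders and depend on your project, not values prescribed by the product.

```xml
<!-- Flink classes are provided by the runtime, so mark Flink dependencies as provided. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<!-- Package other third-party dependencies into the UDF JAR with the Shade plug-in. -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.4.1</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
        </execution>
    </executions>
</plugin>
```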
UDF types
The following table describes the supported UDF types in Realtime Compute for Apache Flink.
| Type | Description |
| --- | --- |
| User-defined scalar function (UDSF) | A UDSF maps zero or more scalar values to a new scalar value. A one-to-one mapping is established between the input and the output. You can call a UDSF to generate a new value based on a data record. For more information, see UDSFs. |
| User-defined aggregate function (UDAF) | A UDAF aggregates multiple data records into a single data record. A many-to-one mapping is established between the input and the output. For more information, see UDAFs. |
| User-defined table function (UDTF) | A UDTF accepts zero or more scalar values as input parameters, including variable-length parameters. UDTFs are similar to UDSFs, except that a UDTF can return any number of rows instead of a single value. The returned rows can consist of one or more columns. You can call a UDTF to generate multiple rows or columns. For more information, see UDTFs. |
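As an illustration of the one-to-many mapping of a UDTF, the following minimal sketch assumes the standard Apache Flink TableFunction API; the class name and split logic are illustrative, not part of this product's examples:

```java
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// A hypothetical UDTF that splits a string and emits one row per word.
@FunctionHint(output = @DataTypeHint("ROW<word STRING>"))
public class SplitFunction extends TableFunction<Row> {
    public void eval(String str) {
        for (String s : str.split(" ")) {
            // Each call to collect(...) emits one output row. Calling it
            // multiple times per input record is what makes a UDTF one-to-many.
            collect(Row.of(s));
        }
    }
}
```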
Register a UDF
For information about how to globally register a UDF, see Register a catalog UDF.
For information about how to register a UDF for a single deployment, see Register a deployment-level UDF.
Pass UDF parameters
You can configure the parameters of a UDF in the development console of Realtime Compute for Apache Flink and obtain the parameter values within the UDF.
The parameter configurations in the console are passed to the UDF through the open(FunctionContext context) method. Implementing this method is optional. The method provides a FunctionContext object that contains information about the execution context of the UDF. To pass the parameter configurations, perform the following steps:
On the Deployments page in the development console of Realtime Compute for Apache Flink, click the name of the deployment that you want to manage. On the Configuration tab, click Edit in the upper-right corner of the Parameters section. Then, specify the pipeline.global-job-parameters parameter in the Other Configuration field. Sample code:

pipeline.global-job-parameters: |
  'k1:{hi,hello}',
  'k2:"str:ing,str:ing"',
  'k3:"str""ing,str:ing"'
Only the configurations specified in the pipeline.global-job-parameters parameter can be obtained by using the FunctionContext#getJobParameter method. Make sure that you specify all parameter configurations for the UDF in the pipeline.global-job-parameters parameter. The following steps describe how to configure the pipeline.global-job-parameters parameter.
Step 1: Define key-value pairs.
- Operation: Separate the key and value in a key-value pair with a colon (:) and enclose each key-value pair in single quotation marks (').
  Note: If a key or value contains a colon (:), enclose the key or value in double quotation marks ("). If a key or value contains both a colon (:) and a double quotation mark ("), escape the double quotation mark by using two double quotation marks ("").
- Examples:
  - The key-value pair key = k1, value = {hi,hello} is defined as 'k1:{hi,hello}'.
  - The key-value pair key = k2, value = str:ing,str:ing is defined as 'k2:"str:ing,str:ing"'.
  - The key-value pair key = k3, value = str"ing,str:ing is defined as 'k3:"str""ing,str:ing"'.

Step 2: Organize the defined key-value pairs in the YAML format.
- Operation: Place each key-value pair on a separate line and separate the pairs with commas (,).
  Note: A vertical bar (|) must be added at the beginning of the multi-line string, and each line must have the same indentation.
- Example:

  pipeline.global-job-parameters: |
    'k1:{hi,hello}',
    'k2:"str:ing,str:ing"',
    'k3:"str""ing,str:ing"'
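The quoting rules in step 1 can also be expressed in code. The following standalone Java sketch (the class and helper names are ours for illustration, not part of any Flink API) builds one quoted key-value entry according to those rules:

```java
public class JobParameterQuoting {
    // Quote a single key or value per the rules above: if it contains a colon
    // (or an embedded double quotation mark), wrap it in double quotation marks
    // and escape each embedded double quotation mark by doubling it.
    static String quote(String s) {
        if (s.contains(":") || s.contains("\"")) {
            return "\"" + s.replace("\"", "\"\"") + "\"";
        }
        return s;
    }

    // Build one entry of the form 'key:value', quoting the key and value as needed.
    static String entry(String key, String value) {
        return "'" + quote(key) + ":" + quote(value) + "'";
    }

    public static void main(String[] args) {
        System.out.println(entry("k1", "{hi,hello}"));       // 'k1:{hi,hello}'
        System.out.println(entry("k2", "str:ing,str:ing"));  // 'k2:"str:ing,str:ing"'
        System.out.println(entry("k3", "str\"ing,str:ing")); // 'k3:"str""ing,str:ing"'
    }
}
```

The three calls in main reproduce the k1, k2, and k3 examples from the table above.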
In the implementation code of the UDF, call the FunctionContext#getJobParameter method to obtain the content of the key-value pairs.
Sample code:
context.getJobParameter("k1", null); // The string {hi,hello} is returned.
context.getJobParameter("k2", null); // The string str:ing,str:ing is returned.
context.getJobParameter("k3", null); // The string str"ing,str:ing is returned.
context.getJobParameter("pipeline.global-job-parameters", null); // null is returned because only the key-value pairs defined in the pipeline.global-job-parameters parameter can be obtained.
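Putting the pieces together, a UDF would typically read such a parameter once in open() and cache it, instead of reading it in every eval() call. The following minimal sketch assumes the standard Apache Flink ScalarFunction and FunctionContext APIs; the class name and parameter key are hypothetical:

```java
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

// A hypothetical UDF that reads a configured parameter once, in open().
public class GreetingFunction extends ScalarFunction {
    private String greeting;

    @Override
    public void open(FunctionContext context) throws Exception {
        // The second argument of getJobParameter is the default value that is
        // used when the key is not configured in pipeline.global-job-parameters.
        greeting = context.getJobParameter("greeting.prefix", "Hello");
    }

    public String eval(String name) {
        return greeting + ", " + name;
    }
}
```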
Named parameters
Named parameters can be used to implement UDFs only in Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.7 or later.
In SQL, parameter values must be passed to a function in the order in which the parameters are defined, and optional parameters cannot be omitted. If a function has a large number of parameters, ensuring the correct parameter order can be challenging. To ensure the correct parameter order and improve usability, you can use named parameters to specify only the parameters that you want to use. The following example shows how to use named parameters. In this example, a UDSF that extends ScalarFunction is used.
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

// Implement a UDSF in which the second and third parameters are optional (isOptional = true).
public class MyFuncWithNamedArgs extends ScalarFunction {
    private static final long serialVersionUID = 1L;

    public String eval(@ArgumentHint(name = "f1", isOptional = false, type = @DataTypeHint("STRING")) String f1,
                       @ArgumentHint(name = "f2", isOptional = true, type = @DataTypeHint("INT")) Integer i2,
                       @ArgumentHint(name = "f3", isOptional = true, type = @DataTypeHint("BIGINT")) Long l3) {
        if (i2 != null) {
            return "i2#" + i2;
        }
        if (l3 != null) {
            return "l3#" + l3;
        }
        return "default#" + f1;
    }
}
When you call the function in SQL, you can specify only the first required parameter. You can also specify the second and third optional parameters. Sample code:
CREATE TEMPORARY FUNCTION MyNamedUdf AS 'com.aliyun.example.MyFuncWithNamedArgs';
CREATE TEMPORARY TABLE s1 (
a INT,
b BIGINT,
c VARCHAR,
d VARCHAR,
PRIMARY KEY(a) NOT ENFORCED
) WITH (
'connector' = 'datagen',
'rows-per-second'='1'
);
CREATE TEMPORARY TABLE sink (
a INT,
b VARCHAR,
c VARCHAR,
d VARCHAR
) WITH (
'connector' = 'print'
);
INSERT INTO sink
SELECT a,
-- Specify only the first required parameter.
MyNamedUdf(f1 => c) arg1_res,
-- Specify the first required parameter and the second optional parameter.
MyNamedUdf(f1 => c, f2 => a) arg2_res,
-- Specify the first required parameter and the third optional parameter.
MyNamedUdf(f1 => c, f3 => b) arg3_res
FROM s1;
References
For information about how to use a UDAF to sort and aggregate data, see Use a UDAF to sort and aggregate data.
For information about how to develop and use Java UDFs, see UDAFs, UDSFs, and UDTFs.
For information about how to debug and optimize Python UDFs, see Overview.
For information about how to develop and use Python UDFs, see UDAFs, UDSFs, and UDTFs.