Realtime Compute for Apache Flink: Java

Last Updated: Nov 27, 2024

Realtime Compute for Apache Flink allows you to use Java user-defined functions (UDFs) in Flink SQL deployments. This topic describes the types of Java UDFs, how to pass parameters to the UDFs, and the precautions to consider when you develop the UDFs.

Precautions

  • Dependency conflicts may occur in the packaged JAR file of a UDF. Take note of the following items when you develop UDFs:

    • Make sure that the Realtime Compute for Apache Flink version you configure for the SQL deployment corresponds to the Apache Flink version you specify in the pom.xml file.

    • Specify <scope>provided</scope> for dependencies related to Apache Flink.

    • Use the Shade plug-in to package other third-party dependencies. For more information, see Apache Maven Shade plug-in.

    For information about how to resolve dependency conflicts, see Console operations. A sample pom.xml configuration that reflects these items is provided after this list.

  • Frequent calls of UDFs in an SQL deployment may cause a timeout. We recommend that you upload the JAR file of the UDF as an additional dependency of the deployment and use the CREATE TEMPORARY FUNCTION statement to declare the UDF. Example: CREATE TEMPORARY FUNCTION GetJson AS 'com.soybean.function.udf.MyGetJsonObjectFunction';
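
The following pom.xml fragment is a minimal sketch of the dependency setup described above. The Flink artifact, Flink version, and Shade plug-in version are illustrative placeholders; replace them with the values that match your deployment and the Apache Flink API that your UDF uses.

<!-- Illustrative pom.xml fragment: Flink API dependencies use the provided scope, -->
<!-- and the Shade plug-in packages the remaining third-party dependencies. -->
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <!-- Placeholder version; align it with the Flink version of your deployment. -->
        <version>1.17.2</version>
        <scope>provided</scope>
    </dependency>
    <!-- Third-party dependencies of the UDF keep the default compile scope
         so that the Shade plug-in packages them into the JAR file. -->
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <!-- Placeholder version. -->
            <version>3.4.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>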

UDF types

Realtime Compute for Apache Flink supports the following UDF types:

  • User-defined scalar function (UDSF): A UDSF maps zero or more scalar values to a new scalar value. A one-to-one mapping is established between the input and output. You can call a UDSF to generate a new value based on a data record. For more information, see UDSFs.

  • User-defined aggregate function (UDAF): A UDAF aggregates multiple data records into a single data record. A many-to-one mapping is established between the input and output. For more information, see UDAFs.

  • User-defined table function (UDTF): A UDTF accepts zero or more scalar values as input parameters, which can be variable-length. UDTFs are similar to UDSFs except that UDTFs can return any number of rows instead of a single value. The returned rows can include one or more columns. You can call a UDTF to generate multiple rows or columns. For more information, see UDTFs.
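
For reference, the following Java code is a minimal sketch of a UDSF and a UDTF. The class names and the splitting logic are illustrative; only the base classes and annotations come from the Apache Flink API. Place each public class in its own source file.

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// UDSF: returns exactly one value for each call.
public class MyUpperCase extends ScalarFunction {
    public String eval(String s) {
        return s == null ? null : s.toUpperCase();
    }
}

// UDTF: emits zero or more rows for each call by calling collect().
@FunctionHint(output = @DataTypeHint("ROW<word STRING>"))
public class MySplit extends TableFunction<Row> {
    public void eval(String line) {
        if (line == null) {
            return;
        }
        for (String word : line.split(",")) {
            collect(Row.of(word));
        }
    }
}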

Register a UDF

Before you call a UDF in an SQL deployment, make the UDF available to the deployment. For example, you can upload the JAR file of the UDF as an additional dependency of the deployment and declare the UDF by using the CREATE TEMPORARY FUNCTION statement, as described in the Precautions section.

Pass UDF parameters

You can configure the parameters of a UDF in the development console of Realtime Compute for Apache Flink and obtain the parameter values within the UDF.

The parameter configurations in the console are passed to the UDF through the open(FunctionContext context) method. Implementing this method is optional. The method provides a FunctionContext object that contains information about the execution context of the corresponding UDF. To pass the parameter configurations, perform the following steps (a complete UDF example that reads the parameters follows the steps):

  1. On the O&M > Deployments page in the development console of Realtime Compute for Apache Flink, click the name of the deployment you want to use. On the Configuration tab, click Edit in the upper-right corner of the Parameters section. Then, specify the pipeline.global-job-parameters parameter in the Other Configuration field. Sample code:

    pipeline.global-job-parameters: | 
      'k1:{hi,hello}',
      'k2:"str:ing,str:ing"',
      'k3:"str""ing,str:ing"'

    The FunctionContext#getJobParameter method can obtain only the configurations that are specified in the pipeline.global-job-parameters parameter. Make sure that you specify all parameter configurations required by the UDF in the pipeline.global-job-parameters parameter. Configure the pipeline.global-job-parameters parameter in the following way:

    • Define key-value pairs. Separate the key and value in a key-value pair with a colon (:) and enclose each key-value pair with single quotation marks (').

      Note
      • If a key or value contains a colon (:), enclose the key or value with double quotation marks (").
      • If a key or value contains both a colon (:) and a double quotation mark ("), escape the double quotation mark by using two double quotation marks ("").

      Examples:
      • The key-value pair key = k1, value = {hi,hello} is defined as 'k1:{hi,hello}'.
      • The key-value pair key = k2, value = str:ing,str:ing is defined as 'k2:"str:ing,str:ing"'.
      • The key-value pair key = k3, value = str"ing,str:ing is defined as 'k3:"str""ing,str:ing"'.

    • Organize the defined key-value pairs in YAML format. Place different key-value pairs on separate lines and separate the lines with commas (,).

      Note
      • A vertical bar (|) must be added at the beginning of a multi-line string.
      • Each line of the string must have the same indentation.

      Example:

      pipeline.global-job-parameters: | 
        'k1:{hi,hello}',
        'k2:"str:ing,str:ing"',
        'k3:"str""ing,str:ing"'
  2. In the implementation code of the UDF, call the FunctionContext#getJobParameter method to obtain the content of the key-value pairs.

    Sample code:

    context.getJobParameter("k1", null); // The string {hi,hello} is returned. 
    context.getJobParameter("k2", null); // The string str:ing,str:ing is returned. 
    context.getJobParameter("k3", null); // The string str"ing,str:ing is returned. 
    context.getJobParameter("pipeline.global-job-parameters", null); // null is returned because only the key-value pairs defined in the pipeline.global-job-parameters parameter can be obtained.

Named parameters

Note

Named parameters are supported only in Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.7 or later.

In SQL, parameter values must be passed to a function in the order in which the parameters are defined, and optional parameters cannot be omitted. If the number of parameters is large, ensuring the correct parameter order can be challenging. To ensure the correct parameter order and improve usability, you can use named parameters to specify only the parameters that you want to use. The following example shows how to use named parameters. In this example, a UDSF named MyFuncWithNamedArgs, which extends ScalarFunction, is used.

// Implement a UDSF in which the second and third parameters are optional (isOptional = true).
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

public class MyFuncWithNamedArgs extends ScalarFunction {
    private static final long serialVersionUID = 1L;

    public String eval(@ArgumentHint(name = "f1", isOptional = false, type = @DataTypeHint("STRING")) String f1,
            @ArgumentHint(name = "f2", isOptional = true, type = @DataTypeHint("INT")) Integer i2,
            @ArgumentHint(name = "f3", isOptional = true, type = @DataTypeHint("BIGINT")) Long l3) {

        if (i2 != null) {
            return "i2#" + i2;
        }
        if (l3 != null) {
            return "l3#" + l3;
        }
        return "default#" + f1;
    }
}

When you call the function in SQL, you can specify only the first required parameter. You can also specify the second and third optional parameters. Sample code:

CREATE TEMPORARY FUNCTION MyNamedUdf AS 'com.aliyun.example.MyFuncWithNamedArgs';

CREATE TEMPORARY TABLE s1 (
    a INT,
    b BIGINT,
    c VARCHAR,
    d VARCHAR,
    PRIMARY KEY(a) NOT ENFORCED
) WITH (
    'connector' = 'datagen',
    'rows-per-second'='1'
);

CREATE TEMPORARY TABLE sink (
    a INT,
    b VARCHAR,
    c VARCHAR,
    d VARCHAR
) WITH (
    'connector' = 'print'
);

INSERT INTO sink
SELECT a,
    -- Specify only the first required parameter.
    MyNamedUdf(f1 => c) arg1_res,
    -- Specify the first required parameter and the second optional parameter.
    MyNamedUdf(f1 => c, f2 => a) arg2_res,
    -- Specify the first required parameter and the third optional parameter.
    MyNamedUdf(f1 => c, f3 => b) arg3_res
FROM s1;
