This topic describes how to create, register, and use a user-defined scalar function (UDSF) in Realtime Compute for Apache Flink.
Definition
A UDSF maps zero, one, or more scalar values to a new scalar value. The input and output data of a UDSF are mapped in a one-to-one relationship. Each time a UDSF reads a row of data, it writes an output value. For more information, see User-defined Functions.
Create a UDSF
Realtime Compute for Apache Flink provides examples of user-defined functions (UDFs) to facilitate your business development. The examples include how to implement UDSFs, user-defined aggregate functions (UDAFs), and user-defined table-valued functions (UDTFs). The development environment of the related version is configured in each example.
Download and decompress ASI_UDX_Demo to your on-premises machine.
Note: ASI_UDX_Demo is hosted on a third-party website. Access to the website may occasionally fail or be slow.
After you decompress the package, the ASI_UDX-main folder is generated. The folder contains the following key files:
pom.xml: the project-level Maven configuration file. It describes the Maven coordinates, dependencies, and build rules of the project, as well as other project metadata such as the defect management system, organizations, and licenses.
\ASI_UDX-main\src\main\java\ASI_UDF\ASI_UDF.java: the Java code for the sample UDSF.
Open IntelliJ IDEA, select the extracted ASI_UDX-main folder, and click OK.
Double-click the ASI_UDF.java file in the \ASI_UDX-main\src\main\java\ASI_UDF directory, and modify the file based on your business requirements.
In this example, ASI_UDF.java contains code that obtains the characters from the begin position (inclusive) to the end position (exclusive) of the string in each data record.
package ASI_UDF;

import org.apache.flink.table.functions.ScalarFunction;

public class ASI_UDF extends ScalarFunction {
    public String eval(String s, Integer begin, Integer end) {
        return s.substring(begin, end);
    }
}
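Because eval delegates to String.substring, whose begin index is inclusive and end index is exclusive, the logic can be checked outside Flink with plain Java. The following sketch mirrors the sample's eval method but omits the ScalarFunction superclass so that it runs without Flink on the classpath (the class name SubstringCheck is a hypothetical stand-in):

```java
// Minimal stand-in for the sample UDSF: same eval logic, no Flink dependency.
public class SubstringCheck {
    // Mirrors ASI_UDF.eval: begin is inclusive, end is exclusive.
    public static String eval(String s, Integer begin, Integer end) {
        return s.substring(begin, end);
    }

    public static void main(String[] args) {
        // Zero-based positions 2 and 3 of "flink" are 'i' and 'n'.
        System.out.println(eval("flink", 2, 4)); // prints "in"
    }
}
```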
Double-click the pom.xml file in the \ASI_UDX-main\ directory and make configurations in the file.
In this example, pom.xml declares the main JAR dependencies of Apache Flink 1.11. Perform one of the following operations based on your business requirements:
If your business does not depend on other JAR packages, you do not need to modify the pom.xml file. Proceed to the next step.
If your business depends on other JAR packages, add information of the required JAR packages to the pom.xml file.
Apache Flink 1.11 depends on the following JAR packages:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.11.0</version>
        <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table</artifactId>
        <version>1.11.0</version>
        <type>pom</type>
        <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.11.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>1.11.0</version>
    </dependency>
</dependencies>
Go to the directory where the pom.xml file is stored. Then, run the following command to package the file:
mvn package -Dcheckstyle.skip
If the ASI_UDX-1.0-SNAPSHOT.jar package appears in the \ASI_UDX-main\target\ directory, the UDSF is created.
Register a UDSF
For more information about how to register a UDSF, see Manage UDFs.
Use a UDSF
After you register a UDSF, you can use the UDSF. To use the UDSF, perform the following steps:
Use Flink SQL to create a deployment. For more information, see Develop an SQL draft.
The following code provides an example on how to obtain the characters at zero-based positions 2 and 3, that is, the third and fourth characters, of the string in each row of the a field in the ASI_UDSF_Source table:
CREATE TEMPORARY TABLE ASI_UDSF_Source (
  a VARCHAR,
  b INT,
  c INT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE ASI_UDSF_Sink (
  a VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO ASI_UDSF_Sink
SELECT ASI_UDSF(a, 2, 4)
FROM ASI_UDSF_Source;
On the Deployments page in the console of fully managed Flink, find the deployment that you want to start and click Start in the Actions column. After the deployment is started, the third and fourth characters of the string in each row of the a field in the ASI_UDSF_Source table are inserted into the ASI_UDSF_Sink table.
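The one-to-one mapping that the deployment performs can be sketched in plain Java: each input row of the a field yields exactly one output row, produced by the same substring logic as the UDSF. This is a standalone illustration under assumed sample data, not Flink API code:

```java
import java.util.List;
import java.util.stream.Collectors;

public class UdsfMappingSketch {
    // Same logic as ASI_UDSF(a, 2, 4): keep the characters at
    // zero-based positions 2 and 3 of each input string.
    static String udsf(String a) {
        return a.substring(2, 4);
    }

    public static void main(String[] args) {
        // Simulated rows of the a field in ASI_UDSF_Source.
        List<String> source = List.of("abcdef", "realtime", "flink");
        // One output row per input row, as in INSERT INTO ... SELECT.
        List<String> sink = source.stream()
                                  .map(UdsfMappingSketch::udsf)
                                  .collect(Collectors.toList());
        System.out.println(sink); // prints [cd, al, in]
    }
}
```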