This topic describes how to create, register, and use a Python user-defined scalar function (UDSF).
Definition
A UDSF maps zero, one, or more scalar values to a new scalar value. The input and output data of a UDSF are mapped in a one-to-one relationship. Each time a UDSF reads a row of data, it writes an output value.
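As a rough illustration of the one-to-one relationship, the plain-Python sketch below applies a scalar function to each input row and collects exactly one output value per row. It does not use Flink. The names full_name and apply_scalar are hypothetical and exist only for this example.

```python
from typing import Callable, Iterable, List, Tuple


def full_name(first: str, last: str) -> str:
    """Hypothetical scalar function: two scalar inputs, one scalar output."""
    return f"{first} {last}"


def apply_scalar(func: Callable[..., str], rows: Iterable[Tuple]) -> List[str]:
    """Call func once per row, so the output holds one value per input row."""
    return [func(*row) for row in rows]


rows = [("Ada", "Lovelace"), ("Alan", "Turing")]
results = apply_scalar(full_name, rows)
print(results)
```

The output list always has the same length as the input, which is the property that distinguishes a scalar function from an aggregate or table-valued function.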
Limits
Services provided by fully managed Flink are subject to deployment environments and network environments. Therefore, when you develop Python UDFs in fully managed Flink, take note of the following limits:
Only Apache Flink 1.12 and later are supported.
Python 3.7.9 is pre-installed on a fully managed Flink cluster. Therefore, you must develop code in Python 3.7.9.
JDK 1.8 is used in the runtime environment of fully managed Flink. If your Python deployment depends on a third-party JAR package, make sure that the JAR package is compatible with JDK 1.8.
Only open source Scala 2.11 is supported. If your Python API deployment depends on a third-party JAR package, make sure that the JAR package is compatible with Scala 2.11.
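Because the cluster runs Python 3.7.9, it can help to confirm that the local interpreter is on the same release line before developing a UDF. The following is a minimal sketch, assuming that matching the major and minor version (3.7) is sufficient for local development. The constant CLUSTER_PYTHON and the function matches_cluster_python are hypothetical names, not part of any Flink API.

```python
import sys
from typing import Tuple

# Assumption: comparing only major.minor (3.7) is enough for local development.
# The cluster itself is documented as running Python 3.7.9.
CLUSTER_PYTHON = (3, 7)


def matches_cluster_python(version: Tuple[int, ...] = tuple(sys.version_info[:3])) -> bool:
    """Return True if the given version is on the same major.minor line as the cluster."""
    return tuple(version[:2]) == CLUSTER_PYTHON


if not matches_cluster_python():
    local = ".".join(str(part) for part in sys.version_info[:3])
    print(f"Warning: local Python is {local}; the cluster runs 3.7.9.")
```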
Create a UDSF
Flink provides sample code for Python user-defined functions (UDFs) to help you develop UDFs. The sample code includes implementations of Python UDSFs, Python user-defined aggregate functions (UDAFs), and Python user-defined table-valued functions (UDTFs). This topic describes how to create a UDSF in the Windows operating system.
Download and decompress the python_demo-master package to your on-premises machine.
Note: python_demo-master is hosted on a third-party website. Access to the website may fail or be delayed.
In the main menu bar of PyCharm, open the decompressed python_demo-master package.
Double-click the udfs.py file in the \python_demo-master\udx directory. Then, modify the content of the file based on your business requirements.
In this example, sub_string returns the characters of each data record from the begin position up to, but not including, the end position. Positions are zero-based, following Python slice semantics.
from pyflink.table import DataTypes
from pyflink.table.udf import udf


@udf(result_type=DataTypes.STRING())
def sub_string(s: str, begin: int, end: int):
    return s[begin:end]
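The slicing behavior of sub_string can be checked locally with plain Python, without PyFlink or a cluster. The sketch below mirrors the function body without the @udf decorator. The name sub_string_plain is hypothetical and is used only so that the example runs without PyFlink installed.

```python
def sub_string_plain(s: str, begin: int, end: int) -> str:
    """Same body as sub_string, without the PyFlink @udf decorator."""
    return s[begin:end]


# begin is inclusive and end is exclusive; both are zero-based.
print(sub_string_plain("abcdef", 2, 4))  # cd  (third and fourth characters)
print(sub_string_plain("abcdef", 1, 4))  # bcd (second to fourth characters)
print(repr(sub_string_plain("ab", 2, 4)))  # '' (out-of-range slices yield an empty string)
```

Note that a call with arguments 2 and 4 returns two characters, not three. To obtain the second to fourth characters, the arguments would be 1 and 4.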
Go to the \python_demo-master directory to which the udx folder belongs and run the following command to package the files in the directory:
zip -r python_demo.zip udx
If the python_demo.zip package appears in the \python_demo-master\ directory, the Python UDSF is created.
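The zip command is not always available on Windows. As an alternative, the packaging step can be sketched with the Python standard library. This is a minimal sketch, assuming that the archive only needs the files under udx stored with udx/ as their top-level path. The function name package_udx is hypothetical.

```python
import os
import zipfile


def package_udx(project_dir: str, archive_name: str = "python_demo.zip",
                folder: str = "udx") -> str:
    """Zip project_dir/folder into project_dir/archive_name and return the archive path."""
    source_root = os.path.join(project_dir, folder)
    if not os.path.isdir(source_root):
        raise FileNotFoundError(f"Directory not found: {source_root}")

    archive_path = os.path.join(project_dir, archive_name)
    with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for current_dir, _dirs, files in os.walk(source_root):
            for file_name in sorted(files):
                full_path = os.path.join(current_dir, file_name)
                # Store paths relative to project_dir with forward slashes,
                # so that entries look like "udx/udfs.py".
                relative = os.path.relpath(full_path, project_dir)
                zf.write(full_path, relative.replace(os.sep, "/"))
    return archive_path


# Example, when run from the python_demo-master directory:
# package_udx(".")
```

Unlike zip -r, this sketch stores file entries only and does not add separate entries for directories.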
Register a UDSF
For more information about how to register a UDSF, see Register a UDF.
Use a UDSF
After you register a UDSF, you can use the UDSF. To use the UDSF, perform the following steps:
Use Flink SQL to create a draft. For more information, see Develop an SQL draft.
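The following code provides an example of how to apply the registered UDSF, named ASI_UDSF in this example, to the a field of each row in the ASI_UDSF_Source table. If ASI_UDSF is registered from the sub_string function in this topic, ASI_UDSF(a,2,4) returns the characters at zero-based positions 2 and 3, that is, the third and fourth characters of the string: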
CREATE TEMPORARY TABLE ASI_UDSF_Source (
  a VARCHAR,
  b INT,
  c INT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE ASI_UDSF_Sink (
  a VARCHAR
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO ASI_UDSF_Sink
SELECT ASI_UDSF(a,2,4)
FROM ASI_UDSF_Source;
In the console of fully managed Flink, find the desired deployment and click Start in the Actions column.
After the deployment is started, the result of ASI_UDSF(a,2,4) for each row of the a field in the ASI_UDSF_Source table is inserted into the ASI_UDSF_Sink table. With the sub_string implementation in this topic, that result is the third and fourth characters of the string (zero-based positions 2 and 3).