This topic describes how to create, register, and use a user-defined aggregate function (UDAF) in Realtime Compute for Apache Flink.
Definition
A UDAF aggregates multiple values into a single value. It establishes a many-to-one mapping between its input and output: multiple input values are aggregated to generate one output value.
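The many-to-one contract can be illustrated with a minimal plain-Python sketch (not Flink API code): many input rows are folded into a single accumulator, which yields one output value.

```python
# Plain-Python illustration of the many-to-one aggregation contract:
# every input row is folded into one accumulator, producing one output.
rows = [3, 5, 7, 9]          # many input values
total = 0
for value in rows:           # fold each input into the accumulator
    total += value
print(total)                 # one output value: 24
```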
Limits
Services provided by Realtime Compute for Apache Flink are subject to the constraints of its deployment and network environments. Therefore, take note of the following limits when you develop Python UDFs in Realtime Compute for Apache Flink:
Only Apache Flink 1.12 and later are supported.
Python 3.7.9 is pre-installed on a Realtime Compute for Apache Flink workspace. Therefore, you must develop code in Python 3.7.9.
JDK 8 and JDK 11 are supported in the runtime environment of Realtime Compute for Apache Flink. If your Python deployment depends on a third-party JAR package, make sure that the JAR package is compatible with JDK 8 or JDK 11.
Only open source Scala 2.11 is supported. If your Python deployment depends on a third-party JAR package, make sure that the JAR package is compatible with Scala 2.11.
Create a UDAF
Flink provides sample code of Python user-defined extensions (UDXs) for you to develop UDXs. The sample code includes implementations of Python UDFs, Python UDAFs, and Python user-defined table-valued functions (UDTFs). This topic describes how to create a UDAF in the Windows operating system.
Download and decompress the python_demo-master package to your on-premises machine.
In PyCharm, open the decompressed python_demo-master package.
Double-click the udfs.py file in the \python_demo-master\udx directory. Then, modify the content of the file based on your business requirements.
In this example, weighted_avg defines the code for the weighted average of the current data and historical data.
```python
from pyflink.common import Row
from pyflink.table import AggregateFunction, DataTypes
from pyflink.table.udf import udaf


class WeightedAvg(AggregateFunction):

    def create_accumulator(self):
        # Row(sum, count)
        return Row(0, 0)

    def get_value(self, accumulator: Row) -> float:
        if accumulator[1] == 0:
            return 0
        else:
            return accumulator[0] / accumulator[1]

    def accumulate(self, accumulator: Row, value, weight):
        accumulator[0] += value * weight
        accumulator[1] += weight

    def retract(self, accumulator: Row, value, weight):
        accumulator[0] -= value * weight
        accumulator[1] -= weight


weighted_avg = udaf(f=WeightedAvg(),
                    result_type=DataTypes.DOUBLE(),
                    accumulator_type=DataTypes.ROW([
                        DataTypes.FIELD("f0", DataTypes.BIGINT()),
                        DataTypes.FIELD("f1", DataTypes.BIGINT())]))
```
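Outside a Flink runtime, you can exercise the same accumulator lifecycle by hand. The sketch below is a plain-Python stand-in (a two-element list replaces pyflink's Row so it runs without Flink installed) that mimics create_accumulator, accumulate, retract, and get_value:

```python
# Plain-Python stand-in for the PyFlink accumulator lifecycle.
# A list replaces pyflink.common.Row; the arithmetic matches WeightedAvg.
def create_accumulator():
    return [0, 0]  # [weighted sum, total weight]

def accumulate(acc, value, weight):
    acc[0] += value * weight
    acc[1] += weight

def retract(acc, value, weight):
    acc[0] -= value * weight
    acc[1] -= weight

def get_value(acc):
    return 0 if acc[1] == 0 else acc[0] / acc[1]

acc = create_accumulator()
accumulate(acc, 10, 1)   # row: value 10, weight 1
accumulate(acc, 20, 3)   # row: value 20, weight 3
print(get_value(acc))    # (10*1 + 20*3) / (1 + 3) = 17.5
retract(acc, 20, 3)      # undo the second row
print(get_value(acc))    # back to 10.0
```

In a real deployment, Flink calls these methods itself; retract is invoked when a previously emitted row must be withdrawn, for example under retraction-producing upstream operators.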
Go to the \python_demo-master directory that contains the udx folder and run the following command to package the files in the directory:
zip -r python_demo.zip udx
If the python_demo.zip package appears in the \python_demo-master\ directory, the UDAF is created.
Register a UDAF
For more information about how to register a UDAF, see Manage UDFs.
Use a UDAF
After a UDAF is registered, you can perform the following steps to use the UDAF:
Use Flink SQL to create a draft. For more information, see Develop an SQL draft.
Compute the weighted average of the a field in the ASI_UDAF_Source table, with the b field as the weight. Sample code:
```sql
CREATE TEMPORARY TABLE ASI_UDAF_Source (
  a BIGINT,
  b BIGINT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE ASI_UDAF_Sink (
  avg_value DOUBLE
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO ASI_UDAF_Sink
SELECT weighted_avg(a, b)
FROM ASI_UDAF_Source;
```
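For this query, weighted_avg(a, b) is equivalent to SUM(a * b) / SUM(b). A quick plain-Python check of that identity on hypothetical sample rows (the datagen connector would produce random values instead):

```python
# Hypothetical (a, b) sample rows; in the deployment, the datagen
# connector generates the data and Flink maintains the accumulator.
rows = [(4, 2), (10, 1), (6, 3)]

weighted_sum = sum(a * b for a, b in rows)   # 8 + 10 + 18 = 36
total_weight = sum(b for _, b in rows)       # 2 + 1 + 3 = 6
print(weighted_sum / total_weight)           # 6.0
```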
In the left-side navigation pane of the development console of Realtime Compute for Apache Flink, go to the Deployments page. Find the desired deployment and click Start in the Actions column.
After the deployment is started, the weighted average of the current data and historical data of the a field in the ASI_UDAF_Source table, with the b field as the weight, is inserted into each row of the ASI_UDAF_Sink table.