
Realtime Compute for Apache Flink: Python

Last Updated: Nov 27, 2024

Realtime Compute for Apache Flink allows you to use Python user-defined functions (UDFs) in Flink SQL deployments. This topic describes the classification, debugging, and performance tuning of Python UDFs, as well as how to use Python dependencies.

UDF classification

  • User-defined scalar function (UDSF): Maps zero, one, or more scalar values to a new scalar value. A one-to-one mapping is established between the input and output: each time a UDSF reads one row of data, it writes one output value. For more information, see UDSFs.

  • User-defined aggregate function (UDAF): Aggregates multiple values into a single value. A many-to-one mapping is established between the input and output: multiple input records are aggregated to produce one output value. For more information, see UDAFs.

  • User-defined table-valued function (UDTF): Takes zero, one, or more scalar values as input parameters, which can be variable-length parameters. UDTFs are similar to UDSFs, except that a UDTF can return any number of rows instead of a single value, and each returned row can consist of one or more columns. A minimal UDTF sketch follows this list. For more information, see UDTFs.
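The following is a minimal sketch of a UDTF that illustrates the classification above. The function name, the word-splitting logic, and the result columns are illustrative assumptions, not part of the product documentation.

from pyflink.table import DataTypes
from pyflink.table.udf import udtf

# A UDTF can emit zero or more rows per input row. Here, one input string
# yields one row per whitespace-separated word, with two output columns.
@udtf(result_types=[DataTypes.STRING(), DataTypes.INT()])
def split_words(line: str):
    for word in line.split():
        yield word, len(word)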

Use Python dependencies

Commonly used Python packages such as pandas, NumPy, and PyArrow are pre-installed in the workspaces of Realtime Compute for Apache Flink. For a complete list of the pre-installed third-party Python packages, see Develop a Python API draft. Pre-installed packages must still be imported inside the Python UDF before they can be used. Sample code:

from typing import List
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.FLOAT())
def percentile(values: List[float], percentile: float):
    import numpy as np
    return np.percentile(values, percentile)

You can also use third-party Python packages that are not pre-installed. To do so, upload the package as a dependency file to fully managed Flink. For more information, see Manage UDFs and Use Python dependencies.
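For example, assuming a package such as langdetect has been uploaded as a dependency file (the package and function below are hypothetical illustrations), the import works the same way as for a pre-installed package:

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.STRING())
def detect_language(text: str):
    # langdetect is assumed to have been uploaded as a dependency file;
    # it is imported inside the UDF, just like a pre-installed package.
    from langdetect import detect
    return detect(text)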

Code debugging

In the code of a Python UDF, you can use the logging module to generate logs and locate errors based on them. The following code is an example.

import logging

@udf(result_type=DataTypes.BIGINT())
def add(i, j):
    logging.info("hello world")
    return i + j

After logs are generated, you can view them in the TaskManager log files. For more information, see View the operational logs of a deployment.

Performance tuning

Use the resource pre-load feature

The resource pre-load feature loads resources in advance during UDF initialization, so that resources do not need to be reloaded each time the eval() method is called to process data. For example, if you want to load a large deep learning model only once and then run predictions against it many times, you can use the following code:

from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf

class Predict(ScalarFunction):
    def open(self, function_context):
        # open() runs once when the UDF is initialized, so the model is
        # loaded a single time instead of once per eval() call.
        import pickle

        with open("resources.zip/resources/model.pkl", "rb") as f:
            self.model = pickle.load(f)

    def eval(self, x):
        return self.model.predict(x)

predict = udf(Predict(), result_type=DataTypes.DOUBLE(), func_type="pandas")
Note

For more information about how to upload Python files, see Use Python dependencies.
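If you develop with the Table API instead of registering the UDF in the console, a minimal sketch of registering and calling the predict function defined above might look like the following. The table name features and the column x are hypothetical.

from pyflink.table import EnvironmentSettings, TableEnvironment

table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# Register the UDF defined above under a hypothetical name.
table_env.create_temporary_function("predict", predict)
# features is a hypothetical source table with a DOUBLE column named x.
table_env.execute_sql("SELECT predict(x) FROM features")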

Use the pandas library

Realtime Compute for Apache Flink allows you to use pandas UDFs in addition to common Python UDFs. The input types of pandas UDFs use the data structures defined in pandas, such as pandas.Series and pandas.DataFrame. You can use high-performance Python libraries such as pandas and NumPy in pandas UDFs to develop high-performance Python UDFs. For more information, see Vectorized User-defined Functions.
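For illustration, a minimal pandas UDF might look like the following sketch; the function and column semantics are assumptions. With func_type="pandas", the inputs arrive as pandas.Series objects that hold a whole batch of rows, and the computation is applied in vectorized form.

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.FLOAT(), func_type="pandas")
def interest(balance, rate):
    # balance and rate are pandas.Series; the element-wise product is
    # computed for the whole batch at once.
    return balance * rate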

Configure required parameters

The performance of Python UDFs mainly depends on their implementation. If you encounter performance issues related to Python UDFs, you need to optimize their implementation. The performance of Python UDFs is also affected by the values of the parameters described in the following table.

  • python.fn-execution.bundle.size: The maximum number of data records that can be cached. Python UDFs are executed asynchronously: during computation, a Java operator caches data and sends it to a Python process for processing when the number of cached records reaches this threshold. Default value: 100000.

  • python.fn-execution.bundle.time: The maximum duration for which data can be cached, in milliseconds. Computation on the cached data is triggered when either the number of cached records reaches the value of python.fn-execution.bundle.size or the caching duration reaches the value of this parameter. Default value: 1000.

  • python.fn-execution.arrow.batch.size: The maximum number of data records allowed in an Arrow batch when you use pandas UDFs. The value of this parameter cannot be greater than the value of python.fn-execution.bundle.size. Default value: 10000.

Note

If the values of these parameters are too large, excessive data must be processed during checkpointing, which prolongs checkpoints or even causes them to fail. For more information about these parameters, see Configuration.
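These parameters are typically set in the deployment configuration. If you set them in code with the Table API instead, a sketch might look like the following; the values are illustrative, not recommendations, and TableConfig.set is assumed to be available (PyFlink 1.15 or later).

from pyflink.table import EnvironmentSettings, TableEnvironment

table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
config = table_env.get_config()
# Illustrative values; tune them against your own checkpointing behavior.
config.set("python.fn-execution.bundle.size", "50000")
config.set("python.fn-execution.bundle.time", "500")
config.set("python.fn-execution.arrow.batch.size", "5000")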

References

  • For more information about how to register, update, and delete a UDF, see Manage UDFs.

  • For more information about how to develop and use Python UDFs, see UDAFs, UDSFs, and UDTFs.

  • For more information about how to use custom Python virtual environments, third-party Python packages, JAR packages, and data files in Flink Python deployments, see Use Python dependencies.

  • For more information about how to develop and use Java UDFs, see UDAFs, UDSFs, and UDTFs.

  • For more information about how to debug and tune Java UDFs, see Overview.

  • For more information about how to use UDAFs to sort and aggregate data, see Use a UDAF to sort and aggregate data.