You can use custom Python virtual environments, third-party Python packages, JAR packages, and data files in Flink Python jobs. This topic describes how to use these dependencies in your Python jobs.
Background information
This topic describes how to use Python dependencies in the following scenarios:
Pre-installed Python environment
The fully managed environment comes pre-installed with Python. The available versions are as follows:
Ververica Runtime (VVR) 8.0.10 and earlier: Python 3.7
VVR 8.0.11 and later: Python 3.9
For more information about the third-party software packages pre-installed in the Python environment, see Python job development.
Some third-party Python packages have requirements for the GNU C Library (glibc) version. The glibc versions pre-installed in the fully managed Flink environment are as follows:
X86
VVR 8.x and earlier: glibc 2.17
VVR 11.x and later: glibc 2.31
ARM
VVR 11.2 and earlier: glibc 2.17
VVR 11.3 and later: glibc 2.31
Glibc is backward compatible: binaries built against an older glibc run on a newer one. Therefore, the glibc version required by your third-party Python library must not be later than the glibc version in the environment.
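If you want to check the glibc version of the runtime before choosing a package, a minimal sketch such as the following can be run in a Python UDF or a bootstrap step. The check itself is not part of the official steps; it only prints the detected version for comparison.
import platform

# Print the libc flavor and version of the current Python runtime, for
# comparison with the version required by a third-party package.
libc_name, libc_version = platform.libc_ver()
print(libc_name, libc_version)  # for example, glibc 2.31 on VVR 11.x (x86)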
Use a custom Python virtual environment
VVR 4.x supports only Python 3.7 virtual environments. VVR 6.x and later do not have this restriction. You can use later versions of Python virtual environments.
If the pre-installed Python environment does not meet your requirements, you can use a custom Python version with a Python virtual environment. Each Python virtual environment has a complete Python runtime, and you can install various Python dependency packages in it. This section describes how to prepare a Python virtual environment.
Prepare a Python virtual environment.
On your local machine, prepare the setup-pyflink-virtual-env.sh script. The content is as follows.
X86
set -e
# Download the miniforge.sh script.
wget "https://github.com/conda-forge/miniforge/releases/download/25.11.0-1/Miniforge3-25.11.0-1-Linux-x86_64.sh" -O "miniforge.sh"
# Add execute permissions to the miniforge.sh script.
chmod +x miniforge.sh
# Install miniforge.
./miniforge.sh -b
source /root/miniforge3/bin/activate
# Create a Python virtual environment.
mamba create -n venv python=3.10 -y
eval "$(mamba shell hook --shell bash)"
# Activate the Python virtual environment.
mamba activate venv
# Install the PyFlink dependency. Update the PyFlink version if needed.
pip install "apache-flink==1.20.3"
# Remove unnecessary JAR packages to reduce the package size.
find /root/miniforge3/envs/venv/lib/python3.10/site-packages/pyflink/ -name "*.jar" | xargs rm
# Deactivate the Conda Python virtual environment.
mamba deactivate
# Package the prepared Conda Python virtual environment.
cd /root/miniforge3/envs/ && zip -r /root/venv.zip venv
ARM
set -e
# Download the miniforge.sh script.
wget "https://github.com/conda-forge/miniforge/releases/download/25.11.0-1/Miniforge3-25.11.0-1-Linux-aarch64.sh" -O "miniforge.sh"
# Add execute permissions to the miniforge.sh script.
chmod +x miniforge.sh
# Install miniforge.
./miniforge.sh -b
source /root/miniforge3/bin/activate
# Create a Python virtual environment.
mamba create -n venv python=3.10 -y
eval "$(mamba shell hook --shell bash)"
# Activate the Python virtual environment.
mamba activate venv
# Install the PyFlink dependency. Update the PyFlink version if needed.
# A JDK is required on ARM to build some PyFlink dependencies from source.
yum install -y java-11-openjdk-devel
export JAVA_HOME=/usr/lib/jvm/java-11
wget "https://raw.githubusercontent.com/apache/flink/release-1.20/flink-python/dev/dev-requirements.txt" -O dev-requirements.txt
pip install -r dev-requirements.txt
pip install "apache-flink==1.20.3"
# Remove unnecessary JAR packages to reduce the package size.
find /root/miniforge3/envs/venv/lib/python3.10/site-packages/pyflink/ -name "*.jar" | xargs rm
# Deactivate the Conda Python virtual environment.
mamba deactivate
# Package the prepared Conda Python virtual environment.
cd /root/miniforge3/envs && zip -r /root/venv.zip venv
mamba create: Change the Python version to your target version.
apache-flink: Change the Flink version to the one that corresponds to the VVR version of your job. For more information about how to view the Flink version, see Workspace management and operations.
On your local machine, prepare the build.sh script. The content is as follows.
#!/bin/bash
set -e -x
yum install -y zip wget
cd /root/
bash /build/setup-pyflink-virtual-env.sh
mv venv.zip /build/
On the command line, run the following command to complete the installation of the Python virtual environment.
X86
docker run -it --rm -v $PWD:/build -w /build quay.io/pypa/manylinux_2_28_x86_64 bash ./build.sh
ARM
docker run -it --rm -v $PWD:/build -w /build quay.io/pypa/manylinux_2_28_aarch64 bash ./build.sh
After you run this command, a file named venv.zip is generated. This example creates a virtual environment for Python 3.10.
You can also modify the preceding script to install required third-party Python packages in the virtual environment.
Note: This topic uses a job on VVR 11.x and Python 3.10 as an example. To use a different VVR version or install a different Python version for the virtual environment, modify the mamba create and apache-flink parameters described above.
Use the Python virtual environment in a Python job.
Log on to the Realtime Compute for Apache Flink console.
In the Actions column of the target workspace, click Console.
In the left navigation pane, click File Management and upload the venv.zip file.
On the page, click the name of the target job.
On the Deployment Details tab, in the Basic Configuration section, select the venv.zip file for Python Archives.
If a SQL job uses a Python user-defined function (UDF) in the virtual environment, add the following configuration in the Other Configuration field of the Parameter Settings section.
python.archives: oss://.../venv.zip
In the Parameter Settings section, in the Other Configuration field, add the configuration that specifies the installation path of the Python virtual environment based on the VVR version of your job.
VVR 6.x and later
python.executable: venv.zip/venv/bin/python
python.client.executable: venv.zip/venv/bin/python
Versions earlier than VVR 6.x
python.executable: venv.zip/venv/bin/python
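To verify that the deployment actually uses the interpreter from venv.zip, you can add a small scalar UDF such as the following sketch. The function, table, and column names are hypothetical and are not part of the official configuration steps.
import sys

from pyflink.table import DataTypes
from pyflink.table.udf import udf


@udf(result_type=DataTypes.STRING())
def which_python(ignored):
    # Returns the interpreter path of the Python worker. With the settings
    # above, it should point into the extracted venv.zip archive.
    return sys.executable
Register the function with table_env.create_temporary_function("which_python", which_python) and call it on any column, for example SELECT which_python(name) FROM my_table, to see the interpreter path in the job output.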
Use third-party Python packages
The following two scenarios describe how to use third-party Python packages:
Use a third-party Python package that can be directly imported
If your third-party Python package is Zip Safe, you can use it directly in a Python job without installation. You can follow these steps:
Download a third-party Python package that can be directly imported.
Open the PyPI page in your browser.
In the search box, enter the name of the target third-party Python package, such as apache-flink 1.20.3.
In the search results, click the name of the target result.
In the navigation pane on the left, click Download files.
Click the name of the package whose file name contains cp39-cp39 and manylinux to download it.
Log on to the Realtime Compute for Apache Flink console.
In the Actions column of the target workspace, click Console.
In the left navigation pane, click File Management and upload the third-party Python package.
On the page, click the name of the target job. On the Deployment Details tab, in the Basic Configuration section, click Edit. Then, for the Python Libraries option, select the uploaded third-party Python package.
Click Save.
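As a rough sketch of how such a package is used at runtime, the following UDF imports the uploaded package directly. The package name my_pkg and its process function are hypothetical placeholders, not a real library.
from pyflink.table import DataTypes
from pyflink.table.udf import udf


@udf(result_type=DataTypes.STRING())
def transform(line):
    # Packages selected under Python Libraries are made available to the
    # Python worker, so a zip-safe package can be imported directly.
    import my_pkg  # hypothetical zip-safe package uploaded in the previous step
    return my_pkg.process(line)  # hypothetical API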
Use a third-party Python package that requires compilation
If your third-party Python package is a compressed package in the tar.gz format or a source code package downloaded from another location, and a setup.py file exists in the root directory of the package, the package usually needs to be compiled before use. You must first compile the third-party Python package in an environment compatible with Flink. Then, you can call the package in your Python job.
We recommend that you use Python 3.9 in the quay.io/pypa/manylinux_2_28_x86_64 image container to compile third-party Python packages. The packages generated by this container are compatible with most Linux environments. For more information about this image container, see manylinux.
Note: The installation path for Python 3.9 is /opt/python/cp39-cp39/bin/python3.
The following example shows how to compile and use the opencv-python-headless third-party Python package.
Compile the third-party Python package.
On your local machine, prepare the requirements.txt file. The content is as follows.
opencv-python-headless
numpy<2
On your local machine, prepare the build.sh script. The content is as follows.
#!/bin/bash
set -e -x
yum install -y zip
# Select the Python version that matches the Python environment of your job.
#PYBIN=/opt/python/cp37-cp37m/bin
#PYBIN=/opt/python/cp38-cp38/bin
PYBIN=/opt/python/cp39-cp39/bin
#PYBIN=/opt/python/cp310-cp310/bin
#PYBIN=/opt/python/cp311-cp311/bin
"${PYBIN}/pip" install --target __pypackages__ -r requirements.txt
cd __pypackages__ && zip -r deps.zip . && mv deps.zip ../ && cd ..
rm -rf __pypackages__
On the command line, run the following command.
X86
docker run -it --rm -v $PWD:/build -w /build quay.io/pypa/manylinux_2_28_x86_64 bash ./build.sh
ARM
docker run -it --rm -v $PWD:/build -w /build quay.io/pypa/manylinux_2_28_aarch64 bash ./build.sh
After this command is run, a file named deps.zip is generated. This file is the compiled third-party Python package.
You can also modify requirements.txt to install other required third-party Python packages; multiple dependencies can be listed in the same file.
Use the deps.zip third-party Python package in a Python job.
Log on to the Realtime Compute for Apache Flink console.
In the Actions column of the target workspace, click Console.
In the navigation pane on the left, click Files, and upload deps.zip.
On the page, click the name of the target job. On the Deployment Details tab, in the Basic Configuration section, click Edit. Then, for the Python Libraries option, select deps.zip.
Click Save.
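The following sketch shows how a Python UDF might call OpenCV from the compiled deps.zip package. It assumes deps.zip was built from the requirements.txt above and selected as a Python library; the function name and input format are illustrative only.
from pyflink.table import DataTypes
from pyflink.table.udf import udf


@udf(result_type=DataTypes.INT())
def image_width(image_bytes):
    # cv2 and numpy are resolved from deps.zip at runtime.
    import cv2
    import numpy as np

    buf = np.frombuffer(image_bytes, dtype=np.uint8)
    img = cv2.imdecode(buf, cv2.IMREAD_COLOR)
    return -1 if img is None else img.shape[1]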
Use JAR packages
If your Flink Python job uses Java classes, such as a connector or a Java UDF, you must specify the JAR package for the connector or Java UDF. You can perform the following steps:
Log on to the Realtime Compute for Apache Flink console.
In the Actions column of the target workspace, click Console.
In the navigation pane on the left, click Files, and upload the JAR package to use.
On the page, click the name of the target job. On the Deployment Details tab, in the Basic Configuration section, click Edit. Then, for the Additional Dependencies option, select the JAR package to use.
In the Parameter Settings section, in the Other Configuration field, add the pipeline.classpaths configuration.
For example, to depend on multiple JAR packages named jar1.jar and jar2.jar, the configuration is as follows.
pipeline.classpaths: 'file:///flink/usrlib/jar1.jar;file:///flink/usrlib/jar2.jar'
Click Save.
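For reference, the following sketch shows how a Python job might call a Java UDF shipped in one of these JAR packages. The class name com.example.MyJavaUpper is a hypothetical example; replace it with the actual class in your JAR.
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Register the Java scalar function from the JAR referenced in
# pipeline.classpaths so that it can be called from SQL in this Python job.
t_env.create_java_temporary_function("my_java_upper", "com.example.MyJavaUpper")
t_env.execute_sql("SELECT my_java_upper('hello')").print()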
Use built-in connectors, data formats, and Catalogs
Only jobs that run on VVR 11.2 or later support built-in connectors, data formats, and Catalogs.
To use built-in connectors, data formats, and Catalogs in a Flink Python job, you can specify them as follows:
In the Parameter Settings section, in the Other Configuration field, add the configuration.
For example, to depend on multiple built-in connectors named kafka and sls, the configuration is as follows. For the specific names of built-in connectors, see the documentation for each connector in Supported connectors.
pipeline.used-builtin-connectors: kafka;sls
For example, to depend on multiple built-in data formats named avro and parquet, the configuration is as follows. For the specific names of built-in data formats, see the documentation in Data formats.
pipeline.used-builtin-formats: avro;parquet
For example, to depend on multiple built-in Catalogs named hive-2.3.6 and paimon, the configuration is as follows. For the specific names of built-in Catalogs, see the documentation for the corresponding Catalog in Data Management.
pipeline.used-builtin-catalogs: hive-2.3.6;paimon
Click Save.
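For reference, the following sketch shows a Python job that reads from a table backed by the built-in kafka connector enabled above. The topic, broker address, and schema are hypothetical.
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# The kafka connector is available because pipeline.used-builtin-connectors
# includes kafka; no extra connector JAR needs to be uploaded.
t_env.execute_sql("""
    CREATE TABLE source_table (
        msg STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'my_topic',
        'properties.bootstrap.servers' = 'broker:9092',
        'properties.group.id' = 'my_group',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'csv'
    )
""")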
Use data files
Flink does not support debugging Python jobs by uploading data files.
The following two scenarios describe how to use data files:
Use the Python Archives option
If you have many data files, you can package them into a ZIP file and use them in a Python job by following these steps:
Log on to the Realtime Compute for Apache Flink console.
In the Actions column of the target workspace, click Console.
In the navigation pane on the left, click File Management and upload the ZIP package of the target data file.
On the page, click the name of the target job. On the Deployment Details tab, in the Basic Configuration section, click Edit. Then, for the Python Archives option, select the data file ZIP package to use.
In a Python UDF, you can access the data files as follows. This example assumes the compressed package containing the data files is named mydata.zip.
def map(): with open("mydata.zip/mydata/data.txt") as f: ...
Use the Additional Dependencies option
If you have a small number of data files, you can use them in a Python job by following these steps:
Log on to the Realtime Compute for Apache Flink console.
In the Actions column of the target workspace, click Console.
In the navigation pane on the left, click Files, and upload the target data file.
On the page, click the name of the target job. On the Deployment Details tab, in the Basic Configuration section, click Edit. Then, for the Additional Dependencies option, select the data file to use.
In a Python UDF, you can access the data files as follows. The following code uses a data file named data.txt as an example.
def map(): with open("/flink/usrlib/data.txt") as f: ...
References
For more information about how to develop Python API jobs, see Python job development.
For a complete example of the development process for a Flink Python job, see Quick Start for Flink Python jobs.
Fully managed Flink also supports SQL and DataStream jobs. For more information about how to develop these jobs, see Job development map and JAR job development.