您可以在Flink Python作业中使用自定义的Python虚拟环境、第三方Python包、JAR包和数据文件等,本文为您介绍如何在Python作业中使用这些依赖。
背景信息
本文通过以下场景为您介绍如何使用Python依赖:
使用自定义的Python虚拟环境
VVR 4.x仅支持3.7版本的Python虚拟环境,VVR 6.x及以上的版本无此限制,您可以使用更高版本的Python虚拟环境。
Python支持构建虚拟环境,每个Python虚拟环境都有一套完整的Python运行环境,并且可以在这套虚拟环境中安装一系列的Python依赖包。下文为您介绍如何准备Python的虚拟环境。
准备Python的虚拟环境。
在本地准备setup-pyflink-virtual-env.sh脚本,其内容如下。
miniconda.sh脚本信息:修改为您的目标版本地址信息。
apache-flink:修改为您作业使用的VVR版本对应的Flink版本,Flink版本查看方法请参见如何查看当前作业的Flink版本?。
在本地准备build.sh脚本,其内容如下。
#!/bin/bash set -e -x sed -i 's/mirrorlist/#mirrorlist/g' /etc/yum.repos.d/CentOS-* sed -i 's|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g' /etc/yum.repos.d/CentOS-* yum install -y zip wget cd /root/ bash /build/setup-pyflink-virtual-env.sh mv venv.zip /build/
在命令行,执行如下命令,完成python虚拟环境的安装。
docker run -it --rm -v $PWD:/build -w /build quay.io/pypa/manylinux2014_x86_64 ./build.sh
执行完该命令后,会生成一个名字为venv.zip的文件,本示例为Python 3.10的虚拟环境。
您也可以修改上述脚本,在虚拟环境中安装所需的第三方Python包。
set -e # 下载Python 3.10 miniconda.sh脚本。 wget "https://repo.continuum.io/miniconda/Miniconda3-py310_24.7.1-0-Linux-x86_64.sh" -O "miniconda.sh" # 为Python 3.10 miniconda.sh脚本添加执行权限。 chmod +x miniconda.sh # 创建Python的虚拟环境。 ./miniconda.sh -b -p venv # 激活Conda Python虚拟环境。 source venv/bin/activate "" # 安装PyFlink依赖。 # update the PyFlink version if needed pip install "apache-flink==1.17.0" # 关闭Conda Python虚拟环境。 conda deactivate # 删除缓存的包。 rm -rf venv/pkgs # 将准备好的Conda Python虚拟环境打包。 zip -r venv.zip venv
说明本文以作业为VVR 8.x,Python 3.10为例为您介绍,如果您需要使用其他VVR版本或安装其他版本的Python的虚拟环境,则需要修改以下两个参数:
在Python作业中使用Python虚拟环境。
登录实时计算控制台。
单击目标工作空间操作列下的控制台。
在左侧导航栏,单击文件管理,上传venv.zip文件。
在
页面,单击目标作业名称。在部署详情页签基础配置区域Python Archives,选择venv.zip文件。
如果SQL作业在虚拟环境中使用Python UDF,需要在运行参数配置区域的其他配置项,添加如下配置信息。
python.archives: oss://.../venv.zip
在运行参数配置区域其他配置项,按照您作业的VVR版本添加对应的指定Python虚拟环境的安装路径的配置信息。
vvr-6.x及以上版本
python.executable: venv.zip/venv/bin/python python.client.executable: venv.zip/venv/bin/python
vvr-6.x以下版本
python.executable: venv.zip/venv/bin/python
使用第三方Python包
下面将从以下两个场景为您介绍如何使用第三方Python包:
使用可直接Import的第三方Python包
如果您的第三方Python包是Zip Safe的,即不需要安装即可直接在Python作业中使用。操作步骤如下:
使用需要编译的第三方Python包
如果您的第三方Python包是tar.gz格式的压缩包,或从其他地方下载的源码包,且压缩包的根目录下存在setup.py文件,则这种类型的第三方Python包通常需要先编译才能被使用。您需要先在与Flink兼容的环境下编译第三方Python包,然后才可在Python作业中调用第三方Python包。
推荐使用quay.io/pypa/manylinux2014_x86_64镜像容器中的Python 3.7来编译第三方Python包,使用该容器编译生成的包兼容绝大多数Linux环境,关于该镜像容器的更多信息请参见manylinux。
说明Python 3.7的安装路径为 /opt/python/cp37-cp37m/bin/python3。
下面以opencv-python-headless第三方Python包为例,介绍一下如何编译和使用该第三方Python包。
编译第三方Python包。
在本地准备requirements.txt文件,其内容如下。
opencv-python-headless
在本地准备build.sh脚本,其内容如下。
#!/bin/bash set -e -x sed -i 's/mirrorlist/#mirrorlist/g' /etc/yum.repos.d/CentOS-* sed -i 's|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g' /etc/yum.repos.d/CentOS-* yum install -y zip PYBIN=/opt/python/cp37-cp37m/bin #PYBIN=/opt/python/cp38-cp38/bin #PYBIN=/opt/python/cp39-cp39/bin #PYBIN=/opt/python/cp310-cp310/bin "${PYBIN}/pip" install --target __pypackages__ -r requirements.txt cd __pypackages__ && zip -r deps.zip . && mv deps.zip ../ && cd .. rm -rf __pypackages__
在CMD命令行,执行如下命令。
docker run -it --rm -v $PWD:/build -w /build quay.io/pypa/manylinux2014_x86_64 /bin/bash build.sh
该命令执行完后,会生成一个名字为deps.zip的文件,该文件为编译之后的第三方Python包。
您也可以修改requirements.txt,安装其他所需的第三方Python包。此外,requirements.txt文件中可以指定多个Python依赖。
在Python作业中使用第三方Python包deps.zip。
登录实时计算控制台。
单击目标工作空间操作列下的控制台。
在左侧导航栏,单击文件管理,上传deps.zip。
在deps.zip。
页面单击目标作业名称,在部署详情页签基础配置区域,单击编辑后,在Python Libraries项,选择
单击保存。
使用JAR包
如果您的Flink Python作业中使用了Java类,例如作业中使用了Connector或者Java自定义函数时,可以通过如下方式来指定Connector或者Java自定义函数的JAR包。
登录实时计算控制台。
单击目标工作空间操作列下的控制台。
在左侧导航栏,单击文件管理,上传需要使用的JAR包。
在附加依赖文件项,选择需要使用的JAR包。 页面单击目标作业名称,在部署详情页签基础配置区域,单击编辑后,在
在运行参数配置区域其他配置项,添加配置信息。
假如需要依赖多个JAR包,且名字分别为jar1.jar和jar2.jar,配置内容如下。
pipeline.classpaths: 'file:///flink/usrlib/jar1.jar;file:///flink/usrlib/jar2.jar'
单击保存。
使用数据文件
Flink暂不支持通过上传数据文件的方式来进行python作业调试。
下面将从两个场景为您介绍如何使用数据文件:
通过Python Archives选项方式
如果您的数据文件的数量比较多时,您可以将数据文件打包成一个ZIP包,然后通过如下方式在Python作业中使用。操作步骤如下:
登录实时计算控制台。
单击目标工作空间操作列下的控制台。
在左侧导航栏,单击文件管理,上传目标数据文件ZIP包。
在在 页面单击目标作业名称,在部署详情页签基础配置区域,单击编辑后,Python Archives项,选择需要使用的数据文件ZIP包。
在Python自定义函数中,可以通过如下方式访问数据文件。假如数据文件所在压缩包名称为mydata.zip。
def map(): with open("mydata.zip/mydata/data.txt") as f: ...
通过附加依赖文件选项
如果您的数据文件数量比较少时,可以通过如下方式在Python作业中使用。操作步骤如下:
登录实时计算控制台。
单击目标工作空间操作列下的控制台。
在左侧导航栏,单击文件管理,上传目标数据文件。
在,在 页面单击目标作业名称,在部署详情页签基础配置区域,单击编辑后,附加依赖文件项,选择需要的数据文件。
在Python自定义函数中,可以通过如下方式访问数据文件。以数据文件名称为data.txt为例的代码如下。
def map(): with open("/flink/usrlib/data.txt") as f: ...
相关文档
Python API作业开发的方法请参见Python作业开发。
Flink Python作业的完整开发流程示例,请参见Flink Python作业快速入门。