您可以在Flink Python作業中使用自訂的Python虛擬環境、第三方Python包、JAR包和資料檔案等,本文為您介紹如何在Python作業中使用這些依賴。
背景資訊
本文通過以下情境為您介紹如何使用Python依賴:
預裝Python環境
當前全託管環境中已經預裝了Python環境,具體版本如下:
VVR 8.0.10及以下版本:Python 3.7
VVR 8.0.11及以上版本:Python 3.9
關於Python環境中預裝的第三方軟體包,請參見Python作業開發。
一些 Python 第三方軟體包對glibc的版本有要求,當前全託管環境中預裝的glibc版本如下:
X86
VVR 8.x及以下版本:glibc 2.17
VVR 11.x及以上版本:glibc 2.31
ARM
VVR 11.2及以下版本:glibc 2.17
VVR 11.3及以上版本:glibc 2.31
Glibc 支援向前相容,因此只要您依賴的Python三方庫所依賴的glibc版本不高於環境中的glibc版本即可。
使用自訂的Python虛擬環境
VVR 4.x僅支援3.7版本的Python虛擬環境,VVR 6.x及以上的版本無此限制,您可以使用更高版本的Python虛擬環境。
如果預裝的Python環境滿足不了您的要求,您也可以通過Python虛擬環境的方式使用自訂的Python版本,每個Python虛擬環境都有一套完整的Python運行環境,並且可以在這套虛擬環境中安裝一系列的Python依賴包。下文為您介紹如何準備Python的虛擬環境。
準備Python的虛擬環境。
在本地準備setup-pyflink-virtual-env.sh指令碼,其內容如下。
X86
set -e # 下載miniforge.sh指令碼。 wget "https://github.com/conda-forge/miniforge/releases/download/25.11.0-1/Miniforge3-25.11.0-1-Linux-x86_64.sh" -O "miniforge.sh" # 為miniforge.sh指令碼添加執行許可權。 chmod +x miniforge.sh # 安裝 miniforge ./miniforge.sh -b source /root/miniforge3/bin/activate # 建立Python虛擬環境。 mamba create -n venv python=3.10 -y eval "$(mamba shell hook --shell bash)" # 啟用Python虛擬環境。 mamba activate venv # 安裝PyFlink依賴。 # update the PyFlink version if needed pip install "apache-flink==1.20.3" # 刪除不必要的 JAR 包以降低包大小 find /root/miniforge3/envs/venv/lib/python3.10/site-packages/pyflink/ -name *.jar | xargs rm # 關閉Conda Python虛擬環境。 mamba deactivate # 將準備好的Conda Python虛擬環境打包。 cd /root/miniforge3/envs/ && zip -r /root/venv.zip venvARM
set -e # 下載miniforge.sh指令碼。 wget "https://github.com/conda-forge/miniforge/releases/download/25.11.0-1/Miniforge3-25.11.0-1-Linux-aarch64.sh" -O "miniforge.sh" # 為miniforge.sh指令碼添加執行許可權。 chmod +x miniforge.sh # 安裝 miniforge ./miniforge.sh -b source /root/miniforge3/bin/activate # 建立Python虛擬環境。 mamba create -n venv python=3.10 -y eval "$(mamba shell hook --shell bash)" # 啟用Python虛擬環境。 mamba activate venv # 安裝PyFlink依賴。 # update the PyFlink version if needed yum install -y java-11-openjdk-devel export JAVA_HOME=/usr/lib/jvm/java-11 wget "https://raw.githubusercontent.com/apache/flink/release-1.20/flink-python/dev/dev-requirements.txt" -O dev-requirements.txt pip install -r dev-requirements.txt pip install "apache-flink==1.20.3" # 刪除不必要的 JAR 包以降低包大小 find /root/miniforge3/envs/venv/lib/python3.10/site-packages/pyflink/ -name *.jar | xargs rm # 關閉Conda Python虛擬環境。 mamba deactivate # 將準備好的Conda Python虛擬環境打包。 cd /root/miniforge3/envs && zip -r /root/venv.zip venvmamba create: 修改為您的目標Python版本。
apache-flink:修改為您作業使用的VVR版本對應的Flink版本,Flink版本查看方法請參見空間管理與操作。
在本地準備build.sh指令碼,其內容如下。
#!/bin/bash set -e -x yum install -y zip wget cd /root/ bash /build/setup-pyflink-virtual-env.sh mv venv.zip /build/在命令列,執行如下命令,完成python虛擬環境的安裝。
X86
docker run -it --rm -v $PWD:/build -w /build quay.io/pypa/manylinux_2_28_x86_64 bash ./build.shARM
docker run -it --rm -v $PWD:/build -w /build quay.io/pypa/manylinux_2_28_aarch64 bash ./build.sh執行完該命令後,會產生一個名字為venv.zip的檔案,本樣本為Python 3.10的虛擬環境。
您也可以修改上述指令碼,在虛擬環境中安裝所需的第三方Python包。
說明本文以作業為VVR 11.x,Python 3.10為例為您介紹,如果您需要使用其他VVR版本或安裝其他版本的Python的虛擬環境,則需要修改以下參數:
在Python作業中使用Python虛擬環境。
單擊目標工作空間操作列下的控制台。
在左側導覽列,單擊檔案管理,上傳venv.zip檔案。
在頁面,單擊目標作業名稱。
在部署詳情頁簽基礎配置地區Python Archives,選擇venv.zip檔案。
如果SQL作業在虛擬環境中使用Python UDF,需要在運行參數配置地區的其他配置項,添加如下配置資訊。
python.archives: oss://.../venv.zip在運行參數配置地區其他配置項,按照您作業的VVR版本添加對應的指定Python虛擬環境的安裝路徑的配置資訊。
vvr-6.x及以上版本
python.executable: venv.zip/venv/bin/python python.client.executable: venv.zip/venv/bin/pythonvvr-6.x以下版本
python.executable: venv.zip/venv/bin/python
使用第三方Python包
下面將從以下兩個情境為您介紹如何使用第三方Python包:
使用可直接Import的第三方Python包
如果您的第三方Python包是Zip Safe的,即不需要安裝即可直接在Python作業中使用。操作步驟如下:
下載可直接Import的第三方Python包。
在瀏覽器上開啟PyPI頁面。
在搜尋方塊中輸入目標第三方Python包名稱,例如apache-flink 1.20.3。
在搜尋結果中,單擊目標結果名稱。
在左側導覽列,單擊Download files。
單擊檔案名稱中包含cp39-cp39m-manylinux1的包名稱進行下載。
單擊目標工作空間操作列下的控制台。
在左側導覽列,單擊檔案管理,上傳第三方Python包。
在頁面,單擊,Python Libraries項,選擇所上傳的第三方Python包。
單擊儲存。
使用需要編譯的第三方Python包
如果您的第三方Python包是tar.gz格式的壓縮包,或從其他地方下載的源碼包,且壓縮包的根目錄下存在setup.py檔案,則這種類型的第三方Python包通常需要先編譯才能被使用。您需要先在與Flink相容的環境下編譯第三方Python包,然後才可在Python作業中調用第三方Python包。
推薦使用quay.io/pypa/manylinux_2_28_x86_64鏡像容器中的Python 3.9來編譯第三方Python包,使用該容器編譯產生的包相容絕大多數Linux環境,關於該鏡像容器的更多資訊請參見manylinux。
說明Python 3.9的安裝路徑為 /opt/python/cp39-cp39/bin/python3。
下面以opencv-python-headless第三方Python包為例,介紹一下如何編譯和使用該第三方Python包。
編譯第三方Python包。
在本地準備requirements.txt檔案,其內容如下。
opencv-python-headless numpy<2在本地準備build.sh指令碼,其內容如下。
#!/bin/bash set -e -x yum install -y zip #PYBIN=/opt/python/cp37-cp37m/bin #PYBIN=/opt/python/cp38-cp38/bin PYBIN=/opt/python/cp39-cp39/bin #PYBIN=/opt/python/cp310-cp310/bin #PYBIN=/opt/python/cp311-cp311/bin "${PYBIN}/pip" install --target __pypackages__ -r requirements.txt cd __pypackages__ && zip -r deps.zip . && mv deps.zip ../ && cd .. rm -rf __pypackages__在CMD命令列,執行如下命令。
X86
docker run -it --rm -v $PWD:/build -w /build quay.io/pypa/manylinux_2_28_x86_64 bash ./build.shARM
docker run -it --rm -v $PWD:/build -w /build quay.io/pypa/manylinux_2_28_aarch64 bash ./build.sh該命令執行完後,會產生一個名字為deps.zip的檔案,該檔案為編譯之後的第三方Python包。
您也可以修改requirements.txt,安裝其他所需的第三方Python包。此外,requirements.txt檔案中可以指定多個Python依賴。
在Python作業中使用第三方Python包deps.zip。
單擊目標工作空間操作列下的控制台。
在左側導覽列,單擊檔案管理,上傳deps.zip。
在頁面單擊目標作業名稱,在部署詳情頁簽基礎配置地區,單擊編輯後,在Python Libraries項,選擇deps.zip。
單擊儲存。
使用JAR包
如果您的Flink Python作業中使用了Java類,例如作業中使用了Connector或者Java自訂函數時,可以通過如下方式來指定Connector或者Java自訂函數的JAR包。
單擊目標工作空間操作列下的控制台。
在左側導覽列,單擊檔案管理,上傳需要使用的JAR包。
在頁面單擊目標作業名稱,在部署詳情頁簽基礎配置地區,單擊編輯後,在附加依賴檔案項,選擇需要使用的JAR包。
在運行參數配置地區其他配置項,添加配置資訊。
假如需要依賴多個JAR包,且名字分別為jar1.jar和jar2.jar,配置內容如下。
pipeline.classpaths: 'file:///flink/usrlib/jar1.jar;file:///flink/usrlib/jar2.jar'單擊儲存。
使用內建連接器、資料格式和Catalog
僅支援Realtime Compute引擎 VVR 11.2 及以上版本支援使用內建連接器、資料格式和Catalog。
如果您需要在Flink Python作業中使用內建連接器、資料格式和Catalog,可以通過如下方式來指定所用的內建連接器、資料格式和Catalog。
在運行參數配置地區其他配置項,添加配置資訊。
假如需要依賴多個內建連接器,且名字分別為kafka和sls,配置內容如下(內建連接器的具體名字,請參考支援的連接器中各連接器的文檔)。
pipeline.used-builtin-connectors: kafka;sls假如需要依賴多個內建資料格式,且名字分別為avro和parquet,配置內容如下(內建資料格式的具體名字,請參考支援的資料格式的文檔)。
pipeline.used-builtin-formats: avro;parquet假如需要依賴多個內建Catalog,且名字分別為hive-2.3.6和paimon,配置內容如下(內建Catalog的具體名字,請參考資料管理中相應Catalog的文檔)。
pipeline.used-builtin-catalogs: hive-2.3.6;paimon單擊儲存。
使用資料檔案
Flink暫不支援通過上傳資料檔案的方式來進行python作業調試。
下面將從兩個情境為您介紹如何使用資料檔案:
通過Python Archives選項方式
如果您的資料檔案的數量比較多時,您可以將資料檔案打包成一個ZIP包,然後通過如下方式在Python作業中使用。操作步驟如下:
單擊目標工作空間操作列下的控制台。
在左側導覽列,單擊檔案管理,上傳目標資料檔案ZIP包。
在頁面單擊目標作業名稱,在部署詳情頁簽基礎配置地區,單擊編輯後,在Python Archives項,選擇需要使用的資料檔案ZIP包。
在Python自訂函數中,可以通過如下方式訪問資料檔案。假如資料檔案所在壓縮包名稱為mydata.zip。
def map(): with open("mydata.zip/mydata/data.txt") as f: ...
通過附加依賴檔案選項
如果您的資料檔案數量比較少時,可以通過如下方式在Python作業中使用。操作步驟如下:
單擊目標工作空間操作列下的控制台。
在左側導覽列,單擊檔案管理,上傳目標資料檔案。
在頁面單擊目標作業名稱,在部署詳情頁簽基礎配置地區,單擊編輯後,,在附加依賴檔案項,選擇需要的資料檔案。
在Python自訂函數中,可以通過如下方式訪問資料檔案。以資料檔案名稱為data.txt為例的代碼如下。
def map(): with open("/flink/usrlib/data.txt") as f: ...
相關文檔
Python API作業開發的方法請參見Python作業開發。
Flink Python作業的完整開發流程樣本,請參見Flink Python作業快速入門。