全部產品
Search
文件中心

:PyODPS使用第三方包

更新時間:Jun 19, 2024

本文為您介紹如何在PyODPS中使用第三方包。PyODPS製作第三方包的操作請參見PyODPS製作第三方包

前提條件

上傳三方包

使用三方包前,請確保您產生的包已被上傳至MaxCompute Archive資源。上傳方式如下:

  • 使用代碼上傳資源。您需要將packages.tar.gz替換成目標包所在的路徑和檔案名稱:

    import os
    from odps import ODPS
    
    # 確保 ALIBABA_CLOUD_ACCESS_KEY_ID 環境變數設定為使用者 Access Key ID,
    # ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境變數設定為使用者 Access Key Secret,
    # 不建議直接使用 Access Key ID / Access Key Secret 字串
    o = ODPS(
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
        project='<your-default-project>',
        endpoint='<your-end-point>',
    )
    o.create_resource("test_packed.tar.gz", "archive", fileobj=open("packages.tar.gz", "rb"))
  • 使用DataWorks上傳資源。具體操作請參見步驟一:建立或上傳資源

在Python UDF中使用三方包

您需要對UDF進行修改以使用上傳的三方包。具體如下:

  1. 在UDF類的_init_方法中添加對三方包的引用。

  2. 在UDF代碼(例如evaluate或process方法)中調用三方包。

樣本

下面以實現scipy中的psi函數為例,為您介紹如何在Python UDF中使用三方包。

  1. 使用以下命令打包scipy。

    pyodps-pack -o scipy-bundle.tar.gz scipy
  2. 編寫以下代碼,並將其儲存為test_psi_udf.py

    import sys
    from odps.udf import annotate
    
    @annotate("double->double")
    class MyPsi(object):
        def __init__(self):
            # 將路徑增加到引用路徑
            sys.path.insert(0, "work/scipy-bundle.tar.gz/packages")
    
        def evaluate(self, arg0):
            # 將 import 語句保持在 evaluate 函數內部
            from scipy.special import psi
    
            return float(psi(arg0))

    代碼解釋:__init__函數中將work/scipy-bundle.tar.gz/packages添加至sys.path,因為MaxCompute會將所有UDF引用的Archive資源以資源名稱為目錄解壓至work目錄下,而packages則是pyodps-pack產生包的子目錄。而將對scipy的import放在evaluate函數體內部的原因是三方包僅在執行時可用,當UDF在MaxCompute服務端被解析時,解析環境不包含三方包,函數體外的三方包import會導致報錯。

  3. test_psi_udf.py上傳為MaxCompute Python資源,並將scipy-bundle.tar.gz上傳為Archive資源。

  4. 建立UDF名為test_psi_udf,引用上述兩個資源檔,並指定類名為test_psi_udf.MyPsi

    步驟3~4中,可以使用PyODPS或者MaxCompute用戶端的方式執行。

    • 使用PyODPS執行方法:

      import os
      from odps import ODPS
      
      # 確保 ALIBABA_CLOUD_ACCESS_KEY_ID 環境變數設定為使用者 Access Key ID,
      # ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境變數設定為使用者 Access Key Secret,
      # 不建議直接使用 Access Key ID / Access Key Secret 字串
      o = ODPS(
          os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
          os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
          project='<your-default-project>',
          endpoint='<your-end-point>',
      )
      
      bundle_res = o.create_resource(
          "scipy-bundle.tar.gz", "archive", fileobj=open("scipy-bundle.tar.gz", "rb")
      )
      udf_res = o.create_resource(
          "test_psi_udf.py", "py", fileobj=open("test_psi_udf.py", "rb")
      )
      o.create_function(
          "test_psi_udf", class_type="test_psi_udf.MyPsi", resources=[bundle_res, udf_res]
      )
    • 使用MaxCompute用戶端執行方法:

      add archive scipy-bundle.tar.gz;
      add py test_psi_udf.py;
      create function test_psi_udf as test_psi_udf.MyPsi using test_psi_udf.py,scipy-bundle.tar.gz;
  5. 完成以上操作後,即可使用UDF執行SQL。

    set odps.pypy.enabled=false;
    set odps.isolation.session.enable=true;
    select test_psi_udf(sepal_length) from iris;

在PyODPS DataFrame中使用三方包

PyODPS DataFrame支援在execute或persist時使用libraries參數使用上面的第三方庫。 下面以map方法為例,apply或map_reduce方法的過程類似。

  1. 使用以下命令打包scipy。

    pyodps-pack -o scipy-bundle.tar.gz scipy
  2. 假定表名為test_float_col,內容只包含一列FLOAT值:

       col1
    0  3.75
    1  2.51

    計算psi(col1)的值,代碼如下:

    import os
    from odps import ODPS, options
    
    def my_psi(v):
        from scipy.special import psi
    
        return float(psi(v))
    
    # 如果 Project 開啟了 Isolation,下面的選項不是必需的
    options.sql.settings = {"odps.isolation.session.enable": True}
    
    # 確保 ALIBABA_CLOUD_ACCESS_KEY_ID 環境變數設定為使用者 Access Key ID,
    # ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境變數設定為使用者 Access Key Secret,
    # 不建議直接使用 Access Key ID / Access Key Secret 字串
    o = ODPS(
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
        project='<your-default-project>',
        endpoint='<your-end-point>',
    )
    df = o.get_table("test_float_col").to_df()
    # 直接執行並取得結果
    df.col1.map(my_psi).execute(libraries=["scipy-bundle.tar.gz"])
    # 儲存到另一張表
    df.col1.map(my_psi).persist("result_table", libraries=["scipy-bundle.tar.gz"])
  3. (可選)如果希望在整個執行過程中使用相同的三方包,可以設定全域選項:

    from odps import options
    options.df.libraries = ["scipy-bundle.tar.gz"]

完成以上操作後,即可在DataFrame執行時使用相關的三方包。

在DataWorks中使用三方包

DataWorks PyODPS節點預置了若干三方包,同時提供了load_resource_package方法用以引用其他的包,具體使用方式請參見使用三方包

手動上傳和使用三方包

說明

以下內容僅作為維護舊專案或者舊環境的參考,新專案提案直接使用pyodps-pack打包。

部分舊專案可能採用了之前的方式使用三方包,即手動上傳所有依賴的Wheel包並在代碼中引用,或者使用了不支援二進位包的舊版MaxCompute環境,本節內容為該情境準備。下面以在map中使用python_dateutil為例為您介紹使用三方包的步驟。

  1. 在Linux Bash中使用pip download命令,下載包及其依賴到某個路徑。下載後會出現兩個包,即six-1.10.0-py2.py3-none-any.whlpython_dateutil-2.5.3-py2.py3-none-any.whl

    pip download python-dateutil -d /to/path/
    說明

    您需要下載支援Linux環境的包,建議直接在Linux下調用該命令。

  2. 將上述已下載的兩個包分別上傳至ODPS資源。

    • 方式一:通過代碼上傳。

      # 這裡要確保資源名的尾碼是正確的檔案類型
      odps.create_resource('six.whl', 'file', file_obj=open('six-1.10.0-py2.py3-none-any.whl', 'rb'))
      odps.create_resource('python_dateutil.whl', 'file', file_obj=open('python_dateutil-2.5.3-py2.py3-none-any.whl', 'rb'))
    • 方式二:通過DataWorks介面上傳。

      您可以參考步驟一:建立或上傳資源完成目標資源的上傳與提交。

  3. 使用三方包。

    假定DataFrame只有一個STRING類型的欄位,內容如下。

                   datestr
    0  2016-08-26 14:03:29
    1  2015-08-26 14:03:29
    • 全域配置使用到的三方庫如下:

      from odps import options
      
      def get_year(t):
          from dateutil.parser import parse
          return parse(t).strftime('%Y')
      
      options.df.libraries = ['six.whl', 'python_dateutil.whl']
      df.datestr.map(get_year).execute()
         datestr
      0     2016
      1     2015

    • 通過立即運行方法的libraries參數指定:

      def get_year(t):
          from dateutil.parser import parse
          return parse(t).strftime('%Y')
      
      df.datestr.map(get_year).execute(libraries=['six.whl', 'python_dateutil.whl'])
         datestr
      0     2016
      1     2015

PyODPS預設支援執行純Python且不含檔案操作的第三方庫。在較新版本的MaxCompute服務下,PyODPS也支援執行帶有二進位代碼或帶有檔案操作的Python庫。這些庫名必須擁有一定的尾碼,可根據下表判斷:

平台

Python版本

可用的尾碼

RHEL 5 x86_64

Python 2.7

cp27-cp27m-manylinux1_x86_64

RHEL 5 x86_64

Python 3.7

cp37-cp37m-manylinux1_x86_64

RHEL 7 x86_64

Python 2.7

cp27-cp27m-manylinux1_x86_64, cp27-cp27m-manylinux2010_x86_64, cp27-cp27m-manylinux2014_x86_64

RHEL 7 x86_64

Python 3.7

cp37-cp37m-manylinux1_x86_64, cp37-cp37m-manylinux2010_x86_64, cp37-cp37m-manylinux2014_x86_64

RHEL 7 Arm64

Python 3.7

cp37-cp37m-manylinux2014_aarch64

所有的Wheel包都需要以Archive格式上傳,whl尾碼的包需要重新命名為zip尾碼。同時,作業需要開啟odps.isolation.session.enable選項,或者在Project層級開啟Isolation。以下樣本展示了如何上傳並使用scipy中的特殊函數:

# 對於含有二進位代碼的包,必須使用 Archive 方式上傳資源,whl 尾碼需要改為 zip
odps.create_resource('scipy.zip', 'archive', file_obj=open('scipy-0.19.0-cp27-cp27m-manylinux1_x86_64.whl', 'rb'))

# 如果 Project 開啟了 Isolation,下面的選項不是必需的
options.sql.settings = { 'odps.isolation.session.enable': True }

def my_psi(value):
    # 建議在函數內部 import 第三方庫,以防止不同作業系統下二進位包結構差異造成執行錯誤
    from scipy.special import psi
    return float(psi(value))

df.float_col.map(my_psi).execute(libraries=['scipy.zip'])

對於只提供源碼的二進位包,可以在Linux Shell中打包成Wheel再上傳,Mac和Windows中產生的Wheel包無法在MaxCompute中使用,Linux Shell中打包命令如下:

python setup.py bdist_wheel