全部产品
Search
文档中心

云原生大数据计算服务 MaxCompute:Python 3 UDTF

更新时间:Aug 01, 2024

Python官方即将停止维护Python 2,MaxCompute已支持Python 3,对应版本为CPython-3.7.3。本文为您介绍如何通过Python 3语言编写UDTF。

UDTF代码结构

您可以通过MaxCompute Studio工具使用Python 3语言编写UDTF代码,代码中需要包含如下信息:

  • 导入模块:必选。

    至少要包含from odps.udf import annotatefrom odps.udf import BaseUDTFfrom odps.udf import annotate用于导入函数签名模块,MaxCompute才可以识别后续代码中定义的函数签名。from odps.udf import BaseUDTF为Python UDTF的基类,您需要通过此类在派生类中实现processclose等方法。

    当UDTF代码中需要引用文件资源或表资源时,需要包含from odps.distcache import get_cache_file(文件资源)或from odps.distcache import get_cache_table(表资源)。

  • 函数签名:可选。

    格式为@annotate(<signature>)signature用于定义函数的输入参数和返回值的数据类型。如果不指定函数签名,在SQL中调用UDTF时,可以匹配任意类型的输入参数,但返回值类型无法推导,所有输出参数都将会是STRING类型。更多函数签名信息,请参见函数签名及数据类型

  • 自定义Python类(派生类):必选。

    UDTF代码的组织单位,定义了实现业务需求的变量及方法。您还可以在代码中引用MaxCompute内置的第三方库或引用文件、表资源。更多信息,请参见第三方库引用资源

  • 实现Python类的方法:必选。

    Python类实现包含如下4个方法,您可以根据实际需要进行选择。

    方法定义描述
    BaseUDTF.init()初始化方法。派生类如果需要实现此方法,必须在一开始调用基类的初始化方法super(BaseUDTF, self).init()init方法在整个UDTF生命周期中只会被调用一次,即在处理第一条记录之前。当UDTF需要保存内部状态时,可以通过此方法初始化所有状态。
    BaseUDTF.process([args, ...])SQL中每一条记录都会对应调用一次processprocess的参数为SQL语句中指定的UDTF输入参数。
    BaseUDTF.forward([args, ...])UDTF的输出方法。此方法由用户代码调用。每调用一次forward,就会输出一条记录。forward的参数为SQL语句中指定的UDTF的输出参数。

    如果Python代码中未指定函数签名,在调用forward方法时,必须将所有输出值转换为STRING类型。

    BaseUDTF.close()UDTF的结束方法。只会被调用一次,即在处理完最后一条记录之后被调用。

UDTF代码示例如下。

#导入函数签名模块及基类。
from odps.udf import annotate
from odps.udf import BaseUDTF
#函数签名。
@annotate('string -> string')
#自定义Python类。
class Explode(BaseUDTF):
#实现Python类的方法。
   def process(self, arg):
       props = arg.split(',')
       for p in props:
           self.forward(p)
说明

Python 2 UDTF与Python 3 UDTF区别在于底层Python语言版本不一致,请您根据对应版本语言支持的能力编写UDTF。

注意事项

Python 3与Python 2不兼容。在您使用Python 3之前,需要考虑兼容性问题,在一个SQL中不允许同时使用Python 3和Python 2。

说明

Python 2官方已于2020年初停止维护,建议您根据项目类型执行迁移操作:全新项目:新MaxCompute停止维护Python 2,

Python 2 UDTF迁移

Python 2官方即将停止维护,建议您根据项目类型执行迁移操作:

  • 全新项目:新MaxCompute项目,或第一次使用Python语言编写UDTF的MaxCompute项目。建议所有的Python UDTF都直接使用Python 3语言编写。

  • 存量项目:创建了大量Python 2 UDTF的MaxCompute项目。请您谨慎开启Python 3。如果您计划逐步将所有Python 2 UDTF迁移为Python 3 UDTF,推荐方法如下:

    • 新作业和新UDTF:使用Python 3语言编写,在Session级别开启Python 3。开启Python 3方法,请参见开启Python 3

    • Python 2 UDTF:改写Python 2 UDTF,使其可以同时兼容Python 2和Python 3。改写方法请参见将Python 2代码移植到Python 3

      说明

      如果您需要编写公共UDTF,并为多个MaxCompute项目授权UDTF的操作权限,建议UDTF同时兼容Python 2和Python 3。

开启Python 3

MaxCompute默认使用Python 2,如果您要使用Python 3,可以在Session级别设置如下属性开启Python 3,并与SQL语句一起提交执行。

set odps.sql.python.version=cp37;

第三方库

MaxCompute内置的Python 3运行环境中未安装第三方库Numpy。如果您需要使用Numpy的UDTF,请手动上传Numpy的WHEEL包。从PyPI或镜像下载Numpy包时,包的文件名为numpy-<版本号>-cp37-cp37m-manylinux1_x86_64.whl。上传包的操作请参见资源操作UDF示例:Python UDF使用第三方包

函数签名及数据类型

函数签名格式如下。

@annotate(<signature>)

signature为函数签名字符串,用于标识输入参数和返回值的数据类型。执行UDTF时,UDTF函数的输入参数和返回值类型要与函数签名指定的类型一致。查询语义解析阶段会检查不符合函数签名定义的用法,检查到类型不匹配时会报错。具体格式如下。

'arg_type_list -> type_list'

其中:

  • type_list:表示返回值的数据类型。UDTF可以返回多列。支持的数据类型为:BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、复杂数据类型(ARRAY、MAP、STRUCT)或复杂数据类型嵌套。

  • arg_type_list:表示输入参数的数据类型。输入参数可以为多个,用英文逗号(,)分隔。支持的数据类型为BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、CHAR、VARCHAR、复杂数据类型(ARRAY、MAP、STRUCT)或复杂数据类型嵌套。

    arg_type_list还支持星号(*)或为空(''):

    • arg_type_list为星号(*)时,表示输入参数为任意个数。

    • arg_type_list为空('')时,表示无输入参数。

    更多Resolve注解语法扩展详情,请参见UDAF和UDTF动态参数说明

说明

在编写UDTF代码过程中,您可以根据MaxCompute项目的数据类型版本选取合适的数据类型,更多数据类型版本及各版本支持的数据类型信息,请参见数据类型版本说明

合法的函数签名示例如下。

函数签名示例

说明

@annotate('bigint,boolean->string,datetime')

输入参数类型为BIGINT、BOOLEAN,返回值类型为STRING、DATETIME。

@annotate('*->string, datetime')

输入任意个参数,返回值类型为STRING、DATETIME。

@annotate('->double, bigint, string')

无输入参数,返回值类型为DOUBLE、BIGINT、STRING。

@annotate("array<string>,struct<a1:bigint,b1:string>,string->map<string,bigint>,struct<b1:bigint>")

输入参数类型为ARRAY、STRUCT、MAP,返回值类型为MAP、STRUCT。

为确保编写Python UDTF过程中使用的数据类型与MaxCompute支持的数据类型保持一致,您需要关注二者间的数据类型映射关系。具体映射关系如下。

MaxCompute SQL Type

Python 3 Type

BIGINT

INT

STRING

UNICODE

DOUBLE

FLOAT

BOOLEAN

BOOL

DATETIME

DATETIME.DATETIME

FLOAT

FLOAT

CHAR

UNICODE

VARCHAR

UNICODE

BINARY

BYTES

DATE

DATETIME.DATE

DECIMAL

DECIMAL.DECIMAL

ARRAY

LIST

MAP

DICT

STRUCT

COLLECTIONS.NAMEDTUPLE

引用资源

Python UDTF可以通过odps.distcache模块引用资源,支持引用文件资源和表资源。

  • odps.distcache.get_cache_file(resource_name):返回指定文件资源的内容。
    • resource_name为STRING类型,对应当前MaxCompute项目中已存在的文件资源名。如果文件资源名非法或者没有相应的文件资源,会返回异常。
      说明 使用UDTF访问资源,在创建UDTF时需要声明引用的资源,否则会报错。
    • 返回值为File-like对象。在使用完此对象后,您需要调用close方法释放打开的资源文件。
  • odps.distcache.get_cache_table(resource_name):返回指定表资源的内容。
    • resource_name支持STRING类型,对应当前MaxCompute项目中已存在的表资源名。如果表资源名非法或者没有相应的表资源,会返回异常。
    • 返回值为GENERATOR类型,调用者以遍历方式获取表的内容,每次遍历可得到以数组形式存在的表中的一条记录。

引用文件资源和表资源的代码示例如下。

from odps.udf import annotate
from odps.udf import BaseUDTF
from odps.distcache import get_cache_file
from odps.distcache import get_cache_table
@annotate('string -> string, bigint')
class UDTFExample(BaseUDTF):
    """读取资源文件和资源表里的pageid、adid_list,生成dict
    """
    def __init__(self):
        import json
        cache_file = get_cache_file('test_json.txt')
        self.my_dict = json.load(cache_file)
        cache_file.close()
        records = list(get_cache_table('table_resource1'))
        for record in records:
            self.my_dict[record[0]] = record[1]
    """输入pageid,输出pageid以及它对应的所有adid
    """
    def process(self, pageid):
        for adid in self.my_dict[pageid]:
            self.forward(pageid, adid)

使用说明

按照开发流程,完成Python 3 UDTF开发后,您即可通过MaxCompute SQL调用Python 3 UDTF。调用方法如下:

  • 在归属MaxCompute项目中使用自定义函数:使用方法与内建函数类似,您可以参照内建函数的使用方法使用自定义函数。

  • 跨项目使用自定义函数:即在项目A中使用项目B的自定义函数,跨项目分享语句示例:select B:udf_in_other_project(arg0, arg1) as res from table_t;。更多跨项目分享信息,请参见基于Package跨项目访问资源

使用MaxCompute Studio完整开发及调用Python 3 UDTF的操作,请参见开发Python UDF