本文為您介紹基於MaxCompute用戶端通過Python 3 UDTF讀取MaxCompute資源的使用樣本。
前提條件
已安裝MaxCompute用戶端。更多安裝MaxCompute用戶端操作,請參見安裝並配置MaxCompute用戶端。
UDTF的動態參數說明
Python UDTF函數簽名格式請參見函數簽名及資料類型。
- 您可以在參數列表中使用
*
,表示接受任意長度、任意類型的輸入參數。例如@annotate('double,*->string')
表示接受第一個參數是DOUBLE類型,後接任意長度、任意類型的參數列表。此時,您需要自己編寫代碼判斷輸入的個數和參數類型,然後對它們進行相應的操作(您可以對比C語言裡面的printf
函數來理解此操作)。說明*
用在傳回值列表中時,表示的是不同的含義。 - UDTF的傳回值可以使用
*
,表示返回任意個STRING類型。傳回值的個數與調用函數時設定的別名個數有關。例如@annotate("bigint,string->double,*")
,調用方式是UDTF(x, y) as (a, b, c)
,此處as
後面設定了三個別名,即a
、b
、c
。編輯器會認定a
為DOUBLE類型(Annotation中傳回值第一列的類型是給定的),b
和c
為STRING類型。因為這裡給出了三個傳回值,所以UDTF在調用forward
時,forward
必須是長度為3的數組,否則會出現運行時報錯。說明 這種錯誤無法在編譯時間報出,因此UDTF的調用者在SQL中設定alias個數時,必須遵循該UDTF定義的規則。由於彙總函式的傳回值個數固定是1,所以這個功能對UDAF來說並無意義。
UDTF程式碼範例
- 讀取MaxCompute資原始碼樣本。
from odps.udf import annotate from odps.udf import BaseUDTF from odps.distcache import get_cache_file from odps.distcache import get_cache_table @annotate('string -> string, bigint') class UDTFExample(BaseUDTF): """讀取資源檔和資源表裡的pageid、adid_list,產生dict """ def __init__(self): import json cache_file = get_cache_file('test_json.txt') self.my_dict = json.load(cache_file) cache_file.close() records = list(get_cache_table('table_resource1')) for record in records: self.my_dict[record[0]] = record[1] """輸入pageid,輸出pageid以及它對應的所有adid """ def process(self, pageid): for adid in self.my_dict[pageid]: self.forward(pageid, adid)
- 動態參數程式碼範例。
from odps.udf import annotate from odps.udf import BaseUDTF import json @annotate('string,*->string,*') class JsonTuple(BaseUDTF): def process(self, *args): length = len(args) result = [None] * length try: obj = json.loads(args[0]) for i in range(1, length): result[i] = str(obj.get(args[i])) except Exception as err: result[0] = str(err) for i in range(1, length): result[i] = None self.forward(*result)
以上UDTF樣本中,傳回值個數會根據輸入參數的個數來決定。輸出參數中的第一個參數是一個JSON文本,後面的參數需要從JSON中根據Key進行解析。傳回值中的第一個傳回值是解析JSON過程中的出錯資訊,如果沒有出錯,則會根據輸入的Key依次輸出從JSON中解析出來的內容,使用樣本如下。-- 根據輸入參數的個數定製輸出別名個數。 SELECT my_json_tuple(json, 'a', 'b') as (exceptions, a, b) FROM jsons; -- 變長部分可以一列都沒有。 SELECT my_json_tuple(json) as exceptions FROM jsons; -- 下面這個SQL會出現執行階段錯誤,因為別名個數與實際輸出個數不符。 -- 注意編譯時間無法發現此錯誤。 SELECT my_json_tuple(json, 'a', 'b') as (exceptions, a, b, c) FROM jsons;
操作步驟
- 將UDTF程式碼範例儲存為py_udtf_example.py檔案,放置於MaxCompute用戶端的bin目錄中。
- 登入MaxCompute用戶端建立資源表table_resource1和內部表tmp1(後續執行DML操作寫入的目標表)並插入資料,準備資源檔test_json.txt並放置於MaxCompute用戶端的bin目錄中。更多登入MaxCompute用戶端操作,請參見安裝並登入MaxCompute本地用戶端。建表、插入資料命令及資源檔內容樣本如下:
- 建立資源表table_resource1,並插入資料。
create table if not exists table_resource1 (pageid string, adid_list array<int>); insert into table table_resource1 values("contact_page2",array(2,3,4)),("contact_page3",array(5,6,7));
說明 由於table_resource1中adid_list欄位資料類型為ARRAY,讀取表資源時需要在Session層級執行set odps.sql.python.version=cp37;
命令開啟Python 3來支援讀取ARRAY類型資料。 - 建立內部表tmp1,並插入資料。
create table if not exists tmp1 (pageid string); insert into table tmp1 values ("front_page"),("contact_page1"),("contact_page3");
- 資源檔test_json.txt的內容如下。
{"front_page":[1, 2, 3], "contact_page1":[3, 4, 5]}
- 建立資源表table_resource1,並插入資料。
在MaxCompute用戶端上建立UDTF函數my_udtf。
更多建立函數資訊,請參見註冊函數。命令樣本如下。
create function my_udtf as 'py_udtf_example.UDTFExample' using 'py_udtf_example.py, test_json.txt, table_resource1';
- 在MaxCompute用戶端上執行SQL命令調用新建立的UDTF。命令樣本如下:
- 樣本1:單純使用UDTF函數運行SQL。
返回結果如下。select my_udtf(pageid) as (pageid, adid) from tmp1;
+------------+------------+ | pageid | adid | +------------+------------+ | front_page | 1 | | front_page | 2 | | front_page | 3 | | contact_page1 | 3 | | contact_page1 | 4 | | contact_page1 | 5 | | contact_page3 | 5 | | contact_page3 | 6 | | contact_page3 | 7 | +------------+------------+
- 樣本2:對樣本1中的命令改寫,結合Lateral View運行SQL。
返回結果如下。select pageid, adid from tmp1 lateral view my_udtf(pageid) adTable as udtf_pageid, adid;
+--------+------------+ | pageid | adid | +--------+------------+ | front_page | 1 | | front_page | 2 | | front_page | 3 | | contact_page1 | 3 | | contact_page1 | 4 | | contact_page1 | 5 | | contact_page3 | 5 | | contact_page3 | 6 | | contact_page3 | 7 | +--------+------------+
- 樣本3:結合彙總函式和Lateral View運行SQL。
返回結果如下。select adid, count(1) as cnt from tmp1 lateral view my_udtf(pageid) adTable as udtf_pageid, adid group by adid;
+------------+------------+ | adid | cnt | +------------+------------+ | 1 | 1 | | 2 | 1 | | 3 | 2 | | 4 | 1 | | 5 | 2 | | 6 | 1 | | 7 | 1 | +------------+------------+
- 樣本1:單純使用UDTF函數運行SQL。