PyAlink指令碼支援通過編寫代碼的方式來調用Alink的所有演算法。您可以使用PyAlink指令碼調用Alink的分類演算法做分類、 調用迴歸演算法做迴歸、調用推薦演算法做推薦等。PyAlink指令碼也支援與其他Designer的演算法組件無縫銜接, 完成業務鏈路的搭建及效果驗證。本文為您介紹如何使用PyAlink指令碼。
背景資訊
PyAlink指令碼支援兩種使用方式(方式一:單獨使用PyAlink指令碼、方式二:PyAlink指令碼與其他Designer的演算法組件組合使用),可以使用上百種Alink組件,且支援通過編寫代碼的方式讀入和寫出多種類型的資料(PyAlink指令碼不同資料類型的讀入和寫出方式)。後續您可以將PyAlink指令碼產生的PipelineModel模型部署為EAS服務,詳情請參見使用樣本:將PyAlink指令碼產生的模型部署為EAS服務。
基本概念
在使用PyAlink指令碼之前,請先瞭解以下基本概念。
功能模組 | 基本概念 |
Operator | 在Alink裡,每個演算法功能都是一個Operator。分為批式Operator和流式Operator。例如:羅吉斯迴歸包含以下Operator:
Operator之間使用Link或LinkFrom串連,具體使用樣本如下。
每個Operator都有參數。例如:羅吉斯迴歸包含以下參數。
配置參數的方式為set+參數名稱,具體使用樣本如下。
資料匯入(Source)和資料匯出(Sink)是一類特殊的Operator,定義好之後,可以通過Link或LinkFrom和演算法元件連線,具體實現如下圖所示。 Alink包含常用的流式資料來源和批式資料來源,具體使用樣本如下。
|
Pipeline | Alink演算法支援的另外一種使用方式。可以將資料處理、特徵產生、模型訓練放在一起,進行訓練、預測及線上服務,具體使用樣本如下。
|
Vector | Alink的一種自訂資料類型,支援以下兩種格式。
說明 在Alink裡,如果列是Vector類型,則參數名稱一般為vectorColName。 |
PyAlink指令碼支援的Alink組件
您可以在PyAlink指令碼中使用上百種Alink組件,包括資料處理、特徵工程、模型訓練等組件。
PyAlink指令碼當前僅支援Pipeline組件和批組件,暫時不支援流組件。
方式一:單獨使用PyAlink指令碼
以ItemCf模型對movielens資料集進行打分為例,介紹如何在Designer平台使用阿里雲資源運行使用PyAlink指令碼實現的商務程序。具體操作步驟如下所示。
進入Designer頁面,並建立空白工作流程,具體操作請參見操作步驟。
在工作流程列表,選擇已建立的空白工作流程,單擊進入工作流程。
在左側組件列表的搜尋方塊中,搜尋PyAlink指令碼,並將PyAlink指令碼拖入右側畫布中,畫布中自動產生一個名稱為PyAlink指令碼-1的工作流程節點。
在畫布中選中PyAlink指令碼-1節點,在右側參數設定和執行調優頁簽配置相關參數。
在參數設定頁簽編寫代碼,代碼指令碼內容如下所示。
from pyalink.alink import * def main(sources, sinks, parameter): PATH = "http://alink-test.oss-cn-beijing.aliyuncs.com/yuhe/movielens/" RATING_FILE = "ratings.csv" PREDICT_FILE = "predict.csv" RATING_SCHEMA_STRING = "user_id long, item_id long, rating int, ts long" ratingsData = CsvSourceBatchOp() \ .setFilePath(PATH + RATING_FILE) \ .setFieldDelimiter("\t") \ .setSchemaStr(RATING_SCHEMA_STRING) predictData = CsvSourceBatchOp() \ .setFilePath(PATH + PREDICT_FILE) \ .setFieldDelimiter("\t") \ .setSchemaStr(RATING_SCHEMA_STRING) itemCFModel = ItemCfTrainBatchOp() \ .setUserCol("user_id").setItemCol("item_id") \ .setRateCol("rating").linkFrom(ratingsData); itemCF = ItemCfRateRecommender() \ .setModelData(itemCFModel) \ .setItemCol("item_id") \ .setUserCol("user_id") \ .setReservedCols(["user_id", "item_id"]) \ .setRecommCol("prediction_score") result = itemCF.transform(predictData) result.link(sinks[0]) BatchOperator.execute()
PyAlink指令碼支援4個輸出樁,代碼指令碼中通過
result.link(sinks[0])
將輸出的資料寫出到第一個輸出樁,下遊可以通過串連PyAlink指令碼的第一個輸出樁來讀取該指令碼輸出的資料。PyAlink指令碼具體支援的不同資料類型的讀入和寫出方式請參見PyAlink指令碼不同資料類型的讀入和寫出方式。在執行調優頁簽設定運行模型和節點規格。
參數
描述
選擇作業的運行模式
支援以下兩種模式:
DLC(單機多並發):建議在任務資料規模小且在調實驗證階段時使用。
MaxCompute(分布式):建議在任務資料規模大或在實際生產任務時使用。
Flink全託管(分布式):表示使用當前工作空間綁定的Flink叢集資源進行分布式執行。
節點個數
僅當選擇作業的運行模式為MaxCompute(分布式)或Flink全託管(分布式)時,才需要配置該參數。執行節點的個數,為空白時系統根據任務資料自動分配,預設為空白。
每個節點的記憶體大小,單位MB
僅當選擇作業的運行模式為MaxCompute(分布式)或Flink全託管(分布式)時,才需要配置該參數。單個節點的記憶體大小,單位MB。取值為正整數,預設為8192。
每個節點的CPU核心數目
僅當選擇作業的運行模式為MaxCompute(分布式)或Flink全託管(分布式)時,才需要配置該參數。單個節點的CPU核心數目,取值為正整數,預設為空白。
選擇指令碼啟動並執行節點規格
DLC節點的資源類型配置,預設為2vCPU+8GB Mem-ecs.g6.large。
在畫布上方單擊儲存按鈕,然後單擊運行按鈕,運行PyAlink指令碼。
任務運行結束後,按右鍵畫布中的PyAlink指令碼-1,在捷徑功能表中,單擊 ,查看運行結果。
列名
描述
user_id
使用者ID。
item_id
電影ID。
prediction_score
用來表示使用者對電影的喜歡程度,作為電影推薦的參考指標。
方式二:PyAlink指令碼與其他Designer的演算法組件組合使用
PyAlink指令碼的輸入樁、輸出樁與其他Designer的演算法組件無任何差別,可以相互串連共同使用。具體使用方式如下圖所示。
PyAlink指令碼不同資料類型的讀入和寫出方式
讀入資料方式。
讀取MaxCompute表,通過輸入樁的方式從上遊傳入,程式碼範例如下。
train_data = sources[0] test_data = sources[1]
代碼中sources[0]表示第一個輸入樁對應的MaxCompute表,sources[1]表示第二個輸入樁對應的MaxCompute表,依此類推,最多支援4個輸入樁。
讀取網路檔案系統的資料,通過Alink的Source組件(CsvSourceBatchOp,AkSourceBatchOp)在代碼中實現資料的讀入。支援讀入以下兩種類型的檔案:
讀入HTTP格式的網際網路共用檔案,程式碼範例如下所示:
ratingsData = CsvSourceBatchOp() \ .setFilePath(PATH + RATING_FILE) \ .setFieldDelimiter("\t") \ .setSchemaStr(RATING_SCHEMA_STRING)
讀取OSS網路檔案,需要按照下圖操作指引,設定資料讀取路徑。程式碼範例如下。
model_data = AkSourceBatchOp().setFilePath("oss://xxxxxxxx/model_20220323.ak")
寫出資料方式。
寫出MaxCompute表,通過輸出樁的方式寫出到下遊,程式碼範例如下所示。
result0.link(sinks[0]) result1.link(sinks[1]) BatchOperator.execute()
result0.link(sinks[0]),該行表示將資料寫出,並支援輸出樁訪問。該行表示第一個輸出樁輸出結果表,依此類推最多支援輸出4個結果表。
寫出OSS網路檔案,需要按照下圖操作指引,設定資料寫出路徑。程式碼範例如下。
result.link(AkSinkBatchOp() \ .setFilePath("oss://xxxxxxxx/model_20220323.ak") \ .setOverwriteSink(True)) BatchOperator.execute()
使用樣本:將PyAlink指令碼產生的模型部署為EAS服務
產生待部署的模型。
當PyAlink指令碼產生的模型為PipelineModel時,才能將模型部署為EAS服務。按照以下程式碼範例產生PipelineModel模型檔案,具體操作方法請參見方式一:單獨使用PyAlink指令碼。
from pyalink.alink import * def main(sources, sinks, parameter): PATH = "http://alink-test.oss-cn-beijing.aliyuncs.com/yuhe/movielens/" RATING_FILE = "ratings.csv" PREDICT_FILE = "predict.csv" RATING_SCHEMA_STRING = "user_id long, item_id long, rating int, ts long" ratingsData = CsvSourceBatchOp() \ .setFilePath(PATH + RATING_FILE) \ .setFieldDelimiter("\t") \ .setSchemaStr(RATING_SCHEMA_STRING) predictData = CsvSourceBatchOp() \ .setFilePath(PATH + PREDICT_FILE) \ .setFieldDelimiter("\t") \ .setSchemaStr(RATING_SCHEMA_STRING) itemCFModel = ItemCfTrainBatchOp() \ .setUserCol("user_id").setItemCol("item_id") \ .setRateCol("rating").linkFrom(ratingsData); itemCF = ItemCfRateRecommender() \ .setModelData(itemCFModel) \ .setItemCol("item_id") \ .setUserCol("user_id") \ .setReservedCols(["user_id", "item_id"]) \ .setRecommCol("prediction_score") model = PipelineModel(itemCF) model.save().link(AkSinkBatchOp() \ .setFilePath("oss://<your_bucket_name>/model.ak") \ .setOverwriteSink(True)) BatchOperator.execute()
其中,
<your_bucket_name>
為OSS Bucket名稱。重要請確認您對PATH中配置的資料集路徑有讀許可權,否則組件將運行失敗。
產生EAS設定檔。
執行以下指令碼,將輸出結果寫入config.json檔案。
# EAS的設定檔 import json # 產生 EAS 模型配置 model_config = {} # EAS接收資料的schema。 model_config['inputDataSchema'] = "id long, movieid long" model_config['modelVersion'] = "v0.2" eas_config = { "name": "recomm_demo", "model_path": "http://xxxxxxxx/model.ak", "processor": "alink_outer_processor", "metadata": { "instance": 1, "memory": 2048, "region":"cn-beijing" }, "model_config": model_config } print(json.dumps(eas_config, indent=4))
config.json檔案中的關鍵參數解釋:
name:部署模型服務的名稱。
model_path:儲存PipelineModel模型檔案的OSS路徑,需要修改為實際存放模型檔案的OSS路徑。
config.json檔案中的其他參數解釋,詳情請參見命令使用說明。
將模型部署為EAS服務。
您可以登入eascmd用戶端部署模型服務。如何登入eascmd用戶端,請參見下載並認證用戶端。以Windows 64版本為例,使用以下命令部署模型服務。
eascmdwin64.exe create config.json