全部產品
Search
文件中心

Platform For AI:PyAlink指令碼

更新時間:Jul 13, 2024

PyAlink指令碼支援通過編寫代碼的方式來調用Alink的所有演算法。您可以使用PyAlink指令碼調用Alink的分類演算法做分類、 調用迴歸演算法做迴歸、調用推薦演算法做推薦等。PyAlink指令碼也支援與其他Designer的演算法組件無縫銜接, 完成業務鏈路的搭建及效果驗證。本文為您介紹如何使用PyAlink指令碼。

背景資訊

PyAlink指令碼支援兩種使用方式(方式一:單獨使用PyAlink指令碼方式二:PyAlink指令碼與其他Designer的演算法組件組合使用),可以使用上百種Alink組件,且支援通過編寫代碼的方式讀入和寫出多種類型的資料(PyAlink指令碼不同資料類型的讀入和寫出方式)。後續您可以將PyAlink指令碼產生的PipelineModel模型部署為EAS服務,詳情請參見使用樣本:將PyAlink指令碼產生的模型部署為EAS服務

基本概念

在使用PyAlink指令碼之前,請先瞭解以下基本概念。

功能模組

基本概念

Operator

在Alink裡,每個演算法功能都是一個Operator。分為批式Operator和流式Operator。例如:羅吉斯迴歸包含以下Operator:

  • LogisticRegressionTrainBatchOp:羅吉斯迴歸訓練。

  • LogisticRegressionPredictBatchOp:羅吉斯迴歸批式預測。

  • LogisticRegressionPredictStreamhOp:羅吉斯迴歸流式預測。

Operator之間使用LinkLinkFrom串連,具體使用樣本如下。

# 定義資料。
data = CsvSourceBatchOp()
# 羅吉斯迴歸訓練。
lrTrain = LogisticRegressionTrainBatchOp()
# 羅吉斯迴歸預測。
LrPredict = LogisticRegressionPredictBatchOp()
# 訓練。
data.link(lrTrain)
# 預測。
LrPredict.linkFrom(lrTrain, data)

每個Operator都有參數。例如:羅吉斯迴歸包含以下參數。

  • labelCol:輸入表中的標籤列名,必選參數,類型為String。

  • featureCols:特徵列名數組,類型為String[],預設值為NULL,表示全選。

配置參數的方式為set+參數名稱,具體使用樣本如下。

lr = LogisticRegressionTrainBatchOp()\
            .setFeatureCols(colnames)\
            .setLabelCol("label")

資料匯入(Source)和資料匯出(Sink)是一類特殊的Operator,定義好之後,可以通過Link或LinkFrom和演算法元件連線,具體實現如下圖所示。

Alink包含常用的流式資料來源和批式資料來源,具體使用樣本如下。

df_data = pd.DataFrame([
    [2, 1, 1],
    [3, 2, 1],
    [4, 3, 2],
    [2, 4, 1],
    [2, 2, 1],
    [4, 3, 2],
    [1, 2, 1],
    [5, 3, 2]
])
input = BatchOperator.fromDataframe(df_data, schemaStr='f0 int, f1 int, label int')
# load data
dataTest = input
colnames = ["f0","f1"]
lr = LogisticRegressionTrainBatchOp().setFeatureCols(colnames).setLabelCol("label")
model = input.link(lr)
predictor = LogisticRegressionPredictBatchOp().setPredictionCol("pred")
predictor.linkFrom(model, dataTest).print()

Pipeline

Alink演算法支援的另外一種使用方式。可以將資料處理、特徵產生、模型訓練放在一起,進行訓練、預測及線上服務,具體使用樣本如下。

quantileDiscretizer = QuantileDiscretizer()\
            .setNumBuckets(2)\
            .setSelectedCols("sepal_length")

binarizer = Binarizer()\
            .setSelectedCol("petal_width")\
            .setOutputCol("bina")\
            .setReservedCols("sepal_length", "petal_width", "petal_length", "category")\
            .setThreshold(1.);

lda = Lda()\
            .setPredictionCol("lda_pred")\
            .setPredictionDetailCol("lda_pred_detail")\
            .setSelectedCol("category")\
            .setTopicNum(2)\
            .setRandomSeed(0)

pipeline = Pipeline()\
    .add(binarizer)\
    .add(binarizer)\
    .add(lda)

pipeline.fit(data1)
pipeline.transform(data2)

Vector

Alink的一種自訂資料類型,支援以下兩種格式。

  • 稀疏向量(SparseVector)

    使用樣本:$4$1:0.1 2:0.2。其中:兩個貨幣符號($)中間的數字表示向量長度;貨幣符號($)之後的值表示列索引:列對應的值。

  • 稠密向量(DenseVector)

    使用樣本:0.1 0.2 0.3。表示按照順序,以空格為分隔字元的值。

說明

在Alink裡,如果列是Vector類型,則參數名稱一般為vectorColName。

PyAlink指令碼支援的Alink組件

您可以在PyAlink指令碼中使用上百種Alink組件,包括資料處理、特徵工程、模型訓練等組件。

說明

PyAlink指令碼當前僅支援Pipeline組件和批組件,暫時不支援流組件。

方式一:單獨使用PyAlink指令碼

以ItemCf模型對movielens資料集進行打分為例,介紹如何在Designer平台使用阿里雲資源運行使用PyAlink指令碼實現的商務程序。具體操作步驟如下所示。

  1. 進入Designer頁面,並建立空白工作流程,具體操作請參見操作步驟

  2. 在工作流程列表,選擇已建立的空白工作流程,單擊進入工作流程

  3. 在左側組件列表的搜尋方塊中,搜尋PyAlink指令碼,並將PyAlink指令碼拖入右側畫布中,畫布中自動產生一個名稱為PyAlink指令碼-1的工作流程節點。

    image

  4. 在畫布中選中PyAlink指令碼-1節點,在右側參數設定執行調優頁簽配置相關參數。

    • 參數設定頁簽編寫代碼,代碼指令碼內容如下所示。

      from pyalink.alink import *
      
      def main(sources, sinks, parameter):
          PATH = "http://alink-test.oss-cn-beijing.aliyuncs.com/yuhe/movielens/"
          RATING_FILE = "ratings.csv"
          PREDICT_FILE = "predict.csv"
          RATING_SCHEMA_STRING = "user_id long, item_id long, rating int, ts long"
      
          ratingsData = CsvSourceBatchOp() \
                  .setFilePath(PATH + RATING_FILE) \
                  .setFieldDelimiter("\t") \
                  .setSchemaStr(RATING_SCHEMA_STRING)
      
          predictData = CsvSourceBatchOp() \
                  .setFilePath(PATH + PREDICT_FILE) \
                  .setFieldDelimiter("\t") \
                  .setSchemaStr(RATING_SCHEMA_STRING)
      
          itemCFModel = ItemCfTrainBatchOp() \
                  .setUserCol("user_id").setItemCol("item_id") \
                  .setRateCol("rating").linkFrom(ratingsData);
      
          itemCF = ItemCfRateRecommender() \
                  .setModelData(itemCFModel) \
                  .setItemCol("item_id") \
                  .setUserCol("user_id") \
                  .setReservedCols(["user_id", "item_id"]) \
                  .setRecommCol("prediction_score")
      
          result = itemCF.transform(predictData)
      
          result.link(sinks[0])
          BatchOperator.execute()

      PyAlink指令碼支援4個輸出樁,代碼指令碼中通過result.link(sinks[0])將輸出的資料寫出到第一個輸出樁,下遊可以通過串連PyAlink指令碼的第一個輸出樁來讀取該指令碼輸出的資料。PyAlink指令碼具體支援的不同資料類型的讀入和寫出方式請參見PyAlink指令碼不同資料類型的讀入和寫出方式

    • 執行調優頁簽設定運行模型和節點規格。

      參數

      描述

      選擇作業的運行模式

      支援以下兩種模式:

      • DLC(單機多並發):建議在任務資料規模小且在調實驗證階段時使用。

      • MaxCompute(分布式):建議在任務資料規模大或在實際生產任務時使用。

      • Flink全託管(分布式):表示使用當前工作空間綁定的Flink叢集資源進行分布式執行。

      節點個數

      僅當選擇作業的運行模式MaxCompute(分布式)Flink全託管(分布式)時,才需要配置該參數。執行節點的個數,為空白時系統根據任務資料自動分配,預設為空白。

      每個節點的記憶體大小,單位MB

      僅當選擇作業的運行模式MaxCompute(分布式)Flink全託管(分布式)時,才需要配置該參數。單個節點的記憶體大小,單位MB。取值為正整數,預設為8192。

      每個節點的CPU核心數目

      僅當選擇作業的運行模式MaxCompute(分布式)Flink全託管(分布式)時,才需要配置該參數。單個節點的CPU核心數目,取值為正整數,預設為空白。

      選擇指令碼啟動並執行節點規格

      DLC節點的資源類型配置,預設為2vCPU+8GB Mem-ecs.g6.large。

  5. 在畫布上方單擊儲存按鈕,然後單擊運行按鈕image,運行PyAlink指令碼。

  6. 任務運行結束後,按右鍵畫布中的PyAlink指令碼-1,在捷徑功能表中,單擊查看資料 > 輸出0,查看運行結果。

    image

    列名

    描述

    user_id

    使用者ID。

    item_id

    電影ID。

    prediction_score

    用來表示使用者對電影的喜歡程度,作為電影推薦的參考指標。

方式二:PyAlink指令碼與其他Designer的演算法組件組合使用

PyAlink指令碼的輸入樁、輸出樁與其他Designer的演算法組件無任何差別,可以相互串連共同使用。具體使用方式如下圖所示。組合使用

PyAlink指令碼不同資料類型的讀入和寫出方式

  • 讀入資料方式。

    • 讀取MaxCompute表,通過輸入樁的方式從上遊傳入,程式碼範例如下。

      train_data = sources[0]
      test_data = sources[1]

      代碼中sources[0]表示第一個輸入樁對應的MaxCompute表,sources[1]表示第二個輸入樁對應的MaxCompute表,依此類推,最多支援4個輸入樁。

    • 讀取網路檔案系統的資料,通過Alink的Source組件(CsvSourceBatchOp,AkSourceBatchOp)在代碼中實現資料的讀入。支援讀入以下兩種類型的檔案:

      • 讀入HTTP格式的網際網路共用檔案,程式碼範例如下所示:

        ratingsData = CsvSourceBatchOp() \
                    .setFilePath(PATH + RATING_FILE) \
                    .setFieldDelimiter("\t") \
                    .setSchemaStr(RATING_SCHEMA_STRING)
      • 讀取OSS網路檔案,需要按照下圖操作指引,設定資料讀取路徑。image程式碼範例如下。

        model_data = AkSourceBatchOp().setFilePath("oss://xxxxxxxx/model_20220323.ak")
  • 寫出資料方式。

    • 寫出MaxCompute表,通過輸出樁的方式寫出到下遊,程式碼範例如下所示。

      result0.link(sinks[0])
      result1.link(sinks[1])
      BatchOperator.execute()

      result0.link(sinks[0]),該行表示將資料寫出,並支援輸出樁訪問。該行表示第一個輸出樁輸出結果表,依此類推最多支援輸出4個結果表。

    • 寫出OSS網路檔案,需要按照下圖操作指引,設定資料寫出路徑。image程式碼範例如下。

      result.link(AkSinkBatchOp() \
                  .setFilePath("oss://xxxxxxxx/model_20220323.ak") \
                  .setOverwriteSink(True))
      BatchOperator.execute()

使用樣本:將PyAlink指令碼產生的模型部署為EAS服務

  1. 產生待部署的模型。

    當PyAlink指令碼產生的模型為PipelineModel時,才能將模型部署為EAS服務。按照以下程式碼範例產生PipelineModel模型檔案,具體操作方法請參見方式一:單獨使用PyAlink指令碼

    from pyalink.alink import *
    
    def main(sources, sinks, parameter):
        PATH = "http://alink-test.oss-cn-beijing.aliyuncs.com/yuhe/movielens/"
        RATING_FILE = "ratings.csv"
        PREDICT_FILE = "predict.csv"
        RATING_SCHEMA_STRING = "user_id long, item_id long, rating int, ts long"
    
        ratingsData = CsvSourceBatchOp() \
                .setFilePath(PATH + RATING_FILE) \
                .setFieldDelimiter("\t") \
                .setSchemaStr(RATING_SCHEMA_STRING)
    
        predictData = CsvSourceBatchOp() \
                .setFilePath(PATH + PREDICT_FILE) \
                .setFieldDelimiter("\t") \
                .setSchemaStr(RATING_SCHEMA_STRING)
    
        itemCFModel = ItemCfTrainBatchOp() \
                .setUserCol("user_id").setItemCol("item_id") \
                .setRateCol("rating").linkFrom(ratingsData);
    
        itemCF = ItemCfRateRecommender() \
                .setModelData(itemCFModel) \
                .setItemCol("item_id") \
                .setUserCol("user_id") \
                .setReservedCols(["user_id", "item_id"]) \
                .setRecommCol("prediction_score")
    
        model = PipelineModel(itemCF)
        model.save().link(AkSinkBatchOp() \
                .setFilePath("oss://<your_bucket_name>/model.ak") \
                .setOverwriteSink(True))
        BatchOperator.execute()

    其中,<your_bucket_name>為OSS Bucket名稱。

    重要

    請確認您對PATH中配置的資料集路徑有讀許可權,否則組件將運行失敗。

  2. 產生EAS設定檔。

    執行以下指令碼,將輸出結果寫入config.json檔案。

    # EAS的設定檔
    import json
    
    # 產生 EAS 模型配置
    model_config = {}
    # EAS接收資料的schema。
    model_config['inputDataSchema'] = "id long, movieid long" 
    model_config['modelVersion'] = "v0.2"
    
    eas_config = {
        "name": "recomm_demo",
        "model_path": "http://xxxxxxxx/model.ak",
        "processor": "alink_outer_processor",
        "metadata": {
            "instance": 1,
            "memory": 2048,
            "region":"cn-beijing"
        },
        "model_config": model_config
    }
    print(json.dumps(eas_config, indent=4))

    config.json檔案中的關鍵參數解釋:

    • name:部署模型服務的名稱。

    • model_path:儲存PipelineModel模型檔案的OSS路徑,需要修改為實際存放模型檔案的OSS路徑。

    config.json檔案中的其他參數解釋,詳情請參見命令使用說明

  3. 將模型部署為EAS服務。

    您可以登入eascmd用戶端部署模型服務。如何登入eascmd用戶端,請參見下載並認證用戶端。以Windows 64版本為例,使用以下命令部署模型服務。

    eascmdwin64.exe create config.json