全部产品
Search
文档中心

人工智能平台 PAI:PyAlink脚本

更新时间:Feb 19, 2024

PyAlink脚本支持通过编写代码的方式来调用Alink的所有算法。您可以使用PyAlink脚本调用Alink的分类算法做分类、 调用回归算法做回归、调用推荐算法做推荐等。PyAlink脚本也支持与其他Designer的算法组件无缝衔接, 完成业务链路的搭建及效果验证。本文为您介绍如何使用PyAlink脚本。

背景信息

PyAlink脚本支持两种使用方式(方式一:单独使用PyAlink脚本方式二:PyAlink脚本与其他Designer的算法组件组合使用),可以使用上百种Alink组件,且支持通过编写代码的方式读入和写出多种类型的数据(PyAlink脚本不同数据类型的读入和写出方式)。后续您可以将PyAlink脚本生成的PipelineModel模型部署为EAS服务,详情请参见使用示例:将PyAlink脚本生成的模型部署为EAS服务

基本概念

在使用PyAlink脚本之前,请先了解以下基本概念。

功能模块

基本概念

Operator

在Alink里,每个算法功能都是一个Operator。分为批式Operator和流式Operator。例如:逻辑回归包含以下Operator:

  • LogisticRegressionTrainBatchOp:逻辑回归训练。

  • LogisticRegressionPredictBatchOp:逻辑回归批式预测。

  • LogisticRegressionPredictStreamhOp:逻辑回归流式预测。

Operator之间使用LinkLinkFrom连接,具体使用示例如下。

# 定义数据。
data = CsvSourceBatchOp()
# 逻辑回归训练。
lrTrain = LogisticRegressionTrainBatchOp()
# 逻辑回归预测。
LrPredict = LogisticRegressionPredictBatchOp()
# 训练。
data.link(lrTrain)
# 预测。
LrPredict.linkFrom(lrTrain, data)

每个Operator都有参数。例如:逻辑回归包含以下参数。

  • labelCol:输入表中的标签列名,必选参数,类型为String。

  • featureCols:特征列名数组,类型为String[],默认值为NULL,表示全选。

配置参数的方式为set+参数名称,具体使用示例如下。

lr = LogisticRegressionTrainBatchOp()\
            .setFeatureCols(colnames)\
            .setLabelCol("label")

数据导入(Source)和数据导出(Sink)是一类特殊的Operator,定义好之后,可以通过Link或LinkFrom和算法组件连接,具体实现如下图所示。

image

Alink包含常用的流式数据源和批式数据源,具体使用示例如下。

df_data = pd.DataFrame([
    [2, 1, 1],
    [3, 2, 1],
    [4, 3, 2],
    [2, 4, 1],
    [2, 2, 1],
    [4, 3, 2],
    [1, 2, 1],
    [5, 3, 2]
])
input = BatchOperator.fromDataframe(df_data, schemaStr='f0 int, f1 int, label int')
# load data
dataTest = input
colnames = ["f0","f1"]
lr = LogisticRegressionTrainBatchOp().setFeatureCols(colnames).setLabelCol("label")
model = input.link(lr)
predictor = LogisticRegressionPredictBatchOp().setPredictionCol("pred")
predictor.linkFrom(model, dataTest).print()

Pipeline

Alink算法支持的另外一种使用方式。可以将数据处理、特征生成、模型训练放在一起,进行训练、预测及在线服务,具体使用示例如下。

quantileDiscretizer = QuantileDiscretizer()\
            .setNumBuckets(2)\
            .setSelectedCols("sepal_length")

binarizer = Binarizer()\
            .setSelectedCol("petal_width")\
            .setOutputCol("bina")\
            .setReservedCols("sepal_length", "petal_width", "petal_length", "category")\
            .setThreshold(1.);

lda = Lda()\
            .setPredictionCol("lda_pred")\
            .setPredictionDetailCol("lda_pred_detail")\
            .setSelectedCol("category")\
            .setTopicNum(2)\
            .setRandomSeed(0)

pipeline = Pipeline()\
    .add(binarizer)\
    .add(binarizer)\
    .add(lda)

pipeline.fit(data1)
pipeline.transform(data2)

Vector

Alink的一种自定义数据类型,支持以下两种格式。

  • 稀疏向量(SparseVector)

    使用示例:$4$1:0.1 2:0.2。其中:两个美元符号($)中间的数字表示向量长度;美元符号($)之后的值表示列索引:列对应的值。

  • 稠密向量(DenseVector)

    使用示例:0.1 0.2 0.3。表示按照顺序,以空格为分隔符的值。

说明

在Alink里,如果列是Vector类型,则参数名称一般为vectorColName。

PyAlink脚本支持的Alink组件

您可以在PyAlink脚本中使用上百种Alink组件,包括数据处理、特征工程、模型训练等组件。

说明

PyAlink脚本当前仅支持Pipeline组件和批组件,暂时不支持流组件。

方式一:单独使用PyAlink脚本

以ItemCf模型对movielens数据集进行打分为例,介绍如何在Designer平台使用阿里云资源运行使用PyAlink脚本实现的业务流程。具体操作步骤如下所示。

  1. 进入Designer页面,并创建空白工作流,具体操作请参见操作步骤

  2. 在工作流列表,选择已创建的空白工作流,单击进入工作流

  3. 在左侧组件列表的搜索框中,搜索PyAlink脚本,并将PyAlink脚本拖入右侧画布中,画布中自动生成一个名称为PyAlink脚本-1的工作流节点。

    image

  4. 在画布中选中PyAlink脚本-1节点,在右侧参数设置执行调优页签配置相关参数。

    • 参数设置页签编写代码,代码脚本内容如下所示。

      from pyalink.alink import *
      
      def main(sources, sinks, parameter):
          PATH = "http://alink-test.oss-cn-beijing.aliyuncs.com/yuhe/movielens/"
          RATING_FILE = "ratings.csv"
          PREDICT_FILE = "predict.csv"
          RATING_SCHEMA_STRING = "user_id long, item_id long, rating int, ts long"
      
          ratingsData = CsvSourceBatchOp() \
                  .setFilePath(PATH + RATING_FILE) \
                  .setFieldDelimiter("\t") \
                  .setSchemaStr(RATING_SCHEMA_STRING)
      
          predictData = CsvSourceBatchOp() \
                  .setFilePath(PATH + PREDICT_FILE) \
                  .setFieldDelimiter("\t") \
                  .setSchemaStr(RATING_SCHEMA_STRING)
      
          itemCFModel = ItemCfTrainBatchOp() \
                  .setUserCol("user_id").setItemCol("item_id") \
                  .setRateCol("rating").linkFrom(ratingsData);
      
          itemCF = ItemCfRateRecommender() \
                  .setModelData(itemCFModel) \
                  .setItemCol("item_id") \
                  .setUserCol("user_id") \
                  .setReservedCols(["user_id", "item_id"]) \
                  .setRecommCol("prediction_score")
      
          result = itemCF.transform(predictData)
      
          result.link(sinks[0])
          BatchOperator.execute()

      PyAlink脚本支持4个输出桩,代码脚本中通过result.link(sinks[0])将输出的数据写出到第一个输出桩,下游可以通过连接PyAlink脚本的第一个输出桩来读取该脚本输出的数据。PyAlink脚本具体支持的不同数据类型的读入和写出方式请参见PyAlink脚本不同数据类型的读入和写出方式

    • 执行调优页签设置运行模型和节点规格。

      参数

      描述

      选择作业的运行模式

      支持以下两种模式:

      • DLC(单机多并发):建议在任务数据规模小且在调试验证阶段时使用。

      • MaxCompute(分布式):建议在任务数据规模大或在实际生产任务时使用。

      • Flink全托管(分布式):表示使用当前工作空间绑定的Flink集群资源进行分布式执行。

      节点个数

      仅当选择作业的运行模式MaxCompute(分布式)Flink全托管(分布式)时,才需要配置该参数。执行节点的个数,为空时系统根据任务数据自动分配,默认为空。

      每个节点的内存大小,单位MB

      仅当选择作业的运行模式MaxCompute(分布式)Flink全托管(分布式)时,才需要配置该参数。单个节点的内存大小,单位MB。取值为正整数,默认为8192。

      每个节点的CPU核心数目

      仅当选择作业的运行模式MaxCompute(分布式)Flink全托管(分布式)时,才需要配置该参数。单个节点的CPU核心数目,取值为正整数,默认为空。

      选择脚本运行的节点规格

      DLC节点的资源类型配置,默认为2vCPU+8GB Mem-ecs.g6.large。

  5. 在画布上方单击保存按钮,然后单击运行按钮image,运行PyAlink脚本。

  6. 任务运行结束后,右键单击画布中的PyAlink脚本-1,在快捷菜单中,单击查看数据 > 输出0,查看运行结果。

    image

    列名

    描述

    user_id

    用户ID。

    item_id

    电影ID。

    prediction_score

    用来表示用户对电影的喜欢程度,作为电影推荐的参考指标。

方式二:PyAlink脚本与其他Designer的算法组件组合使用

PyAlink脚本的输入桩、输出桩与其他Designer的算法组件无任何差别,可以相互连接共同使用。具体使用方式如下图所示。组合使用

PyAlink脚本不同数据类型的读入和写出方式

  • 读入数据方式。

    • 读取MaxCompute表,通过输入桩的方式从上游传入,代码示例如下。

      train_data = sources[0]
      test_data = sources[1]

      代码中sources[0]表示第一个输入桩对应的MaxCompute表,sources[1]表示第二个输入桩对应的MaxCompute表,依此类推,最多支持4个输入桩。

    • 读取网络文件系统的数据,通过Alink的Source组件(CsvSourceBatchOp,AkSourceBatchOp)在代码中实现数据的读入。支持读入以下两种类型的文件:

      • 读入HTTP格式的网络共享文件,代码示例如下所示:

        ratingsData = CsvSourceBatchOp() \
                    .setFilePath(PATH + RATING_FILE) \
                    .setFieldDelimiter("\t") \
                    .setSchemaStr(RATING_SCHEMA_STRING)
      • 读取OSS网络文件,需要按照下图操作指引,设置数据读取路径。image代码示例如下。

        model_data = AkSourceBatchOp().setFilePath("oss://xxxxxxxx/model_20220323.ak")
  • 写出数据方式。

    • 写出MaxCompute表,通过输出桩的方式写出到下游,代码示例如下所示。

      result0.link(sinks[0])
      result1.link(sinks[1])
      BatchOperator.execute()

      result0.link(sinks[0]),该行表示将数据写出,并支持输出桩访问。该行表示第一个输出桩输出结果表,依此类推最多支持输出4个结果表。

    • 写出OSS网络文件,需要按照下图操作指引,设置数据写出路径。image代码示例如下。

      result.link(AkSinkBatchOp() \
                  .setFilePath("oss://xxxxxxxx/model_20220323.ak") \
                  .setOverwriteSink(True))
      BatchOperator.execute()

使用示例:将PyAlink脚本生成的模型部署为EAS服务

  1. 生成待部署的模型。

    当PyAlink脚本生成的模型为PipelineModel时,才能将模型部署为EAS服务。按照以下代码示例生成PipelineModel模型文件,具体操作方法请参见方式一:单独使用PyAlink脚本

    from pyalink.alink import *
    
    def main(sources, sinks, parameter):
        PATH = "http://alink-test.oss-cn-beijing.aliyuncs.com/yuhe/movielens/"
        RATING_FILE = "ratings.csv"
        PREDICT_FILE = "predict.csv"
        RATING_SCHEMA_STRING = "user_id long, item_id long, rating int, ts long"
    
        ratingsData = CsvSourceBatchOp() \
                .setFilePath(PATH + RATING_FILE) \
                .setFieldDelimiter("\t") \
                .setSchemaStr(RATING_SCHEMA_STRING)
    
        predictData = CsvSourceBatchOp() \
                .setFilePath(PATH + PREDICT_FILE) \
                .setFieldDelimiter("\t") \
                .setSchemaStr(RATING_SCHEMA_STRING)
    
        itemCFModel = ItemCfTrainBatchOp() \
                .setUserCol("user_id").setItemCol("item_id") \
                .setRateCol("rating").linkFrom(ratingsData);
    
        itemCF = ItemCfRateRecommender() \
                .setModelData(itemCFModel) \
                .setItemCol("item_id") \
                .setUserCol("user_id") \
                .setReservedCols(["user_id", "item_id"]) \
                .setRecommCol("prediction_score")
    
        model = PipelineModel(itemCF)
        model.save().link(AkSinkBatchOp() \
                .setFilePath("oss://<your_bucket_name>/model.ak") \
                .setOverwriteSink(True))
        BatchOperator.execute()

    其中,<your_bucket_name>为OSS Bucket名称。

    重要

    请确认您对PATH中配置的数据集路径有读权限,否则组件将运行失败。

  2. 生成EAS配置文件。

    执行以下脚本,将输出结果写入config.json文件。

    # EAS的配置文件
    import json
    
    # 生成 EAS 模型配置
    model_config = {}
    # EAS接收数据的schema。
    model_config['inputDataSchema'] = "id long, movieid long" 
    model_config['modelVersion'] = "v0.2"
    
    eas_config = {
        "name": "recomm_demo",
        "model_path": "http://xxxxxxxx/model.ak",
        "processor": "alink_outer_processor",
        "metadata": {
            "instance": 1,
            "memory": 2048,
            "region":"cn-beijing"
        },
        "model_config": model_config
    }
    print(json.dumps(eas_config, indent=4))

    config.json文件中的关键参数解释:

    • name:部署模型服务的名称。

    • model_path:存储PipelineModel模型文件的OSS路径,需要修改为实际存放模型文件的OSS路径。

    config.json文件中的其他参数解释,详情请参见命令使用说明

  3. 将模型部署为EAS服务。

    您可以登录eascmd客户端部署模型服务。如何登录eascmd客户端,请参见下载并认证客户端。以Windows 64版本为例,使用以下命令部署模型服务。

    eascmdwin64.exe create config.json