PyAlink脚本支持通过编写代码的方式来调用Alink的所有算法。您可以使用PyAlink脚本调用Alink的分类算法做分类、 调用回归算法做回归、调用推荐算法做推荐等。PyAlink脚本也支持与其他Designer的算法组件无缝衔接, 完成业务链路的搭建及效果验证。本文为您介绍如何使用PyAlink脚本。
背景信息
PyAlink脚本支持两种使用方式(方式一:单独使用PyAlink脚本、方式二:PyAlink脚本与其他Designer的算法组件组合使用),可以使用上百种Alink组件,且支持通过编写代码的方式读入和写出多种类型的数据(PyAlink脚本不同数据类型的读入和写出方式)。后续您可以将PyAlink脚本生成的PipelineModel模型部署为EAS服务,详情请参见使用示例:将PyAlink脚本生成的模型部署为EAS服务。
基本概念
在使用PyAlink脚本之前,请先了解以下基本概念。
功能模块 | 基本概念 |
Operator | 在Alink里,每个算法功能都是一个Operator。分为批式Operator和流式Operator。例如:逻辑回归包含以下Operator:
Operator之间使用Link或LinkFrom连接,具体使用示例如下。
每个Operator都有参数。例如:逻辑回归包含以下参数。
配置参数的方式为set+参数名称,具体使用示例如下。
数据导入(Source)和数据导出(Sink)是一类特殊的Operator,定义好之后,可以通过Link或LinkFrom和算法组件连接,具体实现如下图所示。 Alink包含常用的流式数据源和批式数据源,具体使用示例如下。
|
Pipeline | Alink算法支持的另外一种使用方式。可以将数据处理、特征生成、模型训练放在一起,进行训练、预测及在线服务,具体使用示例如下。
|
Vector | Alink的一种自定义数据类型,支持以下两种格式。
说明 在Alink里,如果列是Vector类型,则参数名称一般为vectorColName。 |
PyAlink脚本支持的Alink组件
您可以在PyAlink脚本中使用上百种Alink组件,包括数据处理、特征工程、模型训练等组件。
PyAlink脚本当前仅支持Pipeline组件和批组件,暂时不支持流组件。
方式一:单独使用PyAlink脚本
以ItemCf模型对movielens数据集进行打分为例,介绍如何在Designer平台使用阿里云资源运行使用PyAlink脚本实现的业务流程。具体操作步骤如下所示。
进入Designer页面,并创建空白工作流,具体操作请参见操作步骤。
在工作流列表,选择已创建的空白工作流,单击进入工作流。
在左侧组件列表的搜索框中,搜索PyAlink脚本,并将PyAlink脚本拖入右侧画布中,画布中自动生成一个名称为PyAlink脚本-1的工作流节点。
在画布中选中PyAlink脚本-1节点,在右侧参数设置和执行调优页签配置相关参数。
在参数设置页签编写代码,代码脚本内容如下所示。
from pyalink.alink import * def main(sources, sinks, parameter): PATH = "http://alink-test.oss-cn-beijing.aliyuncs.com/yuhe/movielens/" RATING_FILE = "ratings.csv" PREDICT_FILE = "predict.csv" RATING_SCHEMA_STRING = "user_id long, item_id long, rating int, ts long" ratingsData = CsvSourceBatchOp() \ .setFilePath(PATH + RATING_FILE) \ .setFieldDelimiter("\t") \ .setSchemaStr(RATING_SCHEMA_STRING) predictData = CsvSourceBatchOp() \ .setFilePath(PATH + PREDICT_FILE) \ .setFieldDelimiter("\t") \ .setSchemaStr(RATING_SCHEMA_STRING) itemCFModel = ItemCfTrainBatchOp() \ .setUserCol("user_id").setItemCol("item_id") \ .setRateCol("rating").linkFrom(ratingsData); itemCF = ItemCfRateRecommender() \ .setModelData(itemCFModel) \ .setItemCol("item_id") \ .setUserCol("user_id") \ .setReservedCols(["user_id", "item_id"]) \ .setRecommCol("prediction_score") result = itemCF.transform(predictData) result.link(sinks[0]) BatchOperator.execute()
PyAlink脚本支持4个输出桩,代码脚本中通过
result.link(sinks[0])
将输出的数据写出到第一个输出桩,下游可以通过连接PyAlink脚本的第一个输出桩来读取该脚本输出的数据。PyAlink脚本具体支持的不同数据类型的读入和写出方式请参见PyAlink脚本不同数据类型的读入和写出方式。在执行调优页签设置运行模型和节点规格。
参数
描述
选择作业的运行模式
支持以下两种模式:
DLC(单机多并发):建议在任务数据规模小且在调试验证阶段时使用。
MaxCompute(分布式):建议在任务数据规模大或在实际生产任务时使用。
Flink全托管(分布式):表示使用当前工作空间绑定的Flink集群资源进行分布式执行。
节点个数
仅当选择作业的运行模式为MaxCompute(分布式)或Flink全托管(分布式)时,才需要配置该参数。执行节点的个数,为空时系统根据任务数据自动分配,默认为空。
每个节点的内存大小,单位MB
仅当选择作业的运行模式为MaxCompute(分布式)或Flink全托管(分布式)时,才需要配置该参数。单个节点的内存大小,单位MB。取值为正整数,默认为8192。
每个节点的CPU核心数目
仅当选择作业的运行模式为MaxCompute(分布式)或Flink全托管(分布式)时,才需要配置该参数。单个节点的CPU核心数目,取值为正整数,默认为空。
选择脚本运行的节点规格
DLC节点的资源类型配置,默认为2vCPU+8GB Mem-ecs.g6.large。
在画布上方单击保存按钮,然后单击运行按钮,运行PyAlink脚本。
任务运行结束后,右键单击画布中的PyAlink脚本-1,在快捷菜单中,单击 ,查看运行结果。
列名
描述
user_id
用户ID。
item_id
电影ID。
prediction_score
用来表示用户对电影的喜欢程度,作为电影推荐的参考指标。
方式二:PyAlink脚本与其他Designer的算法组件组合使用
PyAlink脚本的输入桩、输出桩与其他Designer的算法组件无任何差别,可以相互连接共同使用。具体使用方式如下图所示。
PyAlink脚本不同数据类型的读入和写出方式
读入数据方式。
读取MaxCompute表,通过输入桩的方式从上游传入,代码示例如下。
train_data = sources[0] test_data = sources[1]
代码中sources[0]表示第一个输入桩对应的MaxCompute表,sources[1]表示第二个输入桩对应的MaxCompute表,依此类推,最多支持4个输入桩。
读取网络文件系统的数据,通过Alink的Source组件(CsvSourceBatchOp,AkSourceBatchOp)在代码中实现数据的读入。支持读入以下两种类型的文件:
读入HTTP格式的网络共享文件,代码示例如下所示:
ratingsData = CsvSourceBatchOp() \ .setFilePath(PATH + RATING_FILE) \ .setFieldDelimiter("\t") \ .setSchemaStr(RATING_SCHEMA_STRING)
读取OSS网络文件,需要按照下图操作指引,设置数据读取路径。代码示例如下。
model_data = AkSourceBatchOp().setFilePath("oss://xxxxxxxx/model_20220323.ak")
写出数据方式。
写出MaxCompute表,通过输出桩的方式写出到下游,代码示例如下所示。
result0.link(sinks[0]) result1.link(sinks[1]) BatchOperator.execute()
result0.link(sinks[0]),该行表示将数据写出,并支持输出桩访问。该行表示第一个输出桩输出结果表,依此类推最多支持输出4个结果表。
写出OSS网络文件,需要按照下图操作指引,设置数据写出路径。代码示例如下。
result.link(AkSinkBatchOp() \ .setFilePath("oss://xxxxxxxx/model_20220323.ak") \ .setOverwriteSink(True)) BatchOperator.execute()
使用示例:将PyAlink脚本生成的模型部署为EAS服务
生成待部署的模型。
当PyAlink脚本生成的模型为PipelineModel时,才能将模型部署为EAS服务。按照以下代码示例生成PipelineModel模型文件,具体操作方法请参见方式一:单独使用PyAlink脚本。
from pyalink.alink import * def main(sources, sinks, parameter): PATH = "http://alink-test.oss-cn-beijing.aliyuncs.com/yuhe/movielens/" RATING_FILE = "ratings.csv" PREDICT_FILE = "predict.csv" RATING_SCHEMA_STRING = "user_id long, item_id long, rating int, ts long" ratingsData = CsvSourceBatchOp() \ .setFilePath(PATH + RATING_FILE) \ .setFieldDelimiter("\t") \ .setSchemaStr(RATING_SCHEMA_STRING) predictData = CsvSourceBatchOp() \ .setFilePath(PATH + PREDICT_FILE) \ .setFieldDelimiter("\t") \ .setSchemaStr(RATING_SCHEMA_STRING) itemCFModel = ItemCfTrainBatchOp() \ .setUserCol("user_id").setItemCol("item_id") \ .setRateCol("rating").linkFrom(ratingsData); itemCF = ItemCfRateRecommender() \ .setModelData(itemCFModel) \ .setItemCol("item_id") \ .setUserCol("user_id") \ .setReservedCols(["user_id", "item_id"]) \ .setRecommCol("prediction_score") model = PipelineModel(itemCF) model.save().link(AkSinkBatchOp() \ .setFilePath("oss://<your_bucket_name>/model.ak") \ .setOverwriteSink(True)) BatchOperator.execute()
其中,
<your_bucket_name>
为OSS Bucket名称。重要请确认您对PATH中配置的数据集路径有读权限,否则组件将运行失败。
生成EAS配置文件。
执行以下脚本,将输出结果写入config.json文件。
# EAS的配置文件 import json # 生成 EAS 模型配置 model_config = {} # EAS接收数据的schema。 model_config['inputDataSchema'] = "id long, movieid long" model_config['modelVersion'] = "v0.2" eas_config = { "name": "recomm_demo", "model_path": "http://xxxxxxxx/model.ak", "processor": "alink_outer_processor", "metadata": { "instance": 1, "memory": 2048, "region":"cn-beijing" }, "model_config": model_config } print(json.dumps(eas_config, indent=4))
config.json文件中的关键参数解释:
name:部署模型服务的名称。
model_path:存储PipelineModel模型文件的OSS路径,需要修改为实际存放模型文件的OSS路径。
config.json文件中的其他参数解释,详情请参见命令使用说明。
将模型部署为EAS服务。
您可以登录eascmd客户端部署模型服务。如何登录eascmd客户端,请参见下载并认证客户端。以Windows 64版本为例,使用以下命令部署模型服务。
eascmdwin64.exe create config.json