本文主要介绍使用FeatureStore Python SDK完成特征平台在推荐场景中全流程的创建以及上线的过程。
前提条件
在开始执行操作前,请确认您已完成以下准备工作。
依赖产品 | 具体操作 |
人工智能平台 PAI |
|
云原生大数据计算服务 MaxCompute |
|
实时数仓 Hologres |
|
大数据开发治理平台 DataWorks |
|
一、准备数据
同步数据表
一般对于推荐场景,需要准备以下数据表:user侧的特征表、item侧的特征表、Label表、序列特征表和行为表。
为方便实践操作,我们在MaxCompute的pai_online_project项目中提前准备了模拟生成的用户表、物料表、Label表、序列特征表和行为表进行示例说明。您需要在DataWorks中执行SQL命令,将以上表从pai_online_project项目同步到自己的MaxCompute中。具体操作步骤如下:
登录DataWorks控制台。
在左侧导航栏单击数据开发与治理 > 数据开发。
选择已创建的DataWorks工作空间后,单击进入数据开发。
鼠标悬停至新建,选择新建节点 > MaxCompute > ODPS SQL,在弹出的页面中配置节点参数。
参数
取值建议
引擎实例
选择已创建的MaxCompute引擎。
节点类型
ODPS SQL
路径
业务流程/Workflow/MaxCompute
名称
可自定义名称。
单击确认。
在新建节点区域运行以下SQL命令,将用户表、物料表、Label表、序列特征表和行为表从pai_online_project项目同步到自己的MaxCompute中。资源组选择已创建的独享资源组。
同步用户表:rec_sln_demo_user_table_preprocess_all_feature_v1(单击查看详情)
同步物料表:rec_sln_demo_item_table_preprocess_all_feature_v1(单击查看详情)
同步Label表:rec_sln_demo_label_table(单击查看详情)
同步序列特征表:rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3(单击查看详情)
同步行为表:rec_sln_demo_behavior_table_preprocess_v3(单击查看详情)
完成上述操作后,您可以在自己的工作空间内查看已同步的表,后续的实践操作将以这五张表为例进行说明。
配置数据源
FeatureStore一般需要配置两个数据源:离线数据源(MaxCompute)和在线数据源(Hologres/GraphCompute/TableStore)。本文以MaxCompute和Hologres为例进行说明。
登录PAI控制台,在左侧导航栏单击数据准备>特征平台(FeatureStore)。
选择工作空间后,单击进入FeatureStore。
配置MaxCompute数据源。
在数据源页签,单击新建数据源,在弹出的页面中配置MaxCompute数据源具体参数。
参数
取值建议
类型
MaxCompute
名称
可自定义名称。
MaxCompute项目名
选择已创建的MaxCompute项目。
请复制语句并单击立即前往,同步数据至Hologres,在DataWorks执行该语句后,即可完成授权。
说明授权操作需要您的账号拥有admin权限,具体操作详情请参见通过命令管理用户权限或通过控制台管理用户权限。
完成后单击提交。
配置Hologres数据源。
在数据源页签,单击新建数据源,在弹出的页面中配置Hologres数据源具体参数。
参数
取值建议
类型
Hologres
名称
可自定义名称。
实例ID
选择已创建的Hologres实例名称。
数据库名称
选择已创建的实例数据库。
完成后单击提交。
对Hologres进行授权,具体操作详情请参见配置数据源。
二、FeatureStore Python SDK创建流程
安装特征平台Python SDK, 要求在Python3环境下运行。以下代码均建议在Jupyter Notebook环境下运行。
! pip install https://feature-store-py.oss-cn-beijing.aliyuncs.com/package/feature_store_py-1.3.1-py3-none-any.whl
导入需要的功能模块。
import unittest
import sys
import os
from os.path import dirname, join, abspath
from feature_store_py.fs_client import FeatureStoreClient, build_feature_store_client
from feature_store_py.fs_project import FeatureStoreProject
from feature_store_py.fs_datasource import UrlDataSource, MaxComputeDataSource, DatahubDataSource, HologresDataSource, SparkDataSource, LabelInput, TrainingSetOutput
from feature_store_py.fs_type import FSTYPE
from feature_store_py.fs_schema import OpenSchema, OpenField
from feature_store_py.fs_feature_view import FeatureView
from feature_store_py.fs_features import FeatureSelector
from feature_store_py.fs_config import LabelInputConfig, PartitionConfig, FeatureViewConfig, TrainSetOutputConfig, SequenceFeatureConfig, SequenceTableConfig
import logging
logger = logging.getLogger("foo")
logger.addHandler(logging.StreamHandler(stream=sys.stdout))
特征工程项目
您可以通过特征平台创建多个项目空间,每个项目空间是独立的。具体操作,请参见配置FeatureStore项目。运行Notebook需要FeatureStore服务端配合运行,开通特征平台后需要配置数据源,具体操作请参见配置数据源。
其中,offline_datasource_id指的是离线数据源ID,online_datasource_id指的是在线数据源ID。
此处以项目名称是fs_movie进行说明。
access_id = ''
access_ak = ''
region = 'cn-beijing'
fs = FeatureStoreClient(access_key_id=access_id, access_key_secret=access_ak, region=region)
cur_project_name = "fs_demo"
project = fs.get_project(cur_project_name)
if project is None:
raise ValueError("Need to create project : fs_movie")
运行以下代码获取当前的project并打印其信息。
project = fs.get_project(cur_project_name)
project.print_summary()
特征实体(FeatureEntity)
特征实体描述了一组相关的特征集合。多个特征视图可以关联一个特征实体。每个实体都会有一个JoinId,通过JoinId可以关联多个特征视图特征。每一个特征视图都有一个主键(索引键)来获取它的特征数据,但是索引键可以和JoinId定义的名称不一样。
一般在推荐系统中,特征只关联user和item两个特征实体,即特征属于user侧或者item侧。此处以创建 user和item两个特征实体为例进行说明。
创建user entity
user_entity_name = "user" user_join_id = 'user_id' user_entity = project.get_entity(user_entity_name) if user_entity is None: user_entity = project.create_entity(name = user_entity_name, join_id=user_join_id) user_entity.print_summary()
创建item entity
item_entity_name = "item" join_id = 'item_id' item_entity = project.get_entity(item_entity_name) if item_entity is None: item_entity = project.create_entity(name = item_entity_name, join_id=join_id) item_entity.print_summary()
特征视图(FeatureView)
FeatureStore是一个专门用来管理和组织特征数据的平台,外部数据需要通过特征视图进入FeatureStore。特征视图定义了数据从哪里来(DataSource)、需要进行哪些预处理或转换操作(如特征工程/Transformation)、特征的数据结构(包含特征名称和类型在内的特征schema)、数据存储的位置(OnlineStore/OfflineStore),并提供特征元信息管理,如主键、事件时间、分区键、特征实体以及有效期设定ttl(默认-1表示永久有效,正数则表示在线查询时会取ttl内的最新特征数据)。
特征视图分为三种类型:Batch FeatureView(离线特征,或者T-1天特征)、Stream FeatureView(实时特征)、Sequence FeatureView(序列特征)。
Batch FeatureView(离线特征视图)
将离线数据注入到FeatureStore的OfflineStore中,并可以根据需求同步至OnlineStore以支持实时查询。一般的离线特征,或者T-1天的特征。
注册user侧离线特征表
将rec_sln_demo_user_table_preprocess_all_feature_v1注册到特征平台。
user_feature_view_name = "user_table_preprocess_all_feature_v1" user_table_name = "rec_sln_demo_user_table_preprocess_all_feature_v1" user_feature_view = project.get_feature_view(user_feature_view_name) if user_feature_view is None: ds = MaxComputeDataSource(project.offline_datasource_id, user_table_name) user_feature_view = project.create_batch_feature_view(name=user_feature_view_name, datasource=ds, online=True, entity= user_entity_name, primary_key='user_id', register=True) print(user_feature_view)
将离线数据源中的rec_sln_demo_user_table_preprocess_all_feature_v1中20231023分区的数据同步到在线数据源。
user_task = user_feature_view.publish_table({'ds':'20231023'}) user_task.wait()
查看任务运行情况。
user_task.print_summary()
注册item侧离线特征表
将rec_sln_demo_item_table_preprocess_all_feature_v1表注册到特征平台。
item_feature_view_name = "item_table_preprocess_all_feature_v1" item_table_name = "rec_sln_demo_item_table_preprocess_all_feature_v1" item_feature_view = project.get_feature_view(item_feature_view_name) if item_feature_view is None: ds = MaxComputeDataSource(project.offline_datasource_id, item_table_name) item_feature_view = project.create_batch_feature_view(name=item_feature_view_name, datasource=ds, online = True, entity= item_entity_name, primary_key='item_id', register=True) print(item_feature_view)
将离线数据源中的rec_sln_demo_item_table_preprocess_all_feature_v1中20231023分区的数据同步到在线数据源。
item_task = item_feature_view.publish_table({'ds':'20231023'}) item_task.wait()
查看任务运行情况。
item_task.print_summary()
Sequence FeatureView(实时序列视图)
支持离线写入序列特征,以及查询和读取实时序列特征。一般在推荐场景中,离线序列特征表(记为F1)前期由模拟生成,后续可以直接由线上日志代替。而在线实时序列的查询中,会查询两张在线行为表的数据,一张为T-1天的行为表(记为B1),另外一张为T天当天的实时行为表(记为B2)。B2是实时计算更新的特征,同时查询B1和B2行为表后拼出该用户的特征序列,然后与其他特征合在一起送到模型打分。
在线T-1天的行为表B1一般是由离线T-1天行为表(记为A1)同步,同步过程中,特征平台会自动完成去重等操作。 在线T天的行为表B2目前需要您通过API接口或Flink等阿里云产品写入。
因此在注册实时特征视图过程中,相当于特征平台同时维护管理了四张表:离线序列表F1、离线T-1天的行为表A1、在线T-1天的行为表B1、在线T天的行为表B2。
注册时,只需要提供离线序列表F1,离线T-1天的行为表A1即可,在线行为表和同步去重过程由特征平台完成。
注册实时特征视图。
seq_feature_view_name = "wide_seq_feature_v3" seq_feature_view = project.get_feature_view(seq_feature_view_name) if seq_feature_view is None: seq_table_name = "rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3" behavior_table_name = 'rec_sln_demo_behavior_table_preprocess_v3' ds = MaxComputeDataSource(project.offline_datasource_id, behavior_table_name) event_time = 'event_unix_time' # 行为表中的 event time 字段名 item_id = 'item_id' # 行为表中的 item_id 字段名 event = 'event' # 行为表中的 event 字段名 # deduplication_method = 1, 表示 ['user_id', 'item_id', 'event'] 去重 # deduplication_method = 2, 表示 ['user_id', 'item_id', 'event', 'event_time'] 去重 sequence_feature_config_list = [SequenceFeatureConfig(offline_seq_name='click_seq_50_seq', seq_event='click', online_seq_name='click_seq_50', seq_len=50)] # offline_seq_name 指的是离线序列表中序列特征字段名,seq_event 指的是行为字段名,online_seq_name 指的是特征平台在线 Go SDK 查出该 user 对应的行为序列 item_ids 后,会以该名称命名。 # seq_len 指的是序列长度,大于该长度的序列会被截断。 seq_table_config = SequenceTableConfig(table_name=seq_table_name, primary_key='user_id', event_time='event_unix_time') seq_feature_view = project.create_sequence_feature_view(seq_feature_view_name, datasource=ds, event_time=event_time, item_id=item_id, event=event, deduplication_method=1, sequence_feature_config=sequence_feature_config_list, sequence_table_config=seq_table_config, entity=user_entity_name) # seq_feature_view.print_summary() print(seq_feature_view)
将离线数据源中rec_sln_demo_behavior_table_preprocess_v3中20231023分区的数据同步到在线数据源。同步时会自动检查是否有前N天分区的数据,如果没有会自动补充,N可以通过
days_to_load
指定,默认是30,一般取30即可。seq_task = seq_feature_view.publish_table({'ds':'20231023'}, days_to_load=30) seq_task.wait()
查看任务运行情况。
seq_task.print_summary()
Stream FeatureView(实时特征视图)
将数据直接写入OnlineStore,并同时同步到OfflineStore。用于特征实时更新的场景,例如商品的价格、销量的实时更新。
Label 表注册
label_table_name = 'rec_sln_demo_label_table'
ds = MaxComputeDataSource(data_source_id=project.offline_datasource_id, table=label_table_name)
label_table = project.get_label_table(label_table_name)
if label_table is None:
label_table = project.create_label_table(datasource=ds, event_time='event_unix_time')
print(label_table)
获取在线特征
获取在线特征用于排查离线在线一致性,数据分析等需求,目前优先支持 Hologres。
user_feature_view_name = "user_table_preprocess_all_feature_v1"
user_feature_view = project.get_feature_view(user_feature_view_name)
ret_features_1 = user_feature_view.get_online_features(join_ids={'user_id':['169898460', '148811946']}, features=['user_id', 'gender', 'age', 'city'])
print("ret_features = ", ret_features_1)
TrainingSet
训练模型时,首先要构造样本表,样本表由Label数据和特征数据组成。在与FeatureStore交互时,Label数据需要由客户提供,并且需要定义要获取的特征名称,然后根据主键进行point-in-time join(存在event_time的情况下)。
# 指定 Label 表
label_table_name = 'rec_sln_demo_label_table'
output_ds = MaxComputeDataSource(data_source_id=project.offline_datasource_id)
train_set_output = TrainingSetOutput(output_ds)
user_feature_view_name = "user_table_preprocess_all_feature_v1"
user_feature_selector = FeatureSelector(user_feature_view_name, '*') # '*' 代表选择全部特征
item_feature_view_name = "item_table_preprocess_all_feature_v1"
item_feature_selector = FeatureSelector(item_feature_view_name, '*')
seq_feature_view_name = "wide_seq_feature_v3"
seq_feature_selector = FeatureSelector(seq_feature_view_name, ['click_seq_50_seq'])
train_set = project.create_training_set(label_table_name=label_table_name, train_set_output= train_set_output, feature_selectors=[user_feature_selector, item_feature_selector, seq_feature_selector])
print("train_set = ", train_set)
模型特征
训练模型并部署成服务后,进行业务预测。其中,训练样本可以从上文的train_set获得。
model_name = "fs_rank_v2"
cur_model = project.get_model(model_name)
if cur_model is None:
cur_model = project.create_model(model_name, train_set)
print("cur_model_train_set_table_name = ", cur_model.train_set_table_name)
三、导出样本表并训练模型
实际训练时,需要导出样本表。
导出样本表
指定Label表以及各个特征视图的分区、event_time。
cur_day = '20231024'
pre_day = '20231023'
label_partitions = PartitionConfig(name = 'ds', value = cur_day)
label_input_config = LabelInputConfig(partition_config=label_partitions)
user_partitions = PartitionConfig(name = 'ds', value = pre_day)
feature_view_user_config = FeatureViewConfig(name = 'user_table_preprocess_all_feature_v1',
partition_config=user_partitions)
item_partitions = PartitionConfig(name = 'ds', value = pre_day)
feature_view_item_config = FeatureViewConfig(name = 'item_table_preprocess_all_feature_v1',
partition_config=item_partitions)
seq_partitions = PartitionConfig(name = 'ds', value = cur_day)
feature_view_seq_config = FeatureViewConfig(name = 'wide_seq_feature_v3', partition_config=seq_partitions, event_time='event_unix_time', equal=True)
feature_view_config_list = [feature_view_user_config, feature_view_item_config, feature_view_seq_config]
train_set_partitions = PartitionConfig(name = 'ds', value = cur_day)
train_set_output_config = TrainSetOutputConfig(partition_config=train_set_partitions)
model_name = 'fs_rank_v2'
cur_model = project.get_model(model_name)
task = cur_model.export_train_set(label_input_config, feature_view_config_list, train_set_output_config)
task.wait()
print("task_summary = ", task.task_summary)
训练模型
EasyRec是一个开源的推荐系统框架,可以与FeatureStore无缝衔接,进行训练模型、导出模型、上线模型的操作。推荐您将fs_demo_fs_rank_v2_trainning_set表作为输入,使用EasyRec训练模型。
更多EasyRec相关的问题,请通过钉钉群(32260796)加入阿里云人工智能平台PAI咨询群联系我们。
四、上线模型
训练并导出模型后,可以进行部署和上线的操作。如果是自建推荐系统,FeatureStore提供FeatureStore Python/Go/Cpp/Java SDK,可以与各式各样的自建推荐系统进行衔接。您也可以通过钉钉群(32260796)联系我们咨询和商讨具体方案。如果是阿里云系列产品,可以和FeatureStore无缝衔接,快速搭建推荐系统上线。
本文以阿里云系列产品为例介绍上线模型操作。
例行同步数据节点
上线前需要将数据同步节点例行,即例行将数据从离线数据源同步到在线数据源中
登录DataWorks控制台。
在左侧导航栏单击数据开发与治理 > 数据开发。
选择已创建的DataWorks工作空间后,单击进入数据开发。
同步例行user表。
鼠标悬停至新建,选择新建节点 > MaxCompute > PyODPS 3。
在弹出的页面中配置节点参数,单击确认。
复制以下内容到脚本中,完成user_table_preprocess_all_feature_v1例行同步。
单击右侧调度配置,在弹出的页面中配置调度参数。
参数
取值建议
调度参数
参数名
dt
参数值
$[yyyymmdd-1]
资源属性
调度资源组
选择已创建的独享资源组。
调度依赖
选择已创建的user表。
节点配置并测试完成后,保存并提交节点配置。
执行补数据操作。操作详情请参见同步数据表。
同步例行item表。
鼠标悬停至新建,选择新建节点 > MaxCompute > PyODPS 3。
在弹出的页面中配置节点参数,单击确认。
复制以下内容到脚本中。
单击右侧调度配置,在弹出的页面中配置调度参数。
参数
取值建议
调度参数
参数名
dt
参数值
$[yyyymmdd-1]
资源属性
调度资源组
选择已创建的独享资源组。
调度依赖
选择已创建的item表。
节点配置并测试完成后,保存并提交节点配置。
执行补数据操作。操作详情请参见同步数据表。
同步实时序列行为表。
鼠标悬停至新建,选择新建节点 > MaxCompute > PyODPS 3。
在弹出的页面中配置节点参数,单击确认。
复制以下内容到脚本中。
单击右侧调度配置,在弹出的页面中配置调度参数。
参数
取值建议
调度参数
参数名
dt
参数值
$[yyyymmdd-1]
资源属性
调度资源组
选择已创建的独享资源组。
调度依赖
选择已创建的item表。
节点配置并测试完成后,保存并提交节点配置。
执行补数据操作。操作详情请参见同步数据表。
同步完成后,可以在Hologres中查看最新同步的特征。
创建与部署EAS模型服务
模型服务是为了接收推荐引擎的请求,根据请求为item集合打分并返回分数。其中EasyRec Processor内部包含了FeatureStore Cpp SDK,可以实现低延时、高性能的获取特征操作,EasyRec Processor从Feature Store Cpp SDK取得特征后,送入模型进行推理,拿到打分后返回给推荐引擎。
部署模型服务的流程如下。
登录DataWorks控制台。
在左侧导航栏单击数据开发与治理 > 数据开发。
选择已创建的DataWorks工作空间后,单击进入数据开发。
鼠标悬停至新建,选择新建节点 > MaxCompute > PyODPS 3。
在弹出的页面中配置节点参数,单击确认。
复制以下内容到脚本中。
import os import json config = { "name": "fs_demo_v1", "metadata": { "cpu": 4, "rpc.max_queue_size": 256, "rpc.enable_jemalloc": 1, "gateway": "default", "memory": 16000 }, "model_path": f"oss://beijing0009/EasyRec/deploy/rec_sln_demo_dbmtl_v1/{args['ymd']}/export/final_with_fg", "model_config": { "access_key_id": f'{o.account.access_id}', "access_key_secret": f'{o.account.secret_access_key}', "region": "cn-beijing", "fs_project": "fs_demo", "fs_model": "fs_rank_v2", "fs_entity": "item", "load_feature_from_offlinestore": True, "steady_mode": True, "period": 2880, "outputs": "probs_is_click,y_ln_playtime,probs_is_praise", "fg_mode": "tf" }, "processor": "easyrec-1.8", "processor_type": "cpp" } with open("echo.json", "w") as output_file: json.dump(config, output_file) # 第一次部署时运行这行 os.system(f"/home/admin/usertools/tools/eascmd -i {o.account.access_id} -k {o.account.secret_access_key} -e pai-eas.cn-beijing.aliyuncs.com create echo.json") # 例行更新时运行下面这行 # os.system(f"/home/admin/usertools/tools/eascmd -i {o.account.access_id} -k {o.account.secret_access_key} -e pai-eas.cn-beijing.aliyuncs.com modify fs_demo_v1 -s echo.json")
单击右侧调度配置,在弹出的页面中配置调度参数。
参数
取值建议
调度参数
参数名
dt
参数值
$[yyyymmdd-1]
资源属性
调度资源组
选择已创建的独享资源组。
调度依赖
选择对应的训练任务和item_table_preprocess_all_feature_v1。
节点配置并测试完成后,运行查看部署情况。
部署完成后,注释掉第34行代码,并将37行取消注释,提交任务例行。
(可选)您可以在EAS模型在线服务页面的推理服务页签,查看已部署的服务。操作详情请参见服务部署:控制台。
配置PAI-Rec
PAI-Rec是推荐引擎服务,其中集成了FeatureStore的Go SDK,可以与FeatureStore和EAS无缝衔接。
具体配置步骤如下。
配置FeatureStoreConfs。
RegionId
:修改为产品实际所在区域,此处以cn-beijing为例。ProjectName
:已创建的FeatureStore项目名称,fs_demo。
"FeatureStoreConfs": { "pairec-fs": { "RegionId": "cn-beijing", "AccessId": "${AccessKey}", "AccessKey": "${AccessSecret}", "ProjectName": "fs_demo" } },
配置FeatureConfs。
FeatureStoreName
:和上一步FeatureStoreConfs中的设置pairec-fs保持一致。FeatureStoreModelName
:已创建的模型特征名称,fs_rank_v1。FeatureStoreEntityName
:已创建的特征实体名称,user。表示PAI-Rec引擎中通过FeatureStore Go SDK去获取模型fs_rank_v1中特征实体为user侧的特征。
"FeatureConfs": { "recreation_rec": { "AsynLoadFeature": true, "FeatureLoadConfs": [ { "FeatureDaoConf": { "AdapterType": "featurestore", "FeatureStoreName": "pairec-fs", "FeatureKey": "user:uid", "FeatureStoreModelName": "fs_rank_v1", "FeatureStoreEntityName": "user", "FeatureStore": "user" } } ] } },
配置AlgoConfs。
此配置代表告诉PAI-Rec去连接哪一个EAS模型打分服务。
Name
:和部署的EAS服务名称一致。Url
和Auth
:EAS服务给出的信息,您可以在EAS模型在线服务页面单击服务名称,然后在服务详情页签单击查看调用信息获取URL和Token。更多详细配置可参见EAS常见问题。
"AlgoConfs": [ { "Name": "fs_demo_v1", "Type": "EAS", "EasConf": { "Processor": "EasyRec", "Timeout": 300, "ResponseFuncName": "easyrecMutValResponseFunc", "Url": "eas_url_xxx", "EndpointType": "DIRECT", "Auth": "eas_token" } } ],