本文为您介绍在不使用阿里云其他产品的场景下,如何在推荐系统中应用特征平台FeatureStore SDK管理特征。
背景信息
推荐系统是一种能够根据用户的兴趣和偏好,向用户推荐个性化的内容或产品的系统。在推荐系统中,提取和配置用户或物品的特征信息非常重要。通过本文提供的解决方案,您可以了解到如何使用FeatureStore搭建推荐系统,以及FeatureStore在推荐系统中如何通过各个版本的SDK管理特征数据。
更多关于FeatureStore的信息,请参见FeatureStore概述。
如果您在配置或使用过程中有任何问题,可以搜索钉钉群号:34415007523,进入答疑群联系技术人员进行咨询。
前提条件
在开始执行操作前,请确认您已完成以下准备工作。
依赖产品 | 具体操作 |
人工智能平台 PAI |
|
云原生大数据计算服务 MaxCompute |
|
实时数仓 Hologres |
|
大数据开发治理平台 DataWorks |
|
一、准备数据
同步数据表
一般对于推荐场景,需要准备三张数据表:user侧的特征表、item侧的特征表以及Label表。
为方便实践操作,我们在MaxCompute的pai_online_project项目中提前准备了模拟生成的用户表、物料表和Label表进行示例说明。其中,用户表、物料表每个分区大约有10万条数据,在MaxCompute中分别占用约70 MB;Label表每个分区约45万条数据,在MaxCompute中占用约5 MB。
您需要在DataWorks中执行SQL命令,将用户表、物料表、Label表从pai_online_project项目同步到自己的MaxCompute中。具体操作步骤如下:
登录DataWorks控制台。
在左侧导航栏单击数据开发与治理 > 数据开发。
选择已创建的DataWorks工作空间后,单击进入数据开发。
鼠标悬停至新建,选择新建节点 > MaxCompute > ODPS SQL,在弹出的页面中配置节点参数。
参数
取值建议
引擎实例
选择已创建的MaxCompute引擎。
节点类型
ODPS SQL
路径
业务流程/Workflow/MaxCompute
名称
可自定义名称。
单击确认。
在新建节点区域运行以下SQL命令,将用户表、物料表、Label表从pai_online_project项目同步到自己的MaxCompute中。资源组选择已创建的独享资源组。
同步用户表:rec_sln_demo_user_table_preprocess_all_feature_v1(单击查看详情)
同步物料表:rec_sln_demo_item_table_preprocess_all_feature_v1(单击查看详情)
同步Label表:rec_sln_demo_label_table(单击查看详情)
完成上述操作后,您可以在自己的工作空间内查看用户表rec_sln_demo_user_table_preprocess_all_feature_v1、物料表rec_sln_demo_item_table_preprocess_all_feature_v1和Label表rec_sln_demo_label_table,后续的实践操作将以这三张表为例进行说明。
配置数据源
FeatureStore一般需要配置两个数据源:离线数据源(MaxCompute)和在线数据源(Hologres/GraphCompute/TableStore)。本文以MaxCompute和Hologres为例进行说明。
登录PAI控制台,在左侧导航栏单击数据准备>特征平台(FeatureStore)。
选择工作空间后,单击进入FeatureStore。
配置MaxCompute数据源。
在数据源页签,单击新建数据源,在弹出的页面中配置MaxCompute数据源具体参数。
参数
取值建议
类型
MaxCompute
名称
可自定义名称。
MaxCompute项目名
选择已创建的MaxCompute项目。
请复制语句并单击立即前往,同步数据至Hologres,在DataWorks执行该语句后,即可完成授权。
说明授权操作需要您的账号拥有admin权限,具体操作详情请参见通过命令管理用户权限或通过控制台管理用户权限。
完成后单击提交。
配置Hologres数据源。
在数据源页签,单击新建数据源,在弹出的页面中配置Hologres数据源具体参数。
参数
取值建议
类型
Hologres
名称
可自定义名称。
实例ID
选择已创建的Hologres实例名称。
数据库名称
选择已创建的实例数据库。
完成后单击提交。
对Hologres进行授权,具体操作详情请参见配置数据源。
二、创建并注册FeatureStore
您可以根据实际需求选择通过控制台或者SDK两种方式创建并注册FeatureStore。由于后续导出Training Set和同步数据都需要使用SDK,所以如果选择控制台操作的方式,完成控制台配置后,仍需要安装FeatureStore Python SDK。
方式一:控制台操作
创建FeatureStore项目
登录PAI控制台,在左侧导航栏单击数据准备>特征平台(FeatureStore)。
选择工作空间后,单击进入FeatureStore。
单击新建项目,在弹出的页面中配置项目参数。
参数
取值建议
名称
可自定义名称。本文以fs_demo为例进行说明。
描述
可自定义描述。
离线数据源(Offline Store)
选择已创建的MaxCompute数据源。
在线数据源(Online Store)
选择已创建的Hologres数据源。
单击提交,完成FeatureStore项目创建。
创建特征实体(Feature Entity)
在FeatureStore项目列表页面,单击项目名称,进入项目详情页面。
在特征实体页签中单击新建特征实体,在弹出的页面中配置user特征实体参数。
参数
取值建议
特征实体名称
可自定义名称。本文以user为例进行说明。
Join Id
user_id
单击提交。
单击新建特征实体,在弹出的页面中配置item特征实体参数。
参数
取值建议
特征实体名称
可自定义名称。本文以item为例进行说明。
Join Id
item_id
单击提交,完成特征实体创建。
创建特征视图(Feature View)
在特征项目详情页面的特征视图页签,单击新建特征视图,在弹出的页面中配置user特征视图参数。
参数
取值建议
视图名称
可自定义名称。本文以user_table_preprocess_all_feature_v1为例进行说明。
类型
离线
写入方式
使用离线表
数据源
选择已创建的MaxCompute数据源。
特征表
选择已准备的用户表rec_sln_demo_user_table_preprocess_all_feature_v1。
特征字段
勾选user_id主键。
同步在线特征表
是
特征实体
user
特征生命周期(秒)
保持默认。
单击提交。
单击新建特征视图,在弹出的页面中配置item特征视图。
参数
取值建议
视图名称
可自定义名称。本文以item_table_preprocess_all_feature_v1为例进行说明。
类型
离线
写入方式
使用离线表
数据源
选择已创建的MaxCompute数据源。
特征表
选择已准备的物料表rec_sln_demo_item_table_preprocess_all_feature_v1。
特征字段
勾选item_id主键。
同步在线特征表
是
特征实体
item
特征生命周期(秒)
保持默认。
完成后单击提交,完成特征视图创建。
创建Label表
在特征项目详情页面的Label表页签,单击新建Label表,在弹出的页面中配置Label表信息。
参数
取值建议
数据源
选择已创建的MaxCompute数据源。
表名
选择已准备的Label表rec_sln_demo_label_table。
单击提交。
创建模型特征
在特征项目详情页面的模型特征页签,单击新建模型特征,在弹出的页面中配置模型特征参数
参数
取值建议
模型特征名
自定义。本文以fs_rank_v1为例进行说明。
选择特征
选择已创建的user特征视图和item特征视图。
Label表名称
选择已创建的Label表rec_sln_demo_label_table。
单击提交,完成模型特征创建。
在模型特征列表页面,单击已创建模型右侧的详情。
在弹出的模型特征详情页面的基本信息页签中,可查看导出表名为fs_demo_fs_rank_v1_trainning_set,后续基于该表进行特征生产以及模型训练。
安装FeatureStore Python SDK,具体操作详情请参见在推荐系统中应用FeatureStore管理特征。
方式二:FeatureStore Python SDK操作
登录DataWorks控制台界面。
在左侧导航栏单击资源组列表。
在独享资源组页签中,单击相应调度资源后的图标,选择运维助手。
单击创建命令,在弹出的页面中配置命令参数。
参数
取值建议
命令名称
可自定义名称。本文以install为例进行说明。
命令类型
手动输入(无法使用pip命令安装第三方包)
命令内容
/home/tops/bin/pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple https://feature-store-py.oss-cn-beijing.aliyuncs.com/package/feature_store_py-1.3.1-py3-none-any.whl
超时时间
可自定义时间。
单击创建,完成命令创建。
单击运行命令,在弹出的页面中单击运行。
可单击刷新查看最新执行状态,待状态为成功时,即表示完成安装。
使用SDK的具体操作步骤请参见DSW Gallery。
三、同步数据节点运行
上线前需要例行将数据同步节点,即例行将数据从离线数据源同步到在线数据源中,在线会实时地从在线数据源中读取。本示例需要将user特征表和item特征表提交例行,具体操作如下。
登录DataWorks控制台。
在左侧导航栏单击数据开发与治理 > 数据开发。
选择已创建的DataWorks工作空间后,单击进入数据开发。
例行同步user表。
鼠标悬停至新建,选择新建节点 > MaxCompute > PyODPS 3。
复制以下内容到脚本中,完成user_table_preprocess_all_feature_v1例行同步。
from feature_store_py.fs_client import FeatureStoreClient import datetime from feature_store_py.fs_datasource import MaxComputeDataSource import sys cur_day = args['dt'] print('cur_day = ', cur_day) access_key_id = o.account.access_id access_key_secret = o.account.secret_access_key fs = FeatureStoreClient(access_key_id=access_key_id, access_key_secret=access_key_secret, region='cn-beijing') cur_project_name = 'fs_demo' project = fs.get_project(cur_project_name) feature_view_name = 'user_table_preprocess_all_feature_v1' batch_feature_view = project.get_feature_view(feature_view_name) task = batch_feature_view.publish_table(partitions={'ds':cur_day}, mode='Overwrite') task.wait() task.print_summary()
单击右侧调度配置,在弹出的页面中配置调度参数。
参数
取值建议
调度参数
参数名
dt
参数值
$[yyyymmdd-1]
资源属性
调度资源组
选择已创建的独享资源组。
调度依赖
选择已创建的user表。
节点配置并测试完成后,保存并提交节点配置。
执行补数据操作。操作详情请参见同步数据表。
同步例行item表。
鼠标悬停至新增,选择新建节点 > MaxCompute > PyODPS 3,在弹出的页面中配置节点参数。
单击确认。
复制以下内容到脚本中。
item_table_preprocess_all_feature_v1 同步例行(单击查看详情)
单击右侧调度配置,在弹出的页面中配置调度参数。
参数
取值建议
调度参数
参数名
dt
参数值
$[yyyymmdd-1]
资源属性
调度资源组
选择已创建的独享资源组。
调度依赖
选择已创建的item表。
节点配置并测试完成后,保存并提交节点配置。
执行补数据操作。操作详情请参见同步数据表。
同步完成后,可以在Hologres中查看最新同步的特征。
四、导出Training Set脚本
登录DataWorks控制台。
在左侧导航栏单击数据开发与治理>数据开发。
选择已创建的DataWorks工作空间后,单击进入数据开发。
鼠标悬停至新建,选择新建节点 > MaxCompute>PyODPS 3,在弹出的页面中配置节点参数。
参数
取值建议
引擎实例
选择已创建的MaxCompute引擎。
节点类型
PyODPS 3
路径
业务流程/Workflow/MaxCompute
名称
可自定义名称。
单击确认。
复制以下内容到脚本中。
from feature_store_py.fs_client import FeatureStoreClient from feature_store_py.fs_project import FeatureStoreProject from feature_store_py.fs_datasource import LabelInput, MaxComputeDataSource, TrainingSetOutput from feature_store_py.fs_features import FeatureSelector from feature_store_py.fs_config import LabelInputConfig, PartitionConfig, FeatureViewConfig from feature_store_py.fs_config import TrainSetOutputConfig, EASDeployConfig import datetime import sys cur_day = args['dt'] print('cur_day = ', cur_day) offset = datetime.timedelta(days=-1) pre_day = (datetime.datetime.strptime(cur_day, "%Y%m%d") + offset).strftime('%Y%m%d') print('pre_day = ', pre_day) access_key_id = o.account.access_id access_key_secret = o.account.secret_access_key fs = FeatureStoreClient(access_key_id=access_key_id, access_key_secret=access_key_secret, region='cn-beijing') cur_project_name = 'fs_demo' project = fs.get_project(cur_project_name) label_partitions = PartitionConfig(name = 'ds', value = cur_day) label_input_config = LabelInputConfig(partition_config=label_partitions) user_partitions = PartitionConfig(name = 'ds', value = pre_day) feature_view_user_config = FeatureViewConfig(name = 'user_table_preprocess_all_feature_v1', partition_config=user_partitions) item_partitions = PartitionConfig(name = 'ds', value = pre_day) feature_view_item_config = FeatureViewConfig(name = 'item_table_preprocess_all_feature_v1', partition_config=item_partitions) feature_view_config_list = [feature_view_user_config, feature_view_item_config] train_set_partitions = PartitionConfig(name = 'ds', value = cur_day) train_set_output_config = TrainSetOutputConfig(partition_config=train_set_partitions) model_name = 'fs_rank_v1' cur_model = project.get_model(model_name) task = cur_model.export_train_set(label_input_config, feature_view_config_list, train_set_output_config) task.wait() print("task_summary = ", task.task_summary)
单击右侧的调度配置,在弹出的页面中配置调度参数。
参数
取值建议
调度参数
参数名
dt
参数值
$[yyyymmdd-1]
资源属性
调度资源组
选择已创建的独享资源组。
调度依赖
选择已创建的user表和item表。
节点配置并测试完成后,保存并提交节点配置。
执行补数据操作。操作详情请参见同步数据表。
五、安装并使用SDK
Python SDK
具体操作,请参见使用FeatureStore Python SDK搭建推荐系统。
Go SDK
FeatureStore Go SDK已开源,具体请参见aliyun-pai-featurestore-go-sdk。
安装
执行如下代码安装FeatureStore Go SDK。
go get github.com/aliyun/aliyun-pai-featurestore-go-sdk/v2
使用方式
执行如下命令,初始化Client。
accessId := os.Getenv("AccessId") accessKey := os.Getenv("AccessKey") regionId := "cn-hangzhou" projectName := "fs_test_ots" client, err := NewFeatureStoreClient(regionId, accessId, accessKey, projectName)
说明由于SDK是直连在线数据源,客户端需要在VPC环境运行。例如Hologres和GraphCompute需要在指定的VPC才能连接。
获取FeatureView的特征数据。
// get project by name project, err := client.GetProject("fs_test_ots") if err != nil { // t.Fatal(err) } // get featureview by name user_feature_view := project.GetFeatureView("user_fea") if user_feature_view == nil { // t.Fatal("feature view not exist") } // get online features features, err := user_feature_view.GetOnlineFeatures([]interface{}{"100043186", "100060369"}, []string{"*"}, nil)
其中,
[]string{"*"}
代表获取FeatureView下的所有特征, 也可以指定部分特征名称。返回数据示例如下:
[ { "city":"合肥市", "follow_cnt":1, "gender":"male", "user_id":"100043186" }, { "city":"", "follow_cnt":5, "gender":"male", "user_id":"100060369" } ]
获取ModelFeature里的特征数据。
ModelFeature可以关联多个FeatureEntity,可以设置多个join_id,然后特征统一返回。
示例中有两个
join_id
、user_id
和item_id
。获取特征的时候需要设置相同的ID数量。// get project by name project, err := client.GetProject("fs_test_ots") if err != nil { // t.Fatal(err) } // get ModelFeature by name model_feature := project.GetModelFeature("rank") if model_feature == nil { // t.Fatal("model feature not exist") } // get online features features, err := model_feature.GetOnlineFeatures(map[string][]interface{}{"user_id": {"100000676", "100004208"}, "item_id":{"238038872", "264025480"}} )
返回数据示例如下:
[ { "age":26, "author":100015828, "category":"14", "city":"沈阳市", "duration":63, "gender":"male", "item_id":"238038872", "user_id":"100000676" }, { "age":23, "author":100015828, "category":"15", "city":"西安市", "duration":22, "gender":"male", "item_id":"264025480", "user_id":"100004208" } ]
也可以指定某个FeatureEntity,将FeatureEntity对应的特征一起返回。
返回数据示例如下:
[ { "age":26, "city":"沈阳市", "gender":"male", "user_id":"100000676" }, { "age":23, "city":"西安市", "gender":"male", "user_id":"100004208" } ]
Java SDK
FeatureStore Java SDK已开源,具体请参见aliyun-pai-featurestore-java-sdk。
本文以Hologres数据源为例进行说明。
执行如下代码,加载环境变量并初始化服务。
public static String accessId = ""; public static String accessKey = ""; #host具体根据所在地域配置 public static String host = ""; #获取本地配置的环境变量中配置好的accessId、accessKey static { accessId = System.getenv("ACCESS_KEY_ID"); accessKey = System.getenv("ACCESS_KEY_SECRET"); }
准备配置类(配置
regionId
、accessId
、accessKey
以及项目名称)。Configuration cf = new Configuration("cn-hangzhou",Constants.accessId,Constants.accessKey,"ele28"); cf.setDomain(Constants.host);//注意:默认vpc环境
初始化客户端。
ApiClient apiClient = new ApiClient(cf); #FS客户端 FeatureStoreClient featureStoreClient = new FeatureStoreClient(apiClient);
获取项目(此处以项目名称为ele28为例进行说明)。
Project project=featureStoreClient.getProject("ele28"); if(project==null){ throw new RuntimeException("Project not found"); }
获取项目的特征视图(此处以特征视图名称为mc_test为例进行说明)。
FeatureView featureView=project.getFeatureView("mc_test"); if (featureView == null) { throw new RuntimeException("FeatureView not found"); }
根据特征视图获取线上特征数据。
Map<String,String> m1=new HashMap<>(); m1.put("gender","gender1"); //起别名 user_id='100027781'(FS_INT64) age='28'(FS_INT64) city='null'(FS_STRING) item_cnt='0'(FS_INT64) follow_cnt='0'(FS_INT64) follower_cnt='2'(FS_INT64) register_time='1697641608'(FS_INT64) tags='0'(FS_STRING) gender1='female'(FS_STRING) ---------------
获取线上特征(
String[]{"*"}
获取所有属性信息,也可以传入部分属性,输出仅见部分信息的数据)。FeatureResult featureResult1=featureView.getOnlineFeatures(new String[]{"100017768","100027781","100072534"},new String[]{"*"},m1);
输出特征信息。
while(featureResult1.next()){ System.out.println("---------------"); #特征名 for(String m:featureResult1.getFeatureFields()){ System.out.print(String.format("%s=%s(%s) ",m,featureResult1.getObject(m),featureResult1.getType(m))); } System.out.println("---------------"); }
返回数据如下。
--------------- user_id='100017768'(FS_INT64) age='28'(FS_INT64) city='东莞市'(FS_STRING) item_cnt='1'(FS_INT64) follow_cnt='1'(FS_INT64) follower_cnt='0'(FS_INT64) register_time='1697202320'(FS_INT64) tags='1,2'(FS_STRING) gender1='female'(FS_STRING) ---------------
获取模型。
Model model=project.getModelFeature("model_t1"); if(model==null){ throw new RuntimeException("Model not found"); }
获取模型特征里的数据。
两个join_id实例(user_id,item_id),传入的值个数需对应(此处只传入了一个值)。
Map<String, List<String>> m2=new HashMap<>(); m2.put("user_id",Arrays.asList("101683057")); m2.put("item_id",Arrays.asList("203665415"));
获取模型特征有关于user特征实体的所有特征数据。
FeatureResult featureResult2 = model.getOnlineFeaturesWithEntity(m2,"user");
返回数据如下。
--------------- user_id='101683057' age='28' city='深圳市' follower_cnt='234' follow_cnt='0' gender='male' item_cnt='0' register_time='1696407642' tags='2' item_id='203665415' author='132920407' category='14' click_count='0' duration='18.0' praise_count='10' pub_time='1698218997' title='#成语故事' ---------------
Cpp SDK
目前FeatureStore Cpp SDK集成在EasyRec Processor(推荐打分服务) 中,针对特征提取、缓存管理及读取操作进行了专项优化,为大规模推荐场景提供高效能、低延迟的解决方案。
内存占用:在面临复杂且规模庞大的特征数据时,内存资源消耗得到大幅削减,尤其在高特征负载情况下,节省效果更为明显。
拉取特征时间:通过将在线存储(如Hologres、GraphCompute等)的拉取流程改为直接从 MaxCompute拉取至EAS缓存,特征加载时间显著缩短。同时,MaxCompute稳定性和扩展性更好,有效减轻了扩容对在线存储的压力。
模型打分耗时:使用该SDK后,模型打分性能指标tp100得到进一步改善,响应时间更稳定,超时请求显著减少,整体提升了推荐服务的可靠性和用户体验。
相关文档
特征平台也可以与其他云产品组合使用,搭建推荐系统。具体操作,请参见在推荐系统中应用FeatureStore管理特征。