本文主要介紹使用FeatureStore Python SDK完成特徵平台在推薦情境中全流程的建立以及上線的過程。
前提條件
在開始執行操作前,請確認您已完成以下準備工作。
依賴產品 | 具體操作 |
人工智慧平台 PAI |
|
雲原生MaxCompute MaxCompute |
|
即時數倉 Hologres |
|
巨量資料開發治理平台 DataWorks |
|
一、準備資料
同步資料表
一般對於推薦情境,需要準備以下資料表:user側的特徵表、item側的特徵表、Label表、序列特徵表和行為表。
為方便實踐操作,我們在MaxCompute的pai_online_project專案中提前準備了類比產生的使用者表、物料表、Label表、序列特徵表和行為表進行樣本說明。您需要在DataWorks中執行SQL命令,將以上表從pai_online_project專案同步到自己的MaxCompute中。具體操作步驟如下:
登入DataWorks控制台。
在左側導覽列單擊資料開發與治理 > 資料開發。
選擇已建立的DataWorks工作空間後,單擊進入資料開發。
滑鼠移至上方至建立,選擇建立節點 > MaxCompute > ODPS SQL,在彈出的頁面中配置節點參數。
參數
取值建議
引擎執行個體
選擇已建立的MaxCompute引擎。
節點類型
ODPS SQL
路徑
商務程序/Workflow/MaxCompute
名稱
可自訂名稱。
單擊確認。
在建立節點地區運行以下SQL命令,將使用者表、物料表、Label表、序列特徵表和行為表從pai_online_project專案同步到自己的MaxCompute中。資源群組選擇已建立的獨享資源群組。
同步處理的使用者表:rec_sln_demo_user_table_preprocess_all_feature_v1(單擊查看詳情)
同步物料表:rec_sln_demo_item_table_preprocess_all_feature_v1(單擊查看詳情)
同步Label表:rec_sln_demo_label_table(單擊查看詳情)
同步序列特徵表:rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3(單擊查看詳情)
同步行為表:rec_sln_demo_behavior_table_preprocess_v3(單擊查看詳情)
完成上述操作後,您可以在自己的工作空間內查看已同步的表,後續的實踐操作將以這五張表為例進行說明。
配置資料來源
FeatureStore一般需要配置兩個資料來源:離線資料來源(MaxCompute)和線上資料來源(Hologres/GraphCompute/TableStore)。本文以MaxCompute和Hologres為例進行說明。
登入PAI控制台,在左側導覽列單擊資料準備>特徵平台(FeatureStore)。
選擇工作空間後,單擊進入FeatureStore。
配置MaxCompute資料來源。
在資料來源頁簽,單擊建立資料來源,在彈出的頁面中配置MaxCompute資料來源具體參數。
參數
取值建議
類型
MaxCompute
名稱
可自訂名稱。
MaxCompute專案名
選擇已建立的MaxCompute專案。
請複製語句並單擊立即前往,同步資料至Hologres,在DataWorks執行該語句後,即可完成授權。
說明授權操作需要您的帳號擁有admin許可權,具體操作詳情請參見通過命令系統管理使用者許可權或通過控制台系統管理使用者許可權。
完成後單擊提交。
配置Hologres資料來源。
在資料來源頁簽,單擊建立資料來源,在彈出的頁面中配置Hologres資料來源具體參數。
參數
取值建議
類型
Hologres
名稱
可自訂名稱。
執行個體ID
選擇已建立的Hologres執行個體名稱。
資料庫名稱
選擇已建立的執行個體資料庫。
完成後單擊提交。
對Hologres進行授權,具體操作詳情請參見配置資料來源。
二、FeatureStore Python SDK建立流程
安裝特徵平台Python SDK, 要求在Python3環境下運行。以下代碼均建議在Jupyter Notebook環境下運行。
! pip install https://feature-store-py.oss-cn-beijing.aliyuncs.com/package/feature_store_py-1.3.1-py3-none-any.whl
匯入需要的功能模組。
import unittest
import sys
import os
from os.path import dirname, join, abspath
from feature_store_py.fs_client import FeatureStoreClient, build_feature_store_client
from feature_store_py.fs_project import FeatureStoreProject
from feature_store_py.fs_datasource import UrlDataSource, MaxComputeDataSource, DatahubDataSource, HologresDataSource, SparkDataSource, LabelInput, TrainingSetOutput
from feature_store_py.fs_type import FSTYPE
from feature_store_py.fs_schema import OpenSchema, OpenField
from feature_store_py.fs_feature_view import FeatureView
from feature_store_py.fs_features import FeatureSelector
from feature_store_py.fs_config import LabelInputConfig, PartitionConfig, FeatureViewConfig, TrainSetOutputConfig, SequenceFeatureConfig, SequenceTableConfig
import logging
logger = logging.getLogger("foo")
logger.addHandler(logging.StreamHandler(stream=sys.stdout))
特徵工程專案
您可以通過特徵平台建立多重專案空間,每個專案空間是獨立的。具體操作,請參見配置FeatureStore專案。運行Notebook需要FeatureStore服務端配合運行,開通特徵平台後需要配置資料來源,具體操作請參見配置資料來源。
其中,offline_datasource_id指的是離線資料來源ID,online_datasource_id指的是線上資料來源ID。
此處以專案名稱是fs_movie進行說明。
access_id = ''
access_ak = ''
region = 'cn-beijing'
fs = FeatureStoreClient(access_key_id=access_id, access_key_secret=access_ak, region=region)
cur_project_name = "fs_demo"
project = fs.get_project(cur_project_name)
if project is None:
raise ValueError("Need to create project : fs_movie")
運行以下代碼擷取當前的project並列印其資訊。
project = fs.get_project(cur_project_name)
project.print_summary()
特徵實體(FeatureEntity)
特徵實體描述了一組相關的特徵集合。多個特徵視圖可以關聯一個特徵實體。每個實體都會有一個JoinId,通過JoinId可以關聯多個特徵視圖特徵。每一個特徵視圖都有一個主鍵(索引鍵)來擷取它的特徵資料,但是索引鍵可以和JoinId定義的名稱不一樣。
一般在推薦系統中,特徵只關聯user和item兩個特徵實體,即特徵屬於user側或者item側。此處以建立 user和item兩個特徵實體為例進行說明。
建立user entity
user_entity_name = "user" user_join_id = 'user_id' user_entity = project.get_entity(user_entity_name) if user_entity is None: user_entity = project.create_entity(name = user_entity_name, join_id=user_join_id) user_entity.print_summary()
建立item entity
item_entity_name = "item" join_id = 'item_id' item_entity = project.get_entity(item_entity_name) if item_entity is None: item_entity = project.create_entity(name = item_entity_name, join_id=join_id) item_entity.print_summary()
特徵視圖(FeatureView)
FeatureStore是一個專門用來管理和組織特徵資料的平台,外部資料需要通過特徵視圖進入FeatureStore。特徵視圖定義了資料從哪裡來(DataSource)、需要進行哪些預先處理或轉換操作(如特徵工程/Transformation)、特徵的資料結構(包含特徵名稱和類型在內的特徵schema)、資料存放區的位置(OnlineStore/OfflineStore),並提供特徵元資訊管理,如主鍵、事件時間、分區鍵、特徵實體以及有效期間設定ttl(預設-1表示永久有效,正數則表示線上查詢時會取ttl內的最新特徵資料)。
特徵視圖分為三種類型:Batch FeatureView(離線特徵,或者T-1天特徵)、Stream FeatureView(即時特徵)、Sequence FeatureView(序列特徵)。
Batch FeatureView(離線特徵視圖)
將離線資料注入到FeatureStore的OfflineStore中,並可以根據需求同步至OnlineStore以支援即時查詢。一般的離線特徵,或者T-1天的特徵。
註冊user側離線特徵表
將rec_sln_demo_user_table_preprocess_all_feature_v1註冊到特徵平台。
user_feature_view_name = "user_table_preprocess_all_feature_v1" user_table_name = "rec_sln_demo_user_table_preprocess_all_feature_v1" user_feature_view = project.get_feature_view(user_feature_view_name) if user_feature_view is None: ds = MaxComputeDataSource(project.offline_datasource_id, user_table_name) user_feature_view = project.create_batch_feature_view(name=user_feature_view_name, datasource=ds, online=True, entity= user_entity_name, primary_key='user_id', register=True) print(user_feature_view)
將離線資料來源中的rec_sln_demo_user_table_preprocess_all_feature_v1中20231023分區的資料同步到線上資料來源。
user_task = user_feature_view.publish_table({'ds':'20231023'}) user_task.wait()
查看任務運行情況。
user_task.print_summary()
註冊item側離線特徵表
將rec_sln_demo_item_table_preprocess_all_feature_v1表註冊到特徵平台。
item_feature_view_name = "item_table_preprocess_all_feature_v1" item_table_name = "rec_sln_demo_item_table_preprocess_all_feature_v1" item_feature_view = project.get_feature_view(item_feature_view_name) if item_feature_view is None: ds = MaxComputeDataSource(project.offline_datasource_id, item_table_name) item_feature_view = project.create_batch_feature_view(name=item_feature_view_name, datasource=ds, online = True, entity= item_entity_name, primary_key='item_id', register=True) print(item_feature_view)
將離線資料來源中的rec_sln_demo_item_table_preprocess_all_feature_v1中20231023分區的資料同步到線上資料來源。
item_task = item_feature_view.publish_table({'ds':'20231023'}) item_task.wait()
查看任務運行情況。
item_task.print_summary()
Sequence FeatureView(即時序列視圖)
支援離線寫入序列特徵,以及查詢和讀取即時序列特徵。一般在推薦情境中,離線序列特徵表(記為F1)前期由類比產生,後續可以直接由線上日誌代替。而線上即時序列的查詢中,會查詢兩張線上行為表的資料,一張為T-1天的行為表(記為B1),另外一張為T天當天的即時行為表(記為B2)。B2是Realtime Compute更新的特徵,同時查詢B1和B2行為表後拼出該使用者的特徵序列,然後與其他特徵合在一起送到模型打分。
線上T-1天的行為表B1一般是由離線T-1天行為表(記為A1)同步,同步過程中,特徵平台會自動完成去重等操作。 線上T天的行為表B2目前需要您通過API介面或Flink等阿里雲產品寫入。
因此在註冊即時特徵視圖過程中,相當於特徵平台同時維護管理了四張表:離線序列表F1、離線T-1天的行為表A1、線上T-1天的行為表B1、線上T天的行為表B2。
註冊時,只需要提供離線序列表F1,離線T-1天的行為表A1即可,線上行為表和同步去重過程由特徵平台完成。
註冊即時特徵視圖。
seq_feature_view_name = "wide_seq_feature_v3" seq_feature_view = project.get_feature_view(seq_feature_view_name) if seq_feature_view is None: seq_table_name = "rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3" behavior_table_name = 'rec_sln_demo_behavior_table_preprocess_v3' ds = MaxComputeDataSource(project.offline_datasource_id, behavior_table_name) event_time = 'event_unix_time' # 行為表中的 event time 欄位名 item_id = 'item_id' # 行為表中的 item_id 欄位名 event = 'event' # 行為表中的 event 欄位名 # deduplication_method = 1, 表示 ['user_id', 'item_id', 'event'] 去重 # deduplication_method = 2, 表示 ['user_id', 'item_id', 'event', 'event_time'] 去重 sequence_feature_config_list = [SequenceFeatureConfig(offline_seq_name='click_seq_50_seq', seq_event='click', online_seq_name='click_seq_50', seq_len=50)] # offline_seq_name 指的是離線序列表中序列特徵欄位名,seq_event 指的是行為欄位名,online_seq_name 指的是特徵平台線上 Go SDK 查出該 user 對應的行為序列 item_ids 後,會以該名稱命名。 # seq_len 指的是序列長度,大於該長度的序列會被截斷。 seq_table_config = SequenceTableConfig(table_name=seq_table_name, primary_key='user_id', event_time='event_unix_time') seq_feature_view = project.create_sequence_feature_view(seq_feature_view_name, datasource=ds, event_time=event_time, item_id=item_id, event=event, deduplication_method=1, sequence_feature_config=sequence_feature_config_list, sequence_table_config=seq_table_config, entity=user_entity_name) # seq_feature_view.print_summary() print(seq_feature_view)
將離線資料來源中rec_sln_demo_behavior_table_preprocess_v3中20231023分區的資料同步到線上資料來源。同步時會自動檢查是否有前N天分區的資料,如果沒有會自動補充,N可以通過
days_to_load
指定,預設是30,一般取30即可。seq_task = seq_feature_view.publish_table({'ds':'20231023'}, days_to_load=30) seq_task.wait()
查看任務運行情況。
seq_task.print_summary()
Stream FeatureView(即時特徵視圖)
將資料直接寫入OnlineStore,並同時同步到OfflineStore。用於特徵即時更新的情境,例如商品的價格、銷量的即時更新。
Label 表註冊
label_table_name = 'rec_sln_demo_label_table'
ds = MaxComputeDataSource(data_source_id=project.offline_datasource_id, table=label_table_name)
label_table = project.get_label_table(label_table_name)
if label_table is None:
label_table = project.create_label_table(datasource=ds, event_time='event_unix_time')
print(label_table)
擷取線上特徵
擷取線上特徵用於排查離線線上一致性,資料分析等需求,目前優先支援 Hologres。
user_feature_view_name = "user_table_preprocess_all_feature_v1"
user_feature_view = project.get_feature_view(user_feature_view_name)
ret_features_1 = user_feature_view.get_online_features(join_ids={'user_id':['169898460', '148811946']}, features=['user_id', 'gender', 'age', 'city'])
print("ret_features = ", ret_features_1)
TrainingSet
訓練模型時,首先要構造樣本表,樣本表由Label資料和特徵資料群組成。在與FeatureStore互動時,Label資料需要由客戶提供,並且需要定義要擷取的特徵名稱,然後根據主鍵進行point-in-time join(存在event_time的情況下)。
# 指定 Label 表
label_table_name = 'rec_sln_demo_label_table'
output_ds = MaxComputeDataSource(data_source_id=project.offline_datasource_id)
train_set_output = TrainingSetOutput(output_ds)
user_feature_view_name = "user_table_preprocess_all_feature_v1"
user_feature_selector = FeatureSelector(user_feature_view_name, '*') # '*' 代表選擇全部特徵
item_feature_view_name = "item_table_preprocess_all_feature_v1"
item_feature_selector = FeatureSelector(item_feature_view_name, '*')
seq_feature_view_name = "wide_seq_feature_v3"
seq_feature_selector = FeatureSelector(seq_feature_view_name, ['click_seq_50_seq'])
train_set = project.create_training_set(label_table_name=label_table_name, train_set_output= train_set_output, feature_selectors=[user_feature_selector, item_feature_selector, seq_feature_selector])
print("train_set = ", train_set)
模型特徵
訓練模型並部署成服務後,進行業務預測。其中,訓練樣本可以從上文的train_set獲得。
model_name = "fs_rank_v2"
cur_model = project.get_model(model_name)
if cur_model is None:
cur_model = project.create_model(model_name, train_set)
print("cur_model_train_set_table_name = ", cur_model.train_set_table_name)
三、匯出樣本表並訓練模型
實際訓練時,需要匯出樣本表。
匯出樣本表
指定Label表以及各個特徵視圖的分區、event_time。
cur_day = '20231024'
pre_day = '20231023'
label_partitions = PartitionConfig(name = 'ds', value = cur_day)
label_input_config = LabelInputConfig(partition_config=label_partitions)
user_partitions = PartitionConfig(name = 'ds', value = pre_day)
feature_view_user_config = FeatureViewConfig(name = 'user_table_preprocess_all_feature_v1',
partition_config=user_partitions)
item_partitions = PartitionConfig(name = 'ds', value = pre_day)
feature_view_item_config = FeatureViewConfig(name = 'item_table_preprocess_all_feature_v1',
partition_config=item_partitions)
seq_partitions = PartitionConfig(name = 'ds', value = cur_day)
feature_view_seq_config = FeatureViewConfig(name = 'wide_seq_feature_v3', partition_config=seq_partitions, event_time='event_unix_time', equal=True)
feature_view_config_list = [feature_view_user_config, feature_view_item_config, feature_view_seq_config]
train_set_partitions = PartitionConfig(name = 'ds', value = cur_day)
train_set_output_config = TrainSetOutputConfig(partition_config=train_set_partitions)
model_name = 'fs_rank_v2'
cur_model = project.get_model(model_name)
task = cur_model.export_train_set(label_input_config, feature_view_config_list, train_set_output_config)
task.wait()
print("task_summary = ", task.task_summary)
訓練模型
EasyRec是一個開源的推薦系統架構,可以與FeatureStore無縫銜接,進行訓練模型、匯出模型、上線模型的操作。推薦您將fs_demo_fs_rank_v2_trainning_set表作為輸入,使用EasyRec訓練模型。
更多EasyRec相關的問題,請通過DingTalk群(32260796)加入阿里雲人工智慧平台PAI諮詢群聯絡我們。
四、上線模型
訓練並匯出模型後,可以進行部署和上線的操作。如果是自建推薦系統,FeatureStore提供FeatureStore Python/Go/Cpp/Java SDK,可以與各式各樣的自建推薦系統進行銜接。您也可以通過DingTalk群(32260796)聯絡我們諮詢和商討具體方案。如果是阿里雲系列產品,可以和FeatureStore無縫銜接,快速搭建推薦系統上線。
本文以阿里雲系列產品為例介紹上線模型操作。
例行同步資料節點
上線前需要將資料同步節點例行,即例行將資料從離線資料來源同步到線上資料來源中
登入DataWorks控制台。
在左側導覽列單擊資料開發與治理 > 資料開發。
選擇已建立的DataWorks工作空間後,單擊進入資料開發。
同步例行user表。
滑鼠移至上方至建立,選擇建立節點 > MaxCompute > PyODPS 3。
在彈出的頁面中配置節點參數,單擊確認。
複製以下內容到指令碼中,完成user_table_preprocess_all_feature_v1例行同步。
單擊右側調度配置,在彈出的頁面中配置調度參數。
參數
取值建議
調度參數
參數名
dt
參數值
$[yyyymmdd-1]
資源屬性
調度資源群組
選擇已建立的獨享資源群組。
調度依賴
選擇已建立的user表。
節點配置並測試完成後,儲存並提交節點配置。
執行補資料操作。操作詳情請參見同步資料表。
同步例行item表。
滑鼠移至上方至建立,選擇建立節點 > MaxCompute > PyODPS 3。
在彈出的頁面中配置節點參數,單擊確認。
複製以下內容到指令碼中。
單擊右側調度配置,在彈出的頁面中配置調度參數。
參數
取值建議
調度參數
參數名
dt
參數值
$[yyyymmdd-1]
資源屬性
調度資源群組
選擇已建立的獨享資源群組。
調度依賴
選擇已建立的item表。
節點配置並測試完成後,儲存並提交節點配置。
執行補資料操作。操作詳情請參見同步資料表。
同步即時序列行為表。
滑鼠移至上方至建立,選擇建立節點 > MaxCompute > PyODPS 3。
在彈出的頁面中配置節點參數,單擊確認。
複製以下內容到指令碼中。
單擊右側調度配置,在彈出的頁面中配置調度參數。
參數
取值建議
調度參數
參數名
dt
參數值
$[yyyymmdd-1]
資源屬性
調度資源群組
選擇已建立的獨享資源群組。
調度依賴
選擇已建立的item表。
節點配置並測試完成後,儲存並提交節點配置。
執行補資料操作。操作詳情請參見同步資料表。
同步完成後,可以在Hologres中查看最新同步的特徵。
建立與部署EAS模型服務
模型服務是為了接收Recommendation Engine的請求,根據請求為item集合打分並返回分數。其中EasyRec Processor內部包含了FeatureStore Cpp SDK,可以實現低延時、高效能的擷取特徵操作,EasyRec Processor從Feature Store Cpp SDK取得特徵後,送入模型進行推理,拿到打分後返回給Recommendation Engine。
部署模型服務的流程如下。
登入DataWorks控制台。
在左側導覽列單擊資料開發與治理 > 資料開發。
選擇已建立的DataWorks工作空間後,單擊進入資料開發。
滑鼠移至上方至建立,選擇建立節點 > MaxCompute > PyODPS 3。
在彈出的頁面中配置節點參數,單擊確認。
複製以下內容到指令碼中。
import os import json config = { "name": "fs_demo_v1", "metadata": { "cpu": 4, "rpc.max_queue_size": 256, "rpc.enable_jemalloc": 1, "gateway": "default", "memory": 16000 }, "model_path": f"oss://beijing0009/EasyRec/deploy/rec_sln_demo_dbmtl_v1/{args['ymd']}/export/final_with_fg", "model_config": { "access_key_id": f'{o.account.access_id}', "access_key_secret": f'{o.account.secret_access_key}', "region": "cn-beijing", "fs_project": "fs_demo", "fs_model": "fs_rank_v2", "fs_entity": "item", "load_feature_from_offlinestore": True, "steady_mode": True, "period": 2880, "outputs": "probs_is_click,y_ln_playtime,probs_is_praise", "fg_mode": "tf" }, "processor": "easyrec-1.8", "processor_type": "cpp" } with open("echo.json", "w") as output_file: json.dump(config, output_file) # 第一次部署時運行這行 os.system(f"/home/admin/usertools/tools/eascmd -i {o.account.access_id} -k {o.account.secret_access_key} -e pai-eas.cn-beijing.aliyuncs.com create echo.json") # 例行更新時運行下面這行 # os.system(f"/home/admin/usertools/tools/eascmd -i {o.account.access_id} -k {o.account.secret_access_key} -e pai-eas.cn-beijing.aliyuncs.com modify fs_demo_v1 -s echo.json")
單擊右側調度配置,在彈出的頁面中配置調度參數。
參數
取值建議
調度參數
參數名
dt
參數值
$[yyyymmdd-1]
資源屬性
調度資源群組
選擇已建立的獨享資源群組。
調度依賴
選擇對應的訓練任務和item_table_preprocess_all_feature_v1。
節點配置並測試完成後,運行查看部署情況。
部署完成後,注釋掉第34行代碼,並將37行取消注釋,提交任務例行。
(可選)您可以在EAS模型線上服務頁面的推理服務頁簽,查看已部署的服務。操作詳情請參見服務部署:控制台。
配置PAI-Rec
PAI-Rec是Recommendation Engine服務,其中整合了FeatureStore的Go SDK,可以與FeatureStore和EAS無縫銜接。
具體配置步驟如下。
配置FeatureStoreConfs。
RegionId
:修改為產品實際所在地區,此處以cn-beijing為例。ProjectName
:已建立的FeatureStore專案名稱,fs_demo。
"FeatureStoreConfs": { "pairec-fs": { "RegionId": "cn-beijing", "AccessId": "${AccessKey}", "AccessKey": "${AccessSecret}", "ProjectName": "fs_demo" } },
配置FeatureConfs。
FeatureStoreName
:和上一步FeatureStoreConfs中的設定pairec-fs保持一致。FeatureStoreModelName
:已建立的模型特徵名稱,fs_rank_v1。FeatureStoreEntityName
:已建立的特徵實體名稱,user。表示PAI-Rec引擎中通過FeatureStore Go SDK去擷取模型fs_rank_v1中特徵實體為user側的特徵。
"FeatureConfs": { "recreation_rec": { "AsynLoadFeature": true, "FeatureLoadConfs": [ { "FeatureDaoConf": { "AdapterType": "featurestore", "FeatureStoreName": "pairec-fs", "FeatureKey": "user:uid", "FeatureStoreModelName": "fs_rank_v1", "FeatureStoreEntityName": "user", "FeatureStore": "user" } } ] } },
配置AlgoConfs。
此配置代表告訴PAI-Rec去串連哪一個EAS模型打分服務。
Name
:和部署的EAS服務名稱一致。Url
和Auth
:EAS服務給出的資訊,您可以在EAS模型線上服務頁面單擊服務名稱,然後在服務詳情頁簽單擊查看調用資訊擷取URL和Token。更多詳細配置可參見EAS常見問題。
"AlgoConfs": [ { "Name": "fs_demo_v1", "Type": "EAS", "EasConf": { "Processor": "EasyRec", "Timeout": 300, "ResponseFuncName": "easyrecMutValResponseFunc", "Url": "eas_url_xxx", "EndpointType": "DIRECT", "Auth": "eas_token" } } ],