すべてのプロダクト
Search
ドキュメントセンター

Platform For AI:FeatureStore Python SDK を使用したレコメンデーションシステムの構築

最終更新日:Jan 28, 2026

このトピックでは、FeatureStore Python SDK (Software Development Kit) を使用して、Platform for AI (PAI) FeatureStore 上でエンドツーエンドのレコメンデーションシステムを構築し、公開する方法について説明します。

前提条件

開始する前に、以下の準備が完了していることを確認してください。

製品の依存関係

操作手順

Platform for AI (PAI)

  • PAI を有効化し、デフォルトワークスペースを作成します。詳細については、「PAI の有効化とデフォルトワークスペースの作成」をご参照ください。

  • Alibaba Cloud アカウントの AccessKey ID と AccessKey Secret を取得します。詳細については、「AccessKey ペアの作成」をご参照ください。

    ローカル環境変数を設定して AccessKey ID と AccessKey Secret を保存することを推奨します。詳細については、「環境変数の設定」をご参照ください。

MaxCompute

FeatureDB

DataWorks

パート 1:データ準備

データテーブルの同期

レコメンデーションシナリオでは、通常、ユーザー特徴テーブルアイテム特徴テーブルラベルテーブルシーケンス特徴テーブル行動テーブルといったデータテーブルを準備する必要があります。

このチュートリアルを簡潔にするため、pai_online_project MaxCompute プロジェクトにサンプルのユーザー、アイテム、ラベル、シーケンス特徴、行動テーブルを用意しました。DataWorks で SQL コマンドを実行して、これらのテーブルを pai_online_project プロジェクトからご自身の MaxCompute プロジェクトに同期する必要があります。手順は以下の通りです:

  1. DataWorks コンソールにログインします。

  2. 左側のナビゲーションウィンドウで、[データ開発と O&M] > [データ開発] をクリックします。

  3. ご利用の DataWorks ワークスペースを選択し、[データ開発に進む] をクリックします。

  4. [新規] にカーソルを合わせ、[新規ノード] > [MaxCompute] > [ODPS SQL] を選択します。表示されたダイアログボックスで、ノードパラメーターを設定します。

    パラメーター

    推奨値

    エンジンインスタンス

    ご利用の MaxCompute エンジンを選択します。

    ノードタイプ

    ODPS SQL

    パス

    Business Flow/Workflow/MaxCompute

    名前

    カスタム名を指定できます。

  5. [確認] をクリックします。

  6. ノードエディターで、以下の SQL コマンドを実行して、ユーザー、アイテム、ラベル、シーケンス特徴、行動テーブルを pai_online_project プロジェクトからご利用の MaxCompute プロジェクトに同期します。リソースグループには、作成した専用リソースグループを選択します。

    ユーザーテーブルの同期:rec_sln_demo_user_table_preprocess_all_feature_v1 (クリックして詳細を表示)

    CREATE TABLE IF NOT EXISTS rec_sln_demo_user_table_preprocess_all_feature_v1
    like pai_online_project.rec_sln_demo_user_table_preprocess_all_feature_v1
    STORED AS ALIORC  
    LIFECYCLE 90;
    
    INSERT OVERWRITE TABLE rec_sln_demo_user_table_preprocess_all_feature_v1 PARTITION (ds)
    SELECT *
    FROM pai_online_project.rec_sln_demo_user_table_preprocess_all_feature_v1
    WHERE ds >= '20231022' and ds <='20231024';

    コマンドを実行すると、以下の 3 つのパーティションからデータが返されます:

    • ds=20231022

    • ds=20231023

    • ds=20231024

    アイテムテーブルの同期:rec_sln_demo_item_table_preprocess_all_feature_v1 (クリックして詳細を表示)

    CREATE TABLE IF NOT EXISTS rec_sln_demo_item_table_preprocess_all_feature_v1
    like pai_online_project.rec_sln_demo_item_table_preprocess_all_feature_v1
    STORED AS ALIORC  
    LIFECYCLE 90;
    
    INSERT OVERWRITE TABLE rec_sln_demo_item_table_preprocess_all_feature_v1 PARTITION(ds)
    SELECT *
    FROM pai_online_project.rec_sln_demo_item_table_preprocess_all_feature_v1
    WHERE ds >= '20231022' and ds <='20231024';

    コマンドを実行すると、以下の 3 つのパーティションからデータが返されます:

    • ds=20231022

    • ds=20231023

    • ds=20231024

    ラベルテーブルの同期:rec_sln_demo_label_table (クリックして詳細を表示)

    CREATE TABLE IF NOT EXISTS rec_sln_demo_label_table
    like pai_online_project.rec_sln_demo_label_table
    STORED AS ALIORC  
    LIFECYCLE 90;
    
    INSERT OVERWRITE TABLE rec_sln_demo_label_table PARTITION (ds)
    SELECT *
    FROM pai_online_project.rec_sln_demo_label_table
    WHERE ds >= '20231022' and ds <='20231024'

    コマンドを実行すると、以下の 3 つのパーティションからデータが返されます:

    • ds=20231022

    • ds=20231023

    • ds=20231024

    シーケンス特徴テーブルの同期:rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3 (クリックして詳細を表示)

    CREATE TABLE IF NOT EXISTS rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3
    like pai_online_project.rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3
    STORED AS ALIORC  
    LIFECYCLE 90;
    
    INSERT OVERWRITE TABLE rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3 PARTITION(ds)
    SELECT *
    FROM pai_online_project.rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3
    WHERE ds >= '20231022' and ds <='20231024';

    コマンドを実行すると、以下の 3 つのパーティションからデータが返されます:

    • ds=20231022

    • ds=20231023

    • ds=20231024

    行動テーブルの同期:rec_sln_demo_behavior_table_preprocess_v3 (クリックして詳細を表示)

    CREATE TABLE IF NOT EXISTS rec_sln_demo_behavior_table_preprocess_v3
    like pai_online_project.rec_sln_demo_behavior_table_preprocess_v3
    STORED AS ALIORC  
    LIFECYCLE 90;
    
    
    INSERT OVERWRITE TABLE rec_sln_demo_behavior_table_preprocess_v3 PARTITION(ds)
    SELECT *
    FROM pai_online_project.rec_sln_demo_behavior_table_preprocess_v3
    WHERE ds >= '20231022' and ds <='20231024';

これらの手順を完了すると、同期されたテーブルがワークスペースで利用可能になります。これら 5 つのテーブルは、以降のセクションで例として使用されます。

データソースの設定

通常、FeatureStore には 2 つのデータソースを設定する必要があります:オフラインデータソース (MaxCompute) とオンラインデータソース (FeatureDB、Hologres、または TableStore) です。このトピックでは、MaxComputeFeatureDB を例として使用します。

  1. PAI コンソールにログインします。左側のナビゲーションウィンドウで、Data Preparation > FeatureStore をクリックします。

  2. ワークスペースを選択し、[フィーチャーストアに進む] をクリックします。

  3. MaxCompute データソースを設定します。

    1. [データソース] タブで、Create Store をクリックします。表示されたダイアログボックスで、MaxCompute データソースのパラメーターを設定します。

      パラメーター

      推奨値

      Type

      MaxCompute

      Name

      カスタム名を入力します。

      MaxCompute Project Name

      ご利用の MaxCompute プロジェクトを選択します。

    2. 設定が完了したら、Submit をクリックします。

  4. FeatureDB データソースを設定します。

    1. すでに FeatureDB データソースを作成している場合は、この手順をスキップできます。

    2. Store タブで、Create Store をクリックします。表示されたダイアログボックスで、FeatureDB データソースのパラメーターを設定します。

      パラメーター

      推奨値

      Type

      FeatureDB (初めて使用する場合は、画面の指示に従って FeatureDB を有効化してください)

      Name

      カスタム名はサポートされていません。デフォルト値は feature_db です。

      Username

      ユーザー名を設定します。

      Password

      パスワードを設定します。

      VPC 高速接続 (オプション)

      設定が成功すると、VPC 内の FeatureStore SDK を使用して、PrivateLink 接続経由で FeatureDB に直接アクセスできます。これにより、データの読み書きパフォーマンスが向上し、アクセスレイテンシが削減されます。

      VPC

      オンライン FeatureStore サービスが配置されている VPC を選択します。

      ゾーンと vSwitch

      ゾーンと vSwitch を選択します。オンラインサービスのマシンが配置されているゾーンの vSwitch を選択してください。サービスの高い可用性と安定性を確保するために、少なくとも 2 つのゾーンの vSwitch を選択することを推奨します。

    3. 設定が完了したら、Submit をクリックします。

パート 2:FeatureStore Python SDK を使用した作成プロセス

FeatureStore Python SDK をインストールします。SDK には Python 3 環境が必要です。以下のコードはすべて Jupyter Notebook 環境で実行することを推奨します。

! pip install https://feature-store-py.oss-cn-beijing.aliyuncs.com/package/feature_store_py-2.0.2-py3-none-any.whl

必要な機能モジュールをインポートできます。

import unittest
import sys
import os
from os.path import dirname, join, abspath
from feature_store_py.fs_client import FeatureStoreClient
from feature_store_py.fs_project import FeatureStoreProject
from feature_store_py.fs_datasource import UrlDataSource, MaxComputeDataSource, DatahubDataSource, HologresDataSource, SparkDataSource, LabelInput, TrainingSetOutput
from feature_store_py.fs_type import FSTYPE
from feature_store_py.fs_schema import OpenSchema, OpenField
from feature_store_py.fs_feature_view import FeatureView
from feature_store_py.fs_features import FeatureSelector
from feature_store_py.fs_config import LabelInputConfig, PartitionConfig, FeatureViewConfig, TrainSetOutputConfig, SequenceFeatureConfig, SequenceTableConfig
import logging
logger = logging.getLogger("foo")
logger.addHandler(logging.StreamHandler(stream=sys.stdout))

特徴エンジニアリングプロジェクト

FeatureStore では、複数の独立したプロジェクトを作成できます。詳細については、「FeatureStore プロジェクトの設定」をご参照ください。Notebook を実行するには、FeatureStore のサーバーサイド環境が必要です。FeatureStore を有効化した後、データソースも設定する必要があります。詳細については、「新しいデータソースの作成」をご参照ください。

`offline_datasource_id` と `online_datasource_id` パラメーターは、それぞれオフラインとオンラインのデータソース ID を指定します。

このトピックでは、fs_movie という名前のプロジェクトを例として使用します。

access_id = ''
access_ak = ''
region = 'cn-beijing'
fs = FeatureStoreClient(access_key_id=access_id, access_key_secret=access_ak, region=region)
cur_project_name = "fs_demo"
project = fs.get_project(cur_project_name)
if project is None:
  raise ValueError("Need to create project : fs_movie")

以下のコードを実行して、現在のプロジェクトを取得し、その情報を出力します。

project = fs.get_project(cur_project_name)
print(project)

特徴エンティティ (FeatureEntity)

特徴エンティティは、関連する特徴のセットを記述します。単一の特徴エンティティは、複数の特徴ビューに関連付けることができます。各エンティティには、これらのビューからの特徴を関連付けるために使用される JoinId があります。各特徴ビューには、その特徴データを取得するためのプライマリキー (インデックスキー) がありますが、インデックスキーの名前は JoinId で定義された名前と異なる場合があります。

レコメンデーションシステムでは、特徴は通常、ユーザーとアイテムの 2 つの特徴エンティティにのみ関連付けられます。つまり、特徴はユーザーまたはアイテムのいずれかに属します。このトピックでは、ユーザーとアイテムの特徴エンティティを作成する例を示します。

  • ユーザーエンティティの作成

    user_entity_name = "user"
    user_join_id = 'user_id'
    user_entity = project.get_entity(user_entity_name)
    if user_entity is None:
     user_entity = project.create_entity(name = user_entity_name, join_id=user_join_id)
    user_entity.print_summary()
    
  • アイテムエンティティの作成

    item_entity_name = "item"
    join_id = 'item_id'
    item_entity = project.get_entity(item_entity_name)
    if item_entity is None:
     item_entity = project.create_entity(name = item_entity_name, join_id=join_id)
    item_entity.print_summary()

特徴ビュー (FeatureView)

FeatureStore は、特徴データを管理および整理するためのプラットフォームです。外部データは、特徴ビューを介して FeatureStore に取り込まれます。特徴ビューは、データソース、必要な前処理または変換操作 (特徴エンジニアリングや変換など)、特徴のデータ構造 (特徴名と型を含む)、および保存場所 (オンラインストアまたはオフラインストア) を定義します。また、プライマリキー、イベント時間、パーティションキー、特徴エンティティ、生存時間 (TTL) などの特徴メタデータも管理します。TTL が -1 (デフォルト) の場合は、データが永続的に保存されることを示します。正の数の場合は、オンラインクエリが指定された TTL 内の最新の特徴データを取得することを意味します。

特徴ビューには、バッチ特徴ビュー (オフライン特徴または T-1 日特徴用)、ストリーム特徴ビュー (リアルタイム特徴用)、シーケンス特徴ビュー (シーケンス特徴用) の 3 種類があります。

バッチ特徴ビュー (オフライン特徴ビュー)

バッチ特徴ビューは、オフラインデータを FeatureStore のオフラインストアに注入し、データをオンラインストアに同期してリアルタイムクエリをサポートできます。このタイプのビューは、通常、オフライン特徴または T-1 日特徴に使用されます。

  • ユーザー側のオフライン特徴テーブルの登録

    1. rec_sln_demo_user_table_preprocess_all_feature_v1 テーブルを FeatureStore に登録します。

      user_feature_view_name = "user_table_preprocess_all_feature_v1"
      user_table_name = "rec_sln_demo_user_table_preprocess_all_feature_v1"
      user_feature_view = project.get_feature_view(user_feature_view_name)
      if user_feature_view is None:
       ds = MaxComputeDataSource(project.offline_datasource_id, user_table_name)
       user_feature_view = project.create_batch_feature_view(name=user_feature_view_name, datasource=ds, online=True, entity= user_entity_name, primary_key='user_id', register=True)
      print(user_feature_view)
    2. オフラインストアの rec_sln_demo_user_table_preprocess_all_feature_v1 テーブルの 20231023 パーティションからオンラインストアにデータを同期します。

      user_task = user_feature_view.publish_table({'ds':'20231023'})
      user_task.wait()
    3. タスクの実行ステータスを表示します。

      user_task.print_summary()
  • アイテム側のオフライン特徴テーブルの登録

    1. rec_sln_demo_item_table_preprocess_all_feature_v1 テーブルを FeatureStore に登録します。

      item_feature_view_name = "item_table_preprocess_all_feature_v1"
      item_table_name = "rec_sln_demo_item_table_preprocess_all_feature_v1"
      item_feature_view = project.get_feature_view(item_feature_view_name)
      if item_feature_view is None:
        ds = MaxComputeDataSource(project.offline_datasource_id, item_table_name)
        item_feature_view = project.create_batch_feature_view(name=item_feature_view_name, datasource=ds, online = True, entity= item_entity_name, primary_key='item_id', register=True)
      print(item_feature_view)
    2. オフラインストアの rec_sln_demo_item_table_preprocess_all_feature_v1 テーブルの 20231023 パーティションからオンラインストアにデータを同期します。

      item_task = item_feature_view.publish_table({'ds':'20231023'})
      item_task.wait()
    3. タスクの実行ステータスを表示します。

      item_task.print_summary()

シーケンス特徴ビュー (リアルタイムシーケンスビュー)

シーケンス特徴ビューは、オフラインシーケンス特徴の書き込みとリアルタイムシーケンス特徴のクエリをサポートします。典型的なレコメンデーションシナリオでは、オフラインシーケンス特徴テーブル (F1) は最初にシミュレーションデータから生成され、後でオンラインログに置き換えることができます。オンラインのリアルタイムシーケンスクエリ中、データは 2 つのオンライン行動テーブルからクエリされます:T-1 日の行動テーブル (B1) と T 日のリアルタイム行動テーブル (B2) です。B2 にはリアルタイムで更新される特徴が含まれています。B1 と B2 テーブルからデータがクエリされた後、ユーザーの特徴シーケンスが構築されます。このシーケンスは他の特徴と組み合わされ、スコアリングのためにモデルに送信されます。

オンラインの T-1 日行動テーブル (B1) は、通常、オフラインの T-1 日行動テーブル (A1) から同期されます。同期中、FeatureStore は重複排除などの操作を自動的に実行します。API 操作または Flink などの他の Alibaba Cloud プロダクトを使用して、オンラインの T 日行動テーブル (B2) にデータを書き込む必要があります。

したがって、リアルタイム特徴ビューを登録すると、FeatureStore は同時に 4 つのテーブルを管理します:オフラインシーケンステーブル (F1)、オフライン T-1 日行動テーブル (A1)、オンライン T-1 日行動テーブル (B1)、およびオンライン T 日行動テーブル (B2) です。

登録時には、オフラインシーケンステーブル (F1) とオフライン T-1 日行動テーブル (A1) を提供するだけで済みます。FeatureStore は、オンライン行動テーブルと同期および重複排除プロセスを処理します。

  1. リアルタイム特徴ビューを登録します。

    seq_feature_view_name = "wide_seq_feature_v3"
    seq_feature_view = project.get_feature_view(seq_feature_view_name)
    if seq_feature_view is None:
      seq_table_name = "rec_sln_demo_behavior_table_preprocess_sequence_wide_seq_feature_v3"
      behavior_table_name = 'rec_sln_demo_behavior_table_preprocess_v3'
      ds = MaxComputeDataSource(project.offline_datasource_id, behavior_table_name)
      event_time = 'event_unix_time' # 行動テーブルのイベント時間フィールドの名前。
      item_id = 'item_id' # 行動テーブルの item_id フィールドの名前。
      event = 'event' # 行動テーブルのイベントフィールドの名前。
      # deduplication_method = 1 は ['user_id', 'item_id', 'event'] に基づく重複排除を示します。
      # deduplication_method = 2 は ['user_id', 'item_id', 'event', 'event_time'] に基づく重複排除を示します。
      sequence_feature_config_list = [SequenceFeatureConfig(offline_seq_name='click_seq_50_seq', seq_event='click', online_seq_name='click_seq_50', seq_len=50)]
      # offline_seq_name はオフラインシーケンステーブルのシーケンス特徴フィールドの名前です。seq_event は行動フィールドの名前です。
      # online_seq_name は、FeatureStore オンライン Go SDK によって取得されるユーザーの item_id のシーケンスに使用される名前です。
      # seq_len はシーケンスの長さです。この長さを超えるシーケンスは切り捨てられます。
      seq_table_config = SequenceTableConfig(table_name=seq_table_name, primary_key='user_id', event_time='event_unix_time')
      seq_feature_view = project.create_sequence_feature_view(seq_feature_view_name, datasource=ds,
                                                  event_time=event_time, item_id=item_id, event=event, deduplication_method=1,
                                                  sequence_feature_config=sequence_feature_config_list, sequence_table_config=seq_table_config, entity=user_entity_name)
    # seq_feature_view.print_summary()
    print(seq_feature_view)
  2. オフラインストアの rec_sln_demo_behavior_table_preprocess_v3 テーブルの 20231023 パーティションからオンラインストアにデータを同期します。同期中、システムは過去 N 日間のパーティションのデータを自動的にチェックします。データが欠落している場合、自動的にバックフィルされます。N は days_to_load を使用して指定できます。デフォルト値は 30 で、ほとんどのシナリオで十分です。

    seq_task = seq_feature_view.publish_table({'ds':'20231023'}, days_to_load=30)
    seq_task.wait()
  3. タスクの実行ステータスを表示します。

    seq_task.print_summary()

ストリーム特徴ビュー (リアルタイム特徴ビュー)

OnlineStore に直接データを書き込み、同時に OfflineStore に同期することは、製品価格や販売数量の更新など、リアルタイムの特徴更新が必要なシナリオに最適です。

ラベルテーブルの登録

label_table_name = 'rec_sln_demo_label_table'
ds = MaxComputeDataSource(data_source_id=project.offline_datasource_id, table=label_table_name)
label_table = project.get_label_table(label_table_name)
if label_table is None:
  label_table = project.create_label_table(datasource=ds, event_time='event_unix_time')
print(label_table)

オンライン特徴の取得

オフラインストアとオンラインストア間のデータ整合性のトラブルシューティング、データ分析、その他の目的でオンライン特徴を取得できます。Hologres が推奨されるデータソースです。

user_feature_view_name = "user_table_preprocess_all_feature_v1"
user_feature_view = project.get_feature_view(user_feature_view_name)
ret_features_1 = user_feature_view.get_online_features(join_ids={'user_id':['169898460', '148811946']}, features=['user_id', 'gender', 'age', 'city'])
print("ret_features = ", ret_features_1)

トレーニングセット

モデルをトレーニングするときは、まずサンプルテーブルを構築する必要があります。サンプルテーブルは、ラベルデータと特徴データで構成されます。FeatureStore と対話するには、ラベルデータを提供し、取得したい特徴の名前を定義する必要があります。その後、プライマリキーとイベント時間 (存在する場合) に基づいてポイントインタイム結合が実行されます。

# Specify the label table.
label_table_name = 'rec_sln_demo_label_table'

output_ds = MaxComputeDataSource(data_source_id=project.offline_datasource_id)
train_set_output = TrainingSetOutput(output_ds)
user_feature_view_name = "user_table_preprocess_all_feature_v1"
user_feature_selector = FeatureSelector(user_feature_view_name, '*') # '*' はすべての特徴を選択します。
item_feature_view_name = "item_table_preprocess_all_feature_v1"
item_feature_selector = FeatureSelector(item_feature_view_name, '*')
seq_feature_view_name = "wide_seq_feature_v3"
seq_feature_selector = FeatureSelector(seq_feature_view_name, ['click_seq_50_seq'])
train_set = project.create_training_set(label_table_name=label_table_name, train_set_output= train_set_output, feature_selectors=[user_feature_selector, item_feature_selector, seq_feature_selector])
print("train_set = ", train_set)

モデル特徴

モデルをトレーニングしてサービスとしてデプロイした後、ビジネス予測に使用できます。トレーニングサンプルは、前述の train_set から取得できます。

model_name = "fs_rank_v2"
cur_model = project.get_model(model_name)
if cur_model is None:
  cur_model = project.create_model(model_name, train_set)
print("cur_model_train_set_table_name = ", cur_model.train_set_table_name)

パート 3:サンプルテーブルのエクスポートとモデルのトレーニング

実際のトレーニングでは、サンプルテーブルをエクスポートする必要があります。

サンプルテーブルのエクスポート

ラベルテーブルと、各特徴ビューのパーティションおよび event_time を指定します。

cur_day = '20231024'
pre_day = '20231023'
label_partitions = PartitionConfig(name = 'ds', value = cur_day)
label_input_config = LabelInputConfig(partition_config=label_partitions)

user_partitions = PartitionConfig(name = 'ds', value = pre_day)
feature_view_user_config = FeatureViewConfig(name = 'user_table_preprocess_all_feature_v1',
partition_config=user_partitions)

item_partitions = PartitionConfig(name = 'ds', value = pre_day)
feature_view_item_config = FeatureViewConfig(name = 'item_table_preprocess_all_feature_v1',
partition_config=item_partitions)

seq_partitions = PartitionConfig(name = 'ds', value = cur_day)
feature_view_seq_config = FeatureViewConfig(name = 'wide_seq_feature_v3', partition_config=seq_partitions, event_time='event_unix_time', equal=True)
feature_view_config_list = [feature_view_user_config, feature_view_item_config, feature_view_seq_config]
train_set_partitions = PartitionConfig(name = 'ds', value = cur_day)
train_set_output_config = TrainSetOutputConfig(partition_config=train_set_partitions)


model_name = 'fs_rank_v2'
cur_model = project.get_model(model_name)
task = cur_model.export_train_set(label_input_config, feature_view_config_list, train_set_output_config)
task.wait()
print("task_summary = ", task.task_summary)

モデルのトレーニング

EasyRec は、FeatureStore とシームレスに統合してモデルのトレーニング、エクスポート、公開を行うオープンソースのレコメンデーションシステムフレームワークです。fs_demo_fs_rank_v2_training_set テーブルを入力として使用し、EasyRec でモデルをトレーニングすることを推奨します。

  • EasyRec のオープンソースコードについては、「EasyRec」をご参照ください。

  • EasyRec のドキュメントについては、「EasyRec Introduction」をご参照ください。

  • EasyRec のトレーニングに関するドキュメントについては、「EasyRec Training」をご参照ください。

EasyRec に関するご質問は、DingTalk グループ (32260796) にてお問い合わせください。

パート 4:モデルの公開

モデルをトレーニングしてエクスポートした後、デプロイして公開できます。自社で構築したレコメンデーションシステムをお持ちの場合、FeatureStore は Python、Go、C++、Java の SDK を提供しており、お使いのシステムと統合できます。また、DingTalk グループ (32260796) を通じて、具体的なソリューションについてご相談いただくことも可能です。Alibaba Cloud プロダクトを使用している場合は、FeatureStore とシームレスに統合して、レコメンデーションシステムを迅速に構築および公開できます。

このトピックでは、Alibaba Cloud プロダクトを例として、モデルを公開する方法について説明します。

データ同期ノードのスケジューリング

公開する前に、データ同期ノードをスケジュールして、オフラインストアからオンラインストアに定期的にデータを同期する必要があります。

  1. DataWorks コンソールにログインします。

  2. 左側のナビゲーションウィンドウで、[データ開発と O&M] > [データ開発] をクリックします。

  3. ご利用の DataWorks ワークスペースを選択し、[データ開発に進む] をクリックします。

  4. ユーザーテーブルの定期的な同期

    1. [新規] にカーソルを合わせ、[新規ノード] > [MaxCompute] > [PyODPS 3] を選択します。

    2. 表示されたダイアログボックスで、ノードパラメーターを設定し、[確認] をクリックします。

    3. 以下の内容をスクリプトにコピーして、user_table_preprocess_all_feature_v1 の定期的な同期を完了します。

      user_table_preprocess_all_feature_v1 の定期的な同期 (クリックして詳細を表示)

      from feature_store_py.fs_client import FeatureStoreClient
      import datetime
      from feature_store_py.fs_datasource import MaxComputeDataSource
      import sys
      from odps.accounts import StsAccount
      
      cur_day = args['dt']
      print('cur_day = ', cur_day)
      
      access_key_id = o.account.access_id
      access_key_secret = o.account.secret_access_key
      sts_token = None
      endpoint = 'paifeaturestore-vpc.cn-beijing.aliyuncs.com'
      if isinstance(o.account, StsAccount):
          sts_token = o.account.sts_token
      fs = FeatureStoreClient(access_key_id=access_key_id, access_key_secret=access_key_secret, security_token=sts_token, endpoint=endpoint)
      cur_project_name = 'fs_demo'
      project = fs.get_project(cur_project_name)
      
      feature_view_name = 'user_table_preprocess_all_feature_v1'
      batch_feature_view = project.get_feature_view(feature_view_name)
      task = batch_feature_view.publish_table(partitions={'ds':cur_day}, mode='Overwrite', offline_to_online=True)
      task.wait()
      task.print_summary()
      
    4. 右側のペインで、[スケジューリング設定] をクリックします。表示されたダイアログボックスで、スケジューリングパラメーターを設定します。

      パラメーター

      推奨値

      スケジューリングパラメーター

      パラメーター名

      dt

      パラメーター値

      $[yyyymmdd-1]

      リソースプロパティ

      スケジューリング用リソースグループ

      専用スケジューリングリソースグループを選択します。

      スケジューリング依存関係

      作成したユーザーテーブルを選択します。

    5. ノードを設定してテストした後、ノード設定を保存して送信します。

    6. データバックフィル操作を実行します。詳細については、「データテーブルの同期」をご参照ください。

  5. アイテムテーブルの同期をスケジュールします。

    1. [新規] にカーソルを合わせ、[新規ノード] > [MaxCompute] > [PyODPS 3] を選択します。

    2. 表示されたダイアログボックスで、ノードパラメーターを設定し、[確認] をクリックします。

    3. 以下の内容をスクリプトにコピーします。

      item_table_preprocess_all_feature_v1 の定期的な同期 (クリックして詳細を表示)

      from feature_store_py.fs_client import FeatureStoreClient
      import datetime
      from feature_store_py.fs_datasource import MaxComputeDataSource
      import sys
      from odps.accounts import StsAccount
      
      cur_day = args['dt']
      print('cur_day = ', cur_day)
      
      access_key_id = o.account.access_id
      access_key_secret = o.account.secret_access_key
      sts_token = None
      endpoint = 'paifeaturestore-vpc.cn-beijing.aliyuncs.com'
      if isinstance(o.account, StsAccount):
          sts_token = o.account.sts_token
      fs = FeatureStoreClient(access_key_id=access_key_id, access_key_secret=access_key_secret, security_token=sts_token, endpoint=endpoint)
      cur_project_name = 'fs_demo'
      project = fs.get_project(cur_project_name)
      
      feature_view_name = 'item_table_preprocess_all_feature_v1'
      batch_feature_view = project.get_feature_view(feature_view_name)
      task = batch_feature_view.publish_table(partitions={'ds':cur_day}, mode='Overwrite', offline_to_online=True)
      task.wait()
      task.print_summary()
    4. 右側のペインで、[スケジューリング設定] をクリックします。表示されたダイアログボックスで、スケジューリングパラメーターを設定します。

      パラメーター

      推奨値

      スケジューリングパラメーター

      パラメーター名

      dt

      パラメーター値

      $[yyyymmdd-1]

      リソースプロパティ

      スケジューリング用リソースグループ

      専用スケジューリングリソースグループを選択します。

      スケジューリング依存関係

      作成したアイテムテーブルを選択します。

    5. ノードを設定してテストした後、ノード設定を保存して送信します。

    6. データバックフィル操作を実行します。詳細については、「データテーブルの同期」をご参照ください。

  6. リアルタイムシーケンス行動テーブルの同期をスケジュールします。

    1. [新規] にカーソルを合わせ、[新規ノード] > [MaxCompute] > [PyODPS 3] を選択します。

    2. 表示されたダイアログボックスで、ノードパラメーターを設定し、[確認] をクリックします。

    3. 以下の内容をスクリプトにコピーします。

      wide_seq_feature_v3 の定期的な同期 (クリックして詳細を表示)

      from feature_store_py.fs_client import FeatureStoreClient
      import datetime
      from feature_store_py.fs_datasource import MaxComputeDataSource
      import sys
      from odps.accounts import StsAccount
      
      cur_day = args['dt']
      print('cur_day = ', cur_day)
      
      access_key_id = o.account.access_id
      access_key_secret = o.account.secret_access_key
      sts_token = None
      endpoint = 'paifeaturestore-vpc.cn-beijing.aliyuncs.com'
      if isinstance(o.account, StsAccount):
          sts_token = o.account.sts_token
      fs = FeatureStoreClient(access_key_id=access_key_id, access_key_secret=access_key_secret, security_token=sts_token, endpoint=endpoint)
      cur_project_name = 'fs_demo'
      project = fs.get_project(cur_project_name)
      
      feature_view_name = 'wide_seq_feature_v3'
      batch_feature_view = project.get_feature_view(feature_view_name)
      task = batch_feature_view.publish_table(partitions={'ds':cur_day},days_to_load=30)
      task.wait()
      task.print_summary()
    4. 右側のペインで、[スケジューリング設定] をクリックします。表示されたダイアログボックスで、スケジューリングパラメーターを設定します。

      パラメーター

      推奨値

      スケジューリングパラメーター

      パラメーター名

      dt

      パラメーター値

      $[yyyymmdd-1]

      リソースプロパティ

      スケジューリング用リソースグループ

      専用スケジューリングリソースグループを選択します。

      スケジューリング依存関係

      作成したアイテムテーブルを選択します。

    5. ノードを設定してテストした後、ノード設定を保存して送信します。

    6. データバックフィル操作を実行します。詳細については、「データテーブルの同期」をご参照ください。

  7. 同期が完了したら、Hologres で最新の同期された特徴を表示できます。

EAS モデルサービスの作成とデプロイ

モデルサービスは、レコメンデーションエンジンからのリクエストを受け取り、リクエストに基づいてアイテムセットをスコアリングし、スコアを返します。EasyRec プロセッサには FeatureStore C++ SDK が含まれており、低レイテンシでパフォーマンスの高い特徴取得が可能です。EasyRec プロセッサは、FeatureStore C++ SDK から特徴を取得し、推論のためにモデルに送信し、スコアを取得してレコメンデーションエンジンに返します。

モデルサービスのデプロイ手順は以下の通りです。

  1. DataWorks コンソールにログインします。

  2. 左側のナビゲーションウィンドウで、[データ開発と O&M] > [データ開発] をクリックします。

  3. ご利用の DataWorks ワークスペースを選択し、[データ開発に進む] をクリックします。

  4. [新規] にカーソルを合わせ、[新規ノード] > [MaxCompute] > [PyODPS 3] を選択します。

  5. 表示されたダイアログボックスで、ノードパラメーターを設定し、[確認] をクリックします。

  6. 以下の内容をスクリプトにコピーします。

    import os
    import json
    config = {
      "name": "fs_demo_v1",
      "metadata": {
        "cpu": 4,
        "rpc.max_queue_size": 256,
        "rpc.enable_jemalloc": 1,
        "gateway": "default",
        "memory": 16000
      },
      "model_path": f"oss://beijing0009/EasyRec/deploy/rec_sln_demo_dbmtl_v1/{args['ymd']}/export/final_with_fg",
      "model_config": {
        "access_key_id": f'{o.account.access_id}',
        "access_key_secret": f'{o.account.secret_access_key}',
        "region": "cn-beijing",
        "fs_project": "fs_demo",
        "fs_model": "fs_rank_v2",
        "fs_entity": "item",
        "load_feature_from_offlinestore": True,
        "steady_mode": True,
        "period": 2880,
        "outputs": "probs_is_click,y_ln_playtime,probs_is_praise",
        "fg_mode": "tf"
      },
      "processor": "easyrec-1.8",
      "processor_type": "cpp"
    }
    
    with open("echo.json", "w") as output_file:
        json.dump(config, output_file)
    
    # 最初のデプロイメントではこの行を実行します。
    os.system(f"/home/admin/usertools/tools/eascmd -i {o.account.access_id} -k {o.account.secret_access_key} -e pai-eas.cn-beijing.aliyuncs.com create echo.json")
    
    # 定期的な更新では以下の行を実行します。
    # os.system(f"/home/admin/usertools/tools/eascmd -i {o.account.access_id} -k {o.account.secret_access_key} -e pai-eas.cn-beijing.aliyuncs.com modify fs_demo_v1 -s echo.json")
  7. 右側のペインで、[スケジューリング設定] をクリックします。表示されたダイアログボックスで、スケジューリングパラメーターを設定します。

    パラメーター

    推奨値

    スケジューリングパラメーター

    パラメーター名

    dt

    パラメーター値

    $[yyyymmdd-1]

    リソースプロパティ

    スケジューリング用リソースグループ

    専用スケジューリングリソースグループを選択します。

    スケジューリング依存関係

    対応するトレーニングタスクと item_table_preprocess_all_feature_v1 を選択します。

  8. ノードを設定してテストした後、実行してデプロイメントステータスを表示します。

  9. デプロイメントが完了したら、34 行目をコメントアウトし、37 行目のコメントを解除して、定期実行のためにタスクを送信します。

  10. (オプション) デプロイされたサービスは、Elastic Algorithm Service (EAS) ページの Inference Service タブで表示できます。詳細については、「カスタムデプロイメント」をご参照ください。

PAI-Rec の設定

PAI-Rec は、FeatureStore Go SDK を統合したレコメンデーションエンジンサービスです。FeatureStore および EAS とシームレスに接続できます。

設定手順は以下の通りです。

  1. FeatureStoreConfs を設定します。

    • RegionId:プロダクトが所在するリージョンに変更します。このトピックでは、cn-beijing を例として使用します。

    • ProjectName:FeatureStore プロジェクトの名前です。この例では、プロジェクト名は fs_demo です。

        "FeatureStoreConfs": {
            "pairec-fs": {
                "RegionId": "cn-beijing",
                "AccessId": "${AccessKey}",
                "AccessKey": "${AccessSecret}",
                "ProjectName": "fs_demo"
            }
        },
  2. FeatureConfs を設定します。

    • FeatureStoreName:前の FeatureStoreConfs ステップの pairec-fs 設定と一致させてください。

    • FeatureStoreModelName:モデル特徴の名前です。この例では、モデル特徴名は fs_rank_v1 です。

    • FeatureStoreEntityName:特徴エンティティの名前、user です。これは、PAI-Rec エンジンが FeatureStore Go SDK を使用して fs_rank_v1 モデルのユーザー特徴を取得することを示します。

        "FeatureConfs": {
            "recreation_rec": {
                "AsynLoadFeature": true,
                "FeatureLoadConfs": [
                    {
                        "FeatureDaoConf": {
                            "AdapterType": "featurestore",
                            "FeatureStoreName": "pairec-fs",
                            "FeatureKey": "user:uid",
                            "FeatureStoreModelName": "fs_rank_v1",
                            "FeatureStoreEntityName": "user",
                            "FeatureStore": "user"
                        }
                    }
                ]
            }
        },
  3. AlgoConfs を設定します。

    この設定は、PAI-Rec がどの EAS モデルスコアリングサービスに接続するかを指示します。

    • Name:デプロイされた EAS サービスの名前と同じでなければなりません。

    • UrlAuth:これは EAS サービスによって提供される情報です。Elastic Algorithm Service (EAS) ページでサービス名をクリックします。次に、Overview タブで View Endpoint Information をクリックして、URL とトークンを取得します。設定の詳細については、 EAS のよくある質問をご参照ください。

        "AlgoConfs": [
            {
                "Name": "fs_demo_v1",
                "Type": "EAS",
                "EasConf": {
                    "Processor": "EasyRec",
                    "Timeout": 300,
                    "ResponseFuncName": "easyrecMutValResponseFunc",
                    "Url": "eas_url_xxx",
                    "EndpointType": "DIRECT",
                    "Auth": "eas_token"
                }
            }
        ],