すべてのプロダクト
Search
ドキュメントセンター

Platform For AI:Python用のFeatureStore SDK

最終更新日:Aug 01, 2024

このトピックでは、FeatureStore SDK for Pythonを使用して、オフラインモデルトレーニングとオンライン推論の機能を統合および提供する方法について説明します。

背景情報

FeatureStoreは、機械学習モデルの機能を作成、共有、管理するのに役立つ集中リポジトリです。 FeatureStoreを使用すると、複数のユーザーやチームと機能を共有し、オフライン機能とオンライン機能の一貫性を確保し、低レイテンシでオンラインモデル推論の機能を提供できます。

FeatureStoreは、レコメンデーションシステムなどの機能ベースの機械学習モデル向けに設計されています。 FeatureStoreに機能テーブルを登録すると、FeatureStoreはオンラインとオフラインの機能テーブルを自動的に作成し、同期を維持します。 FeatureStoreは機能テーブルのコピーを1つだけ保存し、コピーを複数のユーザーと共有してリソースコストを削減できます。 FeatureStoreでは、単純なコードを使用して、トレーニングデータセットのエクスポートやHologresへのデータのインポートなど、以前は複雑だった操作を完了することもできます。

FeatureStoreは、機能からモデルへのプロセス全体をカプセル化し、MaxCompute、Hologres、GraphCompute、Tablestoreなどの機能ストレージ用に複数のAlibaba Cloudサービスを統合します。 開発者と機械学習エンジニアは、一元化されたwebインターフェイスまたはFeatureStore SDK for Pythonを使用してすべての操作を実行できます。 これにより、各クラウドサービスの機能を個別に管理する必要がなくなり、推奨システムでのトレーニングとサービスのスキューなど、モデルのパフォーマンスに影響を与える可能性のある問題を解決できます。

EasyRecと統合されたFeatureStoreは、効率的な機能エンジニアリング (FG) 、モデルトレーニング、およびオンラインモデル展開を実現します。 サウンドパフォーマンスを備えた高度な推奨システムを簡単に構築できます。

FeatureStoreを使用しているときに問題が発生した場合は、DingTalkグループ (グループID: 32260796) に参加してテクニカルサポートを行います。

前提条件

Platform for AI (PAI) がアクティブ化され、ワークスペースが作成されます。 詳細については、「PAIの有効化とデフォルトワークスペースの作成」をご参照ください。

準備

Python用FeatureStore SDKをインストールします。 SDKにはPython 3が必要です。 このトピックで説明されている操作は、PAIのデータサイエンスワークショップ (DSW) で実行することを推奨します。

! pip install https://feature-store-py.oss-cn-beijing.aliyuncs.com/package/feature_store_py-1.3.1-py3-none-any.whl

FeatureStoreクライアントを作成するには、Alibaba CloudアカウントのAccessKey IDAccessKey secretを指定する必要があります。 データ漏洩を防ぐために、環境変数を使用してAccessKeyペアを指定することを推奨します。 環境変数を設定するには、DSWインスタンスの開発環境に移動し、ページ上部の [ターミナル] をクリックして、次のコマンドを実行します。

次のコマンドで、YOUR_AccessKey_IDAccessKey IDに置き換えます。

echo "export AccessKeyID='YOUR_AccessKey_ID'" >> ~/.bashrc
source ~/.bashrc

次のコマンドで、YOUR_Access_Key_SecretAccessKey secretに置き換えます。

echo "export AccessKeySecret='YOUR_Access_Key_Secret'" >> ~/.bashrc
source ~/.bashrc

必要な機能モジュールをインポートします。

import unittest
import sys
import os
from os.path import dirname, join, abspath
from feature_store_py.fs_client import FeatureStoreClient, build_feature_store_client
from feature_store_py.fs_project import FeatureStoreProject
from feature_store_py.fs_datasource import UrlDataSource, MaxComputeDataSource, DatahubDataSource, HologresDataSource, SparkDataSource, LabelInput, TrainingSetOutput
from feature_store_py.fs_type import FSTYPE
from feature_store_py.fs_schema import OpenSchema, OpenField
from feature_store_py.fs_feature_view import FeatureView
from feature_store_py.fs_features import FeatureSelector
from feature_store_py.fs_config import EASDeployConfig, LabelInputConfig, PartitionConfig, FeatureViewConfig, TrainSetOutputConfig
import logging
logger = logging.getLogger("foo")
logger.addHandler(logging.StreamHandler(stream=sys.stdout))

サンプルデータセット

このトピックでは、Moviedata-10Mという名前のオープンソースのムービーデータセットを例として使用します。 次の例では、Moviedata-10Mデータセットのmovies.csv、users.csv、およびratings.csvファイルを使用して、項目テーブル、ユーザーテーブル、およびラベルテーブルを作成します。

FeatureStoreプロジェクトの設定

FeatureStoreで複数のプロジェクトを作成できます。 各プロジェクトは独立しています。 詳細については、「FeatureStoreプロジェクトの設定」をご参照ください。 DSWでサンプルコードを実行するには、FeatureStoreで特定の設定が必要です。 たとえば、FeatureStoreでデータストアを構成する必要があります。 詳細については、「データストアの設定」をご参照ください。

このトピックのサンプルコードでは、offline_datasource_idパラメーターはオフラインデータストアのIDを指定し、online_datasource_idパラメーターはオンラインデータストアのIDを指定します。

次のサンプルコードでは、fs_movieプロジェクトを例として使用して、FeatureStoreプロジェクトを構成する方法を示しています。

# Enter the environment variable of your AccessKey ID.
access_id = os.getenv("AccessKeyID")
# Enter the environment variable of your AccessKey secret.
access_ak = os.getenv("AccessKeySecret")
# Enter the ID of the region in which you activated the FeatureStore service. In this example, the China (Hangzhou) region is used.
region = 'cn-hangzhou'
fs = FeatureStoreClient(access_key_id=access_id, access_key_secret=access_ak, region=region)
# Enter the name of the FeatureStore project. In this example, the fs_movie project is used.
cur_project_name = "fs_movie"
project = fs.get_project(cur_project_name)
if project is None:
    raise ValueError("Need to create project : fs_movie")

プロジェクトを取得し、プロジェクト情報を印刷します。

project = fs.get_project(cur_project_name)
project.print_summary()

機能エンティティの設定

特徴エンティティは、意味的に関連する特徴の集合であり、複数の特徴ビューに関連付けることができる。 各エンティティは結合IDを有する。 結合IDを使用して、複数のフィーチャビューにフィーチャを関連付けることができます。 各フィーチャビューには、フィーチャを取得するために使用できる主キー (インデックス) があります。 主キーは、結合IDの名前とは異なる場合があります。

次のサンプルコードでは、movie、user、ratingという3つのエンティティを作成する方法の例を示します。

cur_entity_name_movie = "movie_data"
join_id = 'movie_id'
entity_movie = project.get_entity(cur_entity_name_movie)
if entity_movie is None:
	entity_movie = project.create_entity(name = cur_entity_name_movie, join_id=join_id)
entity_movie.print_summary()
cur_entity_name_user = "user_data"
join_id = 'user_md5'
entity_user = project.get_entity(cur_entity_name_user)
if entity_user is None:
  entity_user = project.create_entity(name = cur_entity_name_user, join_id=join_id)
entity_user.print_summary()
cur_entity_name_ratings = "rating_data"
join_id = 'rating_id'
entity_ratings = project.get_entity(cur_entity_name_ratings)
if entity_ratings is None:
  entity_ratings = project.create_entity(name = cur_entity_name_ratings, join_id=join_id)
entity_ratings.print_summary()

機能ビューの設定

フィーチャビューを作成して、フィーチャデータをFeatureStoreに取り込み、一元管理できます。 フィーチャビューには、データソース、必要な変換、フィーチャテーブルスキーマ、オンラインおよびオフラインのデータストアなど、フィーチャの管理に必要なすべての情報が含まれます。 フィーチャビューでは、プライマリキー、イベント時間、パーティションフィールド、フィーチャエンティティ、生存時間 (TTL) などのメタデータを管理することもできます。 TTLは、オンラインデータストアで機能データを使用できる期間を指定するパラメーターです。 デフォルト値-1は、オンラインデータストアがすべての機能データを保持することを指定します。 正の値は、オンラインデータストアが指定された期間内の最新のフィーチャデータのみを保持することを指定します。

機能ビューには、バッチ機能ビュー、ストリーム機能ビュー、およびシーケンス機能ビューが含まれます。

  • バッチ機能ビューには、オフライン機能のコレクションが含まれます。 バッチ機能ビューを使用すると、オフライン機能をオフラインデータストアに取り込み、オフライン機能をオンラインデータストアに同期できます。 これにより、リアルタイムで機能をクエリできます。

  • ストリームフィーチャビューには、リアルタイムフィーチャのコレクションが含まれます。 ストリーム機能ビューを使用すると、オンラインデータストアにリアルタイム機能を書き込み、オフラインデータストアにリアルタイム機能を同期できます。

  • シーケンス特徴ビューは、シーケンス特徴のコレクションを含む。 シーケンス機能ビューを使用すると、シーケンス機能をオフラインで記述し、リアルタイムシーケンス機能をオンラインで読み取ることができます。

バッチ機能ビュー

MaxComputeにデータをアップロードします。 データがCSVファイルに保存されている場合は、CSVファイルのURLを指定してMaxComputeにデータをアップロードできます。

path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/movies.csv'
delimiter = ','
omit_header = True
ds = UrlDataSource(path, delimiter, omit_header)
print(ds)

作成するフィーチャビューのスキーマを指定します。 スキーマは、各フィールドの名前とタイプを定義します。

movie_schema = OpenSchema(
    OpenField(name='movie_id', type='STRING'),
    OpenField(name='name', type='STRING'),
    OpenField(name='alias', type='STRING'),
    OpenField(name='actores', type='STRING'),
    OpenField(name='cover', type='STRING'),
    OpenField(name='directors', type='STRING'),
    OpenField(name='double_score', type='STRING'),
    OpenField(name='double_votes', type='STRING'),
    OpenField(name='genres', type='STRING'),
    OpenField(name='imdb_id', type='STRING'),
    OpenField(name='languages', type='STRING'),
    OpenField(name='mins', type='STRING'),
    OpenField(name='official_site', type='STRING'),
    OpenField(name='regions', type='STRING'),
    OpenField(name='release_data', type='STRING'),
    OpenField(name='slug', type='STRING'),
    OpenField(name='story', type='STRING'),
    OpenField(name='tags', type='STRING'),
    OpenField(name='year', type='STRING'),
    OpenField(name='actor_ids', type='STRING'),
    OpenField(name='director_ids', type='STRING'),
    OpenField(name='dt', type='STRING')
)
print(movie_schema)

バッチ機能ビューを作成します。

feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
if batch_feature_view is None:
  batch_feature_view = project.create_batch_feature_view(name=feature_view_movie_name, schema=movie_schema, online = True, entity= cur_entity_name_movie, primary_key='movie_id', partitions=['dt'], ttl=-1)
batch_feature_view = project.get_feature_view(feature_view_movie_name)
batch_feature_view.print_summary()

MaxComputeのテーブルにデータを書き込みます。

cur_task = batch_feature_view.write_table(ds, partitions={'dt':'20220830'})
cur_task.wait()

現在のタスクに関する情報を表示します。

print(cur_task.task_summary)

データをオンラインデータストアに同期します。

cur_task = batch_feature_view.publish_table({'dt':'20220830'})
cur_task.wait()
print(cur_task.task_summary)

フィーチャビューを取得します。

batch_feature_view = project.get_feature_view(feature_view_movie_name)

フィーチャビューに関する情報を印刷します。

batch_feature_view.print_summary()

次のコマンドを順番に実行して、users.csvファイルとratings.csvファイルからデータをインポートし、対応する機能ビューを作成します。

users_path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/users.csv'
ds = UrlDataSource(users_path, delimiter, omit_header)
print(ds)
user_schema = OpenSchema(
  OpenField(name='user_md5', type='STRING'),
  OpenField(name='user_nickname', type='STRING'),
  OpenField(name='ds', type='STRING')
)
print(user_schema)
feature_view_user_name = "feature_view_users"
batch_feature_view = project.get_feature_view(feature_view_user_name)
if batch_feature_view is None:
  batch_feature_view = project.create_batch_feature_view(name=feature_view_user_name, schema=user_schema, online = True, entity= cur_entity_name_user, primary_key='user_md5',ttl=-1, partitions=['ds'])

{write_table_twe_tebus'};

write_table_task = batch_feature_view.write_table(ds, {'ds':'20220830'})
write_table_task.wait()
print(write_table_task.task_summary)
cur_task = batch_feature_view.publish_table({'ds':'20220830'})
cur_task.wait()
print(cur_task.task_summary)

batch_feature_view = project.get_feature_view(feature_view_user_name)
batch_feature_view.print_summary()
ratings_path = 'https://feature-store-test.oss-cn-beijing.aliyuncs.com/dataset/moviedata_all/ratings.csv'
ds = UrlDataSource(ratings_path, delimiter, omit_header)
print(ds)
ratings_schema = OpenSchema(
  OpenField(name='rating_id', type='STRING'),
  OpenField(name='user_md5', type='STRING'),
  OpenField(name='movie_id', type='STRING'),
  OpenField(name='rating', type='STRING'),
  OpenField(name='rating_time', type='STRING'),
  OpenField(name='dt', type='STRING')
)
feature_view_rating_name = "feature_view_ratings"
batch_feature_view = project.get_feature_view(feature_view_rating_name)
if batch_feature_view is None:
  batch_feature_view = project.create_batch_feature_view(name=feature_view_rating_name, schema=ratings_schema, online = True, entity= cur_entity_name_ratings, primary_key='rating_id', event_time='rating_time', partitions=['dt'])
cur_task = batch_feature_view.write_table(ds, {'dt':'20220831'})
cur_task.wait()
print(cur_task.task_summary)
batch_feature_view = project.get_feature_view(feature_view_rating_name)
batch_feature_view.print_summary()

ラベルテーブルを登録します。

label_table_name = 'fs_movie_feature_view_ratings_offline'
ds = MaxComputeDataSource(data_source_id=project.offline_datasource_id, table=label_table_name)
label_table = project.get_label_table(label_table_name)
if label_table is None:
  label_table = project.create_label_table(datasource=ds, event_time='rating_time')

オフラインデータストア の設定

オフラインデータストアは、オフライン機能が格納されるデータウェアハウスです。 オフライン機能は、Apache Sparkを使用して、MaxComputeまたはHadoop分散ファイルシステム (HDFS) のオフラインデータストアに書き込まれます。 オフラインデータストアを使用して、モデルトレーニング用のトレーニングデータセットを生成し、バッチ予測用の機能をモデルに提供できます。

オンラインデータストアの設定

オンラインデータストアは、リアルタイム機能が格納されるデータウェアハウスです。 オンラインデータストアでは、オンライン推論中に最新の機能に低遅延でアクセスできます。 リアルタイム機能ストレージには、Hologres、Tablestore、GraphComputeを使用できます。

オンライン機能を取得する

ストリーム機能ビューからオンライン機能を取得できます。

feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
ret_features_1 = batch_feature_view.get_online_features(join_ids={'movie_id':['26357307']}, features=['name', 'actores', 'regions'])
print("ret_features = ", ret_features_1)
feature_view_movie_name = "feature_view_movie"
batch_feature_view = project.get_feature_view(feature_view_movie_name)
ret_features_2 = batch_feature_view.get_online_features(join_ids={'movie_id':['30444960', '3317352']}, features=['name', 'actores', 'regions'])
print("ret_features = ", ret_features_2)

FeatureSelectorの設定

FeatureSelectorは、オンラインおよびオフラインのデータストアから取得する機能の範囲を指定します。 フィーチャビューを指定して、フィーチャビューからフィーチャを取得できます。

feature_view_name = 'feature_view_movie'
# Retrieve specific features.
feature_selector = FeatureSelector(feature_view_name, ['site_id', 'site_category'])

# Retrieve all features.
feature_selector = FeatureSelector(feature_view_name, '*')

# Configure an alias for the feature that you want to retrieve.
feature_selector = FeatureSelector(
    feature_view='user1',
    features = ['f1','f2', 'f3'],
    alias={"f1":"f1_1"} # Specify the f1_1 field as the alias for the f1 field. 
)

サンプルテーブル (データセットのトレーニング) の設定

FeatureStoreを使用して、モデルトレーニング用のサンプルテーブルを生成できます。 サンプルテーブルには、ラベルと機能が含まれています。 モデルトレーニング用のラベルを準備し、モデルがフィーチャビューから取得する必要があるフィーチャを定義する必要があります。 ラベルは、主キーに基づいてポイント・イン・タイム結合を使用することによって特徴に関連付けられる。

label_table_name = 'fs_movie_feature_view_ratings_offline'
output_ds = MaxComputeDataSource(data_source_id=project.offline_datasource_id)
train_set_output = TrainingSetOutput(output_ds)
feature_view_movie_name = "feature_view_movie"
feature_movie_selector = FeatureSelector(feature_view_movie_name, ['name', 'actores', 'regions','tags'])
feature_view_user_name = 'feature_view_users'
feature_user_selector = FeatureSelector(feature_view_user_name, ['user_nickname'])
train_set = project.create_training_set(label_table_name=label_table_name, train_set_output= train_set_output, feature_selectors=[feature_movie_selector, feature_user_selector])
print("train_set = ", train_set)

モデルを訓練する

FeatureStoreによって生成されたサンプルテーブルでモデルをトレーニングし、トレーニングされたモデルをPAIの推論サービスとしてデプロイできます。

model_name = "fs_rank_v1"
cur_model = project.get_model(model_name)
if cur_model is None:
  cur_model = project.create_model(model_name, train_set)
print("cur_model_train_set_table_name = ", cur_model.train_set_table_name)

サンプルテーブルのエクスポート

モデルトレーニング用のサンプルテーブルをエクスポートするには、ラベルテーブルと、各フィーチャビューのイベント時間とパーティションを指定します。

label_partitions = PartitionConfig(name = 'dt', value = '20220831')
label_input_config = LabelInputConfig(partition_config=label_partitions, event_time='1999-01-00 00:00:00')

movie_partitions = PartitionConfig(name = 'dt', value = '20220830')
feature_view_movie_config = FeatureViewConfig(name = 'feature_view_movie', partition_config=movie_partitions)

user_partitions = PartitionConfig(name = 'ds', value = '20220830')
feature_view_user_config = FeatureViewConfig(name = 'feature_view_users', partition_config=user_partitions)
feature_view_config_list = [feature_view_movie_config, feature_view_user_config]
train_set_partitions = PartitionConfig(name = 'dt', value = '20220831')
train_set_output_config = TrainSetOutputConfig(partition_config=train_set_partitions)

指定した条件に基づいてサンプルテーブルをエクスポートします。

task = cur_model.export_train_set(label_input_config, feature_view_config_list, train_set_output_config)
task.wait()
print(task.summary)