このトピックでは、Function Compute を使用して、Tablestore の増分データに対してリアルタイム計算を実行する方法について説明します。
背景情報
Function Compute は、フルマネージドのイベント駆動型コンピューティングサービスです。サーバーなどのインフラストラクチャリソースを調達および管理する必要なく、コーディングに集中できます。コードまたはイメージをアップロードするだけで済みます。Function Compute は、コンピューティングリソースを割り当て、タスクを弾力的かつ確実に実行し、ログクエリ、パフォーマンス監視、アラートなどの機能を提供します。詳細については、Function Compute とは を参照してください。
Tablestore Stream は、Tablestore テーブルから増分データを取得するために使用されるトンネルです。Tablestore トリガーを作成すると、Tablestore Stream と Function Compute の関数を自動的に接続できます。これにより、関数内のカスタムプログラムロジックが、Tablestore テーブルのデータ変更を自動的に処理できるようになります。
シナリオ
次の図は、Function Compute を使用して実行できるタスクを示しています。
データ同期: Function Compute を使用して、Tablestore に格納されているリアルタイムデータをデータキャッシュ、検索エンジン、または他のデータベースインスタンスに同期できます。
データアーカイブ: Function Compute を使用して、コールドバックアップのために、Tablestore に格納されている増分データを OSS にアーカイブできます。
イベント駆動型アプリケーション: トリガーを作成して、IoT Hub およびクラウドアプリケーションによって提供される API 操作を呼び出す関数をトリガーできます。また、トリガーを作成して通知を送信することもできます。
前提条件
Tablestore インスタンスとデータテーブルが作成されていること。詳細については、手順 2: インスタンスを作成する および 手順 3: データテーブルを作成する を参照してください。
Function Compute が有効になっていること。詳細については、手順 1: Function Compute を有効にする を参照してください。
使用上の注意
Tablestore トリガーは、中国 (北京)、中国 (杭州)、中国 (上海)、中国 (深圳)、日本 (東京)、シンガポール、ドイツ (フランクフルト)、中国 (香港) の各リージョンでサポートされています。
Tablestore データテーブルは、Function Compute の関連サービスと同じリージョンに存在する必要があります。
Tablestore トリガーに関連付けられた関数が内部ネットワーク経由で Tablestore にアクセスするようにするには、Tablestore の仮想プライベートクラウド (VPC) エンドポイントを使用する必要があります。Tablestore の内部エンドポイントは使用できません。
VPC エンドポイントは、{instanceName}.{regionId}.vpc.tablestore.aliyuncs.com の形式です。詳細については、エンドポイントの照会 を参照してください。
Tablestore の関数コードを記述する際は、次のロジックを使用しないでください。関数 B はテーブル A のトリガーによって呼び出され、関数 B はテーブル A のデータを更新します。このロジックは、関数の呼び出しの無限ループを作成します。
トリガーによって呼び出される関数の最大実行時間は 1 分です。
関数の実行中に例外が発生した場合、Tablestore のログデータの有効期限が切れるまで、関数は無期限に再試行されます。
説明関数の実行例外は、次のシナリオで発生します。
関数インスタンスは開始されましたが、関数コードは期待どおりに実行されません。この場合、インスタンスの料金が発生します。
起動コマンドエラーなどの理由により、関数インスタンスが起動に失敗しました。この場合、インスタンスの料金は発生しません。
関数の実行例外が発生した場合、データテーブルの Stream 機能を無効にして、関数が無期限に再試行されないようにすることができます。Stream 機能を無効にする前に、他のトリガーがデータテーブルを使用していないことを確認してください。そうしないと、これらのトリガーが期待どおりに動作しない可能性があります。
手順 1: データテーブルの Stream 機能を有効にする
トリガーを作成する前に、Tablestore コンソールでデータテーブルの Stream 機能を有効にして、関数がテーブルに書き込まれた増分データを処理できるようにする必要があります。
Tablestore コンソール にログインします。
上部のナビゲーションバーで、リージョンを選択します。
概要ページで、管理するインスタンスの名前をクリックするか、インスタンスの管理列のアクションをクリックします。
インスタンスの詳細タブのテーブルタブで、管理するデータテーブルの名前をクリックし、トンネルタブをクリックします。または、
アイコンをクリックして、トンネルをクリックすることもできます。
トンネルタブの「Stream 情報」セクションで、有効にするをクリックします。
Stream を有効にするダイアログボックスで、「ログの有効期限」パラメーターを設定し、有効にするをクリックします。
「ログの有効期限」パラメーターの値は、ゼロ以外の整数である必要があります。単位: 時間。最大値: 168。
重要「ログの有効期限」パラメーターは、指定後に変更できません。慎重に行ってください。
ステップ 2: 関数と Tablestore トリガーを作成する
関数を作成します。
Function Compute コンソール にログインします。
オプション: ページの右上隅にあるFunction Compute 3.0 に移動をクリックします。
説明Function Compute 3.0 は、さまざまな拡張機能を提供します。この例では、Function Compute 3.0 を使用します。
ページの右上隅にFunction Compute 2.0 に戻ると表示されている場合は、既に Function Compute 3.0 コンソールにいるため、この手順をスキップしてください。
左側のナビゲーションペインで、関数をクリックします。
上部のナビゲーションバーで、リージョンを選択します。関数ページで、関数の作成をクリックします。
関数の作成ページで、関数を作成する方法を選択し、次のパラメーターを設定して、作成をクリックします。
この例では、イベント関数を選択して、Tablestore のデータ変更をリアルタイムで計算する関数を作成する方法を説明します。
説明Tablestore のデータを処理するために関数を作成する方法として、イベント関数、Web 関数、またはタスク関数を選択できます。詳細については、関数を作成する方法の選択 を参照してください。
Tablestore のデータ変更によってデータ処理を自動的にトリガーする場合は、イベント関数を選択します。詳細については、イベント関数を作成する を参照してください。
特定の HTTP リクエストによってデータ処理を自動的にトリガーする場合は、Web 関数を選択します。詳細については、Web 関数を作成する を参照してください。
データ処理を定期的にまたは非同期にトリガーする場合は、タスク関数を選択します。詳細については、タスク関数を作成する を参照してください。
基本設定: 関数名パラメーターを設定します。
コード: 関数のランタイムとコード関連の情報を設定します。
パラメーター
説明
例
ランタイム
Python、Java、PHP、Node.js などのランタイム、またはカスタムコンテナイメージを選択します。
カスタムコンテナイメージ
この例では、Python 3.9 が選択されています。
コードのアップロード方法
Function Compute にコードをアップロードする方法を指定します。
サンプルコードを使用する: Function Compute によって提供されるサンプルコードを選択して、ビジネス要件に基づいて関数を作成できます。これはデフォルトの方法です。
ZIP をアップロードする: 関数コードを含む .zip ファイルを選択してアップロードします。
フォルダーをアップロードする: 関数コードを含むフォルダーを選択してアップロードします。
OSS: オブジェクトストレージサービス (OSS) バケットからコードをアップロードします。この場合、バケット名とオブジェクト名パラメーターを指定する必要があります。
この例では、サンプルコードを使用するとHello, world! サンプルコードが選択されています。
詳細設定: インスタンス情報と関数のタイムアウト期間を設定します。
パラメーター
説明
例
仕様
ビジネス要件に基づいて、Vcpu 容量やメモリ容量などのインスタンス仕様を設定します。リソースの課金については、課金の概要 を参照してください。
説明vCPU 仕様とメモリ容量 (GB 単位) の比率は、1:1 から 1:4 の範囲である必要があります。
0.35 vCPU、512 MB
一時ディスクのサイズ
ビジネス要件に基づいて、ファイルを一時的に保存するために使用されるディスクのサイズを指定します。
有効な値:
512 MB: デフォルト値。このサイズのテンポラリディスクの使用は課金されません。Function Compute は 512 MB の空きディスク容量を提供します。
10 GB: 9.5 GB のディスクサイズに基づいて課金されます。
説明データは一時ディスクの領域を共有し、ディスク内のすべてのディレクトリに書き込むことができます。
一時ディスクのライフサイクルは、基盤となるインスタンスのライフサイクルと一致します。インスタンスがシステムによってリサイクルされると、ハードディスク上のデータは消去されます。ファイルを永続化するには、File Storage NAS または OSS を使用できます。詳細については、NAS ファイルシステムの構成 および OSS ファイルシステムの構成 を参照してください。
512 MB
実行タイムアウト期間
関数が実行されるタイムアウト期間を指定します。 デフォルトのタイムアウト期間は 180 秒で、最大タイムアウト期間は 86,400 秒です。
180
ハンドラー
関数のハンドラーを指定します。Function Computeランタイムは、ハンドラーをロードして呼び出し、リクエストを処理します。Web 関数を選択して関数を作成する場合は、このパラメーターをスキップします。
説明コードのアップロード方法パラメーターをサンプルコードを使用に設定する場合は、ハンドラーパラメーターの値を保持します。別のコードアップロード方法を選択する場合は、ビジネス要件に基づいてハンドラーパラメーターの値を変更します。変更しない場合、関数が実行されるとエラーが報告されます。
index.handler
タイムゾーン
関数のタイムゾーンを選択します。関数のタイムゾーンを指定すると、環境変数 TZ が関数に自動的に追加されます。値は指定したタイムゾーンです。
UTC
関数ロール
関数のResource Access Management (RAM)ロールを指定します。Function Computeはこのロールを使用して、Alibaba Cloudリソースにアクセスするために使用される一時的なAccessKeyペアを生成し、AccessKeyペアをコードに渡します。
重要詳細については、付録: Function ComputeにTablestoreへのアクセス権限を付与するを参照してください。
AliyunFCDefaultRole
VPCへのアクセス
関数がVPCリソースにアクセスすることを許可するかどうかを指定します。詳細については、ネットワーク設定の構成を参照してください。
はい
VPC
VPC を指定します。 VPC へのアクセス を はい に設定した場合、このパラメーターは必須です。ドロップダウンリストから、関数にアクセスさせたい既存の VPC の ID を選択するか、VPC を新規作成します。
fc.auto.create.vpc.1632317****
vSwitch
vSwitch を指定します。 VPC へのアクセス を はい に設定した場合、このパラメーターは必須です。vSwitch を作成するか、ドロップダウンリストから既存の vSwitch の ID を選択します。
fc.auto.create.vswitch.vpc-bp1p8248****
セキュリティグループ
セキュリティグループを指定します。このパラメーターは、VPCへのアクセスをはいに設定した場合に必須です。セキュリティグループを作成するか、ドロップダウンリストから既存のセキュリティグループを選択します。
fc.auto.create.SecurityGroup.vsw-bp15ftbbbbd****
デフォルトのNICのインターネットアクセスを許可
デフォルトのネットワークインターフェースコントローラー (NIC) を使用して、関数がインターネットにアクセスすることを許可するかどうかを指定します。いいえを選択した場合、関数は関数計算のデフォルトのNICを使用してインターネットにアクセスできません。
重要静的パブリックIPアドレスを使用する場合は、デフォルトのNICのインターネットアクセスを許可をいいえに設定する必要があります。そうしないと、構成された静的パブリックIPアドレスは有効になりません。詳細については、静的パブリックIPアドレスの構成を参照してください。
はい
ロギング
ロギング機能を有効にするかどうかを指定します。有効な値:
有効: Function Compute は、永続ストレージのために関数の実行ログを Simple Log Service に送信します。これらのログを使用して、コードのデバッグ、障害の分析、データの分析を行うことができます。
無効: Simple Log Service を使用して関数の実行ログを保存およびクエリすることはできません。
有効
(オプション) 環境変数: 関数の実行時環境で環境変数を構成します。詳細については、環境変数の構成を参照してください。
Tablestore トリガーを作成します。
関数詳細タブで、構成タブをクリックします。左側のナビゲーションペインで、トリガーをクリックし、次にトリガーの作成をクリックします。
[トリガーの作成] パネルで、パラメーターを構成し、OK をクリックします。
パラメーター
説明
例
トリガータイプ
トリガーのタイプ。 Tablestore を選択します。
Tablestore
名前
トリガーの名前。
Tablestore-trigger
バージョンまたはエイリアス
トリガーのバージョンまたはエイリアス。デフォルト値: LATEST。別のバージョンまたはエイリアスのトリガーを作成する場合は、関数詳細ページの バージョンまたはエイリアス ドロップダウンリストからバージョンまたはエイリアスを選択します。バージョンとエイリアスの詳細については、バージョンの管理 および エイリアスの管理 を参照してください。
LATEST
インスタンス
既存の Tablestore インスタンスの名前。
d00dd8xm****
テーブル
既存のテーブルの名前。
mytable
ロール名
AliyunTableStoreStreamNotificationRole を選択します。
説明上記のパラメーターを構成した後、OK をクリックします。このタイプのトリガーを初めて作成する場合は、表示されるダイアログボックスで 今すぐ承認 をクリックします。
AliyunTableStoreStreamNotificationRole
トリガーが作成されると、トリガータブに表示されます。トリガーを変更または削除するには、トリガーの管理 を参照してください。
ステップ 3: 関数のテストパラメータを構成する
コード関数詳細タブの
テスト関数テスト パラメーターの構成タブで、の横にあるアイコンをクリックし、ドロップダウンリストからを選択します。
テストパラメータの構成パネルで、新しいテストイベントの作成タブをクリックし、イベントテンプレートパラメータをTablestoreに設定し、イベント名とイベントコンテンツを指定します。OKをクリックします。
説明既存のテストイベントの変更タブで、作成済みのイベントの名前を選択できます。
Tablestoreトリガーは、簡潔なバイナリオブジェクト表現(CBOR)形式で増分データをエンコードして、Function Computeで関数を呼び出すために使用されるイベントを構築します。次のサンプルコードは、イベントコンテンツの形式の例を示しています。
{ "Version": "Sync-v1", "Records": [ { "Type": "PutRow", "Info": { "Timestamp": 1506416585740836 }, "PrimaryKey": [ { "ColumnName": "pk_0", "Value": 1506416585881590900 }, { "ColumnName": "pk_1", "Value": "2017-09-26 17:03:05.8815909 +0800 CST" }, { "ColumnName": "pk_2", "Value": 1506416585741000 } ], "Columns": [ { "Type": "Put", "ColumnName": "attr_0", "Value": "hello_table_store", "Timestamp": 1506416585741 }, { "Type": "Put", "ColumnName": "attr_1", "Value": 1506416585881590900, "Timestamp": 1506416585741 } ] } ] }
次の表は、イベントコンテンツのパラメータについて説明しています。
パラメータ
説明
Version
ペイロードのバージョン。例:Sync-v1。値は文字列です。
Records
データテーブルの増分データの行を格納する配列。各要素には、次のパラメータが含まれています。
Type: 行に対して実行された操作のタイプ。有効な値:PutRow、UpdateRow、およびDeleteRow。値は文字列です。
Info: 行に関する情報。Timestampパラメータが含まれ、行が最後に変更された時刻を指定します。時刻はUTCである必要があります。値はINT64型です。
PrimaryKey
プライマリキー列を格納する配列。各要素には、次のパラメータが含まれています。
ColumnName: プライマリキー列の名前。値は文字列です。
Value: プライマリキー列の値。値はformated_value型で、INTEGER、STRING、またはBLOBを指定できます。
Columns
属性列を格納する配列。各要素には、次のパラメータが含まれています。
Type: 属性列に対して実行された操作のタイプ。有効な値:Put、DeleteOneVersion、およびDeleteAllVersions。値は文字列です。
ColumnName: 属性列の名前。値は文字列です。
Value: 属性列の値。値はformatted_value型で、INTEGER、BOOLEAN、DOUBLE、STRING、またはBLOBを指定できます。
Timestamp: 属性列が最後に変更された時刻。時刻はUTCである必要があります。値はINT64型です。
ステップ 4: 関数コードを記述してテストする
Tablestore トリガーを作成した後、関数コードを記述してテストし、コードが有効かどうかを確認できます。関数は、Tablestore のデータが更新されると、トリガーによって自動的に呼び出されます。
コードタブの関数の詳細タブで、コードエディターにコードを記述し、デプロイをクリックします。
この例では、関数コードは Python で記述されています。他のランタイム環境で関数コードを記述する方法については、Node.js、PHP、Java、および C# ランタイムで Tablestore を使用して Function Compute をトリガーするを参照してください。
import logging import cbor import json def get_attribute_value(record, column): attrs = record[u'Columns'] for x in attrs: if x[u'ColumnName'] == column: return x['Value'] def get_pk_value(record, column): attrs = record[u'PrimaryKey'] for x in attrs: if x['ColumnName'] == column: return x['Value'] def handler(event, context): logger = logging.getLogger() logger.info("Begin to handle event") # イベントの処理を開始 # records = cbor.loads(event) records = json.loads(event) for record in records['Records']: logger.info("Handle record: %s", record) # レコードを処理する: %s pk_0 = get_pk_value(record, "pk_0") attr_0 = get_attribute_value(record, "attr_0") return 'OK'
関数のテストをクリックします。
関数が実行された後、コードタブで結果を確認できます。
コードを変更してデプロイします。
records=json.loads(event)
の実行後に OK が返された場合は、records
をcbor.loads(event)
に設定します。デプロイをクリックします。
Tablestore にデータが書き込まれると、関連する関数ロジックがトリガーされます。
FAQ
Tablestore トリガーをリージョンに作成できない場合は、そのリージョンで Tablestore トリガーがサポートされているかどうかを確認してください。詳細については、使用上の注意を参照してください。
Tablestore トリガーを作成するときに既存の Tablestore データテーブルが見つからない場合は、そのデータテーブルが Function Compute の関連サービスと同じリージョンにあるかどうかを確認してください。
ほとんどの場合、Tablestore トリガーを使用するときにクライアントが呼び出しをキャンセルしたことを示すエラーが繰り返し報告される場合は、クライアントで関数の実行に設定されているタイムアウト期間が実際の関数の実行時間よりも短いことを意味します。この場合は、クライアントのタイムアウト期間を長くすることをお勧めします。詳細については、クライアントが切断され、「クライアントによって呼び出しがキャンセルされました」というメッセージが表示される場合の対処方法を参照してください。
データが Tablestore データテーブルに書き込まれたにもかかわらず、関連付けられている Tablestore トリガーがトリガーされない場合は、次の手順を実行して問題のトラブルシューティングを行うことができます。トリガーの失敗のトラブルシューティング方法の詳細については、トリガーが関数の実行をトリガーできない場合の対処方法を参照してください。
データテーブルで Stream 機能が有効になっていることを確認します。詳細については、手順 1: データテーブルの Stream 機能を有効にするを参照してください。
トリガーを作成するときにロールが正しく構成されていることを確認します。デフォルトロールの
AliyunTableStoreStreamNotificationRole
を使用できます。詳細については、Tablestore トリガーを作成するを参照してください。関数の実行ログを表示して、関数が実行に失敗したかどうかを確認します。関数が実行に失敗した場合、Tablestore のログデータの有効期限が切れるまで関数は再試行されます。
付録: Tablestore にアクセスするための権限を Function Compute に付与する
Tablestore で Function Compute が提供する機能を使用するには、Tablestore にアクセスするための権限を Function Compute に付与する必要があります。 粗粒度の権限を付与するには、Function Compute によって提供されるデフォルトの RAM ロールである AliyunFCDefaultRole を使用できます。細粒度の権限を付与するには、必要なポリシーがアタッチされているカスタム RAM ロールを使用できます。
デフォルトの RAM ロールを使用する
AliyunOTSFullAccess ポリシーを AliyunFCDefaultRole RAM ロールにアタッチして、RAM ロールに Tablestore を管理する権限を付与します。詳細については、RAM ロールに権限を付与する を参照してください。
説明AliyunFCDefaultRole RAM ロールは、Function Compute のデフォルトの RAM ロールです。この RAM ロールには、Tablestore を管理する権限がありません。
RAM ロールを初めて使用するときは、RAM ロールに Tablestore を管理する権限を付与する必要があります。
RAM ロールに Tablestore を管理する権限が既に付与されている場合は、この手順をスキップします。
カスタム RAM ロールを使用する
詳細については、例: オブジェクトストレージサービス (OSS) にアクセスするための権限を Function Compute に付与する を参照してください。