Tablestoreは、Apsaraシステム上に構築された分散NoSQLデータストレージサービスです。 Tablestoreトリガーを作成して、TablestoreをFunction Computeのイベントソースとして接続できます。 指定されたイベントが発生すると、function Computeの関数が自動的にトリガーされ、Tablestoreデータが処理されます。
シナリオ
次の図は、Tablestoreトリガーの一般的なシナリオを示しています。
データソースは、データを表Aに保存し、データ更新は、データを消去し、消去されたデータを表Bに保存して直接読み取りを行う関数をトリガする。 プロセス全体は、柔軟でスケーラブルなサーバーレスwebアプリケーションです。
前提条件
制限事項
テーブルストアのトリガーは、中国 (北京) 、中国 (杭州) 、中国 (上海) 、中国 (深セン) 、日本 (東京) 、シンガポール、オーストラリア (シドニー) サービス終了、ドイツ (フランクフルト) 、中国 (香港) の各リージョンでサポートされています。
Tablestoreテーブルは、Function Computeの関連サービスと同じリージョンに存在する必要があります。
内部ネットワークを介してTablestoreトリガーに関連付けられている関数にアクセスする場合は、TablestoreのVPCエンドポイントを使用できます。これは、{instance}.{regio n}.vpc.tablestore.aliyuncs.comの形式です。 この場合、Tablestoreの内部エンドポイントは使用しないでください。
トリガーによって呼び出される関数の実行時間は1分を超えることはできません。
注意事項
関数コードを記述するときの呼び出しループを避けます。 たとえば、次のロジックは呼び出しループを引き起こします。テーブルAは関数BをトリガーしてテーブルAのデータを更新し、次に関数Bを再びトリガーします。
関数の実行中にエラーが発生した場合、Tablestoreのログデータの有効期限が切れるまで、関数は再試行を続けます。
説明関数の実行例外は、次のいずれかのシナリオで発生します。
関数インスタンスは起動されますが、関数コードは期待どおりに実行されません。 この場合、インスタンスに対して料金が発生します。
起動コマンドのエラーなどの理由により、関数インスタンスの起動に失敗しました。 この場合、インスタンスの料金は発生しません。
関数の実行例外が発生した場合、データテーブルのStream機能を無効にして、関数が無期限に再試行されるのを防ぐことができます。 Stream機能を無効にする前に、他のトリガーがデータテーブルを使用していないことを確認してください。 そうしないと、これらのトリガーは期待どおりに機能しません。
手順1: データテーブルのストリーム機能を有効にする
トリガーを作成する前に、Tablestoreコンソールでデータテーブルのストリーム機能を有効にして、テーブルに書き込まれた増分データを関数が処理できるようにする必要があります。
Tablestoreコンソールにログインします。
上部のナビゲーションバーで、リージョンを選択します。
[概要] ページで、管理するインスタンスの名前をクリックするか、[操作] 列の [インスタンスの管理] をクリックします。
[インスタンスの詳細] タブの [テーブル] セクションで、必要なデータテーブルの名前をクリックし、[トンネル] タブをクリックします。 または、アイコンをクリックし、[トンネル] をクリックします。
[トンネル] タブで、[ストリーム情報] セクションの [有効化] をクリックします。
[ストリームの有効化] ダイアログボックスで、Log Expiration Timeパラメーターを設定し、[有効化] をクリックします。
Log Expiration Timeパラメーターの値はゼロ以外の整数である必要があり、指定後は変更できません。 単位:時間。 最大値: 168。
重要Log Expiration Timeパラメーターは、設定後は変更できません。 作業は慎重に行ってください。
ステップ2: Tablestoreトリガーの作成
Function Computeコンソールにログインします。 左側のナビゲーションウィンドウで、[関数] をクリックします。
上部のナビゲーションバーで、リージョンを選択します。 [関数] ページで、管理する関数をクリックします。
機能の詳細ページで、[設定] タブをクリックします。 左側のナビゲーションウィンドウで、[トリガー] をクリックします。 次に、[トリガーの作成] をクリックします。
トリガーの作成パネルで、パラメーターを設定し、OK.
パラメーター
説明
例
トリガータイプ
トリガーのタイプ。 [Tablestore] を選択します。
テーブルストア
名前
トリガーの名前。
Tablestore-トリガー
バージョンまたはエイリアス
トリガーのバージョンまたはエイリアス。 デフォルト値: LATEST。 別のバージョンまたはエイリアスのトリガーを作成する場合は、関数の詳細ページのバージョンまたはエイリアスドロップダウンリストからバージョンまたはエイリアスを選択します。 バージョンとエイリアスの詳細については、「バージョンの管理」および「エイリアスの管理」をご参照ください。
LATEST
インスタンス
既存のTablestoreインスタンスの名前。
d00dd8xm ****
テーブル
既存のテーブルの名前。
mytable
ロール名
[AliyunTableStoreStreamNotificationRole] を選択します。
説明上記のパラメーターを設定したら、[OK] をクリックします。 このタイプのトリガーを初めて作成する場合は、表示されるダイアログボックスで [今すぐ許可] をクリックします。
AliyunTableStoreStreamNotificationRole
トリガーが作成されると、[トリガー] タブに表示されます。 トリガーを変更または削除するには、「トリガー管理」をご参照ください。
ステップ3: 関数の入力パラメータを設定する
関数の詳細ページの [コード] タブで、[テスト関数] の隣のアイコンをクリックし、ドロップダウンリストから [テストパラメーターの設定] を選択します。
[テストパラメーターの設定] パネルで、[新しいテストイベントの作成] または [既存のテストイベントの変更] タブをクリックし、イベント名とイベント内容を入力し、[OK] をクリックします。
Tablestoreトリガーは、Concise Binary Object Representation (CBOR) 形式で増分データをエンコードして、function Computeで関数を呼び出すために使用されるイベントを構築します。 次のサンプルコードは、イベントコンテンツの形式の例を示しています。
{ "Version": "Sync-v1", "Records": [ { "Type": "PutRow", "Info": { "Timestamp": 1506416585740836 }, "PrimaryKey": [ { "ColumnName": "pk_0", "Value": 1506416585881590900 }, { "ColumnName": "pk_1", "Value": "2017-09-26 17:03:05.8815909 +0800 CST" }, { "ColumnName": "pk_2", "Value": 1506416585741000 } ], "Columns": [ { "Type": "Put", "ColumnName": "attr_0", "Value": "hello_table_store", "Timestamp": 1506416585741 }, { "Type": "Put", "ColumnName": "attr_1", "Value": 1506416585881590900, "Timestamp": 1506416585741 } ] } ] }
次の表に、イベントのフィールドを示します。
パラメーター
説明
バージョン
ペイロードのバージョン。 例: Sync-v1。 値は文字列です。
レコード
テーブル内の増分データの行を格納する配列。 各要素には、次のパラメータが含まれます。
Type: 行に対して実行される操作のタイプ。 有効な値: PutRow、UpdateRow、DeleteRow。 値は文字列です。
Info: 行が最後に変更された時刻を指定するTimestampパラメーターを含む、行に関する情報。 時刻は UTC である必要があります。 値はINT64型です。
PrimaryKey
主キー列を格納する配列。 各要素には、次のパラメータが含まれます。
ColumnName: 主キー列の名前。 値は文字列です。
値: 主キー列の値。 値はformated_value型で、INTEGER、STRING、またはBLOBです。
列
属性列を格納する配列です。 各要素には、次のパラメータが含まれます。
タイプ: 属性列に対して実行される操作のタイプ。 有効な値: Put、DeleteOneVersion、およびDeleteAllVersions。 値は文字列です。
ColumnName: 属性列の名前。 値は文字列です。
値: 属性列の値。 値はformatted_value型で、INTEGER、BOOLEAN、DOUBLE、STRING、またはBLOBです。
Timestamp: 属性列が最後に変更された時刻。 時刻は UTC である必要があります。 値はINT64型です。
ステップ4: 関数コードの書き込みとテスト
Tablestoreトリガーを作成した後、関数コードを記述し、関数コードをテストして、コードが有効かどうかを確認できます。 この関数は、Tablestoreのデータが更新されると自動的に呼び出されます。
関数の詳細ページで、コードタブで、コードエディターに関数コードを入力し、デプロイ.
この例では、関数コードはPythonで記述されています。 他のランタイム環境で関数コードを記述する方法の詳細については、「Tablestoreを使用してNode.js、PHP、Java、およびC# ランタイムでfunction Computeをトリガーする」をご参照ください。
import logging import cbor import json def get_attribute_value(record, column): attrs = record[u'Columns'] for x in attrs: if x[u'ColumnName'] == column: return x['Value'] def get_pk_value(record, column): attrs = record[u'PrimaryKey'] for x in attrs: if x['ColumnName'] == column: return x['Value'] def handler(event, context): logger = logging.getLogger() logger.info("Begin to handle event") #records = cbor.loads(event) records = json.loads(event) for record in records['Records']: logger.info("Handle record: %s", record) pk_0 = get_pk_value(record, "pk_0") attr_0 = get_attribute_value(record, "attr_0") return 'OK'
テスト機能.
関数の実行後、[コード] タブで結果を表示できます。
FAQ
特定のリージョンでTablestoreトリガーを作成できない場合は、そのリージョンでTablestoreトリガーがサポートされているかどうかを確認します。 詳細については、「制限事項」をご参照ください。
Tablestoreトリガーの作成時に作成されたTablestoreテーブルが見つからない場合は、テーブルがFunction Computeの関連サービスと同じリージョンにあるかどうかを確認します。
ほとんどの場合、Tablestoreトリガーを使用したときにクライアントの呼び出しがキャンセルされたことを示すエラーが繰り返し報告された場合、クライアントでの関数実行に対して設定されたタイムアウト期間は、実際の関数実行時間よりも短くなります。 この場合、クライアントのタイムアウト期間を長くすることを推奨します。 詳細については、「」をご参照ください。クライアントが切断され、「クライアントによって呼び出しがキャンセルされました」というメッセージが報告された場合はどうすればよいですか?
データがTablestoreテーブルに追加されているが、関連付けられているTablestoreトリガーがトリガーされていない場合は、次の手順を実行して問題をトラブルシューティングできます。 トリガー障害のトラブルシューティング方法の詳細については、トリガーが関数の実行をトリガーできない場合はどうすればよいですか?
テーブルに対してStream機能が有効になっているかどうかを確認します。 詳細については、「手順1: データテーブルのストリーム機能の有効化」をご参照ください。
トリガーの作成時に正しいロールが設定されているかどうかを確認します。 デフォルトのトリガーロール
AliyunTableStoreStreamNotificationRole
を使用できます。 詳細については、「手順2: Tablestoreトリガーの作成」をご参照ください。関数の実行ログを確認して、関数の実行に失敗したかどうかを確認します。 関数の実行に失敗した場合、Tablestoreのログデータが期限切れになるまで、関数は再試行されます。