Tablestoreは、Apsaraシステムの上に構築された分散NoSQLデータストレージサービスです。 Tablestoreトリガーを作成して、TablestoreをFunction Computeのイベントソースとして接続できます。 指定されたイベントが発生すると、function Computeの関数が自動的にトリガーされてTablestoreデータが処理されます。
シナリオ
次の図は、Tablestoreトリガーの一般的なシナリオを示しています。
データソースは、データを表Aに保存し、データ更新は、データを消去し、消去されたデータを表Bに保存して直接読み取りを行う関数をトリガする。 プロセス全体は、柔軟でスケーラブルなサーバーレスwebアプリケーションです。
始める前に
制限事項
テーブルストアのトリガーは、中国 (北京) 、中国 (杭州) 、中国 (上海) 、中国 (深セン) 、日本 (東京) 、シンガポール、オーストラリア (シドニー) サービス終了、ドイツ (フランクフルト) 、中国 (香港) の各リージョンでサポートされています。
Tablestoreテーブルは、関連付けられているFunction Computeサービスと同じリージョンに存在する必要があります。
内部ネットワークを介してTablestoreトリガーに関連付けられている関数にアクセスする場合は、TablestoreのVPCエンドポイントを使用できます。これは、{instance}.{regio n}.vpc.tablestore.aliyuncs.comの形式です。 この場合、Tablestoreの内部エンドポイントは使用しないでください。
トリガーされた関数の実行時間は1分を超えることはできません。
使用上の注意
関数コードを記述するときの呼び出しループを避けます。 呼び出しループでは、テーブルAは関数BをトリガーしてテーブルAのデータを更新し、テーブルAは関数Bを再びトリガーします。
関数の実行中にエラーが発生した場合、Tablestoreのログデータの有効期限が切れるまで、関数は再試行を続けます。
手順1: データテーブルのストリーム機能を有効にする
トリガーを作成する前に、Tablestoreコンソールでデータテーブルのストリーム機能を有効にして、テーブルに書き込まれた増分データを関数が処理できるようにする必要があります。
Tablestoreコンソールにログインします。
上部のナビゲーションバーで、リージョンを選択します。
[概要] ページで、管理するインスタンスの名前をクリックするか、[操作] 列の [インスタンスの管理] をクリックします。
[インスタンスの詳細] タブの [テーブル] タブで、管理するデータテーブルの名前をクリックし、[トンネル] タブをクリックします。 または、アイコンをクリックし、[トンネル] をクリックします。
[トンネル] タブで、[ストリーム情報] セクションの [有効化] をクリックします。
[ストリームの有効化] ダイアログボックスで、Log Expiration Timeパラメーターを設定し、[有効化] をクリックします。
Log Expiration Timeパラメーターの値は、ゼロ以外の整数である必要があります。 単位:時間。 最大値: 168。
重要Log Expiration Timeパラメーターは、指定後は変更できません。 作業は慎重に行ってください。
ステップ2: Tablestoreトリガーの作成
Function Computeコンソール. にログインします。
左側のナビゲーションウィンドウで、サービス&機能をクリックします。
上部のナビゲーションバーで、リージョンを選択します。
[サービス] ページで目的のサービスを見つけ、[操作] 列の [関数] をクリックします。
[関数] ページで、管理する関数をクリックします。
関数の詳細ページで、[トリガー] タブをクリックし、[バージョンまたはエイリアス] ドロップダウンリストからバージョンまたはエイリアスを選択し、[トリガーの作成] をクリックします。
トリガーの作成パネルで、パラメータを設定し、OKをクリックします。
パラメーター
説明
例
トリガータイプ
[Tablestore] を選択します。
テーブルストア
パラメーター
カスタムトリガー名を入力します。
Tablestore-トリガー
バージョンまたはエイリアス
デフォルト値はLATESTです。 別のバージョンまたはエイリアスのトリガーを作成する場合は、関数の詳細ページのバージョンまたはエイリアスドロップダウンリストからバージョンまたはエイリアスを選択します。 バージョンとエイリアスの詳細については、「バージョンの管理」および「エイリアスの管理」をご参照ください。
LATEST
インスタンス
ドロップダウンリストからTablestoreインスタンスを選択します。
d00dd8xm ****
テーブル
ドロップダウンリストからテーブルを選択します。
mytable
ロール名
[AliyunTableStoreStreamNotificationRole] を選択します。
説明このタイプのトリガーを初めて作成する場合は、表示されるメッセージの [今すぐ許可] をクリックします。
AliyunTableStoreStreamNotificationRole
トリガーが作成されると、[トリガー] タブに表示されます。 既存のトリガーを変更または削除するには、「トリガーの管理」をご参照ください。
ステップ3: 関数の入力パラメータを設定する
関数の詳細ページで、コードタブをクリックし、隣のアイコンテスト機能を選択し、テストパラメーターの設定ドロップダウンリストから
では、テストパラメーターの設定パネルを選択します。新しいテストイベントの作成または既存のテストイベントの変更タブ、設定イベント名をクリックし、コードエディターでイベントの内容をOK.
Tablestoreトリガーは、Concise Binary Object Representation (CBOR) 形式で増分データをエンコードして、Function Computeのイベントパラメーターを作成します。 次のサンプルコードに例を示します。
{ "Version": "Sync-v1", "Records": [ { "Type": "PutRow", "Info": { "Timestamp": 1506416585740836 }, "PrimaryKey": [ { "ColumnName": "pk_0", "Value": 1506416585881590900 }, { "ColumnName": "pk_1", "Value": "2017-09-26 17:03:05.8815909 +0800 CST" }, { "ColumnName": "pk_2", "Value": 1506416585741000 } ], "Columns": [ { "Type": "Put", "ColumnName": "attr_0", "Value": "hello_table_store", "Timestamp": 1506416585741 }, { "Type": "Put", "ColumnName": "attr_1", "Value": 1506416585881590900, "Timestamp": 1506416585741 } ] } ] }
次の表に、イベントのフィールドを示します。
項目
説明
バージョン
ペイロードのバージョン。 例: Sync-v1。 値は文字列です。
レコード
テーブル内の増分データの行を格納する配列。 このパラメータには、次のフィールドが含まれます。
Type: 行に対して実行される操作のタイプ。 有効な値: PutRow、UpdateRow、DeleteRow。 値は文字列です。
Info: 行が最後に変更された時刻を指定するTimestampパラメーターを含む、行に関する情報。 時刻は UTC である必要があります。 値のタイプは "Int64" です。
PrimaryKey
主キー列を格納する配列。 このパラメータには、次のフィールドが含まれます。
ColumnName: 主キー列の名前。 値は文字列です。
値: 主キー列の内容。 値はformated_value型で、Integer、String、またはBlobです。
列
属性列を格納する配列です。 このパラメータには、次のフィールドが含まれます。
タイプ: 属性列に対して実行される操作のタイプ。 有効な値: Put、DeleteOneVersion、およびDeleteAllVersions。 値は文字列です。
ColumnName: 属性列の名前。 値は文字列です。
値: 属性列の内容。 値はformatt_value型で、Integer、Boolean、Double、String、またはBlobのいずれかです。
Timestamp: 属性列が最後に変更された時刻。 時刻は UTC である必要があります。 値のタイプは "Int64" です。
ステップ4: 関数の書き込みとテスト
Tablestoreトリガーを作成した後、関数コードを記述し、関数をテストしてコードが正しいかどうかを確認できます。 Tablestoreのデータが更新されると、関数が自動的にトリガーされます。
関数の詳細ページで、コードタブで、コードエディターに関数コードを入力し、デプロイ.
この例では、関数コードはPythonで記述されています。 他のランタイム環境で関数を記述する方法の詳細については、「Tablestoreトリガーを使用したfunction Computeの呼び出し例」をご参照ください。
import logging import cbor import json def get_attribute_value(record, column): attrs = record[u'Columns'] for x in attrs: if x[u'ColumnName'] == column: return x['Value'] def get_pk_value(record, column): attrs = record[u'PrimaryKey'] for x in attrs: if x['ColumnName'] == column: return x['Value'] def handler(event, context): logger = logging.getLogger() logger.info("Begin to handle event") #records = cbor.loads(event) records = json.loads(event) for record in records['Records']: logger.info("Handle record: %s", record) pk_0 = get_pk_value(record, "pk_0") attr_0 = get_attribute_value(record, "attr_0") return 'OK'
コードタブでテスト機能をクリックします。
関数の実行後、[コード] タブで結果を表示できます。
関連ドキュメント
特定のリージョンでTablestoreトリガーを作成できない場合は、そのリージョンでTablestoreトリガーがサポートされているかどうかを確認します。 詳細については、「制限事項」をご参照ください。
Tablestoreトリガーの作成時に作成されたTablestoreテーブルが見つからない場合は、テーブルが関連付けられているFunction Computeサービスと同じリージョンにあるかどうかを確認します。
ほとんどの場合、Tablestoreトリガーを使用したときにクライアントの呼び出しがキャンセルされたことを示すエラーが繰り返し報告された場合、クライアントでの関数実行に設定されたタイムアウト期間は関数実行時間よりも短くなります。 この場合、クライアントのタイムアウト期間を長くすることを推奨します。 詳細については、「クライアントが切断され、「クライアントによって呼び出しがキャンセルされました」というメッセージが報告された場合はどうすればよいですか? 」をご参照ください。
データがTablestoreテーブルに追加されているが、関連付けられているTablestoreトリガーがトリガーされていない場合は、次の手順を実行して問題のトラブルシューティングを行うことができます。 トリガー障害のトラブルシューティング方法の詳細については、「トリガーが関数の実行をトリガーできない場合はどうすればよいですか? 」をご参照ください。
テーブルに対してStream機能が有効になっているかどうかを確認します。 詳細については、「手順1: データテーブルのストリーム機能の有効化」をご参照ください。
トリガーの作成時に正しいロールが設定されているかどうかを確認します。 デフォルトのトリガーロール
AliyunTableStoreStreamNotificationRole
を使用できます。 詳細については、「手順2: Tablestoreトリガーの作成」をご参照ください。関数の実行ログを確認して、関数の実行に失敗したかどうかを確認します。 関数の実行に失敗した場合、Tablestoreのログデータが期限切れになるまで、関数は再試行されます。