Table Store トリガー
Table Store は、Alibaba Cloud の Apsara 分散システム上に構築された分散 NoSQL データストレージサービスであり、データシャーディングとロードバランシング技術を通じて、同時アクセスでのデータスケーリングをシームレスに実現でき、大規模な構造化データへのストレージとリアルタイムのアクセスを提供します。Table Store は、Function Compute (FC) と統合されており、FC にアクセスするためのイベントソースとして使用されます。Table Store 内のデータが変更されると、データ変更情報をパラメーターとして関数の実行がトリガーされます。
上の図に示す典型的なユースケースのシナリオでは、元のソースデータが元のテーブル A に格納され、A のデータ変更情報に従ってカスタムデータクリーニングが実行されます。クリーニングされたデータはテーブル B に格納され、ユーザーが直接テーブル B のデータを読み込んで表示させると、柔軟でスケーラブルなサーバーレス Web アプリケーションを完成させることができます。
初めて Table Store トリガーを使用する場合は、リージョン制限と注意事項を必ずお読みください。
リージョン制限
現在、 Table Store トリガーをサポートしているリージョン
中国 (北京)、中国 (杭州)、中国 (上海)、中国 (深セン)、日本 (東京)、シンガポール、オーストラリア (シドニー)、ドイツ (フランクフルト),中国 (香港)
注意事項
無限ループの状態を避けるように注意してください。
ユーザーが関数を作成するときは、次のようなロジックがないように注意してください。Table Store テーブル A が 関数 B をトリガーし、関数 B がテーブル A のデータを更新することにより、関数が無限ループで呼び出される現象。
Function Compute が VPC ネットワーク上にある場合
現在、作成した関数は Table Store のイントラネットエンドポイントを使用することができず、internal endpoint で Table Store サービスへアクセスすることができません。この問題は 2019 年初頭に解決する予定です。
Table Store トリガーイベントの定義
データ形式
Table Store トリガーは、 CBOR 形式で増分データをエンコードして Function Compute の Event を構成します。増分データの具体的なデータ形式は次のとおりです。
{
"Version": "string",
"Records": [
{
"Type": "string",
"Info": {
"Timestamp": int64
},
"PrimaryKey": [
{
"ColumnName": "string",
"Value": formated_value
}
],
"Columns": [
{
"Type": "string",
"ColumnName": "string",
"Value": formated_value,
"Timestamp": int64
}
]
}
]
}
- Version
- 意味: payload のバージョン。現在は Sync-v1 です。
- データ型: string
- Records
- 意味: データテーブル内の増分データの行配列。
- 内部メンバ:
- Type
- 意味: データ行タイプ PutRow 、UpdateRow および DeleteRow
- データ型: string
- Info
- 意味: データ行の基本情報。
- 内部メンバ:
- Timestamp
- 含义: その行の最終修正 UTC 時刻
- データ型: int64
- PrimaryKey
- 意味: 主キー列配列
- 内部メンバ
- ColumnName
- 意味: 主キー列名
- データ型: string
- Value
- 意味: 主キー
- データ型: formated_value で、 integer、string および blob をサポートします。
- ColumnName
- Colunms
- 意味: プロパティ列の配列
- 内部メンバ
- Type
- 意味: プロパティ列のタイプ。 Put、DeleteOneVersion および DeleteAllVersions があります。
- 类型: string
- ColumnName
- 意味: プロパティ列名
- 类型: string
- Value
- 意味: プロパティ列の値
- データ型: formated_value で、 integer、boolean、double、string および blob をサポートします。
- Timestamp
- 意味: プロパティー列の最終変更 UTC 時刻
- データ型: int64
- Type
- Type
データの例
{
"Version": "Sync-v1",
"Records": [
{
"Type": "PutRow",
"Info": {
"Timestamp": 1506416585740836
},
"PrimaryKey": [
{
"ColumnName": "pk_0",
"Value": 1506416585881590900
},
{
"ColumnName": "pk_1",
"Value": "2017-09-26 17:03:05.8815909 +0800 CST"
},
{
"ColumnName": "pk_2",
"Value": 1506416585741000
}
],
"Columns": [
{
"Type": "Put",
"ColumnName": "attr_0",
"Value": "hello_table_store",
"Timestamp": 1506416585741
},
{
"Type": "Put",
"ColumnName": "attr_1",
"Value": 1506416585881590900,
"Timestamp": 1506416585741
}
]
}
]
}
Table Store トリガーの設定
トリガーの例:tablestore_trigger.yml
triggerConfig:
トリガーのパラメーターの説明:なし
使用例
関数の Table Store トリガーは、Function Compute コンソール、コマンドラインツール fcli および SDK の 3 つの方法で設定できます。以下に、3 つの方法をそれぞれ紹介します。
以下の関数例は python で書かれています。他のランタイムを使用する場合は、 Table Store で Function Compute をトリガーする例 Nodejs/Php/Java Runtime をご参照ください。
例1: コンソールで Table Store トリガーの作成
この例では、コンソールを使用して RDS トリガーを設定する方法を示します。トリガーの設定は関数の作成時でも作成後でも可能です。トリガーとその作成について不明点がある場合は、イベントソースサービスの使用および トリガーの作成をご参照ください。
まず Function Compute コンソールにログインし、適切なリージョンとサービスを選択します。新しいサービスを作成していない場合は、サービスの作成をご参照ください。
関数作成時にトリガーを設定
[関数の作成] をクリックして [関数の作成] ページに移動し、 空白の関数を選択して [次へ] をクリックします。
Table Store トリガーを選択して、以下のように設定します。
関数を作成して適切な情報を入力し、オンライン編集を選択して次の python ランタイムの関数サンプルコードを貼り付けて、[次へ] をクリックします。
import logging
import cbor
import json
def get_attrbute_value(record, column):
attrs = record[u'Columns']
for x in attrs:
if x[u'ColumnName'] == column:
return x['Value']
def get_pk_value(record, column):
attrs = record[u'PrimaryKey']
for x in attrs:
if x['ColumnName'] == column:
return x['Value']
def handler(event, context):
logger = logging.getLogger()
logger.info("Begin to handle event")
#records = cbor.loads(event)
records = json.loads(event)
for record in records['Records']:
logger.info("Handle record: %s", record)
pk_0 = get_pk_value(record, "pk_0")
attr_0 = get_attrbute_value(record, "attr_0")
return 'OK'
権限(オプション)を設定します。 [次へ] をクリックし、 入力情報が正しいことを確認したら、 [作成] をクリックします。
オンラインデバッグを行います。Table Store トリガー関数サービスのイベントは CBOR 形式なので、データ形式は JSON のようなバイナリ形式であり、次のようにオンラインでデバッグできます。
- コードに cbor と json を同時に import します。
- トリガーイベントをクリックして [カスタマイズ] を選択し、上記のデータ例の JSON ファイルを編集ボックスに貼り付け、実際の状況に応じて変更して保存します。
- コードでは、まず records = json.loads(event) を使用してカスタマイズしたテストトリガーイベントに対して [実行] をクリックしてそれが期待どおりになるかどうかを確認します。
- records=json.loads(event) でテスト結果が OK になると、records = cbor.loads(event) に変更されて保存されます。Table Store にデータが書き込まれると、関連する関数ロジックがトリガーされます。
関数が作成された後にトリガーを設定
- 既存の関数を選択して[トリガー] > [トリガーの作成] の順でクリックします。
- トリガーの作成ページで次のように、Table Store トリガーを設定して [OK] をクリックします。
注意:細かい粒度での権限付与を考慮する必要がない場合は、直接
クイック承認
を実行できます。
例 2: fcli で TableStore トリガーの作成
まず、Trigger Config を含む yaml ファイルを作成します。その yaml ファイルの内容は次のとおりです。
triggerConfig:
対応する fuction のディレクトリの下にトリガーを作成します。
mkt triggerName serviceName/functionName -t tablestore -c TriggerConfig.yaml -r acs:ram::$accountId:role/aliyuntablestorestreamnotificationrole -s acs:mns:cn-hangzhou:$accountId:/topics/$topicName
-r
—invocation-role string トリガーのロールを設定します。-s
—source-arn string イベントソースのリソースシンボル。例:acs:ots:region:$accountId:instance/$instanceName]/table/$tableName。-c
—trigger-config string トリガー設定ファイルを設定します。-t
—type string トリガーのタイプ。
fcli の使用方法については、fcli をご参照ください。
例 3: SDK プログラミングで Table Store トリガーの作成
fc-python-sdk を例として、SDK で Table Store トリガーを作成する方法を説明します。Function Compute は、 fc-nodejs-sdk, fc-java-sdk, fc-php-sdk も提供しています。
トリガー作成コード
client = fc2.Client(
endpoint='<Your Endpoint>',
accessKeyID='<Your AccessKeyID>',
accessKeySecret='<Your AccessKeySecret>')
service_name = 'serviceName'
function_name = 'functionName'
trigger_name = 'triggerName'
trigger_type = 'tablestore'
source_arn = 'acs:ots:region:<Your Account ID>:instance/<Your instanceName>/table/<Your tableName>'
invocation_role = 'acs:ram::<Your Account ID>:role/<Your Invocation Role>'
trigger_config = {}
client.create_trigger(service_name, function_name, trigger_name, trigger_type, trigger_config, source_arn, invocation_role)
関数コード
import logging
import cbor
import json
def get_attrbute_value(record, column):
attrs = record[u'Columns']
for x in attrs:
if x[u'ColumnName'] == column:
return x['Value']
def get_pk_value(record, column):
attrs = record[u'PrimaryKey']
for x in attrs:
if x['ColumnName'] == column:
return x['Value']
def handler(event, context):
logger = logging.getLogger()
logger.info("Begin to handle event")
#records = cbor.loads(event)
records = json.loads(event)
for record in records['Records']:
logger.info("Handle record: %s", record)
pk_0 = get_pk_value(record, "pk_0")
attr_0 = get_attrbute_value(record, "attr_0")
return 'OK'