Data Transmission Service (DTS) は、イベントソースとしてEventBridgeと統合できます。 DTSがFunction Computeと統合された後、DTSトリガーを使用して、Function Computeでの関数の実行をトリガーできます。 関数を使用して、DTS変更追跡タスクから取得したリアルタイムの増分データベースデータを処理できます。 このトピックでは、Function ComputeコンソールでDTSトリガーを作成し、入力パラメーターを設定し、コードを記述してテストする方法について説明します。
概要
Function Computeコンソールでトリガーを作成するリクエストを送信すると、トリガー設定に基づいてEventBridge側にイベントストリームリソースが作成されます。
トリガーの作成後、Function Computeコンソールでトリガーに関する情報を表示できます。 EventBridgeコンソールで自動的に作成されたリソースに関する情報を表示することもできます。 DTS変更追跡タスクがデータベースの増分データをキャプチャすると、関数の実行がトリガーされ、1つ以上のメッセージイベントがバッチ構成に基づいて処理するためにバッチで関数にプッシュされます。
使用上の注意
トリガーソースとして使用されるDTS変更追跡タスクは、function Computeの関数と同じリージョンにある必要があります。
イベントストリームの作成数が上限に達すると、DTSトリガーを作成できません。 イベントストリームの制限の詳細については、「制限」をご参照ください。
始める前に
EventBridge
Function Compute
DTS
ステップ1: DTSトリガーの作成
Function Computeコンソールにログインします。 左側のナビゲーションウィンドウで、[サービスと機能] をクリックします。
上部のナビゲーションバーで、リージョンを選択します。 [サービス] ページで、目的のサービスをクリックします。
[関数] ページで、管理する関数をクリックします。
関数の詳細ページで、[トリガー] タブをクリックし、[バージョンまたはエイリアス] ドロップダウンリストからバージョンまたはエイリアスを選択し、[トリガーの作成] をクリックします。
[トリガーの作成] パネルで、関連パラメーターを指定します。 パラメーターを指定したら、[OK] をクリックします。
次の表に、基本的なパラメーターを示します。
パラメーター
説明
例
トリガータイプ
トリガーのタイプ。 サポートされているトリガータイプの詳細については、「トリガーの概要」をご参照ください。
DTS
名前
トリガーの名前。
dts-trigger
バージョンまたはエイリアス
デフォルト値はLATESTです。 別のバージョンまたはエイリアスのトリガーを作成する場合は、関数の詳細ページの右上隅でバージョンまたはエイリアスを選択します。 サービスのバージョンとエイリアスの詳細については、「バージョンの管理」および「エイリアスの管理」をご参照ください。
LATEST
変更追跡タスク
変更追跡タスクの名前。
dtsqntc2 ***
消費者グループ
変更追跡タスクのデータを消費するために作成した消費者グループの名前。
重要コンシューマーグループが1つのクライアントでのみ実行されていることを確認します。 そうしないと、指定されたコンシューマオフセットが無効になります。
test
[アカウント]
コンシューマグループの作成時に指定されたアカウント名。
test
Password
コンシューマーグループの作成時に指定されたアカウントパスワード。
******
消費者オフセット
消費される最初のデータエントリのタイムスタンプ。 コンシューマオフセットで指定されたデータエントリは、変更追跡タスクのデータ範囲内にある必要があります。
説明コンシューマオフセットは、新しいコンシューマグループが最初に実行されたときにのみ有効になります。 後続のタスクで再起動が実行された場合、最後のコンシューマオフセットに基づいて消費が続行されます。
2022-06-21 00:00:00
呼び出しメソッド
関数を呼び出すメソッドを選択します。
有効な値:
同期呼び出し: このモードは、シーケンシャル呼び出しに適しています。 イベントまたはイベントのバッチが関数をトリガーすると、function Computeは関数を実行し、次のイベントまたはイベントのバッチを処理する前に応答を待ちます。 同期呼び出し要求のペイロードの上限は32 MBです。 詳細については、「同期呼び出し」をご参照ください。
非同期呼び出し: このモードでは、イベントをすばやく使用できます。 単一のイベントまたはイベントのバッチが関数をトリガーすると、function Computeはすぐに応答を返し、次のイベントまたはイベントのバッチの処理を続行します。このプロセス中、関数は非同期モードで実行されます。 非同期呼び出し要求のペイロードの上限は128 KBです。 詳細については、「概要」をご参照ください。
同期呼び出し
トリガー状態
作成後にトリガーを有効にするかどうかを指定します。 デフォルトでは、[トリガーの有効化] が選択され、作成後にトリガーが有効になります。
非該当
メッセージのプッシュ、再試行、デッドレター設定などの高度な設定の詳細については、「トリガーの高度な機能」をご参照ください。
トリガーが作成されると、[トリガー] タブに表示されます。 既存のトリガーを変更または削除するには、「トリガーの管理」をご参照ください。
ステップ2: 関数の入力パラメータを設定する
DTSイベントソースは、入力パラメーターとして機能するevent
の形式で関数に渡されます。 手動でイベント
を関数に渡して、関数をトリガーできます。
機能の詳細ページで、[コード] タブをクリックし、アイコンをクリックします。 表示されるドロップダウンリストから、[テストパラメーターの設定] を選択します。
[テストパラメーターの設定] パネルで、[新しいテストイベントの作成] または [既存のテストイベントの変更] タブをクリックし、[イベント名] とイベントの内容を指定します。 パラメーターを指定したら、[OK] をクリックします。
event
のサンプルコード:[ { "data": { "id": 321****, "topicPartition": { "hash": 0, "partition": 0, "topic": "cn_hangzhou_rm_1234****_test_version2" }, "offset": 3218099, "sourceTimestamp": 1654847757, "operationType": "UPDATE", "schema": { "recordFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } ], "nameIndex": { "id": { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, "topic": { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } }, "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)", "databaseName": "hangzhou--test-db", "tableName": "message_info", "primaryIndexInfo": { "indexType": "PrimaryKey", "indexFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 } ], "cardinality": 0, "nullable": true, "isFirstUniqueIndex": false }, "uniqueIndexInfo": [], "foreignIndexInfo": [], "normalIndexInfo": [], "databaseInfo": { "databaseType": "MySQL", "version": "5.7.35-log" }, "totalRows": 0 }, "beforeImage": { "recordSchema": { "recordFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } ], "nameIndex": { "id": { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, "topic": { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } }, "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)", "databaseName": "hangzhou-test-db", "tableName": "message_info", "primaryIndexInfo": { "indexType": "PrimaryKey", "indexFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 } ], "cardinality": 0, "nullable": true, "isFirstUniqueIndex": false }, "uniqueIndexInfo": [], "foreignIndexInfo": [], "normalIndexInfo": [], "databaseInfo": { "databaseType": "MySQL", "version": "5.7.35-log" }, "totalRows": 0 }, "values": [ { "data": 115 }, { "data": { "hb": [ 104, 101, 108, 108, 111 ], "offset": 0, "isReadOnly": false, "bigEndian": true, "nativeByteOrder": false, "mark": -1, "position": 0, "limit": 9, "capacity": 9, "address": 0 }, "charset": "utf8mb4" } ], "size": 45 }, "afterImage": { "recordSchema": { "recordFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } ], "nameIndex": { "id": { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, "topic": { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } }, "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)", "databaseName": "hangzhou-test-db", "tableName": "message_info", "primaryIndexInfo": { "indexType": "PrimaryKey", "indexFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 } ], "cardinality": 0, "nullable": true, "isFirstUniqueIndex": false }, "uniqueIndexInfo": [], "foreignIndexInfo": [], "normalIndexInfo": [], "databaseInfo": { "databaseType": "MySQL", "version": "5.7.35-log" }, "totalRows": 0 }, "values": [ { "data": 115 }, { "data": { "hb": [ 98, 121, 101 ], "offset": 0, "isReadOnly": false, "bigEndian": true, "nativeByteOrder": false, "mark": -1, "position": 0, "limit": 11, "capacity": 11, "address": 0 }, "charset": "utf8mb4" } ], "size": 47 } }, "id": "12f701a43741d404fa9a7be89d9acae0-321****", "source": "DTSstreamDemo", "specversion": "1.0", "type": "dts:ConsumeMessage", "datacontenttype": "application/json; charset=utf-8", "time": "2022-06-10T07:55:57Z", "subject": "acs:dts:cn-hangzhou:12345****:kk123abc60g782/dtsabcdet1ro" } ]
CloudEvents仕様で定義されているパラメーターについては、「概要」をご参照ください。
次の表に、dataフィールドに含まれるパラメーターを示します。
パラメーター
データ型
説明
id
String
DTSデータエントリのID。
topicPartition
配列
イベントがプッシュされるトピックに関するパーティション情報。
ハッシュ
String
DTSの基になるストレージパラメータ。
パーティション
String
パーティション。
トピック
String
トピック名。
オフセット
Int
DTSデータエントリのオフセット。
sourceTimestamp
Int
DTSデータエントリがいつ生成されたかを示すタイムスタンプ。
operationType
String
DTSデータエントリに含まれる操作のタイプ。
スキーマ
配列
データベースに関するスキーマ情報。
recordFields
配列
フィールドの詳細。
fieldName
String
フィールド名。
rawDataTypeNum
Int
フィールド型のマップされた値。
このパラメーターの値は、変更追跡インスタンスからの逆シリアル化された増分データのdataTypeNumberフィールドの値に対応します。 詳細については、「Kafkaクライアントを使用した追跡データの使用」をご参照ください。
isPrimaryKey
Boolean
フィールドが主キーフィールドかどうかを示します。
isUniqueKey
Boolean
フィールドに一意のキーがあるかどうかを示します。
fieldPosition
String
フィールドの位置。
nameIndex
配列
フィールド名に基づくフィールドに関するインデックス情報。
schemaId
String
データベーススキーマのID。
databaseName
String
データベース名。
tableName
String
<td class="en-UStry align-left colsep-1 rowsep-1">テーブル名。</td>
primaryIndexInfo
String
主キーインデックス。
indexType
String
インデックスタイプ。
indexFields
配列
インデックスが作成されるフィールド。
カーディナリティ
String
主キーのカーディナリティ。
nullable
Boolean
プライマリキーをnullにできるかどうかを示します。
isFirstUniqueIndex
Boolean
インデックスが最初の一意のインデックスであるかどうかを示します。
uniqueIndexInfo
String
一意のインデックス。
foreignIndexInfo
String
外部キーのインデックス。
normalIndexInfo
String
通常のインデックス。
databaseInfo
配列
データベースに関する情報。
databaseType
String
データベースエンジン。
バージョン
String
データベースエンジンのバージョン。
totalRows
Int
テーブル内の行の総数。
beforeImage
String
操作が実行される前にフィールド値を記録するイメージ。
値
String
記録されたフィールド値。
サイズ
Int
記録されたフィールドのサイズ。
afterImage
String
操作が実行された後のフィールド値を記録するイメージ。
ステップ3: 関数の書き込みとテスト
トリガーを作成した後、関数コードを記述し、関数をテストしてコードが正しいかどうかを確認できます。 実際のシナリオでは、DTS変更追跡タスクがデータベースの増分データをキャプチャすると、トリガーによって関数の実行が自動的にトリガーされます。
関数の詳細ページで、コードタブで、コードエディターで関数コードを編集し、デプロイ.
以下では、Node.js関数コードを例として使用します。
'use strict'; /* To enable the initializer feature please implement the initializer function as below: exports.initializer = (context, callback) => { console.log('initializing'); callback(null, ''); }; */ exports.handler = (event, context, callback) => { console.log("event: %s", event); // Parse the event parameters and process the event. callback(null, 'return result'); }
コードタブで、テスト機能 をクリックします。
関数の実行後、[コード] タブで結果を表示できます。
関連ドキュメント
Function Computeコンソールに加えて、次の方法を使用してトリガーを設定できます。
Serverless Devsを使用してトリガーを設定します。 詳細については、「Serverless Devs」をご参照ください。
SDKを使用してトリガーを設定します。 詳細は、SDK をご参照ください。
既存のトリガーを変更または削除するには、「トリガーの管理」をご参照ください。