EventBridgeを使用してData Transmission Service (DTS) をFunction Computeと統合した後、DTSトリガーを使用してFunction Computeの関数を呼び出すことができます。 関数を使用して、DTS変更追跡タスクから取得したリアルタイムの増分データを処理できます。 このトピックでは、function ComputeコンソールでDTSトリガーを作成し、関数の入力パラメーターを設定し、関数コードを記述してテストする方法について説明します。
概要
Function Computeコンソールでトリガーを作成するリクエストを送信すると、トリガーの設定に基づいてEventBridgeにイベントストリームが自動的に作成されます。
トリガーの作成後、Function Computeコンソールでトリガーに関する情報を表示できます。 EventBridgeコンソールで、作成されたリソースに関する情報を表示することもできます。 DTS変更追跡タスクがデータベースの増分データをキャプチャした後、関連する関数が呼び出されます。 1つ以上のメッセージイベントがバッチで関数にプッシュされ、バッチ設定に基づいて処理されます。
使用上の注意
トリガーソースとして機能するDTS変更追跡タスクは、function Computeで呼び出される関数と同じリージョンに存在する必要があります。
作成されたイベントストリームの数が上限に達すると、それ以上DTSトリガーは作成できません。 イベントストリーム数の制限の詳細については、「制限」をご参照ください。
前提条件
EventBridge
EventBridgeがアクティブ化され、必要な権限がResource Access Management (RAM) ユーザーに付与されます。 詳細については、「EventBridgeの有効化とRAMユーザーへの権限付与」をご参照ください。
Function Compute
関数が作成されます。 詳細については、「関数の管理」トピックの関数の作成セクションをご参照ください。
DTS
変更追跡タスクが作成されます。 詳細については、「変更追跡タスクの管理」をご参照ください。
コンシューマグループが作成されます。 詳細については、「コンシューマーグループの作成」をご参照ください。
ステップ1: DTSトリガーの作成
Function Computeコンソールにログインします。 左側のナビゲーションウィンドウで、[関数] をクリックします。
上部のナビゲーションバーで、リージョンを選択します。 [関数] ページで、管理する関数をクリックします。
機能の詳細ページで、[設定] タブをクリックします。 左側のナビゲーションウィンドウで、[トリガー] をクリックします。 次に、[トリガーの作成] をクリックします。
トリガーの作成パネルで、パラメーターを設定し、OKをクリックします。
次の表に、基本的なパラメーターを示します。
パラメーター
説明
例
トリガータイプ
トリガーのタイプ。 サポートされているトリガータイプの詳細については、「トリガーの概要」をご参照ください。
DTS
名前
トリガーの名前。
dts-trigger
バージョンまたはエイリアス
トリガーのバージョンまたはエイリアス。 デフォルト値: LATEST。 別のバージョンまたはエイリアスのトリガーを作成する場合は、関数の詳細ページの右上隅でバージョンまたはエイリアスを選択します。 バージョンとエイリアスの詳細については、「バージョンの管理」および「エイリアスの管理」をご参照ください。
LATEST
変更追跡タスク
既存の変更追跡タスクの名前。
dtsqntc2 ***
消費者グループ
変更追跡タスクのデータを消費するために作成した消費者グループの名前。
重要コンシューマグループが1つのクライアントでのみ実行されるようにします。 そうしないと、指定された消費チェックポイントが無効になります。
test
[アカウント]
コンシューマーグループを作成したときに指定されたアカウント。
test
Password
コンシューマーグループを作成したときに指定したアカウントパスワード。
******
消費者オフセット
最初のデータエントリが消費される時間。 指定された消費チェックポイントは、変更追跡タスクのデータ範囲内にある必要があります。
説明消費チェックポイントは、新しいコンシューマグループが初めて実行された場合にのみ有効になります。 後続のタスクが再開されると、以前の消費チェックポイントに基づいて消費が継続されます。
2022-06-21 00:00:00
呼び出しメソッド
関数が呼び出されるモード。
有効な値:
同期呼び出し: このモードは、シーケンシャル呼び出しに適しています。 イベントまたはイベントのバッチが関数を呼び出すと、function Computeは関数を実行し、次のイベントまたはイベントのバッチを処理する前に応答を待ちます。 同期呼び出し要求のペイロードの上限は32 MBです。 詳細については、「」をご参照ください。
非同期呼び出し: このモードでは、イベントをすばやく使用できます。 イベントまたはイベントのバッチが関数を呼び出すと、function Computeはすぐに応答を返し、次のイベントまたはイベントのバッチの処理を続行します。 このプロセス中、関数は非同期モードで実行されます。 非同期呼び出し要求のペイロードの上限は128 KBです。 詳細については、「概要」をご参照ください。
同期呼び出し
トリガー状態
作成直後にトリガーを有効にするかどうかを指定します。 デフォルトでは、[トリガーの有効化] が選択されています。 トリガーは作成後すぐに有効になります。
非該当
ステップ2: 関数の入力パラメータを設定する
DTSイベントは、function Computeの関数を呼び出すために使用されます。 イベント
のパラメーターは、関数の入力パラメーターとして使用されます。 イベント
のパラメーターを手動で渡して、関数をテストとして呼び出すことができます。
関数の詳細ページの [コード] タブで、[テスト関数] の隣のアイコンをクリックし、ドロップダウンリストから [テストパラメーターの設定] を選択します。
[テストパラメーターの設定] パネルで、[新しいテストイベントの作成] または [既存のテストイベントの変更] タブをクリックし、イベント名とイベント内容を入力し、[OK] をクリックします。
次のサンプルコードは、
イベント
コンテンツの形式の例を示しています。[ { "data": { "id": 321****, "topicPartition": { "hash": 0, "partition": 0, "topic": "cn_hangzhou_rm_1234****_test_version2" }, "offset": 3218099, "sourceTimestamp": 1654847757, "operationType": "UPDATE", "schema": { "recordFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } ], "nameIndex": { "id": { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, "topic": { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } }, "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)", "databaseName": "hangzhou--test-db", "tableName": "message_info", "primaryIndexInfo": { "indexType": "PrimaryKey", "indexFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 } ], "cardinality": 0, "nullable": true, "isFirstUniqueIndex": false }, "uniqueIndexInfo": [], "foreignIndexInfo": [], "normalIndexInfo": [], "databaseInfo": { "databaseType": "MySQL", "version": "5.7.35-log" }, "totalRows": 0 }, "beforeImage": { "recordSchema": { "recordFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } ], "nameIndex": { "id": { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, "topic": { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } }, "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)", "databaseName": "hangzhou-test-db", "tableName": "message_info", "primaryIndexInfo": { "indexType": "PrimaryKey", "indexFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 } ], "cardinality": 0, "nullable": true, "isFirstUniqueIndex": false }, "uniqueIndexInfo": [], "foreignIndexInfo": [], "normalIndexInfo": [], "databaseInfo": { "databaseType": "MySQL", "version": "5.7.35-log" }, "totalRows": 0 }, "values": [ { "data": 115 }, { "data": { "hb": [ 104, 101, 108, 108, 111 ], "offset": 0, "isReadOnly": false, "bigEndian": true, "nativeByteOrder": false, "mark": -1, "position": 0, "limit": 9, "capacity": 9, "address": 0 }, "charset": "utf8mb4" } ], "size": 45 }, "afterImage": { "recordSchema": { "recordFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } ], "nameIndex": { "id": { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, "topic": { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } }, "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)", "databaseName": "hangzhou-test-db", "tableName": "message_info", "primaryIndexInfo": { "indexType": "PrimaryKey", "indexFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 } ], "cardinality": 0, "nullable": true, "isFirstUniqueIndex": false }, "uniqueIndexInfo": [], "foreignIndexInfo": [], "normalIndexInfo": [], "databaseInfo": { "databaseType": "MySQL", "version": "5.7.35-log" }, "totalRows": 0 }, "values": [ { "data": 115 }, { "data": { "hb": [ 98, 121, 101 ], "offset": 0, "isReadOnly": false, "bigEndian": true, "nativeByteOrder": false, "mark": -1, "position": 0, "limit": 11, "capacity": 11, "address": 0 }, "charset": "utf8mb4" } ], "size": 47 } }, "id": "12f701a43741d404fa9a7be89d9acae0-321****", "source": "DTSstreamDemo", "specversion": "1.0", "type": "dts:ConsumeMessage", "datacontenttype": "application/json; charset=utf-8", "time": "2022-06-10T07:55:57Z", "subject": "acs:dts:cn-hangzhou:12345****:kk123abc60g782/dtsabcdet1ro" } ]
CloudEvents仕様で定義されているパラメーターについては、「概要」をご参照ください。
次の表に、dataフィールドに含まれるパラメーターを示します。
パラメーター
データ型
説明
id
String
DTSデータエントリのID。
topicPartition
配列
イベントがプッシュされるトピックに関するパーティション情報。
hash
String
DTSの基になるストレージパラメータ。
partition
String
パーティション。
topic
String
トピック名。
offset
Int
DTSデータエントリのオフセット。
sourceTimestamp
Int
DTSデータエントリがいつ生成されたかを示すタイムスタンプ。
operationType
String
DTSデータエントリに含まれる操作のタイプ。
schema
配列
データベースに関するスキーマ情報。
recordFields
配列
フィールドの詳細。
fieldName
String
フィールド名。
rawDataTypeNum
Int
フィールド型のマップされた値。
このパラメーターの値は、変更追跡インスタンスからの逆シリアル化された増分データのdataTypeNumberフィールドの値に対応します。 詳細については、「Kafkaクライアントを使用した追跡データの使用」をご参照ください。
isPrimaryKey
Boolean
フィールドが主キーフィールドかどうかを示します。
isUniqueKey
Boolean
フィールドに一意のキーがあるかどうかを示します。
fieldPosition
String
フィールドの位置。
nameIndex
配列
フィールド名に基づくフィールドに関するインデックス情報。
schemaId
String
データベーススキーマのID。
databaseName
String
データベース名。
tableName
String
<td class="en-UStry align-left colsep-1 rowsep-1">テーブル名。</td>
primaryIndexInfo
String
主キーインデックス。
indexType
String
インデックスタイプ。
indexFields
配列
インデックスが作成されるフィールド。
cardinality
String
主キーのカーディナリティ。
nullable
Boolean
プライマリキーをnullにできるかどうかを示します。
isFirstUniqueIndex
Boolean
インデックスが最初の一意のインデックスであるかどうかを示します。
uniqueIndexInfo
String
一意のインデックス。
foreignIndexInfo
String
外部キーのインデックス。
normalIndexInfo
String
通常のインデックス。
databaseInfo
配列
データベースに関する情報。
databaseType
String
データベースエンジン。
version
String
データベースエンジンのバージョン。
totalRows
Int
テーブル内の行の総数。
beforeImage
String
操作が実行される前にフィールド値を記録するイメージ。
values
String
記録されたフィールド値。
size
Int
記録されたフィールドのサイズ。
afterImage
String
操作が実行された後のフィールド値を記録するイメージ。
ステップ3: 関数コードの書き込みとテスト
トリガーを作成した後、関数コードを記述し、関数コードをテストして、コードが有効かどうかを確認できます。 DTS変更追跡タスクがデータベースの増分データをキャプチャすると、トリガーは自動的に関数を呼び出します。
関数の詳細ページで、コードタブで、コードエディターに関数コードを入力し、デプロイ をクリックします。
この例では、関数コードはNode.jsで記述されています。
'use strict'; /* To enable the initializer feature please implement the initializer function as below: exports.initializer = (context, callback) => { console.log('initializing'); callback(null, ''); }; */ exports.handler = (event, context, callback) => { console.log("event: %s", event); // Parse the event parameters and process the event. callback(null, 'return result'); }
テスト機能 をクリックします。
詳細情報
Function Computeコンソールに加えて、次のいずれかの方法を使用してトリガーを設定できます。
SDKを使用してトリガーを設定します。 詳細は、SDK をご参照ください。
既存のトリガーを変更または削除するには、「トリガーの管理」をご参照ください。