すべてのプロダクト
Search
ドキュメントセンター

Function Compute:DTSトリガー

最終更新日:Jul 25, 2024

EventBridgeを使用してData Transmission Service (DTS) をFunction Computeと統合した後、DTSトリガーを使用してFunction Computeの関数を呼び出すことができます。 関数を使用して、DTS変更追跡タスクから取得したリアルタイムの増分データを処理できます。 このトピックでは、function ComputeコンソールでDTSトリガーを作成し、関数の入力パラメーターを設定し、関数コードを記述してテストする方法について説明します。

概要

Function Computeコンソールでトリガーを作成するリクエストを送信すると、トリガーの設定に基づいてEventBridgeにイベントストリームが自動的に作成されます。

トリガーの作成後、Function Computeコンソールでトリガーに関する情報を表示できます。 EventBridgeコンソールで、作成されたリソースに関する情報を表示することもできます。 DTS変更追跡タスクがデータベースの増分データをキャプチャした後、関連する関数が呼び出されます。 1つ以上のメッセージイベントがバッチで関数にプッシュされ、バッチ設定に基づいて処理されます。

使用上の注意

  • トリガーソースとして機能するDTS変更追跡タスクは、function Computeで呼び出される関数と同じリージョンに存在する必要があります。

  • 作成されたイベントストリームの数が上限に達すると、それ以上DTSトリガーは作成できません。 イベントストリーム数の制限の詳細については、「制限」をご参照ください。

前提条件

ステップ1: DTSトリガーの作成

  1. Function Computeコンソールにログインします。 左側のナビゲーションウィンドウで、[関数] をクリックします。

  2. 上部のナビゲーションバーで、リージョンを選択します。 [関数] ページで、管理する関数をクリックします。

  3. 機能の詳細ページで、[設定] タブをクリックします。 左側のナビゲーションウィンドウで、[トリガー] をクリックします。 次に、[トリガーの作成] をクリックします。

  4. トリガーの作成パネルで、パラメーターを設定し、OKをクリックします。

    次の表に、基本的なパラメーターを示します。

    パラメーター

    説明

    トリガータイプ

    トリガーのタイプ。 サポートされているトリガータイプの詳細については、「トリガーの概要」をご参照ください。

    DTS

    名前

    トリガーの名前。

    dts-trigger

    バージョンまたはエイリアス

    トリガーのバージョンまたはエイリアス。 デフォルト値: LATEST。 別のバージョンまたはエイリアスのトリガーを作成する場合は、関数の詳細ページの右上隅でバージョンまたはエイリアスを選択します。 バージョンとエイリアスの詳細については、「バージョンの管理」および「エイリアスの管理」をご参照ください。

    LATEST

    変更追跡タスク

    既存の変更追跡タスクの名前。

    dtsqntc2 ***

    消費者グループ

    変更追跡タスクのデータを消費するために作成した消費者グループの名前。

    重要

    コンシューマグループが1つのクライアントでのみ実行されるようにします。 そうしないと、指定された消費チェックポイントが無効になります。

    test

    [アカウント]

    コンシューマーグループを作成したときに指定されたアカウント。

    test

    Password

    コンシューマーグループを作成したときに指定したアカウントパスワード。

    ******

    消費者オフセット

    最初のデータエントリが消費される時間。 指定された消費チェックポイントは、変更追跡タスクのデータ範囲内にある必要があります。

    説明

    消費チェックポイントは、新しいコンシューマグループが初めて実行された場合にのみ有効になります。 後続のタスクが再開されると、以前の消費チェックポイントに基づいて消費が継続されます。

    2022-06-21 00:00:00

    呼び出しメソッド

    関数が呼び出されるモード。

    有効な値:

    • 同期呼び出し: このモードは、シーケンシャル呼び出しに適しています。 イベントまたはイベントのバッチが関数を呼び出すと、function Computeは関数を実行し、次のイベントまたはイベントのバッチを処理する前に応答を待ちます。 同期呼び出し要求のペイロードの上限は32 MBです。 詳細については、「」をご参照ください。

    • 非同期呼び出し: このモードでは、イベントをすばやく使用できます。 イベントまたはイベントのバッチが関数を呼び出すと、function Computeはすぐに応答を返し、次のイベントまたはイベントのバッチの処理を続行します。 このプロセス中、関数は非同期モードで実行されます。 非同期呼び出し要求のペイロードの上限は128 KBです。 詳細については、「概要」をご参照ください。

    同期呼び出し

    トリガー状態

    作成直後にトリガーを有効にするかどうかを指定します。 デフォルトでは、[トリガーの有効化] が選択されています。 トリガーは作成後すぐに有効になります。

    非該当

    プッシュ設定、再試行ポリシー、デットレターキューなどの高度な設定の詳細については、「」をご参照ください。

    トリガーが作成されると、[トリガー] タブに表示されます。 トリガーを変更または削除するには、「」をご参照ください。

ステップ2: 関数の入力パラメータを設定する

DTSイベントは、function Computeの関数を呼び出すために使用されます。 イベントのパラメーターは、関数の入力パラメーターとして使用されます。 イベントのパラメーターを手動で渡して、関数をテストとして呼び出すことができます。

  1. 関数の詳細ページの [コード] タブで、[テスト関数] の隣のimage.pngアイコンをクリックし、ドロップダウンリストから [テストパラメーターの設定] を選択します。

  2. [テストパラメーターの設定] パネルで、[新しいテストイベントの作成] または [既存のテストイベントの変更] タブをクリックし、イベント名とイベント内容を入力し、[OK] をクリックします。

    次のサンプルコードは、イベントコンテンツの形式の例を示しています。

    [
      {
        "data": {
          "id": 321****,
          "topicPartition": {
            "hash": 0,
            "partition": 0,
            "topic": "cn_hangzhou_rm_1234****_test_version2"
          },
          "offset": 3218099,
          "sourceTimestamp": 1654847757,
          "operationType": "UPDATE",
          "schema": {
            "recordFields": [
              {
                "fieldName": "id",
                "rawDataTypeNum": 8,
                "isPrimaryKey": true,
                "isUniqueKey": false,
                "fieldPosition": 0
              },
              {
                "fieldName": "topic",
                "rawDataTypeNum": 253,
                "isPrimaryKey": false,
                "isUniqueKey": false,
                "fieldPosition": 1
              }
            ],
            "nameIndex": {
              "id": {
                "fieldName": "id",
                "rawDataTypeNum": 8,
                "isPrimaryKey": true,
                "isUniqueKey": false,
                "fieldPosition": 0
              },
              "topic": {
                "fieldName": "topic",
                "rawDataTypeNum": 253,
                "isPrimaryKey": false,
                "isUniqueKey": false,
                "fieldPosition": 1
              }
            },
            "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
            "databaseName": "hangzhou--test-db",
            "tableName": "message_info",
            "primaryIndexInfo": {
              "indexType": "PrimaryKey",
              "indexFields": [
                {
                  "fieldName": "id",
                  "rawDataTypeNum": 8,
                  "isPrimaryKey": true,
                  "isUniqueKey": false,
                  "fieldPosition": 0
                }
              ],
              "cardinality": 0,
              "nullable": true,
              "isFirstUniqueIndex": false
            },
            "uniqueIndexInfo": [],
            "foreignIndexInfo": [],
            "normalIndexInfo": [],
            "databaseInfo": {
              "databaseType": "MySQL",
              "version": "5.7.35-log"
            },
            "totalRows": 0
          },
          "beforeImage": {
            "recordSchema": {
              "recordFields": [
                {
                  "fieldName": "id",
                  "rawDataTypeNum": 8,
                  "isPrimaryKey": true,
                  "isUniqueKey": false,
                  "fieldPosition": 0
                },
                {
                  "fieldName": "topic",
                  "rawDataTypeNum": 253,
                  "isPrimaryKey": false,
                  "isUniqueKey": false,
                  "fieldPosition": 1
                }
              ],
              "nameIndex": {
                "id": {
                  "fieldName": "id",
                  "rawDataTypeNum": 8,
                  "isPrimaryKey": true,
                  "isUniqueKey": false,
                  "fieldPosition": 0
                },
                "topic": {
                  "fieldName": "topic",
                  "rawDataTypeNum": 253,
                  "isPrimaryKey": false,
                  "isUniqueKey": false,
                  "fieldPosition": 1
                }
              },
              "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
              "databaseName": "hangzhou-test-db",
              "tableName": "message_info",
              "primaryIndexInfo": {
                "indexType": "PrimaryKey",
                "indexFields": [
                  {
                    "fieldName": "id",
                    "rawDataTypeNum": 8,
                    "isPrimaryKey": true,
                    "isUniqueKey": false,
                    "fieldPosition": 0
                  }
                    ],
                    "cardinality": 0,
                    "nullable": true,
                    "isFirstUniqueIndex": false
                  },
                    "uniqueIndexInfo": [],
                    "foreignIndexInfo": [],
                    "normalIndexInfo": [],
                    "databaseInfo": {
                    "databaseType": "MySQL",
                    "version": "5.7.35-log"
                  },
                    "totalRows": 0
                  },
                    "values": [
                    {
                    "data": 115
                  },
                    {
                    "data": {
                    "hb": [
                    104,
                    101,
                    108,
                    108,
                    111
                    ],
                    "offset": 0,
                    "isReadOnly": false,
                    "bigEndian": true,
                    "nativeByteOrder": false,
                    "mark": -1,
                    "position": 0,
                    "limit": 9,
                    "capacity": 9,
                    "address": 0
                  },
                    "charset": "utf8mb4"
                  }
                    ],
                    "size": 45
                  },
                    "afterImage": {
                    "recordSchema": {
                    "recordFields": [
                    {
                    "fieldName": "id",
                    "rawDataTypeNum": 8,
                    "isPrimaryKey": true,
                    "isUniqueKey": false,
                    "fieldPosition": 0
                  },
                    {
                    "fieldName": "topic",
                    "rawDataTypeNum": 253,
                    "isPrimaryKey": false,
                    "isUniqueKey": false,
                    "fieldPosition": 1
                  }
                    ],
                    "nameIndex": {
                    "id": {
                    "fieldName": "id",
                    "rawDataTypeNum": 8,
                    "isPrimaryKey": true,
                    "isUniqueKey": false,
                    "fieldPosition": 0
                  },
                    "topic": {
                    "fieldName": "topic",
                    "rawDataTypeNum": 253,
                    "isPrimaryKey": false,
                    "isUniqueKey": false,
                    "fieldPosition": 1
                  }
                  },
                    "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
                    "databaseName": "hangzhou-test-db",
                    "tableName": "message_info",
                    "primaryIndexInfo": {
                    "indexType": "PrimaryKey",
                    "indexFields": [
                    {
                    "fieldName": "id",
                    "rawDataTypeNum": 8,
                    "isPrimaryKey": true,
                    "isUniqueKey": false,
                    "fieldPosition": 0
                  }
                    ],
                    "cardinality": 0,
                    "nullable": true,
                    "isFirstUniqueIndex": false
                  },
                    "uniqueIndexInfo": [],
                    "foreignIndexInfo": [],
                    "normalIndexInfo": [],
                    "databaseInfo": {
                    "databaseType": "MySQL",
                    "version": "5.7.35-log"
                  },
                    "totalRows": 0
                  },
                    "values": [
                    {
                    "data": 115
                  },
                    {
                    "data": {
                    "hb": [
                    98,
                    121,
                    101
                    ],
                    "offset": 0,
                    "isReadOnly": false,
                    "bigEndian": true,
                    "nativeByteOrder": false,
                    "mark": -1,
                    "position": 0,
                    "limit": 11,
                    "capacity": 11,
                    "address": 0
                  },
                    "charset": "utf8mb4"
                  }
                    ],
                    "size": 47
                  }
                  },
        "id": "12f701a43741d404fa9a7be89d9acae0-321****",
        "source": "DTSstreamDemo",
        "specversion": "1.0",
        "type": "dts:ConsumeMessage",
        "datacontenttype": "application/json; charset=utf-8",
        "time": "2022-06-10T07:55:57Z",
        "subject": "acs:dts:cn-hangzhou:12345****:kk123abc60g782/dtsabcdet1ro"
      }
    ]

    CloudEvents仕様で定義されているパラメーターについては、「概要」をご参照ください。

    次の表に、dataフィールドに含まれるパラメーターを示します。

    パラメーター

    データ型

    説明

    id

    String

    DTSデータエントリのID。

    topicPartition

    配列

    イベントがプッシュされるトピックに関するパーティション情報。

    hash

    String

    DTSの基になるストレージパラメータ。

    partition

    String

    パーティション。

    topic

    String

    トピック名。

    offset

    Int

    DTSデータエントリのオフセット。

    sourceTimestamp

    Int

    DTSデータエントリがいつ生成されたかを示すタイムスタンプ。

    operationType

    String

    DTSデータエントリに含まれる操作のタイプ。

    schema

    配列

    データベースに関するスキーマ情報。

    recordFields

    配列

    フィールドの詳細。

    fieldName

    String

    フィールド名。

    rawDataTypeNum

    Int

    フィールド型のマップされた値。

    このパラメーターの値は、変更追跡インスタンスからの逆シリアル化された増分データのdataTypeNumberフィールドの値に対応します。 詳細については、「Kafkaクライアントを使用した追跡データの使用」をご参照ください。

    isPrimaryKey

    Boolean

    フィールドが主キーフィールドかどうかを示します。

    isUniqueKey

    Boolean

    フィールドに一意のキーがあるかどうかを示します。

    fieldPosition

    String

    フィールドの位置。

    nameIndex

    配列

    フィールド名に基づくフィールドに関するインデックス情報。

    schemaId

    String

    データベーススキーマのID。

    databaseName

    String

    データベース名。

    tableName

    String

    <td class="en-UStry align-left colsep-1 rowsep-1">テーブル名。</td>

    primaryIndexInfo

    String

    主キーインデックス。

    indexType

    String

    インデックスタイプ。

    indexFields

    配列

    インデックスが作成されるフィールド。

    cardinality

    String

    主キーのカーディナリティ。

    nullable

    Boolean

    プライマリキーをnullにできるかどうかを示します。

    isFirstUniqueIndex

    Boolean

    インデックスが最初の一意のインデックスであるかどうかを示します。

    uniqueIndexInfo

    String

    一意のインデックス。

    foreignIndexInfo

    String

    外部キーのインデックス。

    normalIndexInfo

    String

    通常のインデックス。

    databaseInfo

    配列

    データベースに関する情報。

    databaseType

    String

    データベースエンジン。

    version

    String

    データベースエンジンのバージョン。

    totalRows

    Int

    テーブル内の行の総数。

    beforeImage

    String

    操作が実行される前にフィールド値を記録するイメージ。

    values

    String

    記録されたフィールド値。

    size

    Int

    記録されたフィールドのサイズ。

    afterImage

    String

    操作が実行された後のフィールド値を記録するイメージ。

ステップ3: 関数コードの書き込みとテスト

トリガーを作成した後、関数コードを記述し、関数コードをテストして、コードが有効かどうかを確認できます。 DTS変更追跡タスクがデータベースの増分データをキャプチャすると、トリガーは自動的に関数を呼び出します。

  1. 関数の詳細ページで、コードタブで、コードエディターに関数コードを入力し、デプロイ をクリックします。

    この例では、関数コードはNode.jsで記述されています。

    'use strict';
    /*
    To enable the initializer feature
    please implement the initializer function as below:
    exports.initializer = (context, callback) => {
      console.log('initializing');
      callback(null, '');
    };
    */
    exports.handler = (event, context, callback) => {
      console.log("event: %s", event);
      // Parse the event parameters and process the event. 
      callback(null, 'return result');
    }
  2. テスト機能 をクリックします。

詳細情報

Function Computeコンソールに加えて、次のいずれかの方法を使用してトリガーを設定できます。

  • Serverless Devsツールを使用してトリガーを設定します。 その他の操作については、を参照してください。

  • SDKを使用してトリガーを設定します。 詳細は、SDK をご参照ください。

既存のトリガーを変更または削除するには、「トリガーの管理」をご参照ください。