すべてのプロダクト
Search
ドキュメントセンター

Function Compute:DTSトリガー

最終更新日:Sep 09, 2024

Data Transmission Service (DTS) は、イベントソースとしてEventBridgeと統合できます。 DTSがFunction Computeと統合された後、DTSトリガーを使用して、Function Computeでの関数の実行をトリガーできます。 関数を使用して、DTS変更追跡タスクから取得したリアルタイムの増分データベースデータを処理できます。 このトピックでは、Function ComputeコンソールでDTSトリガーを作成し、入力パラメーターを設定し、コードを記述してテストする方法について説明します。

概要

Function Computeコンソールでトリガーを作成するリクエストを送信すると、トリガー設定に基づいてEventBridge側にイベントストリームリソースが作成されます。

トリガーの作成後、Function Computeコンソールでトリガーに関する情報を表示できます。 EventBridgeコンソールで自動的に作成されたリソースに関する情報を表示することもできます。 DTS変更追跡タスクがデータベースの増分データをキャプチャすると、関数の実行がトリガーされ、1つ以上のメッセージイベントがバッチ構成に基づいて処理するためにバッチで関数にプッシュされます。

使用上の注意

  • トリガーソースとして使用されるDTS変更追跡タスクは、function Computeの関数と同じリージョンにある必要があります。

  • イベントストリームの作成数が上限に達すると、DTSトリガーを作成できません。 イベントストリームの制限の詳細については、「制限」をご参照ください。

始める前に

ステップ1: DTSトリガーの作成

  1. Function Computeコンソールにログインします。 左側のナビゲーションウィンドウで、[サービスと機能] をクリックします。

  2. 上部のナビゲーションバーで、リージョンを選択します。 [サービス] ページで、目的のサービスをクリックします。

  3. [関数] ページで、管理する関数をクリックします。

  4. 関数の詳細ページで、[トリガー] タブをクリックし、[バージョンまたはエイリアス] ドロップダウンリストからバージョンまたはエイリアスを選択し、[トリガーの作成] をクリックします。

  5. [トリガーの作成] パネルで、関連パラメーターを指定します。 パラメーターを指定したら、[OK] をクリックします。

    次の表に、基本的なパラメーターを示します。

    パラメーター

    説明

    トリガータイプ

    トリガーのタイプ。 サポートされているトリガータイプの詳細については、「トリガーの概要」をご参照ください。

    DTS

    名前

    トリガーの名前。

    dts-trigger

    バージョンまたはエイリアス

    デフォルト値はLATESTです。 別のバージョンまたはエイリアスのトリガーを作成する場合は、関数の詳細ページの右上隅でバージョンまたはエイリアスを選択します。 サービスのバージョンとエイリアスの詳細については、「バージョンの管理」および「エイリアスの管理」をご参照ください。

    LATEST

    変更追跡タスク

    変更追跡タスクの名前。

    dtsqntc2 ***

    消費者グループ

    変更追跡タスクのデータを消費するために作成した消費者グループの名前。

    重要

    コンシューマーグループが1つのクライアントでのみ実行されていることを確認します。 そうしないと、指定されたコンシューマオフセットが無効になります。

    test

    [アカウント]

    コンシューマグループの作成時に指定されたアカウント名。

    test

    Password

    コンシューマーグループの作成時に指定されたアカウントパスワード。

    ******

    消費者オフセット

    消費される最初のデータエントリのタイムスタンプ。 コンシューマオフセットで指定されたデータエントリは、変更追跡タスクのデータ範囲内にある必要があります。

    説明

    コンシューマオフセットは、新しいコンシューマグループが最初に実行されたときにのみ有効になります。 後続のタスクで再起動が実行された場合、最後のコンシューマオフセットに基づいて消費が続行されます。

    2022-06-21 00:00:00

    呼び出しメソッド

    関数を呼び出すメソッドを選択します。

    有効な値:

    • 同期呼び出し: このモードは、シーケンシャル呼び出しに適しています。 イベントまたはイベントのバッチが関数をトリガーすると、function Computeは関数を実行し、次のイベントまたはイベントのバッチを処理する前に応答を待ちます。 同期呼び出し要求のペイロードの上限は32 MBです。 詳細については、「同期呼び出し」をご参照ください。

    • 非同期呼び出し: このモードでは、イベントをすばやく使用できます。 単一のイベントまたはイベントのバッチが関数をトリガーすると、function Computeはすぐに応答を返し、次のイベントまたはイベントのバッチの処理を続行します。このプロセス中、関数は非同期モードで実行されます。 非同期呼び出し要求のペイロードの上限は128 KBです。 詳細については、「概要」をご参照ください。

    同期呼び出し

    トリガー状態

    作成後にトリガーを有効にするかどうかを指定します。 デフォルトでは、[トリガーの有効化] が選択され、作成後にトリガーが有効になります。

    非該当

    メッセージのプッシュ、再試行、デッドレター設定などの高度な設定の詳細については、「トリガーの高度な機能」をご参照ください。

    トリガーが作成されると、[トリガー] タブに表示されます。 既存のトリガーを変更または削除するには、「トリガーの管理」をご参照ください。

ステップ2: 関数の入力パラメータを設定する

DTSイベントソースは、入力パラメーターとして機能するeventの形式で関数に渡されます。 手動でイベントを関数に渡して、関数をトリガーできます。

  1. 機能の詳細ページで、[コード] タブをクリックし、xialatubiaoアイコンをクリックします。 表示されるドロップダウンリストから、[テストパラメーターの設定] を選択します。

  2. [テストパラメーターの設定] パネルで、[新しいテストイベントの作成] または [既存のテストイベントの変更] タブをクリックし、[イベント名] とイベントの内容を指定します。 パラメーターを指定したら、[OK] をクリックします。

    eventのサンプルコード:

    [
      {
        "data": {
          "id": 321****,
          "topicPartition": {
            "hash": 0,
            "partition": 0,
            "topic": "cn_hangzhou_rm_1234****_test_version2"
          },
          "offset": 3218099,
          "sourceTimestamp": 1654847757,
          "operationType": "UPDATE",
          "schema": {
            "recordFields": [
              {
                "fieldName": "id",
                "rawDataTypeNum": 8,
                "isPrimaryKey": true,
                "isUniqueKey": false,
                "fieldPosition": 0
              },
              {
                "fieldName": "topic",
                "rawDataTypeNum": 253,
                "isPrimaryKey": false,
                "isUniqueKey": false,
                "fieldPosition": 1
              }
            ],
            "nameIndex": {
              "id": {
                "fieldName": "id",
                "rawDataTypeNum": 8,
                "isPrimaryKey": true,
                "isUniqueKey": false,
                "fieldPosition": 0
              },
              "topic": {
                "fieldName": "topic",
                "rawDataTypeNum": 253,
                "isPrimaryKey": false,
                "isUniqueKey": false,
                "fieldPosition": 1
              }
            },
            "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
            "databaseName": "hangzhou--test-db",
            "tableName": "message_info",
            "primaryIndexInfo": {
              "indexType": "PrimaryKey",
              "indexFields": [
                {
                  "fieldName": "id",
                  "rawDataTypeNum": 8,
                  "isPrimaryKey": true,
                  "isUniqueKey": false,
                  "fieldPosition": 0
                }
              ],
              "cardinality": 0,
              "nullable": true,
              "isFirstUniqueIndex": false
            },
            "uniqueIndexInfo": [],
            "foreignIndexInfo": [],
            "normalIndexInfo": [],
            "databaseInfo": {
              "databaseType": "MySQL",
              "version": "5.7.35-log"
            },
            "totalRows": 0
          },
          "beforeImage": {
            "recordSchema": {
              "recordFields": [
                {
                  "fieldName": "id",
                  "rawDataTypeNum": 8,
                  "isPrimaryKey": true,
                  "isUniqueKey": false,
                  "fieldPosition": 0
                },
                {
                  "fieldName": "topic",
                  "rawDataTypeNum": 253,
                  "isPrimaryKey": false,
                  "isUniqueKey": false,
                  "fieldPosition": 1
                }
              ],
              "nameIndex": {
                "id": {
                  "fieldName": "id",
                  "rawDataTypeNum": 8,
                  "isPrimaryKey": true,
                  "isUniqueKey": false,
                  "fieldPosition": 0
                },
                "topic": {
                  "fieldName": "topic",
                  "rawDataTypeNum": 253,
                  "isPrimaryKey": false,
                  "isUniqueKey": false,
                  "fieldPosition": 1
                }
              },
              "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
              "databaseName": "hangzhou-test-db",
              "tableName": "message_info",
              "primaryIndexInfo": {
                "indexType": "PrimaryKey",
                "indexFields": [
                  {
                    "fieldName": "id",
                    "rawDataTypeNum": 8,
                    "isPrimaryKey": true,
                    "isUniqueKey": false,
                    "fieldPosition": 0
                  }
                    ],
                    "cardinality": 0,
                    "nullable": true,
                    "isFirstUniqueIndex": false
                  },
                    "uniqueIndexInfo": [],
                    "foreignIndexInfo": [],
                    "normalIndexInfo": [],
                    "databaseInfo": {
                    "databaseType": "MySQL",
                    "version": "5.7.35-log"
                  },
                    "totalRows": 0
                  },
                    "values": [
                    {
                    "data": 115
                  },
                    {
                    "data": {
                    "hb": [
                    104,
                    101,
                    108,
                    108,
                    111
                    ],
                    "offset": 0,
                    "isReadOnly": false,
                    "bigEndian": true,
                    "nativeByteOrder": false,
                    "mark": -1,
                    "position": 0,
                    "limit": 9,
                    "capacity": 9,
                    "address": 0
                  },
                    "charset": "utf8mb4"
                  }
                    ],
                    "size": 45
                  },
                    "afterImage": {
                    "recordSchema": {
                    "recordFields": [
                    {
                    "fieldName": "id",
                    "rawDataTypeNum": 8,
                    "isPrimaryKey": true,
                    "isUniqueKey": false,
                    "fieldPosition": 0
                  },
                    {
                    "fieldName": "topic",
                    "rawDataTypeNum": 253,
                    "isPrimaryKey": false,
                    "isUniqueKey": false,
                    "fieldPosition": 1
                  }
                    ],
                    "nameIndex": {
                    "id": {
                    "fieldName": "id",
                    "rawDataTypeNum": 8,
                    "isPrimaryKey": true,
                    "isUniqueKey": false,
                    "fieldPosition": 0
                  },
                    "topic": {
                    "fieldName": "topic",
                    "rawDataTypeNum": 253,
                    "isPrimaryKey": false,
                    "isUniqueKey": false,
                    "fieldPosition": 1
                  }
                  },
                    "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
                    "databaseName": "hangzhou-test-db",
                    "tableName": "message_info",
                    "primaryIndexInfo": {
                    "indexType": "PrimaryKey",
                    "indexFields": [
                    {
                    "fieldName": "id",
                    "rawDataTypeNum": 8,
                    "isPrimaryKey": true,
                    "isUniqueKey": false,
                    "fieldPosition": 0
                  }
                    ],
                    "cardinality": 0,
                    "nullable": true,
                    "isFirstUniqueIndex": false
                  },
                    "uniqueIndexInfo": [],
                    "foreignIndexInfo": [],
                    "normalIndexInfo": [],
                    "databaseInfo": {
                    "databaseType": "MySQL",
                    "version": "5.7.35-log"
                  },
                    "totalRows": 0
                  },
                    "values": [
                    {
                    "data": 115
                  },
                    {
                    "data": {
                    "hb": [
                    98,
                    121,
                    101
                    ],
                    "offset": 0,
                    "isReadOnly": false,
                    "bigEndian": true,
                    "nativeByteOrder": false,
                    "mark": -1,
                    "position": 0,
                    "limit": 11,
                    "capacity": 11,
                    "address": 0
                  },
                    "charset": "utf8mb4"
                  }
                    ],
                    "size": 47
                  }
                  },
        "id": "12f701a43741d404fa9a7be89d9acae0-321****",
        "source": "DTSstreamDemo",
        "specversion": "1.0",
        "type": "dts:ConsumeMessage",
        "datacontenttype": "application/json; charset=utf-8",
        "time": "2022-06-10T07:55:57Z",
        "subject": "acs:dts:cn-hangzhou:12345****:kk123abc60g782/dtsabcdet1ro"
      }
    ]

    CloudEvents仕様で定義されているパラメーターについては、「概要」をご参照ください。

    次の表に、dataフィールドに含まれるパラメーターを示します。

    パラメーター

    データ型

    説明

    id

    String

    DTSデータエントリのID。

    topicPartition

    配列

    イベントがプッシュされるトピックに関するパーティション情報。

    ハッシュ

    String

    DTSの基になるストレージパラメータ。

    パーティション

    String

    パーティション。

    トピック

    String

    トピック名。

    オフセット

    Int

    DTSデータエントリのオフセット。

    sourceTimestamp

    Int

    DTSデータエントリがいつ生成されたかを示すタイムスタンプ。

    operationType

    String

    DTSデータエントリに含まれる操作のタイプ。

    スキーマ

    配列

    データベースに関するスキーマ情報。

    recordFields

    配列

    フィールドの詳細。

    fieldName

    String

    フィールド名。

    rawDataTypeNum

    Int

    フィールド型のマップされた値。

    このパラメーターの値は、変更追跡インスタンスからの逆シリアル化された増分データのdataTypeNumberフィールドの値に対応します。 詳細については、「Kafkaクライアントを使用した追跡データの使用」をご参照ください。

    isPrimaryKey

    Boolean

    フィールドが主キーフィールドかどうかを示します。

    isUniqueKey

    Boolean

    フィールドに一意のキーがあるかどうかを示します。

    fieldPosition

    String

    フィールドの位置。

    nameIndex

    配列

    フィールド名に基づくフィールドに関するインデックス情報。

    schemaId

    String

    データベーススキーマのID。

    databaseName

    String

    データベース名。

    tableName

    String

    <td class="en-UStry align-left colsep-1 rowsep-1">テーブル名。</td>

    primaryIndexInfo

    String

    主キーインデックス。

    indexType

    String

    インデックスタイプ。

    indexFields

    配列

    インデックスが作成されるフィールド。

    カーディナリティ

    String

    主キーのカーディナリティ。

    nullable

    Boolean

    プライマリキーをnullにできるかどうかを示します。

    isFirstUniqueIndex

    Boolean

    インデックスが最初の一意のインデックスであるかどうかを示します。

    uniqueIndexInfo

    String

    一意のインデックス。

    foreignIndexInfo

    String

    外部キーのインデックス。

    normalIndexInfo

    String

    通常のインデックス。

    databaseInfo

    配列

    データベースに関する情報。

    databaseType

    String

    データベースエンジン。

    バージョン

    String

    データベースエンジンのバージョン。

    totalRows

    Int

    テーブル内の行の総数。

    beforeImage

    String

    操作が実行される前にフィールド値を記録するイメージ。

    String

    記録されたフィールド値。

    サイズ

    Int

    記録されたフィールドのサイズ。

    afterImage

    String

    操作が実行された後のフィールド値を記録するイメージ。

ステップ3: 関数の書き込みとテスト

トリガーを作成した後、関数コードを記述し、関数をテストしてコードが正しいかどうかを確認できます。 実際のシナリオでは、DTS変更追跡タスクがデータベースの増分データをキャプチャすると、トリガーによって関数の実行が自動的にトリガーされます。

  1. 関数の詳細ページで、コードタブで、コードエディターで関数コードを編集し、デプロイ.

    以下では、Node.js関数コードを例として使用します。

    'use strict';
    /*
    To enable the initializer feature
    please implement the initializer function as below:
    exports.initializer = (context, callback) => {
      console.log('initializing');
      callback(null, '');
    };
    */
    exports.handler = (event, context, callback) => {
      console.log("event: %s", event);
      // Parse the event parameters and process the event. 
      callback(null, 'return result');
    }
  2. コードタブで、テスト機能 をクリックします。

    関数の実行後、[コード] タブで結果を表示できます。

関連ドキュメント

Function Computeコンソールに加えて、次の方法を使用してトリガーを設定できます。

  • Serverless Devsを使用してトリガーを設定します。 詳細については、「Serverless Devs」をご参照ください。

  • SDKを使用してトリガーを設定します。 詳細は、SDK をご参照ください。

既存のトリガーを変更または削除するには、「トリガーの管理」をご参照ください。