すべてのプロダクト
Search
ドキュメントセンター

Function Compute:ApsaraMQ for Kafkaトリガー

最終更新日:Sep 30, 2024

EventBridgeを使用してApsaraMQ for KafkaFunction Computeと統合した後、ApsaraMQ for Kafkaトリガーを使用して関数を呼び出すことができます。 これにより、関数を呼び出して、ビジネス要件に基づいてApsaraMQ for Kafkaに発行されるメッセージを処理できます。 このトピックでは、Function ComputeコンソールでApsaraMQ for Kafkaトリガーを作成し、関数の入力パラメーターを設定し、関数コードを記述してテストする方法について説明します。

概要

Function Computeコンソールでトリガーを作成するリクエストを送信すると、Function Computeはトリガーの設定に基づいてEventBridgeイベントストリームを自動的に作成します。

トリガーの作成後、Function Computeコンソールでトリガーに関する情報を表示できます。 EventBridgeコンソールで、作成されたリソースに関する情報を表示することもできます。 メッセージがApsaraMQ for Kafkaでキューに入れられると、function Computeの関連する関数が呼び出されます。 1つ以上のメッセージイベントがバッチで関数にプッシュされ、バッチ設定に基づいて処理されます。

使用上の注意

  • トリガーソースとして機能するApsaraMQ for Kafkaインスタンスは、Function Computeで呼び出される関数と同じリージョンに存在する必要があります。

  • 作成されたイベントストリームの数が上限に達した場合、ApsaraMQ for Kafkaトリガーは作成できません。 イベントストリームの上限については、「制限」をご参照ください。

前提条件

手順1: ApsaraMQ for Kafkaトリガーの作成

ApsaraMQ for Kafkaインスタンスを作成した場合は、Function Computeコンソールにログインし、目的の関数をクリックして関数の詳細ページに移動し、[設定] タブをクリックし、[トリガー] ページの [トリガーの作成] をクリックします。 トリガーの作成後、[OK] をクリックします。 詳細は以下の図をご参照ください。

image

次の表に、基本的なパラメーターを示します。

パラメーター

説明

消費者オフセット

メッセージのコンシューマオフセット。 コンシューマーオフセットは、ApsaraMQ for KafkaがEventBridgeからメッセージのプルを開始するポイントを指定します。

オプション:

  • Earlest Offset: 最も早いオフセットからメッセージをプルします。

  • 最新のオフセット: 最新のオフセットからメッセージを取得します。

最も早いオフセット

呼び出しメソッド

関数の呼び出し方法を選択します。

オプション:

  • 同期呼び出し: このモードは、シーケンシャル呼び出しに適しています。 イベントまたはイベントのバッチが関数を呼び出すと、function Computeは関数を実行し、次のイベントまたはイベントのバッチを処理する前に応答を待ちます。 同期呼び出し要求のペイロードの上限は32 MBです。 詳細については、「同期呼び出し」をご参照ください。

  • 非同期呼び出し: このモードでは、イベントをすばやく使用できます。 イベントまたはイベントのバッチが関数を呼び出すと、function Computeはすぐに応答を返し、次のイベントまたはイベントのバッチの処理を続行します。 このプロセス中、関数は非同期モードで実行されます。 非同期呼び出し要求のペイロードの上限は128 KBです。 詳細については、「概要」をご参照ください。

同期呼び出し

マックス 配信の同時実行

Function Computeに同時に配信できるApsaraMQ for Kafkaメッセージの最大数を指定します。 有効な値:1 から 300。 このパラメーターは、Invocation MethodパラメーターをSync Invocationに設定した場合にのみ使用できます。 より高い同時実行性が必要な場合は、[EventBridgeのクォータセンター] に移動し、名前が [EventStreaming FC Sinkの同期投稿の最大同時実行数] であるクォータを見つけ、[操作] 列の [適用] をクリックしてクォータの増加を要求します。

1

プッシュ設定、再試行ポリシー、デットレターキューなどの高度な設定については、「トリガーの高度な機能」をご参照ください。

ステップ2: (オプション) 入力パラメーターの設定

ApsaraMQ for Kafkaのイベントソースは、入力パラメーターとしてeventの形式で関数に渡されます。 コードを使用して、イベントを解析および処理できます。 eventを関数に手動で渡して、トリガーイベントをシミュレートできます。

  1. 関数の詳細ページの [コード] タブで、[テスト関数] の隣のimage.pngアイコンをクリックし、ドロップダウンリストから [テストパラメーターの設定] を選択します。

  2. [テストパラメーターの設定] パネルで、[新しいテストイベントの作成] または [既存のテストイベントの変更] タブをクリックし、イベント名とイベント内容を入力し、[OK] をクリックします。

    次のサンプルコードは、イベントコンテンツの形式の例を示しています。

    [
        {
            "specversion":"1.0",
            "id":"8e215af8-ca18-4249-8645-f96c1026****",
            "source":"acs:alikafka",
            "type":"alikafka:Topic:Message",
            "subject":"acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic",
            "datacontenttype":"application/json; charset=utf-8",
            "time":"2022-06-23T02:49:51.589Z",
            "aliyunaccountid":"164901546557****",
            "data":{
                "topic":"****",
                "partition":7,
                "offset":25,
                "timestamp":1655952591589,
                "headers":{
                    "headers":[
    
                    ],
                    "isReadOnly":false
                },
                "key":"keytest",
                "value":"hello kafka msg"
            }
        },
        {
            "specversion":"1.0",
            "id":"8e215af8-ca18-4249-8645-f96c1026****",
            "source":"acs:alikafka",
            "type":"alikafka:Topic:Message",
            "subject":"acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic",
            "datacontenttype":"application/json; charset=utf-8",
            "time":"2022-06-23T02:49:51.589Z",
            "aliyunaccountid":"164901546557****",
            "data":{
                "topic":"****",
                "partition":7,
                "offset":25,
                "timestamp":1655952591589,
                "headers":{
                    "headers":[
    
                    ],
                    "isReadOnly":false
                },
                "key":"keytest",
                "value":"hello kafka msg"
            }
        }
    ]

    CloudEvents仕様で定義されているパラメーターについては、「概要」をご参照ください。

    次の表に、dataに含まれるパラメーターを示します。

    パラメーター

    データ型

    詳細

    トピック

    String

    TopicName

    トピック名。

    パーティション

    Int

    1

    ApsaraMQ for Kafkaインスタンスのパーティションに関する情報。

    オフセット

    Int

    0

    ApsaraMQ for Kafkaインスタンスのメッセージオフセット。

    timestamp

    String

    1655952591589

    メッセージの消費がいつ開始されたかを示すタイムスタンプ。

ステップ3: 関数コードの書き込みとテスト

トリガーの作成後、関数コードを記述し、関数をテストして、コードが正しいかどうかを確認できます。 ApsaraMQ for Kafkaイベントが発生すると、トリガーによって関数が自動的に呼び出されます。

  1. On theコード関数の詳細ページのタブで、コードエディターでコードを記述し、コードのデプロイ.

    この例では、関数コードはNode.jsで記述されています。

    'use strict';
    /*
    To enable the initializer feature
    please implement the initializer function as below:
    exports.initializer = (context, callback) => {
      console.log('initializing');
      callback(null, '');
    };
    */
    exports.handler = (event, context, callback) => {
      console.log("event: %s", event);
      // Parse the event parameters and process the event. 
      callback(null, 'return result');
    }
  2. 関数をテストします。

    方法1: eventを設定してイベントソースをシミュレートする場合は、[Test Function] をクリックします。

    方法2: ApsaraMQ for Kafkaコンソールにログインし、作成したトピックを選択して、[メッセージの送信] をクリックします。 以下の図は一例です。

    image

  3. 実行が完了したら、結果をリアルタイムログで表示します。

    image

関連ドキュメント

既存のトリガーを変更または削除するには、「トリガーの管理」をご参照ください。