EventBridgeを使用してApsaraMQ for KafkaをFunction Computeと統合した後、ApsaraMQ for Kafkaトリガーを使用して関数を呼び出すことができます。 これにより、関数を呼び出して、ビジネス要件に基づいてApsaraMQ for Kafkaに発行されるメッセージを処理できます。 このトピックでは、Function ComputeコンソールでApsaraMQ for Kafkaトリガーを作成し、関数の入力パラメーターを設定し、関数コードを記述してテストする方法について説明します。
概要
Function Computeコンソールでトリガーを作成するリクエストを送信すると、Function Computeはトリガーの設定に基づいてEventBridgeにイベントストリームを自動的に作成します。
トリガーの作成後、Function Computeコンソールでトリガーに関する情報を表示できます。 EventBridgeコンソールで、作成されたリソースに関する情報を表示することもできます。 メッセージがApsaraMQ for Kafkaでキューに入れられると、function Computeの関連する関数が呼び出されます。 1つ以上のメッセージイベントがバッチで関数にプッシュされ、バッチ設定に基づいて処理されます。
使用上の注意
トリガーソースとして機能するApsaraMQ for Kafkaインスタンスは、Function Computeで呼び出される関数と同じリージョンに存在する必要があります。
作成されたイベントストリームの数が上限に達した場合、ApsaraMQ for Kafkaトリガーは作成できません。 イベントストリームの上限については、「制限」をご参照ください。
前提条件
EventBridge
Function Compute
ApsaraMQ for Kafka
ApsaraMQ for Kafkaインスタンスが作成されました。 詳細については、次をご参照ください: ステップ2: インスタンスの購入とデプロイ
トピックとコンシューマーグループが作成されます。 詳細については、「手順3: リソースの作成」をご参照ください。
手順1: ApsaraMQ for Kafkaトリガーの作成
ApsaraMQ for Kafkaインスタンスを作成した場合は、Function Computeコンソールにログインし、目的の関数をクリックして関数の詳細ページに移動し、[設定] タブをクリックし、[トリガー] ページの [トリガーの作成] をクリックします。 トリガーの作成後、[OK] をクリックします。 詳細は以下の図をご参照ください。
次の表に、基本的なパラメーターを示します。
パラメーター | 説明 | 例 |
消費者オフセット | メッセージのコンシューマオフセット。 コンシューマーオフセットは、ApsaraMQ for KafkaがEventBridgeからメッセージのプルを開始するポイントを指定します。 オプション:
| 最も早いオフセット |
呼び出しメソッド | 関数の呼び出し方法を選択します。 オプション:
| 同期呼び出し |
マックス 配信の同時実行 | Function Computeに同時に配信できるApsaraMQ for Kafkaメッセージの最大数を指定します。 有効な値:1 から 300。 このパラメーターは、Invocation MethodパラメーターをSync Invocationに設定した場合にのみ使用できます。 より高い同時実行性が必要な場合は、[EventBridgeのクォータセンター] に移動し、名前が [EventStreaming FC Sinkの同期投稿の最大同時実行数] であるクォータを見つけ、[操作] 列の [適用] をクリックしてクォータの増加を要求します。 | 1 |
プッシュ設定、再試行ポリシー、デットレターキューなどの高度な設定については、「トリガーの高度な機能」をご参照ください。
ステップ2: (オプション) 入力パラメーターの設定
ApsaraMQ for Kafkaのイベントソースは、入力パラメーターとしてevent
の形式で関数に渡されます。 コードを使用して、イベントを解析および処理できます。 event
を関数に手動で渡して、トリガーイベントをシミュレートできます。
関数の詳細ページの [コード] タブで、[テスト関数] の隣のアイコンをクリックし、ドロップダウンリストから [テストパラメーターの設定] を選択します。
[テストパラメーターの設定] パネルで、[新しいテストイベントの作成] または [既存のテストイベントの変更] タブをクリックし、イベント名とイベント内容を入力し、[OK] をクリックします。
次のサンプルコードは、
イベント
コンテンツの形式の例を示しています。[ { "specversion":"1.0", "id":"8e215af8-ca18-4249-8645-f96c1026****", "source":"acs:alikafka", "type":"alikafka:Topic:Message", "subject":"acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic", "datacontenttype":"application/json; charset=utf-8", "time":"2022-06-23T02:49:51.589Z", "aliyunaccountid":"164901546557****", "data":{ "topic":"****", "partition":7, "offset":25, "timestamp":1655952591589, "headers":{ "headers":[ ], "isReadOnly":false }, "key":"keytest", "value":"hello kafka msg" } }, { "specversion":"1.0", "id":"8e215af8-ca18-4249-8645-f96c1026****", "source":"acs:alikafka", "type":"alikafka:Topic:Message", "subject":"acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic", "datacontenttype":"application/json; charset=utf-8", "time":"2022-06-23T02:49:51.589Z", "aliyunaccountid":"164901546557****", "data":{ "topic":"****", "partition":7, "offset":25, "timestamp":1655952591589, "headers":{ "headers":[ ], "isReadOnly":false }, "key":"keytest", "value":"hello kafka msg" } } ]
CloudEvents仕様で定義されているパラメーターについては、「概要」をご参照ください。
次の表に、dataに含まれるパラメーターを示します。
パラメーター
データ型
例
詳細
トピック
String
TopicName
トピック名。
パーティション
Int
1
ApsaraMQ for Kafkaインスタンスのパーティションに関する情報。
オフセット
Int
0
ApsaraMQ for Kafkaインスタンスのメッセージオフセット。
timestamp
String
1655952591589
メッセージの消費がいつ開始されたかを示すタイムスタンプ。
ステップ3: 関数コードの書き込みとテスト
トリガーの作成後、関数コードを記述し、関数をテストして、コードが正しいかどうかを確認できます。 ApsaraMQ for Kafkaイベントが発生すると、トリガーによって関数が自動的に呼び出されます。
On theコード関数の詳細ページのタブで、コードエディターでコードを記述し、コードのデプロイ.
この例では、関数コードはNode.jsで記述されています。
'use strict'; /* To enable the initializer feature please implement the initializer function as below: exports.initializer = (context, callback) => { console.log('initializing'); callback(null, ''); }; */ exports.handler = (event, context, callback) => { console.log("event: %s", event); // Parse the event parameters and process the event. callback(null, 'return result'); }
関数をテストします。
方法1:
event
を設定してイベントソースをシミュレートする場合は、[Test Function] をクリックします。方法2: ApsaraMQ for Kafkaコンソールにログインし、作成したトピックを選択して、[メッセージの送信] をクリックします。 以下の図は一例です。
実行が完了したら、結果をリアルタイムログで表示します。
関連ドキュメント
既存のトリガーを変更または削除するには、「トリガーの管理」をご参照ください。