ApsaraMQ for Kafkaトリガーは、EventBridgeを使用して、イベントソースとしてのApsaraMQ for KafkaをFunction Computeに統合します。 トリガーの作成後、Function ComputeコンソールとEventBridgeコンソールでトリガーに関する情報を表示できます。 メッセージがエンキューされると、EventBridgeは、バッチ設定に基づいて1つ以上のメッセージイベントをバッチで関数にプッシュします。
制限事項
トリガーソースとして使用されるApsaraMQ for Kafkaインスタンスは、function Computeの関数と同じリージョンに存在する必要があります。
作成されるイベントストリームの数が上限に達すると、ApsaraMQ for Kafkaトリガーを作成できなくなります。 イベントストリームの上限については、「制限」をご参照ください。
始める前に
EventBridge
Function Compute
ApsaraMQ for Kafka
手順1: ApsaraMQ for Kafkaトリガーの作成
Function Computeコンソールにログインします。 左側のナビゲーションウィンドウで、[サービスと機能] をクリックします。
上部のナビゲーションバーで、リージョンを選択します。 [サービス] ページで、目的のサービスをクリックします。
[関数] ページで、管理する関数をクリックします。
関数の詳細ページで、[トリガー] タブをクリックし、[バージョンまたはエイリアス] ドロップダウンリストからバージョンまたはエイリアスを選択し、[トリガーの作成] をクリックします。
[トリガーの作成] パネルでパラメーターを設定し、[OK] をクリックします。 下表に、各パラメーターを説明します。
次の表に、基本的なパラメーターを示します。
パラメーター
説明
例
トリガータイプ
[ApsaraMQ for Kafka] を選択します。
ApsaraMQ for Kafka
名前
トリガー名を入力します。
kafka-トリガー
バージョンまたはエイリアス
トリガーのバージョンまたはエイリアス。 デフォルト値: LATEST。 別のバージョンまたはエイリアスのトリガーを作成する場合は、関数の詳細ページの右上隅でバージョンまたはエイリアスを選択します。 バージョンとエイリアスの詳細については、「バージョンの管理」および「エイリアスの管理」をご参照ください。
LATEST
ApsaraMQ for Kafkaインスタンス
メッセージをルーティングするApsaraMQ for Kafkaインスタンス。
alikafka_pre-cn-i7m2t7t1 ****
トピック
ApsaraMQ for Kafkaインスタンスで作成したトピックを選択します。
topic1
グループID
ApsaraMQ for Kafkaインスタンスで作成したグループを選択します。
説明一意のグループIDを使用してトリガーを作成します。 既存のサービスのグループIDは使用しないでください。 そうでない場合、既存のメッセージの送受信が影響を受ける。
GID_group1
同時消費タスク
同時コンシューマーの数を指定します。 有効な値: 1からトピック内のパーティション数まで。
2
消費者オフセット
ApsaraMQ for KafkaがEventBridgeからメッセージのプルを開始するオフセットを選択します。
有効な値:
Earlest Offset: 最も早いオフセットからメッセージをプルします。
最新のオフセット: 最新のオフセットからメッセージを取得します。
最新のオフセット
呼び出しメソッド
関数の呼び出し方法を選択します。
有効な値:
同期呼び出し: このメソッドは、シーケンシャル呼び出しに適しています。 イベントまたはイベントのバッチが関数をトリガーすると、function Computeは関数を実行し、関数が次のイベントまたはイベントのバッチを処理する前に応答を待ちます。 同期呼び出しリクエストの最大ペイロードは32 MBです。 詳細については、「同期呼び出し」をご参照ください。
非同期呼び出し: このメソッドを使用すると、イベントをすばやく消費できます。 イベントまたはイベントのバッチが関数をトリガーすると、function Computeはすぐに応答を返し、次のイベントまたはイベントのバッチの処理を続行します。 このプロセス中、関数は非同期モードで実行されます。 非同期呼び出しリクエストの最大ペイロードは128 KBです。 詳細については、「概要」をご参照ください。
同期呼び出し
マックス 配信の同時実行
Function Computeに同時に配信できるApsaraMQ for Kafkaメッセージの最大数を指定します。 有効な値:1 から 300。 このパラメーターは、Invocation MethodパラメーターをSync Invocationに設定した場合にのみ使用できます。 より高い同時実行性が必要な場合は、[EventBridgeのクォータセンター] に移動し、名前が [EventStreaming FC Sinkの同期投稿の最大同時実行数] であるクォータを見つけ、[操作] 列の [適用] をクリックしてクォータの増加を要求します。
1
トリガー状態
トリガーの作成直後に有効にするかどうかを指定します。 デフォルトでは、[トリガーの有効化] が選択されています。これは、トリガーが作成された直後に有効になることを示します。
非該当
プッシュ設定、再試行ポリシー、デットレターキューなどの高度な設定については、「トリガーの高度な機能」をご参照ください。
トリガーが作成されると、[トリガー] タブに表示されます。 既存のトリガーを変更または削除するには、「トリガーの管理」をご参照ください。
ステップ2: 関数の入力パラメータを設定する
ApsaraMQ for Kafkaイベントソースは、event
の形式で入力パラメーターとして関数に渡されます。 event
を関数に手動で渡して、イベントのトリガーをシミュレートできます。
機能の詳細ページで、[コード] タブをクリックし、アイコンをクリックします。 表示されるドロップダウンリストから、[テストパラメーターの設定] を選択します。
[テストパラメーターの設定] パネルで、[新しいテストイベントの作成] または [既存のテストイベントの変更] タブをクリックし、イベント名とイベント内容を指定します。 次に、[OK] をクリックします。
event
のサンプルコード:[ { "specversion":"1.0", "id":"8e215af8-ca18-4249-8645-f96c1026****", "source":"acs:alikafka", "type":"alikafka:Topic:Message", "subject":"acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic", "datacontenttype":"application/json; charset=utf-8", "time":"2022-06-23T02:49:51.589Z", "aliyunaccountid":"164901546557****", "data":{ "topic":"****", "partition":7, "offset":25, "timestamp":1655952591589, "headers":{ "headers":[ ], "isReadOnly":false }, "key":"keytest", "value":"hello kafka msg" } }, { "specversion":"1.0", "id":"8e215af8-ca18-4249-8645-f96c1026****", "source":"acs:alikafka", "type":"alikafka:Topic:Message", "subject":"acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic", "datacontenttype":"application/json; charset=utf-8", "time":"2022-06-23T02:49:51.589Z", "aliyunaccountid":"164901546557****", "data":{ "topic":"****", "partition":7, "offset":25, "timestamp":1655952591589, "headers":{ "headers":[ ], "isReadOnly":false }, "key":"keytest", "value":"hello kafka msg" } } ]
CloudEvents仕様で定義されているパラメーターについては、「概要」をご参照ください。
次の表に、dataに含まれるパラメーターを示します。
パラメーター
データ型
例
詳細
トピック
String
TopicName
トピック名。
パーティション
Int
1
ApsaraMQ for Kafkaインスタンスのパーティションに関する情報。
オフセット
Int
0
ApsaraMQ for Kafkaインスタンスのメッセージオフセット。
timestamp
String
1655952591589
メッセージの消費がいつ開始されたかを示すタイムスタンプ。
ステップ3: 関数コードを記述し、関数をテストする
トリガーの作成後、関数コードを記述し、関数をテストして、コードが正しいかどうかを確認できます。 ApsaraMQ for Kafkaでイベントが発生すると、関数はトリガーによって自動的に呼び出されます。
[関数の詳細] ページで、[コード] タブをクリックし、コードエディターに関数コードを入力し、[デプロイ] をクリックします。
次のサンプルコードは、Node.jsで関数コードを記述する方法の例を示しています。
'use strict'; /* To enable the initializer feature please implement the initializer function as below: exports.initializer = (context, callback) => { console.log('initializing'); callback(null, ''); }; */ exports.handler = (event, context, callback) => { console.log("event: %s", event); // Parse the event parameters and process the event. callback(null, 'return result'); }
コードタブで、テスト機能をクリックします。
関数の実行後、[コード] タブで結果を表示できます。