EventBridgeを使用してApsaraMQ for RocketMQイベントソースをFunction Computeと統合した後、ApsaraMQ for RocketMQトリガーで関連する関数をトリガーできます。 この関数は、ApsaraMQ for RocketMQにパブリッシュされたメッセージに対してカスタム操作を実行するために使用できます。 このトピックでは、ApsaraMQ for RocketMQトリガーの作成、関数入力パラメーターの設定、およびfunction Computeコンソールでのコードの書き込みとテストの方法について説明します。
概要
Function Computeコンソールでトリガーを作成すると、Function Computeはトリガー設定に基づいてEventBridgeリソースを作成します。 Function Computeは、メッセージをプッシュするためのイベントモードとイベントストリームモードを提供します。 次の項目は、各モデルで作成できるリソースを示しています。
トリガーの作成後、Function Computeコンソールでトリガーに関する情報を表示できます。 EventBridgeコンソールで、作成されたリソースに関する情報を表示することもできます。 トリガーソースとして使用されるApsaraMQ for RocketMQインスタンスにメッセージが送信されると、function Computeで関数の実行がトリガーされます。 異なるプッシュモデルは異なるパラメータをサポートします。 詳細については、「手順2: 関数の入力パラメーターの設定」をご参照ください。
イベントモード: 各メッセージは、イベントパラメータとして関数に渡されます。 イベントはCloudEventsの仕様に従います。 メッセージコンテンツとCloudEventsの関係については、「手順2: 関数の入力パラメーターの設定」をご参照ください。
イベントストリームモード: バッチ設定に基づいて、1つ以上のメッセージがFunction Computeにプッシュされ、バッチ処理されます。 このモデルは、エンドツーエンドのストリーミングデータが処理されるシナリオに適しています。
注意事項
トリガーソースとして使用されるApsaraMQ for RocketMQインスタンスは、function Computeの関数と同じリージョンに存在する必要があります。
作成されたカスタムイベントバスの数と作成されたイベントルールの数が上限に達すると、イベントモデルのApsaraMQ for RocketMQトリガーを作成できなくなります。
作成されたイベントストリームの数が上限に達した場合、イベントストリームモードのApsaraMQ for RocketMQトリガーは作成できません。
各リージョンでAlibaba Cloudアカウントを使用して作成できるリソース数の制限については、「制限」をご参照ください。
始める前に
EventBridge
Function Compute
ApsaraMQ for RocketMQ
ステップ1: トリガーを作成する
Function Computeコンソールにログインします。 左側のナビゲーションウィンドウで、[サービスと機能] をクリックします。
上部のナビゲーションバーで、リージョンを選択します。 [サービス] ページで、目的のサービスをクリックします。
[関数] ページで、管理する関数をクリックします。
関数の詳細ページで、[トリガー] タブをクリックし、[バージョンまたはエイリアス] ドロップダウンリストからバージョンまたはエイリアスを選択し、[トリガーの作成] をクリックします。
[トリガーの作成] パネルで、関連パラメーターを指定します。 パラメーターを指定したら、[OK] をクリックします。
次の表に、基本的なパラメーターを示します。
パラメーター
説明
例
トリガータイプ
ドロップダウンリストからApsaraMQ for RocketMQを選択します。
ApsaraMQ for RocketMQ
名前
トリガー名を入力します。
rocketmq-trigger
バージョンまたはエイリアス
デフォルト値はLATESTです。 別のバージョンまたはエイリアスのトリガーを作成する場合は、関数の詳細ページの右上隅で、指定したバージョンまたはエイリアスに切り替えます。 サービスのバージョンとエイリアスの詳細については、「バージョンの管理」および「エイリアスの管理」をご参照ください。
LATEST
ApsaraMQ for RocketMQインスタンス
ドロップダウンリストからApsaraMQ for RocketMQインスタンスを選択します。
MQ_INST_164901546557 ****_BX7 ****
トピック
ドロップダウンリストからApsaraMQ for RocketMQインスタンスのトピックを選択します。
topic1
タグ
メッセージフィルタリング用のタグを指定します。
関数の実行は、指定されたフィルタリングタグを含むメッセージが受信された場合にのみトリガーされます。
tag
グループID
作成したApsaraMQ for RocketMQインスタンスのグループIDを選択します。
GID_group1
消費者オフセット
メッセージの消費者オフセットを選択します。 コンシューマーオフセットは、ApsaraMQ for RocketMQがイベントバスからメッセージのプルを開始するポイントを指定します。 有効な値:
最新のオフセット: 最新のオフセットからメッセージを消費します。
Earlest Offset: 最も早いオフセットからメッセージを消費します。
Timestamp: 指定されたタイムスタンプからメッセージを消費します。
最新のオフセット
呼び出しメソッド
関数を呼び出すメソッドを選択します。
有効な値:
同期呼び出し: このモードは、シーケンシャル呼び出しに適しています。 単一のイベントまたはイベントのバッチが関数をトリガーすると、function Computeは関数を実行し、次のイベントまたはイベントのバッチを処理する前に応答を待ちます。 同期呼び出し要求のペイロードの上限は32 MBです。 詳細については、「同期呼び出し」をご参照ください。
非同期呼び出し: このモードでは、イベントをすばやく使用できます。 単一のイベントまたはイベントのバッチが関数をトリガーすると、function Computeはすぐに応答を返し、次のイベントまたはイベントのバッチの処理を続行します。このプロセス中、関数は非同期モードで実行されます。 非同期呼び出し要求のペイロードの上限は128 KBです。 詳細については、「概要」をご参照ください。
同期呼び出し
メッセージプッシュモード
メッセージデータをFunction Computeにプッシュするために使用される基本アプリケーションモード。
有効な値:
イベントストリームモード: バッチ設定に基づいて、1つ以上のメッセージがFunction Computeにプッシュされ、バッチ処理されます。 このモデルは、エンドツーエンドのストリーミングデータが処理されるシナリオに適しています。
イベントモード: 各メッセージはイベントパラメータとして関数に渡されます。 イベントはCloudEventsの仕様に従います。 メッセージコンテンツとCloudEventsの関係については、「手順2: 関数の入力パラメーターの設定」をご参照ください。
イベントモード
トリガー状態
作成後にトリガーを有効にするかどうかを指定します。 デフォルトでは、[トリガーの有効化] が選択され、作成後にトリガーが有効になります。
非該当
メッセージのプッシュ、再試行、デッドレター設定などの高度な設定の詳細については、「トリガーの高度な機能」をご参照ください。
トリガーが作成されると、[トリガー] タブに表示されます。 既存のトリガーを変更または削除するには、「トリガーの管理」をご参照ください。
ステップ2: 関数の入力パラメータを設定する
ApsaraMQ for RocketMQイベントソースは、入力パラメーターとして機能するevent
の形式の関数に渡されます。 event
を関数に手動で渡して、トリガーイベントをシミュレートし、関数のコードが正しいかどうかをテストできます。
関数の詳細ページで、[コード] タブをクリックし、アイコンをクリックします。 表示されるドロップダウンリストから、[テストパラメーターの設定] を選択します。
Configure Test Parametersパネルで、Create New Test EventまたはModify Existing Test Eventタブをクリックし、Event Nameとイベントの内容を指定します。 パラメーターを指定したら、[OK] をクリックします。
イベントモードの
イベント
のサンプルコード:{ "id":"94ebc15f-f0db-4bbe-acce-56fb72fb****", "source":"RocketMQ-Function-rocketmq-trigger", "specversion":"1.0", "type":"mq:Topic:SendMessage", "datacontenttype":"application/json; charset=utf-8", "subject":"acs:mq:cn-hangzhou:164901546557****:MQ_INST_164901546557****_BXhFHryi%TopicName", "time":"2021-04-08T06:01:20.766Z", "aliyunaccountid":"164901546557****", "aliyunpublishtime":"2021-10-15T02:05:16.791Z", "aliyunoriginalaccountid":"164901546557****", "aliyuneventbusname":"RocketMQ-Function-rocketmq-trigger", "aliyunregionid":"cn-chengdu", "aliyunpublishaddr":"42.120.XX.XX", "data":{ "topic":"TopicName", "systemProperties":{ "MIN_OFFSET":"0", "TRACE_ON":"true", "MAX_OFFSET":"8", "MSG_REGION":"cn-hangzhou", "KEYS":"systemProperties.KEYS", "CONSUME_START_TIME":1628577790396, "TAGS":"systemProperties.TAGS", "INSTANCE_ID":"MQ_INST_164901546557****_BXhFHryi" }, "userProperties":{ }, "body":"TEST" } }
イベントストリームモード
のイベントのサンプルコード:[ { "id":"94ebc15f-f0db-4bbe-acce-56fb72fb****", "source":"RocketMQ-Function-rocketmq-trigger", "specversion":"1.0", "type":"mq:Topic:SendMessage", "datacontenttype":"application/json; charset=utf-8", "subject":"acs:mq:cn-hangzhou:164901546557****:MQ_INST_164901546557****_BXhFHryi%TopicName", "time":"2021-04-08T06:01:20.766Z", "aliyunaccountid":"164901546557****", "aliyunpublishtime":"2021-10-15T02:05:16.791Z", "aliyunoriginalaccountid":"164901546557****", "aliyuneventbusname":"RocketMQ-Function-rocketmq-trigger", "aliyunregionid":"cn-chengdu", "aliyunpublishaddr":"42.120.XX.XX", "data":{ "topic":"TopicName", "systemProperties":{ "MIN_OFFSET":"0", "TRACE_ON":"true", "MAX_OFFSET":"8", "MSG_REGION":"cn-hangzhou", "KEYS":"systemProperties.KEYS", "CONSUME_START_TIME":1628577790396, "TAGS":"systemProperties.TAGS", "INSTANCE_ID":"MQ_INST_164901546557****_BXhFHryi" }, "userProperties":{ }, "body":"TEST" } }, { "id":"94ebc15f-f0db-4bbe-acce-56fb72fb****", "source":"RocketMQ-Function-rocketmq-trigger", "specversion":"1.0", "type":"mq:Topic:SendMessage", "datacontenttype":"application/json; charset=utf-8", "subject":"acs:mq:cn-hangzhou:164901546557****:MQ_INST_164901546557****_BXhFHryi%TopicName", "time":"2021-04-08T06:01:20.766Z", "aliyunaccountid":"164901546557****", "aliyunpublishtime":"2021-10-15T02:05:16.791Z", "aliyunoriginalaccountid":"164901546557****", "aliyuneventbusname":"RocketMQ-Function-rocketmq-trigger", "aliyunregionid":"cn-chengdu", "aliyunpublishaddr":"42.120.XX.XX", "data":{ "topic":"TopicName", "systemProperties":{ "MIN_OFFSET":"0", "TRACE_ON":"true", "MAX_OFFSET":"8", "MSG_REGION":"cn-hangzhou", "KEYS":"systemProperties.KEYS", "CONSUME_START_TIME":1628577790396, "TAGS":"systemProperties.TAGS", "INSTANCE_ID":"MQ_INST_164901546557****_BXhFHryi" }, "userProperties":{ }, "body":"TEST" } } ]
次の表に、dataのパラメーターを示します。 CloudEvents仕様で定義されているパラメーターについては、「概要」をご参照ください。
パラメーター
データ型
例
説明
トピック
String
TopicName
トピック名。
systemProperties
地図
システムのプロパティ。
MIN_OFFSET
Int
0
最も早いオフセット。
TRACE_ON
Boolean
true
メッセージトレースが存在するかどうかを示します。 有効な値:
true
false
MAX_OFFSET
Int
8
最新のオフセット。
MSG_地域
String
cn-hangzhou
メッセージの送信元のリージョン。
キー
String
systemProperties.KEYS
メッセージのフィルタリングに使用されるキー。
CONSUME_START_TIME
Long
1628577790396
メッセージ消費の開始時刻。 単位:ミリ秒。
UNIQ_KEY
String
AC14C305069E1B28CDFA3181CDA2 ****
メッセージの一意のキー。
タグ
String
systemProperties.TAGS
メッセージのフィルタリングに使用されるタグ。
INSTANCE_ID
String
MQ_INST_123456789098 ****_BXhFHryi
ApsaraMQ for RocketMQインスタンスのID。
userProperties
地図
なし
ユーザーのプロパティ。
ボディ
String
TEST
メッセージ本文。
ステップ3: 関数コードを記述し、関数をテストする
トリガーを作成した後、関数コードを記述し、関数をテストして、コードが正しいことを確認できます。 ApsaraMQ for RocketMQイベントがEventBridgeを使用してFunction Computeに配信されると、トリガーは関数の実行をトリガーします。
関数の詳細ページで、コードタブで、コードエディターで関数コードを編集し、デプロイをクリックします。
このトピックでは、Node.jsを関数コードとして使用します。
'use strict'; /* To enable the initializer feature please implement the initializer function as below: exports.initializer = (context, callback) => { console.log('initializing'); callback(null, ''); }; */ exports.handler = (event, context, callback) => { console.log("event: %s", event); // Parse the event parameters and process the event. callback(null, 'return result'); }
コードタブでテスト機能 をクリックします。
関数の実行後、[コード] タブで結果を表示できます。
関連ドキュメント
既存のトリガーを変更または削除するには、「トリガーの管理」をご参照ください。