このトピックでは、ApsaraMQ for Kafka コンソールでシンクコネクタを作成し、ApsaraMQ for Kafka から ApsaraMQ for RabbitMQ にデータを同期する方法について説明します。
前提条件
ApsaraMQ for Kafka インスタンスが購入され、デプロイされていること。インスタンスが実行中状態であることを確認してください。詳細については、「ステップ 2: インスタンスを購入してデプロイする」をご参照ください。
ApsaraMQ for RabbitMQ インスタンスが購入され、デプロイされていること。インスタンスが [実行中] 状態であることを確認してください。詳細については、「ステップ 2: リソースを作成する」をご参照ください。
EventBridge がアクティブ化されており、必要な権限が Resource Access Management (RAM) ユーザーに付与されていること。詳細については、「EventBridge をアクティブ化し、RAM ユーザーに権限を付与する」をご参照ください。
シンクコネクタタスクの作成
ApsaraMQ for Kafka コンソール にログインします。リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
左側のナビゲーションウィンドウで、 を選択します。
[タスク] ページで、[タスクの作成] をクリックします。
タスクの作成 ページで、タスク名 パラメーターと 説明 パラメーターを構成し、画面の指示に従って他のパラメーターを構成します。次に、[保存] をクリックします。次のセクションでは、パラメーターについて説明します。
タスクの作成
Source (ソース) ステップで、データプロバイダー パラメーターを [apsaramq For Kafka] に設定し、画面の指示に従って他のパラメーターを構成します。次に、[次のステップ] をクリックします。次の表にパラメーターを示します。
パラメーター
説明
例
[リージョン]
ソースの ApsaraMQ for Kafka インスタンスが存在するリージョン。
中国 (北京)
[apsaramq For Kafka インスタンス]
ルーティングするメッセージが生成される ApsaraMQ for Kafka インスタンス。
alikafka_post-cn-jte3****
[topic]
ルーティングするメッセージが生成される ApsaraMQ for Kafka インスタンスのトピック。
demo-topic
[グループ ID]
ソース ApsaraMQ for Kafka インスタンスのコンシューマーグループの名前。
[クイック作成]: システムによって
GID_EVENTBRIDGE_xxx形式の名前を持つコンシューマーグループが自動的に作成されます。この値を選択することをお勧めします。[既存のグループを使用]: 使用されていない既存のグループの ID を選択します。使用中の既存のグループを選択すると、既存のメッセージの発行とサブスクライブに影響します。
クイック作成
[コンシューマーオフセット]
メッセージが消費されるオフセット。有効な値:
[最新のオフセット]
[最も古いオフセット]
最新のオフセット
[ネットワーク構成]
メッセージをルーティングするネットワークのタイプ。有効な値:
[ベーシックネットワーク]
[セルフマネージドインターネット]
ベーシックネットワーク
[VPC]
ApsaraMQ for Kafka インスタンスがデプロイされている Virtual Private Cloud (VPC) の ID。[ネットワーク構成] パラメーターを セルフマネージドインターネット に設定した場合にのみ、このパラメーターが必要です。
vpc-bp17fapfdj0dwzjkd****
[vswitch]
ApsaraMQ for Kafka インスタンスが属する vSwitch の ID。[ネットワーク構成] パラメーターを セルフマネージドインターネット に設定した場合にのみ、このパラメーターが必要です。
vsw-bp1gbjhj53hdjdkg****
[セキュリティグループ]
ApsaraMQ for Kafka インスタンスが属するセキュリティグループの ID。[ネットワーク構成] パラメーターを セルフマネージドインターネット に設定した場合にのみ、このパラメーターが必要です。
alikafka_pre-cn-7mz2****
データ形式
データ形式機能は、ソースから配信されたバイナリデータを特定のデータ形式にエンコードするために使用されます。複数のデータ形式がサポートされています。エンコードに関する特別な要件がない場合は、値として [Json] を指定します。
[json]: バイナリデータは UTF-8 エンコーディングに基づいて JSON 形式のデータにエンコードされ、ペイロードに配置されます。
[テキスト]: バイナリデータは UTF-8 エンコーディングに基づいて文字列にエンコードされ、ペイロードに配置されます。これはデフォルト値です。
[バイナリ]: バイナリデータは Base64 エンコーディングに基づいて文字列にエンコードされ、ペイロードに配置されます。
Json
一括プッシュの件数
各関数呼び出しで送信できるメッセージの最大数。リクエストは、バックログ内のメッセージ数が指定された値に達した場合にのみ送信されます。有効な値: 1 ~ 10000。
100
バッチプッシュ間隔 (単位:秒)
関数が呼び出される時間間隔。システムは、指定された時間間隔で集約されたメッセージを Function Compute に送信します。有効な値: 0 ~ 15。単位: 秒。値 0 は、集約後すぐにメッセージが送信されることを指定します。
3
Filtering (フィルタリング) ステップで、パターン内容 コードエディターでデータパターンを定義してデータをフィルタリングします。詳細については、「イベントパターン」をご参照ください。
Transform (変換) ステップで、データ分割、マッピング、エンリッチメント、およびルーティング機能を実装するためのデータクレンジング方法を指定します。詳細については、「Function Compute を使用してメッセージクレンジングを実行する」をご参照ください。
Sink (ターゲット) ステップで、サービスタイプ パラメーターを [apsaramq For Rabbitmq] に設定し、画面の指示に従って他のパラメーターを構成します。次の表にパラメーターを示します。
パラメーター
説明
例
インスタンス ID
作成した宛先の ApsaraMQ for RabbitMQ インスタンス。
amqp-cn-zvp2pny6****
Vhost
作成した vhost。
test
ターゲットタイプ
Exchange: プロデューサーはメッセージを exchange に送信し、exchange はメッセージを 1 つ以上のキューにルーティングします。
Queue: 各メッセージは 1 つ以上のキューに送信されます。
キュー
Exchange
ApsaraMQ for RabbitMQ インスタンスでメッセージがルーティングされる exchange。ターゲットタイプ パラメーターを Exchange に設定した場合にのみ、このパラメーターが必要です。
exchange
Queue
ApsaraMQ for RabbitMQ インスタンスでメッセージがルーティングされるキュー。ターゲットタイプ パラメーターを Queue に設定した場合にのみ、このパラメーターが必要です。
queue
メッセージのルーティングルール (Routing Key)
EventBridge は JSONPath を使用してメッセージからデータを抽出し、メッセージの指定されたコンテンツをイベントターゲットにルーティングします。ターゲットタイプ パラメーターを Exchange に設定した場合にのみ、このパラメーターが必要です。
[データ抽出]
[固定値]
データ抽出
$.data.keyメッセージ本文
EventBridge は JSONPath を使用してメッセージからデータを抽出し、メッセージの指定されたコンテンツをイベントターゲットにルーティングします。
[完全なデータ]
[データ抽出]
[固定値]
[テンプレート]
データ抽出
$.data.bodyMessageId
EventBridge は、JSONPath を使用してメッセージからデータを抽出し、メッセージの指定されたコンテンツをイベントターゲットにルーティングします。
[空]
[データ抽出]
[固定値]
[テンプレート]
データ抽出
$.data.props.messageIdEventBridge は、JSONPath を使用してメッセージからデータを抽出し、メッセージの指定されたコンテンツをイベントターゲットにルーティングします。
[空]
[データ抽出]
[テンプレート]
データ抽出
$.data.props
タスクのプロパティ
イベントのプッシュに失敗した場合に使用するリトライポリシーと、エラーを処理するために使用するメソッドを構成します。詳細については、「リトライポリシーと配信不能キュー」をご参照ください。
タスクリスト ページに戻り、作成した ApsaraMQ for RabbitMQ シンクコネクタを見つけ、操作する 列の 有効化する をクリックします。
ヒント メッセージで、OK をクリックします。
シンクコネクタが有効になるまで 30 ~ 60 秒かかります。タスクリスト ページの Status 列で進行状況を確認できます。
その他の操作
タスクリスト ページで、管理するシンクコネクタを見つけ、操作する 列でその他の操作を実行します。
コネクタの詳細を表示する: [操作] 列の 詳細 をクリックします。[タスクの詳細] ページで、コネクタの基本情報、プロパティ、およびモニタリングメトリックを表示します。
コネクタの構成を編集する: [操作] 列の 編集する をクリックします。[タスクの編集] パネルで、コネクタの詳細とプロパティを変更します。
コネクタを有効または無効にする: [操作] 列の 有効化する または 無効化 をクリックします。ヒント メッセージで、OK をクリックします。
コネクタを削除する: [操作] 列の 削除する をクリックします。ヒント メッセージで、OK をクリックします。