すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:ApsaraMQ for RabbitMQ シンクコネクタの作成

最終更新日:Mar 14, 2025

このトピックでは、ApsaraMQ for Kafka コンソールでシンクコネクタを作成し、ApsaraMQ for Kafka から ApsaraMQ for RabbitMQ にデータを同期する方法について説明します。

前提条件

シンクコネクタタスクの作成

  1. ApsaraMQ for Kafka コンソール にログインします。リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

  2. 左側のナビゲーションウィンドウで、[コネクタエコシステム統合] > [タスク] を選択します。

  3. [タスク] ページで、[タスクの作成] をクリックします。

  4. タスクの作成 ページで、タスク名 パラメーターと 説明 パラメーターを構成し、画面の指示に従って他のパラメーターを構成します。次に、[保存] をクリックします。次のセクションでは、パラメーターについて説明します。

    • タスクの作成

      1. Source (ソース) ステップで、データプロバイダー パラメーターを [apsaramq For Kafka] に設定し、画面の指示に従って他のパラメーターを構成します。次に、[次のステップ] をクリックします。次の表にパラメーターを示します。

        パラメーター

        説明

        [リージョン]

        ソースの ApsaraMQ for Kafka インスタンスが存在するリージョン。

        中国 (北京)

        [apsaramq For Kafka インスタンス]

        ルーティングするメッセージが生成される ApsaraMQ for Kafka インスタンス。

        alikafka_post-cn-jte3****

        [topic]

        ルーティングするメッセージが生成される ApsaraMQ for Kafka インスタンスのトピック。

        demo-topic

        [グループ ID]

        ソース ApsaraMQ for Kafka インスタンスのコンシューマーグループの名前。

        • [クイック作成]: システムによって GID_EVENTBRIDGE_xxx 形式の名前を持つコンシューマーグループが自動的に作成されます。この値を選択することをお勧めします。

        • [既存のグループを使用]: 使用されていない既存のグループの ID を選択します。使用中の既存のグループを選択すると、既存のメッセージの発行とサブスクライブに影響します。

        クイック作成

        [コンシューマーオフセット]

        メッセージが消費されるオフセット。有効な値:

        • [最新のオフセット]

        • [最も古いオフセット]

        最新のオフセット

        [ネットワーク構成]

        メッセージをルーティングするネットワークのタイプ。有効な値:

        • [ベーシックネットワーク]

        • [セルフマネージドインターネット]

        ベーシックネットワーク

        [VPC]

        ApsaraMQ for Kafka インスタンスがデプロイされている Virtual Private Cloud (VPC) の ID。[ネットワーク構成] パラメーターを セルフマネージドインターネット に設定した場合にのみ、このパラメーターが必要です。

        vpc-bp17fapfdj0dwzjkd****

        [vswitch]

        ApsaraMQ for Kafka インスタンスが属する vSwitch の ID。[ネットワーク構成] パラメーターを セルフマネージドインターネット に設定した場合にのみ、このパラメーターが必要です。

        vsw-bp1gbjhj53hdjdkg****

        [セキュリティグループ]

        ApsaraMQ for Kafka インスタンスが属するセキュリティグループの ID。[ネットワーク構成] パラメーターを セルフマネージドインターネット に設定した場合にのみ、このパラメーターが必要です。

        alikafka_pre-cn-7mz2****

        データ形式

        データ形式機能は、ソースから配信されたバイナリデータを特定のデータ形式にエンコードするために使用されます。複数のデータ形式がサポートされています。エンコードに関する特別な要件がない場合は、値として [Json] を指定します。

        • [json]: バイナリデータは UTF-8 エンコーディングに基づいて JSON 形式のデータにエンコードされ、ペイロードに配置されます。

        • [テキスト]: バイナリデータは UTF-8 エンコーディングに基づいて文字列にエンコードされ、ペイロードに配置されます。これはデフォルト値です。

        • [バイナリ]: バイナリデータは Base64 エンコーディングに基づいて文字列にエンコードされ、ペイロードに配置されます。

        Json

        一括プッシュの件数

        各関数呼び出しで送信できるメッセージの最大数。リクエストは、バックログ内のメッセージ数が指定された値に達した場合にのみ送信されます。有効な値: 1 ~ 10000。

        100

        バッチプッシュ間隔 (単位:秒)

        関数が呼び出される時間間隔。システムは、指定された時間間隔で集約されたメッセージを Function Compute に送信します。有効な値: 0 ~ 15。単位: 秒。値 0 は、集約後すぐにメッセージが送信されることを指定します。

        3

      2. Filtering (フィルタリング) ステップで、パターン内容 コードエディターでデータパターンを定義してデータをフィルタリングします。詳細については、「イベントパターン」をご参照ください。

      3. Transform (変換) ステップで、データ分割、マッピング、エンリッチメント、およびルーティング機能を実装するためのデータクレンジング方法を指定します。詳細については、「Function Compute を使用してメッセージクレンジングを実行する」をご参照ください。

      4. Sink (ターゲット) ステップで、サービスタイプ パラメーターを [apsaramq For Rabbitmq] に設定し、画面の指示に従って他のパラメーターを構成します。次の表にパラメーターを示します。

        パラメーター

        説明

        インスタンス ID

        作成した宛先の ApsaraMQ for RabbitMQ インスタンス。

        amqp-cn-zvp2pny6****

        Vhost

        作成した vhost。

        test

        ターゲットタイプ

        • Exchange: プロデューサーはメッセージを exchange に送信し、exchange はメッセージを 1 つ以上のキューにルーティングします。

        • Queue: 各メッセージは 1 つ以上のキューに送信されます。

        キュー

        Exchange

        ApsaraMQ for RabbitMQ インスタンスでメッセージがルーティングされる exchange。ターゲットタイプ パラメーターを Exchange に設定した場合にのみ、このパラメーターが必要です。

        exchange

        Queue

        ApsaraMQ for RabbitMQ インスタンスでメッセージがルーティングされるキュー。ターゲットタイプ パラメーターを Queue に設定した場合にのみ、このパラメーターが必要です。

        queue

        メッセージのルーティングルール (Routing Key)

        EventBridge は JSONPath を使用してメッセージからデータを抽出し、メッセージの指定されたコンテンツをイベントターゲットにルーティングします。ターゲットタイプ パラメーターを Exchange に設定した場合にのみ、このパラメーターが必要です。

        • [データ抽出]

        • [固定値]

        データ抽出

        $.data.key

        メッセージ本文

        EventBridge は JSONPath を使用してメッセージからデータを抽出し、メッセージの指定されたコンテンツをイベントターゲットにルーティングします。

        • [完全なデータ]

        • [データ抽出]

        • [固定値]

        • [テンプレート]

        データ抽出

        $.data.body

        MessageId

        EventBridge は、JSONPath を使用してメッセージからデータを抽出し、メッセージの指定されたコンテンツをイベントターゲットにルーティングします。

        • [空]

        • [データ抽出]

        • [固定値]

        • [テンプレート]

        データ抽出

        $.data.props.messageId

        カスタムプロパティ (Properties)

        EventBridge は、JSONPath を使用してメッセージからデータを抽出し、メッセージの指定されたコンテンツをイベントターゲットにルーティングします。

        • [空]

        • [データ抽出]

        • [テンプレート]

        データ抽出

        $.data.props
    • タスクのプロパティ

      イベントのプッシュに失敗した場合に使用するリトライポリシーと、エラーを処理するために使用するメソッドを構成します。詳細については、「リトライポリシーと配信不能キュー」をご参照ください。

  5. タスクリスト ページに戻り、作成した ApsaraMQ for RabbitMQ シンクコネクタを見つけ、操作する 列の 有効化する をクリックします。

  6. ヒント メッセージで、OK をクリックします。

    シンクコネクタが有効になるまで 30 ~ 60 秒かかります。タスクリスト ページの Status 列で進行状況を確認できます。

その他の操作

タスクリスト ページで、管理するシンクコネクタを見つけ、操作する 列でその他の操作を実行します。

  • コネクタの詳細を表示する: [操作] 列の 詳細 をクリックします。[タスクの詳細] ページで、コネクタの基本情報、プロパティ、およびモニタリングメトリックを表示します。

  • コネクタの構成を編集する: [操作] 列の 編集する をクリックします。[タスクの編集] パネルで、コネクタの詳細とプロパティを変更します。

  • コネクタを有効または無効にする: [操作] 列の 有効化する または 無効化 をクリックします。ヒント メッセージで、OK をクリックします。

  • コネクタを削除する: [操作] 列の 削除する をクリックします。ヒント メッセージで、OK をクリックします。