すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB for MySQL:データ同期を使用してApsaraMQ for Kafkaからdata Lakehouse Editionにデータを同期する

最終更新日:Jun 14, 2024

AnalyticDB for MySQL Data Lakehouse Edition (V3.0) は、特定のオフセットに基づいてApsaraMQ for KafkaインスタンスからAnalyticDB for MySQL data Lakehouse Edition (V3.0) クラスターにデータをリアルタイムで同期できるデータ同期機能を提供します。 この機能は、ほぼリアルタイムのデータ取り込み、完全なデータアーカイブ、弾性分析などの要件を満たすのに役立ちます。 このトピックでは、ApsaraMQ for Kafkaデータソースの作成、データ同期ジョブの作成と開始、データの分析、およびデータソースの管理方法について説明します。

前提条件

  • AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスター用にジョブリソースグループが作成されます。 詳細については、「リソースグループの作成」をご参照ください。

  • データベースアカウントが作成されます。

  • ApsaraMQ for Kafkaインスタンスは、AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターと同じリージョンに作成されます。Kafka

  • ApsaraMQ for KafkaインスタンスにKafkaトピックが作成され、メッセージが送信されます。 詳細については、概要トピックの「ApsaraMQ For Kafkaのクイックスタート」をご参照ください。

使用上の注意

  • トピックのデータが期限切れになると、システムは自動的にデータを削除します。 トピックデータの有効期限が切れ、データ同期ジョブが失敗した場合、データ同期ジョブを再起動したときに削除されたデータの読み取りに失敗します。 その結果、データが失われる可能性がある。 データ同期ジョブが失敗した場合は、トピックデータのライフサイクルを増やし、テクニカルサポートに連絡することを推奨します。

  • トピックから取得したサンプルデータのサイズが8 KBを超える場合、ApsaraMQ for Kafka APIはデータを切り捨て、データをJSON形式に解析できません。 その結果、フィールドマッピング情報を自動的に生成することができない。

手順

データソースの作成

説明

既存のApsaraMQ for Kafkaデータソースからデータを同期する場合は、この手順をスキップしてデータ同期ジョブを作成します。 詳細については、このトピックの「データ同期ジョブの作成」をご参照ください。

  1. AnalyticDB for MySQLコンソールにログインします。 ホームページの左上でリージョンを選択します。 左側のナビゲーションウィンドウで、クラスターリスト をクリックします。 Lake Warehouse Edition(3.0) タブで、管理するクラスターを見つけ、クラスターIDをクリックします。

  2. 左側のナビゲーションウィンドウで、[データ取り込み]> [データソース] を選択します。

  3. ページの右上隅にある データソースの新規作成 をクリックします。

  4. データソースの新規作成 ページで、次の表に示すパラメーターを設定します。

    パラメーター

    説明

    データソースのタイプ

    データソースのタイプ。 Kafkaを選択します。

    データソース名

    データソースの名前。 デフォルトでは、データソースの種類と現在の時刻に基づいて名前が生成されます。 ビジネス要件に基づいて名前を変更できます。

    データソースの説明

    データソースの説明 たとえば、ユースケースとビジネスの制限を入力できます。

    デプロイモード

    Alibaba Cloudインスタンスのみがサポートされています。

    Kafka インスタンス

    ApsaraMQ for KafkaインスタンスのID。

    ApsaraMQ for Kafkaコンソールにログインし、[インスタンス] ページに移動してインスタンスIDを表示します。

    Kafka Topic

    ApsaraMQ for Kafkaインスタンスで作成するトピックの名前。

    ApsaraMQ for Kafkaコンソールにログインし、インスタンスの [トピック] ページに移動してトピック名を表示します。

    メッセージデータフォーマット

    メッセージのデータ形式。 JSONのみがサポートされています。

  5. クリック作成.

データ同期ジョブの作成

  1. 左側のナビゲーションウィンドウで、SLS/Kafkaデータ同期 を選択します。

  2. ページの右上隅にある 同期リンクの新規作成 をクリックします。

  3. のKafkaデータソースタブ同期リンクの新規作成ページでパラメーターを設定します。データソースと宛先の設定,ターゲットデータベースとターゲットテーブルの設定、および同期設定セクションを使用します。

    • 次の表に、データソースと宛先の設定 セクションのパラメーターを示します。

      パラメーター

      説明

      データリンク名

      データ同期ジョブの名前。 デフォルトでは、データソースの種類と現在の時刻に基づいて名前が生成されます。 ビジネス要件に基づいて名前を変更できます。

      データソース

      データソース。 既存のApsaraMQ for Kafkaデータソースを選択するか、データソースを作成できます。

      宛先タイプ

      AnalyticDB for MySQLのデータストレージタイプ。 Data Lake - OSS Storageのみがサポートされています。

      OSS パス

      AnalyticDB for MySQLクラスターデータのObject Storage Service (OSS) ストレージパス。

      重要
      • AnalyticDB for MySQLクラスターと同じリージョンにあるすべてのバケットが表示されます。 いずれかのバケットを選択できます。 ビジネス要件に基づいてこのパラメーターを設定します。 このパラメーターを設定した後、変更することはできません。

      • 他のデータ同期ジョブのディレクトリとネストされた関係を持たない空のディレクトリを選択することを推奨します。 これにより、履歴データの上書きが防止されます。 たとえば、2つのデータ同期ジョブに関連するOSSストレージパスがoss:// adb_demo/test/sls1 /およびoss:// adb_demo/test /であるとします。 この場合、これら2つのパスは互いに入れ子の関係にあるため、データ同期中にデータの上書きが発生します。

    • 次の表に、ターゲットデータベースとターゲットテーブルの設定 セクションのパラメーターを示します。

      パラメーター

      説明

      ライブラリ名

      AnalyticDB for MySQLクラスター内のターゲットデータベースの名前。 同じ名前のデータベースが存在しない場合は, データベースが作成されます。 同じ名前を使用するデータベースがすでに存在する場合、データは既存のデータベースと同期されます。 データベースの命名規則については、「制限」をご参照ください。

      テーブル名

      AnalyticDB for MySQLクラスター内のターゲットテーブルの名前。 同じ名前を使用するテーブルがデータベースに存在しない場合、テーブルが作成されます。 同じ名前を使用するテーブルがデータベースにすでに存在する場合、データの同期に失敗します。 テーブルの命名規則については、「制限」をご参照ください。

      サンプルデータ

      ApsaraMQ for Kafkaトピックから自動的に取得される最新のデータ。

      説明

      ApsaraMQ for KafkaトピックのデータはJSON形式である必要があります。 それ以外の場合、データ同期中にエラーが報告されます。

      解析されたJSONレイヤー

      ネストされたJSONフィールドに対して解析されるレイヤーの数。 有効な値:

      • 0: ネストされたJSONフィールドは解析されません。

      • 1 (デフォルト): 1つのレイヤーが解析されます。

      • 2: 2つのレイヤーが解析されます。

      • 3: 3つのレイヤが解析される。

      • 4: 4つのレイヤーが解析されます。

      ネストされたJSONフィールドの解析方法の詳細については、このトピックの「異なる数のレイヤーで解析されたスキーマフィールドの例」をご参照ください。

      スキーマフィールドマッピング

      サンプルデータから解析されるスキーマフィールドに関する情報。 宛先フィールド名またはタイプを変更し、フィールドを追加または削除できます。

      パーティションキーの設定

      ターゲットテーブルのパーティションフィールドの設定。 データが取り込まれ、クエリが期待どおりに実行されるようにするには、ログ時間またはビジネスロジックに基づいてパーティションを構成することをお勧めします。 パーティションを設定しない場合、ターゲットテーブルにパーティションは存在しません。

      Format Processing Methodパラメータの有効な値:

      • Formatted Time: [Partition field Name] ドロップダウンリストからdatetimeフィールドを選択し、[Format Processing Method] パラメーターを [Formatted Time] に設定して、[Source Field Format] パラメーターと [Destination Partition Format] パラメーターを設定します。 AnalyticDB for MySQLは、指定されたソースフィールド形式に基づいてパーティションフィールドの値を識別し、その値をパーティション分割のために指定された宛先パーティション形式に変換します。 たとえば、ソースフィールドがgmt_createdで、値が1711358834、ソースフィールド形式パラメーターがTimestamp Accurate to Secondsに設定され、Destination Partition FormatパラメーターがyyyyMMddに設定されている場合、値は20240325に基づいてパーティション分割されます。

      • 指定されたパーティションフィールド: Format Processing Methodパラメーターを指定されたパーティションフィールドに設定し、その他の必須パラメーターを設定します。

    • 次の表に、同期設定 セクションのパラメーターを示します。

      パラメーター

      説明

      増分同期の最初のコンシューマーオフセット

      データ同期ジョブの開始時にシステムがApsaraMQ for Kafkaデータを消費する時点。 サンプル値:

      • 最古のオフセット: システムは、最古のデータレコードが生成された時点から、KafkaデータのApsaraMQを消費します。

      • 最新のオフセット: システムは、最新のデータレコードが生成された時点から、Kafkaデータ用のApsaraMQを消費します。

      • カスタムオフセット: 時点を選択できます。 その後、システムは、選択した時点で生成された最初のデータエントリからのApsaraMQ for Kafkaデータを消費します。

      ジョブ型リソースグループ

      データ同期ジョブを実行するジョブリソースグループ。

      増分同期に必要な ACU の数

      ジョブリソースグループがデータ同期ジョブを実行するために必要なAnalyticDBコンピューティングユニット (ACU) の数。 値の範囲は2から、ジョブリソースグループで使用可能なコンピューティングリソースの最大数です。 データ取り込みの安定性とパフォーマンスを向上させるために、より多くのACUを指定することを推奨します。

      説明

      ジョブリソースグループでデータ同期ジョブを作成すると、リソースグループ内のエラスティックリソースが使用され、システムは使用されたリソースをリソースグループから除外します。 たとえば、ジョブリソースグループが48のACUの予約済みコンピューティングリソースを有し、8つのACUを消費する同期ジョブが作成されると仮定する。 リソースグループで別の同期ジョブを作成する場合、最大40のACUを選択できます。

      詳細設定

      データ同期ジョブのカスタム設定。 カスタム設定を構成する場合は、テクニカルサポートにお問い合わせください。

  4. 上記のパラメーター設定を完了したら、をクリックします。送信.

データ同期ジョブの開始

  1. On theSLS/Kafkaデータ同期ページで、作成したデータ同期ジョブを見つけて、開始で、アクション列を作成します。

  2. ページの右上隅にある [検索] をクリックします。 ジョブの状態がStartingに変わると、データ同期ジョブが開始されます。

データの分析

データ同期ジョブが完了したら、Spark JAR Developmentを使用して、AnalyticDB for MySQLクラスターに同期されたデータを分析できます。 Spark開発の詳細については、「Sparkエディター」および「概要」をご参照ください。

  1. 左側のナビゲーションウィンドウで、ジョブ开発 > Spark Jar 開発.

  2. デフォルトのテンプレートにSQL文を入力し、実行.

    -- SparkSQLの例にすぎません。 コンテンツを変更し、sparkプログラムを実行します。
    
    conf spark.driver.resourceSpec=medium;
    conf spark.exe cutor.instances=2;
    conf spark.exe cutor.resourceSpec=medium;
    con f spark.app.name=Spark SQLテスト;
    conf spark.adb.connectors=oss;
    
    -- ここにあなたのsqlステートメントがあります
    ショーテーブルからlakehouse20220413156_adbTest; 
  3. (オプション) [アプリケーション] タブでアプリケーションを見つけ、[操作] 列の [ログ] をクリックして、アプリケーションのSpark SQL実行ログを表示します。

データソースの管理

[データソース] ページで、次の表の [操作] 列に記載されている操作を実行できます。

API 操作

説明

リンクの新規作成

同期ジョブの作成または移行ジョブの作成ページに移動し、データソースを使用するジョブを作成します。

表示

データソースの詳細な設定を表示できます。

編集

データソース名や説明などのデータソースパラメーターを変更します。

削除

データソースを削除します。

説明

データソースがデータ同期またはデータ移行ジョブで使用されている場合、データソースを削除することはできません。 この場合、最初に SLS/Kafkaデータ同期 ページに移動してジョブを見つけ、操作 列の 削除 をクリックしてジョブを削除する必要があります。

異なる数のレイヤーで解析されたスキーマフィールドの例

ネストされたJSONフィールドは、異なる数のレイヤーで解析できます。 この例では、次のJSONデータがApsaraMQ for Kafkaに送信されます。

{
  "name" : "zhangle" 、
  "年齢" : 18、
  "device" : {
    "os" : {
        "テスト": ラグ、
        "member":{
             "fa":zhangsan、
             "mo":limei
       }
     },
    "ブランド" : "なし" 、
    "version" : "11.4.2"
  }
}

前述のJSONデータは、異なる数のレイヤーで解析できます。

解析なし

JSONフィールドは解析されず、JSONデータは直接表示されます。

JSONフィールド

宛先フィールド名

__value__

{"name": "zhangle","age" : 18, "device": {"os": {"test":lag,"member": {"fa":zhangsan,"mo":limei }}, "brand": "none","version": "11.4.2" }}

__value__

1層解析

JSONフィールドの最初のレイヤーが解析されます。

JSONフィールド

宛先フィールド名

name

zhangle

name

age

18

age

device

{"os": {"test":lag,"member": {"fa":zhangsan,"mo":limei }}, "brand": "none","version": "11.4.2"}

device

2層解析

JSONフィールドの最初の2つのレイヤーが解析されます。 フィールドにネストされたフィールドがない場合、フィールドは直接表示されます。 たとえば、名前と年齢のフィールドが直接表示されます。 フィールドにネストされたフィールドがある場合、フィールドの子フィールドが表示されます。 たとえば、デバイスフィールドはdevice.osdevice.br anddevice.versionとして解析されます。

重要

宛先フィールド名では、ピリオド (.) が自動的にアンダースコア (_) に変更されます。

JSONフィールド

宛先フィールド名

name

zhangle

name

age

18

age

device.os

{"test":lag,"member":{ "fa":zhangsan,"mo":limei }}

device_os

device.brおよび

none

device_brand

デバイス. version

11.4.2

device_version

3層解析

JSONフィールド

宛先フィールド名

name

zhangle

name

age

18

age

device.os.test

ラグ

device_os_test

device.os.member

{"fa":zhangsan,"mo":limei}

device_os_member

device.brおよび

none

device_brand

デバイス. version

11.4.2

device_version

4層解析

JSONフィールド

宛先フィールド名

name

zhangle

name

age

18

age

device.os.test

ラグ

device_os_test

device.os.member.fa

zhangsan

device_os_member_fa

device.os.member.mo

ライム

device_os_member_mo

device.brおよび

none

device_brand

デバイス. version

11.4.2

device_version