すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB for MySQL:データ同期を使用してSimple Log Serviceからdata Lakehouse Editionにデータを同期する (推奨)

最終更新日:Jun 13, 2024

AnalyticDB for MySQL Data Lakehouse Edition (V3.0) は、特定のオフセットに基づいて、Simple Log Service LogstoreからAnalyticDB for MySQL data Lakehouse Edition (V3.0) クラスターにデータをリアルタイムで同期できるデータ同期機能を提供します。 この機能は、ほぼリアルタイムのデータ取り込み、完全なデータアーカイブ、弾性分析などの要件を満たすのに役立ちます。 このトピックでは、Simple Log Serviceデータソースの作成、データ同期ジョブの作成と開始、データの分析、およびデータソースの管理方法について説明します。

前提条件

  • AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスター用にジョブリソースグループが作成されます。 詳細については、「リソースグループの作成」をご参照ください。

  • データベースアカウントが作成されます。

  • Simple Log Serviceが有効化されています。 プロジェクトとLogstoreは、AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターと同じリージョンに作成されます。 詳細については、「入門」をご参照ください。

使用上の注意

Simple Log Service Logstoreのデータは、AnalyticDB for MySQLクラスター内の単一のテーブルにのみ同期できます。

手順

RAM権限の設定

Simple Log ServiceデータをAlibaba Cloudアカウント間でAnalyticDB for MySQL data Lakehouse Edition (V3.0) クラスターに同期する場合、データソースでRAMロールを作成し、ポリシーをアタッチしてRAMロールに権限を付与し、RAMロールの信頼ポリシーを編集する必要があります。 同じAlibaba Cloudアカウント内でSimple Log Serviceデータを同期する場合は、この手順をスキップしてデータソースを作成できます。 詳細については、このトピックの「データソースの作成」をご参照ください。

  1. RAM ロールを作成します。 詳細については、「信頼できるAlibaba CloudアカウントのRAMロールの作成」をご参照ください。

    説明

    [信頼できるAlibaba Cloudアカウントの選択] パラメーターに [その他のAlibaba Cloudアカウント] を選択し、AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターが属するAlibaba CloudアカウントのIDを入力します。 アカウント管理コンソールにログインし、概要ページに移動してアカウントIDを表示します。

  2. AliyunAnalyticDBAccesssingLogRolePolicyポリシーをRAMロールにアタッチします。 詳細については、「RAMロールへの権限の付与」トピックの「方法2: [ロール] ページの [入力] と [アタッチ] をクリックしてRAMロールに権限を付与する」を参照してください。

  3. RAMロールの信頼ポリシーを編集します。 詳細については、「RAMロールの信頼ポリシーの編集」をご参照ください。

    {
      "Statement": [
        {
          "Action": "sts:AssumeRole",
          "Effect": "Allow",
          "Principal": {
            "RAM": [
                "acs:ram ::< Alibaba CloudアカウントID>:root"
            ],
            &quot;サービス&quot;:[
                "<Alibaba CloudアカウントID>@ ads.aliyuncs.com"
            ]
          }
        }
      ],
      "バージョン": "1"
    } 
    説明

    上記のコードでは、Alibaba CloudアカウントIDパラメーターは、ステップ1で入力したアカウントIDを指定します。 このパラメーターを指定するときは、角かっこ (<>) を削除します。

データソースの作成

説明

既存のデータソースからデータを同期する場合は、この手順をスキップしてデータ同期ジョブを作成します。 詳細については、このトピックの「データ同期ジョブの作成」をご参照ください。

  1. AnalyticDB for MySQLコンソールにログインします。 ホームページの左上でリージョンを選択します。 左側のナビゲーションウィンドウで、クラスターリスト をクリックします。 Lake Warehouse Edition(3.0) タブで、管理するクラスターを見つけ、クラスターIDをクリックします。

  2. 左側のナビゲーションウィンドウで、[データ取り込み]> [データソース] を選択します。

  3. ページの右上隅にある データソースの新規作成 をクリックします。

  4. [データソースの作成] ページで、次の表に示すパラメーターを設定します。

    パラメーター

    説明

    データソースのタイプ

    データソースのタイプ。 [SLS] を選択します。

    データソース名

    データソースの名前。 デフォルトでは、データソースの種類と現在の時刻に基づいて名前が生成されます。 ビジネス要件に基づいて名前を変更できます。

    データソースの説明

    データソースの説明 たとえば、ユースケースとビジネスの制限を入力できます。

    デプロイモード

    データソースの展開モード。 Alibaba Cloudインスタンスのみがサポートされています。

    SLS プロジェクトのリージョン

    ソースのSimple Log Serviceプロジェクトが存在するリージョン。

    Alibaba Cloud プライマリアカウントをクロスするかどうか

    Simple Log ServiceデータをAlibaba Cloudアカウント間でAnalyticDB for MySQL data Lakehouse Edition (V3.0) クラスターに同期できるかどうかを指定します。 有効な値:

    • いいえ。

    • はい。 [はい] を選択した場合、Alibaba CloudアカウントおよびRAMロールパラメーターを指定する必要があります。

      説明
      • Alibaba Cloudアカウント: データソースが属するAlibaba CloudアカウントのID。

      • RAMロール: 手順1でデータソースに作成されたRAMロールの名前。

    SLS project

    ソースSimple Log Serviceプロジェクトの名前。

    重要

    Alibaba CloudアカウントとRAMユーザーに属するすべてのプロジェクトが表示されます。 Alibaba Cloudアカウントに属し、RAMユーザーとしてデータを同期するプロジェクトを選択する場合は、RAMユーザーにプロジェクトに対する権限が付与されていることを確認してください。 それ以外の場合、データをdata Lakehouse Edition (V3.0) に同期できません。

    SLS Logstore

    ソースSimple Log Service Logstoreの名前。

  5. クリック作成.

データ同期ジョブの作成

  1. 左側のナビゲーションウィンドウで、SLS/Kafkaデータ同期 を選択します。

  2. ページの右上隅にある 同期リンクの新規作成 をクリックします。

  3. 同期リンクの新規作成 ページの [Log Serviceデータソース] タブで、データソースと宛先の設定ターゲットデータベースとターゲットテーブルの設定同期設定 セクションのパラメーターを設定します。

    • 次の表に、[ソースと宛先の設定] セクションのパラメーターを示します。

      パラメーター

      説明

      データリンク名

      データ同期ジョブの名前。 デフォルトでは、データソースの種類と現在の時刻に基づいて名前が生成されます。 ビジネス要件に基づいて名前を変更できます。

      データソース

      データソース。 既存のSimple Log Serviceデータソースを選択するか、データソースを作成できます。

      宛先タイプ

      AnalyticDB for MySQLのデータストレージタイプ。 Data Lake - OSS Storageのみがサポートされています。

      OSS パス

      AnalyticDB for MySQLクラスターデータのObject Storage Service (OSS) ストレージパス。

      重要
      • AnalyticDB for MySQLクラスターと同じリージョンにあるすべてのバケットが表示されます。 いずれかのバケットを選択できます。 ビジネス要件に基づいてこのパラメーターを設定します。 このパラメーターを設定した後、変更することはできません。

      • 他のデータ同期ジョブのディレクトリとネストされた関係を持たない空のディレクトリを選択することを推奨します。 これにより、履歴データの上書きが防止されます。 たとえば、2つのデータ同期ジョブに関連するOSSストレージパスがoss:// adb_demo/test/sls1 /およびoss:// adb_demo/test /であるとします。 この場合、これら2つのパスは互いに入れ子の関係にあるため、データ同期中にデータの上書きが発生します。

    • 次の表に、ターゲットデータベースとターゲットテーブルの設定 セクションのパラメーターを示します。

      パラメーター

      説明

      ライブラリ名

      AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスター内のターゲットデータベースの名前。 同じ名前のデータベースが存在しない場合は, データベースが作成されます。 同じ名前を使用するデータベースがすでに存在する場合、データは既存のデータベースと同期されます。 データベースの命名規則については、「制限」をご参照ください。

      テーブル名

      AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターのターゲットテーブルの名前。 同じ名前を使用するテーブルがデータベースに存在しない場合、テーブルが作成されます。 同じ名前を使用するテーブルがデータベースにすでに存在する場合、データの同期に失敗します。 テーブルの命名規則については、「制限」をご参照ください。

      スキーマフィールドマッピング

      ソースフィールドと宛先フィールド。 デフォルトでは、フィールドはSimple Log Serviceの配送タスク設定に基づいて取得されます。 Logstoreに出荷タスクが設定されていない場合、フィールドは最新のログデータに基づいて取得されます。

      • BOOLEAN、INT、BIGINT、FLOAT、DOUBLE、STRINGのデータ型がサポートされています。

      • Simple Log Serviceの予約フィールドを同期できます。 詳細については、「予約済みフィールド」をご参照ください。

      重要
      • 宛先フィールド名は変更できません。

      • データ同期ジョブが開始された場合、開始および実行中であるか、開始および完了済みであるかにかかわらず、既存の列に関する情報を変更することはできません。 ただし、列を追加することはできます。 データ同期ジョブが作成されてもまだ開始されていない場合は、既存の列に関する情報を変更できます。

      パーティションキーの設定

      ターゲットテーブルのパーティションフィールドの設定。 データが取り込まれ、クエリが期待どおりに実行されるようにするには、ログ時間またはビジネスロジックに基づいてパーティションを構成することをお勧めします。 パーティションを設定しない場合、ターゲットテーブルにパーティションは存在しません。

      Format Processing Methodパラメータの有効な値:

      • Formatted Time: [Partition field Name] ドロップダウンリストからdatetimeフィールドを選択し、[Format Processing Method] パラメーターを [Formatted Time] に設定して、[Source Field Format] パラメーターと [Destination Partition Format] パラメーターを設定します。 AnalyticDB for MySQLは、指定されたソースフィールド形式に基づいてパーティションフィールドの値を識別し、その値をパーティション分割のために指定された宛先パーティション形式に変換します。 たとえば、ソースフィールドがgmt_createdで、値が1711358834、ソースフィールド形式パラメーターがTimestamp Accurate to Secondsに設定され、Destination Partition FormatパラメーターがyyyyMMddに設定されている場合、値は20240325に基づいてパーティション分割されます。

      • 指定されたパーティションフィールド: Format Processing Methodパラメーターを指定されたパーティションフィールドに設定し、その他の必須パラメーターを設定します。

    • 次の表に、[同期設定] セクションのパラメーターを示します。

      パラメーター

      説明

      増分同期の最初のコンシューマーオフセット

      データ同期ジョブの開始時にシステムがSimple Log Serviceデータを消費する時点。 有効な値:

      • Earlest Offset (begin_cursor): システムは、最も早いデータレコードが生成された時点からSimple Log Serviceデータを消費します。

      • 最新のオフセット (end_cursor): システムは、最新のデータレコードが生成された時点からSimple Log Serviceデータを消費します。

      • カスタムオフセット: 時点を選択できます。 次に、システムは、選択した時点で生成された最初のエントリのSimple Log Serviceデータを消費します。

      求人リソースグループ

      データ同期ジョブを実行するジョブリソースグループ。

      増分同期のためのACU

      ジョブリソースグループがデータ同期ジョブを実行するために必要なAnalyticDBコンピューティングユニット (ACU) の数。 値の範囲は2から、ジョブリソースグループで使用可能なコンピューティングリソースの最大数です。 データ取り込みの安定性とパフォーマンスを向上させるために、より多くのACUを指定することを推奨します。

      説明

      ジョブリソースグループでデータ同期ジョブを作成すると、リソースグループ内のエラスティックリソースが使用され、システムは使用されたリソースをリソースグループから除外します。 たとえば、ジョブリソースグループが48のACUの予約済みコンピューティングリソースを有し、8つのACUを消費する同期ジョブが作成されると仮定する。 リソースグループで別の同期ジョブを作成する場合、最大40のACUを選択できます。

      詳細設定

      データ同期ジョブのパーソナライズされた設定。 パーソナライズされた設定を構成する場合は、テクニカルサポートにお問い合わせください。

  4. 送信 をクリックします。

データ同期ジョブの開始

  1. On theSLS/Kafkaデータ同期ページで、作成したデータ同期ジョブを見つけて、開始で、アクション列を作成します。

  2. ページの右上隅にある [検索] をクリックします。 ジョブの状態がStartingに変わると、データ同期ジョブが開始されます。

データの分析

データ同期ジョブが完了したら、Spark JAR Developmentを使用して、AnalyticDB for MySQLクラスターに同期されたデータを分析できます。 Spark開発の詳細については、「Sparkエディター」および「概要」をご参照ください。

  1. 左側のナビゲーションウィンドウで、ジョブ开発 > Spark Jar 開発.

  2. デフォルトのテンプレートにSQL文を入力し、実行.

    -- SparkSQLの例にすぎません。 コンテンツを変更し、sparkプログラムを実行します。
    
    conf spark.driver.resourceSpec=medium;
    conf spark.exe cutor.instances=2;
    conf spark.exe cutor.resourceSpec=medium;
    con f spark.app.name=Spark SQLテスト;
    conf spark.adb.connectors=oss;
    
    -- ここにあなたのsqlステートメントがあります
    ショーテーブルからlakehouse20220413156_adbTest; 
  3. (オプション) [アプリケーション] タブでアプリケーションを見つけ、[操作] 列の [ログ] をクリックして、アプリケーションのSpark SQL実行ログを表示します。

データソースの管理

[データソース] ページで、次の表の [操作] 列に記載されている操作を実行できます。

API 操作

説明

リンクの新規作成

同期ジョブの作成または移行ジョブの作成ページに移動し、データソースを使用するジョブを作成します。

表示

データソースの詳細な設定を表示できます。

編集

データソース名や説明などのデータソースパラメーターを変更します。

削除

データソースを削除します。

説明

データソースがデータ同期またはデータ移行ジョブで使用されている場合、データソースを削除することはできません。 この場合、最初に SLS/Kafkaデータ同期 ページに移動してジョブを見つけ、操作 列の 削除 をクリックしてジョブを削除する必要があります。