すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB for MySQL:外部テーブルを使用してdata Lakehouse Editionにデータをインポートする

最終更新日:Jun 14, 2024

AnalyticDB for MySQLでは、外部テーブルを使用してMaxComputeデータにアクセスし、インポートできます。 これにより、クラスターリソースを最大限に活用してデータインポートのパフォーマンスを向上させることができます。 このトピックでは、外部テーブルを使用してMaxComputeからAnalyticDB for MySQL data Lakehouse Edition (V3.0) にデータをインポートする方法について説明します。

前提条件

  • MaxComputeプロジェクトとAnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターが同じリージョンに作成されます。 詳細については、「クラスターの作成」をご参照ください。

  • Elastic Network Interface (ENI) 機能は、AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターで有効になっています。

  • AnalyticDB for MySQLクラスターが存在する仮想プライベートクラウド (VPC) のCIDRブロックがMaxComputeプロジェクトのホワイトリストに追加されます。

    AnalyticDB for MySQLコンソールにログインして、[クラスター情報] ページでVPC IDを表示できます。 次に、VPCコンソールにログインし、VPCページでVPC IDを見つけてCIDRブロックを表示します。 MaxComputeホワイトリストの設定方法については、「IPアドレスホワイトリストの管理」をご参照ください。

サンプルデータ

この例では、test_adbという名前のMaxComputeプロジェクトとpersonという名前のMaxComputeテーブルが使用されています。 次の文を実行して、テーブルを作成します。

人がいない場合は
テーブルを作成します (
    id int,
    名前varchar(1023) 、
    年齢int)
によって分割 (dt文字列); 

次のステートメントを実行して、personテーブルにパーティションを作成します。

ALTERテーブル人
追加
パーティー (dt='202207'); 

次のステートメントを実行して、パーティションにデータを挿入します。

test_adb.personに挿入
パーティション (dt='202207')
値 (1、「james」、10) 、(2、「bond」、20) 、(3、「jack」、30) 、(4、「lucy」、40); 

トンネルモードでのMaxComputeデータへのアクセスとインポート

通常のインポートまたはエラスティックインポート方法を使用してデータをインポートできます。 デフォルトでは、通常のインポート方法が使用されます。 通常のインポート方法を使用する場合、データは計算ノードから読み取られ、インデックスはストレージノードに作成されます。 通常のインポート方法は、コンピューティングリソースとストレージリソースを消費します。 elastic importメソッドを使用すると、Serverless Sparkジョブのデータが読み取られ、インデックスが作成されます。 エラスティックインポート方式は、ジョブリソースグループのリソースを消費します。 ジョブリソースグループを持つAnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスタのみがV3.1.10.0以降でエラスティックインポートをサポートしています。 通常のインポート方法と比較して、エラスティックインポート方法はより少ないリソースを消費します。 これにより、リアルタイムのデータの読み取りと書き込みへの影響が軽減され、リソースの分離とデータのインポート効率が向上します。 詳細については、「データインポート方法」をご参照ください。

定期的なインポート

  1. SQLエディターに移動します。

    1. AnalyticDB for MySQLコンソールにログインします。 ホームページの左上でリージョンを選択します。 左側のナビゲーションウィンドウで、クラスターリスト をクリックします。 Lake Warehouse Edition(3.0) タブで、管理するクラスターを見つけ、クラスターIDをクリックします。

    2. 左側のナビゲーションウィンドウで、[ジョブ開発] > Sql開発 を選択します。
  2. 外部データベースを作成します。

    外部データベースの作成adb_external_db;
  3. 外部テーブルを作成します。 この例では、test_adbという名前の外部テーブルが使用されています。

    adb_external_db.test_adbが存在しない場合は
    外部テーブルを作成します (
        id int,
        名前varchar(1023) 、
        age int,
        dtストリング
        ) エンジン='ODPS'
    TABLE_PROPERTIES='{
    "accessid":"LTAILd4 ****" 、"endpoint":" http://service.cn-hangzhou.maxcompute.aliyun.com/api " 、"accesskey":"4A5Q7ZVzcYnWMQPysX ****" 、"partition_column":"dt" 、"project_name":"test_adb" 、"table_name":"person"
    } '; 
    説明
    • AnalyticDB for MySQL外部テーブルは、MaxComputeテーブルと同じ名前、番号、およびフィールドの順序を使用する必要があります。 フィールドのデータ型は、2つのテーブル間で互換性がある必要があります。

    • 外部テーブルの作成に使用されるパラメーターの詳細については、「外部テーブルの作成」をご参照ください。

  4. データの照会

    SELECT * からadb_external_db.test_adb;

    サンプル結果:

    + ------ ------ ------- -------------
    | id | name | 年齢 | dt |
    ------ ------ -------- --------------
    | 1 | ジェームズ | 10 | 202207 |
    | 2 | ボンド | 20 | 202207 |
    | 3 | ジャック | 30 | 202207 |
    | 4 | ルーシー | 40 | 202207 |
    ------ ------ -------- --------------
    セットの4列 (0.35秒) 
  5. MaxComputeからAnalyticDB for MySQLにデータをインポートするには、次の手順を実行します。

    1. AnalyticDB for MySQLクラスターにデータベースを作成します。

    2. CREATE DATABASE adb_demo;
    3. AnalyticDB for MySQLデータベースにテーブルを作成し、MaxComputeからインポートされたデータを保存します。

    4. 説明

      作成されたテーブルは、手順3で作成された外部テーブルと同じ数と順序のフィールドを使用する必要があります。 フィールドのデータ型は、2つのテーブル間で互換性がある必要があります。

      adb_demo.adb_import_testが存在しない場合は
      テーブルを作成します (
          id int,
          名前文字列,
          age int,
          dtストリング
          主要なキー (id、dt)
      )
      ハッシュによる分配 (id)
      値による部分 ('dt'); 
    5. テーブルにデータを書き込みます。

      • 方法1: INSERT INTOステートメントを実行してデータをインポートします。 主キーの値が重複している場合、データは繰り返し挿入されません。 この場合、INSERT INTOステートメントはINSERT IGNORE INTOステートメントに相当します。 詳細については、「INSERT INTO」をご参照ください。

      • adb_demo.adb_import_testに挿入する
        SELECT * からadb_external_db.test_adb; 

        特定のパーティションからadb_demo.adb_import_testテーブルにデータをインポートする場合は、次のステートメントを実行します。

        adb_demo.adb_import_testに挿入する
        SELECT * からadb_external_db.test_adb
        WHERE dt='20220'; 
      • 方法2: INSERT OVERWRITE INTOステートメントを実行してデータをインポートします。 主キーの値が重複している場合、元の値は新しい値で上書きされます。

      • 上書きをadb_demo.adb_import_testに挿入する
        SELECT * からadb_external_db.test_adb; 
      • 方法3: INSERT OVERWRITE INTOステートメントを実行して、データを非同期的にインポートします。 ほとんどの場合、SUBMIT JOBステートメントは非同期ジョブの送信に使用されます。 ジョブを高速化するために、データインポートステートメントの前にヒント (/* + direct_batch_load=true */) を追加できます。 詳細については、「INSERT OVERWRITE SELECT」トピックの「非同期書き込み」セクションをご参照ください。

      • ジョブを送信する
        上書きをadb_demo.adb_import_testに挿入する
        SELECT * からadb_external_db.test_adb; 

        サンプル結果:

        + --------------------------------------- +
        | job_id |
        + --------------------------------------- +
        | 2020112122202917203100908203303 ****** |
        + --------------------------------------- + 
      • ジョブを非同期で送信する方法については、「インポートジョブを非同期で送信」をご参照ください。

Elastic import

  1. SQLエディターに移動します。

    1. AnalyticDB for MySQLコンソールにログインします。 ホームページの左上でリージョンを選択します。 左側のナビゲーションウィンドウで、クラスターリスト をクリックします。 Lake Warehouse Edition(3.0) タブで、管理するクラスターを見つけ、クラスターIDをクリックします。

    2. 左側のナビゲーションウィンドウで、[ジョブ開発] > Sql開発 を選択します。
  2. データベースを作成します。 すでにデータベースを作成している場合は、この手順をスキップしてください。

    CREATE DATABASE adb_demo;
  3. 外部テーブルを作成します。

    説明
    • AnalyticDB for MySQL外部テーブルの名前は、MaxComputeプロジェクトの名前と同じである必要があります。 それ以外の場合、外部テーブルの作成に失敗します。

    • AnalyticDB for MySQL外部テーブルは、MaxComputeテーブルと同じ名前、番号、およびフィールドの順序を使用する必要があります。 フィールドのデータ型は、2つのテーブル間で互換性がある必要があります。

    • エラスティックインポートメソッドでは、create TABLEステートメントを使用してのみ外部テーブルを作成できます。

    が存在しない場合はテーブルを作成しますtest_adb
    (
        id int,
        名前文字列,
        age int,
        dtストリング
    )
     エンジン='ODPS'
     TABLE_PROPERTIES='{
     "endpoint":" http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api " 、
     "accessid":"LTAILd4 ****" 、
     "accesskey":"4A5Q7ZVzcYnWMQPysX ****" 、
     "partition_column":"dt" 、
     "project_name":"test_adb" 、
     "table_name":"person"
     }';                 

    外部テーブルパラメーターの詳細については、「外部テーブルを使用してデータをdata Warehouse Editionにインポートする」トピックのパラメーターテーブルをご参照ください。

  4. データの照会

    SELECT * からadb_demo.test_adb;

    サンプル結果:

    + ------ ------ ------- -------------
    | id | name | 年齢 | dt |
    ------ ------ -------- --------------
    | 1 | ジェームズ | 10 | 202207 |
    | 2 | ボンド | 20 | 202207 |
    | 3 | ジャック | 30 | 202207 |
    | 4 | ルーシー | 40 | 202207 |
    ------ ------ -------- --------------
    セットの4列 (0.35秒) 
  5. AnalyticDB for MySQLデータベースにテーブルを作成し、MaxComputeからインポートされたデータを保存します。

    説明

    作成した内部テーブルは、手順3で作成した外部テーブルと同じフィールド名、番号、順序、およびデータ型を使用する必要があります。

    が存在しない場合はテーブルを作成します。adb_import_test
    (id int,
        名前文字列,
        age int,
        dtストリング、
        主要なキー (id、dt)
    )
    ハッシュによる分配 (id)
    値によるパーティション ('dt') LIFECYCLE 30; 
  6. データをインポートする。

    重要

    elastic importメソッドでは、INSERT OVERWRITE INTOステートメントを使用してのみデータをインポートできます。

    • 方法1: INSERT OVERWRITE INTOステートメントを実行して、データを弾力的にインポートします。 主キーの値が重複している場合、元の値は新しい値で上書きされます。

      /* + elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group | spark.adb.eni.vswitchId=vsw-bp12ldm83z4zu9k4d ****]* /
      INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * からadb_demo.test_adb; 
    • 方法2: INSERT OVERWRITE INTO文を非同期的に実行して、データを弾力的にインポートします。 ほとんどの場合、SUBMIT JOBステートメントは非同期ジョブの送信に使用されます。

      /* + elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group | spark.adb.eni.vswitchId=vsw-bp12ldm83z4zu9k4d ****]* /
      ジョブ挿入の上書きをadb_demo.adb_import_test SELECT * からadb_demo.test_adbに送信します。
      重要

      エラスティックインポートジョブを非同期で送信する場合、優先キューを設定できません。

      サンプル結果:

      + --------------------------------------- +
      | job_id |
      + --------------------------------------- +
      | 2023081517192220291720310090151 ****** |
      + --------------------------------------- + 

    [ジョブの送信] ステートメントを使用して非同期ジョブを送信すると、ジョブIDのみが返されます。これは、非同期ジョブが正常に送信されたことを示します。 返されたジョブIDを使用して、非同期ジョブを終了したり、非同期ジョブのステータスを照会したりできます。 詳細については、「インポートジョブの非同期送信」をご参照ください。

    ヒントパラメータ:

    • elastic_load: elastic_importを使用するかどうかを指定します。 有効な値: trueおよびfalse。 デフォルト値:false

    • elastic_load_configs: エラスティックインポート機能の設定パラメーター。 パラメーターを括弧 ([ ]) で囲み、複数のパラメーターを縦棒 (|) で区切る必要があります。 次の表にパラメーターを示します。

      パラメーター

      必須

      説明

      adb.load.resource.group.name

      エラスティックインポートジョブを実行するジョブリソースグループの名前。

      adb.load.job.max.acu

      任意

      エラスティックインポートジョブのリソースの最大量。 単位: AnalyticDBコンピューティングユニット (ACU) 。 最小値: 5 ACU。 デフォルト値: シャード数 + 1

      次のステートメントを実行して、クラスター内のシャードの数を照会します。

      SELECT count (1) FROM information_schema.kepler_meta_shards;

      spark.driver.resourceSpec

      任意

      Sparkドライバーのリソースタイプ。 デフォルト値: small。 有効な値の詳細については、Conf設定パラメータートピックの「Sparkリソースの仕様」テーブルの「タイプ」列を参照してください。

      spark.exe cutor.resourceSpec

      任意

      Sparkエグゼキュータのリソースタイプ。 デフォルト値: large。 有効な値の詳細については、Conf設定パラメータートピックの「Sparkリソースの仕様」テーブルの「タイプ」列を参照してください。

      spark.adb.exe cutorDiskSize

      任意

      Sparkエグゼキュータのディスク容量。 有効な値: (0,100) 。 単位:GiB。 デフォルト値: 10 GiB 詳細については、「Conf設定パラメーター」トピックの「ドライバーとエグゼキューターリソースの指定」をご参照ください。

  7. (オプション) 送信されたジョブがエラスティックインポートジョブであるかどうかを確認します。

    SELECT job_name, (job_type = 3) AS is_elastic_load FROM INFORMATION_SCHEMA.kepler_meta_async_jobs WHERE job_name = "2023081818010602101701907303151 ******";

    サンプル結果:

    + --------------------------------------- + ------------------ +
    | job_name | is_elastic_load |
    + --------------------------------------- + ------------------ +
    | 2023081517195203101701907203151 ****** | 1 |
    + --------------------------------------- + ------------------ + 

    エラスティックインポートジョブが送信されると、is_elastic_loadパラメーターの1が返されます。 通常のインポートジョブが送信されると、0が返されます。