すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB:外部テーブルを使用してdata Lakehouse Editionにデータをインポートする

最終更新日:Oct 24, 2024

デフォルトでは、AnalyticDB for MySQLでは、トンネルレコードAPIモードで外部テーブルを使用してMaxComputeデータにアクセスし、インポートできます。 トンネル矢印APIモードを選択することもできます。 トンネルレコードAPIモードと比較して、トンネル矢印APIモードは列のMaxComputeデータを読み取ることができ、データアクセスと処理の効率が向上します。

前提条件

  • AnalyticDB for MySQL Data Lakehouse Editionクラスターが作成されます。

  • MaxComputeプロジェクトは、AnalyticDB for MySQLクラスターと同じリージョンに作成されます。

  • AnalyticDB for MySQLクラスターでElastic Network Interface (ENI) が有効になっています。

    説明

    AnalyticDB for MySQLコンソールにログインし、クラスター詳細ページの左側のナビゲーションウィンドウで [クラスター管理] > [クラスター情報] を選択し、[ネットワーク情報] セクションでENIをオンにします。

  • AnalyticDB for MySQLクラスターが存在する仮想プライベートクラウド (VPC) のCIDRブロックが、MaxComputeプロジェクトのIPアドレスホワイトリストに追加されます。

    説明

    AnalyticDB for MySQLコンソールにログインして、[クラスター情報] ページでVPC IDを表示できます。 次に、VPCコンソールにログインし、VPCページでVPC IDを見つけてCIDRブロックを表示します。 MaxComputeホワイトリストの設定方法については、「IPアドレスホワイトリストの管理」をご参照ください。

  • トンネル矢印APIモードを使用してMaxComputeデータにアクセスし、インポートする場合、AnalyticDB for MySQLクラスターのマイナーバージョンは3.2.2.1以降です。

    説明

    AnalyticDB for MySQLのマイナーバージョンを照会するには Data Lakehouse Editionクラスターで、SELECT adb_version(); 文を実行します。 クラスターのマイナーバージョンを更新するには、テクニカルサポートにお問い合わせください。

サンプルデータ

この例では、test_adbという名前のMaxComputeプロジェクトとpersonという名前のMaxComputeテーブルが使用されています。

人がいない場合は

CREATE TABLE IF NOT EXISTS person (
    id int,
    name varchar(1023),
    age int)
partitioned by (dt string);

次のステートメントを実行して、personテーブルにパーティションを作成します。

ALTER TABLE person 
ADD 
PARTITION (dt='202207');

次のステートメントを実行して、パーティションにデータを挿入します。

INSERT INTO test_adb.person 
partition (dt='202207') 
VALUES (1,'james',10),(2,'bond',20),(3,'jack',30),(4,'lucy',40);

(オプション) arrow API機能の有効化

デフォルトでは、AnalyticDB for MySQLでは、トンネルレコードAPIモードでMaxComputeデータにアクセスしてインポートできます。 トンネル矢印APIモードを使用してMaxComputeデータにアクセスしてインポートする場合は、まず矢印API機能を有効にする必要があります。 矢印API機能を有効にすると、AnalyticDB for MySQLにより、トンネル矢印APIモードでMaxComputeデータにアクセスしてインポートできます。

矢印API機能の有効化

次のいずれかの方法を使用して、矢印API機能を有効にできます。

  • クラスターレベルで矢印API機能を有効にするには、次のSETステートメントを実行し。

    SET ADB_CONFIG <config_name>= <value>;
  • 次のヒントを使用して、クエリレベルで矢印API機能を有効にします。

    /*<config_name>= <value>*/ SELECT * FROM table;

Parameters

config_name

説明

ODPS_TUNNEL_ARROW_ENABLED

矢印API機能を有効にするかどうかを指定します。 有効な値:

  • true

  • false (デフォルト)

ODPS_TUNNEL_SPLIT_BY_SIZE_ENABLED

動的分割機能を有効にするかどうかを指定します。 有効な値:

  • true

  • false (デフォルト)

手順

通常のインポートまたはエラスティックインポート方法を使用してデータをインポートできます。 デフォルトでは、通常のインポート方法が使用されます。 通常のインポート方法を使用する場合、データは計算ノードから読み取られ、インデックスはストレージノードに作成されます。 通常のインポート方法は、コンピューティングリソースとストレージリソースを消費します。 elastic importメソッドを使用すると、Serverless Sparkジョブのデータが読み取られ、インデックスが作成されます。 エラスティックインポート方式は、ジョブリソースグループのリソースを消費します。 ジョブリソースグループを持つV3.1.10.0以降のAnalyticDB for MySQLクラスターのみがelastic importメソッドをサポートしています。 通常のインポート方法と比較して、エラスティックインポート方法はより少ないリソースを消費します。 これにより、リアルタイムのデータの読み取りと書き込みへの影響が軽減され、リソースの分離とデータのインポート効率が向上します。 詳細については、「データインポート方法」をご参照ください。

定期的なインポート

  1. SQLエディターに移動します。

    1. AnalyticDB for MySQL コンソールにログインします。 ホームページの左上でリージョンを選択します。 左側のナビゲーションウィンドウで、クラスターリスト をクリックします。   [Data Lakehouse Edition] タブで、管理するクラスターを見つけ、クラスターIDをクリックします。

    2. 左側のナビゲーションウィンドウで、ジョブ开発 > Sql開発.

  2. 外部データベースを作成します。

    CREATE EXTERNAL DATABASE adb_external_db;
  3. 外部テーブルを作成します。 この例では、test_adbという名前の外部テーブルが作成されます。

    adb_external_db.test_adbが存在しない場合は

    CREATE EXTERNAL TABLE IF NOT EXISTS adb_external_db.test_adb (
        id int,
        name varchar(1023),
        age int,
        dt string
        ) ENGINE='ODPS'
    TABLE_PROPERTIES='{
    "accessid":"LTAILd4****",
    "endpoint":"http://service.cn-hangzhou.maxcompute.aliyun.com/api",
    "accesskey":"4A5Q7ZVzcYnWMQPysX****",
    "partition_column":"dt",
    "project_name":"test_adb",
    "table_name":"person"
    }';
    説明
    • AnalyticDB for MySQL外部テーブルは、MaxComputeテーブルと同じ名前、番号、およびフィールドの順序を使用する必要があります。 フィールドのデータ型は、2つのテーブル間で互換性がある必要があります。

    • 外部テーブルの作成に使用されるパラメーターの詳細については、「外部テーブルの作成」をご参照ください。

  4. データの照会

    SELECT * FROM adb_external_db.test_adb;

    サンプル結果:

    +------+-------+------+---------+
    | id   | name  | age  |   dt    |
    +------+-------+------+---------+
    |    1 | james |   10 |  202207 |
    |    2 | bond  |   20 |  202207 |
    |    3 | jack  |   30 |  202207 |
    |    4 | lucy  |   40 |  202207 |
    +------+-------+------+---------+
    4 rows in set (0.35 sec)
  5. MaxComputeからAnalyticDB for MySQLにデータをインポートするには、次の手順を実行します。

    1. AnalyticDB for MySQLクラスターにデータベースを作成します。

      CREATE DATABASE adb_demo; 
    2. AnalyticDB for MySQLデータベースにテーブルを作成し、MaxComputeからインポートされたデータを保存します。

      説明

      作成されたテーブルは、手順3で作成された外部テーブルと同じ数と順序のフィールドを使用する必要があります。 フィールドのデータ型は、2つのテーブル間で互換性がある必要があります。

      adb_demo.adb_import_testが存在しない場合は

      CREATE TABLE IF NOT EXISTS adb_demo.adb_import_test(
          id int,
          name string,
          age int,
          dt string
          PRIMARY KEY(id,dt)
      )
      DISTRIBUTE BY HASH(id)  
      PARTITION BY VALUE('dt'); 
    3. テーブルにデータを書き込みます。

      • 方法1: INSERT INTOステートメントを実行してデータをインポートします。 主キーの値が重複している場合、データは繰り返し挿入されません。 この場合、INSERT INTOステートメントはINSERT IGNORE INTOステートメントに相当します。 詳細については、「INSERT INTO」をご参照ください。

        INSERT INTO adb_demo.adb_import_test
        SELECT * FROM adb_external_db.test_adb;

        特定のパーティションからadb_demo.adb_import_testテーブルにデータをインポートする場合は、次のステートメントを実行します。

        INSERT INTO adb_demo.adb_import_test
        SELECT * FROM adb_external_db.test_adb 
        WHERE dt = '202207'; 
      • 方法2: INSERT OVERWRITE INTOステートメントを実行してデータをインポートします。 主キーの値が重複している場合、元の値は新しい値で上書きされます。

        INSERT OVERWRITE INTO adb_demo.adb_import_test
        SELECT * FROM adb_external_db.test_adb;
      • 方法3: INSERT OVERWRITE INTOステートメントを実行して、データを非同期的にインポートします。 ほとんどの場合、SUBMIT JOBステートメントは非同期ジョブの送信に使用されます。 ジョブを高速化するために、データインポートステートメントの前にヒント (/* + direct_batch_load=true */) を追加できます。 詳細については、「INSERT OVERWRITE SELECT」トピックの「非同期書き込み」セクションをご参照ください。

        SUBMIT job 
        INSERT OVERWRITE INTO adb_demo.adb_import_test
        SELECT * FROM adb_external_db.test_adb;

        サンプル結果:

        +---------------------------------------+
        | job_id                                |
        +---------------------------------------+
        | 2020112122202917203100908203303****** |
        +---------------------------------------+

        ジョブを非同期で送信する方法については、「インポートジョブを非同期で送信」をご参照ください。

Elastic import

  1. SQLエディターに移動します。

    1. AnalyticDB for MySQL コンソールにログインします。 ホームページの左上でリージョンを選択します。 左側のナビゲーションウィンドウで、クラスターリスト をクリックします。   [Data Lakehouse Edition] タブで、管理するクラスターを見つけ、クラスターIDをクリックします。

    2. 左側のナビゲーションウィンドウで、ジョブ开発 > Sql開発.

  2. データベースを作成します。 すでにデータベースを作成している場合は、この手順をスキップしてください。

    CREATE DATABASE adb_demo; 
  3. 外部テーブルを作成します。

    説明
    • AnalyticDB for MySQL外部テーブルの名前は、MaxComputeプロジェクトの名前と同じである必要があります。 それ以外の場合、外部テーブルの作成に失敗します。

    • AnalyticDB for MySQL外部テーブルは、MaxComputeテーブルと同じ名前、番号、およびフィールドの順序を使用する必要があります。 フィールドのデータ型は、2つのテーブル間で互換性がある必要があります。

    • エラスティックインポートメソッドでは、create TABLEステートメントを使用してのみ外部テーブルを作成できます。

    CREATE TABLE IF NOT EXISTS test_adb
    (
        id int,
        name string,
        age int,
        dt string
    )
     ENGINE='ODPS'
     TABLE_PROPERTIES='{
     "endpoint":"http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api",
     "accessid":"LTAILd4****",
     "accesskey":"4A5Q7ZVzcYnWMQPysX****",
     "partition_column":"dt",
     "project_name":"test_adb",
     "table_name":"person"
     }';                 

    外部テーブルパラメーターの詳細については、「外部テーブルを使用してデータをdata Warehouse Editionにインポートする」トピックのパラメーターテーブルをご参照ください。

  4. データの照会

    SELECT * FROM adb_demo.test_adb;

    サンプル結果:

    +------+-------+------+---------+
    | id   | name  | age  |   dt    |
    +------+-------+------+---------+
    |    1 | james |   10 |  202207 |
    |    2 | bond  |   20 |  202207 |
    |    3 | jack  |   30 |  202207 |
    |    4 | lucy  |   40 |  202207 |
    +------+-------+------+---------+
    4 rows in set (0.35 sec)
  5. AnalyticDB for MySQLデータベースにテーブルを作成し、MaxComputeからインポートされたデータを保存します。

    説明

    作成した内部テーブルは、手順3で作成した外部テーブルと同じフィールド名、番号、順序、およびデータ型を使用する必要があります。

    CREATE TABLE IF NOT EXISTS adb_import_test
    (   id int,
        name string,
        age int,
        dt string,
        PRIMARY KEY(id,dt)
    )
    DISTRIBUTE BY HASH(id)
    PARTITION BY VALUE('dt') LIFECYCLE 30;  
  6. データをインポートする。

    重要

    elastic importメソッドでは、INSERT OVERWRITE INTOステートメントを使用してのみデータをインポートできます。

    • 方法1: INSERT OVERWRITE INTOステートメントを実行して、データを弾力的にインポートします。 主キーの値が重複している場合、元の値は新しい値で上書きされます。

      /*+ elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group|spark.adb.eni.vswitchId=vsw-bp12ldm83z4zu9k4d****]*/
      INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_demo.test_adb;
    • 方法2: INSERT OVERWRITE INTO文を非同期的に実行して、データを弾力的にインポートします。 ほとんどの場合、SUBMIT JOBステートメントは非同期ジョブの送信に使用されます。

      /*+ elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group|spark.adb.eni.vswitchId=vsw-bp12ldm83z4zu9k4d****]*/
      SUBMIT JOB INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_demo.test_adb;
      重要

      エラスティックインポートジョブを非同期で送信する場合、優先キューを設定できません。

      サンプル結果:

      +---------------------------------------+
      | job_id                                |
      +---------------------------------------+
      | 2023081517192220291720310090151****** |
      +---------------------------------------+

    [ジョブの送信] ステートメントを使用して非同期ジョブを送信すると、ジョブIDのみが返されます。これは、非同期ジョブが正常に送信されたことを示します。 返されたジョブIDを使用して、非同期ジョブを終了したり、非同期ジョブのステータスを照会したりできます。 詳細については、「インポートジョブの非同期送信」をご参照ください。

    ヒントパラメータ:

    • elastic_load: elastic_importを使用するかどうかを指定します。 有効な値: trueおよびfalse。 デフォルト値:false

    • elastic_load_configs: エラスティックインポート機能の設定パラメーター。 パラメーターを括弧 ([ ]) で囲み、複数のパラメーターを縦棒 (|) で区切る必要があります。 次の表にパラメーターを示します。

      パラメーター

      必須

      説明

      adb.load.resource.group.name

      必須

      エラスティックインポートジョブを実行するジョブリソースグループの名前。

      adb.load.job.max.acu

      選択可能

      エラスティックインポートジョブのリソースの最大量。 単位: AnalyticDBコンピューティングユニット (ACU) 。 最小値: 5 ACU。 デフォルト値: シャード数 + 1

      次のステートメントを実行して、クラスター内のシャードの数を照会します。

      SELECT count(1) FROM information_schema.kepler_meta_shards;

      spark.driver.resourceSpec

      選択可能

      Sparkドライバーのリソースタイプ。 デフォルト値: small。 有効な値の詳細については、Conf設定パラメータートピックの「Sparkリソースの仕様」テーブルの「タイプ」列を参照してください。

      spark.exe cutor.resourceSpec

      選択可能

      Sparkエグゼキュータのリソースタイプ。 デフォルト値: large。 有効な値の詳細については、Conf設定パラメータートピックの「Sparkリソースの仕様」テーブルの「タイプ」列を参照してください。

      spark.adb.exe cutorDiskSize

      選択可能

      Sparkエグゼキュータのディスク容量。 有効な値: (0,100) 。 単位は USD / GiB です。 デフォルト値: 10 GiB 詳細については、「Conf設定パラメーター」トピックの「ドライバーとエグゼキューターリソースの指定」をご参照ください。

  7. (オプション) 送信されたジョブがエラスティックインポートジョブであるかどうかを確認します。

    SELECT job_name, (job_type = 3) AS is_elastic_load FROM INFORMATION_SCHEMA.kepler_meta_async_jobs WHERE job_name = "2023081818010602101701907303151******";

    サンプル結果:

    +---------------------------------------+------------------+
    | job_name                              | is_elastic_load  |
    +---------------------------------------+------------------+
    | 2023081517195203101701907203151****** |       1          |
    +---------------------------------------+------------------+

    エラスティックインポートジョブが送信されると、is_elastic_loadパラメーターの1が返されます。 通常のインポートジョブが送信されると、0が返されます。