AnalyticDB for MySQLでは、外部テーブルを使用してMaxComputeデータにアクセスし、インポートできます。 これにより、クラスターリソースを最大限に活用してデータインポートのパフォーマンスを向上させることができます。 このトピックでは、外部テーブルを使用してMaxComputeからAnalyticDB for MySQL data Lakehouse Edition (V3.0) にデータをインポートする方法について説明します。
前提条件
MaxComputeプロジェクトとAnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターが同じリージョンに作成されます。 詳細については、「クラスターの作成」をご参照ください。
Elastic Network Interface (ENI) 機能は、AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターで有効になっています。
AnalyticDB for MySQLクラスターが存在する仮想プライベートクラウド (VPC) のCIDRブロックがMaxComputeプロジェクトのホワイトリストに追加されます。
AnalyticDB for MySQLコンソールにログインして、[クラスター情報] ページでVPC IDを表示できます。 次に、VPCコンソールにログインし、VPCページでVPC IDを見つけてCIDRブロックを表示します。 MaxComputeホワイトリストの設定方法については、「IPアドレスホワイトリストの管理」をご参照ください。
サンプルデータ
この例では、test_adb
という名前のMaxComputeプロジェクトとperson
という名前のMaxComputeテーブルが使用されています。 次の文を実行して、テーブルを作成します。
テーブルを作成します (
id int,
名前varchar(1023) 、
年齢int)
によって分割 (dt文字列);
次のステートメントを実行して、person
テーブルにパーティションを作成します。
ALTERテーブル人
追加
パーティー (dt='202207');
次のステートメントを実行して、パーティションにデータを挿入します。
test_adb.personに挿入
パーティション (dt='202207')
値 (1、「james」、10) 、(2、「bond」、20) 、(3、「jack」、30) 、(4、「lucy」、40);
トンネルモードでのMaxComputeデータへのアクセスとインポート
通常のインポートまたはエラスティックインポート方法を使用してデータをインポートできます。 デフォルトでは、通常のインポート方法が使用されます。 通常のインポート方法を使用する場合、データは計算ノードから読み取られ、インデックスはストレージノードに作成されます。 通常のインポート方法は、コンピューティングリソースとストレージリソースを消費します。 elastic importメソッドを使用すると、Serverless Sparkジョブのデータが読み取られ、インデックスが作成されます。 エラスティックインポート方式は、ジョブリソースグループのリソースを消費します。 ジョブリソースグループを持つAnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスタのみがV3.1.10.0以降でエラスティックインポートをサポートしています。 通常のインポート方法と比較して、エラスティックインポート方法はより少ないリソースを消費します。 これにより、リアルタイムのデータの読み取りと書き込みへの影響が軽減され、リソースの分離とデータのインポート効率が向上します。 詳細については、「データインポート方法」をご参照ください。
定期的なインポート
SQLエディターに移動します。
AnalyticDB for MySQLコンソールにログインします。 ホームページの左上でリージョンを選択します。 左側のナビゲーションウィンドウで、クラスターリスト をクリックします。 Lake Warehouse Edition(3.0) タブで、管理するクラスターを見つけ、クラスターIDをクリックします。
- 左側のナビゲーションウィンドウで、 を選択します。
外部データベースを作成します。
外部データベースの作成adb_external_db;
外部テーブルを作成します。 この例では、
adb_external_db.test_adbが存在しない場合はtest_adb
という名前の外部テーブルが使用されています。外部テーブルを作成します ( id int, 名前varchar(1023) 、 age int, dtストリング ) エンジン='ODPS' TABLE_PROPERTIES='{ "accessid":"LTAILd4 ****" 、"endpoint":" http://service.cn-hangzhou.maxcompute.aliyun.com/api " 、"accesskey":"4A5Q7ZVzcYnWMQPysX ****" 、"partition_column":"dt" 、"project_name":"test_adb" 、"table_name":"person" } ';
説明AnalyticDB for MySQL外部テーブルは、MaxComputeテーブルと同じ名前、番号、およびフィールドの順序を使用する必要があります。 フィールドのデータ型は、2つのテーブル間で互換性がある必要があります。
外部テーブルの作成に使用されるパラメーターの詳細については、「外部テーブルの作成」をご参照ください。
データの照会
SELECT * からadb_external_db.test_adb;
サンプル結果:
+ ------ ------ ------- ------------- | id | name | 年齢 | dt | ------ ------ -------- -------------- | 1 | ジェームズ | 10 | 202207 | | 2 | ボンド | 20 | 202207 | | 3 | ジャック | 30 | 202207 | | 4 | ルーシー | 40 | 202207 | ------ ------ -------- -------------- セットの4列 (0.35秒)
MaxComputeからAnalyticDB for MySQLにデータをインポートするには、次の手順を実行します。
AnalyticDB for MySQLクラスターにデータベースを作成します。
AnalyticDB for MySQLデータベースにテーブルを作成し、MaxComputeからインポートされたデータを保存します。
テーブルにデータを書き込みます。
方法1: INSERT INTOステートメントを実行してデータをインポートします。 主キーの値が重複している場合、データは繰り返し挿入されません。 この場合、INSERT INTOステートメントは
INSERT IGNORE INTO
ステートメントに相当します。 詳細については、「INSERT INTO」をご参照ください。方法2: INSERT OVERWRITE INTOステートメントを実行してデータをインポートします。 主キーの値が重複している場合、元の値は新しい値で上書きされます。
方法3: INSERT OVERWRITE INTOステートメントを実行して、データを非同期的にインポートします。 ほとんどの場合、
SUBMIT JOB
ステートメントは非同期ジョブの送信に使用されます。 ジョブを高速化するために、データインポートステートメントの前にヒント (/* + direct_batch_load=true */
) を追加できます。 詳細については、「INSERT OVERWRITE SELECT」トピックの「非同期書き込み」セクションをご参照ください。ジョブを非同期で送信する方法については、「インポートジョブを非同期で送信」をご参照ください。
CREATE DATABASE adb_demo;
adb_demo.adb_import_testが存在しない場合は説明作成されたテーブルは、手順3で作成された外部テーブルと同じ数と順序のフィールドを使用する必要があります。 フィールドのデータ型は、2つのテーブル間で互換性がある必要があります。
テーブルを作成します ( id int, 名前文字列, age int, dtストリング 主要なキー (id、dt) ) ハッシュによる分配 (id) 値による部分 ('dt');
adb_demo.adb_import_testに挿入する SELECT * からadb_external_db.test_adb;
特定のパーティションから
adb_demo.adb_import_test
テーブルにデータをインポートする場合は、次のステートメントを実行します。adb_demo.adb_import_testに挿入する SELECT * からadb_external_db.test_adb WHERE dt='20220';
上書きをadb_demo.adb_import_testに挿入する SELECT * からadb_external_db.test_adb;
ジョブを送信する 上書きをadb_demo.adb_import_testに挿入する SELECT * からadb_external_db.test_adb;
サンプル結果:
+ --------------------------------------- + | job_id | + --------------------------------------- + | 2020112122202917203100908203303 ****** | + --------------------------------------- +
Elastic import
SQLエディターに移動します。
AnalyticDB for MySQLコンソールにログインします。 ホームページの左上でリージョンを選択します。 左側のナビゲーションウィンドウで、クラスターリスト をクリックします。 Lake Warehouse Edition(3.0) タブで、管理するクラスターを見つけ、クラスターIDをクリックします。
- 左側のナビゲーションウィンドウで、 を選択します。
データベースを作成します。 すでにデータベースを作成している場合は、この手順をスキップしてください。
CREATE DATABASE adb_demo;
外部テーブルを作成します。
説明AnalyticDB for MySQL外部テーブルの名前は、MaxComputeプロジェクトの名前と同じである必要があります。 それ以外の場合、外部テーブルの作成に失敗します。
AnalyticDB for MySQL外部テーブルは、MaxComputeテーブルと同じ名前、番号、およびフィールドの順序を使用する必要があります。 フィールドのデータ型は、2つのテーブル間で互換性がある必要があります。
エラスティックインポートメソッドでは、
create TABLE
ステートメントを使用してのみ外部テーブルを作成できます。
が存在しない場合はテーブルを作成しますtest_adb ( id int, 名前文字列, age int, dtストリング ) エンジン='ODPS' TABLE_PROPERTIES='{ "endpoint":" http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api " 、 "accessid":"LTAILd4 ****" 、 "accesskey":"4A5Q7ZVzcYnWMQPysX ****" 、 "partition_column":"dt" 、 "project_name":"test_adb" 、 "table_name":"person" }';
外部テーブルパラメーターの詳細については、「外部テーブルを使用してデータをdata Warehouse Editionにインポートする」トピックのパラメーターテーブルをご参照ください。
データの照会
SELECT * からadb_demo.test_adb;
サンプル結果:
+ ------ ------ ------- ------------- | id | name | 年齢 | dt | ------ ------ -------- -------------- | 1 | ジェームズ | 10 | 202207 | | 2 | ボンド | 20 | 202207 | | 3 | ジャック | 30 | 202207 | | 4 | ルーシー | 40 | 202207 | ------ ------ -------- -------------- セットの4列 (0.35秒)
AnalyticDB for MySQLデータベースにテーブルを作成し、MaxComputeからインポートされたデータを保存します。
説明作成した内部テーブルは、手順3で作成した外部テーブルと同じフィールド名、番号、順序、およびデータ型を使用する必要があります。
が存在しない場合はテーブルを作成します。adb_import_test (id int, 名前文字列, age int, dtストリング、 主要なキー (id、dt) ) ハッシュによる分配 (id) 値によるパーティション ('dt') LIFECYCLE 30;
データをインポートする。
重要elastic importメソッドでは、
INSERT OVERWRITE INTO
ステートメントを使用してのみデータをインポートできます。方法1: INSERT OVERWRITE INTOステートメントを実行して、データを弾力的にインポートします。 主キーの値が重複している場合、元の値は新しい値で上書きされます。
/* + elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group | spark.adb.eni.vswitchId=vsw-bp12ldm83z4zu9k4d ****]* / INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * からadb_demo.test_adb;
方法2: INSERT OVERWRITE INTO文を非同期的に実行して、データを弾力的にインポートします。 ほとんどの場合、
SUBMIT JOB
ステートメントは非同期ジョブの送信に使用されます。/* + elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group | spark.adb.eni.vswitchId=vsw-bp12ldm83z4zu9k4d ****]* / ジョブ挿入の上書きをadb_demo.adb_import_test SELECT * からadb_demo.test_adbに送信します。
重要エラスティックインポートジョブを非同期で送信する場合、優先キューを設定できません。
サンプル結果:
+ --------------------------------------- + | job_id | + --------------------------------------- + | 2023081517192220291720310090151 ****** | + --------------------------------------- +
[ジョブの送信]
ステートメントを使用して非同期ジョブを送信すると、ジョブIDのみが返されます。これは、非同期ジョブが正常に送信されたことを示します。 返されたジョブIDを使用して、非同期ジョブを終了したり、非同期ジョブのステータスを照会したりできます。 詳細については、「インポートジョブの非同期送信」をご参照ください。ヒントパラメータ:
elastic_load: elastic_importを使用するかどうかを指定します。 有効な値: trueおよびfalse。 デフォルト値:false
elastic_load_configs: エラスティックインポート機能の設定パラメーター。 パラメーターを括弧 ([ ]) で囲み、複数のパラメーターを縦棒 (|) で区切る必要があります。 次の表にパラメーターを示します。
パラメーター
必須
説明
adb.load.resource.group.name
可
エラスティックインポートジョブを実行するジョブリソースグループの名前。
adb.load.job.max.acu
任意
エラスティックインポートジョブのリソースの最大量。 単位: AnalyticDBコンピューティングユニット (ACU) 。 最小値: 5 ACU。 デフォルト値: シャード数 + 1
次のステートメントを実行して、クラスター内のシャードの数を照会します。
SELECT count (1) FROM information_schema.kepler_meta_shards;
spark.driver.resourceSpec
任意
Sparkドライバーのリソースタイプ。 デフォルト値: small。 有効な値の詳細については、Conf設定パラメータートピックの「Sparkリソースの仕様」テーブルの「タイプ」列を参照してください。
spark.exe cutor.resourceSpec
任意
Sparkエグゼキュータのリソースタイプ。 デフォルト値: large。 有効な値の詳細については、Conf設定パラメータートピックの「Sparkリソースの仕様」テーブルの「タイプ」列を参照してください。
spark.adb.exe cutorDiskSize
任意
Sparkエグゼキュータのディスク容量。 有効な値: (0,100) 。 単位:GiB。 デフォルト値: 10 GiB 詳細については、「Conf設定パラメーター」トピックの「ドライバーとエグゼキューターリソースの指定」をご参照ください。
(オプション) 送信されたジョブがエラスティックインポートジョブであるかどうかを確認します。
SELECT job_name, (job_type = 3) AS is_elastic_load FROM INFORMATION_SCHEMA.kepler_meta_async_jobs WHERE job_name = "2023081818010602101701907303151 ******";
サンプル結果:
+ --------------------------------------- + ------------------ + | job_name | is_elastic_load | + --------------------------------------- + ------------------ + | 2023081517195203101701907203151 ****** | 1 | + --------------------------------------- + ------------------ +
エラスティックインポートジョブが送信されると、
is_elastic_load
パラメーターの1が返されます。 通常のインポートジョブが送信されると、0が返されます。