デフォルトでは、AnalyticDB for MySQLでは、トンネルレコードAPIモードで外部テーブルを使用してMaxComputeデータにアクセスし、インポートできます。 トンネル矢印APIモードを選択することもできます。 トンネルレコードAPIモードと比較して、トンネル矢印APIモードは列のMaxComputeデータを読み取ることができ、データアクセスと処理の効率が向上します。
前提条件
AnalyticDB for MySQL Data Lakehouse Editionクラスターが作成されます。
MaxComputeプロジェクトは、AnalyticDB for MySQLクラスターと同じリージョンに作成されます。
AnalyticDB for MySQLクラスターでElastic Network Interface (ENI) が有効になっています。
説明AnalyticDB for MySQLコンソールにログインし、クラスター詳細ページの左側のナビゲーションウィンドウで を選択し、[ネットワーク情報] セクションでENIをオンにします。
AnalyticDB for MySQLクラスターが存在する仮想プライベートクラウド (VPC) のCIDRブロックが、MaxComputeプロジェクトのIPアドレスホワイトリストに追加されます。
説明AnalyticDB for MySQLコンソールにログインして、[クラスター情報] ページでVPC IDを表示できます。 次に、VPCコンソールにログインし、VPCページでVPC IDを見つけてCIDRブロックを表示します。 MaxComputeホワイトリストの設定方法については、「IPアドレスホワイトリストの管理」をご参照ください。
トンネル矢印APIモードを使用してMaxComputeデータにアクセスし、インポートする場合、AnalyticDB for MySQLクラスターのマイナーバージョンは3.2.2.1以降です。
説明AnalyticDB for MySQLのマイナーバージョンを照会するには Data Lakehouse Editionクラスターで、
SELECT adb_version();
文を実行します。 クラスターのマイナーバージョンを更新するには、テクニカルサポートにお問い合わせください。
サンプルデータ
この例では、test_adb
という名前のMaxComputeプロジェクトとperson
という名前のMaxComputeテーブルが使用されています。
人がいない場合は
CREATE TABLE IF NOT EXISTS person (
id int,
name varchar(1023),
age int)
partitioned by (dt string);
次のステートメントを実行して、person
テーブルにパーティションを作成します。
ALTER TABLE person
ADD
PARTITION (dt='202207');
次のステートメントを実行して、パーティションにデータを挿入します。
INSERT INTO test_adb.person
partition (dt='202207')
VALUES (1,'james',10),(2,'bond',20),(3,'jack',30),(4,'lucy',40);
(オプション) arrow API機能の有効化
デフォルトでは、AnalyticDB for MySQLでは、トンネルレコードAPIモードでMaxComputeデータにアクセスしてインポートできます。 トンネル矢印APIモードを使用してMaxComputeデータにアクセスしてインポートする場合は、まず矢印API機能を有効にする必要があります。 矢印API機能を有効にすると、AnalyticDB for MySQLにより、トンネル矢印APIモードでMaxComputeデータにアクセスしてインポートできます。
矢印API機能の有効化
次のいずれかの方法を使用して、矢印API機能を有効にできます。
クラスターレベルで矢印API機能を有効にするには、次のSETステートメントを実行し。
SET ADB_CONFIG <config_name>= <value>;
次のヒントを使用して、クエリレベルで矢印API機能を有効にします。
/*<config_name>= <value>*/ SELECT * FROM table;
Parameters
config_name | 説明 |
ODPS_TUNNEL_ARROW_ENABLED | 矢印API機能を有効にするかどうかを指定します。 有効な値:
|
ODPS_TUNNEL_SPLIT_BY_SIZE_ENABLED | 動的分割機能を有効にするかどうかを指定します。 有効な値:
|
手順
通常のインポートまたはエラスティックインポート方法を使用してデータをインポートできます。 デフォルトでは、通常のインポート方法が使用されます。 通常のインポート方法を使用する場合、データは計算ノードから読み取られ、インデックスはストレージノードに作成されます。 通常のインポート方法は、コンピューティングリソースとストレージリソースを消費します。 elastic importメソッドを使用すると、Serverless Sparkジョブのデータが読み取られ、インデックスが作成されます。 エラスティックインポート方式は、ジョブリソースグループのリソースを消費します。 ジョブリソースグループを持つV3.1.10.0以降のAnalyticDB for MySQLクラスターのみがelastic importメソッドをサポートしています。 通常のインポート方法と比較して、エラスティックインポート方法はより少ないリソースを消費します。 これにより、リアルタイムのデータの読み取りと書き込みへの影響が軽減され、リソースの分離とデータのインポート効率が向上します。 詳細については、「データインポート方法」をご参照ください。
定期的なインポート
SQLエディターに移動します。
AnalyticDB for MySQL コンソールにログインします。 ホームページの左上でリージョンを選択します。 左側のナビゲーションウィンドウで、クラスターリスト をクリックします。 [Data Lakehouse Edition] タブで、管理するクラスターを見つけ、クラスターIDをクリックします。
左側のナビゲーションウィンドウで、 .
外部データベースを作成します。
CREATE EXTERNAL DATABASE adb_external_db;
外部テーブルを作成します。 この例では、
test_adb
という名前の外部テーブルが作成されます。adb_external_db.test_adbが存在しない場合は
CREATE EXTERNAL TABLE IF NOT EXISTS adb_external_db.test_adb ( id int, name varchar(1023), age int, dt string ) ENGINE='ODPS' TABLE_PROPERTIES='{ "accessid":"LTAILd4****", "endpoint":"http://service.cn-hangzhou.maxcompute.aliyun.com/api", "accesskey":"4A5Q7ZVzcYnWMQPysX****", "partition_column":"dt", "project_name":"test_adb", "table_name":"person" }';
説明AnalyticDB for MySQL外部テーブルは、MaxComputeテーブルと同じ名前、番号、およびフィールドの順序を使用する必要があります。 フィールドのデータ型は、2つのテーブル間で互換性がある必要があります。
外部テーブルの作成に使用されるパラメーターの詳細については、「外部テーブルの作成」をご参照ください。
データの照会
SELECT * FROM adb_external_db.test_adb;
サンプル結果:
+------+-------+------+---------+ | id | name | age | dt | +------+-------+------+---------+ | 1 | james | 10 | 202207 | | 2 | bond | 20 | 202207 | | 3 | jack | 30 | 202207 | | 4 | lucy | 40 | 202207 | +------+-------+------+---------+ 4 rows in set (0.35 sec)
MaxComputeからAnalyticDB for MySQLにデータをインポートするには、次の手順を実行します。
AnalyticDB for MySQLクラスターにデータベースを作成します。
CREATE DATABASE adb_demo;
AnalyticDB for MySQLデータベースにテーブルを作成し、MaxComputeからインポートされたデータを保存します。
説明作成されたテーブルは、手順3で作成された外部テーブルと同じ数と順序のフィールドを使用する必要があります。 フィールドのデータ型は、2つのテーブル間で互換性がある必要があります。
adb_demo.adb_import_testが存在しない場合は
CREATE TABLE IF NOT EXISTS adb_demo.adb_import_test( id int, name string, age int, dt string PRIMARY KEY(id,dt) ) DISTRIBUTE BY HASH(id) PARTITION BY VALUE('dt');
テーブルにデータを書き込みます。
方法1: INSERT INTOステートメントを実行してデータをインポートします。 主キーの値が重複している場合、データは繰り返し挿入されません。 この場合、INSERT INTOステートメントは
INSERT IGNORE INTO
ステートメントに相当します。 詳細については、「INSERT INTO」をご参照ください。INSERT INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb;
特定のパーティションから
adb_demo.adb_import_test
テーブルにデータをインポートする場合は、次のステートメントを実行します。INSERT INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb WHERE dt = '202207';
方法2: INSERT OVERWRITE INTOステートメントを実行してデータをインポートします。 主キーの値が重複している場合、元の値は新しい値で上書きされます。
INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb;
方法3: INSERT OVERWRITE INTOステートメントを実行して、データを非同期的にインポートします。 ほとんどの場合、
SUBMIT JOB
ステートメントは非同期ジョブの送信に使用されます。 ジョブを高速化するために、データインポートステートメントの前にヒント (/* + direct_batch_load=true */
) を追加できます。 詳細については、「INSERT OVERWRITE SELECT」トピックの「非同期書き込み」セクションをご参照ください。SUBMIT job INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb;
サンプル結果:
+---------------------------------------+ | job_id | +---------------------------------------+ | 2020112122202917203100908203303****** | +---------------------------------------+
ジョブを非同期で送信する方法については、「インポートジョブを非同期で送信」をご参照ください。
Elastic import
SQLエディターに移動します。
AnalyticDB for MySQL コンソールにログインします。 ホームページの左上でリージョンを選択します。 左側のナビゲーションウィンドウで、クラスターリスト をクリックします。 [Data Lakehouse Edition] タブで、管理するクラスターを見つけ、クラスターIDをクリックします。
左側のナビゲーションウィンドウで、 .
データベースを作成します。 すでにデータベースを作成している場合は、この手順をスキップしてください。
CREATE DATABASE adb_demo;
外部テーブルを作成します。
説明AnalyticDB for MySQL外部テーブルの名前は、MaxComputeプロジェクトの名前と同じである必要があります。 それ以外の場合、外部テーブルの作成に失敗します。
AnalyticDB for MySQL外部テーブルは、MaxComputeテーブルと同じ名前、番号、およびフィールドの順序を使用する必要があります。 フィールドのデータ型は、2つのテーブル間で互換性がある必要があります。
エラスティックインポートメソッドでは、
create TABLE
ステートメントを使用してのみ外部テーブルを作成できます。
CREATE TABLE IF NOT EXISTS test_adb ( id int, name string, age int, dt string ) ENGINE='ODPS' TABLE_PROPERTIES='{ "endpoint":"http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api", "accessid":"LTAILd4****", "accesskey":"4A5Q7ZVzcYnWMQPysX****", "partition_column":"dt", "project_name":"test_adb", "table_name":"person" }';
外部テーブルパラメーターの詳細については、「外部テーブルを使用してデータをdata Warehouse Editionにインポートする」トピックのパラメーターテーブルをご参照ください。
データの照会
SELECT * FROM adb_demo.test_adb;
サンプル結果:
+------+-------+------+---------+ | id | name | age | dt | +------+-------+------+---------+ | 1 | james | 10 | 202207 | | 2 | bond | 20 | 202207 | | 3 | jack | 30 | 202207 | | 4 | lucy | 40 | 202207 | +------+-------+------+---------+ 4 rows in set (0.35 sec)
AnalyticDB for MySQLデータベースにテーブルを作成し、MaxComputeからインポートされたデータを保存します。
説明作成した内部テーブルは、手順3で作成した外部テーブルと同じフィールド名、番号、順序、およびデータ型を使用する必要があります。
CREATE TABLE IF NOT EXISTS adb_import_test ( id int, name string, age int, dt string, PRIMARY KEY(id,dt) ) DISTRIBUTE BY HASH(id) PARTITION BY VALUE('dt') LIFECYCLE 30;
データをインポートする。
重要elastic importメソッドでは、
INSERT OVERWRITE INTO
ステートメントを使用してのみデータをインポートできます。方法1: INSERT OVERWRITE INTOステートメントを実行して、データを弾力的にインポートします。 主キーの値が重複している場合、元の値は新しい値で上書きされます。
/*+ elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group|spark.adb.eni.vswitchId=vsw-bp12ldm83z4zu9k4d****]*/ INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_demo.test_adb;
方法2: INSERT OVERWRITE INTO文を非同期的に実行して、データを弾力的にインポートします。 ほとんどの場合、
SUBMIT JOB
ステートメントは非同期ジョブの送信に使用されます。/*+ elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group|spark.adb.eni.vswitchId=vsw-bp12ldm83z4zu9k4d****]*/ SUBMIT JOB INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_demo.test_adb;
重要エラスティックインポートジョブを非同期で送信する場合、優先キューを設定できません。
サンプル結果:
+---------------------------------------+ | job_id | +---------------------------------------+ | 2023081517192220291720310090151****** | +---------------------------------------+
[ジョブの送信]
ステートメントを使用して非同期ジョブを送信すると、ジョブIDのみが返されます。これは、非同期ジョブが正常に送信されたことを示します。 返されたジョブIDを使用して、非同期ジョブを終了したり、非同期ジョブのステータスを照会したりできます。 詳細については、「インポートジョブの非同期送信」をご参照ください。ヒントパラメータ:
elastic_load: elastic_importを使用するかどうかを指定します。 有効な値: trueおよびfalse。 デフォルト値:false
elastic_load_configs: エラスティックインポート機能の設定パラメーター。 パラメーターを括弧 ([ ]) で囲み、複数のパラメーターを縦棒 (|) で区切る必要があります。 次の表にパラメーターを示します。
パラメーター
必須
説明
adb.load.resource.group.name
必須
エラスティックインポートジョブを実行するジョブリソースグループの名前。
adb.load.job.max.acu
選択可能
エラスティックインポートジョブのリソースの最大量。 単位: AnalyticDBコンピューティングユニット (ACU) 。 最小値: 5 ACU。 デフォルト値: シャード数 + 1
次のステートメントを実行して、クラスター内のシャードの数を照会します。
SELECT count(1) FROM information_schema.kepler_meta_shards;
spark.driver.resourceSpec
選択可能
Sparkドライバーのリソースタイプ。 デフォルト値: small。 有効な値の詳細については、Conf設定パラメータートピックの「Sparkリソースの仕様」テーブルの「タイプ」列を参照してください。
spark.exe cutor.resourceSpec
選択可能
Sparkエグゼキュータのリソースタイプ。 デフォルト値: large。 有効な値の詳細については、Conf設定パラメータートピックの「Sparkリソースの仕様」テーブルの「タイプ」列を参照してください。
spark.adb.exe cutorDiskSize
選択可能
Sparkエグゼキュータのディスク容量。 有効な値: (0,100) 。 単位は USD / GiB です。 デフォルト値: 10 GiB 詳細については、「Conf設定パラメーター」トピックの「ドライバーとエグゼキューターリソースの指定」をご参照ください。
(オプション) 送信されたジョブがエラスティックインポートジョブであるかどうかを確認します。
SELECT job_name, (job_type = 3) AS is_elastic_load FROM INFORMATION_SCHEMA.kepler_meta_async_jobs WHERE job_name = "2023081818010602101701907303151******";
サンプル結果:
+---------------------------------------+------------------+ | job_name | is_elastic_load | +---------------------------------------+------------------+ | 2023081517195203101701907203151****** | 1 | +---------------------------------------+------------------+
エラスティックインポートジョブが送信されると、
is_elastic_load
パラメーターの1が返されます。 通常のインポートジョブが送信されると、0が返されます。