AnalyticDB for MySQL Data Lakehouse Edition (V3.0) では、外部テーブルを使用してデータにアクセスおよびインポートできます。 通常のインポートまたはエラスティックインポートを使用してデータをインポートできます。 通常のインポートと比較して、elastic importはリソースの消費量が少なく、リアルタイムのデータの読み取りと書き込みへの影響が少なくなります。 このトピックでは、外部テーブルを使用してObject Storage Service (OSS) データを照会し、OSSからAnalyticDB for MySQL data Lakehouse Edition (V3.0) にデータをインポートする方法について説明します。
前提条件
OSSが有効化されています。 バケットとディレクトリがOSSに作成されます。 詳細については、「OSSの有効化」、「バケットの作成」、「ディレクトリの作成」をご参照ください。
データがOSSディレクトリにアップロードされます。 詳細については、「オブジェクトのアップロード」をご参照ください。
AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターは、OSSバケットと同じリージョンに作成されます。
サンプルデータ
この例では、person
オブジェクトはOSSのtestBucketName/adb/dt=2023-06-15
ディレクトリにアップロードされます。 改行は行区切り文字として使用され、コンマ (,) は列区切り文字として使用されます。 person
オブジェクトのサンプルデータ:
1、ジェームズ、10,2023-06-15
2、ボンド、20,2023-06-15
3、ジャック、30,2023-06-15
4、ルーシー、40,2023-06-15
手順
- SQLエディターに移動します。
- AnalyticDB for MySQLコンソールにログインします。
- ページの左上隅で、クラスターが存在するリージョンを選択します。
- 左側のナビゲーションウィンドウで、クラスターリスト をクリックします。
- Lake Warehouse Edition(3.0) タブでクラスターを見つけ、クラスター ID をクリックします。
- 左側のナビゲーションウィンドウで、 を選択します。
データをインポートする。
通常のインポートまたはエラスティックインポートを使用してデータをインポートできます。 デフォルトの方法は通常のインポートです。 通常のインポートは、計算ノードからデータを読み取り、ストレージノードにインデックスを作成します。 この方法は、コンピューティングおよびストレージリソースを消費する。 Elastic importはデータを読み取り、Serverless Sparkジョブのインデックスを作成します。 ジョブリソースグループのリソースを消費します。 ジョブリソースグループを持つAnalyticDB for MySQL Data Lakehouse Edition (V3.0) V3.1.10.0以降のクラスターのみがエラスティックインポートをサポートしています。 詳細については、「データインポート方法」をご参照ください。
エラスティックインポートを使用する場合は、次の項目に注意してください。
エラスティックインポートを実行するには、次のステートメントを実行してエラスティックインポート機能を有効にする必要があります。
SET adb_config RC_ELASTIC_JOB_SCHEDULER_ENABLE=true;
Elastic importでは、CSV、Parquet、またはORCデータのみをOSSからインポートできます。
Elastic importは、ジョブリソースグループに基づいてデータを読み取り、インデックスを作成します。 これにより、ジョブリソースグループのリソースが消費され、リソースに対して課金されます。 詳細については、「リソースグループのモニタリング情報の表示」および「Data Lakehouse Edition (V3.0) の課金項目」をご参照ください。
ジョブの長い待機時間、長い実行時間、およびジョブの失敗を防ぐために、ジョブリソースグループに十分なリソースがあることを確認してください。
エラスティックインポートジョブの完了には少なくとも2分から3分かかり、少量のデータには適していません。 インポートジョブを3分以内に完了する必要がある場合は、通常のインポートを使用することを推奨します。
エラスティックインポートジョブは、同じリソースを使用する通常のインポートジョブよりも、完了するまでに長い期間が必要です。 インポートジョブを短時間で完了させたい場合は、エラスティックインポートジョブのリソースの最大量を増やしてジョブを高速化することをお勧めします。
定期的なインポート
外部データベースを作成します。
外部データベースの作成adb_external_db;
CREATE EXTERNAL TABLEステートメントを使用して、
adb_external_db
データベースにOSS外部テーブルを作成します。 この例では、外部テーブルの名前はadb_external_db.personです。説明AnalyticDB for MySQLのOSS外部テーブルは、OSSオブジェクトと同じフィールドの名前、数量、順序、およびデータ型を使用する必要があります。
パーティション化されていないOSS外部テーブルの作成
パーティション化されたOSS外部テーブルの作成
構文の詳細については、「CREATE EXTERNAL TABLE」をご参照ください。
データの照会
外部テーブルを作成した後、AnalyticDB for MySQLでSELECTステートメントを実行して、外部テーブルのデータをクエリできます。
SELECT * からadb_external_db.person;
サンプル結果:
+ ------ ------ ------- ----------------- | id | name | 年齢 | dt | ------ ------ -------- ---------------- | 1 | ジェームズ | 10 | 2023-06-15 | | 2 | ボンド | 20 | 2023-06-15 | | 3 | ジャック | 30 | 2023-06-15 | | 4 | ルーシー | 40 | 2023-06-15 | ------ ------ -------- ---------------- セットの4列 (0.35秒)
AnalyticDB for MySQLクラスターにデータベースを作成します。 すでにデータベースを作成している場合は、この手順をスキップしてください。
CREATE DATABASE adb_demo;
AnalyticDB MySQLクラスターにテーブルを作成し、OSSからインポートされたデータを保存します。
adb_demo.adb_import_testが存在しない場合は説明作成された内部テーブルは、手順bで作成された外部テーブルと同じフィールドの名前、数量、順序、およびデータ型を使用する必要があります。
テーブルを作成します ( id INT, 名前VARCHAR(1023) 、 年齢INT、 dt VARCHAR(1023) ) ハッシュによる分配 (id);
テーブルにデータをインポートします。
方法1:
INSERT INTO
ステートメントを実行してデータをインポートします。 主キーの値が重複している場合、データは繰り返し挿入されず、INSERT INTOステートメントはINSERT IGNORE INTO
と同等です。 詳細については、「INSERT INTO」をご参照ください。adb_demo.adb_import_test SELECT * からadb_external_db.personに挿入します。
方法2:
INSERT OVERWRITE INTO
ステートメントを実行して、データを同期的にインポートします。 主キーの値が重複している場合、元の値は新しい値で上書きされます。INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * からadb_external_db.person;
方法3:
INSERT OVERWRITE INTO
ステートメントを実行して、データを非同期的にインポートします。 詳細については、「INSERT OVERWRITE SELECT」トピックの「非同期書き込み」セクションをご参照ください。ジョブ挿入の上書きadb_demo.adb_import_test SELECT * からadb_external_db.person;
Elastic import
データベースを作成します。 すでにデータベースを作成している場合は、この手順をスキップしてください。
CREATE DATABASE adb_demo;
OSS外部テーブルを作成します。
説明AnalyticDB for MySQLのOSS外部テーブルは、OSSオブジェクトと同じフィールドの名前、数量、順序、およびデータ型を使用する必要があります。
Elastic importでは、
create TABLE
ステートメントのみを使用して外部テーブルを作成できます。
CREATE TABLE oss_import_test_external_table ( id INT(1023) 、 名前VARCHAR(1023) 、 年齢INT、 dt VARCHAR(1023) ) エンジン='OSS' TABLE_PROPERTIES='{ "endpoint":"oss-cn-hangzhou-internal.aliyuncs.com" 、 "url":"oss://<bucket-name>/adb/oss_import_test_data.csv" 、 "accessid":"LTAI5t8sqJn5GhpBVtN8 ****" 、 "accesskey":"HlClegbiV5mJjBYBJHEZQOnRF7 ****" 、 "delimiter":"," } ';
重要外部テーブルを作成するときは、CSV、Parquet、またはORC外部テーブルに次のTABLE_PROPERTIESパラメーターを設定できます。
CSV:
endpoint
、url
、accessid
、accesskey
、format
、delimiter
、null_value
、partition_column
Parquet:
endpoint
、url
、accessid
、accesskey
、format
、partition_column
ORC:
endpoint
、url
、accessid
、accesskey
、format
、partition_column
外部テーブルパラメーターの詳細については、「パーティション化されていないオブジェクトのOSS外部テーブルの作成」および「パーティション化されたオブジェクトのOSS外部テーブルの作成」をご参照ください。
データの照会
外部テーブルを作成した後、AnalyticDB for MySQLでSELECTステートメントを実行して、外部テーブルのデータをクエリできます。
SELECT * からoss_import_test_external_table;
サンプル結果:
+ ------ ------ ------- ----------------- | id | name | 年齢 | dt | ------ ------ -------- ---------------- | 1 | ジェームズ | 10 | 2023-06-15 | | 2 | ボンド | 20 | 2023-06-15 | | 3 | ジャック | 30 | 2023-06-15 | | 4 | ルーシー | 40 | 2023-06-15 | ------ ------ -------- ---------------- セットの4列 (0.35秒)
AnalyticDB MySQLクラスターにテーブルを作成し、OSSからインポートされたデータを保存します。
説明作成された内部テーブルは、手順bで作成された外部テーブルと同じフィールドの名前、数量、順序、およびデータ型を使用する必要があります。
テーブルの作成adb_import_test ( id INT, 名前VARCHAR(1023) 、 年齢INT、 dt VARCHAR(1023) 主キー (id) ) ハッシュによって分布 (uid);
データをインポートする。
重要Elastic importでは、
INSERT OVERWRITE INTO
ステートメントのみを使用してデータをインポートできます。方法1: INSERT OVERWRITE INTOステートメントを実行して、データを弾力的にインポートします。 主キーの値が重複している場合、元の値は新しい値で上書きされます。
/+ * elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group]* / INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_demo.oss_import_test_external_table;
方法2: INSERT OVERWRITE INTO文を非同期的に実行して、データを弾力的にインポートします。 通常、
SUBMIT JOB
ステートメントは非同期ジョブを送信するために使用されます。/* elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group]* / ジョブの挿入を提出するadb_demo.adb_import_test SELECT * FROM adb_demo.oss_import_test_external_tableに上書きします。
重要エラスティックインポートジョブを非同期で送信する場合、優先キューを設定できません。
サンプル結果:
+ --------------------------------------- + | job_id | + --------------------------------------- + | 2023081517195102101701907203151 ****** |
[ジョブの送信]
ステートメントを使用して非同期ジョブを送信すると、ジョブIDのみが返されます。これは、非同期ジョブが正常に送信されたことを示します。 返されたジョブIDを使用して、非同期ジョブを終了したり、非同期ジョブのステータスを照会したりできます。 詳細については、「インポートジョブの非同期送信」をご参照ください。ヒントパラメータ:
elastic_load: elastic_importを使用するかどうかを指定します。 有効な値: trueおよびfalse。 デフォルト値:false
elastic_load_configs: エラスティックインポート機能の設定パラメーター。 パラメーターを括弧 ([ ]) で囲み、複数のパラメーターを縦棒 (|) で区切る必要があります。 次の表にパラメーターを示します。
パラメーター
必須
説明
adb.load.resource.group.name
可
エラスティックインポートジョブを実行するジョブリソースグループの名前。
adb.load.job.max.acu
任意
エラスティックインポートジョブのリソースの最大量。 単位: AnalyticDBコンピューティングユニット (ACU) 。 最小値: 5 ACU。 デフォルト値: シャード数 + 1
次のステートメントを実行して、クラスター内のシャードの数を照会します。
SELECT count (1) FROM information_schema.kepler_meta_shards;
spark.driver.resourceSpec
任意
Sparkドライバーのリソースタイプ。 デフォルト値: small。 有効な値の詳細については、Conf設定パラメータートピックの「Sparkリソースの仕様」テーブルの「タイプ」列を参照してください。
spark.exe cutor.resourceSpec
任意
Sparkエグゼキュータのリソースタイプ。 デフォルト値: large。 有効な値の詳細については、Conf設定パラメータートピックの「Sparkリソースの仕様」テーブルの「タイプ」列を参照してください。
spark.adb.exe cutorDiskSize
任意
Sparkエグゼキュータのディスク容量。 有効な値: (0,100) 。 単位:GiB。 デフォルト値: 10 GiB 詳細については、「Conf設定パラメーター」トピックの「ドライバーとエグゼキューターリソースの指定」をご参照ください。
(オプション) 送信されたジョブがエラスティックインポートジョブであるかどうかを確認します。
SELECT job_name, (job_type == 3) AS is_elastic_load FROM job_name = "2023081818010602101701907303151 ******"
サンプル結果:
+ --------------------------------------- + ------------------ + | job_name | is_elastic_load | + --------------------------------------- + ------------------ + | 2023081517195102101701907203151 ****** | 1 | + --------------------------------------- + ------------------ +
エラスティックインポートジョブが送信されると、
is_elastic_load
パラメーターの1が返されます。 通常のインポートジョブが送信されると、0が返されます。