AnalyticDB for MySQL支援通過外表讀取並匯入MaxCompute資料。通過外表匯入資料可以最大限度地利用叢集資源,實現高效能資料匯入。本文介紹如何通過外表將MaxCompute資料匯入至AnalyticDB MySQL企業版及湖倉版。
前提條件
MaxCompute專案AnalyticDB for MySQL企業版及湖倉版叢集位於同一地區。具體操作,請參見建立叢集。
AnalyticDB MySQL企業版及湖倉版已開啟ENI訪問。
已添加AnalyticDB MySQL的VPC網段到MaxCompute專案的白名單中。
登入雲原生資料倉儲AnalyticDB MySQL控制台,在叢集資訊頁面查詢VPC ID。然後登入專用網路控制台,在專用網路頁面根據VPC ID查詢網段。設定MaxCompute白名單的操作,請參見管理IP白名單。
樣本資料
本文樣本中的MaxCompute專案為test_adb
,樣本表person
。樣本如下:
CREATE TABLE IF NOT EXISTS person (
id int,
name varchar(1023),
age int)
partitioned by (dt string);
在person
表中添加分區,樣本如下:
ALTER TABLE person
ADD
PARTITION (dt='202207');
向分區中添加資料,樣本如下:
INSERT INTO test_adb.person
partition (dt='202207')
VALUES (1,'james',10),(2,'bond',20),(3,'jack',30),(4,'lucy',40);
Tunnel方式訪問MaxCompute
資料匯入方式分為常規匯入(預設)和彈性匯入。常規匯入在計算節點中讀取來源資料,然後在儲存節點中構建索引,消耗計算資源和儲存資源。彈性匯入在Serverless Spark Job中讀取來源資料和構建索引,消耗Job型資源群組的資源。僅核心版本3.1.10.0及以上且已建立Job型資源群組的叢集支援彈性匯入資料。相較於常規匯入,彈性匯入可以大幅減少資源的消耗,降低匯入處理程序中對線上讀寫業務的影響,提升資源隔離性和資料匯入效率。更多內容,請參見資料匯入方式介紹。
常規匯入
進入SQL編輯器。
登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單,在湖倉版頁簽下,單擊目的地組群ID。
- 在左側導覽列,單擊 。
建立外部資料庫。樣本如下:
CREATE EXTERNAL DATABASE adb_external_db;
建立外表。本文樣本為
test_adb
。CREATE EXTERNAL TABLE IF NOT EXISTS adb_external_db.test_adb ( id int, name varchar(1023), age int, dt string ) ENGINE='ODPS' TABLE_PROPERTIES='{ "accessid":"LTAILd4****", "endpoint":"http://service.cn-hangzhou.maxcompute.aliyun.com/api", "accesskey":"4A5Q7ZVzcYnWMQPysX****", "partition_column":"dt", "project_name":"test_adb", "table_name":"person" }';
說明AnalyticDB MySQL外表和MaxCompute中表的欄位名稱、欄位數量、欄位順序需要一致,欄位類型需要相容。
外表的參數說明,請參見CREATE EXTERNAL TABLE。
查詢資料。
SELECT * FROM adb_external_db.test_adb;
返回結果如下:
+------+-------+------+---------+ | id | name | age | dt | +------+-------+------+---------+ | 1 | james | 10 | 202207 | | 2 | bond | 20 | 202207 | | 3 | jack | 30 | 202207 | | 4 | lucy | 40 | 202207 | +------+-------+------+---------+ 4 rows in set (0.35 sec)
執行以下步驟將MaxCompute資料匯入至AnalyticDB MySQL。
在AnalyticDB MySQL中建立資料庫,樣本如下:
在AnalyticDB MySQL中建立表用於儲存從MaxCompute中匯入的資料,樣本如下:
向表中寫入資料,樣本如下:
方式一:執行INSERT INTO匯入資料,當主鍵重複時會自動忽略當前寫入資料,不做更新,作用等同於
INSERT IGNORE INTO
,詳情請參見INSERT INTO。樣本如下:方式二:執行INSERT OVERWRITE INTO匯入資料,會覆蓋表中原有的資料。樣本如下:
方式三:非同步執行INSERT OVERWRITE INTO匯入資料。通常使用
SUBMIT JOB
提交非同步任務,由後台調度,可以在寫入任務前增加Hint(/*+ direct_batch_load=true*/
)加速寫入任務。詳情請參見非同步寫入。樣本如下:關於非同步提交任務詳情,請參見非同步提交匯入任務。
CREATE DATABASE adb_demo;
說明新表和步驟3中建立的外表的欄位順序和欄位數量需要一致,欄位類型相容。
CREATE TABLE IF NOT EXISTS adb_demo.adb_import_test( id int, name string, age int, dt string PRIMARY KEY(id,dt) ) DISTRIBUTE BY HASH(id) PARTITION BY VALUE('dt');
INSERT INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb;
如果需要將特定分區的資料匯入
adb_demo.adb_import_test
,可以執行:INSERT INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb WHERE dt = '202207';
INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb;
SUBMIT job INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb;
返回結果如下:
+---------------------------------------+ | job_id | +---------------------------------------+ | 2020112122202917203100908203303****** | +---------------------------------------+
彈性匯入
進入SQL編輯器。
登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單,在湖倉版頁簽下,單擊目的地組群ID。
- 在左側導覽列,單擊 。
建立資料庫。如果有已建立的資料庫,可以忽略本步驟。樣本如下:
CREATE DATABASE adb_demo;
建立外表。
說明AnalyticDB MySQL外表的名稱需要和MaxCompute專案的名稱相同,否則建立外表會失敗。
AnalyticDB MySQL外表和MaxCompute中表的欄位名稱、欄位數量、欄位順序需要一致,欄位類型需要相容。
彈性匯入僅支援
CREATE TABLE
語句建立外表。
CREATE TABLE IF NOT EXISTS test_adb ( id int, name string, age int, dt string ) ENGINE='ODPS' TABLE_PROPERTIES='{ "endpoint":"http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api", "accessid":"LTAILd4****", "accesskey":"4A5Q7ZVzcYnWMQPysX****", "partition_column":"dt", "project_name":"test_adb", "table_name":"person" }';
外表支援設定的參數及參數說明,請參見參數說明。
查詢資料。
SELECT * FROM adb_demo.test_adb;
返回結果如下:
+------+-------+------+---------+ | id | name | age | dt | +------+-------+------+---------+ | 1 | james | 10 | 202207 | | 2 | bond | 20 | 202207 | | 3 | jack | 30 | 202207 | | 4 | lucy | 40 | 202207 | +------+-------+------+---------+ 4 rows in set (0.35 sec)
在AnalyticDB MySQL中建立表用於儲存從MaxCompute中匯入的資料。樣本如下:
說明建立的內表和步驟3中建立的外表的欄位名稱、欄位數量、欄位順序、欄位類型必須相同。
CREATE TABLE IF NOT EXISTS adb_import_test ( id int, name string, age int, dt string, PRIMARY KEY(id,dt) ) DISTRIBUTE BY HASH(id) PARTITION BY VALUE('dt') LIFECYCLE 30;
匯入資料。
重要彈性匯入僅支援通過
INSERT OVERWRITE INTO
語句匯入資料。方法一:執行INSERT OVERWRITE INTO彈性匯入資料,會覆蓋表中原有的資料。樣本如下:
/*+ elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group|spark.adb.eni.vswitchId=vsw-bp12ldm83z4zu9k4d****]*/ INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_demo.test_adb;
方法二:非同步執行INSERT OVERWRITE INTO彈性匯入資料。通常使用
SUBMIT JOB
提交非同步任務,由後台調度。/*+ elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group|spark.adb.eni.vswitchId=vsw-bp12ldm83z4zu9k4d****]*/ SUBMIT JOB INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_demo.test_adb;
重要非同步提交彈性匯入任務時,不支援設定優先權隊列。
返回結果如下:
+---------------------------------------+ | job_id | +---------------------------------------+ | 2023081517192220291720310090151****** | +---------------------------------------+
使用
SUBMIT JOB
提交非同步任務後,返回結果僅表示非同步任務提交成功。您可以通過job_id終止非同步任務或查詢非同步任務狀態,判斷任務是否執行成功。具體操作,請參見非同步提交匯入任務。Hint參數說明:
elastic_load:是否使用彈性匯入方式。取值:true或false(預設值)。
elastic_load_configs:彈性匯入方式支援配置的參數。參數需使用方括弧([ ])括起來,且多個參數之間以豎線(|)分隔,支援配置的參數如下表所示:
參數
是否必填
說明
adb.load.resource.group.name
是
執行彈性匯入任務的Job資源群組名稱。
adb.load.job.max.acu
否
單個彈性匯入任務最多使用的資源。單位為ACU,最小值為5 ACU。預設值為叢集Shard個數+1。
執行如下語句可查詢叢集Shard個數:
SELECT count(1) FROM information_schema.kepler_meta_shards;
spark.driver.resourceSpec
否
Spark driver的資源規格。預設值為small。取值範圍,請參見Spark資源規格列表的型號列。
spark.executor.resourceSpec
否
Spark executor的資源規格。預設值為large。取值範圍,請參見Spark資源規格列表的型號列。
spark.adb.executorDiskSize
否
Spark executor的磁碟容量,取值範圍為(0,100],單位為GiB,預設值為10 Gi。更多資訊,請參見指定Driver和Executor資源。
(可選)查看已提交的匯入任務是否為彈性匯入任務。
SELECT job_name, (job_type = 3) AS is_elastic_load FROM INFORMATION_SCHEMA.kepler_meta_async_jobs WHERE job_name = "2023081818010602101701907303151******";
返回結果如下:
+---------------------------------------+------------------+ | job_name | is_elastic_load | +---------------------------------------+------------------+ | 2023081517195203101701907203151****** | 1 | +---------------------------------------+------------------+
is_elastic_load
的傳回值為1,表示已提交的匯入任務是彈性匯入任務;若為0,則表示已提交的匯入任務是常規匯入任務。