AnalyticDB for MySQL在通過外表訪問並匯入MaxCompute資料時,預設使用Tunnel Record API方式。您也可以進一步選擇Tunnel Arrow API方式,相較於Tunnel Record API方式,Tunnel Arrow API方式可以列式讀取MaxCompute的資料,從而提高資料訪問和處理的效率。
前提條件
AnalyticDB for MySQL叢集的產品系列為湖倉版。
MaxCompute專案與AnalyticDB for MySQL叢集位於同一地區。
AnalyticDB for MySQL叢集已開啟ENI訪問。
說明登入雲原生資料倉儲AnalyticDB MySQL控制台,在 的網路資訊地區,開啟ENI網路開關。
已添加AnalyticDB for MySQL的VPC網段到MaxCompute專案的白名單中。
說明登入雲原生資料倉儲AnalyticDB MySQL控制台,在叢集資訊頁面查詢VPC ID。然後登入專用網路控制台,在專用網路頁面根據VPC ID查詢網段。設定MaxCompute白名單的操作,請參見管理IP白名單。
使用Tunnel Arrow API方式訪問並匯入MaxCompute資料時,AnalyticDB for MySQL叢集需為3.2.2.1及以上版本。
說明查看湖倉版叢集的核心版本,請執行
SELECT adb_version();
。如需升級核心版本,請聯絡支援人員。
樣本資料
本文樣本中的MaxCompute專案為test_adb
,樣本表person
。樣本如下:
CREATE TABLE IF NOT EXISTS person (
id INT,
name VARCHAR(1023),
age INT)
partitioned BY (dt string);
在person
表中添加分區,樣本如下:
ALTER TABLE person
ADD
PARTITION (dt='202207');
向分區中添加資料,樣本如下:
INSERT INTO test_adb.person
PARTITION (dt='202207')
VALUES (1,'james',10),(2,'bond',20),(3,'jack',30),(4,'lucy',40);
開啟Arrow API(可選)
預設情況下,AnalyticDB for MySQL叢集會使用Tunnel Record API方式訪問並匯入MaxCompute資料。 若您需要通過Tunnel Arrow API方式訪問並匯入MaxCompute資料,請先開啟Arrow API功能。開啟後,AnalyticDB for MySQL叢集會使用Tunnel Arrow API方式進行匯入。
開啟方法
您可以通過SET命令或Hint在叢集層級和查詢層級開啟Arrow API:
叢集層級開啟Arrow API:
SET ADB_CONFIG <config_name>= <value>;
查詢層級開啟Arrow API:
/*<config_name>= <value>*/ SELECT * FROM table;
Arrow API相關配置參數
參數(config_name) | 說明 |
ODPS_TUNNEL_ARROW_ENABLED | 是否開啟Arrow API。取值:
|
ODPS_TUNNEL_SPLIT_BY_SIZE_ENABLED | 是否開啟動態Split切分。取值:
|
操作步驟
資料匯入方式分為常規匯入(預設)和彈性匯入。常規匯入在計算節點中讀取來源資料,然後在儲存節點中構建索引,消耗計算資源和儲存資源。彈性匯入在Serverless Spark Job中讀取來源資料和構建索引,消耗Job型資源群組的資源。僅核心版本3.1.10.0及以上且已建立Job型資源群組的叢集支援彈性匯入資料。相較於常規匯入,彈性匯入可以大幅減少資源的消耗,降低匯入處理程序中對線上讀寫業務的影響,提升資源隔離性和資料匯入效率。更多內容,請參見資料匯入方式介紹。
常規匯入
進入SQL編輯器。
登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單,在湖倉版頁簽下,單擊目的地組群ID。
在左側導覽列,單擊 。
建立外部資料庫。樣本如下:
CREATE EXTERNAL DATABASE adb_external_db;
建立外表。本文樣本為
test_adb
。CREATE EXTERNAL TABLE IF NOT EXISTS adb_external_db.test_adb ( id int, name varchar(1023), age int, dt string ) ENGINE='ODPS' TABLE_PROPERTIES='{ "accessid":"LTAILd4****", "endpoint":"http://service.cn-hangzhou.maxcompute.aliyun.com/api", "accesskey":"4A5Q7ZVzcYnWMQPysX****", "partition_column":"dt", "project_name":"test_adb", "table_name":"person" }';
說明AnalyticDB for MySQL外表和MaxCompute中表的欄位名稱、欄位數量、欄位順序需要一致,欄位類型需要相容。
外表的參數說明,請參見CREATE EXTERNAL TABLE。
查詢資料。
SELECT * FROM adb_external_db.test_adb;
返回結果如下:
+------+-------+------+---------+ | id | name | age | dt | +------+-------+------+---------+ | 1 | james | 10 | 202207 | | 2 | bond | 20 | 202207 | | 3 | jack | 30 | 202207 | | 4 | lucy | 40 | 202207 | +------+-------+------+---------+ 4 rows in set (0.35 sec)
執行以下步驟將MaxCompute資料匯入至AnalyticDB for MySQL。
在AnalyticDB for MySQL中建立資料庫,樣本如下:
CREATE DATABASE adb_demo;
在AnalyticDB for MySQL中建立表用於儲存從MaxCompute中匯入的資料,樣本如下:
說明新表和步驟3中建立的外表的欄位順序和欄位數量需要一致,欄位類型相容。
CREATE TABLE IF NOT EXISTS adb_demo.adb_import_test( id int, name string, age int, dt string PRIMARY KEY(id,dt) ) DISTRIBUTED BY HASH(id) PARTITION BY VALUE('dt');
向表中寫入資料,樣本如下:
方式一:執行INSERT INTO匯入資料,當主鍵重複時會自動忽略當前寫入資料,不做更新,作用等同於
INSERT IGNORE INTO
,詳情請參見INSERT INTO。樣本如下:INSERT INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb;
如果需要將特定分區的資料匯入
adb_demo.adb_import_test
,可以執行:INSERT INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb WHERE dt = '202207';
方式二:執行INSERT OVERWRITE INTO匯入資料,會覆蓋表中原有的資料。樣本如下:
INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb;
方式三:非同步執行INSERT OVERWRITE INTO匯入資料。通常使用
SUBMIT JOB
提交非同步任務,由後台調度,可以在寫入任務前增加Hint(/*+ direct_batch_load=true*/
)加速寫入任務。詳情請參見非同步寫入。樣本如下:SUBMIT job INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb;
返回結果如下:
+---------------------------------------+ | job_id | +---------------------------------------+ | 2020112122202917203100908203303****** | +---------------------------------------+
關於非同步提交任務詳情,請參見非同步提交匯入任務。
彈性匯入
進入SQL編輯器。
登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單,在湖倉版頁簽下,單擊目的地組群ID。
在左側導覽列,單擊 。
建立資料庫。如果有已建立的資料庫,可以忽略本步驟。樣本如下:
CREATE DATABASE adb_demo;
建立外表。
說明AnalyticDB for MySQL外表的名稱需要和MaxCompute專案的名稱相同,否則建立外表會失敗。
AnalyticDB for MySQL外表和MaxCompute中表的欄位名稱、欄位數量、欄位順序需要一致,欄位類型需要相容。
彈性匯入僅支援
CREATE TABLE
語句建立外表。
CREATE TABLE IF NOT EXISTS test_adb ( id int, name string, age int, dt string ) ENGINE='ODPS' TABLE_PROPERTIES='{ "endpoint":"http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api", "accessid":"LTAILd4****", "accesskey":"4A5Q7ZVzcYnWMQPysX****", "partition_column":"dt", "project_name":"test_adb", "table_name":"person" }';
外表支援設定的參數及參數說明,請參見參數說明。
查詢資料。
SELECT * FROM adb_demo.test_adb;
返回結果如下:
+------+-------+------+---------+ | id | name | age | dt | +------+-------+------+---------+ | 1 | james | 10 | 202207 | | 2 | bond | 20 | 202207 | | 3 | jack | 30 | 202207 | | 4 | lucy | 40 | 202207 | +------+-------+------+---------+ 4 rows in set (0.35 sec)
在AnalyticDB for MySQL中建立表用於儲存從MaxCompute中匯入的資料。樣本如下:
說明建立的內表和步驟3中建立的外表的欄位名稱、欄位數量、欄位順序、欄位類型必須相同。
CREATE TABLE IF NOT EXISTS adb_import_test ( id int, name string, age int, dt string, PRIMARY KEY(id,dt) ) DISTRIBUTED BY HASH(id) PARTITION BY VALUE('dt') LIFECYCLE 30;
匯入資料。
重要彈性匯入僅支援通過
INSERT OVERWRITE INTO
語句匯入資料。方法一:執行INSERT OVERWRITE INTO彈性匯入資料,會覆蓋表中原有的資料。樣本如下:
/*+ elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group|spark.adb.eni.vswitchId=vsw-bp12ldm83z4zu9k4d****]*/ INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_demo.test_adb;
方法二:非同步執行INSERT OVERWRITE INTO彈性匯入資料。通常使用
SUBMIT JOB
提交非同步任務,由後台調度。/*+ elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group|spark.adb.eni.vswitchId=vsw-bp12ldm83z4zu9k4d****]*/ SUBMIT JOB INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_demo.test_adb;
重要非同步提交彈性匯入任務時,不支援設定優先權隊列。
返回結果如下:
+---------------------------------------+ | job_id | +---------------------------------------+ | 2023081517192220291720310090151****** | +---------------------------------------+
使用
SUBMIT JOB
提交非同步任務後,返回結果僅表示非同步任務提交成功。您可以通過job_id終止非同步任務或查詢非同步任務狀態,判斷任務是否執行成功。具體操作,請參見非同步提交匯入任務。Hint參數說明:
elastic_load:是否使用彈性匯入方式。取值:true或false(預設值)。
elastic_load_configs:彈性匯入方式支援配置的參數。參數需使用方括弧([ ])括起來,且多個參數之間以豎線(|)分隔,支援配置的參數如下表所示:
參數
是否必填
說明
adb.load.resource.group.name
是
執行彈性匯入任務的Job資源群組名稱。
adb.load.job.max.acu
否
單個彈性匯入任務最多使用的資源。單位為ACU,最小值為5 ACU。預設值為叢集Shard個數+1。
執行如下語句可查詢叢集Shard個數:
SELECT count(1) FROM information_schema.kepler_meta_shards;
spark.driver.resourceSpec
否
Spark driver的資源規格。預設值為small。取值範圍,請參見Spark資源規格列表的型號列。
spark.executor.resourceSpec
否
Spark executor的資源規格。預設值為large。取值範圍,請參見Spark資源規格列表的型號列。
spark.adb.executorDiskSize
否
Spark executor的磁碟容量,取值範圍為(0,100],單位為GiB,預設值為10 Gi。更多資訊,請參見指定Driver和Executor資源。
(可選)查看已提交的匯入任務是否為彈性匯入任務。
SELECT job_name, (job_type = 3) AS is_elastic_load FROM INFORMATION_SCHEMA.kepler_meta_async_jobs WHERE job_name = "2023081818010602101701907303151******";
返回結果如下:
+---------------------------------------+------------------+ | job_name | is_elastic_load | +---------------------------------------+------------------+ | 2023081517195203101701907203151****** | 1 | +---------------------------------------+------------------+
is_elastic_load
的傳回值為1,表示已提交的匯入任務是彈性匯入任務;若為0,則表示已提交的匯入任務是常規匯入任務。