AnalyticDB for MySQL支援通過外表讀取並匯入MaxCompute資料。通過外表匯入資料可以最大限度地利用叢集資源,實現高效能資料匯入。本文主要介紹如何通過外表將MaxCompute資料匯入AnalyticDB for MySQL數倉版。
前提條件
MaxCompute專案與AnalyticDB for MySQL叢集位於同一地區。具體操作,請參見建立叢集。
已添加AnalyticDB for MySQL的VPC網段到MaxCompute專案的白名單中。
登入雲原生資料倉儲AnalyticDB MySQL控制台,在叢集資訊頁面查詢VPC ID。然後登入專用網路控制台,在專用網路頁面根據VPC ID查詢網段。設定MaxCompute白名單的操作,請參見管理IP白名單。
樣本資料
本文樣本中的MaxCompute專案為odps_project1
,樣本表odps_nopart_import_test
。樣本如下:
CREATE TABLE IF NOT EXISTS odps_nopart_import_test (
id int,
name string,
age int)
partitioned by (dt string);
在odps_nopart_import_test
表中添加分區,樣本如下:
ALTER TABLE odps_nopart_import_test
ADD
PARTITION (dt='202207');
向分區中添加資料,樣本如下:
INSERT INTO odps_project1.odps_nopart_import_test
PARTITION (dt='202207')
VALUES (1,'james',10),(2,'bond',20),(3,'jack',30),(4,'lucy',40);
Tunnel方式訪問MaxCompute
串連目標AnalyticDB for MySQL叢集。詳細操作步驟,請參見串連叢集。
建立目標資料庫。
CREATE database test_adb;
建立MaxCompute外表。本文以
odps_nopart_import_test_external_table
為例。CREATE TABLE IF NOT EXISTS odps_nopart_import_test_external_table ( id int, name string, age int, dt string ) ENGINE='ODPS' TABLE_PROPERTIES='{ "endpoint":"http://service.cn.maxcompute.aliyun-inc.com/api", "accessid":"L*******FsE", "accesskey":"CcwF********iWjv", "partition_column":"dt", "project_name":"odps_project1", "table_name":"odps_nopart_import_test" }';
參數
說明
ENGINE=’ODPS’
外表的儲存引擎。讀寫MaxCompute資料時,取值為ODPS。
endpoint
MaxCompute的EndPoint(網域名稱節點)。
說明目前僅支援AnalyticDB for MySQL通過MaxCompute的VPC網路Endpoint訪問MaxCompute。
查詢各地區VPC網路的Endpoint,請參見各地區Endpoint對照表(阿里雲VPC網路連接方式)。
accessid
阿里雲帳號或者具備MaxCompute存取權限的RAM使用者的AccessKey ID。
如何擷取AccessKey ID和AccessKey Secret,請參見帳號與許可權。
accesskey
阿里雲帳號或者具備MaxCompute存取權限的RAM使用者的AccessKey Secret。
如何擷取AccessKey ID和AccessKey Secret,請參見帳號與許可權。
partition_column
本文使用的樣本是建立分區表的樣本,所以需要配置
partition_column
。如果MaxCompute的表是非分區表,那麼AnalyticDB for MySQL中也需要建立非分區表,此時無需配置partition_column
。project_name
MaxCompute中的工作空間名稱。
table_name
MaxCompute中的資料來源表名。
在
test_adb
資料庫中建立表adb_nopart_import_test
,用於儲存從MaxCompute中匯入的資料。CREATE TABLE IF NOT EXISTS adb_nopart_import_test ( id int, name string, age int, dt string, PRIMARY KEY(id,dt) ) DISTRIBUTED BY HASH(id) PARTITION BY VALUE('dt') LIFECYCLE 30;
匯入資料。
方式一:執行
INSERT INTO
匯入資料,當主鍵重複時會自動忽略當前寫入資料,不做更新,作用等同於INSERT IGNORE INTO
,詳情請參見INSERT INTO。樣本如下:INSERT INTO adb_nopart_import_test SELECT * FROM odps_nopart_import_test_external_table;
通過SELECT查詢寫入表中的資料,樣本如下:
SELECT * FROM adb_nopart_import_test;
返回結果如下:
+------+-------+------+---------+ | id | name | age | dt | +------+-------+------+---------+ | 1 | james | 10 | 202207 | | 2 | bond | 20 | 202207 | | 3 | jack | 30 | 202207 | | 4 | lucy | 40 | 202207 | +------+-------+------+---------+
如果需要將特定分區的資料匯入
adb_nopart_import_test
,可以執行:INSERT INTO adb_nopart_import_test SELECT * FROM odps_nopart_import_test_external_table WHERE dt = '202207';
方式二:執行
INSERT OVERWRITE
匯入資料,會覆蓋表中原有的資料。樣本如下:INSERT OVERWRITE adb_nopart_import_test SELECT * FROM odps_nopart_import_test_external_table;
方式三:非同步執行
INSERT OVERWRITE
匯入資料。通常使用SUBMIT JOB
提交非同步任務,由後台調度,可以在寫入任務前增加Hint加速寫入任務。詳情請參見非同步寫入。樣本如下:SUBMIT JOB INSERT OVERWRITE adb_nopart_import_test SELECT * FROM odps_nopart_import_test_external_table;
返回結果如下:
+---------------------------------------+ | job_id | +---------------------------------------+ | 2020112122202917203100908203303****** |
關於非同步提交任務詳情請參見非同步提交匯入任務。