AnalyticDB for MySQL湖倉版支援通過外表讀取並匯入外部資料。匯入資料時,您可以選擇常規匯入和彈性匯入兩種方式。彈性匯入相較於常規匯入,可以大幅減少資源的消耗,降低匯入處理程序中對線上讀寫業務的影響。本文介紹如何通過外表查詢OSS資料,並將OSS的資料匯入AnalyticDB for MySQL叢集。
前提條件
AnalyticDB for MySQL叢集的產品系列為湖倉版。
AnalyticDB for MySQL叢集與OSS儲存空間位於相同地區。
已將資料檔案上傳至OSS目錄中。具體操作,請參見上傳檔案。
樣本資料說明
本文樣本將資料檔案person
上傳至OSS中的testBucketName/adb/dt=2023-06-15
目錄,資料行分隔字元為分行符號,資料行分隔符號為英文逗號(,)。person
中的樣本資料如下:
1,james,10,2023-06-15
2,bond,20,2023-06-15
3,jack,30,2023-06-15
4,lucy,40,2023-06-15
操作步驟
進入SQL開發編輯器。
登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單,在湖倉版頁簽下,單擊目的地組群ID。
在左側導覽列,單擊 。
匯入資料。
資料匯入方式分為常規匯入(預設)和彈性匯入。常規匯入在計算節點中讀取來源資料,然後在儲存節點中構建索引,消耗計算資源和儲存資源。彈性匯入在Serverless Spark Job中讀取來源資料和構建索引,消耗Job型資源群組的資源。僅核心版本3.1.10.0及以上且已建立Job型資源群組的企業版、基礎版及湖倉版叢集支援彈性匯入資料。更多內容,請參見資料匯入方式介紹。
常規匯入
建立外部資料庫。
CREATE EXTERNAL DATABASE adb_external_db;
建立外表。使用CREATE EXTERNAL TABLE語句在外部資料庫
adb_external_db
中建立OSS外表。本文以adb_external_db.person為例。說明AnalyticDB for MySQL外表的欄位名稱、欄位數量、欄位順序、欄位類型需要與和OSS檔案相同。
建立OSS非分區外表
建立OSS分區外表
OSS外表的文法說明,請參見CREATE EXTERNAL TABLE。
查詢資料。
資料表建立成功後,您可以在AnalyticDB for MySQL中通過SELECT語句查詢OSS的資料。
SELECT * FROM adb_external_db.person;
返回結果如下:
+------+-------+------+-----------+ | id | name | age | dt | +------+-------+------+-----------+ | 1 | james | 10 |2023-06-15 | | 2 | bond | 20 |2023-06-15 | | 3 | jack | 30 |2023-06-15 | | 4 | lucy | 40 |2023-06-15 | +------+-------+------+-----------+ 4 rows in set (0.35 sec)
在AnalyticDB for MySQL中建立資料庫。如果有已建立的資料庫,可以忽略本步驟。樣本如下:
CREATE DATABASE adb_demo;
在AnalyticDB for MySQL中建立表用於儲存從OSS中匯入的資料。樣本如下:
說明建立的內表和步驟b中建立的外表的欄位名稱、欄位數量、欄位順序、欄位類型必須相同。
CREATE TABLE IF NOT EXISTS adb_demo.adb_import_test( id INT, name VARCHAR(1023), age INT, dt VARCHAR(1023) ) DISTRIBUTED BY HASH(id);
向表中匯入資料。
方法一:使用
INSERT INTO
語句匯入資料,當主鍵重複時會自動忽略當前寫入資料,資料不做更新,作用等同於INSERT IGNORE INTO
,更多資訊,請參見INSERT INTO。樣本如下:INSERT INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.person;
方法二:使用
INSERT OVERWRITE INTO
語句同步匯入資料,會覆蓋表中原有的資料。樣本如下:INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.person;
方法三:使用
INSERT OVERWRITE INTO
語句非同步匯入資料,更多資訊,請參見非同步寫入。樣本如下:SUBMIT JOB INSERT OVERWRITE adb_demo.adb_import_test SELECT * FROM adb_external_db.person;
彈性匯入
建立資料庫。如果有已建立的資料庫,可以忽略本步驟。樣本如下:
CREATE DATABASE adb_demo;
建立外表。
說明AnalyticDB for MySQL外表的欄位名稱、欄位數量、欄位順序、欄位類型需要與和OSS檔案相同。
彈性匯入僅支援
CREATE TABLE
語句建立外表。
CREATE TABLE oss_import_test_external_table ( id INT(1023), name VARCHAR(1023), age INT, dt VARCHAR(1023) ) ENGINE='OSS' TABLE_PROPERTIES='{ "endpoint":"oss-cn-hangzhou-internal.aliyuncs.com", "url":"oss://<bucket-name>/adb/oss_import_test_data.csv", "accessid":"LTAI5t8sqJn5GhpBVtN8****", "accesskey":"HlClegbiV5mJjBYBJHEZQOnRF7****", "delimiter":"," }';
重要建立外表時,CSV、Parquet、ORC格式的外表支援設定的TABLE_PROPERTIES參數不同:
CSV格式:僅支援設定
endpoint
、url
、accessid
、accesskey
、format
、delimiter
、null_value
和partition_column
參數。Parquet格式:僅支援設定
endpoint
、url
、accessid
、accesskey
、format
和partition_column
參數。ORC格式:僅支援設定
endpoint
、url
、accessid
、accesskey
、format
和partition_column
參數。
查詢資料。
資料表建立成功後,您可以在AnalyticDB for MySQL中通過SELECT語句查詢OSS的資料。
SELECT * FROM oss_import_test_external_table;
返回結果如下:
+------+-------+------+-----------+ | id | name | age | dt | +------+-------+------+-----------+ | 1 | james | 10 |2023-06-15 | | 2 | bond | 20 |2023-06-15 | | 3 | jack | 30 |2023-06-15 | | 4 | lucy | 40 |2023-06-15 | +------+-------+------+-----------+ 4 rows in set (0.35 sec)
在AnalyticDB for MySQL中建立表用於儲存從OSS中匯入的資料。樣本如下:
說明建立的內表和步驟b中建立的外表的欄位名稱、欄位數量、欄位順序、欄位類型必須相同。
CREATE TABLE adb_import_test ( id INT, name VARCHAR(1023), age INT, dt VARCHAR(1023) ) DISTRIBUTED BY HASH(id);
匯入資料。
重要彈性匯入僅支援通過
INSERT OVERWRITE INTO
語句匯入資料。方法一:執行INSERT OVERWRITE INTO彈性匯入資料,會覆蓋表中原有的資料。樣本如下:
/+*elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group]*/ INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_demo.oss_import_test_external_table;
方法二:非同步執行INSERT OVERWRITE INTO彈性匯入資料。通常使用
SUBMIT JOB
提交非同步任務,由後台調度。/*+elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group]*/ SUBMIT JOB INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_demo.oss_import_test_external_table;
重要非同步提交彈性匯入任務時,不支援設定優先權隊列。
返回結果如下:
+---------------------------------------+ | job_id | +---------------------------------------+ | 2023081517195102101701907203151****** |
使用
SUBMIT JOB
提交非同步任務後,返回結果僅表示非同步任務提交成功。您可以通過job_id終止非同步任務或查詢非同步任務狀態,判斷任務是否執行成功。具體操作,請參見非同步提交匯入任務。Hint參數說明:
elastic_load:是否使用彈性匯入方式。取值:true或false(預設值)。
elastic_load_configs:彈性匯入方式支援配置的參數。參數需使用方括弧([ ])括起來,且多個參數之間以豎線(|)分隔,支援配置的參數如下表所示:
參數
是否必填
說明
adb.load.resource.group.name
是
執行彈性匯入任務的Job資源群組名稱。
adb.load.job.max.acu
否
單個彈性匯入任務最多使用的資源。單位為ACU,最小值為5 ACU。預設值為叢集Shard個數+1。
執行如下語句可查詢叢集Shard個數:
SELECT count(1) FROM information_schema.kepler_meta_shards;
spark.driver.resourceSpec
否
Spark driver的資源規格。預設值為small。取值範圍,請參見Spark資源規格列表的型號列。
spark.executor.resourceSpec
否
Spark executor的資源規格。預設值為large。取值範圍,請參見Spark資源規格列表的型號列。
spark.adb.executorDiskSize
否
Spark executor的磁碟容量,取值範圍為(0,100],單位為GiB,預設值為10 Gi。更多資訊,請參見指定Driver和Executor資源。
(可選)查看已提交的匯入任務是否為彈性匯入任務。
SELECT job_name, (job_type = 3) AS is_elastic_load FROM INFORMATION_SCHEMA.kepler_meta_async_jobs where job_name = "2023081818010602101701907303151******";
返回結果如下:
+---------------------------------------+------------------+ | job_name | is_elastic_load | +---------------------------------------+------------------+ | 2023081517195102101701907203151****** | 1 | +---------------------------------------+------------------+
is_elastic_load
的傳回值為1,表示已提交的匯入任務是彈性匯入任務;若為0,則表示已提交的匯入任務是常規匯入任務。