ApsaraDB for SelectDB相容標準SQL文法,可通過標準的Insert Into方式匯入資料。
背景資訊
Insert Into命令是MySQL等資料庫中常用的資料匯入方式。ApsaraDB for SelectDB相容標準SQL文法,支援通過Insert Into命令匯入資料。包含以下兩種:
Insert Into tbl SELECT ...
Insert Into tbl (col1, col2, ...) VALUES (1, 2, ...), (1,3, ...);
重要此命令不建議在生產環境中使用。
Insert Into Select
通過SelectDB提供的大量SQL函數、聯邦查詢能力,Insert Into Select可以對SelectDB內部資料、外部資料湖資料等進行高效的計算處理,然後匯入SelectDB的新表中,用來進一步進行資料分析服務。
內表資料ETL
如果資料已經在SelectDB表中,可通過Insert Into Select進行資料ETL轉換,然後匯入到一個新表中。樣本如下。
INSERT INTO bj_store_sales
SELECT id, total, user_id, sale_timestamp FROM store_sales WHERE region = "bj";
資料湖資料同步
如果資料在資料湖等外部系統中,可以在SelectDB中建立Catalog,映射到資料湖等外部系統中的資料,然後通過Insert Into Select將其中的資料匯入到SelectDB表中。SelectDB支援對接Hive、Iceberg、Hudi、Elasticsearch、JDBC等資料來源,詳細請參見湖倉一體。
如下以Hive資料來源為例,介紹如何同步資料湖資料到SelectDB中。
建立Hive Catalog,即可通過聯邦查詢訪問Hive中的資料,樣本如下。
CREATE CATALOG test_catalog comment 'hive catalog' PROPERTIES (
'type'='hms',
'hive.metastore.uris' = 'thrift://127.0.0.1:7004',
'dfs.nameservices'='HANN',
'dfs.ha.namenodes.HANN'='nn1,nn2',
'dfs.namenode.rpc-address.HANN.nn1'='nn1_host:rpc_port',
'dfs.namenode.rpc-address.HANN.nn2'='nn2_host:rpc_port',
'dfs.client.failover.proxy.provider.HANN'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
);
通過Insert Into Select,同步Hive資料到SelectDB中,並指定匯入作業唯一標識Label。
INSERT INTO bj_store_sales
WITH LABEL test_label
SELECT id, total, user_id, sale_timestamp FROM test_catalog.test_db.store_sales WHERE region = "bj";
Insert Into Values
Insert Into Values是MySQL等資料庫中常用的資料寫入方式,建議僅用於測試環境的使用。典型的使用方式是直接通過SQL用戶端、JDBC程式發送資料寫入請求。
建立待匯入的SelectDB資料表如下。
CREATE TABLE test_table
(
id int,
name varchar(50),
age int
)
DISTRIBUTED BY HASH(id) BUCKETS 4
PROPERTIES("replication_num" = "1");
SQL樣本
BEGIN;
INSERT INTO db.tbl VALUES(),(),();
INSERT INTO db.tbl VALUES(),(),();
INSERT INTO db.tbl VALUES(),(),();
COMMIT;
JDBC程式樣本
public static void main(String[] args) throws Exception {
// 單次匯入插入語句的數量。
int insertNum = 10;
// 單條插入攢批的數量。
int batchSize = 10000;
String URL="jdbc:mysql://<IP地址>:<MySQL協議連接埠>/test_db?useLocalSessionState=true"; // VPC ID所對應的IP地址。您可以登入VPC控制台在VPC列表中找到目標VPC ID所對應的IP地址。
Connection connection = DriverManager.getConnection(URL, "admin", "password"); // 雲資料庫SelectDB版執行個體的帳號和密碼。
Statement statement = connection.createStatement();
statement.execute("begin");
// 拼接多條插入語句。
for (int num = 0; num < insertNum; num++) {
StringBuilder sql = new StringBuilder();
sql.append("Insert Into test_tbl values ");
for(int i = 0; i < batchSize; i++){
if(i > 0){
sql.append(",");
}
// 拼接一行資料,如:姓名,年齡。可根據具體業務修改。
sql.append("('zhangsan',18)");
}
//add sql to batch: Insert Into tbl values(),(),()
statement.addBatch(sql.toString());
}
statement.addBatch("commit");
statement.executeBatch();
// 關閉資源。
statement.close();
connection.close();
}
最佳實務
查看返回結果。
Insert Into操作是一個同步操作,返回結果即表示操作結束。您需要根據返回結果的不同,進行對應的處理。
執行成功,結果集為空白。
如果 insert 對應 select 語句的結果集為空白,則返回如下:
INSERT INTO tbl1 SELECT * FROM empty_tbl; Query OK, 0 rows affected (0.02 sec)
Query OK
表示執行成功。0 rows affected
表示沒有資料被匯入。執行成功,結果集不為空白。
在結果集不為空白的情況下。返回結果分為如下幾種情況。
INSERT INTO tbl1 SELECT * FROM tbl2; Query OK, 4 rows affected (0.38 sec) {'label':'insert_8510c568-9eda-****-9e36-6adc7d35291c', 'status':'visible', 'txnId':'4005'} INSERT INTO tbl1 with label my_label1 SELECT * FROM tbl2; Query OK, 4 rows affected (0.38 sec) {'label':'my_label1', 'status':'visible', 'txnId':'4005'} INSERT INTO tbl1 SELECT * FROM tbl2; Query OK, 2 rows affected, 2 warnings (0.31 sec) {'label':'insert_f0747f0e-7a35-****-affa-13a235f4020d', 'status':'visible', 'txnId':'4005'} INSERT INTO tbl1 SELECT * FROM tbl2; Query OK, 2 rows affected, 2 warnings (0.31 sec) {'label':'insert_f0747f0e-7a35-****-affa-13a235f4020d', 'status':'committed', 'txnId':'4005'}
其中,
Query OK
表示執行成功。4 rows affected
表示總共有4行資料被匯入。2 warnings
表示被過濾的行數。同時會返回一個 JSON 串。{'label':'my_label1', 'status':'visible', 'txnId':'4005'} {'label':'insert_f0747f0e-7a35-****-affa-13a235f4020d', 'status':'committed', 'txnId':'4005'} {'label':'my_label1', 'status':'visible', 'txnId':'4005', 'err':'some other error'}
其中,
label
為您指定的 label 或自動產生的label,label是該Insert Into匯入作業的標識,每個匯入作業,都有一個在單database內部唯一的label。status
表示匯入資料是否可見,如果可見顯示visible
,如果不可見顯示committed
。txnId
為這個insert對應的匯入事務的id。err
欄位會顯示一些其他非預期錯誤。當需要查看被過濾的行時,您可以通過如下語句:
SHOW LOAD WHERE label="xxx";
返回結果中的 URL 可以用於查詢錯誤的資料,具體見後面查看錯誤行小結。資料不可見是一個臨時狀態,這批資料最終是一定可見的。可以通過如下語句查看這批資料的可見狀態:
SHOW TRANSACTION WHERE id=4005;
返回結果中的
TransactionStatus
列如果為visible
,則表述資料可見。執行失敗。
執行失敗表示沒有任何資料被成功匯入,並返回如下:
INSERT INTO tbl1 SELECT * FROM tbl2 WHERE k1 = "a"; ERROR 1064 (HY000): all partitions have no load data. url: http://10.74.167.16:8042/api/_load_error_log?file=__shard_2/error_log_insert_stmt_ba8bb9e158e4879-ae8de8507c0bf8a2_ba8bb9e158e4879_ae8de8507c0bf8a2
其中
ERROR 1064 (HY000): all partitions have no load data
顯示失敗原因。通過其中的 URL 可以用於查詢錯誤的資料:SHOW LOAD WARNINGS ON "url";
逾時時間。
Insert Into操作的逾時時間由會話變數
query_timeout
控制,預設為5分鐘。逾時則作業會被取消。Label和原子性。
Insert Into操作同樣能夠保證匯入的原子性。當需要使用
CTE(Common Table Expressions)
作為 Insert Into操作中的查詢部分時,必須指定WITH LABEL
和column
部分。過濾閾值。
與其他匯入方式不同,Insert Into操作不能指定過濾閾值(
max_filter_ratio
)。預設的過濾閾值為 1,即有錯誤行都可以被忽略。對於有要求資料不能夠被過濾的業務情境,可以通過設定會話變數
enable_insert_strict
為true
來確保當有資料被過濾掉的時候,Insert Into
不會被執行成功。效能問題。
不建議使用
Insert Into Values
方式進行資料匯入,尤其是巨量資料的線上生產環境。如果必須這樣使用,請將多行資料合併到一個Insert Into語句中進行批量提交,單個批次建議1000~1000000條資料。部分列更新。
Insert Into的預設行為是整行寫入。在Unique資料模型MOW實現方式中,客戶可按需開啟部分列更新功能,需要設定如下會話變數:
set enable_unique_key_partial_update=true
需要注意的是,控制Insert Into語句是否開啟strict 模式的會話變數
enable_insert_strict
的預設值為true,意味著不允許更新不存在的key。所以,在使用Insert Into語句進行部分列更新時,如果希望能插入不存在的key,需要同時將enable_insert_strict
設定為false。