全部產品
Search
文件中心

ApsaraDB for SelectDB:Insert Into

更新時間:Oct 11, 2024

ApsaraDB for SelectDB相容標準SQL文法,可通過標準的Insert Into方式匯入資料。

背景資訊

Insert Into命令是MySQL等資料庫中常用的資料匯入方式。ApsaraDB for SelectDB相容標準SQL文法,支援通過Insert Into命令匯入資料。包含以下兩種:

  • Insert Into tbl SELECT ...

  • Insert Into tbl (col1, col2, ...) VALUES (1, 2, ...), (1,3, ...);

    重要

    此命令不建議在生產環境中使用。

Insert Into Select

通過SelectDB提供的大量SQL函數、聯邦查詢能力,Insert Into Select可以對SelectDB內部資料、外部資料湖資料等進行高效的計算處理,然後匯入SelectDB的新表中,用來進一步進行資料分析服務。

內表資料ETL

如果資料已經在SelectDB表中,可通過Insert Into Select進行資料ETL轉換,然後匯入到一個新表中。樣本如下。

INSERT INTO bj_store_sales
SELECT id, total, user_id, sale_timestamp FROM store_sales WHERE region = "bj";

資料湖資料同步

如果資料在資料湖等外部系統中,可以在SelectDB中建立Catalog,映射到資料湖等外部系統中的資料,然後通過Insert Into Select將其中的資料匯入到SelectDB表中。SelectDB支援對接Hive、Iceberg、Hudi、Elasticsearch、JDBC等資料來源,詳細請參見湖倉一體

如下以Hive資料來源為例,介紹如何同步資料湖資料到SelectDB中。

  • 建立Hive Catalog,即可通過聯邦查詢訪問Hive中的資料,樣本如下。

CREATE CATALOG test_catalog comment 'hive catalog' PROPERTIES (
    'type'='hms',
    'hive.metastore.uris' = 'thrift://127.0.0.1:7004',
    'dfs.nameservices'='HANN',
    'dfs.ha.namenodes.HANN'='nn1,nn2',
    'dfs.namenode.rpc-address.HANN.nn1'='nn1_host:rpc_port',
    'dfs.namenode.rpc-address.HANN.nn2'='nn2_host:rpc_port',
    'dfs.client.failover.proxy.provider.HANN'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
);
  • 通過Insert Into Select,同步Hive資料到SelectDB中,並指定匯入作業唯一標識Label。

INSERT INTO bj_store_sales
WITH LABEL test_label
SELECT id, total, user_id, sale_timestamp FROM test_catalog.test_db.store_sales WHERE region = "bj";

Insert Into Values

Insert Into Values是MySQL等資料庫中常用的資料寫入方式,建議僅用於測試環境的使用。典型的使用方式是直接通過SQL用戶端、JDBC程式發送資料寫入請求。

建立待匯入的SelectDB資料表如下。

CREATE TABLE test_table
(
    id int,
    name varchar(50),
    age int
)
DISTRIBUTED BY HASH(id) BUCKETS 4
PROPERTIES("replication_num" = "1");

SQL樣本

BEGIN;
INSERT INTO db.tbl VALUES(),(),();
INSERT INTO db.tbl VALUES(),(),();
INSERT INTO db.tbl VALUES(),(),();
COMMIT;

JDBC程式樣本

public static void main(String[] args) throws Exception {
    // 單次匯入插入語句的數量。
    int insertNum = 10;
    // 單條插入攢批的數量。
    int batchSize = 10000;
    String URL="jdbc:mysql://<IP地址>:<MySQL協議連接埠>/test_db?useLocalSessionState=true";  // VPC ID所對應的IP地址。您可以登入VPC控制台在VPC列表中找到目標VPC ID所對應的IP地址。
    Connection connection = DriverManager.getConnection(URL, "admin", "password");  // 雲資料庫SelectDB版執行個體的帳號和密碼。
    Statement statement = connection.createStatement();
    statement.execute("begin");
    // 拼接多條插入語句。
    for (int num = 0; num < insertNum; num++) {
        StringBuilder sql = new StringBuilder();
        sql.append("Insert Into test_tbl values ");
        for(int i = 0; i < batchSize; i++){
            if(i > 0){
                sql.append(",");
            }
            // 拼接一行資料,如:姓名,年齡。可根據具體業務修改。
            sql.append("('zhangsan',18)");
        }
        //add sql to batch: Insert Into tbl values(),(),()
        statement.addBatch(sql.toString());
    }
    statement.addBatch("commit");
    statement.executeBatch();
    // 關閉資源。
    statement.close();
    connection.close();
}

最佳實務

  • 查看返回結果。

    Insert Into操作是一個同步操作,返回結果即表示操作結束。您需要根據返回結果的不同,進行對應的處理。

    1. 執行成功,結果集為空白。

      如果 insert 對應 select 語句的結果集為空白,則返回如下:

      INSERT INTO tbl1 SELECT * FROM empty_tbl;
      Query OK, 0 rows affected (0.02 sec)

      Query OK表示執行成功。0 rows affected表示沒有資料被匯入。

    2. 執行成功,結果集不為空白。

      在結果集不為空白的情況下。返回結果分為如下幾種情況。

      INSERT INTO tbl1 SELECT * FROM tbl2;
      Query OK, 4 rows affected (0.38 sec)
      {'label':'insert_8510c568-9eda-****-9e36-6adc7d35291c', 'status':'visible', 'txnId':'4005'}
       
      INSERT INTO tbl1 with label my_label1 SELECT * FROM tbl2;
      Query OK, 4 rows affected (0.38 sec)
      {'label':'my_label1', 'status':'visible', 'txnId':'4005'}
       
      INSERT INTO tbl1 SELECT * FROM tbl2;
      Query OK, 2 rows affected, 2 warnings (0.31 sec)
      {'label':'insert_f0747f0e-7a35-****-affa-13a235f4020d', 'status':'visible', 'txnId':'4005'}
       
      INSERT INTO tbl1 SELECT * FROM tbl2;
      Query OK, 2 rows affected, 2 warnings (0.31 sec)
      {'label':'insert_f0747f0e-7a35-****-affa-13a235f4020d', 'status':'committed', 'txnId':'4005'}

      其中,Query OK表示執行成功。4 rows affected表示總共有4行資料被匯入。2 warnings表示被過濾的行數。同時會返回一個 JSON 串。

      {'label':'my_label1', 'status':'visible', 'txnId':'4005'}
      {'label':'insert_f0747f0e-7a35-****-affa-13a235f4020d', 'status':'committed', 'txnId':'4005'}
      {'label':'my_label1', 'status':'visible', 'txnId':'4005', 'err':'some other error'}

      其中,label為您指定的 label 或自動產生的label,label是該Insert Into匯入作業的標識,每個匯入作業,都有一個在單database內部唯一的label。status表示匯入資料是否可見,如果可見顯示visible,如果不可見顯示committedtxnId為這個insert對應的匯入事務的id。err欄位會顯示一些其他非預期錯誤。

      當需要查看被過濾的行時,您可以通過如下語句:

      SHOW LOAD WHERE label="xxx";

      返回結果中的 URL 可以用於查詢錯誤的資料,具體見後面查看錯誤行小結。資料不可見是一個臨時狀態,這批資料最終是一定可見的。可以通過如下語句查看這批資料的可見狀態:

      SHOW TRANSACTION WHERE id=4005;

      返回結果中的TransactionStatus列如果為visible,則表述資料可見。

    3. 執行失敗。

      執行失敗表示沒有任何資料被成功匯入,並返回如下:

      INSERT INTO tbl1 SELECT * FROM tbl2 WHERE k1 = "a";
      ERROR 1064 (HY000): all partitions have no load data. url: http://10.74.167.16:8042/api/_load_error_log?file=__shard_2/error_log_insert_stmt_ba8bb9e158e4879-ae8de8507c0bf8a2_ba8bb9e158e4879_ae8de8507c0bf8a2

      其中ERROR 1064 (HY000): all partitions have no load data顯示失敗原因。通過其中的 URL 可以用於查詢錯誤的資料:

      SHOW LOAD WARNINGS ON "url";
  • 逾時時間。

    Insert Into操作的逾時時間由會話變數query_timeout控制,預設為5分鐘。逾時則作業會被取消。

  • Label和原子性。

    Insert Into操作同樣能夠保證匯入的原子性。當需要使用CTE(Common Table Expressions)作為 Insert Into操作中的查詢部分時,必須指定WITH LABELcolumn部分。

  • 過濾閾值。

    與其他匯入方式不同,Insert Into操作不能指定過濾閾值(max_filter_ratio)。預設的過濾閾值為 1,即有錯誤行都可以被忽略。

    對於有要求資料不能夠被過濾的業務情境,可以通過設定會話變數enable_insert_stricttrue來確保當有資料被過濾掉的時候,Insert Into不會被執行成功。

  • 效能問題。

    不建議使用Insert Into Values方式進行資料匯入,尤其是巨量資料的線上生產環境。如果必須這樣使用,請將多行資料合併到一個Insert Into語句中進行批量提交,單個批次建議1000~1000000條資料。

  • 部分列更新。

    Insert Into的預設行為是整行寫入。在Unique資料模型MOW實現方式中,客戶可按需開啟部分列更新功能,需要設定如下會話變數:

    set enable_unique_key_partial_update=true

    需要注意的是,控制Insert Into語句是否開啟strict 模式的會話變數enable_insert_strict的預設值為true,意味著不允許更新不存在的key。所以,在使用Insert Into語句進行部分列更新時,如果希望能插入不存在的key,需要同時將enable_insert_strict設定為false。