全部产品
Search
文档中心

云数据库 SelectDB 版:Insert Into

更新时间:Oct 10, 2024

云数据库 SelectDB 版兼容标准SQL语法,可通过标准的Insert Into方式导入数据。

背景信息

Insert Into命令是MySQL等数据库中常用的数据导入方式。云数据库 SelectDB 版兼容标准SQL语法,支持通过Insert Into命令导入数据。包含以下两种:

  • Insert Into tbl SELECT ...

  • Insert Into tbl (col1, col2, ...) VALUES (1, 2, ...), (1,3, ...);

    重要

    此命令不建议在生产环境中使用。

Insert Into Select

通过SelectDB提供的大量SQL函数、联邦查询能力,Insert Into Select可以对SelectDB内部数据、外部数据湖数据等进行高效的计算处理,然后导入SelectDB的新表中,用来进一步进行数据分析服务。

内表数据ETL

如果数据已经在SelectDB表中,可通过Insert Into Select进行数据ETL转换,然后导入到一个新表中。示例如下。

INSERT INTO bj_store_sales
SELECT id, total, user_id, sale_timestamp FROM store_sales WHERE region = "bj";

数据湖数据同步

如果数据在数据湖等外部系统中,可以在SelectDB中创建Catalog,映射到数据湖等外部系统中的数据,然后通过Insert Into Select将其中的数据导入到SelectDB表中。SelectDB支持对接Hive、Iceberg、Hudi、Elasticsearch、JDBC等数据源,详细请参见湖仓一体

如下以Hive数据源为例,介绍如何同步数据湖数据到SelectDB中。

  • 创建Hive Catalog,即可通过联邦查询访问Hive中的数据,示例如下。

CREATE CATALOG test_catalog comment 'hive catalog' PROPERTIES (
    'type'='hms',
    'hive.metastore.uris' = 'thrift://127.0.0.1:7004',
    'dfs.nameservices'='HANN',
    'dfs.ha.namenodes.HANN'='nn1,nn2',
    'dfs.namenode.rpc-address.HANN.nn1'='nn1_host:rpc_port',
    'dfs.namenode.rpc-address.HANN.nn2'='nn2_host:rpc_port',
    'dfs.client.failover.proxy.provider.HANN'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
);
  • 通过Insert Into Select,同步Hive数据到SelectDB中,并指定导入作业唯一标识Label。

INSERT INTO bj_store_sales
WITH LABEL test_label
SELECT id, total, user_id, sale_timestamp FROM test_catalog.test_db.store_sales WHERE region = "bj";

Insert Into Values

Insert Into Values是MySQL等数据库中常用的数据写入方式,建议仅用于测试环境的使用。典型的使用方式是直接通过SQL客户端、JDBC程序发送数据写入请求。

创建待导入的SelectDB数据表如下。

CREATE TABLE test_table
(
    id int,
    name varchar(50),
    age int
)
DISTRIBUTED BY HASH(id) BUCKETS 4
PROPERTIES("replication_num" = "1");

SQL示例

BEGIN;
INSERT INTO db.tbl VALUES(),(),();
INSERT INTO db.tbl VALUES(),(),();
INSERT INTO db.tbl VALUES(),(),();
COMMIT;

JDBC程序示例

public static void main(String[] args) throws Exception {
    // 单次导入插入语句的数量。
    int insertNum = 10;
    // 单条插入攒批的数量。
    int batchSize = 10000;
    String URL="jdbc:mysql://<IP地址>:<MySQL协议端口>/test_db?useLocalSessionState=true";  // VPC ID所对应的IP地址。您可以登录VPC控制台在VPC列表中找到目标VPC ID所对应的IP地址。
    Connection connection = DriverManager.getConnection(URL, "admin", "password");  // 云数据库SelectDB版实例的账号和密码。
    Statement statement = connection.createStatement();
    statement.execute("begin");
    // 拼接多条插入语句。
    for (int num = 0; num < insertNum; num++) {
        StringBuilder sql = new StringBuilder();
        sql.append("Insert Into test_tbl values ");
        for(int i = 0; i < batchSize; i++){
            if(i > 0){
                sql.append(",");
            }
            // 拼接一行数据,如:姓名,年龄。可根据具体业务修改。
            sql.append("('zhangsan',18)");
        }
        //add sql to batch: Insert Into tbl values(),(),()
        statement.addBatch(sql.toString());
    }
    statement.addBatch("commit");
    statement.executeBatch();
    // 关闭资源。
    statement.close();
    connection.close();
}

最佳实践

  • 查看返回结果。

    Insert Into操作是一个同步操作,返回结果即表示操作结束。您需要根据返回结果的不同,进行对应的处理。

    1. 执行成功,结果集为空。

      如果 insert 对应 select 语句的结果集为空,则返回如下:

      INSERT INTO tbl1 SELECT * FROM empty_tbl;
      Query OK, 0 rows affected (0.02 sec)

      Query OK表示执行成功。0 rows affected表示没有数据被导入。

    2. 执行成功,结果集不为空。

      在结果集不为空的情况下。返回结果分为如下几种情况。

      INSERT INTO tbl1 SELECT * FROM tbl2;
      Query OK, 4 rows affected (0.38 sec)
      {'label':'insert_8510c568-9eda-****-9e36-6adc7d35291c', 'status':'visible', 'txnId':'4005'}
       
      INSERT INTO tbl1 with label my_label1 SELECT * FROM tbl2;
      Query OK, 4 rows affected (0.38 sec)
      {'label':'my_label1', 'status':'visible', 'txnId':'4005'}
       
      INSERT INTO tbl1 SELECT * FROM tbl2;
      Query OK, 2 rows affected, 2 warnings (0.31 sec)
      {'label':'insert_f0747f0e-7a35-****-affa-13a235f4020d', 'status':'visible', 'txnId':'4005'}
       
      INSERT INTO tbl1 SELECT * FROM tbl2;
      Query OK, 2 rows affected, 2 warnings (0.31 sec)
      {'label':'insert_f0747f0e-7a35-****-affa-13a235f4020d', 'status':'committed', 'txnId':'4005'}

      其中,Query OK表示执行成功。4 rows affected表示总共有4行数据被导入。2 warnings表示被过滤的行数。同时会返回一个 JSON 串。

      {'label':'my_label1', 'status':'visible', 'txnId':'4005'}
      {'label':'insert_f0747f0e-7a35-****-affa-13a235f4020d', 'status':'committed', 'txnId':'4005'}
      {'label':'my_label1', 'status':'visible', 'txnId':'4005', 'err':'some other error'}

      其中,label为您指定的 label 或自动生成的label,label是该Insert Into导入作业的标识,每个导入作业,都有一个在单database内部唯一的label。status表示导入数据是否可见,如果可见显示visible,如果不可见显示committedtxnId为这个insert对应的导入事务的id。err字段会显示一些其他非预期错误。

      当需要查看被过滤的行时,您可以通过如下语句:

      SHOW LOAD WHERE label="xxx";

      返回结果中的 URL 可以用于查询错误的数据,具体见后面查看错误行小结。数据不可见是一个临时状态,这批数据最终是一定可见的。可以通过如下语句查看这批数据的可见状态:

      SHOW TRANSACTION WHERE id=4005;

      返回结果中的TransactionStatus列如果为visible,则表述数据可见。

    3. 执行失败。

      执行失败表示没有任何数据被成功导入,并返回如下:

      INSERT INTO tbl1 SELECT * FROM tbl2 WHERE k1 = "a";
      ERROR 1064 (HY000): all partitions have no load data. url: http://10.74.167.16:8042/api/_load_error_log?file=__shard_2/error_log_insert_stmt_ba8bb9e158e4879-ae8de8507c0bf8a2_ba8bb9e158e4879_ae8de8507c0bf8a2

      其中ERROR 1064 (HY000): all partitions have no load data显示失败原因。通过其中的 URL 可以用于查询错误的数据:

      SHOW LOAD WARNINGS ON "url";
  • 超时时间。

    Insert Into操作的超时时间由会话变量query_timeout控制,默认为5分钟。超时则作业会被取消。

  • Label和原子性。

    Insert Into操作同样能够保证导入的原子性。当需要使用CTE(Common Table Expressions)作为 Insert Into操作中的查询部分时,必须指定WITH LABELcolumn部分。

  • 过滤阈值。

    与其他导入方式不同,Insert Into操作不能指定过滤阈值(max_filter_ratio)。默认的过滤阈值为 1,即有错误行都可以被忽略。

    对于有要求数据不能够被过滤的业务场景,可以通过设置会话变量enable_insert_stricttrue来确保当有数据被过滤掉的时候,Insert Into不会被执行成功。

  • 性能问题。

    不建议使用Insert Into Values方式进行数据导入,尤其是大数据的线上生产环境。如果必须这样使用,请将多行数据合并到一个Insert Into语句中进行批量提交,单个批次建议1000~1000000条数据。

  • 部分列更新。

    Insert Into的默认行为是整行写入。在Unique数据模型MOW实现方式中,客户可按需开启部分列更新功能,需要设置如下会话变量:

    set enable_unique_key_partial_update=true

    需要注意的是,控制Insert Into语句是否开启严格模式的会话变量enable_insert_strict的默认值为true,意味着不允许更新不存在的key。所以,在使用Insert Into语句进行部分列更新时,如果希望能插入不存在的key,需要同时将enable_insert_strict设置为false。