全部产品
Search
文档中心

云原生大数据计算服务 MaxCompute:MERGE INTO

更新时间:Dec 23, 2024

当您需要对Transactional表或Delta表执行INSERTUPDATEDELETE操作时,可以通过MERGE INTO功能将这些操作合并为一条SQL语句,根据与源表关联的结果,对目标Transactional表执行插入、更新或删除操作时,只需要执行一次全表扫描操作,以提高执行效率。

前提条件

执行MERGE INTO操作前需要具备目标Transactional表的读取表数据权限(Select)及更新表数据权限(Update)。授权操作请参见MaxCompute权限

功能介绍

MaxCompute支持了DELETEUPDATE功能,但当您需要使用多个INSERTUPDATEDELETE对目标表进行批量操作时,需要编写多条SQL语句,然后进行多次全表扫描才能完成操作。MaxCompute提供的MERGE INTO功能,只需要进行一次全表扫描操作,就可以完成全部操作,执行效率要高于INSERT+UPDATE+DELETE

MERGE INTO操作具备原子性,作业中的INSERTUPDATEDELETE操作都执行成功时,作业才算执行成功;任一内部逻辑处理失败,则整体作业执行失败。

同时,MERGE INTO可以为您避免分别执行INSERTUPDATEDELETE操作时,可能导致部分操作执行成功,部分操作执行失败,其中成功部分无法回退的问题。

使用限制

不允许在同一条MERGE INTO语句中对相同的行执行多次INSERTUPDATE操作。

命令格式

MERGE INTO <target_table> AS <alias_name_t> USING <source expression|table_name> AS <alias_name_s>
--从on开始对源表和目标表的数据进行关联判断。
ON <BOOLEAN expression1>
--when matched…then指定on的结果为True的行为。多个when matched…then之间的数据无交集。
WHEN matched [AND <BOOLEAN expression2>] THEN UPDATE SET <set_clause_list>
WHEN matched [AND <BOOLEAN expression3>] THEN DELETE 
--when not matched…then指定on的结果为False的行为。
WHEN NOT matched [AND <BOOLEAN expression4>] THEN INSERT VALUES <value_list>

参数说明

参数名

是否必填

描述

target_table

目标表名称,必须是实际存在的表。

alias_name_t

目标表的别名。

source expression|table_name

关联的源表名称、视图或子查询。

alias_name_s

关联的源表、视图或子查询的别名

BOOLEAN expression1

BOOLEAN类型判断条件,判断结果必须为True或False。

BOOLEAN expression2、BOOLEAN expression3、BOOLEAN expression4

UPDATEDELETEINSERT操作相应的BOOLEAN类型判断条件。需要注意:

  • 当出现三个WHEN子句时,UPDATEDELETEINSERT都只能出现一次。

  • 如果UPDATEDELETE同时出现,出现在前的操作必须包括[AND <BOOLEAN expression>]

  • WHEN NOT MATCHED只能出现在最后一个WHEN子句中,并且只支持INSERT操作。

set_clause_list

待更新数据信息。当出现UPDATE操作时必填。

更多UPDATE信息,请参见更新数据(UPDATE)

value_list

待插入数据信息。当出现INSERT操作时必填。

更多VALUES信息,请参见VALUES

使用示例

  • 示例1:创建目标表acid_address_book_base1及源表tmp_table1,并插入数据。执行MERGE INTO操作,对符合ON条件的数据用源表的数据对目标表进行更新操作,对不符合ON条件并且源表中满足_event_type_I的数据插入目标表。命令示例如下:

    --创建目标表acid_address_book_base1。
    CREATE TABLE IF NOT EXISTS acid_address_book_base1 
    (id BIGINT,first_name STRING,last_name STRING,phone STRING) 
    PARTITIONED BY(year STRING, month STRING, day STRING, hour STRING) 
    tblproperties ("transactional"="true"); 
    
    --创建源表tmp_table1。
    CREATE TABLE IF NOT EXISTS tmp_table1 
    (id BIGINT, first_name STRING, last_name STRING, phone STRING, _event_type_ STRING);
    
    --向目标表acid_address_book_base1插入测试数据。
    INSERT OVERWRITE TABLE acid_address_book_base1 
    PARTITION(year='2020', month='08', day='20', hour='16') 
    VALUES (4, 'nihaho', 'li', '222'), (5, 'tahao', 'ha', '333'), 
    (7, 'djh', 'hahh', '555');
    
    --查询目标表的数据确认插入测试数据的操作结果。
    SET odps.sql.allow.fullscan=true;
    SELECT * FROM acid_address_book_base1;
    --返回结果
    +------------+------------+------------+------------+------------+------------+------------+------------+
    | id         | first_name | last_name  | phone      | year       | month      | day        | hour       |
    +------------+------------+------------+------------+------------+------------+------------+------------+
    | 4          | nihaho     | li         | 222        | 2020       | 08         | 20         | 16         |
    | 5          | tahao      | ha         | 333        | 2020       | 08         | 20         | 16         |
    | 7          | djh        | hahh       | 555        | 2020       | 08         | 20         | 16         |
    +------------+------------+------------+------------+------------+------------+------------+------------+
    
    --向源表tmp_table1插入测试数据。
    INSERT OVERWRITE TABLE tmp_table1 VALUES 
    (1, 'hh', 'liu', '999', 'I'), (2, 'cc', 'zhang', '888', 'I'),
    (3, 'cy', 'zhang', '666', 'I'),(4, 'hh', 'liu', '999', 'U'),
    (5, 'cc', 'zhang', '888', 'U'),(6, 'cy', 'zhang', '666', 'U');
    
    --查询源表的数据确认插入测试数据的操作结果。
    SET odps.sql.allow.fullscan=true;
    SELECT * FROM tmp_table1;
    --返回结果
    +------------+------------+------------+------------+--------------+
    | id         | first_name | last_name  | phone      | _event_type_ |
    +------------+------------+------------+------------+--------------+
    | 1          | hh         | liu        | 999        | I            |
    | 2          | cc         | zhang      | 888        | I            |
    | 3          | cy         | zhang      | 666        | I            |
    | 4          | hh         | liu        | 999        | U            |
    | 5          | cc         | zhang      | 888        | U            |
    | 6          | cy         | zhang      | 666        | U            |
    +------------+------------+------------+------------+--------------+
    
    --执行merge into操作。
    MERGE INTO acid_address_book_base1 AS t USING tmp_table1 as s 
    ON s.id = t.id AND t.year='2020' AND t.month='08' AND t.day='20' AND t.hour='16' 
    WHEN MATCHED THEN UPDATE SET t.first_name = s.first_name, t.last_name = s.last_name, t.phone = s.phone 
    WHEN NOT MATCHED AND (s._event_type_='I') THEN INSERT VALUES(s.id, s.first_name, s.last_name,s.phone,'2020','08','20','16');
    
    --查询目标表的数据确认merge into操作结果。
    SET odps.sql.allow.fullscan=true;
    SELECT * FROM acid_address_book_base1;
    --返回结果
    +------------+------------+------------+------------+------------+------------+------------+------------+
    | id         | first_name | last_name  | phone      | year       | month      | day        | hour       |
    +------------+------------+------------+------------+------------+------------+------------+------------+
    | 4          | hh         | liu        | 999        | 2020       | 08         | 20         | 16         |
    | 5          | cc         | zhang      | 888        | 2020       | 08         | 20         | 16         |
    | 7          | djh        | hahh       | 555        | 2020       | 08         | 20         | 16         |
    | 1          | hh         | liu        | 999        | 2020       | 08         | 20         | 16         |
    | 2          | cc         | zhang      | 888        | 2020       | 08         | 20         | 16         |
    | 3          | cy         | zhang      | 666        | 2020       | 08         | 20         | 16         |
    +------------+------------+------------+------------+------------+------------+------------+------------+
  • 示例2:创建目标表merge_acid_dp及源表merge_acid_source,并插入数据。以不指定分区方式执行MERGE INTO命令,进行更新或者插入数据,对目标表的所有分区生效。

    --创建目标表merge_acid_dp。
    CREATE TABLE IF NOT EXISTS merge_acid_dp(c1 BIGINT NOT NULL, c2 BIGINT NOT NULL)
    PARTITIONED BY (dd STRING, hh STRING) tblproperties ("transactional" = "true");
    --创建源表merge_acid_source。
    CREATE TABLE IF NOT EXISTS merge_acid_source(c1 BIGINT NOT NULL, c2 BIGINT NOT NULL,
      c3 STRING, c4 STRING) lifecycle 30;
    
    --向目标表merge_acid_dp插入测试数据。
    INSERT OVERWRITE TABLE merge_acid_dp PARTITION (dd='01', hh='01')
    VALUES (1, 1), (2, 2);
    INSERT OVERWRITE TABLE merge_acid_dp PARTITION (dd='02', hh='02')
    VALUES (4, 1), (3, 2);
    
    --查询目标表的数据确认插入测试数据的操作结果。
    SET odps.sql.allow.fullscan=true;
    SELECT * FROM merge_acid_dp;
    --返回结果
    +------------+------------+----+----+
    | c1         | c2         | dd | hh |
    +------------+------------+----+----+
    | 1          | 1          | 01 | 01 |
    | 2          | 2          | 01 | 01 |
    | 4          | 1          | 02 | 02 |
    | 3          | 2          | 02 | 02 |
    +------------+------------+----+----+
    
    --向源表merge_acid_source插入测试数据。
    INSERT OVERWRITE TABLE merge_acid_source VALUES(8, 2, '03', '03'),
    (5, 5, '05', '05'), (6, 6, '02', '02');
    
    --查询源表的数据确认插入测试数据的操作结果。
    SET odps.sql.allow.fullscan=true;
    SELECT * FROM merge_acid_source;
    --返回结果
    +------------+------------+----+----+
    | c1         | c2         | c3 | c4 |
    +------------+------------+----+----+
    | 8          | 2          | 03 | 03 |
    | 5          | 5          | 05 | 05 |
    | 6          | 6          | 02 | 02 |
    +------------+------------+----+----+
    
    --执行merge into操作。
    SET odps.sql.allow.fullscan=true;
    MERGE INTO merge_acid_dp tar USING merge_acid_source src 
    ON tar.c2 = src.c2 
    WHEN MATCHED THEN 
    UPDATE SET tar.c1 = src.c1 
    WHEN NOT MATCHED THEN 
    INSERT VALUES(src.c1, src.c2, src.c3, src.c4);
    
    --查询目标表的数据确认merge into操作结果。
    SET odps.sql.allow.fullscan=true;
    SELECT * FROM merge_acid_dp;
    --返回结果
    +------------+------------+----+----+
    | c1         | c2         | dd | hh |
    +------------+------------+----+----+
    | 6          | 6          | 02 | 02 |
    | 5          | 5          | 05 | 05 |
    | 8          | 2          | 02 | 02 |
    | 8          | 2          | 01 | 01 |
    | 1          | 1          | 01 | 01 |
    | 4          | 1          | 02 | 02 |
    +------------+------------+----+----+
  • 示例3:创建目标表merge_acid_sp及源表merge_acid_source,并插入数据。以指定分区方式执行MERGE INTO命令,进行更新或者插入数据,对目标表的指定分区生效。

    --创建目标表merge_acid_sp。
    CREATE TABLE IF NOT EXISTS merge_acid_sp(c1 BIGINT NOT NULL, c2 BIGINT NOT NULL)
    PARTITIONED BY (dd STRING, hh STRING) tblproperties ("transactional" = "true");
    --创建源表merge_acid_source。
    CREATE TABLE IF NOT EXISTS merge_acid_source(c1 BIGINT NOT NULL, c2 BIGINT NOT NULL,
      c3 STRING, c4 STRING) lifecycle 30;
    
    --向目标表merge_acid_sp插入测试数据。
    INSERT OVERWRITE TABLE merge_acid_sp PARTITION (dd='01', hh='01')
    VALUES (1, 1), (2, 2);
    INSERT OVERWRITE TABLE merge_acid_sp PARTITION (dd='02', hh='02')
    VALUES (4, 1), (3, 2);
    
    --查询目标表的数据确认插入测试数据的操作结果。
    SET odps.sql.allow.fullscan=true;
    SELECT * FROM merge_acid_sp;
    --返回结果
    +------------+------------+----+----+
    | c1         | c2         | dd | hh |
    +------------+------------+----+----+
    | 1          | 1          | 01 | 01 |
    | 2          | 2          | 01 | 01 |
    | 4          | 1          | 02 | 02 |
    | 3          | 2          | 02 | 02 |
    +------------+------------+----+----+
    
    --向源表merge_acid_source插入测试数据。
    INSERT OVERWRITE TABLE merge_acid_source VALUES(8, 2, '03', '03'),
    (5, 5, '05', '05'), (6, 6, '02', '02');
    
    --查询源表的数据确认插入测试数据的操作结果。
    SET odps.sql.allow.fullscan=true;
    SELECT * FROM merge_acid_source;
    --返回结果
    +------------+------------+----+----+
    | c1         | c2         | c3 | c4 |
    +------------+------------+----+----+
    | 8          | 2          | 03 | 03 |
    | 5          | 5          | 05 | 05 |
    | 6          | 6          | 02 | 02 |
    +------------+------------+----+----+
    
    --执行merge into操作,同时在on条件中指定只对目标表的dd = '01' and hh = '01'分区执行更新或者插入操作。
    SET odps.sql.allow.fullscan=true;
    MERGE INTO merge_acid_sp tar USING merge_acid_source src 
    ON tar.c2 = src.c2 AND tar.dd = '01' AND tar.hh = '01' 
    WHEN MATCHED THEN 
    UPDATE SET tar.c1 = src.c1 
    WHEN NOT MATCHED THEN 
    INSERT VALUES(src.c1, src.c2, src.c3, src.c4);
    
    --查询目标表的数据确认merge into操作结果。
    SET odps.sql.allow.fullscan=true;
    SELECT * FROM merge_acid_sp;
    +------------+------------+----+----+
    | c1         | c2         | dd | hh |
    +------------+------------+----+----+
    | 5          | 5          | 05 | 05 |
    | 6          | 6          | 02 | 02 |
    | 8          | 2          | 01 | 01 |
    | 1          | 1          | 01 | 01 |
    | 4          | 1          | 02 | 02 |
    | 3          | 2          | 02 | 02 |
    +------------+------------+----+----+
  • 示例4:创建Delta Table类型目标表mf_tt6及源表mf_delta,并插入数据。以指定分区方式执行MERGE INTO命令,进行更新、插入或删除数据,对目标表的指定分区生效。

    --创建Transaction Table2.0类型目标表mf_tt6。
    CREATE TABLE IF NOT EXISTS mf_tt6 (pk BIGINT NOT NULL PRIMARY key, 
                      val BIGINT NOT NULL) 
                      PARTITIONED BY (dd STRING, hh STRING) 
                      tblproperties ("transactional"="true");
    --向目标表mf_tt6插入测试数据。
    INSERT OVERWRITE TABLE mf_tt6 PARTITION (dd='01', hh='02') VALUES (1, 1), (2, 2), (3, 3);
    INSERT OVERWRITE TABLE mf_tt6 PARTITION (dd='01', hh='01') VALUES (1, 10), (2, 20), (3, 30);
    
    --开启全表扫描,仅此Session有效。执行select语句查看表mf_tt6中的数据。
    SET odps.sql.allow.fullscan=true;
    SELECT * FROM mf_tt6;
    --返回结果
    +------------+------------+----+----+
    | pk         | val        | dd | hh |
    +------------+------------+----+----+
    | 1          | 10         | 01 | 01 |
    | 3          | 30         | 01 | 01 |
    | 2          | 20         | 01 | 01 |
    | 1          | 1          | 01 | 02 |
    | 3          | 3          | 01 | 02 |
    | 2          | 2          | 01 | 02 |
    +------------+------------+----+----+
    
    --创建源表mf_delta,并插入测试数据。
    CREATE TABLE IF NOT EXISTS mf_delta AS SELECT pk, val FROM VALUES (1, 10), (2, 20), (6, 60) t (pk, val);
    
    --查询源表的数据,确认插入测试数据的操作结果。
    SELECT * FROM mf_delta;
    --返回结果
    +------+------+
    | pk   | val  |
    +------+------+
    | 1    | 10   |
    | 2    | 20   |
    | 6    | 60   |
    +------+------+
    
    --执行merge into操作,同时在on条件中指定只对目标表mf_tt6的dd = '01' and hh = '02'分区执行更新、插入或删除操作。
    
    MERGE INTO mf_tt6 USING mf_delta 
    ON mf_tt6.pk = mf_delta.pk AND mf_tt6.dd='01' AND mf_tt6.hh='02' 
    WHEN MATCHED AND (mf_tt6.pk > 1) THEN 
    UPDATE SET mf_tt6.val = mf_delta.val 
    WHEN MATCHED THEN DELETE 
    WHEN NOT MATCHED THEN 
    INSERT VALUES (mf_delta.pk, mf_delta.val, '01', '02');         
    
    --查询目标表的数据确认merge into操作结果。
    SET odps.sql.allow.fullscan=true;
    SELECT * FROM mf_tt6;
    --返回结果
    +------------+------------+----+----+
    | pk         | val        | dd | hh |
    +------------+------------+----+----+
    | 1          | 10         | 01 | 01 |
    | 3          | 30         | 01 | 01 |
    | 2          | 20         | 01 | 01 |
    | 3          | 3          | 01 | 02 |
    | 6          | 60         | 01 | 02 |
    | 2          | 20         | 01 | 02 |
    +------------+------------+----+----+