全部产品
Search
文档中心

云原生大数据计算服务 MaxCompute:MERGE INTO

更新时间:Jun 07, 2024

当您需要对Transactional表或Delta Table执行insertupdatedelete操作时,可以通过merge into功能将这些操作合并为一条SQL语句,根据与源表关联的结果,对目标Transactional表执行插入、更新或删除操作,只需要进行一次全表扫描操作,以提高执行效率。

前提条件

执行merge into操作前需要具备目标Transactional表的读取表数据权限(Select)及更新表数据权限(Update)。授权操作请参见MaxCompute权限

功能介绍

MaxCompute支持了deleteupdate功能,但当您需要使用多个insertupdatedelete对目标表进行批量操作时,需要编写多条SQL语句,然后进行多次全表扫描才能完成操作。MaxCompute提供的merge into功能,只需要进行一次全表扫描操作,就可以完成全部操作,执行效率要高于insert+update+delete

merge into操作具备原子性,作业中的insertupdatedelete操作都执行成功时,作业才算执行成功;任一内部逻辑处理失败,则整体作业执行失败。

同时,merge into可以为您避免分别执行insertupdatedelete操作时,可能导致部分操作执行成功,部分操作执行失败,其中成功部分无法回退的问题。

使用限制

不允许在同一条merge into语句中对相同的行执行多次insertupdate操作。

命令格式

merge into <target_table> as <alias_name_t> using <source expression|table_name> as <alias_name_s>
--从on开始对源表和目标表的数据进行关联判断。
on <boolean expression1>
--when matched…then指定on的结果为True的行为。多个when matched…then之间的数据无交集。
when matched [and <boolean expression2>] then update set <set_clause_list>
when matched [and <boolean expression3>] then delete 
--when not matched…then指定on的结果为False的行为。
when not matched [and <boolean expression4>] then insert values <value_list>
  • target_table:必填。目标表名称,必须是实际存在的表。

  • alias_name_t:必填。目标表的别名。

  • source expression|table_name:必填。关联的源表名称、视图或子查询。

  • alias_name_s:必填。关联的源表、视图或子查询的别名。

  • boolean expression1:必填。BOOLEAN类型判断条件,判断结果必须为True或False。

  • boolean expression2boolean expression3boolean expression4:可选。updatedeleteinsert操作相应的BOOLEAN类型判断条件。需要注意的是:

    • 当出现三个WHEN子句时,updatedeleteinsert都只能出现一次。

    • 如果updatedelete同时出现,出现在前的操作必须包括[and <boolean expression>]

    • when not matched只能出现在最后一个WHEN子句中,并且只支持insert操作。

  • set_clause_list:当出现update操作时必填。待更新数据信息。更多update信息,请参见更新数据(UPDATE)

  • value_list:当出现insert操作时必填。待插入数据信息。更多values信息,请参见VALUES

使用示例

  • 示例1:创建目标表acid_address_book_base1及源表tmp_table1,并插入数据。执行merge into操作,对符合on条件的数据用源表的数据对目标表进行更新操作,对不符合on条件并且源表中满足event_typeI的数据插入目标表。命令示例如下:

    --创建目标表acid_address_book_base1。
    create table if not exists acid_address_book_base1 
    (id bigint,first_name string,last_name string,phone string) 
    partitioned by(year string, month string, day string, hour string) 
    tblproperties ("transactional"="true"); 
    
    --创建源表tmp_table1。
    create table if not exists tmp_table1 
    (id bigint, first_name string, last_name string, phone string, _event_type_ string);
    
    --向目标表acid_address_book_base1插入测试数据。
    insert overwrite table acid_address_book_base1 
    partition(year='2020', month='08', day='20', hour='16') 
    values (4, 'nihaho', 'li', '222'), (5, 'tahao', 'ha', '333'), 
    (7, 'djh', 'hahh', '555');
    
    --查询目标表的数据确认插入测试数据的操作结果。
    set odps.sql.allow.fullscan=true;
    select * from acid_address_book_base1;
    +------------+------------+------------+------------+------------+------------+------------+------------+
    | id         | first_name | last_name  | phone      | year       | month      | day        | hour       |
    +------------+------------+------------+------------+------------+------------+------------+------------+
    | 4          | nihaho     | li         | 222        | 2020       | 08         | 20         | 16         |
    | 5          | tahao      | ha         | 333        | 2020       | 08         | 20         | 16         |
    | 7          | djh        | hahh       | 555        | 2020       | 08         | 20         | 16         |
    +------------+------------+------------+------------+------------+------------+------------+------------+
    
    --向源表tmp_table1插入测试数据。
    insert overwrite table tmp_table1 values 
    (1, 'hh', 'liu', '999', 'I'), (2, 'cc', 'zhang', '888', 'I'),
    (3, 'cy', 'zhang', '666', 'I'),(4, 'hh', 'liu', '999', 'U'),
    (5, 'cc', 'zhang', '888', 'U'),(6, 'cy', 'zhang', '666', 'U');
    
    --查询源表的数据确认插入测试数据的操作结果。
    select * from tmp_table1;
    +------------+------------+------------+------------+--------------+
    | id         | first_name | last_name  | phone      | _event_type_ |
    +------------+------------+------------+------------+--------------+
    | 1          | hh         | liu        | 999        | I            |
    | 2          | cc         | zhang      | 888        | I            |
    | 3          | cy         | zhang      | 666        | I            |
    | 4          | hh         | liu        | 999        | U            |
    | 5          | cc         | zhang      | 888        | U            |
    | 6          | cy         | zhang      | 666        | U            |
    +------------+------------+------------+------------+--------------+
    
    --执行merge into操作。
    merge into acid_address_book_base1 as t using tmp_table1 as s 
    on s.id = t.id and t.year='2020' and t.month='08' and t.day='20' and t.hour='16' 
    when matched then update set t.first_name = s.first_name, t.last_name = s.last_name, t.phone = s.phone 
    when not matched and (s._event_type_='I') then insert values(s.id, s.first_name, s.last_name,s.phone,'2020','08','20','16');
    
    --查询目标表的数据确认merge into操作结果。
    select * from acid_address_book_base1;
    
    +------------+------------+------------+------------+------------+------------+------------+------------+
    | id         | first_name | last_name  | phone      | year       | month      | day        | hour       |
    +------------+------------+------------+------------+------------+------------+------------+------------+
    | 4          | hh         | liu        | 999        | 2020       | 08         | 20         | 16         |
    | 5          | cc         | zhang      | 888        | 2020       | 08         | 20         | 16         |
    | 7          | djh        | hahh       | 555        | 2020       | 08         | 20         | 16         |
    | 1          | hh         | liu        | 999        | 2020       | 08         | 20         | 16         |
    | 2          | cc         | zhang      | 888        | 2020       | 08         | 20         | 16         |
    | 3          | cy         | zhang      | 666        | 2020       | 08         | 20         | 16         |
    +------------+------------+------------+------------+------------+------------+------------+------------+
  • 示例2:创建目标表merge_acid_dp及源表merge_acid_source,并插入数据。以不指定分区方式执行merge into命令,进行更新或者插入数据,对目标表的所有分区生效。

    --创建目标表merge_acid_dp。
    create table if not exists merge_acid_dp(c1 bigint not null, c2 bigint not null)
    partitioned by (dd string, hh string) tblproperties ("transactional" = "true");
    --创建源表merge_acid_source。
    create table if not exists merge_acid_source(c1 bigint not null, c2 bigint not null,
      c3 string, c4 string) lifecycle 30;
    
    --向目标表merge_acid_dp插入测试数据。
    insert overwrite table merge_acid_dp partition (dd='01', hh='01')
    values (1, 1), (2, 2);
    insert overwrite table merge_acid_dp partition (dd='02', hh='02')
    values (4, 1), (3, 2);
    
    --查询目标表的数据确认插入测试数据的操作结果。
    set odps.sql.allow.fullscan=true;
    select * from merge_acid_dp;
    +------------+------------+----+----+
    | c1         | c2         | dd | hh |
    +------------+------------+----+----+
    | 1          | 1          | 01 | 01 |
    | 2          | 2          | 01 | 01 |
    | 4          | 1          | 02 | 02 |
    | 3          | 2          | 02 | 02 |
    +------------+------------+----+----+
    
    --向源表merge_acid_source插入测试数据。
    insert overwrite table merge_acid_source values(8, 2, '03', '03'),
    (5, 5, '05', '05'), (6, 6, '02', '02');
    
    --查询源表的数据确认插入测试数据的操作结果。
    select * from merge_acid_source;
    +------------+------------+----+----+
    | c1         | c2         | c3 | c4 |
    +------------+------------+----+----+
    | 8          | 2          | 03 | 03 |
    | 5          | 5          | 05 | 05 |
    | 6          | 6          | 02 | 02 |
    +------------+------------+----+----+
    
    --执行merge into操作。
    merge into merge_acid_dp tar using merge_acid_source src
    on tar.c2 = src.c2
    when matched then
    update set tar.c1 = src.c1
    when not matched then
    insert values(src.c1, src.c2, src.c3, src.c4);
    
    --查询目标表的数据确认merge into操作结果。
    select * from merge_acid_dp;
    +------------+------------+----+----+
    | c1         | c2         | dd | hh |
    +------------+------------+----+----+
    | 6          | 6          | 02 | 02 |
    | 5          | 5          | 05 | 05 |
    | 8          | 2          | 02 | 02 |
    | 8          | 2          | 01 | 01 |
    | 1          | 1          | 01 | 01 |
    | 4          | 1          | 02 | 02 |
    +------------+------------+----+----+
  • 示例3:创建目标表merge_acid_sp及源表merge_acid_source,并插入数据。以指定分区方式执行merge into命令,进行更新或者插入数据,对目标表的指定分区生效。

    --创建目标表merge_acid_sp。
    create table if not exists merge_acid_sp(c1 bigint not null, c2 bigint not null)
    partitioned by (dd string, hh string) tblproperties ("transactional" = "true");
    --创建源表merge_acid_source。
    create table if not exists merge_acid_source(c1 bigint not null, c2 bigint not null,
      c3 string, c4 string) lifecycle 30;
    
    --向目标表merge_acid_sp插入测试数据。
    insert overwrite table merge_acid_sp partition (dd='01', hh='01')
    values (1, 1), (2, 2);
    insert overwrite table merge_acid_sp partition (dd='02', hh='02')
    values (4, 1), (3, 2);
    
    --查询目标表的数据确认插入测试数据的操作结果。
    set odps.sql.allow.fullscan=true;
    select * from merge_acid_sp;
    +------------+------------+----+----+
    | c1         | c2         | dd | hh |
    +------------+------------+----+----+
    | 1          | 1          | 01 | 01 |
    | 2          | 2          | 01 | 01 |
    | 4          | 1          | 02 | 02 |
    | 3          | 2          | 02 | 02 |
    +------------+------------+----+----+
    
    --向源表merge_acid_source插入测试数据。
    insert overwrite table merge_acid_source values(8, 2, '03', '03'),
    (5, 5, '05', '05'), (6, 6, '02', '02');
    
    --查询源表的数据确认插入测试数据的操作结果。
    select * from merge_acid_source;
    +------------+------------+----+----+
    | c1         | c2         | c3 | c4 |
    +------------+------------+----+----+
    | 8          | 2          | 03 | 03 |
    | 5          | 5          | 05 | 05 |
    | 6          | 6          | 02 | 02 |
    +------------+------------+----+----+
    
    --执行merge into操作,同时在on条件中指定只对目标表的dd = '01' and hh = '01'分区执行更新或者插入操作。
    merge into merge_acid_sp tar using merge_acid_source src
    on tar.c2 = src.c2 and tar.dd = '01' and tar.hh = '01'
    when matched then
    update set tar.c1 = src.c1
    when not matched then
    insert values(src.c1, src.c2, src.c3, src.c4);
    
    --查询目标表的数据确认merge into操作结果。
    select * from merge_acid_sp;
    +------------+------------+----+----+
    | c1         | c2         | dd | hh |
    +------------+------------+----+----+
    | 5          | 5          | 05 | 05 |
    | 6          | 6          | 02 | 02 |
    | 8          | 2          | 01 | 01 |
    | 1          | 1          | 01 | 01 |
    | 4          | 1          | 02 | 02 |
    | 3          | 2          | 02 | 02 |
    +------------+------------+----+----+
  • 示例4:创建Delta类型目标表mf_tt6及源表mf_delta,并插入数据。以指定分区方式执行merge into命令,进行更新、插入或删除数据,对目标表的指定分区生效。

    --创建Delta类型目标表mf_tt6。
    create table if not exists mf_tt6 (pk bigint not null primary key, 
                      val bigint not null) 
                      partitioned by (dd string, hh string) 
                      tblproperties ("transactional"="true");
    --向目标表mf_tt6插入测试数据。
    insert overwrite table mf_tt6 partition (dd='01', hh='02') values (1, 1), (2, 2), (3, 3);
    insert overwrite table mf_tt6 partition (dd='01', hh='01') values (1, 10), (2, 20), (3, 30);
    
    --开启全表扫描,仅此Session有效。执行select语句查看表mf_tt6中的数据。
    set odps.sql.allow.fullscan=true;
    select * from mf_tt6;
    +------------+------------+----+----+
    | pk         | val        | dd | hh |
    +------------+------------+----+----+
    | 1          | 10         | 01 | 01 |
    | 3          | 30         | 01 | 01 |
    | 2          | 20         | 01 | 01 |
    | 1          | 1          | 01 | 02 |
    | 3          | 3          | 01 | 02 |
    | 2          | 2          | 01 | 02 |
    +------------+------------+----+----+
    
    --创建源表mf_delta,并插入测试数据。
    create table if not exists mf_delta as select pk, val from values (1, 10), (2, 20), (6, 60) t (pk, val);
    
    --查询源表的数据,确认插入测试数据的操作结果。
    select * from mf_delta;
    +------+------+
    | pk   | val  |
    +------+------+
    | 1    | 10   |
    | 2    | 20   |
    | 6    | 60   |
    +------+------+
    
    --执行merge into操作,同时在on条件中指定只对目标表mf_tt6的dd = '01' and hh = '02'分区执行更新、插入或删除操作。
    merge into mf_tt6 using mf_delta 
    on mf_tt6.pk = mf_delta.pk and mf_tt6.dd='01' and mf_tt6.hh='02' 
    when matched and (mf_tt6.pk > 1) then
    update set mf_tt6.val = mf_delta.val 
    when matched then delete 
    when not matched then
    insert values (mf_delta.pk, mf_delta.val, '01', '02');         
    
    --查询目标表的数据确认merge into操作结果。
    set odps.sql.allow.fullscan=true;
    select * from mf_tt6;
    +------------+------------+----+----+
    | pk         | val        | dd | hh |
    +------------+------------+----+----+
    | 1          | 10         | 01 | 01 |
    | 3          | 30         | 01 | 01 |
    | 2          | 20         | 01 | 01 |
    | 3          | 3          | 01 | 02 |
    | 6          | 60         | 01 | 02 |
    | 2          | 20         | 01 | 02 |
    +------------+------------+----+----+