全部产品
Search
文档中心

实时数仓Hologres:INSERT ON CONFLICT(UPSERT)

更新时间:Aug 06, 2024

本文为您介绍在Hologres中INSERT ON CONFLICT语句的用法。

应用场景

INSERT ON CONFLICT命令适用于通过SQL方式导入数据的场景。

使用数据集成或Flink写入数据时,如果需要对主键重复的行数据执行更新或跳过操作,则需进行如下配置:

  • 通过DataWorks的数据集成导入数据。

    数据集成已内置INSERT ON CONFLICT功能,该功能的实现原理请参见Hologres Writer。同时,您需要进行如下配置:

    • 离线同步数据时,写入冲突策略选择忽略(Ignore)或者更新(Replace)

    • 实时同步数据时,写入冲突策略选择忽略(Ignore)或者更新(Replace)

    说明

    同步数据时,Hologres的表均需要设置主键,才能更新数据。

  • 通过Flink写入数据。

    通过Flink写入数据默认写入冲突策略使用InsertOrIgnore(保留首次出现的数据,忽略后续所有数据),但是需要您在Hologres建表时设置主键。如果使用ctas语法,则写入冲突策略默认为InsertOrUpdate(替换部分已有数据)。

命令介绍

INSERT ON CONFLICT语句用于在指定列插入某行数据时,如果主键存在重复的行数据,则对该数据执行更新或跳过操作,实现UPSERT(INSERT OR UPDATE)的效果。INSERT ON CONFLICT的语法格式如下。

INSERT INTO <table_name> [ AS <alias> ] [ ( <column_name> [, ...] ) ]
    {  VALUES ( { <expression> } [, ...] ) [, ...] | <query> }
    [ ON CONFLICT [ conflict_target ] conflict_action ]

where conflict_target is pk

    ON CONSTRAINT constraint_name

and conflict_action is one of:

    DO NOTHING
    DO UPDATE SET { <column_name> = { <expression> } |
                    ( <column_name> [, ...] ) = ( { <expression> } [, ...] ) |
                  } [, ...]
              [ WHERE condition ]

参数说明如下表所示。

参数

描述

table_name

插入数据的目标表名称。

alias

别名。目标表的替代名称。

column_name

目标表中目标列名称。

DO NOTHING

InsertOrIgnore,即在指定列插入某行数据时,如果主键存在重复的行数据,则对该数据执行跳过操作。

DO UPDATE

InsertOrUpdate,即在指定列插入某行数据时,如果主键存在重复的行数据,则对该数据执行更新操作。

存在如下情况:

  • 更新全部列:全部列的数据都更新,即整行更新。

  • 更新部分列:即局部列更新,缺失的列不更新。

  • 如果要实现InsertOrReplace的效果,同时缺失的列补null,需要手动在值内传null,见下方使用示例。

  • 重要
    • 当列中有默认(default)值时,DO UPDATE不更新有默认值的列,性能会比较低。

    • 通过SQL(INSERT ON CONFLICT)实现的InsertOrReplace,缺失的列补null,需要在insert的值内传null;如果是使用Flink、数据集成等方式,选择InsertOrReplace会自动补null

expression

对应列执行的相关表达式,您可以参考Postgres来设置表达式。

常用表达式 b=excluded.b,或者(a, b, c) = ROW (excluded.*)简化表达,等式左侧表示要被更新的字段,等式右侧表示插入的表达式,即values部分的值,或者select表达式,excluded是对插入表达式的别名,不是写入源头表的别名。例如,column_name = excluded.column_namecolumn_name为插入数据至目标表指定列的列名称,假设column_name为目标表的第N列,则excluded.column_name为插入表达式的第N列,当使用excluded.*时,表示选择所有列,列的顺序为插入表达式中列的顺序,需要保证插入目标列的顺序与被写入表的DDL顺序一致。

技术原理

INSERT ON CONFLICT的技术实现原理同UPDATE,详情请参见UPDATE。不同表存储格式(行存、列存、行列共存)在更新时的细节处理会略有不同,这就导致不同存储模式的表在更新时,性能会有不同。而根据业务的需求,INSERT ON CONFLICT又可以分为InsertOrIgnoreInsertOrReplaceInsertOrUpdate,三者的具体区别如下:

更新模式

说明

InsertOrIgnore

写入时忽略更新,结果表有主键,实时写入时如果主键重复,丢弃后到的数据,通过insert on conflict do nothing实现。

InsertOrUpdate

写入更新,结果表有主键,实时写入时如果主键重复,按照主键更新。分为整行更新和部分列更新,部分列更新指如果写入的一行数据不包含所有列,缺失的列不更新。通过insert on conflict do update实现。

InsertOrReplace

写入覆盖,结果表有主键,实时写入时如果主键重复,按照主键更新。如果写入的一行数据不包含所有列,缺失的列的数据补Null,需要通过insert on conflict do update和手动补Null值一起实现。

根据UPDATE的原理,当表设置不同的存储格式时,不同UPDATE模式下的更新性能如下:

  • 列存表不同写入模式的性能排序如下。

    • 结果表无主键性能最高。

    • 结果表有主键时:InsertOrIgnore > InsertOrReplace >= InsertOrUpdate(整行)> InsertOrUpdate(部分列)

  • 行存表不同写入模式的性能排序如下。

    InsertOrReplace = InsertOrUpdate(整行)>= InsertOrUpdate(部分列) >= InsertOrIgnore

使用限制

  • INSERT ON CONFLICT语句的条件必须包含所有主键。

  • Hologres HQE在执行INSERT ON CONFLICT时,本身不会保序(保证顺序),因此不能实现keep first、keep last的效果,都是keep any。但在实际应用中,如果数据源有主键重复数据需要去重,建议使用keep last,命令如下:

    --保留重复数据的最后一条数据
    set hg_experimental_affect_row_multiple_times_keep_last = on;

使用示例

  • INSERT ON CONFLICT语句的示例用法:

    说明

    Hologres从V2.1.17版本起支持Serverless Computing能力,针对大数据量离线导入、大型ETL作业、外表大数据量查询等场景,使用Serverless Computing执行该类任务可以直接使用额外的Serverless资源,避免使用实例自身资源,无需为实例预留额外的计算资源,显著提升实例稳定性、减少OOM概率,且仅需为任务单独付费。Serverless Computing详情请参见Serverless Computing概述,Serverless Computing使用方法请参见Serverless Computing使用指南

    1. 准备表和数据:

      begin ;
      create table test1 (
          a int NOT NULL PRIMARY KEY,
          b int,
          c int
      );
      commit ;
      
      insert into test1 values (1,2,3);
      
    2. 不同场景下的使用示例:

      说明

      下面的每个场景示例结果不相互依赖,没有顺序关系,都是基于上述已创建的表和数据的结果。

      • 场景1:实现InsertOrIgnore,即主键重复不更新。

        INSERT INTO test1 (a, b, c) VALUES (1, 1, 1)
        ON CONFLICT (a)
        DO NOTHING;
        
        --更新后test1表的数据为:
        a	b	c
        1	2	3
      • 场景2:实现InsertOrUpdate的整行更新,可以通过如下两种方式实现。

        • 方式1:在SET..EXCLUDED中列出所有的列。

          INSERT INTO test1 (a, b, c) VALUES (1, 1, 1)
          ON CONFLICT (a)
          DO UPDATE SET b = EXCLUDED.b, c = EXCLUDED.c;
          
          --更新后test1表的数据为:
          a	b	c
          1	1	1
        • 方式2:使用ROW(EXCLUDED.*)代表更新所有列。

          INSERT INTO test1 (a, b, c)VALUES (1, 1, 1)
          ON CONFLICT (a)
          DO UPDATE SET (a,b,c) = ROW(EXCLUDED.*);
          
          --更新后test1表的数据为:
          a	b	c
          1	1	1
      • 场景3:实现InsertOrUpdate的部分列更新,即只更新指定列,缺失的列不更新。

        --要实现部分列更新的效果,需要在set后列出想要更新的列
        INSERT INTO test1 (a, b, c) VALUES (1, 1, 1)
        ON CONFLICT (a)
        DO UPDATE SET b = EXCLUDED.b;
        
        --表中c列不更新,更新后test1表的数据为:
        a	b	c
        1	1	3
      • 场景4:实现InsertOrReplace,即整行覆盖,如果有缺失的列,缺失的列补null。

        --如果要实现InsertOrReplace,且缺失的列补null,则需要在insert的值中手动补null。
        INSERT INTO test1 (a, b,c) VALUES (1, 1,null)
        ON CONFLICT (a)
        DO UPDATE SET b = EXCLUDED.b,c = EXCLUDED.c;
        
        --更新后test1表的数据为:
        a	b	c
        1	1	\N
      • 场景5:从另外一张test2表更新test1表数据。

        --准备test2表和数据
        CREATE TABLE test2 (
            d int NOT NULL PRIMARY KEY,
            e int,
            f int
        );
        INSERT INTO test2 VALUES (1, 5, 6);
        
        
        --将test2整表替换test1表相同主键的行
        INSERT INTO test1 (a, b, c) SELECT d,e,f FROM test2 ON CONFLICT (a) DO UPDATE SET
        (a,b,c) = ROW (excluded.*);
        
        --更新后test1表数据如下:
        a	b	c
        1	5	6
        
        --将test2整表替换test1表相同主键的行,但调整了更新映射关系,即test2的e列更新到c列,f列更新到b列
        INSERT INTO test1 (a, b, c) SELECT d,e,f FROM test2 ON CONFLICT (a) DO UPDATE SET
        (a,c,b) = ROW (excluded.*);
        --更新后test1表数据如下:       
        a	b	c
        1	6	5
  • 行存表INSERT ON CONFLICT语句的优化:

    Hologres对行存表的更新场景实行了优化,建议您在使用时将UPDATE列的顺序与INSERT的顺序保持一致,并且更新为整行更新。

    INSERT INTO test1 (a, b, c) 
    SELECT
        d,e,f
    FROM
        test2
    ON CONFLICT (a)
        DO UPDATE SET
            a = excluded.a,
            b = excluded.b,
            c = excluded.c;
            
    INSERT INTO test1 (a, b, c)
    SELECT d,e,f FROM test2
    ON CONFLICT (a)
    DO UPDATE SET(a,b,c) = ROW (excluded.*)

常见报错

  • 问题现象

    对数据源执行INSERT ON CONFLICT语句时出现如下两种报错其中一个。

    • 报错一:duplicate key value violates unique constraint

    • 报错二:Update row with Key (xxx)=(yyy) multiple times

    • 报错三(OOM问题):Total memory used by all existing queries exceeded memory limitation

  • 问题原因一:数据源存在重复数据。

    Hologres兼容PostgreSQL,使用的也是标准PostgreSQL语法。在标准的PostgreSQL语义中,对数据源执行INSERT ON CONFLICT语句时,数据源不能包含重复数据,如果包含重复数据则会产生上述报错。

    说明

    数据源重复是指待插入的数据中包含重复数据,不是指待插入的数据与表里的数据重复。

    使用INSERT ON CONFLICT语句插入数据时包含重复数据,示例语句如下。

    INSERT INTO test1 VALUES (1, 2, 3), (1, 2, 3)
    ON CONFLICT (a) 
    DO UPDATE SET (a, b, c) = ROW (excluded.*);

    解决方法:

    如果数据源包含重复数据,可以配置如下参数,保留重复数据的最后一条数据:

    set hg_experimental_affect_row_multiple_times_keep_last = on;
  • 问题原因二:数据源因TTL过期出现重复数据。

    数据源中有表设置过表数据生命周期(TTL),表中有部分数据已经过了TTL,因TTL不是准确的时间,导致过期的数据未被清理,导入时主键(PK)数据重复,从而出现报错。

    解决方法:

    Hologres从 V1.3.23版本开始,通过以下命令能快速修正因TTL过期PK重复的数据。执行该命令后,系统会将该表PK重复的数据清理掉,清理策略默认为Keep Last即保留重复PK中最后一条写入的PK数据,其余重复PK数据进行清理。

    说明
    • 原则上来说PK不会出现重复数据,因此该命令仅清理因TTL导致PK重复的数据。

    • 该命令仅Hologres V1.3.23及以上版本使用,若实例版本较低,请升级实例。

    call public.hg_remove_duplicated_pk('<schema>.<table_name>');

    使用示例:假设有两个表,tbl_1为目标表,tbl_2为源表且配置了TTL,时间设置为300s。将tbl_2的数据整行更新至tbl_1,因TTL过期后,tbl_2的主键重复,导致报错。

    
    BEGIN;
    CREATE TABLE tbl_1 (
      a int NOT NULL PRIMARY KEY,
      b int,
      c int
    );
    CREATE TABLE tbl_2 (
      d int NOT NULL PRIMARY KEY,
      e int,
      f int
    );
    CALL set_table_property('tbl_2', 'time_to_live_in_seconds', '300'); 
    COMMIT;
    
    INSERT INTO tbl_1 VALUES (1, 1, 1), (2, 3, 4);
    INSERT INTO tbl_2 VALUES (1, 5, 6);
    --过300s后再向tbl_2插入数据
    INSERT INTO tbl_2 VALUES (1, 3, 6);
    
    --将tbl_2整表替换tbl_1表相同主键的行,PK因ttl重复了导致更新报错
    INSERT INTO tbl_1 (a, b, c)
    SELECT
        d,e,f
    FROM
        tbl_2
    ON CONFLICT (a)
        DO UPDATE SET
            (a,b,c) = ROW (excluded.*);
    --错误原因:ERROR: internal error: Duplicate keys detected when building hash table.
    
    --guc清理tbl_2的PK重复数据,策略为keep last,
    call public.hg_remove_duplicated_pk('tbl_2');
    
    --再重新导入tbl_1数据,数据导入成功
  • 问题原因三:实例本身内存资源不足,无法支撑本次大数据量写入任务。

    解决方法:

    • 推荐使用Hologres Serverless Computing能力执行本次大数据量写入任务。Hologres从V2.1.17版本起支持Serverless Computing能力,针对大数据量离线导入、大型ETL作业、外表大数据量查询等场景,使用Serverless Computing执行该类任务可以直接使用额外的Serverless资源,避免使用实例自身资源,无需为实例预留额外的计算资源,显著提升实例稳定性、减少OOM概率,且仅需为任务单独付费。Serverless Computing详情请参见Serverless Computing概述,Serverless Computing使用方法请参见Serverless Computing使用指南

    • 参考OOM常见问题排查指南中的方法处理。