Before you use Data Transmission Service (DTS) to migrate data between PostgreSQL databases, you can create a function and a trigger in the source database. The function and trigger obtain the data definition language (DDL) information of the source database. During incremental data migration, DTS migrates DDL operations to the destination database.

Prerequisites

  • The source database must meet the following requirements:
    • If the source database is a self-managed PostgreSQL database, the database version must be V9.4 or later.
    • If the source database is an ApsaraDB RDS for PostgreSQL instance, the version of the ApsaraDB RDS for PostgreSQL instance must be V10 or later.
      • ApsaraDB RDS for PostgreSQL V9.4 does not support event triggers.
      • The kernel versions of ApsaraDB RDS for PostgreSQL V10, V11, and V12 must be 20201130 or later.
      • The kernel version of ApsaraDB RDS for PostgreSQL V13 must be 20210228 or later.
      Note For more information about how to upgrade the kernel version of ApsaraDB RDS for PostgreSQL, see Update the minor engine version of an ApsaraDB RDS for PostgreSQL instance.
  • The data migration task must be created after October 1, 2020.
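
The engine version requirements above can be checked from any SQL client before you continue. The following is a minimal sketch that uses only standard PostgreSQL commands. It does not report the kernel (minor engine) version of an ApsaraDB RDS for PostgreSQL instance, which has to be checked separately.

```sql
-- Human-readable version string of the server.
SELECT version();

-- Numeric version: 90400 or later corresponds to V9.4 or later,
-- and 100000 or later corresponds to V10 or later.
SHOW server_version_num;
```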

Background information

When you use DTS to migrate data between PostgreSQL databases, by default DTS synchronizes only data manipulation language (DML) operations during incremental data migration. DML operations include INSERT, DELETE, and UPDATE. DDL operations are not synchronized unless you complete the procedure in this topic.

To synchronize DDL operations, you can create a trigger and a function to obtain the DDL information of the source database. During incremental data migration, DTS migrates DDL operations to the destination database.

Note Only the following DDL operations can be synchronized: CREATE TABLE, DROP TABLE, and ALTER TABLE. The ALTER TABLE operation includes RENAME TABLE, ADD COLUMN, and DROP COLUMN.
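
As an illustration, the following statements fall into the supported categories listed in the preceding note. The table name dts_ddl_demo is hypothetical and is used only for this sketch. Statements that are not in the list, such as CREATE INDEX, are not covered.

```sql
-- Hypothetical table, used only to illustrate the supported DDL operations.
CREATE TABLE public.dts_ddl_demo (id integer PRIMARY KEY, name text);

-- ALTER TABLE: ADD COLUMN and DROP COLUMN.
ALTER TABLE public.dts_ddl_demo ADD COLUMN note text;
ALTER TABLE public.dts_ddl_demo DROP COLUMN note;

-- ALTER TABLE: RENAME TABLE.
ALTER TABLE public.dts_ddl_demo RENAME TO dts_ddl_demo_renamed;

-- DROP TABLE.
DROP TABLE public.dts_ddl_demo_renamed;
```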

Procedure

Warning If you need to migrate incremental data from multiple databases, repeat Steps 2 to 6 for each database. The table, function, and event trigger exist only in the database in which they are created.
  1. Log on to the source PostgreSQL database. For more information, see Connect to an ApsaraDB RDS for PostgreSQL instance or psql.
  2. Switch to the source database.
    Note The psql tool is used in this example. You can run the \c <Database name> command to switch to the source database, for example, \c dtstestdata.
  3. Execute the following statements to create a table that stores the DDL information:
    CREATE TABLE public.dts_ddl_command
    (
        ddl_text text COLLATE pg_catalog."default",
        id bigserial primary key,
        event text COLLATE pg_catalog."default",
        tag text COLLATE pg_catalog."default",
        username character varying COLLATE pg_catalog."default",
        database character varying COLLATE pg_catalog."default",
        schema character varying COLLATE pg_catalog."default",
        object_type character varying COLLATE pg_catalog."default",
        object_name character varying COLLATE pg_catalog."default",
        client_address character varying COLLATE pg_catalog."default",
        client_port integer,
        event_time timestamp with time zone,
        txid_current character varying(128) COLLATE pg_catalog."default",
        message text COLLATE pg_catalog."default"
    );
  4. Execute the following statements to create a function that obtains the DDL information:
    CREATE FUNCTION public.dts_capture_ddl()
        RETURNS event_trigger
        LANGUAGE 'plpgsql'
        COST 100
        VOLATILE NOT LEAKPROOF SECURITY DEFINER
    AS $BODY$
      declare ddl_text text;
      declare max_rows int := 10000;
      declare current_rows int;
      declare pg_version_95 int := 90500;
      declare pg_version_10 int := 100000;
      declare current_version int;
      declare object_id varchar;
      declare alter_table varchar;
      declare record_object record;
      declare message text;
      declare pub RECORD;
    begin
    
      select current_query() into ddl_text;
    
      if TG_TAG = 'CREATE TABLE' then -- ALTER TABLE schema.TABLE REPLICA IDENTITY FULL;
        show server_version_num into current_version;
        if current_version >= pg_version_95 then
          for record_object in (select * from pg_event_trigger_ddl_commands()) loop
            if record_object.command_tag = 'CREATE TABLE' then
              object_id := record_object.object_identity;
            end if;
          end loop;
        else
          select btrim(substring(ddl_text from '[ \t\r\n\v\f]*[c|C][r|R][e|E][a|A][t|T][e|E][ \t\r\n\v\f]*.*[ \t\r\n\v\f]*[t|T][a|A][b|B][l|L][e|E][ \t\r\n\v\f]+(.*)\(.*'),' \t\r\n\v\f') into object_id;
        end if;
        if object_id = '' or object_id is null then
          message := 'CREATE TABLE, but ddl_text=' || ddl_text || ', current_query=' || current_query();
        else
          alter_table := 'ALTER TABLE ' || object_id || ' REPLICA IDENTITY FULL';
          message := 'alter_sql=' || alter_table;
          execute alter_table;
        end if;
        if current_version >= pg_version_10 then
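          -- Add the new table to every publication whose name matches dts_sync_%.
          for pub in (select * from pg_publication where pubname like 'dts_sync_%') loop
            raise notice 'pubname=%',pub.pubname;
            BEGIN
              execute 'alter publication ' || pub.pubname || ' add table ' || object_id;
            EXCEPTION WHEN OTHERS THEN
              -- Ignore errors so that the original DDL statement does not fail.
              NULL;
            END;
          end loop;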
        end if;
      end if;
    
      -- Record the DDL statement. The cast width matches the txid_current column (128).
      insert into public.dts_ddl_command(id,event,tag,username,database,schema,object_type,object_name,client_address,client_port,event_time,ddl_text,txid_current,message)
      values (default,TG_EVENT,TG_TAG,current_user,current_database(),current_schema,'','',inet_client_addr(),inet_client_port(),current_timestamp,ddl_text,cast(TXID_CURRENT() as varchar(128)),message);
    
      -- Keep the table bounded: after max_rows is exceeded, delete the oldest row.
      select count(id) into current_rows from public.dts_ddl_command;
      if current_rows > max_rows then
        delete from public.dts_ddl_command where id in (select min(id) from public.dts_ddl_command);
      end if;
    end
    $BODY$;
  5. Change the owner of the function to the account that is used to connect to the source database, for example, postgres.
    ALTER FUNCTION public.dts_capture_ddl()
        OWNER TO postgres;
  6. Execute the following statements to create a global event trigger:
    CREATE EVENT TRIGGER dts_intercept_ddl ON ddl_command_end
    EXECUTE PROCEDURE public.dts_capture_ddl();
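
After you complete the procedure, you may want to confirm that the event trigger actually captures DDL statements. The following is a minimal sketch, not part of the official procedure. The table name dts_trigger_check is hypothetical. Run the check before the data migration task starts, because the throwaway table may otherwise be migrated as well.

```sql
-- Confirm that the event trigger exists and is enabled ('O' means enabled).
SELECT evtname, evtevent, evtenabled, evtfoid::regproc AS function_name
FROM pg_event_trigger
WHERE evtname = 'dts_intercept_ddl';

-- Run a throwaway DDL statement (hypothetical table name).
CREATE TABLE public.dts_trigger_check (id integer PRIMARY KEY);

-- The most recent rows should include one whose tag is CREATE TABLE.
SELECT id, event, tag, username, event_time, ddl_text, message
FROM public.dts_ddl_command
ORDER BY id DESC
LIMIT 5;

-- Remove the throwaway table. This DROP TABLE statement is captured as well.
DROP TABLE public.dts_trigger_check;
```

If the last query returns no rows, check that the function and the event trigger were created in the same database that you switched to in Step 2.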

What to do next

Configure a data migration task.
Note After the data migration task is released, log on to the source PostgreSQL database and execute the following statements to delete the event trigger, the function, and the table that stores the DDL information.
DROP EVENT TRIGGER dts_intercept_ddl;
DROP FUNCTION public.dts_capture_ddl();
DROP TABLE public.dts_ddl_command;
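
To confirm that the cleanup statements above removed everything, you can query the system catalogs. This is a minimal sketch that uses standard PostgreSQL catalogs and functions. The column aliases are illustrative.

```sql
-- Each count should be 0 after the cleanup.
SELECT count(*) AS remaining_triggers
FROM pg_event_trigger
WHERE evtname = 'dts_intercept_ddl';

SELECT count(*) AS remaining_functions
FROM pg_proc
WHERE proname = 'dts_capture_ddl';

-- Returns NULL if the table no longer exists.
SELECT to_regclass('public.dts_ddl_command') AS remaining_table;
```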