ApsaraDB for OceanBase: Create a trigger

Last Updated: Jan 03, 2024

If a data definition language (DDL) operation, such as adding a field to a table, is performed on the source of a data transmission project, you can use a trigger to capture information about the DDL event to ensure normal synchronization of data manipulation language (DML) operations.

Background information

When you use the data transmission service for incremental synchronization from a PostgreSQL database, only DML operations (INSERT, DELETE, and UPDATE) are synchronized. If a DDL operation, such as adding a field to a table, is performed on the source of a project, the data transmission service cannot obtain the new table schema. DML operations on that table then fail to synchronize, which interrupts the project. To keep DML synchronization working, you must first create DDL record tables and event triggers in the source database to capture DDL operation information.
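As an illustration of the situation described above, the following sketch shows the kind of statement sequence involved. The table public.oms_demo_orders and its columns are hypothetical names chosen for this example; they are not part of the product.

```sql
-- Hypothetical source table that is already part of a synchronization project.
CREATE TABLE public.oms_demo_orders (
    id bigint PRIMARY KEY
);

-- DDL on the source: adds a field. DDL itself is not synchronized.
ALTER TABLE public.oms_demo_orders ADD COLUMN note text;

-- DML that uses the new field. Without the captured DDL information, the
-- data transmission service does not know the new table schema for this row.
INSERT INTO public.oms_demo_orders (id, note) VALUES (1, 'first order');
```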

Limitations

The version of the source PostgreSQL database must be V10.x or later.
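One way to check this requirement, sketched here with standard PostgreSQL settings, is to query the server version on the source. The column alias meets_minimum_version is only an illustrative name.

```sql
-- Human-readable version string.
SHOW server_version;

-- Numeric form: 100000 corresponds to version 10.0.
SELECT current_setting('server_version_num')::int >= 100000 AS meets_minimum_version;
```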

Procedure

  1. Log on to the source and switch to the database to be migrated.

    For example, run the \c <database name> command in psql to switch to the database to be migrated.

  2. Create two tables for recording DDL change events.

    • Non-DROP DDL event information table

          -- PostgreSQL DDL event table (non-DROP events)
          CREATE TABLE public.oms_non_dropped_ddl_command (
              id bigserial primary key,
              ddl_text text COLLATE pg_catalog."default",
              tag text COLLATE pg_catalog."default",
              database character varying COLLATE pg_catalog."default",
              schema character varying COLLATE pg_catalog."default",
              object_type character varying COLLATE pg_catalog."default",
              objid integer,
              top_objid integer,
              object_identity text COLLATE pg_catalog."default",
              top_object_identity text COLLATE pg_catalog."default",
              top_schema text COLLATE pg_catalog."default",
              all_children_table text COLLATE pg_catalog."default"
          );
          ALTER TABLE public.oms_non_dropped_ddl_command REPLICA IDENTITY FULL;

    • DROP DDL event information table

          -- PostgreSQL DDL event table (DROP events)
          CREATE TABLE public.oms_dropped_ddl_command (
              id bigserial primary key,
              ddl_text text COLLATE pg_catalog."default",
              tag text COLLATE pg_catalog."default",
              database character varying COLLATE pg_catalog."default",
              schema character varying COLLATE pg_catalog."default",
              objid integer,
              object_type text COLLATE pg_catalog."default",
              object_name text COLLATE pg_catalog."default"
          );
          ALTER TABLE public.oms_dropped_ddl_command REPLICA IDENTITY FULL;

  3. Create two event trigger functions.

    • Non-DROP DDL event trigger function

          CREATE OR REPLACE FUNCTION public.oms_capture_ddl_for_non_dropped()
              RETURNS event_trigger
              LANGUAGE 'plpgsql'
              COST 100
              VOLATILE NOT LEAKPROOF SECURITY DEFINER
          AS $BODY$
          DECLARE
              ddl_text text;
              record_object record;
              obj record;
              top_objid integer;
              parent_oid integer;
              top_object_identity text;
              top_schema text;
              all_children_table text;
          BEGIN
              -- Text of the statement that fired the trigger.
              SELECT current_query() INTO ddl_text;
              FOR obj IN (SELECT * FROM pg_event_trigger_ddl_commands() WHERE TG_TAG IN ('CREATE TABLE', 'ALTER TABLE','CREATE INDEX','CREATE SCHEMA')) LOOP
                  record_object := obj;
                  -- Reset so that a value from a previous iteration is not reused.
                  top_objid := NULL;
                  -- Find the direct parent of the object, skipping pg_* relations.
                  SELECT inhparent FROM pg_inherits WHERE inhrelid = obj.objid
                  AND inhparent::regclass::text NOT LIKE 'pg_%' ORDER BY inhseqno DESC LIMIT 1 INTO parent_oid;
                  -- Walk up the inheritance chain to the topmost ancestor.
                  WHILE parent_oid IS NOT NULL LOOP
                      SELECT inhparent
                      FROM pg_inherits
                      WHERE inhrelid = parent_oid
                      AND inhparent::regclass::text NOT LIKE 'pg_%'
                      ORDER BY inhseqno DESC
                      LIMIT 1
                      INTO top_objid;

                      IF top_objid IS NULL THEN
                          top_objid := parent_oid;
                          EXIT;
                      END IF;
                      parent_oid := top_objid;
                  END LOOP;
                  -- An object without a parent is its own top-level object.
                  IF top_objid IS NULL THEN
                      top_objid := record_object.objid;
                  END IF;
                  -- For ALTER TABLE ... RENAME TO, collect the OIDs of all descendant tables.
                  IF regexp_matches(ddl_text, 'ALTER\s+TABLE\s+.+\s+RENAME\s+TO\s+.+', 'i') IS NOT NULL THEN
                      WITH RECURSIVE child_tables AS (
                          SELECT oid AS table_objid
                          FROM pg_class
                          WHERE oid = top_objid
                          UNION
                          SELECT c.oid AS table_objid
                          FROM pg_class c
                          JOIN pg_inherits i ON c.oid = i.inhrelid
                          JOIN child_tables ct ON i.inhparent = ct.table_objid
                      )
                      SELECT string_agg(table_objid::text, ', ') INTO all_children_table FROM child_tables WHERE table_objid <> top_objid;
                  END IF;
                  -- Resolve the name and schema of the top-level object.
                  SELECT pn.nspname, ss.relname INTO obj from pg_catalog.pg_class ss JOIN pg_catalog.pg_namespace pn ON ss.relnamespace = pn.oid WHERE ss.oid = top_objid;
                  top_object_identity := obj.relname;
                  top_schema := obj.nspname;
                  -- Set REPLICA IDENTITY FULL on newly created tables.
                  IF TG_TAG = 'CREATE TABLE' AND record_object.object_type = 'table' THEN
                      EXECUTE 'ALTER TABLE ' || record_object.object_identity || ' REPLICA IDENTITY FULL';
                  END IF;
                  INSERT INTO public.oms_non_dropped_ddl_command(id,ddl_text,tag,database,schema,object_type, objid, top_objid, object_identity,top_object_identity,top_schema,all_children_table)
                  VALUES (default,ddl_text, TG_TAG,current_database(),current_schema,record_object.object_type, record_object.objid, top_objid ,record_object.object_identity,top_object_identity,top_schema,all_children_table);
              END LOOP;
          EXCEPTION
              WHEN OTHERS THEN
                  -- Log the error instead of failing the DDL statement.
                  RAISE LOG 'OMS ddl trigger error during command process: %', SQLERRM;
          END
          $BODY$;

    • DROP DDL event trigger function

          CREATE OR REPLACE FUNCTION public.oms_capture_ddl_for_dropped()
              RETURNS event_trigger
              LANGUAGE 'plpgsql'
              COST 100
              VOLATILE NOT LEAKPROOF SECURITY DEFINER
          AS $BODY$
          DECLARE
              ddl_text text;
              record_object record;
          BEGIN
              -- Text of the statement that fired the trigger.
              SELECT current_query() INTO ddl_text;
              FOR record_object in (select * from pg_event_trigger_dropped_objects() WHERE TG_TAG NOT IN ('ALTER TABLE','DROP PUBLICATION','ALTER PUBLICATION')) LOOP
                  IF record_object.object_type = 'type' THEN
                      -- Dropped types are not recorded.
                  ELSE
                      INSERT INTO public.oms_dropped_ddl_command(id,ddl_text,tag,database,schema, objid, object_type, object_name)
                      VALUES (default,ddl_text, TG_TAG,current_database(),current_schema, record_object.objid, record_object.object_type, record_object.object_name);
                  END IF;
              END LOOP;
          EXCEPTION
              WHEN OTHERS THEN
                  -- Log the error instead of failing the DROP statement.
                  RAISE LOG 'OMS drop ddl trigger error during command process: %', SQLERRM;
          END
          $BODY$;

  4. Change the owner of the created functions to the account that the data transmission service uses to connect to the source database. This example uses postgres.

        ALTER FUNCTION public.oms_capture_ddl_for_non_dropped()
            OWNER TO postgres;
        ALTER FUNCTION public.oms_capture_ddl_for_dropped()
            OWNER TO postgres;

  5. Execute the following statements to create global event triggers:

        CREATE EVENT TRIGGER oms_capture_ddl_for_non_dropped
        ON ddl_command_end
        WHEN TAG IN ('CREATE TABLE', 'ALTER TABLE','CREATE INDEX','CREATE SCHEMA')
        EXECUTE PROCEDURE public.oms_capture_ddl_for_non_dropped();
    
        CREATE EVENT TRIGGER oms_capture_ddl_for_dropped ON sql_drop EXECUTE PROCEDURE public.oms_capture_ddl_for_dropped();
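
After completing the procedure, it can help to confirm that the objects exist and that DDL events are actually recorded. The following is a minimal sketch that uses the standard catalogs pg_event_trigger and pg_proc. The table public.oms_trigger_smoke_test is a hypothetical throwaway name, not part of the product. The test statements add rows to the record tables, so it is probably best to run them before the project starts.

```sql
-- 1. Confirm that both event triggers exist and check whether they are enabled.
SELECT evtname, evtevent, evtenabled, evttags
FROM pg_event_trigger
WHERE evtname IN ('oms_capture_ddl_for_non_dropped', 'oms_capture_ddl_for_dropped');

-- 2. Confirm that the trigger functions are owned by the expected account.
SELECT p.proname, pg_get_userbyid(p.proowner) AS owner
FROM pg_proc p
JOIN pg_namespace n ON n.oid = p.pronamespace
WHERE n.nspname = 'public'
  AND p.proname IN ('oms_capture_ddl_for_non_dropped', 'oms_capture_ddl_for_dropped');

-- 3. Smoke test with a hypothetical throwaway table.
CREATE TABLE public.oms_trigger_smoke_test (id integer);
ALTER TABLE public.oms_trigger_smoke_test ADD COLUMN note text;
DROP TABLE public.oms_trigger_smoke_test;

-- 4. Inspect what the triggers recorded.
SELECT id, tag, object_type, object_identity, top_object_identity
FROM public.oms_non_dropped_ddl_command
ORDER BY id;

SELECT id, tag, object_type, object_name
FROM public.oms_dropped_ddl_command
ORDER BY id;
```

Both trigger functions catch all exceptions and report them with RAISE LOG, so a failure does not abort the DDL statement. If the record tables stay empty after the smoke test, the PostgreSQL server log is the place to look for the error message.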

What to do next

After the data migration project is released, you must log on to the source database and run the following commands to drop the event triggers, the trigger functions, and the DDL record tables:

    DROP EVENT TRIGGER oms_capture_ddl_for_dropped;
    DROP EVENT TRIGGER oms_capture_ddl_for_non_dropped;
    DROP FUNCTION public.oms_capture_ddl_for_non_dropped();
    DROP FUNCTION public.oms_capture_ddl_for_dropped();
    DROP TABLE public.oms_non_dropped_ddl_command;
    DROP TABLE public.oms_dropped_ddl_command;
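
To confirm that the cleanup succeeded, a check along the following lines may be useful. It is a sketch that relies on the standard functions to_regclass and to_regprocedure, which return NULL when the named object does not exist. The column aliases are illustrative only.

```sql
-- Should return no rows once both event triggers are dropped.
SELECT evtname
FROM pg_event_trigger
WHERE evtname IN ('oms_capture_ddl_for_dropped', 'oms_capture_ddl_for_non_dropped');

-- Each column should be NULL once the functions and tables are dropped.
SELECT to_regprocedure('public.oms_capture_ddl_for_non_dropped()') AS non_dropped_function,
       to_regprocedure('public.oms_capture_ddl_for_dropped()')     AS dropped_function,
       to_regclass('public.oms_non_dropped_ddl_command')           AS non_dropped_table,
       to_regclass('public.oms_dropped_ddl_command')               AS dropped_table;
```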