本文介绍如果数据传输源端发生了 DDL 变更(例如新增表字段),您如何通过触发器来捕获 DDL 相关事件信息,以确保 DML 变更操作正常同步。
背景信息
使用数据传输执行源端 PostgreSQL 数据库增量同步时,默认仅支持 DML 语句(包括 INSERT
、DELETE
和 UPDATE
)的同步。但如果源端发生了 DDL 变更(例如新增表字段),而数据传输无法获取新的结构信息,将导致后续和该对象相关的 DML 同步失败,造成任务中断的问题。所以,您需要先在源端 PostgreSQL 数据库创建 DDL 记录表和触发器来捕获 DDL 信息,以确保 DML 的正常同步。
使用限制
源端 PostgreSQL 数据库必须为 V10.x 及以上版本。
操作步骤
登录源端数据库后,切换至待迁移数据库。
以 PSQL 工具为例,通过执行命令
\c <数据库名称>
,切换至待迁移数据库。创建下述两张表,用于记录 DDL 事件变更。
非 DROP DDL 事件信息表
----postgreSQL DDL EVENT TABLE---- CREATE TABLE public.oms_non_dropped_ddl_command ( id bigserial primary key, ddl_text text COLLATE pg_catalog."default", tag text COLLATE pg_catalog."default", database character varying COLLATE pg_catalog."default", schema character varying COLLATE pg_catalog."default", object_type character varying COLLATE pg_catalog."default", objid integer, top_objid integer, object_identity text COLLATE pg_catalog."default", top_object_identity text COLLATE pg_catalog."default", top_schema text COLLATE pg_catalog."default", all_children_table text COLLATE pg_catalog."default" ); ALTER TABLE public.oms_non_dropped_ddl_command REPLICA IDENTITY FULL;
DROP DDL 事件信息表
----postgreSQL DROP DDL EVENT TABLE---- CREATE TABLE public.oms_dropped_ddl_command ( id bigserial primary key, ddl_text text COLLATE pg_catalog."default", tag text COLLATE pg_catalog."default", database character varying COLLATE pg_catalog."default", schema character varying COLLATE pg_catalog."default", objid integer, object_type text COLLATE pg_catalog."default", object_name text COLLATE pg_catalog."default" ); ALTER TABLE public.oms_dropped_ddl_command REPLICA IDENTITY FULL;
创建下述两种事件触发器函数。
非 DROP DDL 事件触发器函数
CREATE OR REPLACE FUNCTION public.oms_capture_ddl_for_non_dropped() RETURNS event_trigger LANGUAGE 'plpgsql' COST 100 VOLATILE NOT LEAKPROOF SECURITY DEFINER AS $BODY$ DECLARE ddl_text text; DECLARE record_object record; DECLARE obj record; DECLARE top_objid integer; DECLARE parent_oid integer; DECLARE top_object_identity text; DECLARE top_schema text; DECLARE all_children_table text; BEGIN SELECT current_query() INTO ddl_text; FOR obj IN (SELECT * FROM pg_event_trigger_ddl_commands() WHERE TG_TAG IN ('CREATE TABLE', 'ALTER TABLE','CREATE INDEX','CREATE SCHEMA')) LOOP record_object:=obj; SELECT inhparent FROM pg_inherits WHERE inhrelid = obj.objid AND inhparent::regclass::text NOT LIKE 'pg_%' ORDER BY inhseqno DESC LIMIT 1 INTO parent_oid; WHILE parent_oid IS NOT NULL LOOP SELECT inhparent FROM pg_inherits WHERE inhrelid = parent_oid AND inhparent::regclass::text NOT LIKE 'pg_%' ORDER BY inhseqno DESC LIMIT 1 INTO top_objid; IF top_objid IS NULL THEN top_objid := parent_oid; exit; END IF; parent_oid := top_objid; END LOOP; IF top_objid IS NULL THEN top_objid :=record_object.objid; ELSE END IF; IF regexp_matches(ddl_text, 'ALTER\s+TABLE\s+.+\s+RENAME\s+TO\s+.+', 'i') IS NOT NULL THEN WITH RECURSIVE child_tables AS ( SELECT oid AS table_objid FROM pg_class WHERE oid = top_objid UNION SELECT c.oid AS table_objid FROM pg_class c JOIN pg_inherits i ON c.oid = i.inhrelid JOIN child_tables ct ON i.inhparent = ct.table_objid ) SELECT string_agg(table_objid::text, ', ') INTO all_children_table FROM child_tables WHERE table_objid <> top_objid; ELSE END IF; SELECT pn.nspname, ss.relname INTO obj from pg_catalog.pg_class ss JOIN pg_catalog.pg_namespace pn ON ss.relnamespace = pn.oid WHERE ss.oid = top_objid; top_object_identity:=obj.relname; top_schema:=obj.nspname; IF TG_TAG ='CREATE TABLE' AND record_object.object_type='table' THEN EXECUTE 'ALTER TABLE ' || record_object.object_identity || ' REPLICA IDENTITY FULL'; ELSE END IF; INSERT INTO public.oms_non_dropped_ddl_command(id,ddl_text,tag,database,schema,object_type, objid, top_objid, object_identity,top_object_identity,top_schema,all_children_table) VALUES (default,ddl_text, TG_TAG,current_database(),current_schema,record_object.object_type, record_object.objid, top_objid ,record_object.object_identity,top_object_identity,top_schema,all_children_table); END LOOP; EXCEPTION WHEN OTHERS THEN RAISE LOG 'OMS ddl trriger error during command process: %', SQLERRM; END $BODY$;
DROP DDL 事件触发器函数
CREATE OR REPLACE FUNCTION public.oms_capture_ddl_for_dropped() RETURNS event_trigger LANGUAGE 'plpgsql' COST 100 VOLATILE NOT LEAKPROOF SECURITY DEFINER AS $BODY$ DECLARE ddl_text text; DECLARE record_object record; BEGIN SELECT current_query() INTO ddl_text; FOR record_object in (select * from pg_event_trigger_dropped_objects() WHERE TG_TAG NOT IN ('ALTER TABLE','DROP PUBLICATION','ALTER PUBLICATION')) LOOP IF record_object.object_type = 'type' THEN ELSE INSERT INTO public.oms_dropped_ddl_command(id,ddl_text,tag,database,schema, objid, object_type, object_name) VALUES (default,ddl_text, TG_TAG,current_database(),current_schema, record_object.objid, record_object.object_type, record_object.object_name); END IF; END LOOP; EXCEPTION WHEN OTHERS THEN RAISE LOG 'OMS drop ddl trriger error during command process: %', SQLERRM; END $BODY$;
将创建函数的所有者修改为数据传输连接源端数据库的账号。此处以
postgres
为例:ALTER FUNCTION public.oms_capture_ddl_for_non_dropped() OWNER TO postgres; ALTER FUNCTION public.oms_capture_ddl_for_dropped() OWNER TO postgres;
执行下述命令,创建全局事件触发器。
CREATE EVENT TRIGGER oms_capture_ddl_for_non_dropped ON ddl_command_end WHEN TAG IN ('CREATE TABLE', 'ALTER TABLE','CREATE INDEX','CREATE SCHEMA') EXECUTE PROCEDURE public.oms_capture_ddl_for_non_dropped(); CREATE EVENT TRIGGER oms_capture_ddl_for_dropped ON sql_drop EXECUTE PROCEDURE public.oms_capture_ddl_for_dropped();
后续操作
释放数据迁移任务后,您需要登录源端数据库,执行下述命令删除触发器函数和对应表。
drop EVENT trigger oms_capture_ddl_for_dropped;
drop EVENT trigger oms_capture_ddl_for_non_dropped;
drop function public.oms_capture_ddl_for_non_dropped();
drop function public.oms_capture_ddl_for_dropped();
drop table public.oms_non_dropped_ddl_command;
drop table public.oms_dropped_ddl_command;