在使用DTS執行PostgreSQL資料庫間的資料移轉前,可通過本文介紹的方法在源庫建立觸發器和函數擷取源庫的DDL資訊,然後再由DTS執行資料移轉,在增量資料移轉階段即可實現DDL操作的增量遷移。
前提條件
源庫需滿足以下要求:
如果源庫為自建PostgreSQL,則資料庫版本需大於等於9.4。
如果源庫為RDS PostgreSQL,則RDS PostgreSQL執行個體版本需大於等於10。
RDS PostgreSQL 9.4暫不支援建立事件觸發程序,因此無法實現此功能。
RDS PostgreSQL 10、11和12核心版本需大於等於20201130。
RDS PostgreSQL 13核心版本需大於等於20210228。
說明升級RDS PostgreSQL核心版本,請參見升級核心小版本。
資料移轉任務需在2020年10月1日之後建立。
背景資訊
通過DTS執行PostgreSQL資料庫間的資料移轉時,在增量資料移轉階段,DTS僅支援DML操作(INSERT、DELETE、UPDATE)的同步,不支援DDL操作的同步。
通過本文的方法先在源庫中建立觸發器和函數來捕獲DDL資訊,再由DTS執行資料移轉,即可實現DDL操作的同步。
僅支援表層級DDL操作的同步:CREATE TABLE、DROP TABLE、ALTER TABLE(包含RENAME TABLE、ADD COLUMN、DROP COLUMN)。
操作步驟
如果源庫中有多個資料庫需要執行增量資料移轉,您需要重複執行步驟2到步驟5。
登入源PostgreSQL資料庫,相關方法請參見串連PostgreSQL執行個體或psql工具介紹。
切換至待遷移的資料庫。
說明本案例以psql工具為例介紹,您可以使用
\c <資料庫名>
命令來切換資料庫,例如\c dtststdata
。執行下述命令建立存放DDL資訊的表。
CREATE TABLE public.dts_ddl_command ( ddl_text text COLLATE pg_catalog."default", id bigserial primary key, event text COLLATE pg_catalog."default", tag text COLLATE pg_catalog."default", username character varying COLLATE pg_catalog."default", database character varying COLLATE pg_catalog."default", schema character varying COLLATE pg_catalog."default", object_type character varying COLLATE pg_catalog."default", object_name character varying COLLATE pg_catalog."default", client_address character varying COLLATE pg_catalog."default", client_port integer, event_time timestamp with time zone, txid_current character varying(128) COLLATE pg_catalog."default", message text COLLATE pg_catalog."default" );
執行下述命令建立捕獲DDL資訊的函數。
CREATE FUNCTION public.dts_capture_ddl() RETURNS event_trigger LANGUAGE 'plpgsql' COST 100 VOLATILE NOT LEAKPROOF SECURITY DEFINER AS $BODY$ declare ddl_text text; declare max_rows int := 10000; declare current_rows int; declare pg_version_95 int := 90500; declare pg_version_10 int := 100000; declare current_version int; declare object_id varchar; declare alter_table varchar; declare record_object record; declare message text; declare pub RECORD; begin select current_query() into ddl_text; if TG_TAG = 'CREATE TABLE' then -- ALTER TABLE schema.TABLE REPLICA IDENTITY FULL; show server_version_num into current_version; if current_version >= pg_version_95 then for record_object in (select * from pg_event_trigger_ddl_commands()) loop if record_object.command_tag = 'CREATE TABLE' then object_id := record_object.object_identity; end if; end loop; else select btrim(substring(ddl_text from '[ \t\r\n\v\f]*[c|C][r|R][e|E][a|A][t|T][e|E][ \t\r\n\v\f]*.*[ \t\r\n\v\f]*[t|T][a|A][b|B][l|L][e|E][ \t\r\n\v\f]+(.*)\(.*'),' \t\r\n\v\f') into object_id; end if; if object_id = '' or object_id is null then message := 'CREATE TABLE, but ddl_text=' || ddl_text || ', current_query=' || current_query(); else alter_table := 'ALTER TABLE ' || object_id || ' REPLICA IDENTITY FULL'; message := 'alter_sql=' || alter_table; execute alter_table; end if; if current_version >= pg_version_10 then for pub in (select * from pg_publication where pubname like 'dts_sync_%') loop raise notice 'pubname=%',pub.pubname; BEGIN execute 'alter publication ' || pub.pubname || ' add table ' || object_id; EXCEPTION WHEN OTHERS THEN END; end loop; end if; end if; insert into public.dts_ddl_command(id,event,tag,username,database,schema,object_type,object_name,client_address,client_port,event_time,ddl_text,txid_current,message) values (default,TG_EVENT,TG_TAG,current_user,current_database(),current_schema,'','',inet_client_addr(),inet_client_port(),current_timestamp,ddl_text,cast(TXID_CURRENT() as varchar(16)),message); select count(id) into current_rows from public.dts_ddl_command; if current_rows > max_rows then delete from public.dts_ddl_command where id in (select min(id) from public.dts_ddl_command); end if; end $BODY$;
將剛建立的函數的所有者修改為DTS串連源庫的帳號,以postgresql為例。
ALTER FUNCTION public.dts_capture_ddl() OWNER TO postgres;
執行下述命令建立全域事件觸發程序。
CREATE EVENT TRIGGER dts_intercept_ddl ON ddl_command_end EXECUTE PROCEDURE public.dts_capture_ddl();
後續步驟
根據源庫的版本,配置增量資料移轉任務。更多資訊,請參見自建PostgreSQL遷移至RDS PostgreSQL。
遷移類型僅需勾選增量遷移。
資料移轉任務釋放後,您需要登入源PostgreSQL資料庫,執行下述命令刪除觸發器和函數。
drop EVENT trigger dts_intercept_ddl; drop function public.dts_capture_ddl(); drop table public.dts_ddl_command;