相容PolarDB PostgreSQL版(相容Oracle)的Flink CDC連接器(簡稱PolarDBO Flink CDC)可用於依次讀取PolarDB PostgreSQL版(相容Oracle)資料庫全量快照資料和變更資料,具體功能及用法請參考社區Postgres CDC。
由於PolarDB PostgreSQL版(相容Oracle)與社區PoatgreSQL僅在少量資料類型和內建對象處理存在差異,本文為您介紹如何基於社區Postgres CDC,通過少量代碼適配打包出支援PolarDB PostgreSQL版(相容Oracle)的PolarDBO Flink CDC連接器。
PolarDB PostgreSQL版(相容Oracle)的DATE類型是64位,而社區PostgreSQL的DATE類型為32位。因此,在PolarDBO Flink CDC中會對DATA類型資料的處理進行適配。
打包PolarDBO Flink CDC連接器
PolarDBO Flink CDC連接器基於社區Postgres CDC適配開發,無論是您自行打包,還是使用本文中提供的JAR包,PolarDBO Flink CDC連接器都不提供SLA保障。
操作前提
確定Flink-CDC版本
如果您使用的是阿里雲Realtime Compute for Apache Flink,需要確認與對應Ververica Runtime(簡稱VVR)版本相容的社區Flink-CDC版本,具體可以參考CDC與VVR版本對應關係。
說明Flink-CDC代碼倉庫請參考Flink-CDC。
確定Debezium版本
在對應版本的Flink-CDC的
pom.xml
中通過尋找關鍵字debezium.version
確定Debezium版本。說明Debezium代碼倉庫請參考Debezium。
確定PgJDBC版本
在對應版本的Postgres-CDC的
pom.xml
中通過尋找關鍵字org.postgresql
確定PgJDBC版本。說明release-3.0以下版本檔案路徑為:
flink-connector-postgres-cdc/pom.xml
。release-3.0及以上版本檔案路徑為:
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/pom.xml
。PgJDBC代碼倉庫請參考PgJDBC。
操作步驟
release-3.1打包
社區Flink-CDC release-3.1版本相容阿里雲Realtime Compute for Apache Flink的vvr-8.0.x-flink-1.17。
打包對應版本的PolarDBO Flink CDC連接器步驟如下:
複製對應版本的Flink-CDC、Debezium和PgJDBC的代碼檔案。
git clone -b release-3.1 --depth=1 https://github.com/apache/flink-cdc.git git clone -b REL42.5.1 --depth=1 https://github.com/pgjdbc/pgjdbc.git git clone -b v1.9.8.Final --depth=1 https://github.com/debezium/debezium.git
複製Debezium和PgJDBC部分檔案到Flink-CDC中。
mkdir -p flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3 mkdir -p flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3 cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3 cp pgjdbc/pgjdbc/src/main/java/org/postgresql/jdbc/PgDatabaseMetaData.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc cp debezium/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/TypeRegistry.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql
應用適配PolarDB PostgreSQL版(相容Oracle)的patch檔案。
git apply release-3.1_support_polardbo.patch
說明以上使用的PolarDBO Flink CDC相容patch檔案:release-3.1_support_polardbo.patch。
使用Maven打包PolarDBO Flink CDC連接器。
mvn clean install -DskipTests -Dcheckstyle.skip=true -Dspotless.check.skip -Drat.skip=true # 打包完成後可以在flink-sql-connector-postgres-cdc的target目錄中擷取到jar包
按照以上流程基於JDK8打包出PolarDBO Flink CDC連接器的JAR包:flink-sql-connector-postgres-cdc-3.1-SNAPSHOT.jar。
release-2.3打包
社區Flink-CDC release-2.3版本相容阿里雲Realtime Compute for Apache Flink的vvr-4.0.15-flink-1.13 ~ vvr-6.0.2-flink-1.15。
打包對應版本的PolarDBO Flink CDC連接器步驟如下:
複製對應版本的Flink-CDC、Debezium和PgJDBC的代碼檔案。
git clone -b release-2.3 --depth=1 https://github.com/apache/flink-cdc.git git clone -b REL42.2.26 --depth=1 https://github.com/pgjdbc/pgjdbc.git git clone -b v1.6.4.Final --depth=1 https://github.com/debezium/debezium.git
複製Debezium和PgJDBC部分檔案到Flink-CDC中。
mkdir -p flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3 mkdir -p flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc mkdir -p flink-cdc/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3 cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3 cp pgjdbc/pgjdbc/src/main/java/org/postgresql/jdbc/PgDatabaseMetaData.java flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc cp debezium/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/TypeRegistry.java flink-cdc/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql
應用適配PolarDB PostgreSQL版(相容Oracle)的patch檔案。
git apply release-2.3_support_polardbo.patch
說明以上使用的PolarDBO Flink CDC相容patch檔案:release-2.3_support_polardbo.patch。
使用Maven打包PolarDBO Flink CDC連接器。
mvn clean install -DskipTests -Dcheckstyle.skip=true -Dspotless.check.skip -Drat.skip=true # 打包完成後可以在flink-sql-connector-postgres-cdc的target目錄中擷取到jar包
按照以上流程基於JDK8打包出PolarDBO Flink CDC連接器的JAR包:flink-sql-connector-postgres-cdc-2.3-SNAPSHOT.jar。
使用說明
PolarDBO Flink CDC連接器通過PolarDB PostgreSQL版(相容Oracle)資料庫的邏輯複製讀取CDC變更流資料,需要滿足以下條件:
wal_level參數的值需設定為logical,即在預寫式日誌WAL(Write-ahead logging)中增加支援邏輯複製所需的資訊。
說明您可以通過控制台設定wal_level參數,詳細操作請參考設定叢集參數。修改該參數後叢集將會重啟,請在修改參數前做好業務安排,謹慎操作。
執行
ALTER TABLE schema.table REPLICA IDENTITY FULL;
命令設定訂閱表的REPLICA IDENTITY
為FULL
(發出的插入和更新操作事件包含表中所有列的舊值),以保障該表資料同步的一致性。說明REPLICA IDENTITY是PostgreSQL特有的表級設定,決定了邏輯解碼外掛程式在發生(INSERT)和更新(UPDATE)事件時,是否包含涉及的表列的舊值。REPLICA IDENTITY取值含義詳情,請參見REPLICA IDENTITY。
設定訂閱表的
REPLICA IDENTITY
為FULL
時可能需要鎖表,可能影響業務,請在修改參數前做好業務安排。您可以通過以下命令查看當前配置是否為FULL
:SELECT relreplident = 'f' FROM pg_class WHERE relname = 'tablename';
需要確保max_wal_senders和max_replication_slots的參數值均大於當前資料庫複寫槽已使用數與Flink作業所需要的slot數量。
確保使用的是高許可權帳號或者同時擁有LOGIN和REPLICATION許可權,並且具有訂閱表的SELECT許可權用於全量資料查詢。
只能串連PolarDB叢集的主地址,叢集地址不支援邏輯複製。
PolarDBO Flink CDC連接器與Postgres CDC區別
PolarDBO Flink CDC連接器基於Postgres CDC打包,具體文法和參數可以參考Postgres CDC。但存在以下主要區別:
WITH的connector參數需要設定為固定值:
polardbo-cdc
。PolarDBO Flink CDC同時相容PolarDB PostgreSQL版各版本、PolarDB PostgreSQL版(相容Oracle) 1.0和PolarDB PostgreSQL版(相容Oracle) 2.0版本。
說明如果您使用的是PolarDB PostgreSQL版,推薦您直接使用社區Postgres CDC。
PolarDB PostgreSQL版(相容Oracle) 1.0、PolarDB PostgreSQL版(相容Oracle) 2.0中的
DATE
類型的列,Flink SQL中的source和sink表對應類型必須指定為timestamp
。建議將
decoding.plugin.name
參數設定為pgoutput
,否則非UTF-8編碼的資料庫可能會發生增量解析亂碼,詳細介紹請參考社區文檔。
類型映射
PolarDB PostgreSQL和Flink欄位類型映射,除DATE類型外,其他欄位類型和社區PostgreSQL完全相同,具體映射關係如下:
PolarDB PostgreSQL欄位類型 | Flink欄位類型 |
SMALLINT | SMALLINT |
INT2 | |
SMALLSERIAL | |
SERIAL2 | |
INTEGER | INT |
SERIAL | |
BIGINT | BIGINT |
BIGSERIAL | |
REAL | FLOAT |
FLOAT4 | |
FLOAT8 | DOUBLE |
DOUBLE PRECISION | |
NUMERIC(p, s) | DECIMAL(p, s) |
DECIMAL(p, s) | |
BOOLEAN | BOOLEAN |
DATE |
|
TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
CHAR(n) | STRING |
CHARACTER(n) | |
VARCHAR(n) | |
CHARACTER VARYING(n) | |
TEXT | |
BYTEA | BYTES |
樣本
以下樣本用於說明,如何通過PolarDBO Flink CDC,將PolarDB PostgreSQL版(相容Oracle) 2.0叢集中flink_source庫的shipments表,同步到flink_sink庫的shipments_sink表中。
以下樣本僅用於簡單驗證打包的PolarDBO Flink CDC能夠在PolarDB PostgreSQL版(相容Oracle)上運行。正式使用時,為滿足您的業務需求,請參考社區Postgres CDC配置參數。
前提準備
PolarDB PostgreSQL版(相容Oracle)準備
在PolarDB叢集購買頁面,購買PolarDB PostgreSQL版(相容Oracle) 2.0叢集。
建立高許可權賬戶,詳細操作請參考建立帳號。
擷取叢集主地址,詳細操作請參考查看串連地址。如果PolarDB叢集和Realtime Compute for Apache Flink在同一可用性區域,可直接使用私網地址,否則需要申請公網地址。將Flink執行個體地址添加到PolarDB叢集白名單中,請參考設定叢集白名單。
在控制台建立來源資料庫flink_source和目標資料庫flink_sink,詳細步驟請參考建立資料庫。
執行如下語句,在來源資料庫flink_source中建立shipments表,並寫入資料。
CREATE TABLE public.shipments ( shipment_id INT, order_id INT, origin TEXT, destination TEXT, is_arrived BOOLEAN, order_time DATE, PRIMARY KEY (shipment_id) ); ALTER TABLE public.shipments REPLICA IDENTITY FULL; INSERT INTO public.shipments SELECT 1, 1, 'test1', 'test1', false, now();
執行如下語句,在目標資料庫flink_sink中建立shipments_sink表。
CREATE TABLE public.shipments_sink ( shipment_id INT, order_id INT, origin TEXT, destination TEXT, is_arrived BOOLEAN, order_time TIMESTAMP, PRIMARY KEY (shipment_id) );
Realtime Compute for Apache Flink準備
登入Realtime Compute控制台,購買Realtime Compute for Apache Flink執行個體,詳細操作請參考開通Realtime ComputeFlink版。
說明建議Realtime Compute for Apache Flink的地區和專用網路和PolarDB叢集保持一致,串連地址可以直接使用PolarDB叢集主地址的私網地址。
建立自訂連接器,上傳打包好的PolarDBO Flink CDC,Formats選擇debezium-json,詳細步驟請參考建立自訂連接器。
建立Flink作業
登入Realtime Compute控制台,建立一個SQL作業草稿,請參考SQL作業開發。使用以下Flink SQL語句,修改PolarDB叢集主地址,連接埠,帳號和密碼。
說明PolarDB PostgreSQL版(相容Oracle)的DATE類型是64位,而Flink SQL以及大部分資料庫的DATE類型為32位。因此,源表中DATE類型的列,在Flink SQL的source和sink表中都必須要指定為TIMESTAMP類型。否則,作業會因為類型不符而報錯中斷,例如:
“java.time.DateTimeException: Invalid value for EpochDay (valid values -365243219162 - 365241780471):1720891573000”
。CREATE TEMPORARY TABLE shipments ( shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, order_time TIMESTAMP, PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( 'connector' = 'polardbo-cdc', 'hostname' = '<yourHostname>', 'port' = '<yourPort>', 'username' = '<yourUserName>', 'password' = '<yourPassWord>', 'database-name' = 'flink_source', 'schema-name' = 'public', 'table-name' = 'shipments', 'decoding.plugin.name' = 'pgoutput', 'slot.name' = 'flink' ); CREATE TEMPORARY TABLE shipments_sink ( shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, order_time TIMESTAMP, PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://<yourHostname>:<yourPort>/flink_sink', 'table-name' = 'shipments_sink', 'username' = '<yourUserName>', 'password' = '<yourPassWord>' ); INSERT INTO shipments_sink SELECT * FROM shipments;
部署並啟動作業。
測試與驗證。
部署作業運行成功後,即狀態為運行中,shipments表中的資料已經同步到目標資料庫flink_sink的shipments_sink表。
SELECT * FROM public.shipments_sink;
返回結果如下:
shipment_id | order_id | origin | destination | is_arrived | order_time -------------+----------+--------+-------------+------------+--------------------- 1 | 1 | test1 | test1 | f | 2024-09-18 05:45:08 (1 row)
在來源資料庫flink_source的shipments表上執行DML,新增修改也將即時同步。
INSERT INTO public.shipments SELECT 2, 2, 'test2', 'test2', false, now(); UPDATE public.shipments SET is_arrived = true WHERE shipment_id = 1; DELETE FROM public.shipments WHERE shipment_id = 2; INSERT INTO public.shipments SELECT 3, 3, 'test3', 'test3', false, now(); UPDATE public.shipments SET is_arrived = true WHERE shipment_id = 3;
shipments表中的資料已經同步更新到目標資料庫flink_sink的shipments_sink表。
SELECT * FROM public.shipments_sink;
返回結果如下:
shipment_id | order_id | origin | destination | is_arrived | order_time -------------+----------+--------+-------------+------------+--------------------- 1 | 1 | test1 | test1 | t | 2024-09-18 05:45:08 3 | 3 | test3 | test3 | t | 2024-09-18 07:33:23 (2 rows)