PolarDB for PostgreSQL (compatible with Oracle) と互換性のあるPolarDBO Change Data Capture (CDC) コネクタを使用して、PolarDB for PostgreSQL (Compatible with Oracle) データベースの完全なスナップショットデータとデータ変更を順番に読み取ることができます。 機能と使用方法の詳細については、コミュニティのPostgres CDCドキュメントを参照してください。
PolarDB for PostgreSQL (Compatible with Oracle) は、少数のデータタイプと組み込みオブジェクトのみがコミュニティPostgreSQLと異なります。 このトピックでは、コミュニティPostgres CDCに基づいて最小限のコード変更でPolarDB for PostgreSQL (Compatible with Oracle) をサポートするPolarDBO Flink CDCコネクタをパッケージ化する方法について説明します。
PolarDB for PostgreSQL (Oracleと互換) は64ビットのDATE型を使用し、コミュニティPostgreSQLは32ビットのDATE型を使用します。 PolarDBO Flink CDCコネクタは、64ビットDATEタイプのデータと互換性があります。
PolarDBO Flink CDCコネクタのパッケージ化
PolarDBO Flink CDCコネクタは、コミュニティPostgres CDCに基づいて開発されています。 ご自身でパッケージ化したコネクタまたはこのドキュメントで提供されているJARパッケージには、サービスレベル契約 (SLA) の保証はありません。
前提条件
Flink CDCのバージョンを確認する
Realtime Compute for Apache Flinkを使用している場合は、対応するVerverica Runtime (VVR) バージョンの互換性のあるコミュニティFlink CDCバージョンを確認します。 詳細については、「CDCおよびVVRバージョン対応」をご参照ください。
説明Flink CDCコードリポジトリについては、「Flink CDC on GitHub」をご参照ください。
Debeziumのバージョンを確認する
Flink CDCの
pom.xml
ファイルで、debezium.version
キーワードを検索し、Debeziumのバージョンを確認します。説明Debeziumコードリポジトリについては、「Debezium on GitHub」をご参照ください。
PgJDBCバージョンの確認
Postgres CDCの
pom.xml
ファイルで、org.postgresql
キーワードを検索し、PgJDBCのバージョンを確認します。説明リリース3.0より前のバージョンの場合、ファイルパスは
flink-connector-postgres-cdc/pom.xml
です。リリース3.0以降のバージョンの場合、ファイルパスは
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/pom.xml
です。PgJDBCコードリポジトリについては、「PgJDBC on GitHub」をご参照ください。
手順
リリース用-3.1
コミュニティFlink-CDCリリース3.1バージョンは、Realtime Compute for Apache Flink vvr-8.0.x-flink-1.17と互換性があります。
対応するバージョンのPolarDBO Flink CDCコネクタをパッケージ化するには、次の手順を実行します。
確認したバージョンに対応するバージョンのFlink CDC、Debezium、およびPgJDBCのコードリポジトリをクローンします。
git clone -b release-3.1 --depth=1 https://github.com/apache/flink-cdc.git git clone -b REL42.5.1 --depth=1 https://github.com/pgjdbc/pgjdbc.git git clone -b v1.9.8.Final --depth=1 https://github.com/debezium/debezium.git
DebeziumおよびPgJDBCからFlink CDCに必要なファイルをコピーします。
mkdir -p flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3 mkdir -p flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3 cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3 cp pgjdbc/pgjdbc/src/main/java/org/postgresql/jdbc/PgDatabaseMetaData.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc cp debezium/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/TypeRegistry.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql
PolarDB for PostgreSQL (Oracle互換) のパッチファイルを適用します。
git apply release-3.1_support_polardbo.patch
説明前の例では、次のPolarDBO Flink CDC互換パッチファイルを使用しています。release-3.1_support_polardbo.patch。
Mavenを使用してPolarDBO Flink CDCコネクタをパッケージ化します。
mvn clean install -DskipTests -Dcheckstyle.skip=true -Dspotless.check.skip -Drat.skip=true # You can find the jar package in the target directory of flink-sql-connector-postgres-cdc
PolarDBO Flink CDCコネクタのJARファイルは、JDK8に基づいて生成されます: flink-sql-connector-postgres-cdc-3.1-SNAPSHOT.jar。
リリース2.3用
コミュニティFlink-CDCリリース2.3バージョンは、vvr-4.0.15-flink-1.13からvvr-6.0.2-flink-1.15までRealtime Compute for Apache Flinkと互換性があります。
対応するバージョンのPolarDBO Flink CDCコネクタをパッケージ化するには、次の手順を実行します。
確認したバージョンに対応するバージョンのFlink CDC、Debezium、およびPgJDBCのコードリポジトリをクローンします。
git clone -b release-2.3 --depth=1 https://github.com/apache/flink-cdc.git git clone -b REL42.2.26 --depth=1 https://github.com/pgjdbc/pgjdbc.git git clone -b v1.6.4.Final --depth=1 https://github.com/debezium/debezium.git
DebeziumおよびPgJDBCからFlink-CDCに必要なファイルをコピーします。
mkdir -p flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3 mkdir -p flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc mkdir -p flink-cdc/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3 cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3 cp pgjdbc/pgjdbc/src/main/java/org/postgresql/jdbc/PgDatabaseMetaData.java flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc cp debezium/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/TypeRegistry.java flink-cdc/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql
PolarDB for PostgreSQL (Oracle互換) のパッチファイルを適用します。
git apply release-2.3_support_polardbo.patch
説明前の例では、次のPolarDBO Flink CDC互換パッチファイルを使用しています。release-2.3_support_polardbo.patch。
Mavenを使用してPolarDBO Flink CDCコネクタをパッケージ化します。
mvn clean install -DskipTests -Dcheckstyle.skip=true -Dspotless.check.skip -Drat.skip=true # You can find the jar package in the target directory of flink-sql-connector-postgres-cdc
PolarDBO Flink CDCコネクタのJARファイルは、JDK8に基づいて生成されます: flink-sql-connector-postgres-cdc-2.3-SNAPSHOT.jar。
使用法
PolarDBO Flink CDCコネクタを使用して、PolarDB for PostgreSQL (Compatible with Oracle) データベースの論理レプリケーション機能を使用してデータ変更を読み取る前に、次の要件が満たされていることを確認してください。
wal_levelパラメーターの値はlogicalに設定されます。これは、論理レプリケーションに必要な情報が先行書き込みログ (WAL) ログに書き込まれることを指定します。
説明PolarDBコンソールでwal_levelパラメーターを設定できます。 詳細については、「クラスターパラメーターの設定」トピックの「手順」をご参照ください。 このパラメーターを変更すると、クラスターが再起動します。 作業は慎重に行ってください。
REPLICA IDENTITY
パラメーターは、ALTER table schema. TABLE REPLICA IDENTITY FULL;
ステートメントを実行して、サブスクライブされたテーブルのFULL
に設定され、レプリケーション中のテーブルのデータの一貫性を確保します。 FULLの値は、INSERTおよびUPDATE操作のイベントに、テーブル内のすべての列の前の値が含まれることを指定します。説明REPLICA IDENTITYは、PostgreSQL固有のテーブルレベル設定で、UPDATEイベントとDELETEイベントが発生したときに、関連するテーブル列の以前の値が論理デコーディングプラグインで使用できるかどうかを確認します。 REPLICA IDENTITYパラメーターの値の説明の詳細については、「REPLICA IDENTITYに関するDebeziumドキュメント」をご参照ください。
サブスクライブ済みテーブルの
REPLICA IDENTITY
パラメーターをFULL
に設定するには、テーブルをロックする必要があります。これはビジネスに影響を与える可能性があります。 作業は慎重に行ってください。 次のステートメントを実行して、パラメーターがFULL
に設定されているかどうかを確認できます。SELECT relreplident = 'f' FROM pg_class WHERE relname = 'tablename';
max_wal_sendersパラメーターとmax_replication_slotsパラメーターの値は、データベースで占有されているレプリケーションスロットの数と、Realtime Compute for Apache Flinkドラフトに必要なレプリケーションスロットの数よりも大きくなっています。
次のアカウントのいずれかが使用されます。完全なデータクエリのために、サブスクライブされたテーブルに対するLOGINおよびREPLICATION権限とSELECT権限を持つ特権アカウントまたはデータベースアカウント。
コネクタがPolarDBクラスターのプライマリエンドポイントに接続されています。 クラスターエンドポイントは論理レプリケーションをサポートしていません。
Polardb Flink CDCコネクタとPostgres CDCの違い
PolarDBO Flink CDCコネクタは、Postgres CDCに基づいて開発およびパッケージされています。 Postgres CDCコネクタの構文とパラメーターについては、「Postgres CDCドキュメント」をご参照ください。 次のセクションでは、2つのコネクタの主な違いについて説明します。
PolarDBO Flink CDCコネクタのWITH句のconnectorパラメーターは、
polardbo-cdc
に設定する必要があります。PolarDBO Flink CDCは、PolarDB for PostgreSQL、PolarDB for PostgreSQL (Oracle互換) 1.0、およびPolarDB for PostgreSQL (Oracle互換) 2.0と互換性があります。
説明PolarDB for PostgreSQLにPostgres CDCコネクタを使用することを推奨します。
PolarDB for PostgreSQL (Compatible with Oracle) 1.0および2.0の
DATE
型の列は、Flinkサービスのソーステーブルおよびシンクテーブルのtimestamp
型にマップする必要があります。decoding.plugin.name
パラメーターをpgoutput
に設定することを推奨します。 そうしないと、UTF-8エンコードされていないデータベースの増分解析で文字化けが発生する可能性があります。 詳細については、「コミュニティドキュメント」をご参照ください。
型マッピング
次の表に、PolarDB for PostgreSQLとFlinkの間のデータ型マッピングを示します。 マッピングは、DATE型を除いて、コミュニティPostgreSQLとFlinkのデータ型間のマッピングと同じです。
PolarDB for PostgreSQLでサポートされているデータ型 | Flinkがサポートするデータ型 |
SMALLINT | SMALLINT |
INT2 | |
SMALLSERIAL | |
SERIAL2 | |
INTEGER | INT |
シリアル | |
BIGINT | BIGINT |
ビッグシリーズ | |
REAL | FLOAT |
フロート4 | |
FLOAT8 | DOUBLE |
DOUBLE PRECISION | |
NUMERIC(p, s) | デシマル (p, s) |
デシマル (p, s) | |
BOOLEAN | BOOLEAN |
日付 |
|
TIME [(p)] [タイムゾーンなし] | TIME [(p)] [タイムゾーンなし] |
タイムスタンプ [(p)] [タイムゾーンなし] | タイムスタンプ [(p)] [タイムゾーンなし] |
CHAR(n) | STRING |
キャラクター (n) | |
VARCHAR(n) | |
キャラクターの評価 (n) | |
TEXT | |
BYTEA | BYTES |
例
次の例では、PolarDBO Flink CDCコネクタを使用して、flink_sourceデータベースの出荷テーブルのデータを、PolarDB for PostgreSQL (Compatible with Oracle) 2.0クラスターのflink_sinkデータベースのshipments_sinkテーブルに同期する方法について説明します。
次の例は、パッケージ化されたPolarDBO Flink CDCコネクタがPolarDB for PostgreSQL (Compatible with Oracle) クラスターで動作することを確認するための基本的なテストとしてのみ機能します。 運用環境については、公式のPostgres CDCドキュメントを参照し、ビジネス要件に基づいてパラメーターを設定します。
準備をします。
PolarDB for PostgreSQL (Oracle互換) クラスターの準備をします。
PolarDB購入ページで、PolarDB for PostgreSQL (Compatible with Oracle) 2.0クラスターを購入します。
特権アカウントを作成します。 詳細については、「データベースアカウントの作成」トピックのアカウントの作成セクションをご参照ください。
クラスターのプライマリエンドポイントを申請します。 詳細については、「エンドポイントの表示または申請」をご参照ください。 PolarDBクラスターとRealtime Compute for Apache Flinkワークスペースが同じゾーンにある場合、クラスターのプライベートエンドポイントを使用できます。 それ以外の場合は、パブリックエンドポイントを申請する必要があります。 Realtime Compute for Apache FlinkワークスペースのエンドポイントをPolarDBクラスターのIPアドレスホワイトリストに追加します。 詳細については、「クラスターのホワイトリストの設定」をご参照ください。
PolarDBコンソールで、flink_sourceという名前のソースデータベースとflink_sinkという名前のターゲットデータベースを作成します。 詳細については、「データベース管理」をご参照ください。
次のステートメントを実行して、frink_sourceデータベースにshipmentsという名前のテーブルを作成し、テーブルにデータを書き込みます。
CREATE TABLE public.shipments ( shipment_id INT, order_id INT, origin TEXT, destination TEXT, is_arrived BOOLEAN, order_time DATE, PRIMARY KEY (shipment_id) ); ALTER TABLE public.shipments REPLICA IDENTITY FULL; INSERT INTO public.shipments SELECT 1, 1, 'test1', 'test1', false, now();
次のステートメントを実行して、flink_sinkデータベースにshipments_sinkという名前のテーブルを作成します。
CREATE TABLE public.shipments_sink ( shipment_id INT, order_id INT, origin TEXT, destination TEXT, is_arrived BOOLEAN, order_time TIMESTAMP, PRIMARY KEY (shipment_id) );
Realtime Compute for Apache Flinkワークスペースの準備をします。
Realtime Compute for Apache Flinkコンソールにログインし、Realtime Compute for Apache Flinkワークスペースを購入します。 詳細については、「Realtime Compute For Apache Flinkの有効化」をご参照ください。
説明Realtime Compute for Apache FlinkワークスペースにPolarDBクラスターと同じリージョンと仮想プライベートクラウド (VPC) を設定することを推奨します。 この場合、PolarDBクラスターのプライマリプライベートエンドポイントを、Realtime Compute for Apache Flinkワークスペースのエンドポイントとして使用できます。
[カスタムコネクタの作成] をクリックします。 [カスタムコネクタの作成] ダイアログボックスで、[JARの提供] ステップのPolarDBO Flink CDCパッケージをアップロードします。 [コネクタの表示] ステップで、[フォーマット] ドロップダウンリストから [debezium-json] を選択します。 詳細については、「カスタムコネクタの管理」をご参照ください。
Flinkドラフトの作成
Realtime Compute for Apache Flinkコンソールにログインし、SQLドラフトを作成します。 詳細については、「SQLドラフトの作成」をご参照ください。 次のFlink SQL文を実行して、PolarDBクラスターのプライマリエンドポイント、ポート番号、データベースアカウント名、およびデータベースアカウントのパスワードを変更します。
説明PolarDB for PostgreSQL (Oracleと互換) は、64ビットのDATEタイプを使用します。 Flink SQLおよびその他のほとんどのデータベースは、32ビットのDATE型を使用します。 したがって、ソーステーブルのDATE型の列は、Flink SQLの_sourceテーブルと_sinkテーブルの両方のTIMESTAMP型の列にマッピングする必要があります。 それ以外の場合、ドラフトは次のようなエラーで失敗する可能性があります。
“java.time.DateTimeException: Invalid value for EpochDay (valid values -365243219162 - 365241780471): 1720891573000”
。CREATE TEMPORARY TABLE shipments ( shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, order_time TIMESTAMP, PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( 'connector' = 'polardbo-cdc', 'hostname' = '<yourHostname>', 'port' = '<yourPort>', 'username' = '<yourUserName>', 'password' = '<yourPassWord>', 'database-name' = 'flink_source', 'schema-name' = 'public', 'table-name' = 'shipments', 'decoding.plugin.name' = 'pgoutput', 'slot.name' = 'flink' ); CREATE TEMPORARY TABLE shipments_sink ( shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, order_time TIMESTAMP, PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://<yourHostname>:<yourPort>/flink_sink', 'table-name' = 'shipments_sink', 'username' = '<yourUserName>', 'password' = '<yourPassWord>' ); INSERT INTO shipments_sink SELECT * FROM shipments;
ドラフトを展開して開始します。
結果をテストして確認します。
ドラフトのステータスがRUNNINGに変更された後、次のステートメントを実行して、flink_sourceデータベースの出荷テーブルのデータがflink_sinkデータベースのshipments_sinkテーブルと同期されていることを確認します。
SELECT * FROM public.shipments_sink;
サンプル結果:
shipment_id | order_id | origin | destination | is_arrived | order_time -------------+----------+--------+-------------+------------+--------------------- 1 | 1 | test1 | test1 | f | 2024-09-18 05:45:08 (1 row)
flink_sourceデータベースの出荷テーブルで次のDMLステートメントを実行します。 データ変更は、リアルタイムで同期されることが期待されます。
INSERT INTO public.shipments SELECT 2, 2, 'test2', 'test2', false, now(); UPDATE public.shipments SET is_arrived = true WHERE shipment_id = 1; DELETE FROM public.shipments WHERE shipment_id = 2; INSERT INTO public.shipments SELECT 3, 3, 'test3', 'test3', false, now(); UPDATE public.shipments SET is_arrived = true WHERE shipment_id = 3;
次のステートメントを実行して、出荷テーブルのデータ変更がflink_sinkデータベースのshipments_sinkテーブルに同期されていることを確認します。
SELECT * FROM public.shipments_sink;
サンプル結果:
shipment_id | order_id | origin | destination | is_arrived | order_time -------------+----------+--------+-------------+------------+--------------------- 1 | 1 | test1 | test1 | t | 2024-09-18 05:45:08 3 | 3 | test3 | test3 | t | 2024-09-18 07:33:23 (2 rows)