PolarDB for PostgreSQL (Oracle互換) 用に特別に設計されたDebeziumコネクタは、Debezium PolarDBOコネクタと呼ばれます。 Debezium PolarDBOコネクタは、PolarDB for PostgreSQL (Compatible with Oracle) データベース内の行レベルの変更をキャプチャし、データ変更イベントレコードを生成してKafkaトピックにストリーミングします。 特定の機能と使用方法については、「Debezium connector For PostgreSQL」をご参照ください。
PolarDB for PostgreSQL (Compatible with Oracle) とPostgreSQL Community Editionでは、データ型と組み込みオブジェクトの処理にわずかな違いがあります。 このトピックでは、最小限のコード変更で、Debezium PostgreSQLコネクタのコミュニティエディションに基づいてDebezium PolarDBOコネクタを構築する方法について説明します。
Debezium PolarDBOコネクタを構築する
Debezium PolarDBOコネクタは、Debezium PostgreSQLコネクタのコミュニティ版に基づいて開発されています。 Debezium PolarDBOコネクタは、コネクタを手動で構築するか、このトピックで提供するJARパッケージを使用するかにかかわらず、サービスレベル契約 (SLA) 保証を提供しません。
前提条件
Java環境の設定
DebeziumのすべてのバージョンはJava 11以降で実行する必要があります。 Debezium PolarDBOコネクタをビルドまたは実行する前に、Java 11環境が設定されていることを確認してください。
Debeziumのバージョンを決定する
使用するKafkaまたはKafka ConnectおよびPolarDB for PostgreSQL (Compatible with Oracle) のバージョンに基づいて、Debeziumのバージョンを決定します。 バージョンの互換性については、「Debeziumリリースの概要」をご参照ください。
説明Debeziumに関連するソースコードとドキュメントの詳細については、Debeziumにアクセスしてください。
次の項目では、PolarDB for PostgreSQL (Compatible with Oracle) とPostgreSQL Community Editionのバージョン互換性について説明します。
PolarDB for PostgreSQL (Oracleと互換) 2.0はPostgreSQL 14と互換性があります。
PolarDB for PostgreSQL (Oracleと互換) 1.0はPostgreSQL 11と互換性があります。
pgJDBCバージョンの決定
PgJDBCのバージョンを確認するには、対応するDebeziumバージョンの
pom.xmlファイルでversion.postgresql.driverキーワードを検索します。説明pgJDBCに関連するソースコードとドキュメントの詳細については、pgJDBCにアクセスしてください。
手順
Debezium Community Edition 2.6.2.Finalは、Kafka Connect 2.xと3.x、およびPostgreSQLバージョン10から16をサポートしています。
次の手順では、Debezium Community Edition 2.6.2.Finalに基づいてDebezium PolarDBOコネクタを構築する方法について説明します。
対応するバージョンのDebeziumおよびpgJDBCのコードファイルを複製します。
git clone -b v2.6.2.Final --depth=1 https://github.com/debezium/debezium.git git clone -b REL42.6.1 --depth=1 https://github.com/pgjdbc/pgjdbc.git必要なpgJDBCファイルをDebeziumにコピーします。
mkdir -p debezium/debezium-connector-postgres/src/main/java/org/postgresql/core/v3 mkdir -p debezium/debezium-connector-postgres/src/main/java/org/postgresql/jdbc cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java debezium/debezium-connector-postgres/src/main/java/org/postgresql/core/v3 cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java debezium/debezium-connector-postgres/src/main/java/org/postgresql/core/v3 cp pgjdbc/pgjdbc/src/main/java/org/postgresql/jdbc/PgDatabaseMetaData.java debezium/debezium-connector-postgres/src/main/java/org/postgresql/jdbcDebeziumソースコードにパッチファイルを適用して、PolarDB for PostgreSQL (Compatible with Oracle) のサポートを追加します。
git apply v2.6.2.Final-support-polardbo-v1.patch説明パッチファイルは、v2.6.2.Final-support-polardbo-v1.patchからダウンロードできます。
デフォルトでは、パッチファイルは、debezium-api、debezium-core、pgJDBC、protobuf-javaなどの依存関係をJARパッケージに追加します。 これらの依存関係が必要ない場合は、pom.xmlファイルから依存関係を削除できます。
Mavenを使用して、Debezium PolarDBOコネクタをビルドおよびパッケージします。
mvn clean package -pl :debezium-connector-postgres -DskipITs -Dquick # After you complete the packaging process, you can obtain the JAR package from the corresponding directory within the debezium-connector-postgres/ directory.上記の手順を実行して、JDK 11に基づいてDebezium PolarDBOコネクタのJARパッケージをビルドできます。 debezium-connector-postgres-polardbo-v1.0-2.6.2.Final.jarからパッケージを直接ダウンロードすることもできます。
使用上の注意
Debezium PolarDBOコネクタは、論理レプリケーションを使用して、PolarDB for PostgreSQL (Oracle互換) データベースの増分変更をキャプチャします。 次の条件を満たす必要があります。
wal_levelパラメーターをlogicalに設定して、論理レプリケーションに必要な情報が先行書き込みログ (WAL) レコードに書き込まれるようにします。
説明PolarDBコンソールでwal_levelパラメーターを設定できます。 詳細については、「クラスターパラメーターの設定」トピックの手順セクションをご参照ください。 パラメーターを変更すると、クラスターが再起動します。 作業は慎重に行ってください。
ALTER TABLE schema.table REPLICA IDENTITY FULL;ステートメントを実行して、サブスクライブするテーブルごとにREPLICA IDENTITYパラメーターをFULLに設定します。 この設定により、関連するテーブル列の前の値が、更新および削除操作のために論理復号プラグインで使用できるようになります。説明REPLICA IDENTITYは、PostgreSQL固有のテーブルレベルのパラメーターで、関連するテーブル列の以前の値が更新および削除操作のために論理復号プラグインで使用できるかどうかを決定します。 REPLICA IDENTITY値の詳細については、「レプリカID」をご参照ください。
サブスクライブするテーブルのREPLICA IDENTITYパラメーターをFULLに設定すると、テーブルがロックされる可能性があります。 これはビジネスに影響します。 混乱を最小限に抑えるには、この変更を計画し、ビジネスプロセスに基づいて調整します あなたのビジネス要件。 テーブルの
REPLICA IDENTITYパラメーターがFULLに設定されているかどうかを確認するには、次のステートメントを実行します。SELECT relreplident = 'f' FROM pg_class WHERE relname = 'tablename';
max_wal_sendersおよびmax_replication_slotsパラメーターの値が、使用するレプリケーションスロットの数およびKafkaジョブで必要なスロットの数よりも大きいことを確認します。
特権アカウントまたはLOGINおよびREPLICATION権限を持つ標準アカウントを使用していることを確認してください。 アカウントには、完全なデータクエリをサブスクライブするテーブルに対するSELECT権限が必要です。
クラスターのプライマリエンドポイントを使用して、PolarDBクラスターに接続します。 クラスターエンドポイントを使用してクラスターに接続する場合、論理レプリケーションはサポートされていません。
connector.classパラメーターを
io.de bezium.connector.postgresql.PolarDBOConnectorに設定します。plugin.nameパラメーターをpgoutputに設定することを推奨します。 これにより、エンコードされたデータベースの増分解析中のデータの破損や文字化けを防ぎnon-UTF-8。 詳細については、「コミュニティドキュメント」をご参照ください。
例
次の例では、Debezium PolarDBOコネクタを使用して、PolarDB for PostgreSQL Oracle構文互換性2.0クラスターのdbz_dbデータベースのt1とt2という名前のテーブルをKafkaメッセージキューに同期する方法について説明します。
準備
カフカをセットアップします。
Kafkaインスタンスをデプロイし、Kafka Connectホストからインスタンスにアクセスできることを確認します。 ApsaraMQ for Kafkaを使用することもできます。 詳細については、「クイックスタート」をご参照ください。
メッセージを受信するために、Kafkaインスタンスにpg_dbz_eventという名前のトピックを作成します。
説明テストシナリオで読みやすくするために、単一パーティションのトピックを作成することをお勧めします。 実際のビジネスシナリオでは、マルチパーティショントピックを作成することを推奨します。
ポート8083で分散モードでローカルにKafka Connectを起動します。
Debezium PolarDBOコネクタのJARパッケージをKafka Connectの
plugin.pathディレクトリにコピーします。 詳細については、このトピックの「手順」セクションの4番目の手順を参照してください。# ${plugin.path} Replace this with the specific path. mkdir ${plugin.path}/debezium-connector-polardbo cp debezium-connector-postgres-polardbo-v1.0-2.6.2.Final.jar ${plugin.path}/debezium-connector-polardbo
PolarDB for PostgreSQL (Oracle互換) を設定します。
PolarDBクラスター購入ページで、PolarDB for PostgreSQL (Oracle互換) 2.0クラスターを購入します。
PolarDBクラスターを設定します。 クラスターがDebezium PolarDBOコネクタを使用するための前提条件を満たしていることを確認します。 詳細については、「使用状況のメモ」をご参照ください。
特権アカウントを作成します。 詳細については、「アカウントの作成」をご参照ください。
クラスターのプライマリエンドポイントを取得します。 詳細については、「エンドポイントとポートの表示」をご参照ください。 PolarDBクラスターとKafka Connectインスタンスが同じゾーンにある場合は、プライベートエンドポイントを使用します。 それ以外の場合は、パブリックエンドポイントを申請し、Kafka ConnectインスタンスのエンドポイントをPolarDBクラスターのホワイトリストに追加します。 詳細については、「ホワイトリストの設定」をご参照ください。
PolarDBコンソールでdbz_dbという名前のデータベースを作成します。 詳細については、「データベースの作成」をご参照ください。
次のSQL文を実行して、dbz_dbデータベースにt1およびt2テーブルを作成し、テーブルにデータを設定します。
CREATE TABLE public.t1 (a int PRIMARY KEY, b text, c TIMESTAMP); ALTER TABLE public.t1 REPLICA IDENTITY FULL; INSERT INTO public.t1(a, b, c) VALUES(1, 'a', now()); CREATE TABLE public.t2 (a int PRIMARY KEY, b text, c DATE); ALTER TABLE public.t2 REPLICA IDENTITY FULL; INSERT INTO public.t2(a, b, c) VALUES(1, 'a', now());
テスト
config/postgresql-connector.jsonという名前の設定ファイルを作成します。 詳細については、「コネクタの設定例」をご参照ください。
{ "name": "dbz-polardb", "config": { "connector.class": "io.debezium.connector.postgresql.PolarDBOConnector", "database.hostname": "<yourHostname>", "database.port": "<yourPort>", "database.user": "<yourUserName>", "database.password": "<yourPassWord>", "database.dbname" : "dbz_db", "plugin.name": "pgoutput", "slot.name": "dbz_polardb", "table.include.list": "public.t1,public.t2", "topic.prefix": "polardb" "transforms": "Combine", "transforms.Combine.type": "io.debezium.transforms.ByLogicalTableRouter", "transforms.Combine.topic.regex": "(.*)", "transforms.Combine.topic.replacement": "pg_dbz_event" } }説明デフォルトでは、テーブルごとに個別のトピックが作成されます。 上記の設定は、トピックを集約します。
コネクタを追加します。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 'http://localhost:8083/connectors' -d @config/postgresql-connector.jsonコネクタを追加すると、Kafkaのpg_dbz_eventトピックから完全なデータを取得できます。
PolarDBクラスターのdbz_dbデータベースで次のDMLステートメントを実行します。
INSERT INTO public.t1(a, b, c) VALUES(2, 'b', now()); UPDATE public.t1 SET b = 'c' WHERE a = 1; DELETE FROM public.t1 WHERE a = 2; INSERT INTO public.t1(a, b, c) VALUES(4, 'd', now()); INSERT INTO public.t2(a, b, c) VALUES(2, 'b', now()); UPDATE public.t2 SET b = 'c' WHERE a = 1; DELETE FROM public.t2 WHERE a = 2; INSERT INTO public.t2(a, b, c) VALUES(4, 'd', now());次に、Kafkaトピックの増分データを照会できます。