すべてのプロダクト
Search
ドキュメントセンター

PolarDB:PolarDBO Flink CDC

最終更新日:Nov 11, 2024

PolarDB for PostgreSQL (compatible with Oracle) と互換性のあるPolarDBO Change Data Capture (CDC) コネクタを使用して、PolarDB for PostgreSQL (Compatible with Oracle) データベースの完全なスナップショットデータとデータ変更を順番に読み取ることができます。 機能と使用方法の詳細については、コミュニティのPostgres CDCドキュメントを参照してください。

PolarDB for PostgreSQL (Compatible with Oracle) は、少数のデータタイプと組み込みオブジェクトのみがコミュニティPostgreSQLと異なります。 このトピックでは、コミュニティPostgres CDCに基づいて最小限のコード変更でPolarDB for PostgreSQL (Compatible with Oracle) をサポートするPolarDBO Flink CDCコネクタをパッケージ化する方法について説明します。

説明

PolarDB for PostgreSQL (Oracleと互換) は64ビットのDATE型を使用し、コミュニティPostgreSQLは32ビットのDATE型を使用します。 PolarDBO Flink CDCコネクタは、64ビットDATEタイプのデータと互換性があります。

PolarDBO Flink CDCコネクタパッケージ化

重要

PolarDBO Flink CDCコネクタは、コミュニティPostgres CDCに基づいて開発されています。 ご自身でパッケージ化したコネクタまたはこのドキュメントで提供されているJARパッケージには、サービスレベル契約 (SLA) の保証はありません。

前提条件

  • Flink CDCのバージョンを確認する

    Realtime Compute for Apache Flinkを使用している場合は、対応するVerverica Runtime (VVR) バージョンの互換性のあるコミュニティFlink CDCバージョンを確認します。 詳細については、「CDCおよびVVRバージョン対応」をご参照ください。

    説明

    Flink CDCコードリポジトリについては、「Flink CDC on GitHub」をご参照ください。

  • Debeziumのバージョンを確認する

    Flink CDCのpom.xmlファイルで、debezium.versionキーワードを検索し、Debeziumのバージョンを確認します。

    説明

    Debeziumコードリポジトリについては、「Debezium on GitHub」をご参照ください。

  • PgJDBCバージョンの確認

    Postgres CDCのpom.xmlファイルで、org.postgresqlキーワードを検索し、PgJDBCのバージョンを確認します。

    説明
    • リリース3.0より前のバージョンの場合、ファイルパスはflink-connector-postgres-cdc/pom.xmlです。

    • リリース3.0以降のバージョンの場合、ファイルパスはflink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/pom.xmlです。

    • PgJDBCコードリポジトリについては、「PgJDBC on GitHub」をご参照ください。

手順

リリース用-3.1

コミュニティFlink-CDCリリース3.1バージョンは、Realtime Compute for Apache Flink vvr-8.0.x-flink-1.17と互換性があります。

対応するバージョンのPolarDBO Flink CDCコネクタをパッケージ化するには、次の手順を実行します。

  1. 確認したバージョンに対応するバージョンのFlink CDC、Debezium、およびPgJDBCのコードリポジトリをクローンします。

    git clone -b release-3.1 --depth=1 https://github.com/apache/flink-cdc.git
    git clone -b REL42.5.1 --depth=1 https://github.com/pgjdbc/pgjdbc.git
    git clone -b v1.9.8.Final --depth=1 https://github.com/debezium/debezium.git
  2. DebeziumおよびPgJDBCからFlink CDCに必要なファイルをコピーします。

    mkdir -p flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    mkdir -p flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/jdbc/PgDatabaseMetaData.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc
    cp debezium/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/TypeRegistry.java flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql
  3. PolarDB for PostgreSQL (Oracle互換) のパッチファイルを適用します。

    git apply release-3.1_support_polardbo.patch
    説明

    前の例では、次のPolarDBO Flink CDC互換パッチファイルを使用しています。release-3.1_support_polardbo.patch

  4. Mavenを使用してPolarDBO Flink CDCコネクタをパッケージ化します。

    mvn clean install -DskipTests -Dcheckstyle.skip=true -Dspotless.check.skip -Drat.skip=true
    
    # You can find the jar package in the target directory of flink-sql-connector-postgres-cdc

PolarDBO Flink CDCコネクタのJARファイルは、JDK8に基づいて生成されます: flink-sql-connector-postgres-cdc-3.1-SNAPSHOT.jar

リリース2.3用

コミュニティFlink-CDCリリース2.3バージョンは、vvr-4.0.15-flink-1.13からvvr-6.0.2-flink-1.15までRealtime Compute for Apache Flinkと互換性があります。

対応するバージョンのPolarDBO Flink CDCコネクタをパッケージ化するには、次の手順を実行します。

  1. 確認したバージョンに対応するバージョンのFlink CDC、Debezium、およびPgJDBCのコードリポジトリをクローンします。

    git clone -b release-2.3 --depth=1 https://github.com/apache/flink-cdc.git
    git clone -b REL42.2.26 --depth=1 https://github.com/pgjdbc/pgjdbc.git
    git clone -b v1.6.4.Final --depth=1 https://github.com/debezium/debezium.git
  2. DebeziumおよびPgJDBCからFlink-CDCに必要なファイルをコピーします。

    mkdir -p flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    mkdir -p flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc
    mkdir -p flink-cdc/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/ConnectionFactoryImpl.java flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/core/v3
    cp pgjdbc/pgjdbc/src/main/java/org/postgresql/jdbc/PgDatabaseMetaData.java flink-cdc/flink-connector-postgres-cdc/src/main/java/org/postgresql/jdbc
    cp debezium/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/TypeRegistry.java flink-cdc/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql
  3. PolarDB for PostgreSQL (Oracle互換) のパッチファイルを適用します。

    git apply release-2.3_support_polardbo.patch
    説明

    前の例では、次のPolarDBO Flink CDC互換パッチファイルを使用しています。release-2.3_support_polardbo.patch

  4. Mavenを使用してPolarDBO Flink CDCコネクタをパッケージ化します。

    mvn clean install -DskipTests -Dcheckstyle.skip=true -Dspotless.check.skip -Drat.skip=true
    
    # You can find the jar package in the target directory of flink-sql-connector-postgres-cdc

PolarDBO Flink CDCコネクタのJARファイルは、JDK8に基づいて生成されます: flink-sql-connector-postgres-cdc-2.3-SNAPSHOT.jar

使用法

PolarDBO Flink CDCコネクタを使用して、PolarDB for PostgreSQL (Compatible with Oracle) データベースの論理レプリケーション機能を使用してデータ変更を読み取る前に、次の要件が満たされていることを確認してください。

  • wal_levelパラメーターの値はlogicalに設定されます。これは、論理レプリケーションに必要な情報が先行書き込みログ (WAL) ログに書き込まれることを指定します。

    説明

    PolarDBコンソールでwal_levelパラメーターを設定できます。 詳細については、「クラスターパラメーターの設定」トピックの「手順」をご参照ください。 このパラメーターを変更すると、クラスターが再起動します。 作業は慎重に行ってください。

  • REPLICA IDENTITYパラメーターは、ALTER table schema. TABLE REPLICA IDENTITY FULL; ステートメントを実行して、サブスクライブされたテーブルのFULLに設定され、レプリケーション中のテーブルのデータの一貫性を確保します。 FULLの値は、INSERTおよびUPDATE操作のイベントに、テーブル内のすべての列の前の値が含まれることを指定します。

    説明
    • REPLICA IDENTITYは、PostgreSQL固有のテーブルレベル設定で、UPDATEイベントとDELETEイベントが発生したときに、関連するテーブル列の以前の値が論理デコーディングプラグインで使用できるかどうかを確認します。 REPLICA IDENTITYパラメーターの値の説明の詳細については、「REPLICA IDENTITYに関するDebeziumドキュメント」をご参照ください。

    • サブスクライブ済みテーブルのREPLICA IDENTITYパラメーターをFULLに設定するには、テーブルをロックする必要があります。これはビジネスに影響を与える可能性があります。 作業は慎重に行ってください。 次のステートメントを実行して、パラメーターがFULLに設定されているかどうかを確認できます。

      SELECT relreplident = 'f' FROM pg_class WHERE relname = 'tablename';
  • max_wal_sendersパラメーターとmax_replication_slotsパラメーターの値は、データベースで占有されているレプリケーションスロットの数と、Realtime Compute for Apache Flinkドラフトに必要なレプリケーションスロットの数よりも大きくなっています。

  • 次のアカウントのいずれかが使用されます。完全なデータクエリのために、サブスクライブされたテーブルに対するLOGINおよびREPLICATION権限とSELECT権限を持つ特権アカウントまたはデータベースアカウント。

  • コネクタがPolarDBクラスターのプライマリエンドポイントに接続されています。 クラスターエンドポイントは論理レプリケーションをサポートしていません。

Polardb Flink CDCコネクタとPostgres CDCの違い

PolarDBO Flink CDCコネクタは、Postgres CDCに基づいて開発およびパッケージされています。 Postgres CDCコネクタの構文とパラメーターについては、「Postgres CDCドキュメント」をご参照ください。 次のセクションでは、2つのコネクタの主な違いについて説明します。

  • PolarDBO Flink CDCコネクタのWITH句のconnectorパラメーターは、polardbo-cdcに設定する必要があります。

  • PolarDBO Flink CDCは、PolarDB for PostgreSQLPolarDB for PostgreSQL (Oracle互換) 1.0、およびPolarDB for PostgreSQL (Oracle互換) 2.0と互換性があります。

    説明

    PolarDB for PostgreSQLPostgres CDCコネクタを使用することを推奨します。

  • PolarDB for PostgreSQL (Compatible with Oracle) 1.0および2.0のDATE型の列は、Flinkサービスのソーステーブルおよびシンクテーブルのtimestamp型にマップする必要があります。

  • decoding.plugin.nameパラメーターをpgoutputに設定することを推奨します。 そうしないと、UTF-8エンコードされていないデータベースの増分解析で文字化けが発生する可能性があります。 詳細については、「コミュニティドキュメント」をご参照ください。

型マッピング

次の表に、PolarDB for PostgreSQLとFlinkの間のデータ型マッピングを示します。 マッピングは、DATE型を除いて、コミュニティPostgreSQLとFlinkのデータ型間のマッピングと同じです。

PolarDB for PostgreSQLでサポートされているデータ型

Flinkがサポートするデータ型

SMALLINT

SMALLINT

INT2

SMALLSERIAL

SERIAL2

INTEGER

INT

シリアル

BIGINT

BIGINT

ビッグシリーズ

REAL

FLOAT

フロート4

FLOAT8

DOUBLE

DOUBLE PRECISION

NUMERIC(p, s)

デシマル (p, s)

デシマル (p, s)

BOOLEAN

BOOLEAN

日付

  • PolarDB for PostgreSQL (Oracleと互換) の1.0: TIMESTAMP

  • PolarDB for PostgreSQL (Oracleと互換) の2.0: TIMESTAMP

  • PolarDB for PostgreSQL: 日付

TIME [(p)] [タイムゾーンなし]

TIME [(p)] [タイムゾーンなし]

タイムスタンプ [(p)] [タイムゾーンなし]

タイムスタンプ [(p)] [タイムゾーンなし]

CHAR(n)

STRING

キャラクター (n)

VARCHAR(n)

キャラクターの評価 (n)

TEXT

BYTEA

BYTES

次の例では、PolarDBO Flink CDCコネクタを使用して、flink_sourceデータベースの出荷テーブルのデータを、PolarDB for PostgreSQL (Compatible with Oracle) 2.0クラスターのflink_sinkデータベースのshipments_sinkテーブルに同期する方法について説明します。

説明

次の例は、パッケージ化されたPolarDBO Flink CDCコネクタがPolarDB for PostgreSQL (Compatible with Oracle) クラスターで動作することを確認するための基本的なテストとしてのみ機能します。 運用環境については、公式のPostgres CDCドキュメントを参照し、ビジネス要件に基づいてパラメーターを設定します。

  1. 準備をします。

    • PolarDB for PostgreSQL (Oracle互換) クラスターの準備をします。

      1. PolarDB購入ページで、PolarDB for PostgreSQL (Compatible with Oracle) 2.0クラスターを購入します。

      2. 特権アカウントを作成します。 詳細については、「データベースアカウントの作成」トピックのアカウントの作成セクションをご参照ください。

      3. クラスターのプライマリエンドポイントを申請します。 詳細については、「エンドポイントの表示または申請」をご参照ください。 PolarDBクラスターとRealtime Compute for Apache Flinkワークスペースが同じゾーンにある場合、クラスターのプライベートエンドポイントを使用できます。 それ以外の場合は、パブリックエンドポイントを申請する必要があります。 Realtime Compute for Apache FlinkワークスペースのエンドポイントをPolarDBクラスターのIPアドレスホワイトリストに追加します。 詳細については、「クラスターのホワイトリストの設定」をご参照ください。

      4. PolarDBコンソールで、flink_sourceという名前のソースデータベースとflink_sinkという名前のターゲットデータベースを作成します。 詳細については、「データベース管理」をご参照ください。

      5. 次のステートメントを実行して、frink_sourceデータベースにshipmentsという名前のテーブルを作成し、テーブルにデータを書き込みます。

        CREATE TABLE public.shipments (
          shipment_id INT,
          order_id INT,
          origin TEXT,
          destination TEXT,
          is_arrived BOOLEAN,
          order_time DATE,
          PRIMARY KEY (shipment_id) 
        );
        ALTER TABLE public.shipments REPLICA IDENTITY FULL;
        INSERT INTO public.shipments SELECT 1, 1, 'test1', 'test1', false, now();
      6. 次のステートメントを実行して、flink_sinkデータベースにshipments_sinkという名前のテーブルを作成します。

        CREATE TABLE public.shipments_sink (
           shipment_id INT,
           order_id INT,
           origin TEXT,
           destination TEXT,
           is_arrived BOOLEAN,
           order_time TIMESTAMP,
           PRIMARY KEY (shipment_id)
         );
    • Realtime Compute for Apache Flinkワークスペースの準備をします。

      1. Realtime Compute for Apache Flinkコンソールにログインし、Realtime Compute for Apache Flinkワークスペースを購入します。 詳細については、「Realtime Compute For Apache Flinkの有効化」をご参照ください。

        説明

        Realtime Compute for Apache FlinkワークスペースにPolarDBクラスターと同じリージョン仮想プライベートクラウド (VPC) を設定することを推奨します。 この場合、PolarDBクラスターのプライマリプライベートエンドポイントを、Realtime Compute for Apache Flinkワークスペースのエンドポイントとして使用できます。

      2. [カスタムコネクタの作成] をクリックします。 [カスタムコネクタの作成] ダイアログボックスで、[JARの提供] ステップのPolarDBO Flink CDCパッケージをアップロードします。 [コネクタの表示] ステップで、[フォーマット] ドロップダウンリストから [debezium-json] を選択します。 詳細については、「カスタムコネクタの管理」をご参照ください。

        image

  2. Flinkドラフトの作成

    1. Realtime Compute for Apache Flinkコンソールにログインし、SQLドラフトを作成します。 詳細については、「SQLドラフトの作成」をご参照ください。 次のFlink SQL文を実行して、PolarDBクラスターのプライマリエンドポイント、ポート番号、データベースアカウント名、およびデータベースアカウントのパスワードを変更します。

      説明

      PolarDB for PostgreSQL (Oracleと互換) は、64ビットのDATEタイプを使用します。 Flink SQLおよびその他のほとんどのデータベースは、32ビットのDATE型を使用します。 したがって、ソーステーブルのDATE型の列は、Flink SQLの_sourceテーブルと_sinkテーブルの両方のTIMESTAMP型の列にマッピングする必要があります。 それ以外の場合、ドラフトは次のようなエラーで失敗する可能性があります。“java.time.DateTimeException: Invalid value for EpochDay (valid values -365243219162 - 365241780471): 1720891573000”

      CREATE TEMPORARY TABLE shipments (
         shipment_id INT,
         order_id INT,
         origin STRING,
         destination STRING,
         is_arrived BOOLEAN,
         order_time TIMESTAMP,
         PRIMARY KEY (shipment_id) NOT ENFORCED
       ) WITH (
         'connector' = 'polardbo-cdc',
         'hostname' = '<yourHostname>',
         'port' = '<yourPort>',
         'username' = '<yourUserName>',
         'password' = '<yourPassWord>',
         'database-name' = 'flink_source',
         'schema-name' = 'public',
         'table-name' = 'shipments',
         'decoding.plugin.name' = 'pgoutput',
         'slot.name' = 'flink'
       );
      
      CREATE TEMPORARY TABLE shipments_sink (
         shipment_id INT,
         order_id INT,
         origin STRING,
         destination STRING,
         is_arrived BOOLEAN,
         order_time TIMESTAMP,
         PRIMARY KEY (shipment_id) NOT ENFORCED
       ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://<yourHostname>:<yourPort>/flink_sink',
        'table-name' = 'shipments_sink',
        'username' = '<yourUserName>',
        'password' = '<yourPassWord>'
      );
      
      INSERT INTO shipments_sink SELECT * FROM shipments;
    2. ドラフトを展開して開始します。

      image

      image

    3. 結果をテストして確認します。

      • ドラフトのステータスがRUNNINGに変更された後、次のステートメントを実行して、flink_sourceデータベースの出荷テーブルのデータがflink_sinkデータベースのshipments_sinkテーブルと同期されていることを確認します。

        SELECT * FROM public.shipments_sink;

        サンプル結果:

         shipment_id | order_id | origin | destination | is_arrived |     order_time      
        -------------+----------+--------+-------------+------------+---------------------
                   1 |        1 | test1  | test1       | f          | 2024-09-18 05:45:08
        (1 row)
      • flink_sourceデータベースの出荷テーブルで次のDMLステートメントを実行します。 データ変更は、リアルタイムで同期されることが期待されます。

        INSERT INTO public.shipments SELECT 2, 2, 'test2', 'test2', false, now();
        UPDATE public.shipments SET is_arrived = true WHERE shipment_id = 1;
        DELETE FROM public.shipments WHERE shipment_id = 2;
        INSERT INTO public.shipments SELECT 3, 3, 'test3', 'test3', false, now();
        UPDATE public.shipments SET is_arrived = true WHERE shipment_id = 3;

        次のステートメントを実行して、出荷テーブルのデータ変更がflink_sinkデータベースのshipments_sinkテーブルに同期されていることを確認します。

        SELECT * FROM public.shipments_sink;

        サンプル結果:

         shipment_id | order_id | origin | destination | is_arrived |     order_time      
        -------------+----------+--------+-------------+------------+---------------------
                   1 |        1 | test1  | test1       | t          | 2024-09-18 05:45:08
                   3 |        3 | test3  | test3       | t          | 2024-09-18 07:33:23
        (2 rows)