オープンソースのPostgreSQLでは、レプリケーションスロットを使用して変更データキャプチャ (CDC) 機能を有効にすることができます。 このトピックでは、ApsaraDB RDS for PostgreSQLインスタンスのCDC機能を有効にする方法について説明します。
前提条件
RDSインスタンスが作成されました。 詳細については、「ApsaraDB RDS For PostgreSQLインスタンスの作成」をご参照ください。
ホワイトリストは、クライアントからRDSインスタンスへのリクエストを許可するように設定されています。 詳細については、「ApsaraDB RDS For PostgreSQLインスタンスのIPアドレスホワイトリストの設定」をご参照ください。
psqlがクライアントにインストールされています。 詳細は、「PostgreSQL 公式ドキュメント」をご参照ください。
使用上の注意
CDC機能を有効にして、キャプチャしたデータをプライマリRDSインスタンスでのみ使用できます。 読み取り専用RDSインスタンスはサポートされていません。
ApsaraDB RDS for PostgreSQLは、論理レプリケーションスロットフェールオーバー機能をサポートしています。 プライマリ /セカンダリの切り替えは、CDC機能には影響しません。 詳細については、「Logical Replication Slot Failover」をご参照ください。
RDSインスタンスのCDC機能を有効にする前に、RDSインスタンスの指定されたパラメーターを変更する必要があります。 パラメータの変更は、インスタンスの再起動をトリガし得る。 ワークロードへの影響を防ぐため、オフピーク時にパラメーターを変更することを推奨します。
RDSインスタンスのCDC機能を有効にした後、次の点に注意する必要があります。
RDSインスタンスには、WALログ用のストレージが必要です。 キャプチャしたデータの消費が異常または停止した場合、WALログは自動的にクリアされません。 ログはディスクスペースを蓄積して使い果たします。 この場合、RDSインスタンスはロックされている可能性があります。 RDSインスタンスがロックされている場合、RDSインスタンスは読み取りリクエストのみを処理できます。
ダウンストリームクライアントがRDSインスタンスのデータ行を消費する場合、クライアントはリアルタイムで消費を報告するように設定する必要があります。 それ以外の場合、RDSインスタンスは複数のバージョンのデータ行を保持します。 複数のバージョンのデータ行が保持されている場合、トランザクションID (XID) のラップアラウンドが発生し、RDSインスタンスは読み取り要求のみを処理できます。 サンプルエラーログ:
"HINT: ラップアラウンドの問題を回避するために、すぐにオープントランザクションを閉じます。 準備済みの古いトランザクションをコミットまたはロールバックするか、古いレプリケーションスロットを削除する必要がある場合もあります。「警告: 最も古いxminは遠い過去のものです。」
複製スロットを手動で削除して、AliPGが以前のバージョンのWALログとデータ行を自動的に消去できるようにすることができます。 詳細については、「CDC機能の無効化」をご参照ください。
説明RDSインスタンスのWALログのサイズとストレージ使用量を定期的に監視し、アラートルールを設定することを推奨します。 詳細については、「拡張モニタリングメトリクスの表示」および「ApsaraDB RDS For PostgreSQLインスタンスのアラートルールの管理」をご参照ください。
CDC機能の有効化
ステップ1: テストデータベースの作成
- [インスタンス] ページに移動します。 上部のナビゲーションバーで、RDSインスタンスが存在するリージョンを選択します。 次に、RDSインスタンスを見つけ、インスタンスのIDをクリックします。
表示されるページの左側のナビゲーションウィンドウで、[データベース] をクリックします。
表示されるページで、[データベースの作成] をクリックします。
説明この例では、testdbという名前のデータベースが作成されます。 詳細については、「データベースの作成」をご参照ください。
ステップ2: テストアカウントの作成と権限の設定
表示されるページの左側のナビゲーションウィンドウで、[アカウント] をクリックします。
表示されるページで、[アカウントの作成] をクリックして、db_adminという名前の特権アカウントとcdc_userという名前の標準アカウントを作成します。 特権アカウントには管理者権限があり、標準アカウントは変更データの取得に使用されます。
説明この例のアカウントのユーザー名は参照用です。 ビジネス要件に基づいてユーザー名を指定できます。 アカウントの作成方法の詳細については、「アカウントの作成」をご参照ください。
db_adminアカウントを使用してクライアントをRDSインスタンスに接続します。
psql -h <RDSインスタンスのエンドポイント> -p 5432 -U db_admin -d testdb
説明RDSインスタンスのエンドポイントを取得する方法の詳細については、「エンドポイントとポート番号の表示と変更」をご参照ください。
次のステートメントを実行して、レプリケーションロールをcdc_userアカウントに割り当てます。
ALTER USER cdc_user WITH REPLICATION;
次のステートメントを実行して、結果を照会できます。
SELECT rolreplication FROM pg_roles WHERE rolname='cdc_user';
サンプル出力:
rollreplication ---------------- t (1行)
次のステートメントを実行して、cdc_userアカウントに権限を付与します。
SCHEMA PUBLICのすべてのテーブルをcdc_userに選択します。
手順3: RDSインスタンスのパラメーターの変更
次のステートメントを実行して、RDSインスタンスのパラメーター設定を照会します。
SELECT名, 設定, short_desc, ソース pg_settingsから WHERE name ='wal_level';
サンプル出力:
名 | setting | short_desc | source ----------------------- + --------- wal_level | replica | WALに書き込まれる情報のレベルを設定します。 | 設定ファイル (1行)
wal_levelパラメーターは、WALログに書き込むことができるデータの量を指定します。 デフォルト値はレプリカです。 このパラメーターは、サーバーの起動時にのみ設定できます。 有効な値:
minimal: インスタンスを故障または即時シャットダウンから復元するために必要なデータのみを記録します。 最小レベルでは、ベースバックアップまたはWALログを使用してデータベースを復元することはできません。
replica: スタンバイサーバーで読み取り専用クエリを実行するなど、WALのアーカイブとレプリケーションをサポートするのに十分な量のデータを書き込みます。
logical: レプリカレベルで書き込まれるデータに加えて、論理デコードをサポートするために必要な情報を追加します。
- [インスタンス] ページに移動します。 上部のナビゲーションバーで、RDSインスタンスが存在するリージョンを選択します。 次に、RDSインスタンスを見つけ、インスタンスのIDをクリックします。
表示されるページの左側のナビゲーションウィンドウで、[パラメーター] をクリックします。
wal_levelパラメーターの値をlogicalに変更します。
説明インスタンスパラメーターを変更する方法の詳細については、「ApsaraDB RDS For PostgreSQLインスタンスのパラメーターの変更」をご参照ください。
パラメーターを変更して変更を送信すると、RDSインスタンスが再起動します。 ワークロードへの影響を防ぐため、オフピーク時にパラメーターを変更することを推奨します。
ステップ4: 論理レプリケーションスロットを作成する
手順3でパラメーターを変更して変更を送信すると、RDSインスタンスが再起動され、変更が有効になります。 RDSインスタンスが [実行中] 状態の場合、次の操作を実行できます。
db_adminアカウントを使用してクライアントをRDSインスタンスに接続します。
psql -h <RDSインスタンスのエンドポイント> -p 5432 -U db_admin -d testdb
次のステートメントを実行して
test_decoding
を使用し、cdc_replication_slot
という名前のレプリケーションスロットを作成します。SELECT pg_create_logical_replication_slot('cdc_replication_slot '、'test_decoding');
説明cdc_replication_slot
は参照のみに使用されます。 ビジネス要件に基づいて名前を指定できます。test_decoding
は、オープンソースのPostgreSQLによって提供される出力拡張です。 ステートメントでtest_decodingを直接使用できます。
サンプル出力:
pg_create_logical_replication_slot ------------------------------------ (cdc_replication_slot、1/14003428) (1行)
次のステートメントを実行して、結果を照会できます。
SELECT * FROM pg_replication_slots;
サンプル出力:
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase ------------- ---------------------------------------------------------------------------------... cdc_replication_slot | test_decoding | logical | 18822 | testdb | f | f | | | 22356 | 1/140033F0 | 1/14003428 | reserved | | f (1行)
ステップ5: テストデータを作成する
次のステートメントを実行して、テストデータを作成し、運用環境をシミュレートします。
CREATE TABLE public.tb_test (
id int NOT NULL PRIMARYキー
);
ALTER TABLE public.tb_test ADD name varchar(1) NULL;
INSERT INTO public.tb_test SELECT 1、'A';
ステップ6: クライアントを使用してデータを読み取る
cdc_userアカウントを使用してクライアントをRDSインスタンスに接続します。
psql -h <RDSインスタンスのエンドポイント> -p 5432 -U cdc_user -d testdb
次のステートメントを実行して、レプリケーションスロットからデータを読み取ります。
SELECT * FROM pg_logical_slot_peek_changes('cdc_replication_slot '、null、null);
サンプル出力:
lsn | xid | データ ----------- --------- ------------------------------------------------------------------------- 1/14003D90 | 22376 | 22376開始 1/1401DDE8 | 22376 | COMMIT 22376 1/1401DDE8 | 22377 | 22377開始 1/1401E100 | 22377 | COMMIT 22377 1/1401E2A8 | 22382 | 22382開始 1/1401E2A8 | 22382 | table public.tb_test: INSERT: id[integer]:1 name[character variable]:'A' 1/1401E3C0 | 22382 | COMMIT 22382 (7行)
ステップ7: クライアントを使用してキャプチャしたデータを使用する
\q
コマンドを実行して、データベース接続を閉じます。次のコマンドを実行して、キャプチャされたデータを消費します。
説明postgresユーザーを使用して
pg_recvlogical
コマンドを実行する必要があります。 su - postgresコマンドを実行して、postgresユーザーに切り替えることができます。-bash: pg_recvlogical: command not found
というエラーメッセージが表示された場合は、FAQの説明に基づいて問題を解決します。pg_recvlogical -h <RDSインスタンスのエンドポイント> -U <特権アカウント> -d <Test database> -- create-slot -- if-not-exists -- slot=cdc_replication_slot -- plugin=test_decoding -- start -f -
例:
pg_recvlogical -h pgm-***** .pgsql.singapore.rds.aliyuncs.com -U db_admin -d testdb -- create-slot -- if-not-exists -- slot=cdc_replication_slot -- plugin=test_decoding -- start -f -
サンプル出力:
BEGIN 22376 COMMIT 22376 BEGIN 22377 COMMIT 22377 BEGIN 22382 table public.tb_test: INSERT: id[integer]:1 name[character variable]:'A' COMMIT 22382
CDC機能を無効にする
RDSインスタンスのCDC機能を有効にすると、RDSインスタンスにはWALログ用のストレージが必要になります。 キャプチャしたデータの消費が異常または停止した場合、WALログは自動的に削除されません。 ログはディスクスペースを蓄積して使い果たします。 この場合、RDSインスタンスはロックされている可能性があります。 RDSインスタンスがロックされている場合、インスタンスは読み取り要求のみを処理できます。 非アクティブなレプリケーションスロットを手動で削除して、AliPGがWALログを自動的に削除できるようにすることができます。
ApsaraDB RDSコンソールで、API操作を呼び出すか、SQL文を実行して、非アクティブなレプリケーションスロットを削除できます。
ApsaraDB RDSコンソール: ApsaraDB RDS for PostgreSQLインスタンスのWALログ管理機能の使用
API操作: DeleteSlot。
SQL 文:
db_adminアカウントを使用してRDSインスタンスに接続します。
psql -h <RDSインスタンスのエンドポイント> -p 5432 -U db_admin -d testdb
次のステートメントを実行して、非アクティブなレプリケーションスロットに関する名前と関連情報を表示します。
SELECT slot_name、slot_type、データベース、アクティブ、safe_wal_size pg_replication_slotsから WHERE active = 'f';
サンプル出力:
slot_name | slot_type | データベース | アクティブ | safe_wal_size ---------------------- --------- ------------------------------------------ cdc_replication_slot | 論理 | testdb | f | (1行)
次のステートメントを実行して、論理レプリケーションスロットを削除します。
SELECT pg_drop_replication_slot('cdc_replication_slot ');
よくある質問
クライアントがキャプチャされたデータを消費するときに
-bash: pg_recvlogical: command not found
エラーメッセージが表示された場合はどうすればよいですか?pg_recvlogicalは、PostgreSQL用のネイティブなロジックデコードツールです。 このツールは、デフォルトのtest_decoding拡張機能を使用します。 test_decoding拡張子は、PostgreSQLソースコードパッケージのcontrib/test_decodingディレクトリに格納されます。 PostgreSQLソースコードをコンパイルし、PostgreSQLをインストールすることを推奨します。 次に、PostgreSQLのインストールパスの /binディレクトリにpg_recvlogicalツールがあります。 ソースコードからPostgreSQLをインストールする方法の詳細については、「ソースコードからのインストール」をご参照ください。
レプリケーションスロットを削除しても、WALファイルが自動的にクリアされず、ディスク領域を占有している場合はどうすればよいですか?
wal_keep_segments
パラメーターをデフォルト値128
に設定すると、保存ファイルの数を減らすことができます。 インスタンスパラメーターを変更する方法の詳細については、「ApsaraDB RDS For PostgreSQLインスタンスのパラメーターの変更」をご参照ください。