Kafkaは、高スループットと高スケーラビリティを備えた分散メッセージキューサービスです。 Kafkaは、ログ収集、モニタリングデータ集約、ストリーミング処理、オンラインおよびオフライン分析などのビッグデータ分析に広く使用されています。 ビッグデータエコシステムにとって重要です。 このトピックでは、data Transmission Service (DTS) を使用して、PolarDB for Oracleクラスターから自己管理型Kafkaクラスターにデータを同期する方法について説明します。 データ同期機能を使用すると、メッセージ処理機能を拡張できます。
前提条件
ソースPolarDB for PostgreSQL (Compatible with Oracle) クラスターは最新バージョンである必要があります。 クラスターの更新方法の詳細については、「バージョン管理」をご参照ください。
ソースPolarDB for PostgreSQL (Compatible with Oracle) クラスターで同期するテーブルには、プライマリキーまたはUNIQUE NOT NULLインデックスが含まれている必要があります。
ソースのPolarDB for PostgreSQL (Compatible with Oracle) クラスターでは、wal_levelパラメーターはlogicalに設定されています。 これは、論理エンコードに必要な情報が先行書き込みログ (WAL) ログに追加されることを示します。 詳細は、「クラスターパラメーターの設定」をご参照ください。
制限事項
このシナリオでは、DTSは 増分データ同期DTSはサポートしていません スキーマ同期 または 完全なデータ同期。
データ同期タスクは、1つのデータベースからのみデータを同期できます。 複数のデータベースのデータを同期するには、各データベースのデータ同期タスクを作成する必要があります。
データ同期の遅延時間を正確にするために、DTSは
dts_postgres_heartbeat
という名前のハートビートテーブルをソースデータベースに追加します。 次の図は、ハートビートテーブルのスキーマを示しています。ソースデータベースに実行時間の長いトランザクションがあり、タスクに増分データ同期が含まれている場合、実行時間の長いトランザクションが送信される前の書き込み先ロギング (WAL) ログはクリアされないため、蓄積され、ソースデータベースのストレージスペースが不十分になります。
課金
同期タイプ | タスク設定料金 |
スキーマ同期と完全データ同期 | 無料です。 |
増分データ同期 | 有料。 詳細については、「課金の概要」をご参照ください。 |
手順
データ同期インスタンスを購入します。 詳細については、「DTS インスタンスの購入」をご参照ください。
説明購入ページで、ソースインスタンスをPOLARDB、宛先インスタンスをKafka、同期トポロジを一方向同期に設定します。
にログインします。 DTSコンソール。
説明データ管理 (DMS) コンソールにリダイレクトされている場合は、にあるアイコンをクリックして、以前のバージョンのDTSコンソールに移動し。
左側のナビゲーションウィンドウで、[データ同期] をクリックします。
[同期タスク] ページの上部で、ターゲットインスタンスが存在するリージョンを選択します。
データ同期インスタンスを見つけ、[操作] 列の [タスクの設定] をクリックします。
ソースインスタンスとターゲットインスタンスを設定します。
セクション
パラメーター
説明
非該当
タスク名
DTSが自動的に生成するタスク名。 タスクを簡単に識別できるように、わかりやすい名前を指定することをお勧めします。 一意のタスク名を使用する必要はありません。
移行元データベース
データベースエンジン
このパラメーターの値はPolarDB Instanceに設定されており、変更することはできません。
インスタンスリージョン
購入ページで選択したソースリージョン。 このパラメーターの値は変更できません。
PolarDBインスタンスID
ソースPolarDB for OracleクラスターのID。
データベース名
ソースデータベースの名前。
データベースアカウント
ソースPolarDB for Oracleクラスターの特権アカウント。 特権データベースアカウントの作成方法の詳細については、「データベースアカウントの作成」をご参照ください。
データベースパスワード
データベースアカウントのパスワードを設定します。
ターゲットデータベース
データベースエンジン
自己管理Kafkaクラスターのアクセス方法。 この例では、パブリックIPアドレスを持つユーザー作成データベースが選択されています。
説明自己管理Kafkaクラスターが別のタイプの場合、データベースに必要な環境を設定する必要があります。 詳細については、「Preparation overview」をご参照ください。
インスタンスリージョン
購入ページで選択したターゲットリージョン。 このパラメーターの値は変更できません。
ECS インスタンス ID
KafkaクラスターがデプロイされているECSインスタンスのID。
データベースエンジン
ターゲットデータベースのタイプ。 Kafkaを選択します。
ポート番号
Kafkaクラスターのサービスポート番号。 デフォルト値: 9092
データベースアカウント
Kafkaクラスターへのログインに使用されるユーザー名。 Kafkaクラスターで認証が有効になっていない場合は、ユーザー名を入力する必要はありません。
データベースパスワード
ユーザー名に対応するパスワード。 Kafkaクラスターで認証が有効になっていない場合は、パスワードを入力する必要はありません。
トピック
[トピックリストの取得] をクリックし、ドロップダウンリストからトピック名を選択します。
Kafkaバージョン
自己管理Kafkaクラスターのバージョン。
暗号化
ビジネスとセキュリティの要件に基づいて、[非暗号化] または [SCRAM-SHA 256] を選択します。
の右下隅のページでホワイトリストと次への設定をクリックします。
ソースまたはターゲットデータベースがAlibaba Cloudデータベースインスタンス (ApsaraDB RDS for MySQL、ApsaraDB for MongoDBインスタンスなど) の場合、DTSは自動的にDTSサーバーのCIDRブロックをインスタンスのIPアドレスホワイトリストに追加します。 ソースデータベースまたはターゲットデータベースがElastic Compute Service (ECS) インスタンスでホストされている自己管理データベースの場合、DTSサーバーのCIDRブロックがECSインスタンスのセキュリティグループルールに自動的に追加されます。ECSインスタンスがデータベースにアクセスできることを確認する必要があります。 自己管理データベースが複数のECSインスタンスでホストされている場合、DTSサーバーのCIDRブロックを各ECSインスタンスのセキュリティグループルールに手動で追加する必要があります。 ソースデータベースまたはターゲットデータベースが、データセンターにデプロイされているか、サードパーティのクラウドサービスプロバイダーによって提供される自己管理データベースである場合、DTSサーバーのCIDRブロックをデータベースのIPアドレスホワイトリストに手動で追加して、DTSがデータベースにアクセスできるようにする必要があります。 詳細については、「DTSサーバーのCIDRブロックをオンプレミスデータベースのセキュリティ設定に追加する」をご参照ください。
警告DTSサーバーのCIDRブロックがデータベースまたはインスタンスのホワイトリスト、またはECSセキュリティグループルールに自動的または手動で追加されると、セキュリティリスクが発生する可能性があります。 したがって、DTSを使用してデータを同期する前に、潜在的なリスクを理解して認識し、次の対策を含む予防策を講じる必要があります。VPNゲートウェイ、またはSmart Access Gateway。
同期するオブジェクトを選択します。
設定
説明
同期するオブジェクトの選択
[使用可能] セクションから1つ以上のテーブルを選択し、アイコンをクリックしてテーブルを [選択済み] セクションに移動します。 同期するオブジェクトとしてテーブルのみを選択できます。
説明DTSは、手順6で選択したトピック名にテーブル名をマップします。 トピック名を変更する場合は、テーブルの上にポインタを移動し、[編集] をクリックします。 Kafkaクラスターに存在するトピックを指定する必要があります。 詳細については、「同期するオブジェクトの名前変更」をご参照ください。
Kafkaに配信されるデータ形式
Kafkaクラスターに同期されるデータは、AvroまたはSharePlex JSON形式で保存されます。 詳細については、「Data formats of a Kafka cluster」をご参照ください。
Kafkaパーティションへの出荷データのポリシー
ビジネス要件に基づいて、Kafkaパーティションへのデータ同期のポリシーを選択します。 詳細については、「Kafkaパーティションにデータを同期するためのポリシーの指定」をご参照ください。
データベースとテーブルの名前変更
オブジェクト名マッピング機能を使用して、ターゲットインスタンスに同期されるオブジェクトの名前を変更できます。 詳細は、オブジェクト名のマッピングをご参照ください。
失敗した接続の再試行時間
既定では、DTSがソースデータベースまたはターゲットデータベースへの接続に失敗した場合、DTSは次の720分 (12時間) 以内に再試行します。 必要に応じて再試行時間を指定できます。 DTSが指定された時間内にソースデータベースとターゲットデータベースに再接続すると、DTSはデータ同期タスクを再開します。 それ以外の場合、データ同期タスクは失敗します。
説明DTSが接続を再試行すると、DTSインスタンスに対して課金されます。 ビジネスニーズに基づいて再試行時間を指定することを推奨します。 ソースインスタンスとターゲットインスタンスがリリースされた後、できるだけ早くDTSインスタンスをリリースすることもできます。
ページの右下隅にある [次へ] をクリックします。
初期同期タイプとフィルターオプションを選択します。
パラメーター
説明
初期同期
デフォルトでは、初期増分データ同期が選択されています。 DTSは、ソースデータベースで生成された増分データをターゲットデータベースに同期します。
フィルターオプション
デフォルトでは、増分同期フェーズでDDLを無視が選択されています。 DTSは、増分データ同期中にソースデータベースで実行されるDDL操作を同期しません。
説明このパラメーターの設定は有効になりません。 DTSは、このオプションを選択したかどうかに関係なく、ソースデータベースで実行されるDDL操作を同期しません。
ページの右下隅にあるをクリックします。事前チェック.
説明データ同期タスクを開始する前に、DTSは事前チェックを実行します。 データ同期タスクは、タスクが事前チェックに合格した後にのみ開始できます。
タスクが事前チェックに合格しなかった場合は、失敗した各項目の横にあるアイコンをクリックして詳細を表示できます。
詳細に基づいて問題をトラブルシューティングした後、新しい事前チェックを開始します。
問題をトラブルシューティングする必要がない場合は、失敗した項目を無視して新しい事前チェックを開始してください。
次のメッセージが表示されたら、[事前チェック] ダイアログボックスを閉じます。[事前チェックの合格] その後、データ同期タスクが開始されます。
初期同期が完了し、データ同期タスクが同期状態になるまで待ちます。
データ同期タスクのステータスは、[同期タスク] ページで確認できます。