このトピックでは、data Transmission Service (DTS) を使用して、増分データをPolarDB-XインスタンスからMessage Queue for Apache Kafkaインスタンスにリアルタイムで同期する方法について説明します。
前提条件
MySQL 5.7と互換性のあるソースPolarDB-Xインスタンスが作成されました。
Apache Kafkaインスタンスの宛先Message Queueのバージョンがサポートされています。 詳細については、「データ同期シナリオの概要」をご参照ください。
ターゲットMessage Queue for Apache Kafkaインスタンスの使用可能なストレージ容量は、ソースPolarDB-Xインスタンスのデータの合計サイズよりも大きくなっています。
Apache Kafkaインスタンスの宛先Message Queueで、同期されたデータを受信するトピックが作成されます。 詳細については、「手順1: トピックの作成」をご参照ください。
制限事項
DTSは、ソースデータベースの外部キーをターゲットデータベースに同期しません。 したがって、ソースデータベースのカスケードおよび削除操作は、ターゲットデータベースと同期されません。
カテゴリ | 説明 |
ソースデータベースの制限 |
|
その他の制限 |
|
注意事項 | DTSは、バイナリログファイルの位置を移動するようにスケジュールされたソースデータベースの 'dts_health_check '.'ha_health_check' テーブルを更新します。 |
課金
同期タイプ | タスク設定料金 |
スキーマ同期と完全データ同期 | 無料です。 |
増分データ同期 | 有料。 詳細については、「課金の概要」をご参照ください。 |
単一レコードのサイズ制限
Kafkaに書き込むことができる1つのレコードの最大サイズは10 MBです。 したがって、ソースデータの行のサイズが10 MBを超えると、DTSがKafkaにレコードを書き込むことができないため、関連するDTSタスクが中断されます。 このシナリオでは、大きなフィールドを含むテーブル全体を同期するのではなく、テーブルの一部のフィールドのみを同期することをお勧めします。 DTSタスクを設定するときは、これらの大きなフィールドのレコードを除外する必要があります。 大きなフィールドを含むテーブルがタスクのオブジェクトに含まれている場合は、テーブルを削除し、オブジェクトにテーブルを再度追加してから、大きなフィールドを除外するようにフィルター条件を設定する必要があります。
同期可能なSQL操作
操作タイプ | SQL文 |
DML | 挿入、更新、および削除 |
手順
[データ同期タスク] ページに移動します。
データ管理 (DMS) コンソールにログインします。
上部のナビゲーションバーで、DTSをクリックします。
左側のナビゲーションウィンドウで、を選択します。
説明操作は、DMSコンソールのモードとレイアウトによって異なります。 詳細については、「シンプルモード」および「DMSコンソールのレイアウトとスタイルのカスタマイズ」をご参照ください。
新しいDTSコンソールのデータ同期タスクページに行くこともできます。
データ同期タスクの右側で、データ同期インスタンスが存在するリージョンを選択します。
説明新しいDTSコンソールを使用する場合は、上部のナビゲーションバーでデータ同期インスタンスが存在するリージョンを選択する必要があります。
[タスクの作成] をクリックします。 [データ同期タスクの作成] ページで、ソースデータベースとターゲットデータベースを設定します。 次の表にパラメーターを示します。
警告ソースデータベースとターゲットデータベースを設定した後、ページに表示される制限を読むことを推奨します。 そうしないと、タスクが失敗したり、データの不一致が発生します。
セクション
パラメーター
説明
N/A
タスク名
DTSタスクの名前。 タスク名は自動生成されます。 タスクを簡単に識別できるように、わかりやすい名前を指定することをお勧めします。 一意のタスク名を指定する必要はありません。
ソースデータベース
既存のDMSデータベースインスタンスの選択
使用するデータベースインスタンス。 ビジネス要件に基づいて、既存のインスタンスを使用するかどうかを選択できます。
既存のインスタンスを選択すると、DTSはデータベースのパラメーターを自動的に入力します。
既存のインスタンスを選択しない場合は、次のデータベース情報を設定する必要があります。
データベースタイプ
移行元ディスクのタイプを設定します。 [PolarDB-X 2.0] を選択します。
アクセス方法
ソースデータベースのアクセス方法。 [Alibaba Cloudインスタンス] を選択します。
インスタンスリージョン
ソースPolarDB-Xインスタンスが存在するリージョン。
インスタンスID
ソースPolarDB-XインスタンスのID。
データベースアカウント
ソースPolarDB-Xインスタンスのデータベースアカウント。 アカウントには、同期するオブジェクトに対するSELECT権限、およびREPLICATION CLIENT権限とREPLICATION SLAVE権限が必要です。
説明データベースアカウントに権限を付与する方法の詳細については、「PolarDB-Xのデータ同期ツール」をご参照ください。
データベースパスワード
データベースインスタンスへのアクセスに使用されるパスワード。
宛先データベース
既存のDMSデータベースインスタンスの選択
使用するデータベースインスタンス。 ビジネス要件に基づいて、既存のインスタンスを使用するかどうかを選択できます。
既存のインスタンスを選択すると、DTSはデータベースのパラメーターを自動的に入力します。
既存のインスタンスを選択しない場合は、次のデータベース情報を設定する必要があります。
データベースタイプ
ターゲットデータベースのタイプ。 Kafkaを選択します。
アクセス方法
ターゲットデータベースのアクセス方法。 Express Connect、VPN Gateway、またはSmart Access Gatewayを選択します。
説明DTSは、アクセス方法としてApache KafkaのMessage Queueを提供しません。 Apache KafkaのMessage Queueを自己管理Kafkaクラスターとして使用して、データ同期を設定できます。
インスタンスリージョン
Apache Kafkaインスタンスの宛先Message Queueが存在するリージョン。
接続済みVPC
Apache Kafkaインスタンスの宛先Message Queueが属する仮想プライベートクラウド (VPC) のID。 VPC IDを取得するには、次の操作を実行します。Message Queue for Apache Kafkaコンソールにログインし、Message Queue for Apache Kafkaインスタンスのインスタンス詳細ページに移動します。 [設定情報] セクションで、VPC IDを表示します。
ドメイン名または IP アドレス
Message Queue for Apache KafkaインスタンスのDefault Endpointパラメーターに含まれるIPアドレス。
説明IPアドレスを取得するには、Message Queue for Apache Kafkaコンソールにログインし、Message Queue for Apache Kafkaインスタンスの [インスタンスの詳細] ページに移動します。 [基本情報] ページで、[デフォルトのエンドポイント] パラメーターからIPアドレスを取得します。
ポート番号
Apache Kafkaインスタンスの宛先Message Queueのサービスポート番号。 デフォルト値: 9092
データベースアカウント
Apache Kafkaインスタンスの宛先Message Queueのデータベースアカウント。
説明Apache KafkaインスタンスのMessage QueueがVPC接続インスタンスの場合、データベースアカウントまたはデータベースパスワードパラメーターを設定する必要はありません。
データベースパスワード
データベースインスタンスへのアクセスに使用されるパスワード。
Kafkaバージョン
Apache Kafkaインスタンスの宛先Message Queueのバージョン。
暗号化
接続を暗号化するかどうかを指定します。 ビジネスとセキュリティの要件に基づいて、[非暗号化] または [SCRAM-SHA 256] を選択します。
トピック
同期データの受信に使用されるトピック。 ドロップダウンリストからトピックを選択します。
DDL情報を保存するトピック
DDL情報の格納に使用されるトピック。 ドロップダウンリストからトピックを選択します。 このパラメーターを指定しない場合、DDL情報はtopicパラメーターで指定されたトピックに格納されます。
Kafkaスキーマレジストリの使用
メタデータのサービングレイヤーを提供するKafka Schema Registryを使用するかどうかを指定します。 Avroスキーマを保存および取得するためのRESTful APIを提供します。 有効な値:
いいえ: Kafka Schema Registryを使用しません。
はい: Kafka Schema Registryを使用します。 この場合、AvroスキーマのKafka Schema Registryに登録されているURLまたはIPアドレスを入力する必要があります。
ページの下部で、接続性をテストして続行 をクリックします。
ソースまたはターゲットデータベースがAlibaba Cloudデータベースインスタンス (ApsaraDB RDS for MySQLインスタンスやApsaraDB for MongoDBインスタンスなど) の場合、DTSは自動的にDTSサーバーのCIDRブロックをインスタンスのホワイトリストに追加します。 ソースデータベースまたはターゲットデータベースがElastic Compute Service (ECS) インスタンスでホストされている自己管理データベースの場合、DTSサーバーのCIDRブロックがECSインスタンスのセキュリティグループルールに自動的に追加されます。ECSインスタンスがデータベースにアクセスできることを確認する必要があります。 データベースが複数のECSインスタンスにデプロイされている場合、DTSサーバーのCIDRブロックを各ECSインスタンスのセキュリティグループルールに手動で追加する必要があります。 ソースデータベースまたはターゲットデータベースが、データセンターにデプロイされているか、サードパーティのクラウドサービスプロバイダーによって提供される自己管理データベースである場合、DTSサーバーのCIDRブロックをデータベースのホワイトリストに手動で追加して、DTSがデータベースにアクセスできるようにする必要があります。 詳細については、「DTSサーバーのCIDRブロックの追加」トピックの「DTSサーバーのCIDRブロック」セクションをご参照ください。
警告DTSサーバーのCIDRブロックがデータベースまたはインスタンスのホワイトリスト、またはECSセキュリティグループルールに自動的または手動で追加されると、セキュリティリスクが発生する可能性があります。 したがって、DTSを使用してデータを同期する前に、潜在的なリスクを理解して認識し、次の対策を含む予防策を講じる必要があります。VPNゲートウェイ、またはSmart Access Gateway。
同期するオブジェクトと詳細設定を設定します。
パラメーター
説明
同期タイプ
同期タイプ。 デフォルトでは、増分データ同期が選択されています。 [スキーマ同期] および [完全データ同期] も選択する必要があります。 事前チェックが完了すると、DTSは選択したオブジェクトの履歴データをソースデータベースからターゲットクラスターに同期します。 履歴データは、その後の増分同期の基礎となる。
競合テーブルの処理モード
エラーの事前チェックと報告: ターゲットデータベースに、ソースデータベースのテーブルと同じ名前のテーブルが含まれているかどうかを確認します。 ソースデータベースとターゲットデータベースに同じテーブル名のテーブルが含まれていない場合は、事前チェックに合格します。 それ以外の場合、事前チェック中にエラーが返され、データ同期タスクを開始できません。
説明ソースデータベースとターゲットデータベースに同じ名前のテーブルが含まれていて、ターゲットデータベース内のテーブルを削除または名前変更できない場合は、オブジェクト名マッピング機能を使用して、ターゲットデータベースに同期されるテーブルの名前を変更できます。 詳細については、「マップオブジェクト名」をご参照ください。
エラーを無視して続行: ソースデータベースとターゲットデータベースの同じテーブル名の事前チェックをスキップします。
警告エラーを無視して続行 を選択すると、データの不整合が発生し、ビジネスが潜在的なリスクにさらされる可能性があります。
ソースデータベースとターゲットデータベースが同じスキーマを持ち、ターゲットデータベースのデータレコードがソースデータベースのデータレコードと同じ主キー値または一意キー値を持つ場合:
完全データ同期中、DTSはデータレコードをターゲットデータベースに同期しません。 ターゲットデータベースの既存のデータレコードが保持されます。
増分データ同期中、DTSはデータレコードをターゲットデータベースに同期します。 ターゲットデータベースの既存のデータレコードが上書きされます。
ソースデータベースとターゲットデータベースのスキーマが異なる場合、データの初期化に失敗する可能性があります。 この場合、一部の列のみが同期されるか、データ同期タスクが失敗します。 作業は慎重に行ってください。
Kafkaのデータ形式
Message Queue for Apache Kafkaインスタンスにデータが格納される形式。
DTS Avroを選択した場合、データはDTS Avroのスキーマ定義に基づいて解析されます。 詳細については、『GitHub』をご参照ください。
Canal Jsonを選択した場合、データはCanal JSON形式で保存されます。 関連するパラメーターと例の詳細については、「Kafkaクラスターのデータ形式」トピックの「Canal JSON」セクションをご参照ください。
Kafkaパーティションへの出荷データのポリシー
ビジネス要件に基づいて同期ポリシーを選択します。 詳細については、「データをKafkaパーティションに移行するためのポリシーの指定」をご参照ください。
重要ソースデータベースがPolarDB-X 1.0データベースである場合、データをKafkaパーティションに出荷する機能はサポートされません。
宛先インスタンスでのオブジェクト名の大文字化
ターゲットインスタンスのデータベース名、テーブル名、および列名の大文字化。 デフォルトでは、DTSデフォルトポリシーが選択されています。 他のオプションを選択して、オブジェクト名の大文字化をソースまたはターゲットデータベースの大文字化と一致させることができます。 詳細については、「ターゲットインスタンスのオブジェクト名の大文字化の指定」をご参照ください。
ソースオブジェクト
ソースオブジェクト セクションから1つ以上のオブジェクトを選択し、アイコンをクリックして 選択中のオブジェクト セクションにオブジェクトを追加します。
説明同期するオブジェクトとして、列、テーブル、またはデータベースを選択できます。 同期するオブジェクトとしてテーブルまたは列を選択した場合、DTSはビュー、トリガー、ストアドプロシージャなどの他のオブジェクトをターゲットデータベースに同期しません。
[選択済みオブジェクト]
同期先のインスタンスに同期するオブジェクトの名前を変更するには、選択中のオブジェクト セクションでオブジェクトを右クリックします。 詳細については、「オブジェクト名のマップ」トピックの「単一オブジェクトの名前のマップ」セクションをご参照ください。
一度に複数のオブジェクトの名前を変更するには、選択中のオブジェクト セクションの右上隅にある 一括編集 をクリックします。 詳細については、「オブジェクト名のマップ」トピックの「一度に複数のオブジェクト名をマップする」セクションをご参照ください。
説明特定のデータベースまたはテーブルで実行されるSQL操作を選択するには、次の手順を実行します。[選択されたオブジェクト] セクションで、オブジェクトを右クリックします。 表示されるダイアログボックスで、同期するSQL操作を選択します。 同期できるSQL操作の詳細については、「同期できるSQL操作」をご参照ください。
データをフィルタリングするWHERE条件を指定するには、[選択済みオブジェクト] セクションでオブジェクトを右クリックします。 表示されるダイアログボックスで、条件を指定します。 詳細については、「フィルター条件の指定」をご参照ください。
[次へ:詳細設定] をクリックして詳細設定を構成します。
パラメーター
説明
Set Alerts
データ同期タスクのアラートを設定するかどうかを指定します。 タスクが失敗するか、同期レイテンシが指定されたしきい値を超えると、アラート送信先は通知を受け取ります。 有効な値:
No: アラートを有効にしません。
Yes: アラートを設定します。 この場合、アラートしきい値と アラート通知設定 詳細については、「モニタリングとアラートの設定」トピックの「DTSタスクを作成するときのモニタリングとアラートの設定」をご参照ください。
失敗した接続のリトライ時間範囲の指定
失敗した接続のリトライ時間範囲。 データ同期タスクの開始後にソースデータベースまたはターゲットデータベースの接続に失敗した場合、DTSはその時間範囲内ですぐに接続を再試行します。 有効な値: 10 ~ 1440 単位は分です。 デフォルト値: 720 このパラメーターを30より大きい値に設定することを推奨します。 DTSが指定された時間範囲内にソースデータベースとターゲットデータベースに再接続すると、DTSはデータ同期タスクを再開します。 それ以外の場合、データ同期タスクは失敗します。
説明ソースまたはターゲットデータベースが同じである複数のデータ同期タスクに対して異なるリトライ時間範囲を指定した場合、最も短いリトライ時間範囲が優先されます。
DTSが接続を再試行すると、DTSインスタンスに対して課金されます。 業務要件に基づいて再試行時間範囲を指定することを推奨します。 ソースインスタンスとターゲットインスタンスがリリースされた後、できるだけ早くDTSインスタンスをリリースすることもできます。
ETLの設定
抽出、変換、および読み込み (ETL) 機能を有効にするかどうかを指定します。 詳細については、「」をご参照ください。ETLとは何ですか? 有効な値:
Yes: ETL機能を設定します。 コードエディターでデータ処理ステートメントを入力できます。 詳細については、「データ移行またはデータ同期タスクでのETLの設定」をご参照ください。
No: ETL機能を設定しません。
順方向および逆方向タスクのハートビートテーブル sql を削除
DTSインスタンスの実行中に、ハートビートテーブルのSQL操作をソースデータベースに書き込むかどうかを指定します。 有効な値:
Yes: ハートビートテーブルにSQL操作を書き込みません。 この場合、DTSインスタンスのレイテンシが表示され得る。
No: ハートビートテーブルにSQL操作を書き込みます。 この場合、ソースデータベースの物理バックアップやクローニングなどの機能が影響を受ける可能性があります。
タスク設定を保存し、事前チェックを実行します。
関連するAPI操作を呼び出してDTSタスクを設定するときに指定するパラメーターを表示するには、ポインターを 次:タスク設定の保存と事前チェック に移動し、OpenAPI パラメーターのプレビュー をクリックします。
パラメーターを表示または表示する必要がない場合は、ページ下部の 次:タスク設定の保存と事前チェック をクリックします。
説明データ同期タスクを開始する前に、DTSは事前チェックを実行します。 データ同期タスクは、タスクが事前チェックに合格した後にのみ開始できます。
データ同期タスクが事前チェックに失敗した場合は、失敗した各項目の横にある [詳細の表示] をクリックします。 チェック結果に基づいて原因を分析した後、問題のトラブルシューティングを行います。 次に、プレチェックを再実行します。
事前チェック中にアイテムに対してアラートがトリガーされた場合:
アラートアイテムを無視できない場合は、失敗したアイテムの横にある [詳細の表示] をクリックして、問題のトラブルシューティングを行います。 次に、もう一度プレチェックを実行します。
アラート項目を無視できる場合は、[アラート詳細の確認] をクリックします。 [詳細の表示] ダイアログボックスで、[無視] をクリックします。 表示されたメッセージボックスで、[OK] をクリックします。 次に、[再度事前チェック] をクリックして、事前チェックを再度実行します。 アラート項目を無視すると、データの不整合が発生し、ビジネスが潜在的なリスクにさらされる可能性があります。
成功率が100% になるまで待ちます。 次に、[次へ: インスタンスの購入] をクリックします。
購入ページで、データ同期インスタンスの課金方法とインスタンスクラスのパラメーターを設定します。 下表にパラメーターを示します。
セクション
パラメーター
説明
新しいインスタンスクラス
Billing Method
サブスクリプション: データ同期インスタンスの作成時にサブスクリプションの料金を支払います。 サブスクリプションの課金方法は、長期使用の場合、従量課金の課金方法よりも費用対効果が高くなります。
従量課金: 従量課金インスタンスは1時間ごとに課金されます。 従量課金方法は、短期使用に適しています。 従量課金データ同期インスタンスが不要になった場合は、インスタンスをリリースしてコストを削減できます。
リソースグループの設定
データ同期インスタンスが属するリソースグループ。 デフォルト値: Default resource group 詳細については、「」をご参照ください。リソース管理とは
インスタンスクラス
DTSは、同期速度が異なるインスタンスクラスを提供します。 ビジネス要件に基づいてインスタンスクラスを選択できます。 詳細については、「データ同期インスタンスのインスタンスクラス」をご参照ください。
サブスクリプション期間
サブスクリプションの課金方法を選択した場合は、サブスクリプション期間と作成するデータ同期インスタンスの数を指定します。 サブスクリプション期間は、1〜9か月、1年、2年、3年、または5年とすることができる。
説明このパラメーターは、サブスクリプション の課金方法を選択した場合にのみ使用できます。
データ伝送サービス (従量課金) サービス規約を読んで選択します。
[購入して開始] をクリックします。 表示されるダイアログボックスで、OK をクリックします。
タスクリストでタスクの進行状況を確認できます。