このトピックでは、data Transmission Service (DTS) を使用して、ApsaraDB RDS for MySQLインスタンスからApsaraMQ for Kafkaインスタンスにデータを同期する方法について説明します。
前提条件
ソースApsaraDB RDS for MySQLインスタンスとターゲットApsaraMQ for Kafkaインスタンスが作成されます。
説明ApsaraDB RDS For MySQLインスタンスの作成方法の詳細については、「ApsaraDB RDS for MySQLインスタンスの作成」をご参照ください。
サポートされているインスタンスバージョンの詳細については、「データ同期シナリオの概要」をご参照ください。
ターゲットApsaraMQ for Kafkaインスタンスで、同期データを受信するトピックが作成されます。 詳細については、「ステップ3: リソースの作成」トピックのステップ1: トピックの作成セクションをご参照ください。
移行先ApsaraMQ for Kafkaインスタンスの使用可能なストレージ容量が、移行元ApsaraDB RDS for MySQLインスタンスのデータの合計サイズよりも大きいこと。
使用上の注意
DTSは、ソースデータベースからターゲットデータベースに外部キーを同期しません。 したがって、ソースデータベースでのカスケード操作と削除操作は、ターゲットデータベースと同期されません。
カテゴリ | 説明 |
ソースインスタンスの制限 |
|
その他の制限 |
|
特別なケース |
|
課金
同期タイプ | タスク設定料金 |
スキーマ同期と完全データ同期 | 無料です。 |
増分データ同期 | 有料。 詳細については、「課金の概要」をご参照ください。 |
単一レコードのサイズ制限
Kafkaに書き込むことができる1つのレコードの最大サイズは10 MBです。 したがって、ソースデータの行のサイズが10 MBを超えると、DTSがKafkaにレコードを書き込むことができないため、関連するDTSタスクが中断されます。 このシナリオでは、大きなフィールドを含むテーブル全体を同期するのではなく、テーブルの一部のフィールドのみを同期することをお勧めします。 DTSタスクを設定するときは、これらの大きなフィールドのレコードを除外する必要があります。 大きなフィールドを含むテーブルがタスクのオブジェクトに含まれている場合は、テーブルを削除し、オブジェクトにテーブルを再度追加してから、大きなフィールドを除外するフィルタ条件を指定する必要があります。
サポートしている同期トポロジ
一方向の 1 対 1 の同期
一方向の 1 対多の同期
一方向の多対 1 の同期
DTSでサポートされている同期トポロジの詳細については、「同期トポロジ」をご参照ください。
同期可能なSQL操作
操作タイプ | SQL文 |
DML | 挿入、更新、および削除 |
DDL |
|
手順
[データ同期タスク] ページに移動します。
DMSコンソールにログインします。
上部のナビゲーションバーで、データ开発ポインタを上に移動します
を選択します。
説明実際の動作は、DMSのモードおよびレイアウトに応じて変化し得る。 詳細については、「シンプルモード」および「DMSコンソールのレイアウトとスタイルのカスタマイズ」をご参照ください。
に行くこともできます
右側にデータ同期タスク、データ同期インスタンスが存在するリージョンを選択します。
説明新しいDTSコンソールを使用する場合は、上部のナビゲーションバーでデータ同期タスクを作成するリージョンを選択します。
[タスクの作成] をクリックします。 [データ同期タスクの作成] ウィザードで、ソースデータベースとターゲットデータベースを設定します。 次の表にパラメーターを示します。
警告ソースデータベースとターゲットデータベースを設定した後、ページに表示される制限を読むことを推奨します。 そうしないと、タスクが失敗したり、データの不一致が発生します。
セクション
パラメーター
説明
非該当
タスク名
DTSタスクの名前。 タスク名は自動生成されます。 タスクを簡単に識別できるように、わかりやすい名前を指定することをお勧めします。 一意のタスク名を指定する必要はありません。
移行元データベース
既存の DMS データベースインスタンスを選択します。(任意です。DMS データベースインスタンスが未登録の場合は、このオプションを無視して、以下のセクションでデータベース設定を行ってください。)
使用するデータベースインスタンス。 ビジネス要件に基づいて、既存のインスタンスを使用するかどうかを選択できます。
既存のインスタンスを選択すると、DTSはデータベースのパラメーターを自動的に入力します。
既存のインスタンスを選択しない場合は、次のデータベース情報を設定する必要があります。
データベースタイプ
移行元ディスクのタイプを設定します。 MySQL を選択します。
アクセス方法
ソースデータベースのアクセス方法。 Alibaba Cloud インスタンス を選択します。
インスタンスのリージョン
ソースApsaraDB RDS for MySQLインスタンスが存在するリージョンです。
Alibaba Cloud アカウント間でデータを複製
Alibaba Cloudアカウント間でデータを同期するかどうかを指定します。 この例では、× が選択されています。
RDS インスタンス ID
ソースApsaraDB RDS for MySQLインスタンスのID。
データベースアカウント
ソースApsaraDB RDS for MySQLインスタンスのデータベースアカウント。 アカウントには、同期するオブジェクトに対する読み取り権限が必要です。
データベースのパスワード
データベースインスタンスへのアクセスに使用されるパスワード。
暗号化
データベースへの接続を暗号化するかどうかを指定します。 ビジネス要件に基づいて、[非暗号化] または [SSL暗号化] を選択できます。 このパラメーターをSSL暗号化に設定する場合、DTSタスクを設定する前に、ApsaraDB RDS for MySQLインスタンスのSSL暗号化を有効にする必要があります。 詳細については、「クラウド証明書を使用したSSL暗号化の有効化」をご参照ください。
移行先データベース
既存の DMS データベースインスタンスを選択します。(任意です。DMS データベースインスタンスが未登録の場合は、このオプションを無視して、以下のセクションでデータベース設定を行ってください。)
使用するデータベースインスタンス。 ビジネス要件に基づいて、既存のインスタンスを使用するかどうかを選択できます。
既存のインスタンスを選択すると、DTSはデータベースのパラメーターを自動的に入力します。
既存のインスタンスを選択しない場合は、次のデータベース情報を設定する必要があります。
データベースタイプ
ターゲットデータベースのタイプ。 Kafkaを選択します。
アクセス方法
ターゲットデータベースのアクセス方法。 Alibaba Cloud インスタンス を選択します。
インスタンスのリージョン
ターゲットApsaraMQ for Kafkaインスタンスが存在するリージョン。
Kafka インスタンス ID
移行先のApsaraMQ for KafkaインスタンスのID。
暗号化
Kafkaクラスターへの接続を暗号化するかどうかを指定します。 ビジネスとセキュリティの要件に基づいて、非暗号化 または SCRAM-SHA-256 を選択します。
トピック
同期されたデータの受信に使用されるトピック。 ドロップダウンリストからトピックを選択します。
DDL 情報を格納するトピック
DDL情報を格納するために使用されるトピック。 ドロップダウンリストからトピックを選択します。 このパラメーターを設定しない場合、DDL情報はトピックパラメーターで指定されたトピックに格納されます。
Kafka スキーマレジストリの使用の使用
Kafka Schema Registryを使用するかどうかを指定します。 Kafka Schema Registryは、メタデータの提供レイヤーを提供します。 Avroスキーマを保存および取得するためのRESTful APIを提供します。 有効な値:
×
○ このパラメーターで [はい] を選択した場合、AvroスキーマのKafka Schema Registryに登録されているURLまたはIPアドレスを入力する必要があります。
ページの下部で、接続をテストして続行 をクリックします。
説明DTSサーバーのCIDRブロックをソースデータベースとターゲットデータベースのセキュリティ設定に自動または手動で追加して、DTSサーバーからのアクセスを許可できるようにします。 詳細については、「DTSサーバーのCIDRブロックの追加」をご参照ください。
ソースデータベースまたはターゲットデータベースが自己管理データベースで、アクセス方法がAlibaba Cloud インスタンスに設定されていない場合、DTS サーバーの CIDR ブロック ダイアログボックスで 接続テスト をクリックします。
同期するオブジェクトと詳細設定を構成します。 次の表にパラメーターを示します。
パラメーター
説明
同期タイプ
同期タイプ。 デフォルトでは、増分データ同期が選択されています。 [スキーマ同期] および [完全データ同期] も選択する必要があります。 事前チェックが完了すると、DTSは選択したオブジェクトの履歴データをソースデータベースからターゲットクラスターに同期します。 履歴データは、その後の増分同期の基礎となる。
説明ターゲットデータベースがApsaraMQ for Kafkaインスタンスの場合、スキーマ同期は選択できません。
競合するテーブルの処理モード
エラーの事前チェックと報告: ターゲットデータベースに、ソースデータベースのテーブルと同じ名前のテーブルが含まれているかどうかを確認します。 ソースデータベースとターゲットデータベースに同じテーブル名のテーブルが含まれていない場合は、事前チェックに合格します。 それ以外の場合、事前チェック中にエラーが返され、データ同期タスクを開始できません。
説明ソースデータベースとターゲットデータベースに同じ名前のテーブルが含まれていて、ターゲットデータベース内のテーブルを削除または名前変更できない場合は、オブジェクト名マッピング機能を使用して、ターゲットデータベースに同期されるテーブルの名前を変更できます。 詳細については、「マップオブジェクト名」をご参照ください。
エラーを無視して続行: ソースデータベースとターゲットデータベースの同じテーブル名の事前チェックをスキップします。
警告エラーを無視して続行 を選択すると、データの不整合が発生し、ビジネスが潜在的なリスクにさらされる可能性があります。
ソースデータベースとターゲットデータベースが同じスキーマを持ち、ターゲットデータベースのデータレコードがソースデータベースのデータレコードと同じ主キー値または一意キー値を持つ場合:
完全データ同期中、DTSはデータレコードをターゲットデータベースに同期しません。 ターゲットデータベースの既存のデータレコードが保持されます。
増分データ同期中、DTSはデータレコードをターゲットデータベースに同期します。 ターゲットデータベースの既存のデータレコードが上書きされます。
ソースデータベースとターゲットデータベースのスキーマが異なる場合、データの初期化に失敗する可能性があります。 この場合、一部の列のみが同期されるか、データ同期タスクが失敗します。 作業は慎重に行ってください。
Kafka のデータ形式
ApsaraMQ for Kafkaインスタンスにデータが格納される形式。
DTS Avroを選択した場合、データはDTS Avroのスキーマ定義に基づいて解析されます。 詳細については、『GitHub』をご参照ください。
Canal Jsonを選択した場合、データはCanal JSON形式で保存されます。 関連するパラメーターと例の詳細については、「Kafkaクラスターのデータ形式」トピックのCanal Jsonセクションをご参照ください。
Kafka 圧縮形式
Kafka圧縮データの圧縮形式。 ビジネス要件に基づいて圧縮形式を選択します。 有効な値:
LZ4 (デフォルト): 低圧縮比と高圧縮速度。
GZIP: 高い圧縮比と低い圧縮速度。
説明GZIP圧縮は大量のCPUリソースを消費します。
スナッピー: 中程度の圧縮比と中程度の圧縮速度。
Kafka パーティションへのデータ転送ポリシー
データをKafkaパーティションに同期するためのポリシー。 ビジネス要件に基づいてポリシーを選択します。 詳細については、「データをKafkaパーティションに移行するためのポリシーの指定」をご参照ください。
移行先インスタンスでのオブジェクト名の大文字化
ターゲットインスタンスのデータベース名、テーブル名、および列名の大文字化。 デフォルトでは、DTSデフォルトポリシーが選択されています。 他のオプションを選択して、オブジェクト名の大文字化をソースまたはターゲットデータベースの大文字化と一致させることができます。 詳細については、「ターゲットインスタンスのオブジェクト名の大文字化の指定」をご参照ください。
ソースオブジェクト
ソースオブジェクト セクションから1つ以上のオブジェクトを選択し、アイコンをクリックして 選択中のオブジェクト セクションにオブジェクトを追加します。
説明同期するオブジェクトとしてテーブルを選択できます。
選択中のオブジェクト
この例では、このパラメーターを設定する必要はありません。 オブジェクト名マッピング機能を使用して、ソーステーブルのデータが同期されるトピックの名前、トピック内のパーティション数、パーティションキーなど、ターゲットApsaraMQ for Kafkaインスタンスに関する情報を指定できます。 詳細については、このトピックの「オブジェクト名マッピング機能の使用」をご参照ください。
説明特定のデータベースまたはテーブルで実行されるSQL操作を選択するには、選択中のオブジェクト セクションでオブジェクトを右クリックします。 表示されるダイアログボックスで、同期するSQL操作を選択します。 同期できるSQL文の詳細については、このトピックの「同期できるSQL操作」をご参照ください。
オブジェクト名マッピング機能を使用してオブジェクトの名前を変更すると、そのオブジェクトに依存する他のオブジェクトの同期が失敗する可能性があります。
[次へ:詳細設定] をクリックして詳細設定を構成します。
パラメーター
説明
タスクのスケジュールに使用する専用クラスターの選択
デフォルトでは、DTSはタスクを共有クラスターにスケジュールします。 このパラメーターを設定する必要はありません。 データ同期タスクの安定性を向上させたい場合は、専用クラスターを購入してください。 詳細については、「DTS専用クラスターの概要」をご参照ください。
アラートの設定
データ同期タスクのアラートを設定するかどうかを指定します。 タスクが失敗するか、同期レイテンシが指定されたしきい値を超えると、アラート送信先は通知を受け取ります。 有効な値:
No: アラートを有効にしません。
Yes: アラートを設定します。 この場合、アラートしきい値と
アラート通知設定 詳細については、「モニタリングとアラートの設定」トピックの「DTSタスクを作成するときのモニタリングとアラートの設定」をご参照ください。
失敗した接続の再試行時間
失敗した接続のリトライ時間範囲。 データ同期タスクの開始後にソースデータベースまたはターゲットデータベースの接続に失敗した場合、DTSはその時間範囲内ですぐに接続を再試行します。 有効な値: 10 ~ 1440 単位は分です。 デフォルト値: 720 このパラメーターを30より大きい値に設定することを推奨します。 DTSが指定された時間範囲内にソースデータベースとターゲットデータベースに再接続すると、DTSはデータ同期タスクを再開します。 それ以外の場合、データ同期タスクは失敗します。
説明ソースまたはターゲットデータベースが同じである複数のデータ同期タスクに対して異なるリトライ時間範囲を指定した場合、最も短いリトライ時間範囲が優先されます。
DTSが接続を再試行すると、DTSインスタンスに対して課金されます。 業務要件に基づいて再試行時間範囲を指定することを推奨します。 ソースインスタンスとターゲットインスタンスがリリースされた後、できるだけ早くDTSインスタンスをリリースすることもできます。
移行元データベースと移行先データベースで他の問題が発生した場合の、再試行までの待機時間です。
その他の問題の再試行時間範囲。 たとえば、データ同期タスクの開始後にDDLまたはDML操作の実行に失敗した場合、DTSはその時間範囲内ですぐに操作を再試行します。 有効な値: 1 ~ 1440 単位は分です。 デフォルト値は 10 です。 このパラメーターを10より大きい値に設定することを推奨します。 指定された時間範囲内で失敗した操作が正常に実行されると、DTSはデータ同期タスクを再開します。 それ以外の場合、データ同期タスクは失敗します。
重要移行元データベースと移行先データベースで他の問題が発生した場合の、再試行までの待機時間です。 パラメーターの値は、失敗した接続の再試行時間 パラメーターの値よりも小さくする必要があります。
完全移行率を制限するかどうか
完全データ同期中、DTSはソースデータベースとターゲットデータベースの読み取りおよび書き込みリソースを使用します。 これにより、データベースサーバーの負荷が増加する可能性があります。 1 秒あたりのソースデータベースのクエリ率 QPS 、1 秒あたりの完全移行の行数 RPS、および1 秒あたりの完全移行データ量 (MB) BPS パラメーターを設定して、ターゲットデータベースサーバーの負荷を軽減できます。
説明このパラメーターは、同期タイプ パラメーターで 完全データ同期 が選択されている場合にのみ表示されます。
増分同期率を制限するかどうか
増分データ同期のスロットリングを有効にするかどうかを指定します。 ビジネス要件に基づいて、増分データ同期のスロットリングを有効にできます。 スロットリングを設定するには、1 秒あたりの増分同期の行数 RPSおよび1 秒あたりの増分同期データ量 (MB) BPS パラメーターを設定する必要があります。 これにより、移行先データベースサーバーの負荷が軽減されます。
環境タグ
DTSインスタンスを識別するために使用される環境タグ。 ビジネス要件に基づいて環境タグを選択できます。 この例では、このパラメーターを設定する必要はありません。
ETL の設定
抽出、変換、および読み込み (ETL) 機能を有効にするかどうかを指定します。 詳細については、「」をご参照ください。ETLとは何ですか? 有効な値:
Yes: ETL機能を設定します。 コードエディターでデータ処理ステートメントを入力できます。 詳細については、「データ移行またはデータ同期タスクでのETLの設定」をご参照ください。
No: ETL機能を設定しません。
順方向および逆方向タスクのハートビートテーブル sql を削除
DTSインスタンスの実行中に、ハートビートテーブルのSQL操作をソースデータベースに書き込むかどうかを指定します。 有効な値:
Yes: ハートビートテーブルにSQL操作を書き込みません。 この場合、DTSインスタンスのレイテンシが表示され得る。
No: ハートビートテーブルにSQL操作を書き込みます。 この場合、ソースデータベースの物理バックアップやクローニングなどの機能が影響を受ける可能性があります。
タスク設定を保存し、事前チェックを実行します。
関連するAPI操作を呼び出してDTSタスクを設定するときに指定するパラメーターを表示するには、ポインターを 次:タスク設定の保存と事前チェック に移動し、OpenAPI パラメーターのプレビュー をクリックします。
パラメーターを表示または表示する必要がない場合は、ページ下部の 次:タスク設定の保存と事前チェック をクリックします。
説明データ同期タスクを開始する前に、DTSは事前チェックを実行します。 データ同期タスクは、タスクが事前チェックに合格した後にのみ開始できます。
データ同期タスクが事前チェックに失敗した場合は、失敗した各項目の横にある [詳細の表示] をクリックします。 チェック結果に基づいて原因を分析した後、問題のトラブルシューティングを行います。 次に、プレチェックを再実行します。
事前チェック中にアイテムに対してアラートがトリガーされた場合:
アラートアイテムを無視できない場合は、失敗したアイテムの横にある [詳細の表示] をクリックして、問題のトラブルシューティングを行います。 次に、もう一度プレチェックを実行します。
アラート項目を無視できる場合は、[アラート詳細の確認] をクリックします。 [詳細の表示] ダイアログボックスで、[無視] をクリックします。 表示されたメッセージボックスで、[OK] をクリックします。 次に、[再度事前チェック] をクリックして、事前チェックを再度実行します。 アラート項目を無視すると、データの不整合が発生し、ビジネスが潜在的なリスクにさらされる可能性があります。
成功率が100% になるまで待ちます。 次に、[次へ: インスタンスの購入] をクリックします。
購入ページで、データ同期インスタンスの課金方法とインスタンスクラスのパラメーターを設定します。 下表にパラメーターを示します。
セクション
パラメーター
説明
新しいインスタンスクラス
Billing Method
サブスクリプション: データ同期インスタンスの作成時にサブスクリプションの料金を支払います。 サブスクリプションの課金方法は、長期使用の場合、従量課金の課金方法よりも費用対効果が高くなります。
従量課金: 従量課金インスタンスは1時間ごとに課金されます。 従量課金方法は、短期使用に適しています。 従量課金データ同期インスタンスが不要になった場合は、インスタンスをリリースしてコストを削減できます。
リソースグループの設定
データ同期インスタンスが属するリソースグループ。 デフォルト値: Default resource group 詳細については、「」をご参照ください。リソース管理とは
インスタンスクラス
DTSは、同期速度が異なるインスタンスクラスを提供します。 ビジネス要件に基づいてインスタンスクラスを選択できます。 詳細については、「データ同期インスタンスのインスタンスクラス」をご参照ください。
サブスクリプション期間
サブスクリプションの課金方法を選択した場合は、サブスクリプション期間と作成するデータ同期インスタンスの数を指定します。 サブスクリプション期間は、1〜9か月、1年、2年、3年、または5年とすることができる。
説明このパラメーターは、サブスクリプション の課金方法を選択した場合にのみ使用できます。
読み取りと選択データ伝送サービス (従量課金) サービス規約.
[購入して開始] をクリックします。 表示されるダイアログボックスで、OK をクリックします。
タスクリストでタスクの進行状況を確認できます。
オブジェクト名マッピング機能の使用
選択中のオブジェクト セクションで、ポインターをトピック名の上に移動します。
編集 を右クリックします。
テーブルの編集 ダイアログボックスで、次の表に示すパラメーターを設定します。
パラメーター
説明
テーブル名
ソーステーブルのデータが同期されるトピックの名前。 既定では、このパラメーターは、ソースデータベースとターゲットデータベースの設定 ステップの 移行先データベース セクションの トピック の値に設定されます。
重要ターゲットデータベースがApsaraMQ for Kafkaインスタンスの場合、トピックはターゲットApsaraMQ for Kafkaインスタンスに存在する必要があります。 そうでない場合、データ同期は失敗します。
ソーステーブルのテーブル名パラメーターの値をターゲットデータベースのトピックの名前に変更すると、テーブル内のデータが指定されたトピックに書き込まれます。
フィルタリング条件
詳細については、「フィルター条件の指定」をご参照ください。
パーティション数
データが同期されるトピック内のパーティションの数。
パーティションキー
Kafka パーティションへのデータ転送ポリシー パラメーターを 主キーのハッシュ値に基づいて、データを個別のパーティションに転送 に設定した場合、このパラメーターを設定して、ハッシュ値を計算するためのパーティションキーとして1つ以上の列を指定します。 DTSは、計算されたハッシュ値に基づいて、宛先トピックの各パーティションに異なる行を配信します。
説明列のパーティションキーを選択するには、まず すべてのテーブルを同期 をオフにする必要があります。
[OK] をクリックします。