Alibaba Cloud データ転送サービス (DTS) コンソールでデータ同期タスクを作成して、ApsaraDB RDS for MySQL インスタンスから Function Compute の関数に増分データをリアルタイムまたは定期的に同期できます。その後、Function Compute でコードを記述して、同期された増分データの処理(データの分析や保存など)を実行できます。
前提条件
ソース ApsaraDB RDS for MySQL インスタンスが作成されていること。詳細については、「ApsaraDB RDS for MySQL インスタンスを作成する」をご参照ください。
宛先サービスと関数が作成されており、関数の [ハンドラータイプ] パラメーターが [イベントハンドラー] に設定されていること。関数の作成方法の詳細については、「関数をすばやく作成する」をご参照ください。
使用上の注意
カテゴリ | 説明 |
ソースデータベースの制限 |
|
その他の制限 |
|
特別な場合 |
|
課金
同期タイプ | タスク構成料金 |
完全同期 | 無料。 |
増分同期 | 有料。詳細については、「課金の概要」をご参照ください。 |
データ同期をサポートする SQL 操作
操作タイプ | SQL 文 |
DML | INSERT、UPDATE、および DELETE |
DDL |
|
データベースアカウントに必要な権限
データベース | 必要な権限 | 参照 |
ソース ApsaraDB RDS for MySQL インスタンス | 同期対象のオブジェクトに対する読み取り権限。 | 「アカウントを作成する」および「アカウント権限を変更する」。 |
使用するソースデータベースアカウントが ApsaraDB RDS コンソールで作成および承認されていない場合は、アカウントに REPLICATION CLIENT、REPLICATION SLAVE、SHOW VIEW、および SELECT の権限があることを確認してください。
手順
次のいずれかの方法を使用して [データ同期] ページに移動し、データ同期インスタンスが存在するリージョンを選択します。
DTS コンソール
DTS コンソール にログインします。
左側のナビゲーションウィンドウで、データ同期 をクリックします。
ページの左上隅で、データ同期タスクが存在するリージョンを選択します。
DMS コンソール
説明実際の操作は、DMS コンソールのモードとレイアウトによって異なる場合があります。詳細については、「シンプルモード」および「DMS コンソールのレイアウトとスタイルをカスタマイズする」をご参照ください。
DMS コンソール にログインします。
上部のナビゲーションバーで、[データ + AI] にポインターを移動し、 を選択します。
データ同期タスク の右側にあるドロップダウンリストから、データ同期インスタンスが存在するリージョンを選択します。
タスクの作成 をクリックして、タスク構成ページに移動します。
ソースデータベースと宛先データベースを構成します。次の表にパラメーターを示します。
セクション
パラメーター
説明
該当なし
タスク名
DTS タスクの名前。DTS はタスク名を自動的に生成します。タスクを簡単に識別できるわかりやすい名前を指定することをお勧めします。一意のタスク名を指定する必要はありません。
移行元データベース
[既存の接続を選択]
DTS に登録されているデータベースインスタンスを使用する場合は、ドロップダウンリストからインスタンスを選択します。DTS は、インスタンスの次のデータベースパラメーターを自動的に入力します。詳細については、「データベース接続を管理する」をご参照ください。
説明DMS コンソールでは、[DMS データベースインスタンスを選択] ドロップダウンリストからデータベースインスタンスを選択できます。
インスタンスを DTS に登録できなかった場合、または DTS に登録されているインスタンスを使用する必要がない場合は、次のデータベース情報を構成する必要があります。
データベースタイプ
ソースデータベースのタイプ。 MySQL を選択します。
アクセス方法
ソースデータベースのアクセス方法。 Alibaba Cloud インスタンス を選択します。
インスタンスのリージョン
ソース ApsaraDB RDS for MySQL インスタンスが存在するリージョン。
Alibaba Cloud アカウント間でデータを複製
Alibaba Cloud アカウント間でデータを同期するかどうかを指定します。この例では、× が選択されています。
RDS インスタンス ID
ソース ApsaraDB RDS for MySQL インスタンスの ID。
データベースアカウント
ソース ApsaraDB RDS for MySQL インスタンスのデータベースアカウント。アカウントに必要な権限については、このトピックの「データベースアカウントに必要な権限」セクションをご参照ください。
データベースのパスワード
データベースへのアクセスに使用するパスワード。
暗号化
データベースへの接続を暗号化するかどうかを指定します。ビジネス要件に基づいて、[非暗号化] または [SSL 暗号化] を選択できます。このパラメーターを [SSL 暗号化] に設定する場合は、DTS タスクを構成する前に、ApsaraDB RDS for MySQL インスタンスで SSL 暗号化を有効にする必要があります。詳細については、「クラウド証明書を使用して SSL 暗号化を有効にする」をご参照ください。
移行先データベース
[既存の接続を選択]
DTS に登録されているデータベースインスタンスを使用する場合は、ドロップダウンリストからインスタンスを選択します。DTS は、インスタンスの次のデータベースパラメーターを自動的に入力します。詳細については、「データベース接続を管理する」をご参照ください。
説明DMS コンソールでは、[DMS データベースインスタンスを選択] ドロップダウンリストからデータベースインスタンスを選択できます。
インスタンスを DTS に登録できなかった場合、または DTS に登録されているインスタンスを使用する必要がない場合は、次のデータベース情報を構成する必要があります。
データベースタイプ
宛先データベースのタイプ。 Function Compute (FC) を選択します。
アクセス方法
宛先データベースのアクセス方法。 Alibaba Cloud インスタンス を選択します。
インスタンスのリージョン
宛先データベースが存在するリージョン。デフォルトでは、ソースデータベースの インスタンスのリージョン パラメーターと同じ値で、変更できません。
サービス
宛先関数が属するサービスの名前。
関数
同期されたデータを受信する宛先関数。
サービスのバージョンとエイリアス
サービスのバージョンまたはエイリアス。ビジネス要件に基づいてこのパラメーターを構成します。
デフォルトバージョン を選択した場合、サービスバージョン パラメーターの値は [LATEST] に固定されます。
バージョンの指定 を選択した場合、サービスバージョン パラメーターを構成する必要があります。
エイリアスの指定 を選択した場合、サービスのエイリアス パラメーターを構成する必要があります。
説明Function Compute の用語の詳細については、「用語」をご参照ください。
ページの下部にある [接続をテストして続行] をクリックします。
ソースデータベースまたは宛先データベースが ApsaraDB RDS for MySQL インスタンスや ApsaraDB for MongoDB インスタンスなどの Alibaba Cloud データベースインスタンスの場合、DTS は DTS サーバーの CIDR ブロックをインスタンスのホワイトリストに自動的に追加します。ソースデータベースまたは宛先データベースが Elastic Compute Service (ECS) インスタンスでホストされている自己管理データベースの場合、DTS は DTS サーバーの CIDR ブロックを ECS インスタンスのセキュリティグループルールに自動的に追加します。ECS インスタンスがデータベースにアクセスできることを確認する必要があります。データベースが複数の ECS インスタンスにデプロイされている場合は、DTS サーバーの CIDR ブロックを各 ECS インスタンスのセキュリティグループルールに手動で追加する必要があります。ソースデータベースまたは宛先データベースがデータセンターにデプロイされているか、サードパーティのクラウド サービス プロバイダーによって提供されている自己管理データベースの場合は、DTS サーバーの CIDR ブロックをデータベースのホワイトリストに手動で追加して、DTS がデータベースにアクセスできるようにする必要があります。詳細については、「DTS サーバーの CIDR ブロックを追加する」トピックの「DTS サーバーの CIDR ブロック」セクションをご参照ください。
警告DTS サーバーの CIDR ブロックがデータベースまたはインスタンスのホワイトリスト、または ECS セキュリティグループルールに自動または手動で追加されると、セキュリティリスクが発生する可能性があります。したがって、DTS を使用してデータを同期する前に、潜在的なリスクを理解して認識し、予防措置を講じる必要があります。これには、ユーザー名とパスワードのセキュリティ強化、公開ポートの制限、API 呼び出しの認証、ホワイトリストまたは ECS セキュリティグループルールの定期的な確認と不正な CIDR ブロックの禁止、Express Connect、VPN Gateway、または Smart Access Gateway を使用したデータベースと DTS の接続などが含まれますが、これらに限定されません。
同期するオブジェクトを構成します。
オブジェクト設定 ステップで、同期するオブジェクトを構成します。
パラメーター
説明
同期タイプ
デフォルトでは、増分データ同期 が選択されています。既存のデータを同期する場合は、完全データ同期 も選択する必要があります。
データ形式
宛先関数に同期されるデータの保存形式。 [Canal Json] 形式のみがサポートされています。
説明Canal JSON 形式のパラメーターの詳細については、「Kafka クラスタのデータ形式」トピックの「Canal Json」セクションをご参照ください。
ソースオブジェクト
ソースオブジェクト セクションから 1 つ以上のオブジェクトを選択し、
アイコンをクリックして、選択中のオブジェクト セクションにオブジェクトを追加します。説明同期対象のオブジェクトとしてデータベースまたはテーブルを選択します。
選択中のオブジェクト
選択中のオブジェクト セクションで、同期するデータを確認します。
説明選択したオブジェクトを削除するには、選択中のオブジェクト セクションで削除するオブジェクトを選択し、
アイコンをクリックします。次へ:詳細設定 をクリックして、詳細設定を構成します。
パラメーター
説明
タスクのスケジュールに使用する専用クラスターの選択
デフォルトでは、専用クラスターを指定しない場合、DTS は共有クラスターにタスクをスケジュールします。データ同期インスタンスの安定性を向上させるには、専用クラスターを購入します。詳細については、「DTS 専用クラスターとは」をご参照ください。
失敗した接続の再試行時間
接続失敗時のリトライ時間の範囲。データ同期タスクの開始後にソースデータベースまたは宛先データベースに接続できない場合、DTS は指定された時間範囲内で直ちに接続を再試行します。有効値: 10 ~ 1440。単位: 分。デフォルト値: 720。このパラメーターは 30 より大きい値に設定することをお勧めします。DTS が指定された時間範囲内にソースデータベースと宛先データベースに再接続すると、DTS はデータ同期タスクを再開します。そうでない場合、データ同期タスクは失敗します。
説明ソースデータベースまたは宛先データベースが同じ複数のデータ同期タスクに異なるリトライ時間の範囲を指定した場合、最短のリトライ時間の範囲が優先されます。
DTS が接続を再試行すると、DTS インスタンスの料金が発生します。ビジネス要件に基づいてリトライ時間の範囲を指定することをお勧めします。ソースインスタンスと宛先インスタンスが解放された後、できるだけ早く DTS インスタンスを解放することもできます。
移行元データベースと移行先データベースで他の問題が発生した場合の、再試行までの待機時間です。
その他の問題のリトライ時間の範囲。たとえば、データ同期タスクの開始後に DDL または DML 操作を実行できなかった場合、DTS は指定された時間範囲内で直ちに操作を再試行します。有効値: 1 ~ 1440。単位: 分。デフォルト値: 10。このパラメーターは 10 より大きい値に設定することをお勧めします。失敗した操作が指定された時間範囲内で正常に実行されると、DTS はデータ同期タスクを再開します。そうでない場合、データ同期タスクは失敗します。
重要移行元データベースと移行先データベースで他の問題が発生した場合の、再試行までの待機時間です。 パラメーターの値は、失敗した接続の再試行時間 パラメーターの値よりも小さくなければなりません。
完全同期レートを制限するかどうか
完全同期中、DTS はソースデータベースと宛先データベースの読み取りおよび書き込みリソースを使用します。これにより、データベースサーバーの負荷が増加する可能性があります。完全同期タスクの 1 秒あたりのソースデータベースのクエリ率 QPS、1 秒あたりの完全移行の行数 RPS、および 1 秒あたりの完全移行データ量 (MB) BPS パラメーターを構成して、宛先データベースサーバーの負荷を軽減できます。
説明完全データ同期 が 同期タイプ パラメーターに選択されている場合にのみ、このパラメーターを構成できます。
増分同期率を制限するかどうか
増分同期のスロットリングを有効にするかどうかを指定します。ビジネス要件に基づいて、増分同期のスロットリングを有効にできます。スロットリングを構成するには、1 秒あたりの増分同期の行数 RPS および 1 秒あたりの増分同期データ量 (MB) BPS パラメーターを構成する必要があります。これにより、宛先データベースサーバーの負荷が軽減されます。
[順方向タスクと逆方向タスクのハートビートテーブルの SQL 操作を削除するかどうか]
DTS インスタンスの実行中に、ハートビートテーブルの SQL 操作をソースデータベースに書き込むかどうかを指定します。有効値:
[はい]: ハートビートテーブルの SQL 操作を書き込みません。この場合、DTS インスタンスのレイテンシが表示される場合があります。
[いいえ]: ハートビートテーブルの SQL 操作を書き込みます。この場合、ソースデータベースの物理バックアップやクローニングなどの機能が影響を受ける可能性があります。
環境タグ
DTS インスタンスを識別するために使用される環境タグ。ビジネス要件に基づいて環境タグを選択できます。この例では、環境タグは選択されていません。
ETL の設定
ETL (抽出・変換・書き出し) 機能を有効にするかどうかを指定します。詳細については、「ETL とは」をご参照ください。有効値:
[はい]: ETL 機能を構成します。コードエディターにデータ処理ステートメントを入力できます。詳細については、「データ移行またはデータ同期タスクで ETL を構成する」をご参照ください。
[いいえ]: ETL 機能を構成しません。
[監視とアラート]
データ同期インスタンスのアラートを構成するかどうかを指定します。タスクが失敗した場合、または同期レイテンシが指定されたしきい値を超えた場合、アラート連絡先に通知が送信されます。有効値:
[いいえ]: アラートを有効にしません。
[はい]: アラートを構成します。この場合、アラートしきい値と アラート通知設定 も構成する必要があります。詳細については、「監視とアラート」トピックの「DTS タスクを作成するときに監視とアラートを構成する」セクションをご参照ください。
タスク設定を保存し、事前チェックを実行します。
関連する API 操作を呼び出して DTS タスクを構成するときに指定するパラメーターを表示するには、次:タスク設定の保存と事前チェック にポインターを移動し、OpenAPI パラメーターのプレビュー をクリックします。
パラメーターを表示する必要がない場合、またはすでに表示している場合は、ページの下部にある 次:タスク設定の保存と事前チェック をクリックします。
説明データ同期タスクを開始する前に、DTS は事前チェックを実行します。タスクが事前チェックに合格した後にのみ、データ同期タスクを開始できます。
データ同期タスクが事前チェックに失敗した場合は、失敗した各項目の横にある [詳細の表示] をクリックします。チェック結果に基づいて原因を分析した後、問題をトラブルシューティングします。次に、事前チェックを再実行します。
事前チェック中に項目のアラートがトリガーされた場合:
アラート項目を無視できない場合は、失敗した項目の横にある [詳細の表示] をクリックして、問題をトラブルシューティングします。次に、事前チェックを再度実行します。
アラート項目を無視できる場合は、[アラート詳細の確認] をクリックします。 [詳細の表示] ダイアログボックスで、[無視] をクリックします。表示されるメッセージで、[OK] をクリックします。次に、[再チェック] をクリックして、事前チェックを再度実行します。アラート項目を無視すると、データの不整合が発生し、ビジネスが潜在的なリスクにさらされる可能性があります。
インスタンスを購入します。
[成功率] が [100%] になるまで待ちます。次に、[次へ: インスタンスの購入] をクリックします。
[購入] ページで、データ同期タスクの [課金方法] および [インスタンスクラス] パラメーターを構成します。次の表にパラメーターを示します。
セクション
パラメーター
説明
新しいインスタンスクラス
課金方法
サブスクリプション: データ同期インスタンスを作成するときにサブスクリプションの料金を支払います。サブスクリプション課金方法は、長期使用の場合、従量課金方法よりも費用対効果が高くなります。
従量課金: 従量課金インスタンスは 1 時間単位で課金されます。従量課金方法は、短期使用に適しています。従量課金データ同期インスタンスが不要になった場合は、インスタンスを解放してコストを削減できます。
リソースグループ設定
データ同期インスタンスが属するリソースグループ。デフォルト値: [デフォルトのリソースグループ]。詳細については、「Resource Management とは」をご参照ください。
インスタンスクラス
DTS は、同期速度が異なるインスタンスクラスを提供します。ビジネス要件に基づいてインスタンスクラスを選択できます。詳細については、「データ同期インスタンスのインスタンスクラス」をご参照ください。
サブスクリプション期間
サブスクリプション課金方法を選択した場合は、サブスクリプション期間と作成するデータ同期インスタンスの数を指定します。サブスクリプション期間は、1 ~ 9 か月、1 年、2 年、3 年、または 5 年にすることができます。
説明このパラメーターは、サブスクリプション 課金方法を選択した場合にのみ使用できます。
[データ転送サービス (従量課金) サービス利用規約] を読んで選択します。
[購入して開始] をクリックします。表示されるダイアログボックスで、OK をクリックします。
タスクリストでタスクの進捗状況を確認できます。
宛先関数が受信するデータの形式
宛先関数が受信するデータは、Object タイプです。ソースデータベースの増分データは、配列形式の Records フィールドに保存されます。配列の各要素は、Object タイプのデータレコードを示します。次の表に、Object タイプのデータレコードのフィールドを示します。
宛先関数は、次の 2 種類の SQL 操作を記録するデータを受信します。
DDL データ: データスキーマの変更に関する操作。
DML データ: データ管理に関する操作。
フィールド | タイプ | 説明 |
| ブール値 | 操作が DDL 操作かどうかを示します。有効値:
|
| 文字列 | SQL 操作のタイプ。
重要 完全同期中は、フィールドの値は INIT に固定されます。 |
| 文字列 | ソース MySQL データベースの名前。 |
| 文字列 | ソース MySQL データベース内のテーブルの名前。 |
| 文字列 | テーブルに含まれるプライマリキーの名前。 |
| Long | ソースデータベースで操作が実行された時刻。値は 13 桁の UNIX タイムスタンプです。単位: ミリ秒。 説明 検索エンジンを使用して、UNIX タイムスタンプコンバーターを取得できます。 |
| Long | 宛先データベースで操作の実行が開始された時刻。値は 13 桁の UNIX タイムスタンプです。単位: ミリ秒。 説明 検索エンジンを使用して、UNIX タイムスタンプコンバーターを取得できます。 |
| Object 配列 | Object タイプの要素が 1 つだけ含まれる配列。要素のキーは列名、要素の値は列に含まれる値です。 |
old | Object 配列 | 元のデータが保存されている配列。フィールドの形式は、data フィールドの形式と同じです。 重要 このフィールドは、 |
sql | 文字列 |
|
| Int | 操作のシリアル番号。 |
DDL 操作と宛先関数が受信するデータの例
テーブルを変更する
SQL 文
ALTER TABLE `demoTable`
ADD COLUMN `address` varchar(20) NULL AFTER `sex`
;宛先関数が受信するデータ
{
'Records': [{
'type': 'DDL',
'serverId': '27142679',
'es': 1690000000000,
'sql': '/* Query from DMS-WEBSQL-0-Eid_15682857722282385K by user 14xxxxxxxx */ ALTER TABLE `demoDatabase`.`DDL` \n\tADD COLUMN `address` varchar(20) NULL AFTER `sex`',
'database': 'demoDatabase',
'id': 63151,
'isDdl': True,
'table': 'demoTable',
'ts': 1690000000000
}]
}DML 操作と宛先関数が受信するデータの例
データを挿入する
SQL 文
INSERT INTO demoTable VALUES("xiaoming", 10, "man");宛先関数が受信するデータ
{
'Records': [{
'data': [{
'sex': 'man',
'name': 'xiaoming',
'age': '10'
}],
'pkNames': ['name'],
'type': 'INSERT',
'serverId': '27142678',
'es': 1690000000000,
'sql': '',
'database': 'demoDatabase',
'sqlType': {
'sex': 253,
'name': 253,
'age': 3
},
'mysqlType': {
'sex': 'varchar',
'name': 'varchar',
'age': 'int'
},
'id': 62051,
'isDdl': False,
'table': 'demoTable',
'ts': 1690000000000
}]
}データを更新する
SQL 文
UPDATE `demoDatabase`.`demoTable` SET `age`=11 WHERE `name`='xiaoming';宛先関数が受信するデータ
{
'Records': [{
'data': [{
'sex': 'man',
'name': 'xiaoming',
'age': '11'
}],
'pkNames': ['name'],
'old': [{
'sex': 'man',
'name': 'xiaoming',
'age': '10'
}],
'type': 'UPDATE',
'serverId': '27142679',
'es': 1690000000000,
'sql': '',
'database': 'demoDatabase',
'sqlType': {
'sex': 253,
'name': 253,
'age': 3
},
'mysqlType': {
'sex': 'varchar',
'name': 'varchar',
'age': 'int'
},
'id': 62373,
'isDdl': False,
'table': 'demoTable',
'ts': 1690000000000
}]
}データを削除する
SQL 文
DELETE FROM `demoDatabase`.`demoTable` WHERE `name`='xiaoming';宛先関数が受信するデータ
{
'Records': [{
'data': [{
'sex': 'man',
'name': 'xiaoming',
'age': '11'
}],
'pkNames': ['name'],
'type': 'DELETE',
'serverId': '27142679',
'es': 1690000000000,
'sql': '',
'database': 'demoDatabase',
'sqlType': {
'sex': 253,
'name': 253,
'age': 3
},
'mysqlType': {
'sex': 'varchar',
'name': 'varchar',
'age': 'int'
},
'id': 62635,
'isDdl': False,
'table': 'demoTable',
'ts': 1690000000000
}]
}関連情報
データ同期シナリオの詳細については、「データ同期シナリオの概要」をご参照ください。
同期する単一のデータが 16 MB より大きい場合、DTS タスクでエラーが報告されます。同期対象のオブジェクトを変更するか、ETL 機能を使用して大きなフィールドのデータをフィルタリングできます。詳細については、「同期対象のオブジェクトを変更する」をご参照ください。
ビジネス要件に基づいて関数コードを記述します。詳細については、「概要」をご参照ください。