このトピックでは、Tunnel Service、DataWorks、または DataX を使用して、Tablestore 内の 1 つのテーブルから別のテーブルにデータを同期する方法について説明します。
前提条件
宛先テーブルが作成されています。宛先テーブルには、ソーステーブルから同期する列が含まれている必要があります。詳細については、「Tablestore コンソールでワイドカラムモデルを使用する」トピックの手順 3: データテーブルを作成するセクションを参照してください。
アカウントとリージョンをまたいでデータを移行する場合は、DataX を使用してインターネットまたはクラウドエンタープライズネットワーク (CEN) 経由で仮想プライベートクラウド (VPC) に接続します。CEN の使用方法については、概要を参照してください。
Tunnel Service を使用してデータを同期する
ソーステーブルのトンネルが作成された後、Tablestore SDK を使用して、ソーステーブルから宛先テーブルにデータを同期できます。同期中にビジネスに合わせてデータを処理するためのカスタムロジックを指定できます。
前提条件
使用するエンドポイントが取得されています。詳細については、OTSClient インスタンスを初期化するを参照してください。
AccessKey ペアが構成されています。詳細については、OTSClient インスタンスを初期化するを参照してください。
AccessKey ペアが環境変数に構成されています。詳細については、OTSClient インスタンスを初期化するを参照してください。
OTS_AK_ENV 環境変数は、Alibaba Cloud アカウントまたは Resource Access Management (RAM) ユーザーの AccessKey ID を示します。OTS_SK_ENV 環境変数は、Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey Secret を示します。ビジネス要件に基づいて AccessKey ペアを指定します。
手順
Tablestore コンソールで、または Tablestore SDK を使用して、ソーステーブルのトンネルを作成し、トンネル ID を記録します。詳細については、クイックスタートまたはTablestore SDK を使用して Tunnel Service を使用するを参照してください。
Tablestore SDK を使用してデータを同期します。
サンプルコード:
public class TunnelTest { public static void main(String[] args){ String accessKeyId = System.getenv("OTS_AK_ENV"); String accessKeySecret = System.getenv("OTS_SK_ENV"); TunnelClient tunnelClient = new TunnelClient("endpoint", accessKeyId,accessKeySecret,"instanceName"); TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor()); // トンネルIDは、Tablestoreコンソールの「トンネル」タブで表示するか、describeTunnelRequest操作を呼び出してクエリできます。 TunnelWorker worker = new TunnelWorker("tunnelId", tunnelClient, config); try { worker.connectAndWorking(); } catch (Exception e) { e.printStackTrace(); worker.shutdown(); tunnelClient.shutdown(); } } public static class SimpleProcessor implements IChannelProcessor{ // トンネルを宛先テーブルに接続します。 TunnelClient tunnelClient = new TunnelClient("endpoint", "accessKeyId","accessKeySecret","instanceName"); @Override public void process(ProcessRecordsInput processRecordsInput) { // 増分データまたはフルデータがProcessRecordsInputで返されます。 List<StreamRecord> list = processRecordsInput.getRecords(); for(StreamRecord streamRecord : list){ switch (streamRecord.getRecordType()){ case PUT: // ビジネスに合わせてデータを処理するために使用するカスタムロジックを指定します。 //putRow break; case UPDATE: //updateRow break; case DELETE: //deleteRow break; } System.out.println(streamRecord.toString()); } } @Override public void shutdown() { } } }
DataWorks または DataX を使用してデータを同期する
DataWorks または DataX を使用して、ソーステーブルから宛先テーブルにデータを同期できます。このセクションでは、DataWorks を使用してデータを同期する方法について説明します。
手順 1: Tablestore データソースを追加する
ソーステーブルと宛先テーブルの Tablestore インスタンスをデータソースとして追加します。
データ統合ページに移動します。
DataWorks コンソールにログオンし、左上隅でリージョンを選択し、 を選択し、ドロップダウンリストからワークスペースを選択し、データ統合に移動をクリックします。
左側のナビゲーションペインで、データソースをクリックします。
データソースページで、データソースを追加をクリックします。
データソースを追加ダイアログボックスで、Tablestoreブロックをクリックします。
OTS データソースを追加ダイアログボックスで、次の表に示すパラメータを構成します。
パラメータ
説明
データソース名
データソースの名前。名前には文字、数字、アンダースコア (_) を使用でき、文字で始める必要があります。
データソースの説明
データソースの説明。説明は 80 文字以下にする必要があります。
エンドポイント
Tablestore インスタンスのエンドポイント。詳細については、エンドポイントを参照してください。
Tablestore インスタンスと宛先データソースのリソースが同じリージョンにある場合は、仮想プライベートクラウド (VPC) エンドポイントを入力します。それ以外の場合は、パブリックエンドポイントを入力します。
Table Store インスタンス名
Tablestore インスタンスの名前。詳細については、インスタンスを参照してください。
AccessKey ID
Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID と AccessKey Secret。AccessKey ペアの作成方法については、AccessKey ペアを作成するを参照してください。
AccessKey Secret
データソースと選択したリソースグループ間のネットワーク接続をテストします。
同期ノードが想定どおりに実行されるようにするには、データソースと、同期ノードが実行されるすべてのリソースグループタイプ間の接続をテストする必要があります。
重要同期タスクは、1 つのタイプのリソースグループのみを使用できます。デフォルトでは、データ統合の共有リソースグループのみがリソースグループリストに表示されます。データ同期の安定性とパフォーマンスを確保するために、データ統合専用の排他的リソースグループを使用することをお勧めします。
購入をクリックして新しいリソースグループを作成するか、購入したリソースグループを関連付けるをクリックして既存のリソースグループを関連付けます。詳細については、データ統合専用の排他的リソースグループを作成して使用するを参照してください。
リソースグループが起動した後、リソースグループの接続ステータス (本番環境)列のネットワーク接続をテストをクリックします。
接続済みと表示された場合、接続テストは合格です。
データソースがネットワーク接続テストに合格した場合は、完了をクリックします。
新しく作成されたデータソースがデータソースリストに表示されます。
手順 2: 同期ノードを作成する
DataStudioコンソールに移動します。
DataWorks コンソールにログオンし、左上隅でリージョンを選択し、 を選択し、ドロップダウンリストからワークスペースを選択し、Datastudio に移動をクリックします。
DataStudio コンソールのスケジュールされたワークフローページで、ビジネスフローをクリックしてビジネスフローを選択します。
ワークフローの作成方法については、ワークフローを作成するを参照してください。
データ統合ノードを右クリックし、ノードを作成 > オフライン同期を選択します。
ノードを作成ダイアログボックスで、パスを選択し、ノード名を入力します。
確認をクリックします。
新しく作成されたオフライン同期ノードがデータ統合ノードの下に表示されます。
手順 3: オフライン同期タスクを構成して実行する
データ統合の下にある新しい同期ノードをダブルクリックします。
リソースグループとデータソース間のネットワーク接続を確立します。
データ同期タスクのソースデータソースと宛先データソース、およびデータ同期タスクの実行に使用するリソースグループを選択します。リソースグループとデータソース間のネットワーク接続を確立し、接続をテストします。
重要データ同期タスクは、リソースグループを使用して実行されます。リソースグループを選択し、リソースグループとデータソース間のネットワーク接続が確立されていることを確認します。
ネットワーク接続とリソースグループを構成する手順で、ソースドロップダウンリストからTablestoreを選択し、データソース名パラメータを作成したソースデータソースに設定します。
リソースグループドロップダウンリストからリソースグループを選択します。
リソースグループを選択すると、システムはリソースグループのリージョンと仕様を表示します。システムは、リソースグループとソースデータソース間の接続を自動的にテストします。
重要リソースグループが、データソースの作成時に選択したものと同じであることを確認します。
宛先ドロップダウンリストからTablestoreを選択し、データソース名パラメータを新しい宛先データソースに設定します。
システムは、リソースグループと宛先データソース間の接続を自動的にテストします。
次へをクリックします。
表示されるメッセージで、スクリプトモードを使用をクリックします。
重要Tablestore はスクリプトモードのみをサポートしています。ウィザードモードを使用してデータソースを構成できない場合は、スクリプトモードを使用してバッチ同期タスクを構成します。
タスクをスクリプトモードに切り替えると、ウィザードモードに戻すことはできません。
タスクを構成して保存します。
フルデータを同期するには、Tablestore Reader と Tablestore Writer を使用する必要があります。スクリプトの構成方法については、Tablestore データソースを参照してください。
タスクを構成する手順でスクリプトを変更します。
Tablestore Reader を構成する
Tablestore Reader は Tablestore からデータを読み取ります。データ範囲を指定して、Tablestore から増分データを抽出できます。詳細については、付録: Tablestore Reader のコードとパラメータを参照してください。
Tablestore Writer を構成する
Tablestore SDK for Java を使用して、Tablestore Writer は Tablestore サーバーに接続し、データを Tablestore サーバーに書き込みます。Tablestore Writer は、書き込みタイムアウト時の再試行、書き込み例外時の再試行、バッチ送信など、書き込みプロセスを最適化するための機能をユーザーに提供します。詳細については、付録: Tablestore Writer のコードとパラメータを参照してください。
Ctrl + S を押してスクリプトを保存します。
説明スクリプトを保存しないと、後続の操作を実行するときにメッセージが表示されます。この場合は、メッセージのOKをクリックして保存します。
同期タスクを実行します。
説明ほとんどの場合、フルデータの同期は一度だけ行う必要があり、スケジューリングプロパティを構成する必要はありません。
アイコンをクリックします。
パラメータダイアログボックスで、ドロップダウンリストからリソースグループの名前を選択します。
実行をクリックします。
スクリプトが実行された後、ランタイムログタブの Detail log url の横にあるリンクをクリックします。詳細なランタイムログページで、
現在のタスクステータス
の値を確認します。現在のタスクステータス
の値がFINISHの場合、タスクは完了です。