すべてのプロダクト
Search
ドキュメントセンター

Tablestore:Tablestore のテーブル間でデータを同期する

最終更新日:Dec 28, 2024

このトピックでは、Tunnel Service、DataWorks、または DataX を使用して、Tablestore 内の 1 つのテーブルから別のテーブルにデータを同期する方法について説明します。

前提条件

宛先テーブルが作成されています。宛先テーブルには、ソーステーブルから同期する列が含まれている必要があります。詳細については、「Tablestore コンソールでワイドカラムモデルを使用する」トピックの手順 3: データテーブルを作成するセクションを参照してください。

説明

アカウントとリージョンをまたいでデータを移行する場合は、DataX を使用してインターネットまたはクラウドエンタープライズネットワーク (CEN) 経由で仮想プライベートクラウド (VPC) に接続します。CEN の使用方法については、概要を参照してください。

Tunnel Service を使用してデータを同期する

ソーステーブルのトンネルが作成された後、Tablestore SDK を使用して、ソーステーブルから宛先テーブルにデータを同期できます。同期中にビジネスに合わせてデータを処理するためのカスタムロジックを指定できます。

前提条件

  • 使用するエンドポイントが取得されています。詳細については、OTSClient インスタンスを初期化するを参照してください。

  • AccessKey ペアが構成されています。詳細については、OTSClient インスタンスを初期化するを参照してください。

  • AccessKey ペアが環境変数に構成されています。詳細については、OTSClient インスタンスを初期化するを参照してください。

    OTS_AK_ENV 環境変数は、Alibaba Cloud アカウントまたは Resource Access Management (RAM) ユーザーの AccessKey ID を示します。OTS_SK_ENV 環境変数は、Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey Secret を示します。ビジネス要件に基づいて AccessKey ペアを指定します。

手順

  1. Tablestore コンソールで、または Tablestore SDK を使用して、ソーステーブルのトンネルを作成し、トンネル ID を記録します。詳細については、クイックスタートまたはTablestore SDK を使用して Tunnel Service を使用するを参照してください。

  2. Tablestore SDK を使用してデータを同期します。

    サンプルコード:

    public class TunnelTest {
    
        public static void main(String[] args){
           String accessKeyId = System.getenv("OTS_AK_ENV");
           String accessKeySecret = System.getenv("OTS_SK_ENV");
           TunnelClient tunnelClient = new TunnelClient("endpoint",
                   accessKeyId,accessKeySecret,"instanceName");
    
            TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
    
            // トンネルIDは、Tablestoreコンソールの「トンネル」タブで表示するか、describeTunnelRequest操作を呼び出してクエリできます。
            TunnelWorker worker = new TunnelWorker("tunnelId", tunnelClient, config);
            try {
                worker.connectAndWorking();
            } catch (Exception e) {
                e.printStackTrace();
                worker.shutdown();
                tunnelClient.shutdown();
            }
        }
    
        public static class SimpleProcessor implements IChannelProcessor{
        
           // トンネルを宛先テーブルに接続します。
           TunnelClient tunnelClient = new TunnelClient("endpoint",
                   "accessKeyId","accessKeySecret","instanceName");
                   
           @Override
            public void process(ProcessRecordsInput processRecordsInput) {
            
                // 増分データまたはフルデータがProcessRecordsInputで返されます。
                List<StreamRecord> list = processRecordsInput.getRecords();
                for(StreamRecord streamRecord : list){
                    switch (streamRecord.getRecordType()){
                        case PUT:
                            // ビジネスに合わせてデータを処理するために使用するカスタムロジックを指定します。
                            //putRow
                            break;
                        case UPDATE:
                            //updateRow
                            break;
                        case DELETE:
                            //deleteRow
                            break;
                    }
    
                    System.out.println(streamRecord.toString());
                }
            }
    
            @Override
            public void shutdown() {
                
            }
        }
    }

DataWorks または DataX を使用してデータを同期する

DataWorks または DataX を使用して、ソーステーブルから宛先テーブルにデータを同期できます。このセクションでは、DataWorks を使用してデータを同期する方法について説明します。

手順 1: Tablestore データソースを追加する

ソーステーブルと宛先テーブルの Tablestore インスタンスをデータソースとして追加します。

  1. データ統合ページに移動します。

    DataWorks コンソールにログオンし、左上隅でリージョンを選択し、データ開発とガバナンス > データ統合を選択し、ドロップダウンリストからワークスペースを選択し、データ統合に移動をクリックします。

  2. 左側のナビゲーションペインで、データソースをクリックします。

  3. データソースページで、データソースを追加をクリックします。

  4. データソースを追加ダイアログボックスで、Tablestoreブロックをクリックします。

  5. OTS データソースを追加ダイアログボックスで、次の表に示すパラメータを構成します。

    パラメータ

    説明

    データソース名

    データソースの名前。名前には文字、数字、アンダースコア (_) を使用でき、文字で始める必要があります。

    データソースの説明

    データソースの説明。説明は 80 文字以下にする必要があります。

    エンドポイント

    Tablestore インスタンスのエンドポイント。詳細については、エンドポイントを参照してください。

    Tablestore インスタンスと宛先データソースのリソースが同じリージョンにある場合は、仮想プライベートクラウド (VPC) エンドポイントを入力します。それ以外の場合は、パブリックエンドポイントを入力します。

    Table Store インスタンス名

    Tablestore インスタンスの名前。詳細については、インスタンスを参照してください。

    AccessKey ID

    Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID と AccessKey Secret。AccessKey ペアの作成方法については、AccessKey ペアを作成するを参照してください。

    AccessKey Secret

  6. データソースと選択したリソースグループ間のネットワーク接続をテストします。

    同期ノードが想定どおりに実行されるようにするには、データソースと、同期ノードが実行されるすべてのリソースグループタイプ間の接続をテストする必要があります。

    重要

    同期タスクは、1 つのタイプのリソースグループのみを使用できます。デフォルトでは、データ統合の共有リソースグループのみがリソースグループリストに表示されます。データ同期の安定性とパフォーマンスを確保するために、データ統合専用の排他的リソースグループを使用することをお勧めします。

    1. 購入をクリックして新しいリソースグループを作成するか、購入したリソースグループを関連付けるをクリックして既存のリソースグループを関連付けます。詳細については、データ統合専用の排他的リソースグループを作成して使用するを参照してください。

    2. リソースグループが起動した後、リソースグループの接続ステータス (本番環境)列のネットワーク接続をテストをクリックします。

      接続済みと表示された場合、接続テストは合格です。

  7. データソースがネットワーク接続テストに合格した場合は、完了をクリックします。

    新しく作成されたデータソースがデータソースリストに表示されます。

手順 2: 同期ノードを作成する

  1. DataStudioコンソールに移動します。

    DataWorks コンソールにログオンし、左上隅でリージョンを選択し、データ開発とガバナンス > データ開発を選択し、ドロップダウンリストからワークスペースを選択し、Datastudio に移動をクリックします。

  2. DataStudio コンソールのスケジュールされたワークフローページで、ビジネスフローをクリックしてビジネスフローを選択します。

    ワークフローの作成方法については、ワークフローを作成するを参照してください。

  3. データ統合ノードを右クリックし、ノードを作成 > オフライン同期を選択します。

  4. ノードを作成ダイアログボックスで、パスを選択し、ノード名を入力します。

  5. 確認をクリックします。

    新しく作成されたオフライン同期ノードがデータ統合ノードの下に表示されます。

手順 3: オフライン同期タスクを構成して実行する

  1. データ統合の下にある新しい同期ノードをダブルクリックします。

  2. リソースグループとデータソース間のネットワーク接続を確立します。

    データ同期タスクのソースデータソースと宛先データソース、およびデータ同期タスクの実行に使用するリソースグループを選択します。リソースグループとデータソース間のネットワーク接続を確立し、接続をテストします。

    重要

    データ同期タスクは、リソースグループを使用して実行されます。リソースグループを選択し、リソースグループとデータソース間のネットワーク接続が確立されていることを確認します。

    1. ネットワーク接続とリソースグループを構成する手順で、ソースドロップダウンリストからTablestoreを選択し、データソース名パラメータを作成したソースデータソースに設定します。

    2. リソースグループドロップダウンリストからリソースグループを選択します。

      リソースグループを選択すると、システムはリソースグループのリージョンと仕様を表示します。システムは、リソースグループとソースデータソース間の接続を自動的にテストします。

      重要

      リソースグループが、データソースの作成時に選択したものと同じであることを確認します。

    3. 宛先ドロップダウンリストからTablestoreを選択し、データソース名パラメータを新しい宛先データソースに設定します。

      システムは、リソースグループと宛先データソース間の接続を自動的にテストします。

    4. 次へをクリックします。

    5. 表示されるメッセージで、スクリプトモードを使用をクリックします。

      重要
      • Tablestore はスクリプトモードのみをサポートしています。ウィザードモードを使用してデータソースを構成できない場合は、スクリプトモードを使用してバッチ同期タスクを構成します。

      • タスクをスクリプトモードに切り替えると、ウィザードモードに戻すことはできません。

  3. タスクを構成して保存します。

    フルデータを同期するには、Tablestore Reader と Tablestore Writer を使用する必要があります。スクリプトの構成方法については、Tablestore データソースを参照してください。

    1. タスクを構成する手順でスクリプトを変更します。

      • Tablestore Reader を構成する

        Tablestore Reader は Tablestore からデータを読み取ります。データ範囲を指定して、Tablestore から増分データを抽出できます。詳細については、付録: Tablestore Reader のコードとパラメータを参照してください。

      • Tablestore Writer を構成する

        Tablestore SDK for Java を使用して、Tablestore Writer は Tablestore サーバーに接続し、データを Tablestore サーバーに書き込みます。Tablestore Writer は、書き込みタイムアウト時の再試行、書き込み例外時の再試行、バッチ送信など、書き込みプロセスを最適化するための機能をユーザーに提供します。詳細については、付録: Tablestore Writer のコードとパラメータを参照してください。

    2. Ctrl + S を押してスクリプトを保存します。

      説明

      スクリプトを保存しないと、後続の操作を実行するときにメッセージが表示されます。この場合は、メッセージのOKをクリックして保存します。

  4. 同期タスクを実行します。

    説明

    ほとんどの場合、フルデータの同期は一度だけ行う必要があり、スケジューリングプロパティを構成する必要はありません。

    1. 1680170333627-a1e19a43-4e2a-4340-9564-f53f2fa6806eアイコンをクリックします。

    2. パラメータダイアログボックスで、ドロップダウンリストからリソースグループの名前を選択します。

    3. 実行をクリックします。

      スクリプトが実行された後、ランタイムログタブの Detail log url の横にあるリンクをクリックします。詳細なランタイムログページで、現在のタスクステータスの値を確認します。

      現在のタスクステータスの値がFINISHの場合、タスクは完了です。