全部產品
Search
文件中心

Tablestore:將Table Store資料表中資料同步到另一個資料表

更新時間:Nov 21, 2024

使用通道服務、DataWorks或者DataX將Table Store資料表中的資料同步到另一個資料表。

前提條件

已建立目標資料表,目標資料表的列必須與來源資料表中待遷移的列一一對應。具體操作,請參見建立資料表

說明

如果要實現跨帳號、跨地區資料移轉,請使用DataX工具通過互連網或者通過雲企業網連通VPC進行操作。關於使用雲企業網的具體操作,請參見雲企業網快速入門

使用通道服務遷移同步

建立來源資料表的通道後,使用SDK進行遷移同步。遷移過程中可以自訂業務處理邏輯對資料進行處理。

前提條件

  • 已確定要使用的Endpoint。具體操作,請參見初始化OTSClient

  • 已配置密鑰。具體操作,請參見初始化OTSClient

  • 已將密鑰配置到環境變數中。具體操作,請參見初始化OTSClient

    Table Store使用OTS_AK_ENV環境變數名表示阿里雲帳號或者RAM使用者的AccessKey ID,使用OTS_SK_ENV環境變數名表示對應AccessKey Secret,請根據實際配置。

操作步驟

  1. 使用Table Store控制台或SDK建立來源資料表的通道並記錄通道ID,具體操作請分別參見快速入門通過SDK使用通道服務

  2. 使用SDK遷移資料。

    範例程式碼如下:

    public class TunnelTest {
    
        public static void main(String[] args){
           String accessKeyId = System.getenv("OTS_AK_ENV");
           String accessKeySecret = System.getenv("OTS_SK_ENV");
           TunnelClient tunnelClient = new TunnelClient("endpoint",
                   accessKeyId,accessKeySecret,"instanceName");
    
            TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
    
            //tunnelId可以在Table Store控制台通道管理頁面查看或者調用describeTunnelRequest查詢。
            TunnelWorker worker = new TunnelWorker("tunnelId", tunnelClient, config);
            try {
                worker.connectAndWorking();
            } catch (Exception e) {
                e.printStackTrace();
                worker.shutdown();
                tunnelClient.shutdown();
            }
        }
    
        public static class SimpleProcessor implements IChannelProcessor{
        
           //目標tablestore連線物件。
           TunnelClient tunnelClient = new TunnelClient("endpoint",
                   "accessKeyId","accessKeySecret","instanceName");
                   
           @Override
            public void process(ProcessRecordsInput processRecordsInput) {
            
                //ProcessRecordsInput中返回了增量或全量資料。
                List<StreamRecord> list = processRecordsInput.getRecords();
                for(StreamRecord streamRecord : list){
                    switch (streamRecord.getRecordType()){
                        case PUT:
                            //自訂業務處理邏輯。
                            //putRow
                            break;
                        case UPDATE:
                            //updateRow
                            break;
                        case DELETE:
                            //deleteRow
                            break;
                    }
    
                    System.out.println(streamRecord.toString());
                }
            }
    
            @Override
            public void shutdown() {
                
            }
        }
    }

使用DataWorks或者DataX遷移同步

通過DataWorks或者DataX實現Table Store資料的遷移同步,此處以DataWorks為例介紹遷移操作。

步驟一:新增Table Store資料來源

分別以來源資料表和目標資料表所在執行個體新增Table Store資料來源。

  1. 進入Data Integration頁面。

    登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的資料開發與治理 > Data Integration,在下拉框中選擇對應工作空間後單擊進入Data Integration

  2. 在左側導覽列,單擊資料來源

  3. 資料來源列表頁面,單擊新增資料來源

  4. 新增資料來源對話方塊,找到Tablestore區塊,單擊Tablestore

  5. 新增OTS資料來源對話方塊,根據下表配置資料來源參數。

    參數

    說明

    資料來源名稱

    資料來源名稱必須以字母、數字、底線(_)組合,且不能以數字和底線(_)開頭。

    資料來源描述

    對資料來源進行簡單描述,不得超過80個字元。

    Endpoint

    Tablestore執行個體的服務地址。更多資訊,請參見服務地址

    如果Tablestore執行個體和目標資料來源的資源在同一個地區,填寫VPC地址;如果Tablestore執行個體和目標資料來源的資源不在同一個地區,填寫公網地址。

    Table Store執行個體名稱

    Tablestore執行個體的名稱。更多資訊,請參見執行個體

    AccessKey ID

    阿里雲帳號或者RAM使用者的AccessKey ID和AccessKey Secret。擷取方式請參見建立AccessKey

    AccessKey Secret

  6. 測試資源群組連通性。

    建立資料來源時,您需要測試資源群組的連通性,以保證同步任務使用的資源群組能夠與資料來源連通,否則將無法正常執行資料同步任務。

    重要

    資料同步時,一個任務只能使用一種資源群組。資源群組列表預設顯示僅Data Integration公用資源群組。為確保資料同步的穩定性和效能要求,推薦使用獨享Data Integration資源群組。

    1. 單擊前往購買進行全新建立並綁定資源群組到工作空間或單擊綁定已購資源群組為工作空間綁定已有資源群組。具體操作,請參見新增和使用獨享Data Integration資源群組

    2. 待資源群組啟動成功後,單擊相應資源群組連通狀態(生產環境)列的測試連通性

      當連通狀態顯示為可連通時,表示連通成功。

  7. 測試連通性通過後,單擊完成

    在資料來源列表中,可以查看建立的資料來源。

步驟二:建立同步任務節點

  1. 進入資料開發頁面。

    登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的資料開發與治理 > 資料開發,在下拉框中選擇對應工作空間後單擊進入資料開發

  2. 在DataStudio控制台的資料開發頁面,單擊商務程序節點下的目標商務程序。

    如果需要建立商務程序,請參見建立商務程序

  3. Data Integration節點上右鍵選擇建立節點 > 離線同步

  4. 建立節點對話方塊,選擇路徑並填寫節點名稱。

  5. 單擊確認

    Data Integration節點下會顯示建立的離線同步節點。

步驟三:配置離線同步任務並啟動

  1. Data Integration節點下,雙擊開啟建立的離線同步任務節點。

  2. 配置同步網路連結。

    選擇離線同步任務的資料來源、資料去向以及用於執行同步任務的資源群組,並測試連通性。

    重要

    資料同步任務的執行必須經過資源群組來實現,請選擇資源群組並保證資源群組與讀寫兩端的資料來源能聯通訪問。

    1. 網路與資源配置步驟,選擇資料來源Tablestore,並選擇資料來源名稱為新增的來源資料源。

    2. 選擇資源群組。

      選擇資源群組後,系統會顯示資源群組的地區、規格等資訊以及自動化的測試資源群組與所選資料來源之間連通性。

      重要

      請與新增資料來源時選擇的資源群組保持一致。

    3. 選擇資料去向Tablestore,並選擇資料來源名稱為新增的目標資料來源。

      系統會自動化的測試資源群組與所選資料來源之間連通性。

    4. 測試可連通後,單擊下一步

    5. 提示對話方塊,單擊確認使用指令碼模式

      重要
      • Table Store僅支援指令碼模式。當存在不支援嚮導模式的資料來源時,如果繼續編輯任務,將強制使用指令碼模式進行編輯。

      • 任務轉為指令碼模式後,將無法轉為嚮導模式。

  3. 配置任務並儲存。

    全量資料的同步需要使用到Tablestore Reader與Tablestore Writer外掛程式。指令碼配置規則請參見Tablestore資料來源

    1. 配置任務步驟,編輯指令碼。

      • 配置Tablestore Reader

        Tablestore Reader外掛程式實現了從Tablestore讀取資料,通過您指定的抽取資料範圍,可以方便地實現資料增量抽取的需求。具體操作,請參見附錄一:Reader指令碼Demo與參數說明

      • 配置Tablestore Writer

        Tablestore Writer通過Tablestore官方Java SDK串連到Tablestore服務端,並通過SDK寫入Tablestore服務端 。Tablestore Writer本身對於寫入過程進行了諸多最佳化,包括寫入逾時重試、異常寫入重試、批量提交等功能。具體操作,請參見附錄二:Writer指令碼Demo與參數說明

    2. 按【Ctrl+S】儲存指令碼。

      說明

      執行後續操作時,如果未儲存指令碼,則系統會出現儲存確認的提示,單擊確認即可。

  4. 執行同步任務。

    說明

    全量資料一般只需要同步一次,無需配置調度屬性。

    1. 單擊1680170333627-a1e19a43-4e2a-4340-9564-f53f2fa6806e表徵圖。

    2. 參數對話方塊,選擇運行資源群組的名稱。

    3. 單擊運行

      運行結束後,在同步任務的作業記錄頁簽,單擊Detail log url對應的連結後。在任務的詳細作業記錄頁面,查看Current task status對應的狀態。

      Current task status的值為FINISH時,表示任務運行完成。