本文由簡體中文內容自動轉碼而成。阿里雲不保證此自動轉碼的準確性、完整性及時效性。本文内容請以簡體中文版本為準。

資料表同步到資料表

更新時間:2025-02-28 19:06

本文介紹如何將Table Store中資料表的資料移轉或同步到另一個資料表。您可以通過通道服務、DataWorks、DataX或命令列工具等方式實現此操作。

前提條件

  • 已擷取來源資料表和目標資料表的執行個體名稱、執行個體訪問地址、地區ID等資訊。

  • 已建立目標資料表,並且目標資料表的主鍵數量和類型與源表保持一致。具體操作,請參見建立資料表

  • 已擷取 AccessKey 資訊。請使用阿里雲帳號或RAM使用者的 AccessKey 進行配置。擷取AccessKey的具體操作,請參見如何擷取AccessKey

    重要

    出於安全考慮,強烈建議您通過RAM使用者使用Table Store功能。您可以建立RAM使用者、授予該使用者管理Table Store許可權( AliyunOTSFullAccess )並為該RAM使用者建立AccessKey。具體操作,請參見使用RAM使用者存取金鑰訪問Table Store

使用通道服務同步

建立來源資料表的通道後,使用SDK進行資料同步。同步過程中可以自訂業務處理邏輯對資料進行處理。

  1. 建立來源資料表的通道並記錄通道ID,具體操作,請參見建立資料通道

  2. 使用SDK同步資料。

    Java範例程式碼如下:

    import com.alicloud.openservices.tablestore.SyncClient;
    import com.alicloud.openservices.tablestore.TunnelClient;
    import com.alicloud.openservices.tablestore.model.StreamRecord;
    import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
    import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
    import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
    import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
    
    import java.util.List;
    
    public class TunnelSample {
    
        public static void main(String[] args) {
            // 源表的執行個體名稱
            final String sourceInstanceName = "sourceInstanceName";
            // 源表的執行個體訪問地址
            final String sourceEndpoint = "sourceEndpoint";
            // 擷取環境變數中源表所在執行個體的訪問憑證 AccessKey ID 和 AccessKey Secret
            final String sourceAccessKeyId = System.getenv("SOURCE_TABLESTORE_ACCESS_KEY_ID");
            final String sourceKeySecret = System.getenv("SOURCE_TABLESTORE_ACCESS_KEY_SECRET");
    
            // 初始化 TunnelClient
            TunnelClient tunnelClient = new TunnelClient(sourceEndpoint, sourceAccessKeyId, sourceKeySecret, sourceInstanceName);
    
            // 配置 TunnelWorkerConfig
            TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
    
            // 配置 TunnelWorker,並啟動自動化的資料處理任務。
            // tunnelId,通道 ID。該值可以在Table Store控制台的即時消費通道頁簽查看,或通過ListTunnel或者DescribeTunnel擷取。
            TunnelWorker worker = new TunnelWorker("tunnelId", tunnelClient, config);
            try {
                worker.connectAndWorking();
            } catch (Exception e) {
                e.printStackTrace();
                worker.shutdown();
                tunnelClient.shutdown();
            }
        }
    
        public static class SimpleProcessor implements IChannelProcessor {
            // 目標表的執行個體名稱
            final String targetInstanceName = "targetInstanceName";
            // 目標表的執行個體訪問地址
            final String targetEndpoint = "targetEndpoint";
            // 擷取環境變數中目標表所在執行個體的訪問憑證 AccessKey ID 和 AccessKey Secret
            final String targetAccessKeyId = System.getenv("TARGET_TABLESTORE_ACCESS_KEY_ID");
            final String targetKeySecret = System.getenv("TARGET_TABLESTORE_ACCESS_KEY_SECRET");
    
            // 初始化目標表所在執行個體的 Tablestore Client
            SyncClient client = new SyncClient(targetEndpoint, targetAccessKeyId, targetKeySecret, targetInstanceName);
    
            @Override
            public void process(ProcessRecordsInput processRecordsInput) {
                // ProcessRecordsInput中返回了增量或全量資料。
                List<StreamRecord> list = processRecordsInput.getRecords();
                for (StreamRecord streamRecord : list) {
                    // 自訂業務處理邏輯。
                    switch (streamRecord.getRecordType()) {
                        case PUT:
                            // putRow
                            break;
                        case UPDATE:
                            // updateRow
                            break;
                        case DELETE:
                            // deleteRow
                            break;
                    }
                    System.out.println(streamRecord.toString());
                }
            }
    
            @Override
            public void shutdown() {
    
            }
        }
    }

使用DataWorks或者DataX同步

通過DataWorks或者DataX實現Table Store中資料表的同步,此處以DataWorks為例介紹具體操作。

準備工作

已開通DataWorks服務並建立工作空間。具體操作,請參見開通DataWorks服務建立工作空間

步驟一:新增Table Store資料來源

分別為來源資料表和目標資料表所在的執行個體新增Table Store資料來源。

  1. 進入Data Integration頁面。

    登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的Data Integration > Data Integration,在下拉框中選擇對應工作空間後單擊進入Data Integration

  2. 在左側導覽列,單擊資料來源

  3. 資料來源列表頁面,單擊新增資料來源

  4. 新增資料來源對話方塊,找到Tablestore區塊,單擊Tablestore

  5. 新增OTS資料來源對話方塊,根據下表配置資料來源參數。

    參數

    說明

    參數

    說明

    資料來源名稱

    資料來源名稱必須以字母、數字、底線(_)組合,且不能以數字和底線(_)開頭。

    資料來源描述

    對資料來源進行簡單描述,不得超過80個字元。

    地區

    選擇Tablestore執行個體所屬地區。

    Table Store執行個體名稱

    Tablestore執行個體的名稱。更多資訊,請參見執行個體

    Endpoint

    Tablestore執行個體的服務地址,推薦使用VPC地址

    重要

    本文以Tablestore執行個體和DataWorks工作空間在同一阿里雲主帳號的同一地區下為例進行說明。更多情境資訊,請參見各情境網路連通配置樣本

    AccessKey ID

    阿里雲帳號或者RAM使用者的AccessKey ID和AccessKey Secret。擷取方式請參見建立AccessKey

    AccessKey Secret

  6. 測試資源群組連通性。

    建立資料來源時,您需要測試資源群組的連通性,以保證同步任務使用的資源群組能夠與資料來源連通,否則將無法正常執行資料同步任務。

    1. (可選)購買並綁定資源群組至當前DataWorks工作空間。具體操作,請參見新增和使用Serverless資源群組

      不推薦使用舊版資源群組(獨享資源群組和公用資源群組),相較於舊版資源群組,Serverless資源群組支援的能力更豐富、售賣方式更統一、能有效利用資源片段避免浪費,因此推薦您使用Serverless資源群組。

      說明

      Serverless資源群組預設不具備公網訪問能力。需要為綁定的VPC配置公網NAT Gateway和EIP後,才支援公網訪問資料來源。

    2. 待資源群組啟動成功後,在串連配置地區,單擊相應資源群組連通狀態列的測試連通性

    3. 測試連通性通過後,連通狀態顯示可連通,單擊完成

      在資料來源列表中,可以查看建立的資料來源。

      說明

      如果顯示無法連通,表示資源群組與資料來源無法連通,後續相應資料來源任務將無法正常執行。您可以參考以下思路排查處理。

      • 根據右側彈出的連通性診斷工具視窗,自助解決連通性問題。

      • 如果連通性診斷工具未給出具體解決辦法,請檢查您設定的帳號、密碼、串連地址等參數,以及確保將資源群組的IP地址加入到資料來源的白名單中。更多資訊,請參見網路連通方案

步驟二:建立同步任務節點

  1. 進入資料開發頁面。

    登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的資料開發與營運 > 資料開發,在下拉框中選擇對應工作空間後單擊進入資料開發

  2. 在DataStudio控制台的資料開發頁面,單擊商務程序節點下的目標商務程序。

    如果需要建立商務程序,請參見建立商務程序

  3. Data Integration節點上右鍵選擇建立節點 > 離線同步

  4. 建立節點對話方塊,選擇路徑並填寫節點名稱。

  5. 單擊確認

    Data Integration節點下會顯示建立的離線同步節點。

步驟三:配置離線同步任務並啟動

  1. Data Integration節點下,雙擊開啟建立的離線同步任務節點。

  2. 配置網路與資源。

    選擇離線同步任務的資料來源、資料去向以及用於執行同步任務的資源群組,並測試連通性。

    重要

    資料同步任務的執行必須經過資源群組來實現,請選擇資源群組,並保證資源群組與讀寫兩端的資料來源可以聯通訪問。

    1. 網路與資源配置步驟,選擇資料來源Tablestore,並選擇資料來源名稱為新增的來源資料源。

    2. 選擇資源群組。

      選擇資源群組後,系統會顯示資源群組的地區、規格等資訊以及自動化的測試資源群組與所選資料來源之間連通性。

      重要

      請與新增資料來源時選擇的資源群組保持一致。

    3. 選擇資料去向Tablestore,並選擇資料來源名稱為新增的目標資料來源。

      系統會自動化的測試資源群組與所選資料來源之間連通性。

    4. 測試可連通後,單擊下一步

  3. 配置任務並儲存。

    指令碼模式
    嚮導模式
    1. 配置任務步驟,單擊image.png表徵圖,然後在彈出的對話方塊中單擊確認

    2. 在指令碼配置頁面,配置指令碼。指令碼配置規則請參見Tablestore資料來源

      • 配置Tablestore Reader

        Tablestore Reader外掛程式實現了從Tablestore讀取資料,通過您指定的抽取資料範圍,可以方便地實現資料增量抽取的需求。具體操作,請參見Reader指令碼Demo與參數說明

      • 配置Tablestore Writer

        Tablestore Writer通過Tablestore官方Java SDK串連到Tablestore服務端,並通過SDK寫入Tablestore服務端 。Tablestore Writer本身對於寫入過程進行了諸多最佳化,包括寫入逾時重試、異常寫入重試、批量提交等功能。具體操作,請參見Writer指令碼Demo與參數說明

    3. 單擊image.png表徵圖,儲存配置。

    1. 配置任務步驟的配置資料來源與去向地區,根據實際情況配置資料來源和資料去向。

      資料來源
      資料去向

      參數

      說明

      來源資料表名稱。

      主鍵區間分布(起始)

      資料讀取的起始主鍵和結束主鍵,格式為JSON數組。

      起始主鍵和結束主鍵需要是有效主鍵或者是由INF_MININF_MAX類型組成的虛擬點,虛擬點的列數必須與主鍵相同。其中INF_MIN表示無限小,任何類型的值都比它大;INF_MAX表示無限大,任何類型的值都比它小。

      資料表中的行按主鍵從小到大排序,讀取範圍是一個左閉右開的區間,返回的資料是大於等於起始主鍵且小於結束主鍵的所有行。

      主鍵區間分布(結束)

      切分配置資訊

      自訂切分配置資訊,普通情況下不建議配置。

      當Tablestore資料存放區發生熱點,且使用Tablestore Reader自動切分的策略不能生效時,建議使用自訂的切分規則。切分指定的是在主鍵起始和結束區間內的切分點,僅配置切分鍵,無需指定全部的主鍵。格式為JSON數組。

      參數

      說明

      目標資料表名稱。

      主鍵資訊

      目標資料表的主鍵資訊。

      寫入模式

      資料寫入Table Store的模式,支援以下兩種模式:

      • PutRow:對應於Tablestore API PutRow,插入資料到指定的行。如果該行不存在,則新增一行。如果該行存在,則覆蓋原有行。

      • UpdateRow:對應於Tablestore API UpdateRow,更新指定行的資料。如果該行不存在,則新增一行。如果該行存在,則根據請求的內容在這一行中新增、修改或者刪除指定列的值。

    2. 配置欄位對應。

      選擇資料來源和資料去向後,需要指定來源欄位目標欄位的映射關係,配置欄位對應關係後,任務將根據欄位對應關係,將源表欄位寫入目標表對應類型的欄位中。

      image

    3. 配置通道控制。

      您可通過通道配置,控制資料同步過程相關屬性。相關參數說明詳情可參見離線同步並發和限流之間的關係

    4. 單擊image.png表徵圖,儲存配置。

  4. 執行同步任務。

    說明

    由於全量資料一般只需要同步一次,所以無需配置調度屬性。

    1. 單擊1680170333627-a1e19a43-4e2a-4340-9564-f53f2fa6806e表徵圖。

    2. 參數對話方塊,選擇運行資源群組的名稱。

    3. 單擊運行

      運行結束後,在同步任務的結果頁簽,單擊Detail log url對應的連結後。在任務的詳細作業記錄頁面,查看Current task status對應的狀態。

      Current task status的值為FINISH時,表示任務運行完成。

使用命令列工具遷移

重要

在使用命令列工具進行資料移轉時,您需要手動將源表的資料匯出為本地JSON檔案,隨後再將其匯入到目標表。此方法僅適用於少量資料的遷移情境。針對大規模資料移轉,不建議採用此方法。

準備工作

已下載命令列工具。具體操作,請參見下載命令列工具

步驟一:匯出源表資料

  1. 啟動命令列工具,並配置源表所在執行個體的接入資訊。具體操作,請參見啟動並配置接入資訊

    通過config命令配置接入資訊。

    執行前請使用源表所在的執行個體訪問地址、執行個體名稱、AccessKey ID、AccessKey Secret替換命令中的endpoint、instance、id、key。
    config --endpoint https://myinstance.cn-hangzhou.ots.aliyuncs.com --instance myinstance --id NTSVL******************** --key 7NR2****************************************
  2. 匯出資料。

    1. 執行use命令以使用源表。此處以source_table為例。

      use --wc -t source_table
    2. 匯出源表中的資料到本地JSON檔案中。具體操作,請參見匯出資料

      scan -o /tmp/sourceData.json

步驟二 :匯入目標表資料

  1. 配置目標表所在執行個體的接入資訊。

    通過config命令配置接入資訊。

    執行前請使用目標表所在的執行個體訪問地址、執行個體名稱、AccessKey ID、AccessKey Secret替換命令中的endpoint、instance、id、key。
    config --endpoint https://myinstance.cn-hangzhou.ots.aliyuncs.com --instance myinstance --id NTSVL******************** --key 7NR2****************************************
  2. 匯入資料。

    1. 執行use命令以使用目標表。此處以target_table為例。

      use --wc -t target_table
    2. 匯入本地JSON檔案中的資料到目標表中。具體操作,請參見匯入資料

      import -i /tmp/sourceData.json 

相關文檔

如果要實現跨帳號、跨地區資料移轉,您也可以使用DataX工具通過互連網或者通過雲企業網串連VPC進行操作。關於使用雲企業網的具體操作,請參見雲企業網快速入門

  • 本頁導讀 (1, M)
  • 前提條件
  • 使用通道服務同步
  • 使用DataWorks或者DataX同步
  • 準備工作
  • 步驟一:新增Table Store資料來源
  • 步驟二:建立同步任務節點
  • 步驟三:配置離線同步任務並啟動
  • 使用命令列工具遷移
  • 準備工作
  • 步驟一:匯出源表資料
  • 步驟二 :匯入目標表資料
  • 相關文檔
文檔反饋