本文介紹如何將Table Store中資料表的資料移轉或同步到另一個資料表。您可以通過通道服務、DataWorks、DataX或命令列工具等方式實現此操作。
前提條件
已擷取來源資料表和目標資料表的執行個體名稱、執行個體訪問地址、地區ID等資訊。
已建立目標資料表,並且目標資料表的主鍵數量和類型與源表保持一致。具體操作,請參見建立資料表。
已擷取 AccessKey 資訊。請使用阿里雲帳號或RAM使用者的 AccessKey 進行配置。擷取AccessKey的具體操作,請參見如何擷取AccessKey。
出於安全考慮,強烈建議您通過RAM使用者使用Table Store功能。您可以建立RAM使用者、授予該使用者管理Table Store許可權(
AliyunOTSFullAccess
)並為該RAM使用者建立AccessKey。具體操作,請參見使用RAM使用者存取金鑰訪問Table Store。
使用通道服務同步
建立來源資料表的通道後,使用SDK進行資料同步。同步過程中可以自訂業務處理邏輯對資料進行處理。
建立來源資料表的通道並記錄通道ID,具體操作,請參見建立資料通道。
使用SDK同步資料。
Java範例程式碼如下:
import com.alicloud.openservices.tablestore.SyncClient; import com.alicloud.openservices.tablestore.TunnelClient; import com.alicloud.openservices.tablestore.model.StreamRecord; import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor; import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput; import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker; import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig; import java.util.List; public class TunnelSample { public static void main(String[] args) { // 源表的執行個體名稱 final String sourceInstanceName = "sourceInstanceName"; // 源表的執行個體訪問地址 final String sourceEndpoint = "sourceEndpoint"; // 擷取環境變數中源表所在執行個體的訪問憑證 AccessKey ID 和 AccessKey Secret final String sourceAccessKeyId = System.getenv("SOURCE_TABLESTORE_ACCESS_KEY_ID"); final String sourceKeySecret = System.getenv("SOURCE_TABLESTORE_ACCESS_KEY_SECRET"); // 初始化 TunnelClient TunnelClient tunnelClient = new TunnelClient(sourceEndpoint, sourceAccessKeyId, sourceKeySecret, sourceInstanceName); // 配置 TunnelWorkerConfig TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor()); // 配置 TunnelWorker,並啟動自動化的資料處理任務。 // tunnelId,通道 ID。該值可以在Table Store控制台的即時消費通道頁簽查看,或通過ListTunnel或者DescribeTunnel擷取。 TunnelWorker worker = new TunnelWorker("tunnelId", tunnelClient, config); try { worker.connectAndWorking(); } catch (Exception e) { e.printStackTrace(); worker.shutdown(); tunnelClient.shutdown(); } } public static class SimpleProcessor implements IChannelProcessor { // 目標表的執行個體名稱 final String targetInstanceName = "targetInstanceName"; // 目標表的執行個體訪問地址 final String targetEndpoint = "targetEndpoint"; // 擷取環境變數中目標表所在執行個體的訪問憑證 AccessKey ID 和 AccessKey Secret final String targetAccessKeyId = System.getenv("TARGET_TABLESTORE_ACCESS_KEY_ID"); final String targetKeySecret = System.getenv("TARGET_TABLESTORE_ACCESS_KEY_SECRET"); // 初始化目標表所在執行個體的 Tablestore Client SyncClient client = new SyncClient(targetEndpoint, targetAccessKeyId, targetKeySecret, targetInstanceName); @Override public void process(ProcessRecordsInput processRecordsInput) { // ProcessRecordsInput中返回了增量或全量資料。 List<StreamRecord> list = processRecordsInput.getRecords(); for (StreamRecord streamRecord : list) { // 自訂業務處理邏輯。 switch (streamRecord.getRecordType()) { case PUT: // putRow break; case UPDATE: // updateRow break; case DELETE: // deleteRow break; } System.out.println(streamRecord.toString()); } } @Override public void shutdown() { } } }
使用DataWorks或者DataX同步
通過DataWorks或者DataX實現Table Store中資料表的同步,此處以DataWorks為例介紹具體操作。
準備工作
已開通DataWorks服務並建立工作空間。具體操作,請參見開通DataWorks服務和建立工作空間。
步驟一:新增Table Store資料來源
分別為來源資料表和目標資料表所在的執行個體新增Table Store資料來源。
進入Data Integration頁面。
登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的 ,在下拉框中選擇對應工作空間後單擊進入Data Integration。
在左側導覽列,單擊資料來源。
在資料來源列表頁面,單擊新增資料來源。
在新增資料來源對話方塊,找到Tablestore區塊,單擊Tablestore。
在新增OTS資料來源對話方塊,根據下表配置資料來源參數。
參數
說明
參數
說明
資料來源名稱
資料來源名稱必須以字母、數字、底線(_)組合,且不能以數字和底線(_)開頭。
資料來源描述
對資料來源進行簡單描述,不得超過80個字元。
地區
選擇Tablestore執行個體所屬地區。
Table Store執行個體名稱
Tablestore執行個體的名稱。更多資訊,請參見執行個體。
Endpoint
Tablestore執行個體的服務地址,推薦使用VPC地址。
本文以Tablestore執行個體和DataWorks工作空間在同一阿里雲主帳號的同一地區下為例進行說明。更多情境資訊,請參見各情境網路連通配置樣本。
AccessKey ID
阿里雲帳號或者RAM使用者的AccessKey ID和AccessKey Secret。擷取方式請參見建立AccessKey。
AccessKey Secret
測試資源群組連通性。
建立資料來源時,您需要測試資源群組的連通性,以保證同步任務使用的資源群組能夠與資料來源連通,否則將無法正常執行資料同步任務。
(可選)購買並綁定資源群組至當前DataWorks工作空間。具體操作,請參見新增和使用Serverless資源群組。
不推薦使用舊版資源群組(獨享資源群組和公用資源群組),相較於舊版資源群組,Serverless資源群組支援的能力更豐富、售賣方式更統一、能有效利用資源片段避免浪費,因此推薦您使用Serverless資源群組。
Serverless資源群組預設不具備公網訪問能力。需要為綁定的VPC配置公網NAT Gateway和EIP後,才支援公網訪問資料來源。
待資源群組啟動成功後,在串連配置地區,單擊相應資源群組連通狀態列的測試連通性。
測試連通性通過後,連通狀態顯示可連通,單擊完成。
在資料來源列表中,可以查看建立的資料來源。
如果顯示無法連通,表示資源群組與資料來源無法連通,後續相應資料來源任務將無法正常執行。您可以參考以下思路排查處理。
根據右側彈出的連通性診斷工具視窗,自助解決連通性問題。
如果連通性診斷工具未給出具體解決辦法,請檢查您設定的帳號、密碼、串連地址等參數,以及確保將資源群組的IP地址加入到資料來源的白名單中。更多資訊,請參見網路連通方案。
步驟二:建立同步任務節點
進入資料開發頁面。
登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的 ,在下拉框中選擇對應工作空間後單擊進入資料開發。
在DataStudio控制台的資料開發頁面,單擊商務程序節點下的目標商務程序。
如果需要建立商務程序,請參見建立商務程序。
在Data Integration節點上右鍵選擇建立節點 > 離線同步。
在建立節點對話方塊,選擇路徑並填寫節點名稱。
單擊確認。
在Data Integration節點下會顯示建立的離線同步節點。
步驟三:配置離線同步任務並啟動
在Data Integration節點下,雙擊開啟建立的離線同步任務節點。
配置網路與資源。
選擇離線同步任務的資料來源、資料去向以及用於執行同步任務的資源群組,並測試連通性。
資料同步任務的執行必須經過資源群組來實現,請選擇資源群組,並保證資源群組與讀寫兩端的資料來源可以聯通訪問。
在網路與資源配置步驟,選擇資料來源為Tablestore,並選擇資料來源名稱為新增的來源資料源。
選擇資源群組。
選擇資源群組後,系統會顯示資源群組的地區、規格等資訊以及自動化的測試資源群組與所選資料來源之間連通性。
請與新增資料來源時選擇的資源群組保持一致。
選擇資料去向為Tablestore,並選擇資料來源名稱為新增的目標資料來源。
系統會自動化的測試資源群組與所選資料來源之間連通性。
測試可連通後,單擊下一步。
配置任務並儲存。
指令碼模式嚮導模式在配置任務步驟,單擊
表徵圖,然後在彈出的對話方塊中單擊確認。
在指令碼配置頁面,配置指令碼。指令碼配置規則請參見Tablestore資料來源。
配置Tablestore Reader
Tablestore Reader外掛程式實現了從Tablestore讀取資料,通過您指定的抽取資料範圍,可以方便地實現資料增量抽取的需求。具體操作,請參見Reader指令碼Demo與參數說明。
配置Tablestore Writer
Tablestore Writer通過Tablestore官方Java SDK串連到Tablestore服務端,並通過SDK寫入Tablestore服務端 。Tablestore Writer本身對於寫入過程進行了諸多最佳化,包括寫入逾時重試、異常寫入重試、批量提交等功能。具體操作,請參見Writer指令碼Demo與參數說明。
單擊
表徵圖,儲存配置。
在配置任務步驟的配置資料來源與去向地區,根據實際情況配置資料來源和資料去向。
資料來源資料去向參數
說明
表
來源資料表名稱。
主鍵區間分布(起始)
資料讀取的起始主鍵和結束主鍵,格式為JSON數組。
起始主鍵和結束主鍵需要是有效主鍵或者是由
INF_MIN
和INF_MAX
類型組成的虛擬點,虛擬點的列數必須與主鍵相同。其中INF_MIN
表示無限小,任何類型的值都比它大;INF_MAX
表示無限大,任何類型的值都比它小。資料表中的行按主鍵從小到大排序,讀取範圍是一個左閉右開的區間,返回的資料是大於等於起始主鍵且小於結束主鍵的所有行。
主鍵區間分布(結束)
切分配置資訊
自訂切分配置資訊,普通情況下不建議配置。
當Tablestore資料存放區發生熱點,且使用Tablestore Reader自動切分的策略不能生效時,建議使用自訂的切分規則。切分指定的是在主鍵起始和結束區間內的切分點,僅配置切分鍵,無需指定全部的主鍵。格式為JSON數組。
參數
說明
表
目標資料表名稱。
主鍵資訊
目標資料表的主鍵資訊。
寫入模式
資料寫入Table Store的模式,支援以下兩種模式:
PutRow:對應於Tablestore API PutRow,插入資料到指定的行。如果該行不存在,則新增一行。如果該行存在,則覆蓋原有行。
UpdateRow:對應於Tablestore API UpdateRow,更新指定行的資料。如果該行不存在,則新增一行。如果該行存在,則根據請求的內容在這一行中新增、修改或者刪除指定列的值。
配置欄位對應。
選擇資料來源和資料去向後,需要指定來源欄位和目標欄位的映射關係,配置欄位對應關係後,任務將根據欄位對應關係,將源表欄位寫入目標表對應類型的欄位中。
配置通道控制。
您可通過通道配置,控制資料同步過程相關屬性。相關參數說明詳情可參見離線同步並發和限流之間的關係。
單擊
表徵圖,儲存配置。
執行同步任務。
由於全量資料一般只需要同步一次,所以無需配置調度屬性。
單擊
表徵圖。
在參數對話方塊,選擇運行資源群組的名稱。
單擊運行。
運行結束後,在同步任務的結果頁簽,單擊Detail log url對應的連結後。在任務的詳細作業記錄頁面,查看
Current task status
對應的狀態。當
Current task status
的值為FINISH時,表示任務運行完成。
使用命令列工具遷移
在使用命令列工具進行資料移轉時,您需要手動將源表的資料匯出為本地JSON檔案,隨後再將其匯入到目標表。此方法僅適用於少量資料的遷移情境。針對大規模資料移轉,不建議採用此方法。
準備工作
已下載命令列工具。具體操作,請參見下載命令列工具。
步驟一:匯出源表資料
啟動命令列工具,並配置源表所在執行個體的接入資訊。具體操作,請參見啟動並配置接入資訊。
通過
config
命令配置接入資訊。執行前請使用源表所在的執行個體訪問地址、執行個體名稱、AccessKey ID、AccessKey Secret替換命令中的endpoint、instance、id、key。
config --endpoint https://myinstance.cn-hangzhou.ots.aliyuncs.com --instance myinstance --id NTSVL******************** --key 7NR2****************************************
匯出資料。
執行
use
命令以使用源表。此處以source_table
為例。use --wc -t source_table
匯出源表中的資料到本地JSON檔案中。具體操作,請參見匯出資料。
scan -o /tmp/sourceData.json
步驟二 :匯入目標表資料
配置目標表所在執行個體的接入資訊。
通過
config
命令配置接入資訊。執行前請使用目標表所在的執行個體訪問地址、執行個體名稱、AccessKey ID、AccessKey Secret替換命令中的endpoint、instance、id、key。
config --endpoint https://myinstance.cn-hangzhou.ots.aliyuncs.com --instance myinstance --id NTSVL******************** --key 7NR2****************************************
匯入資料。
執行
use
命令以使用目標表。此處以target_table
為例。use --wc -t target_table
匯入本地JSON檔案中的資料到目標表中。具體操作,請參見匯入資料。
import -i /tmp/sourceData.json
相關文檔
如果要實現跨帳號、跨地區資料移轉,您也可以使用DataX工具通過互連網或者通過雲企業網串連VPC進行操作。關於使用雲企業網的具體操作,請參見雲企業網快速入門。