通過在DataWorksData Integration平台上配置離線同步任務,可實現Table Store中新增和變化資料的周期性增量同步處理至OSS,滿足資料備份和後續處理需求。
準備工作
擷取Tablestore源表的執行個體名稱、執行個體訪問地址、地區ID等資訊,並為源表開啟Stream功能。
資料表可選擇在建立時啟用Stream功能,或通過修改操作啟用;時序表預設已啟用該功能。
為阿里雲帳號或RAM使用者(具備Tablestore與OSS服務的許可權)建立AccessKey。
開通DataWorks服務,並在OSS儲存空間或Tablestore執行個體所在地區建立工作空間。
建立Serverless資源群組並綁定到工作空間。有關計費資訊,請參見Serverless資源群組計費。
當DataWorks和Tablestore執行個體位於不同地區時,需要建立VPC對等串連實現跨地區網路連通。
操作步驟
步驟一:新增Table Store資料來源
在DataWorks中配置Table Store資料來源,建立與來源資料的串連。
登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的,在下拉框中選擇對應工作空間後單擊進入Data Integration。
在左側導覽列,單擊資料來源。
在資料來源列表頁面,單擊新增資料來源。
在新增資料來源對話方塊,搜尋並選擇資料來源類型為Tablestore。
在新增OTS資料來源對話方塊,根據下表配置資料來源參數。
參數
說明
資料來源名稱
資料來源名稱必須以字母、數字、底線(_)組合,且不能以數字和底線(_)開頭。
資料來源描述
對資料來源進行簡單描述,不得超過80個字元。
地區
選擇Tablestore執行個體所屬地區。
Table Store執行個體名稱
Tablestore執行個體的名稱。
Endpoint
Tablestore執行個體的服務地址,推薦使用VPC地址。
AccessKey ID
阿里雲帳號或者RAM使用者的AccessKey ID和AccessKey Secret。
AccessKey Secret
測試資源群組連通性。
建立資料來源時,需要測試資源群組的連通性,確保同步任務使用的資源群組能夠與資料來源正常連通,否則將無法正常執行資料同步任務。
在串連配置地區,單擊相應資源群組連通狀態列的測試連通性。
測試連通性通過後,連通狀態顯示可連通,單擊完成。可在資料來源列表中查看建立的資料來源。
如果測試連通性結果為無法通過,可使用連通性診斷工具自助解決。
步驟二:新增OSS資料來源
配置OSS資料來源作為資料匯出的目標儲存。
再次單擊新增資料來源,在對話方塊中搜尋並選擇資料來源類型為OSS,並配置相關的資料來源參數。
參數
說明
資料來源名稱
資料來源名稱必須以字母、數字、底線(_)組合,且不能以數字和底線(_)開頭。
資料來源描述
對資料來源進行簡單描述,不得超過80個字元。
訪問模式
RAM角色授權模式:DataWorks服務帳號以角色扮演的方式訪問資料來源。首次選擇該模式時,請根據頁面提示開啟授權。
Access Key模式:通過阿里雲帳號或者RAM使用者的AccessKey ID和AccessKey Secret訪問資料來源。
選擇角色
僅當訪問模式為RAM角色授權模式時需要選擇RAM角色。
AccessKey ID
僅當訪問模式為Access Key模式時需要填寫。阿里雲帳號或者RAM使用者的AccessKey ID和AccessKey Secret。
AccessKey Secret
地區
Bucket所在地區。
Endpoint
OSS訪問網域名稱,詳見地區和Endpoint。
Bucket
Bucket名稱。
完成參數和連通性測試後,單擊完成添加資料來源。
步驟三:配置離線同步任務
建立並配置資料同步任務,定義從Table Store到OSS的資料轉送規則。
建立任務節點
進入資料開發頁面。
登入DataWorks控制台。
在頁面上方,選擇資源群組和地區。
在左側導覽列,單擊。
選擇對應工作空間後單擊進入Data Studio。
在Data Studio控制台的資料開發頁面,單擊專案目錄右側的
表徵圖,然後選擇。在建立節點對話方塊,選擇路徑,資料來源下拉選擇Tablestore Stream,資料去向下拉選擇OSS,填寫名稱,然後單擊確認。
配置同步任務
在專案目錄下,單擊開啟建立的離線同步任務節點,通過嚮導模式或指令碼模式配置同步任務。
同步時序表時,只能使用指令碼模式配置同步任務。
嚮導模式(預設)
配置以下內容:
資料來源:選擇來來源資料源和去向資料來源。
運行資源:選擇資源群組,選擇後會自動檢測資料來源連通性。
資料來源:選擇來來源資料表,其它配置可保持預設,也可根據需求修改。
資料去向:選擇目標文本類型,並配置相關參數。
文本類型:可選類型包括csv、text、orc和parquet。
檔案名稱(含路徑):OSS Bucket內包含路徑的檔案名稱,如
tablestore/resouce_table.csv。資料行分隔符號:預設為
,,如果分隔字元不可見,請填寫Unicode編碼,比如\u001b、\u007c。檔案路徑:僅當文本類型為parquet時填寫,OSS Bucket內的檔案路徑。
檔案名稱:僅當文本類型為parquet時填寫,OSS Bucket內的檔案名稱。
去向欄位對應:根據源表主鍵和增量變更資訊自動設定,可根據需求修改。
配置完成後,單擊頁面上方的儲存。
指令碼模式
單擊頁面上方的指令碼模式,在切換後的頁面中編輯指令碼。
資料表
以下樣本配置以目標檔案類型為csv為例,來來源資料表主鍵包含1個int類型的主鍵列id和1個string類型的主鍵列name。配置時請替換樣本指令碼內的資料來源datasource、表名稱table以及寫入檔案名稱object。
{
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "otsstream",
"parameter": {
"statusTable": "TableStoreStreamReaderStatusTable",
"maxRetries": 31,
"isExportSequenceInfo": false,
"datasource": "source_data",
"column": [
"id",
"name",
"colName",
"version",
"colValue",
"opType",
"sequenceInfo"
],
"startTimeString": "${startTime}",
"table": "source_table",
"endTimeString": "${endTime}"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "oss",
"parameter": {
"dateFormat": "yyyy-MM-dd HH:mm:ss",
"datasource": "target_data",
"writeSingleObject": false,
"column": [
"0",
"1",
"2",
"3",
"4",
"5",
"6"
],
"writeMode": "truncate",
"encoding": "UTF-8",
"fieldDelimiter": ",",
"fileFormat": "csv",
"object": "tablestore/source_table.csv"
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"concurrent": 2,
"throttle": false
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}時序表
以下樣本配置以目標檔案類型為csv為例,來源時序表的時間軸資料包含1個int類型的屬性列value。配置時請替換樣本指令碼內的資料來源datasource、表名稱table以及寫入檔案名稱object。
{
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "otsstream",
"parameter": {
"statusTable": "TableStoreStreamReaderStatusTable",
"maxRetries": 31,
"isExportSequenceInfo": false,
"datasource": "source_data",
"column": [
{
"name": "_m_name"
},
{
"name": "_data_source"
},
{
"name": "_tags"
},
{
"name": "_time"
},
{
"name": "value",
"type": "int"
}
],
"startTimeString": "${startTime}",
"table": "source_series",
"isTimeseriesTable":"true",
"mode": "single_version_and_update_only",
"endTimeString": "${endTime}"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "oss",
"parameter": {
"dateFormat": "yyyy-MM-dd HH:mm:ss",
"datasource": "target_data",
"writeSingleObject": false,
"writeMode": "truncate",
"encoding": "UTF-8",
"fieldDelimiter": ",",
"fileFormat": "csv",
"object": "tablestore/source_series.csv"
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"concurrent": 2,
"throttle": false
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}指令碼編輯完成後,單擊頁面上方的儲存。
調試同步任務
單擊頁面右側的調試配置,選擇啟動並執行資源群組,並添加啟動並執行指令碼參數。
startTime:同步增量資料的開始時間(包含),如
20251119200000。endTime:同步增量資料的結束時間(不包含),如
20251119205000。增量同步處理採用周期調度機制,每隔5分鐘進行一次調度,且外掛程式存在5分鐘的延遲,因此同步的總延遲為5~10分鐘,配置結束時間時避免配置目前時間往前10分鐘的時間段。
單擊頁面上方的運行,開始同步任務。
上述樣本值表示同步
2025年11月19日20點到20點50分(不包含)的增量資料。
步驟四:查看同步結果
運行同步任務後,可通過日誌查看任務的執行狀態,並在OSS Bucket查看同步結果檔案。
在頁面下方查看任務運行狀態和結果,出現以下資訊時表示同步任務運行成功。
2025-11-18 11:16:23 INFO Shell run successfully! 2025-11-18 11:16:23 INFO Current task status: FINISH 2025-11-18 11:16:23 INFO Cost time is: 77.208s查看目標Bucket的檔案。
前往Bucket列表,單擊目標Bucket,查看或下載同步結果檔案。
應用於生產環境
調試完成後,可在頁面右側調度配置中配置調度參數startTime、endTime和周期調度策略,並發布到生產環境。詳細的配置規則請參見配置並使用調度參數、調度策略和調度時間。
