DataWorksData Integration支援使用Tablestore Stream Reader讀取Tablestore的增量資料,本文為您介紹DataWorks的Tablestore Stream資料讀取能力。
資料同步前準備:Tablestore Stream環境準備
使用Tablestore Stream外掛程式前,您必須確保Tablestore表上已經開啟Stream功能。時序表已預設開啟stream功能。您可以在建表時指定開啟,也可以使用SDK的UpdateTable介面開啟。開啟Stream的方法,如下所示。
SyncClient client = new SyncClient("", "", "", "");
#方法1:建表的時候開啟:
CreateTableRequest createTableRequest = new CreateTableRequest(tableMeta);
createTableRequest.setStreamSpecification(new StreamSpecification(true, 24)); // 24代表增量資料保留24小時。
client.createTable(createTableRequest);
#方法2:如果建表時未開啟,您可以通過UpdateTable開啟:
UpdateTableRequest updateTableRequest = new UpdateTableRequest("tableName");
updateTableRequest.setStreamSpecification(new StreamSpecification(true, 24));
client.updateTable(updateTableRequest);
使用SDK的UpdateTable介面開啟時:
指定開啟Stream並設定到期時間,即開啟了Tablestore增量資料匯出功能。開啟stream功能後,Tablestore服務端就會將您的動作記錄額外儲存起來,每個分區有一個有序的動作記錄隊列,每條動作記錄會在一定時間後被記憶體回收,該時間即為您指定的到期時間。
Tablestore的SDK提供了幾個Stream相關的API用於讀取這部分的動作記錄,增量外掛程式也是通過Tablestore SDK的介面擷取到增量資料。列模式下會將增量資料轉化為多個6元組的形式(pk、colName、version、colValue、opType和sequenceInfo),行模式則會以普通行的形式匯出增量資料。
支援的同步模式與欄位類型
Tablestore Stream Reader外掛程式支援使用列模式或行模式同步Tablestore的增量資料。兩種模式下的同步過程和欄位類型要求如下。
列模式
在Tablestore多版本模式下,表中的資料群組織為行>列>版本
三級的模式, 一行可以有任意列,列名並不是固定的,每一列可以含有多個版本,每個版本都有一個特定的時間戳記(版本號碼)。
您可以通過Tablestore的API進行一系列讀寫操作,Tablestore通過記錄您最近對錶的一系列寫操作(或資料更改操作)來實現記錄增量資料的目的,所以您也可以把增量資料看作一批操作記錄。
Tablestore支援PutRow、UpdateRow和DeleteRow操作:
Tablestore會根據每種操作產生對應的增量資料記錄,Reader外掛程式會讀出這些記錄,並匯出為Data Integration的資料格式。
同時,由於Tablestore具有動態列、多版本的特性,所以Reader外掛程式匯出的一行不對應Tablestore中的一行,而是對應Tablestore中的一列的一個版本。即Tablestore中的一行可能會匯出很多行,每行包含主索引值、該列的列名、該列下該版本的時間戳記(版本號碼)、該版本的值、操作類型。如果設定isExportSequenceInfo為true,還會包括時序資訊。
轉換為Data Integration的資料格式後,定義了以下四種操作類型:
U(UPDATE):寫入一列的一個版本。
DO(DELETE_ONE_VERSION):刪除某一列的某個版本。
DA(DELETE_ALL_VERSION):刪除某一列的全部版本,此時需要根據主鍵和列名,刪除對應列的全部版本。
DR(DELETE_ROW):刪除某一行,此時需要根據主鍵,刪除該行資料。
假設該表有兩個主鍵列,主鍵列名分別為pkName1, pkName2,樣本如下。
pkName1 | pkName2 | columnName | timestamp | columnValue | opType |
pk1_V1 | pk2_V1 | col_a | 1441803688001 | col_val1 | U |
pk1_V1 | pk2_V1 | col_a | 1441803688002 | col_val2 | U |
pk1_V1 | pk2_V1 | col_b | 1441803688003 | col_val3 | U |
pk1_V2 | pk2_V2 | col_a | 1441803688000 | — | DO |
pk1_V2 | pk2_V2 | col_b | — | — | DA |
pk1_V3 | pk2_V3 | — | — | — | DR |
pk1_V3 | pk2_V3 | col_a | 1441803688005 | col_val1 | U |
假設匯出的資料如上,共7行,對應Tablestore表內的3行,主鍵分別是(pk1_V1,pk2_V1),(pk1_V2, pk2_V2),(pk1_V3, pk2_V3):
對於主鍵為(pk1_V1,pk2_V1)的一行,包括寫入col_a列的兩個版本和col_b列的一個版本等操作。
對於主鍵為(pk1_V2,pk2_V2)的一行,包括刪除col_a列的一個版本和刪除col_b列的全部版本等操作。
對於主鍵為(pk1_V3,pk2_V3)的一行,包括刪除整行和寫入col_a列的一個版本等操作。
行模式
寬行表
您可以通過行模式匯出資料,該模式將使用者每次更新的記錄,抽取成行的形式匯出,需要設定mode屬性並配置列名。
"parameter": {
#parameter中配置下面三項配置(例如datasource、table等其它配置項照常配置)。
"mode": "single_version_and_update_only", # 設定匯出模式。
"column":[ #按照需求添加需要匯出TableStore中的列,您可以自訂設定配置個數。
{
"name": "uid" #列名樣本,可以是主鍵或屬性列。
},
{
"name": "name" #列名樣本,可以是主鍵或屬性列。
},
],
"isExportSequenceInfo": false, #single_version_and_update_only模式下只能是false。
}
時序表
時序表在建立時會自動開啟Stream,因此不需要手動開啟Stream功能。
Tablestore Stream Reader支援匯出時序表中的增量資料,當表為時序表時,需要配置的資訊如下:
"parameter": {
#parameter中配置下面四項配置(例如datasource、table等其它配置項照常配置)。
"mode": "single_version_and_update_only", # 設定匯出模式。
"isTimeseriesTable":"true", # 設定匯出為時序表。
"column":[ #按照需求添加需要匯出TableStore中的列,您可以自訂設定配置個數。
{
"name": "_m_name" #度量名稱欄位。
},
{
"name": "_data_source" #資料來源欄位。
},
{
"name": "_tags" #標籤欄位,將tags轉換為string類型。
},
{
"name": "tag1", #標籤內部欄位鍵名稱。
"is_timeseries_tag":"true" #表明該欄位為tags內部欄位。
},
{
"name": "time" #時間戳記欄位。
},
{
"name": "name" #屬性列名稱。
},
],
"isExportSequenceInfo": false, #single_version_and_update_only模式下只能是false。
}
行模式匯出的資料更接近於原始的行,易於後續處理,但需要注意以下問題:
資料類型轉換列表
目前Tablestore Stream Reader支援所有的Tablestore類型,其針對Tablestore類型的轉換列表,如下所示。
類型分類 | Tablestore Stream資料類型 |
整數類 | INTEGER |
浮點類 | DOUBLE |
字串類 | STRING |
布爾類 | BOOLEAN |
二進位類 | BINARY |
資料同步任務開發
資料同步任務的配置入口和通用配置流程可參見下文的配置指導。
附錄:指令碼Demo與參數說明
離線任務指令碼配置方式
如果您配置離線任務時使用指令碼模式的方式進行配置,您需要按照統一的指令碼格式要求,在任務指令碼中編寫相應的參數,詳情請參見通過指令碼模式配置離線同步任務,以下為您介紹指令碼模式下資料來源的參數配置詳情。
Reader指令碼Demo
列模式
{
"type":"job",
"version":"2.0",//版本號碼。
"steps":[
{
"stepType":"otsstream",//外掛程式名。
"parameter":{
"datasource":"$srcDatasource",//資料來源。
"dataTable":"",//表名。
"statusTable":"TableStoreStreamReaderStatusTable",//用於選項組的表的名稱。
"maxRetries":30,//從 TableStore 中讀增量資料時,每次請求的最大重試次數,預設為30。
"isExportSequenceInfo":false,//是否匯出時序資訊。
"startTimeString":"${startTime}${hh}",//增量資料的時間範圍(左閉右開)的左邊界。參數配置中配置startTime=${yyyymmdd} hh=$[hh24miss],表示ots讀取開始時間為業務日期的定時時間
"endTimeString":"${endTime}${hh}"//增量資料的時間範圍(左閉右開)的右邊界。參數配置中配置endTime=${yyyymmdd} hh=$[hh24miss],表示ots讀取結束時間為業務日期的定時時間
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//錯誤記錄數。
},
"speed":{
"throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent":1,//作業並發數。
"mbps":"12"//限流,此處1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
行模式讀取寬表
{
"type":"job",
"version":"2.0",//版本號碼。
"steps":[
{
"stepType":"otsstream",//外掛程式名。
"parameter":{
"datasource":"$srcDatasource",//資料來源。
"dataTable":"",//表名。
"statusTable":"TableStoreStreamReaderStatusTable",//用於選項組的表的名稱。
"maxRetries":30,//從 TableStore 中讀增量資料時,每次請求的最大重試次數,預設為30。
"isExportSequenceInfo":false,//是否匯出時序資訊。
"startTimeString":"${startTime}${hh}",//增量資料的時間範圍(左閉右開)的左邊界。參數配置中配置startTime=${yyyymmdd} hh=$[hh24miss],表示ots讀取開始時間為業務日期的定時時間
"endTimeString":"${endTime}${hh}",//增量資料的時間範圍(左閉右開)的右邊界。參數配置中配置endTime=${yyyymmdd} hh=$[hh24miss],表示ots讀取結束時間為業務日期的定時時間
"mode": "single_version_and_update_only",
"column":[
{
"name":"pId"
},
{
"name": "uId"
},
{
"name":"col0"
},
{
"name": "col1"
}
],
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//錯誤記錄數。
},
"speed":{
"throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent":1,//作業並發數。
"mbps":"12"//限流
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
行模式讀取時序表
{
"type":"job",
"version":"2.0",//版本號碼。
"steps":[
{
"stepType":"otsstream",//外掛程式名。
"parameter":{
"datasource":"$srcDatasource",//資料來源。
"dataTable":"",//表名。
"statusTable":"TableStoreStreamReaderStatusTable",//用於選項組的表的名稱。
"maxRetries":30,//從 TableStore 中讀增量資料時,每次請求的最大重試次數,預設為30。
"isExportSequenceInfo":false,//是否匯出時序資訊。
"startTimeString":"${startTime}${hh}",//增量資料的時間範圍(左閉右開)的左邊界。參數配置中配置startTime=${yyyymmdd} hh=$[hh24miss],表示ots讀取開始時間為業務日期的定時時間
"endTimeString":"${endTime}${hh}",//增量資料的時間範圍(左閉右開)的右邊界。參數配置中配置endTime=${yyyymmdd} hh=$[hh24miss],表示ots讀取結束時間為業務日期的定時時間
"mode": "single_version_and_update_only",
"isTimeseriesTable":"true",
"column": [
{
"name": "_m_name"
},
{
"name": "_data_source",
},
{
"name": "_tags",
},
{
"name": "string_column",
}
]
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//錯誤記錄數。
},
"speed":{
"throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent":1,//作業並發數。
"mbps":"12"//限流,此處1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Reader指令碼參數
參數 | 描述 | 是否必選 | 預設值 |
datasource | 資料來源名稱,指令碼模式支援添加資料來源,該配置項填寫的內容必須與添加的資料來源名稱保持一致。 | 是 | 無 |
dataTable | 匯出增量資料的表的名稱。該表需要開啟Stream,可以在建表時開啟,或者使用UpdateTable介面開啟。 | 是 | 無 |
statusTable | Reader外掛程式用於選項組的表的名稱,這些狀態可用於減少對非目標範圍內的資料的掃描,從而加快匯出速度。statusTable是Reader用於儲存狀態的表,如果該表不存在,Reader會自動建立該表。一次離線匯出任務完成後,您無需刪除該表,該表中記錄的狀態可用於下次匯出任務中: 您無需建立該表,只需要給出一個表名。Reader外掛程式會嘗試在您的instance下建立該表,如果該表不存在即建立新表。如果該表已存在,會判斷該表的Meta是否與期望一致,如果不一致會拋出異常。 在一次匯出完成之後,您無需刪除該表,該表的狀態可以用於下次的匯出任務。 該表會開啟TTL,資料自動到期,會認為其資料量很小。 針對同一個instance下的多個不同的dataTable的Reader配置,可以使用同一個statusTable,記錄的狀態資訊互不影響。
您配置一個類似TableStoreStreamReaderStatusTable的名稱即可,請注意不要與業務相關的表重名。 | 是 | 無 |
startTimestampMillis | 增量資料的時間範圍(左閉右開)的左邊界,單位為毫秒: | 否 | 無 |
endTimestampMillis | 增量資料的時間範圍(左閉右開)的右邊界,單位為毫秒: | 否 | 無 |
date | 日期格式為yyyyMMdd,例如20151111,表示匯出該日的資料。如果沒有指定date,則需要指定startTimestampMillis和endTimestampMillis或startTimeString和endTimeString,反之也成立。例如,采雲間調度僅支援天層級,所以提供該配置,作用與startTimestampMillis和endTimestampMillis或startTimeString和endTimeString類似。 | 否 | 無 |
isExportSequenceInfo | 是否匯出時序資訊,時序資訊包含了資料的寫入時間等。預設該值為false,即不匯出。 | 否 | false |
maxRetries | 從TableStore中讀增量資料時,每次請求的最大重試次數,預設為30次。重試之間有間隔,重試30次的總時間約為5分鐘,通常無需更改。 | 否 | 30 |
startTimeString | 任務的開始時間,即增量資料的時間範圍(左閉右開)的左邊界,格式為yyyymmddhh24miss ,單位為秒。 | 否 | 無 |
endTimeString | 任務的結束時間,即增量資料的時間範圍(左閉右開)的右邊界,格式為yyyymmddhh24miss ,單位為秒。 | 否 | 無 |
enableSeekIterator | Reader外掛程式需要先確定增量位點,然後再拉取資料,如果是經常啟動並執行任務,外掛程式會根據之前掃描的位點來確定位置。如果之前沒運行過這個外掛程式,將會從增量開始位置(預設增量保留7天,即7天前)開始掃描,因此當還沒有掃描到設定的開始時間之後的資料時,會存在開始一段時間沒有資料匯出的情況,您可以在reader的配置參數裡增加 "enableSeekIterator": true 的配置,協助您加快位點定位。 | 否 | false |
mode | 匯出模式,設定為single_version_and_update_only時為行模式,預設不設定為列模式。 | 否 | 無 |
isTimeseriesTable | 是否為時序表,只有在行模式,即mode為single_version_and_update_only時配置生效。 | 否 | false |
column | column配置single_version_and_update_only 模式下,所匯出的資料列,配置範例: "column":[
{"name":"pk1"},
{"name":"col1"},
{"name":"col2","dataType":"new"},
{"name":"col2","dataType":"old"},
{"name":"col2","dataType":"latest"}
],
| | 無 |