DataWorksData Integration支援使用Tablestore Stream Reader讀取Tablestore的增量資料,本文為您介紹DataWorks的Tablestore Stream資料讀取能力。
資料同步前準備:Tablestore Stream環境準備
使用Tablestore Stream外掛程式前,您必須確保Tablestore表上已經開啟Stream功能。時序表已預設開啟stream功能。您可以在建表時指定開啟,也可以使用SDK的UpdateTable介面開啟。開啟Stream的方法,如下所示。
SyncClient client = new SyncClient("", "", "", "");
#方法1:建表的時候開啟:
CreateTableRequest createTableRequest = new CreateTableRequest(tableMeta);
createTableRequest.setStreamSpecification(new StreamSpecification(true, 24)); // 24代表增量資料保留24小時。
client.createTable(createTableRequest);
#方法2:如果建表時未開啟,您可以通過UpdateTable開啟:
UpdateTableRequest updateTableRequest = new UpdateTableRequest("tableName");
updateTableRequest.setStreamSpecification(new StreamSpecification(true, 24));
client.updateTable(updateTableRequest);
使用SDK的UpdateTable介面開啟時:
指定開啟Stream並設定到期時間,即開啟了Tablestore增量資料匯出功能。開啟stream功能後,Tablestore服務端就會將您的動作記錄額外儲存起來,每個分區有一個有序的動作記錄隊列,每條動作記錄會在一定時間後被記憶體回收,該時間即為您指定的到期時間。
Tablestore的SDK提供了幾個Stream相關的API用於讀取這部分的動作記錄,增量外掛程式也是通過Tablestore SDK的介面擷取到增量資料。列模式下會將增量資料轉化為多個6元組的形式(pk、colName、version、colValue、opType和sequenceInfo),行模式則會以普通行的形式匯出增量資料。
支援的同步模式與欄位類型
Tablestore Stream Reader外掛程式支援使用列模式或行模式同步Tablestore的增量資料。兩種模式下的同步過程和欄位類型要求如下。
列模式
在Tablestore多版本模式下,表中的資料群組織為行>列>版本
三級的模式, 一行可以有任意列,列名並不是固定的,每一列可以含有多個版本,每個版本都有一個特定的時間戳記(版本號碼)。
您可以通過Tablestore的API進行一系列讀寫操作,Tablestore通過記錄您最近對錶的一系列寫操作(或資料更改操作)來實現記錄增量資料的目的,所以您也可以把增量資料看作一批操作記錄。
Tablestore支援PutRow、UpdateRow和DeleteRow操作:
PutRow:寫入一行,如果該行已存在即覆蓋該行。
UpdateRow:更新一行,不更改原行的其它資料。更新包括新增或覆蓋(如果對應列的對應版本已存在)一些列值、刪除某一列的全部版本、刪除某一列的某個版本。
DeleteRow:刪除一行。
Tablestore會根據每種操作產生對應的增量資料記錄,Reader外掛程式會讀出這些記錄,並匯出為Data Integration的資料格式。
同時,由於Tablestore具有動態列、多版本的特性,所以Reader外掛程式匯出的一行不對應Tablestore中的一行,而是對應Tablestore中的一列的一個版本。即Tablestore中的一行可能會匯出很多行,每行包含主索引值、該列的列名、該列下該版本的時間戳記(版本號碼)、該版本的值、操作類型。如果設定isExportSequenceInfo為true,還會包括時序資訊。
轉換為Data Integration的資料格式後,定義了以下四種操作類型:
U(UPDATE):寫入一列的一個版本。
DO(DELETE_ONE_VERSION):刪除某一列的某個版本。
DA(DELETE_ALL_VERSION):刪除某一列的全部版本,此時需要根據主鍵和列名,刪除對應列的全部版本。
DR(DELETE_ROW):刪除某一行,此時需要根據主鍵,刪除該行資料。
假設該表有兩個主鍵列,主鍵列名分別為pkName1, pkName2,樣本如下。
pkName1 | pkName2 | columnName | timestamp | columnValue | opType |
pk1_V1 | pk2_V1 | col_a | 1441803688001 | col_val1 | U |
pk1_V1 | pk2_V1 | col_a | 1441803688002 | col_val2 | U |
pk1_V1 | pk2_V1 | col_b | 1441803688003 | col_val3 | U |
pk1_V2 | pk2_V2 | col_a | 1441803688000 | — | DO |
pk1_V2 | pk2_V2 | col_b | — | — | DA |
pk1_V3 | pk2_V3 | — | — | — | DR |
pk1_V3 | pk2_V3 | col_a | 1441803688005 | col_val1 | U |
假設匯出的資料如上,共7行,對應Tablestore表內的3行,主鍵分別是(pk1_V1,pk2_V1),(pk1_V2, pk2_V2),(pk1_V3, pk2_V3):
對於主鍵為(pk1_V1,pk2_V1)的一行,包括寫入col_a列的兩個版本和col_b列的一個版本等操作。
對於主鍵為(pk1_V2,pk2_V2)的一行,包括刪除col_a列的一個版本和刪除col_b列的全部版本等操作。
對於主鍵為(pk1_V3,pk2_V3)的一行,包括刪除整行和寫入col_a列的一個版本等操作。
行模式
寬行表
您可以通過行模式匯出資料,該模式將使用者每次更新的記錄,抽取成行的形式匯出,需要設定mode屬性並配置列名。
"parameter": { #parameter中配置下面三項配置(例如datasource、table等其它配置項照常配置)。 "mode": "single_version_and_update_only", # 設定匯出模式。 "column":[ #按照需求添加需要匯出TableStore中的列,您可以自訂設定配置個數。 { "name": "uid" #列名樣本,可以是主鍵或屬性列。 }, { "name": "name" #列名樣本,可以是主鍵或屬性列。 }, ], "isExportSequenceInfo": false, #single_version_and_update_only模式下只能是false。 }
時序表
時序表在建立時會自動開啟Stream,因此不需要手動開啟Stream功能。
Tablestore Stream Reader支援匯出時序表中的增量資料,當表為時序表時,需要配置的資訊如下:
"parameter": { #parameter中配置下面四項配置(例如datasource、table等其它配置項照常配置)。 "mode": "single_version_and_update_only", # 設定匯出模式。 "isTimeseriesTable":"true", # 設定匯出為時序表。 "column":[ #按照需求添加需要匯出TableStore中的列,您可以自訂設定配置個數。 { "name": "_m_name" #度量名稱欄位。 }, { "name": "_data_source" #資料來源欄位。 }, { "name": "_tags" #標籤欄位,將tags轉換為string類型。 }, { "name": "tag1", #標籤內部欄位鍵名稱。 "is_timeseries_tag":"true" #表明該欄位為tags內部欄位。 }, { "name": "time" #時間戳記欄位。 }, { "name": "name" #屬性列名稱。 }, ], "isExportSequenceInfo": false, #single_version_and_update_only模式下只能是false。 }
行模式匯出的資料更接近於原始的行,易於後續處理,但需要注意以下問題:
每次匯出的行是從使用者每次更新的記錄中抽取,每一行資料與使用者的寫入或更新操作一一對應。如果使用者存在單獨更新某些列的行為,則會出現有一些記錄只有被更新的部分列,其它列為空白的情況。
行模式不會匯出資料的版本號碼(即每列的時間戳記),也無法進行刪除操作。
資料類型轉換列表
目前Tablestore Stream Reader支援所有的Tablestore類型,其針對Tablestore類型的轉換列表,如下所示。
類型分類 | Tablestore Stream資料類型 |
整數類 | INTEGER |
浮點類 | DOUBLE |
字串類 | STRING |
布爾類 | BOOLEAN |
二進位類 | BINARY |
資料同步任務開發:Tablestore增量同步處理流程引導
操作流程請參見通過嚮導模式配置離線同步任務、通過指令碼模式配置離線同步任務。
指令碼模式配置的全量參數和指令碼Demo請參見下文的附錄:Tablestore Stream指令碼Demo與參數說明。
附錄:Tablestore Stream指令碼Demo與參數說明
附錄:離線任務指令碼配置方式
如果您配置離線任務時使用指令碼模式的方式進行配置,您需要在任務指令碼中按照指令碼的統一格式要求編寫指令碼中的reader參數和writer參數,指令碼模式的統一要求請參見通過指令碼模式配置離線同步任務,以下為您介紹指令碼模式下的資料來源的Reader參數和Writer參數的指導詳情。
Tablestore Stream Reader指令碼Demo
列模式
{ "type":"job", "version":"2.0",//版本號碼。 "steps":[ { "stepType":"otsstream",//外掛程式名。 "parameter":{ "datasource":"$srcDatasource",//資料來源。 "dataTable":"",//表名。 "statusTable":"TableStoreStreamReaderStatusTable",//用於選項組的表的名稱。 "maxRetries":30,//從 TableStore 中讀增量資料時,每次請求的最大重試次數,預設為30。 "isExportSequenceInfo":false,//是否匯出時序資訊。 "startTimeString":"${startTime}${hh}",//增量資料的時間範圍(左閉右開)的左邊界。參數配置中配置startTime=${yyyymmdd} hh=$[hh24miss],表示ots讀取開始時間為業務日期的定時時間 "endTimeString":"${endTime}${hh}"//增量資料的時間範圍(左閉右開)的右邊界。參數配置中配置endTime=${yyyymmdd} hh=$[hh24miss],表示ots讀取結束時間為業務日期的定時時間 }, "name":"Reader", "category":"reader" }, { "stepType":"stream", "parameter":{}, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"0"//錯誤記錄數。 }, "speed":{ "throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。 "concurrent":1,//作業並發數。 "mbps":"12"//限流,此處1mbps = 1MB/s。 } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }
行模式讀取寬表
{ "type":"job", "version":"2.0",//版本號碼。 "steps":[ { "stepType":"otsstream",//外掛程式名。 "parameter":{ "datasource":"$srcDatasource",//資料來源。 "dataTable":"",//表名。 "statusTable":"TableStoreStreamReaderStatusTable",//用於選項組的表的名稱。 "maxRetries":30,//從 TableStore 中讀增量資料時,每次請求的最大重試次數,預設為30。 "isExportSequenceInfo":false,//是否匯出時序資訊。 "startTimeString":"${startTime}${hh}",//增量資料的時間範圍(左閉右開)的左邊界。參數配置中配置startTime=${yyyymmdd} hh=$[hh24miss],表示ots讀取開始時間為業務日期的定時時間 "endTimeString":"${endTime}${hh}",//增量資料的時間範圍(左閉右開)的右邊界。參數配置中配置endTime=${yyyymmdd} hh=$[hh24miss],表示ots讀取結束時間為業務日期的定時時間 "mode": "single_version_and_update_only", "column":[ { "name":"pId" }, { "name": "uId" }, { "name":"col0" }, { "name": "col1" } ], }, "name":"Reader", "category":"reader" }, { "stepType":"stream", "parameter":{}, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"0"//錯誤記錄數。 }, "speed":{ "throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。 "concurrent":1,//作業並發數。 "mbps":"12"//限流 } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }
行模式讀取時序表
{ "type":"job", "version":"2.0",//版本號碼。 "steps":[ { "stepType":"otsstream",//外掛程式名。 "parameter":{ "datasource":"$srcDatasource",//資料來源。 "dataTable":"",//表名。 "statusTable":"TableStoreStreamReaderStatusTable",//用於選項組的表的名稱。 "maxRetries":30,//從 TableStore 中讀增量資料時,每次請求的最大重試次數,預設為30。 "isExportSequenceInfo":false,//是否匯出時序資訊。 "startTimeString":"${startTime}${hh}",//增量資料的時間範圍(左閉右開)的左邊界。參數配置中配置startTime=${yyyymmdd} hh=$[hh24miss],表示ots讀取開始時間為業務日期的定時時間 "endTimeString":"${endTime}${hh}",//增量資料的時間範圍(左閉右開)的右邊界。參數配置中配置endTime=${yyyymmdd} hh=$[hh24miss],表示ots讀取結束時間為業務日期的定時時間 "mode": "single_version_and_update_only", "isTimeseriesTable":"true", "column": [ { "name": "_m_name" }, { "name": "_data_source", }, { "name": "_tags", }, { "name": "string_column", } ] }, "name":"Reader", "category":"reader" }, { "stepType":"stream", "parameter":{}, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"0"//錯誤記錄數。 }, "speed":{ "throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。 "concurrent":1,//作業並發數。 "mbps":"12"//限流,此處1mbps = 1MB/s。 } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }
Tablestore Stream Reader指令碼參數
參數 | 描述 | 是否必選 | 預設值 |
dataSource | 資料來源名稱,指令碼模式支援添加資料來源,該配置項填寫的內容必須與添加的資料來源名稱保持一致。 | 是 | 無 |
dataTable | 匯出增量資料的表的名稱。該表需要開啟Stream,可以在建表時開啟,或者使用UpdateTable介面開啟。 | 是 | 無 |
statusTable | Reader外掛程式用於選項組的表的名稱,這些狀態可用於減少對非目標範圍內的資料的掃描,從而加快匯出速度。statusTable是Reader用於儲存狀態的表,如果該表不存在,Reader會自動建立該表。一次離線匯出任務完成後,您無需刪除該表,該表中記錄的狀態可用於下次匯出任務中:
您配置一個類似TableStoreStreamReaderStatusTable的名稱即可,請注意不要與業務相關的表重名。 | 是 | 無 |
startTimestampMillis | 增量資料的時間範圍(左閉右開)的左邊界,單位為毫秒:
| 否 | 無 |
endTimestampMillis | 增量資料的時間範圍(左閉右開)的右邊界,單位為毫秒:
| 否 | 無 |
date | 日期格式為yyyyMMdd,例如20151111,表示匯出該日的資料。如果沒有指定date,則需要指定startTimestampMillis和endTimestampMillis或startTimeString和endTimeString,反之也成立。例如,采雲間調度僅支援天層級,所以提供該配置,作用與startTimestampMillis和endTimestampMillis或startTimeString和endTimeString類似。 | 否 | 無 |
isExportSequenceInfo | 是否匯出時序資訊,時序資訊包含了資料的寫入時間等。預設該值為false,即不匯出。 | 否 | false |
maxRetries | 從TableStore中讀增量資料時,每次請求的最大重試次數,預設為30次。重試之間有間隔,重試30次的總時間約為5分鐘,通常無需更改。 | 否 | 30 |
startTimeString | 任務的開始時間,即增量資料的時間範圍(左閉右開)的左邊界,格式為 | 否 | 無 |
endTimeString | 任務的結束時間,即增量資料的時間範圍(左閉右開)的右邊界,格式為 | 否 | 無 |
enableSeekIterator | Reader外掛程式需要先確定增量位點,然後再拉取資料,如果是經常啟動並執行任務,外掛程式會根據之前掃描的位點來確定位置。如果之前沒運行過這個外掛程式,將會從增量開始位置(預設增量保留7天,即7天前)開始掃描,因此當還沒有掃描到設定的開始時間之後的資料時,會存在開始一段時間沒有資料匯出的情況,您可以在reader的配置參數裡增加 | 否 | false |
mode | 匯出模式,設定為single_version_and_update_only時為行模式,預設不設定為列模式。 | 否 | 無 |
isTimeseriesTable | 是否為時序表,只有在行模式,即mode為single_version_and_update_only時配置生效。 | 否 | false |
column | column配置
說明 在行模式下必須配置,否則不會匯出資料。 |
| 無 |