Table StoreTablestore是構建在阿里雲飛天分布式系統之上的NoSQL資料存放區服務,Tablestore資料來源為您提供讀取和寫入Tablestore雙向通道的功能,本文為您介紹DataWorks的Tablestore資料同步的能力支援情況。
使用限制
Tablestore Reader和Writer外掛程式實現了從Tablestore讀取和寫入資料,包含行模式、列模式兩種資料讀取與寫入方式,可針對寬表與時序表進行資料讀取與寫入。
列模式:在Tablestore多版本模型下,表中的資料群組織為行 > 列 > 版本三級的模式, 一行可以有任意列,列名並不是固定的,每一列可以含有多個版本,每個版本都有一個特定的時間戳記(版本號碼)。列模式會將資料匯出為(主索引值,列名,時間戳記,列值)的四元組格式,列模式下匯入的資料也是(主索引值,列名,時間戳記,列值)的四元組格式。
行模式:該模式將使用者每次更新的記錄,抽取成行的形式匯出,即(主索引值,列值)的格式。
行模式下每一行資料對應TableStore表中的一條資料。寫入行模式的資料包含主鍵列列值、普通列列值兩部分。
Tablestore列由主鍵列primaryKey+普通列column組成,源端列順序需要和Tablestore目的端主鍵列+普通列保持一致,否則會產生列映射錯誤。
Tablestore Reader會根據一張表中待讀取的資料的範圍,按照資料同步並發的數目N,將範圍等分為N份Task。每個Task都會有一個Tablestore Reader線程來執行。
支援的欄位類型
目前Tablestore Reader和Tablestore Writer支援所有Tablestore類型,其針對Tablestore類型的轉換列表,如下所示。
類型分類 | Tablestore資料類型 |
整數類 | INTEGER |
浮點類 | DOUBLE |
字串類 | STRING |
布爾類 | BOOLEAN |
二進位類 | BINARY |
Tablestore本身不支援日期型類型。應用程式層通常使用Long儲存時間的Unix TimeStamp。
您需要將INTEGER類型的資料,在指令碼模式中配置為INT類型,DataWorks會將其轉換為INTEGER類型。如果您直接配置為INTEGER類型,日誌將會報錯,導致任務無法順利完成。
建立資料來源
在進行資料同步任務開發時,您需要在DataWorks上建立一個對應的資料來源,操作流程請參見建立並管理資料來源,詳細的配置參數解釋可在配置介面查看對應參數的文案提示。
資料同步任務開發
資料同步任務的配置入口和通用配置流程可參見下文的配置指導。
單表離線同步任務配置指導
操作流程請參見通過指令碼模式配置離線同步任務。
指令碼模式配置的全量參數和指令碼Demo請參見下文的附錄二:Writer指令碼Demo與參數說明。
附錄一:Reader指令碼Demo與參數說明
離線任務指令碼配置方式
如果您配置離線任務時使用指令碼模式的方式進行配置,您需要按照統一的指令碼格式要求,在任務指令碼中編寫相應的參數,詳情請參見通過指令碼模式配置離線同步任務,以下為您介紹指令碼模式下資料來源的參數配置詳情。
Reader指令碼Demo
行模式讀取寬表配置
{
"type":"job",
"version":"2.0",//版本號碼。
"steps":[
{
"stepType":"ots",//外掛程式名。
"parameter":{
"datasource":"",//資料來源。
"newVersion":"true",//使用新版otsreader
"mode": "normal",// 行模式讀取資料
"isTimeseriesTable":"false",// 配置該表為寬表(非時序表)
"column":[//欄位。
{
"name":"column1"//欄位名。
},
{
"name":"column2"
},
{
"name":"column3"
},
{
"name":"column4"
},
{
"name":"column5"
}
],
"range":{
"split":[
{
"type":"STRING",
"value":"beginValue"
},
{
"type":"STRING",
"value":"splitPoint1"
},
{
"type":"STRING",
"value":"splitPoint2"
},
{
"type":"STRING",
"value":"splitPoint3"
},
{
"type":"STRING",
"value":"endValue"
}
],
"end":[
{
"type":"STRING",
"value":"endValue"
},
{
"type":"INT",
"value":"100"
},
{
"type":"INF_MAX"
},
{
"type":"INF_MAX"
}
],
"begin":[
{
"type":"STRING",
"value":"beginValue"
},
{
"type":"INT",
"value":"0"
},
{
"type":"INF_MIN"
},
{
"type":"INF_MIN"
}
]
},
"table":""//表名。
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//錯誤記錄數。
},
"speed":{
"throttle":true,//false代表不限流,下面的限流的速度不生效,true代表限流。
"concurrent":1 //作業並發數。
"mbps":"12"//限流
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
行模式讀時序表配置
{
"type":"job",
"version":"2.0",//版本號碼。
"steps":[
{
"stepType":"ots",//外掛程式名。
"parameter":{
"datasource":"",//資料來源。
"table": "",//表名
// 讀時序資料mode必須為normal
"mode": "normal",
// 讀時序資料newVersion必須為true
"newVersion": "true",
// 配置該表為時序表
"isTimeseriesTable":"true",
// measurementName:配置需要讀取時序資料的度量名稱,非必需,為空白則讀取全表資料
"measurementName":"measurement_1",
"column": [
{
"name": "_m_name"
},
{
"name": "tagA",
"is_timeseries_tag":"true"
},
{
"name": "double_0",
"type":"DOUBLE"
},
{
"name": "string_0",
"type":"STRING"
},
{
"name": "long_0",
"type":"INT"
},
{
"name": "binary_0",
"type":"BINARY"
},
{
"name": "bool_0",
"type":"BOOL"
},
{
"type":"STRING",
"value":"testString"
}
]
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//錯誤記錄數。
},
"speed":{
"throttle":true,//false代表不限流,下面的限流的速度不生效,true代表限流。
"concurrent":1 //作業並發數。
"mbps":"12"//限流
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
列模式讀取寬表配置
{
"type":"job",
"version":"2.0",//版本號碼。
"steps":[
{
"stepType":"ots",//外掛程式名。
"parameter":{
"datasource":"",//資料來源。
"table":"",//表名
"newVersion":"true",//新版otsreader
"mode": "multiversion",//多版本模式
"column":[//配置需要匯出的列名稱(必須是非主鍵列)
{"name":"mobile"},
{"name":"name"},
{"name":"age"},
{"name":"salary"},
{"name":"marry"}
],
"range":{//匯出的範圍
"begin":[
{"type":"INF_MIN"},
{"type":"INF_MAX"}
],
"end":[
{"type":"INF_MAX"},
{"type":"INF_MIN"}
],
"split":[
]
},
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//錯誤記錄數。
},
"speed":{
"throttle":true,//false代表不限流,下面的限流的速度不生效,true代表限流。
"concurrent":1 //作業並發數。
"mbps":"12"//限流
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Reader指令碼參數通用配置
參數 | 描述 | 是否必選 | 預設值 |
endpoint | Tablestore Server的EndPoint(服務地址),詳情請參見服務地址。 | 是 | 無 |
accessId | Tablestore的AccessKey ID。 | 是 | 無 |
accessKey | Tablestore的AccessKey Secret。 | 是 | 無 |
instanceName | Tablestore的執行個體名稱,執行個體是您使用和管理Tablestore服務的實體。 您在開通Tablestore服務後,需要通過管理主控台來建立執行個體,然後在執行個體內進行表的建立和管理。 執行個體是Tablestore資源管理的基礎單元,Tablestore對應用程式的存取控制和資源計量都在執行個體層級完成。 | 是 | 無 |
table | 所選取的需要抽取的表名稱,這裡有且只能填寫一張表。在Tablestore不存在多表同步的需求。 | 是 | 無 |
newVersion | 定義了使用的Tablestore Reader外掛程式的版本。
新版Tablestore Reader除了支援新的功能,對系統資源的開銷也相對較低,因此推薦使用新版Tablestore Reader。 新版本外掛程式配置相容了舊版本外掛程式的配置,即舊任務增加newVersion=true配置之後,可以正常運行。 | 否 | false |
mode | 定義了讀取資料的模式,當前支援兩種模式:
本配置僅在新版Tablestore Reader(newVersion:true)配置下生效。 舊版Tablestore Reader會忽略mode配置,並僅支援行模式讀取。 | 否 | normal |
isTimeseriesTable | 定義了操作的資料表是否為時序資料表:
本配置僅在newVersion:true & mode:normal配置下生效。 舊版Tablestore Reader不支援時序表,且時序表無法按列模式讀取。 | 否 | false |
Reader指令碼參數附加配置
Tablestore Reader支援行模式讀取寬表、行模式讀取時序表、列模式讀取寬表。下面為您介紹各模式的附加配置。
行模式讀寬表參數
參數 | 描述 | 是否必選 | 預設值 |
column | 所配置的表中需要同步的列名集合,使用JSON的數組描述欄位資訊。由於Tablestore本身是NoSQL系統,在Tablestore Reader抽取資料過程中,必須指定相應的欄位名稱。
| 是 | 無 |
begin和end | begin和end配置項用於表示抽取Tablestore表資料的範圍。 begin和end描述的是Tablestore PrimaryKey的區間分布狀態,對於無限大小的區間,您可以使用 說明
例如,對一張主鍵為
| 否 | (INF_MIN,INF_MAX) |
split | 該配置項屬於進階配置項,是您自己定義切分配置資訊,普通情況下不建議使用。 通過配置split參數可以自己定義分區的資料範圍,通常在Tablestore 資料存放區發生熱點,可以使用自訂的切分規則,以下任務配置為例:
任務運行時,資料會切分成6個段,並發讀取。分段數量建議比任務並發數大。
| 否 | 當split參數未配置時,使用自動切分邏輯。 自動切分邏輯會找出 Partition Key最大、最小值,並進行均勻分段。 Partition Key支援整型和字串,整型使用整除來分段,字串使用第一個字元的unicode碼來分段。 |
行模式讀時序表參數
參數 | 描述 | 是否必選 | 預設值 |
column | column是一個數組,每個元素表示一列,可配置常量列與普通列兩種類型。 對於常量列,需要配置以下欄位:
對於普通列,需要配置以下欄位:
讀出四列資料的指令碼樣本:
| 是 | 無 |
measurementName | 配置需要讀取時間軸的度量名稱。若不配置則讀取全表資料。 | 否 | 無 |
timeRange | 請求資料的時間範圍,讀取的範圍是[begin,end),左閉右開的區間,且begin必須小於end,時間戳記單位為毫秒。格式如下:
| 否 | 全部版本 |
列模式讀寬表參數
參數 | 描述 | 是否必選 | 預設值 |
column | 指定要匯出的列,在列模式下只支援普通列。 格式:
說明
| 是 | 所有列 |
range | 讀取的資料範圍,讀取的範圍為[begin,end),左閉右開的區間,並且:
type支援的類型有如下幾類:
格式:
| 否 | 全部資料 |
timeRange | 請求資料的時間範圍,讀取的範圍是[begin,end),左閉右開的區間,且begin必須小於end,時間戳記單位為毫秒。 格式:
| 否 | 全部版本 |
maxVersion | 請求的最巨量資料版本數,取值範圍是1~INT32_MAX。 | 否 | 全部版本 |
附錄二:Writer指令碼Demo與參數說明
離線任務指令碼配置方式
如果您配置離線任務時使用指令碼模式的方式進行配置,您需要按照統一的指令碼格式要求,在任務指令碼中編寫相應的參數,詳情請參見通過指令碼模式配置離線同步任務,以下為您介紹指令碼模式下資料來源的參數配置詳情。
Writer指令碼Demo
行模式寫入寬表配置
{
"type":"job",
"version":"2.0",//版本號碼。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"ots",//外掛程式名。
"parameter":{
"datasource":"",//資料來源。
"table":"",//表名。
"newVersion":"true",//使用新版otswriter
"mode": "normal",// 行模式寫入資料
"isTimeseriesTable":"false",// 配置該表為寬表(非時序表)
"primaryKey" : [//Tablestore的主鍵資訊。
{"name":"gid", "type":"INT"},
{"name":"uid", "type":"STRING"}
],
"column" : [//欄位。
{"name":"col1", "type":"INT"},
{"name":"col2", "type":"DOUBLE"},
{"name":"col3", "type":"STRING"},
{"name":"col4", "type":"STRING"},
{"name":"col5", "type":"BOOL"}
],
"writeMode" : "PutRow" //寫入模式。
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//錯誤記錄數。
},
"speed":{
"throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent":1, //作業並發數。
"mbps":"12"//限流,此處1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
行模式寫時序表配置
{
"type":"job",
"version":"2.0",//版本號碼。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"ots",//外掛程式名。
"parameter":{
"datasource":"",//資料來源。
"table": "testTimeseriesTableName01",
"mode": "normal",
"newVersion": "true",
"isTimeseriesTable":"true",
"timeunit":"microseconds",
"column": [
{
"name": "_m_name"
},
{
"name": "_data_source",
},
{
"name": "_tags",
},
{
"name": "_time",
},
{
"name": "string_1",
"type":"string"
},
{
"name":"tag3",
"is_timeseries_tag":"true",
}
]
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//錯誤記錄數。
},
"speed":{
"throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent":1, //作業並發數。
"mbps":"12"//限流,此處1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
列模式寫入寬表配置
{
"type":"job",
"version":"2.0",//版本號碼。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"ots",//外掛程式名。
"parameter":{
"datasource":"",//資料來源。
"table":"",
"newVersion":"true",
"mode":"multiVersion",
"primaryKey" : [
"gid",
"uid"
]
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//錯誤記錄數。
},x`
"speed":{
"throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent":1, //作業並發數。
"mbps":"12"//限流,此處1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Writer指令碼參數通用配置
參數 | 描述 | 是否必選 | 預設值 |
datasource | 資料來源名稱,指令碼模式支援添加資料來源,該配置項填寫的內容必須與添加的資料來源名稱保持一致。 | 是 | 無 |
endPoint | Tablestore Server的EndPoint(服務地址),詳情請參見服務地址。 | 是 | 無 |
accessId | Tablestore的AccessKey ID。 | 是 | 無 |
accessKey | Tablestore的AccessKey Secret。 | 是 | 無 |
instanceName | Tablestore的執行個體名稱,執行個體是您使用和管理Tablestore服務的實體。 您在開通Tablestore服務後,需要通過管理主控台來建立執行個體,然後在執行個體內進行表的建立和管理。執行個體是Tablestore資源管理的基礎單元,Tablestore對應用程式的存取控制和資源計量都在執行個體層級完成。 | 是 | 無 |
table | 所選取的需要抽取的表名稱,此處能且只能填寫一張表。在Tablestore中不存在多表同步的需求。 | 是 | 無 |
newVersion | 定義了使用的Tablestore Writer外掛程式版本。
新版Tablestore Writer除了支援新的功能,對系統資源的開銷也相對較低,因此推薦使用新版Tablestore Writer。 新版外掛程式相容舊版本外掛程式配置,即舊任務增加newVersion=true配置之後,可以正常運行。 | 是 | false |
mode | 定義了寫入資料的模式,當前支援兩種模式
本配置僅在newVersion:true配置下生效 舊版Tablestore Writer會忽略mode配置,並僅支援行模式讀取。 | 否 | normal |
isTimeseriesTable | 定義了操作的資料表是否為時序資料表。
本配置僅在newVersion:true & mode:normal配置下生效(列模式不相容時序表)。 | 否 | false |
Writer指令碼參數附加配置
Tablestore Writer支援行模式寫入寬表、行模式寫入時序表、列模式寫入寬表。下面介紹各模式的附加配置。
行模式寫寬表參數
參數 | 描述 | 是否必選 | 預設值 |
primaryKey | Tablestore的主鍵資訊,使用JSON的數組描述欄位資訊。Tablestore本身是NoSQL系統,在Tablestore Writer匯入資料過程中,必須指定相應的欄位名稱。 資料同步系統本身支援類型轉換的,因此對於源頭資料非STRING/INT,Tablestore Writer會進行資料類型轉換。配置樣本如下。
說明 Tablestore的PrimaryKey僅支援STRING和INT類型,因此Tablestore Writer本身也限定填寫STRING和INT兩種類型。 | 是 | 無 |
column | 所配置的表中需要同步的列名集合,使用JSON的數組描述欄位資訊。 配置樣本:
其中的name為寫入的Tablestore列名稱,type為寫入的類型。Tablestore支援STRING、INT、DOUBLE、BOOL和BINARY類型。 說明 寫入過程不支援常量、函數或者自訂運算式。 | 是 | 無 |
writeMode | 資料寫入Table Store的模式,目前支援以下兩種模式:
| 是 | 無 |
enableAutoIncrement | 是否允許向包含主鍵自增列的Tablestore表中寫入資料。
| 否 | false |
requestTotalSizeLimitation | 該配置限制寫入Tablestore時單行資料的大小,配置類型為數字。 | 否 | 1MB |
attributeColumnSizeLimitation | 該配置限制寫入Tablestore時單個屬性列的大小,配置類型為數字。 | 否 | 2MB |
primaryKeyColumnSizeLimitation | 該配置限制寫入Tablestore時單個主鍵列的大小,配置類型為數字。 | 否 | 1KB |
attributeColumnMaxCount | 該配置限制寫入Tablestore時屬性列的個數,配置類型為數字。 | 否 | 1,024 |
行模式寫時序表參數
參數 | 描述 | 是否必選 | 預設值 |
column | column中的每個元素對應時序資料中的一個欄位,每個元素可配置以下參數。
由於時序資料的度量名稱與時間戳記不可為空,因此必須配置 樣本:一條待寫入資料如下,包含六個欄位:
使用如下配置:
寫入Tablestore後,控制台查看結果如下 | 是 | 無 |
timeunit | 所配置的時間戳記_time欄位的單位,支援NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS、MINUTES。 | 否 | MICROSECONDS |
列模式寫寬表參數
參數 | 描述 | 是否必選 | 預設值 |
primaryKey | 表的主鍵列。 考慮到配置成本,並不需要配置primaryKey在Record(Line)中的位置,但Record的格式需要固定,要求primaryKey一定在行首,primaryKey之後是columnName。record的格式為: 例如,有以下9條資料:
配置樣本如下:
寫入寬行結果:
| 是 | 無 |
columnNamePrefixFilter | 列名首碼過濾。 Hbase匯入的資料,cf和qulifier共同組成了columnName,但Tablestore不支援cf,所以需要將cf過濾掉。 配置樣本: 說明
| 否 | 無 |
常見問題
問題一
Q:向包含主鍵自增列的目標表寫入資料,需要如何配置Tablestore Writer?
Tablestore Writer的配置中必須包含以下兩條:
"newVersion": "true", "enableAutoIncrement": "true",
Tablestore Writer中不需要配置主鍵自增列的列名。
Tablestore Writer中配置的primaryKey條數+column條數需要等於上遊Tablestore Reader資料的列數。
問題二
Q:在時序模型的配置中,如何理解_tag
和is_timeseries_tag
兩個欄位?
樣本:某條資料共有三個標籤,標籤為:【手機=小米,記憶體=8G,鏡頭=萊卡】。
資料匯出樣本(Tablestore Reader)
如果想將上述標籤合并到一起作為一列匯出,則配置為:
"column": [ { "name": "_tags", } ],
DataWorks會將標籤匯出為一列資料,形式如下:
["phone=xiaomi","camera=LEICA","RAM=8G"]
如果希望匯出
phone
標籤和camera
標籤,並且每個標籤單獨作為一列匯出,則配置為:"column": [ { "name": "phone", "is_timeseries_tag":"true", }, { "name": "camera", "is_timeseries_tag":"true", } ],
DataWorks會匯出兩列資料,形式如下:
xiaomi, LEICA
資料匯入樣本(Tablestore Writer)
現在上遊資料來源(Reader)有兩列資料:
一列資料為:
["phone=xiaomi","camera=LEICA","RAM=8G"]
。另一列資料為:6499。
現希望將這兩列資料都添加到標籤裡面,預期的寫入後標籤欄位格式如下所示:則配置為:
"column": [ { "name": "_tags", }, { "name": "price", "is_timeseries_tag":"true", }, ],
第一列配置將
["phone=xiaomi","camera=LEICA","RAM=8G"]
整體匯入標籤欄位。第二列配置將
price=6499
單獨匯入標籤欄位。