全部產品
Search
文件中心

DataWorks:Tablestore資料來源

更新時間:Jun 19, 2024

Table StoreTablestore是構建在阿里雲飛天分布式系統之上的NoSQL資料存放區服務,Tablestore資料來源為您提供讀取和寫入Tablestore雙向通道的功能,本文為您介紹DataWorks的Tablestore資料同步的能力支援情況。

使用限制

  • Tablestore Reader和Writer外掛程式實現了從Tablestore讀取和寫入資料,包含行模式列模式兩種資料讀取與寫入方式,可針對寬表與時序表進行資料讀取與寫入。

    • 列模式:在Tablestore多版本模型下,表中的資料群組織為 > > 版本三級的模式, 一行可以有任意列,列名並不是固定的,每一列可以含有多個版本,每個版本都有一個特定的時間戳記(版本號碼)。列模式會將資料匯出為(主索引值,列名,時間戳記,列值)的四元組格式,列模式下匯入的資料也是(主索引值,列名,時間戳記,列值)的四元組格式。

    • 行模式:該模式將使用者每次更新的記錄,抽取成行的形式匯出,即(主索引值,列值)的格式。

      行模式下每一行資料對應TableStore表中的一條資料。寫入行模式的資料包含主鍵列列值、普通列列值兩部分。

  • Tablestore列由主鍵列primaryKey+普通列column組成,源端列順序需要和Tablestore目的端主鍵列+普通列保持一致,否則會產生列映射錯誤。

  • Tablestore Reader會根據一張表中待讀取的資料的範圍,按照資料同步並發的數目N,將範圍等分為N份Task。每個Task都會有一個Tablestore Reader線程來執行。

支援的欄位類型

目前Tablestore Reader和Tablestore Writer支援所有Tablestore類型,其針對Tablestore類型的轉換列表,如下所示。

類型分類

Tablestore資料類型

整數類

INTEGER

浮點類

DOUBLE

字串類

STRING

布爾類

BOOLEAN

二進位類

BINARY

說明
  • Tablestore本身不支援日期型類型。應用程式層通常使用Long儲存時間的Unix TimeStamp。

  • 您需要將INTEGER類型的資料,在指令碼模式中配置為INT類型,DataWorks會將其轉換為INTEGER類型。如果您直接配置為INTEGER類型,日誌將會報錯,導致任務無法順利完成。

資料同步任務開發

Tablestore資料同步任務的配置入口和通用配置流程指導可參見下文的配置指導,詳細的配置參數解釋可在配置介面查看對應參數的文案提示。

建立資料來源

在進行資料同步任務開發時,您需要在DataWorks上建立一個對應的資料來源,操作流程請參見建立並管理資料來源

單表離線同步任務配置指導

附錄:Reader指令碼Demo與參數說明

如果您配置離線任務時使用指令碼模式的方式進行配置,您需要在任務指令碼中按照指令碼的統一格式要求編寫指令碼中的reader參數和writer參數,指令碼模式的統一要求請參見通過指令碼模式配置離線同步任務,以下為您介紹指令碼模式下的資料來源的Reader參數和Writer參數的指導詳情。

Reader指令碼Demo

行模式讀取寬表配置

{
    "type":"job",
    "version":"2.0",//版本號碼。
    "steps":[
        {
            "stepType":"ots",//外掛程式名。
            "parameter":{
                "datasource":"",//資料來源。
                "newVersion":"true",//使用新版otsreader
                "mode": "normal",// 行模式讀取資料
                "isTimeseriesTable":"false",// 配置該表為寬表(非時序表)
                "column":[//欄位。
                    {
                        "name":"column1"//欄位名。
                    },
                    {
                        "name":"column2"
                    },
                    {
                        "name":"column3"
                    },
                    {
                        "name":"column4"
                    },
                    {
                        "name":"column5"
                    }
                ],
                "range":{
                    "split":[
                        {
                            "type":"STRING",
                            "value":"beginValue"
                        },
                        {
                            "type":"STRING",
                            "value":"splitPoint1"
                        },
                        {
                            "type":"STRING",
                            "value":"splitPoint2"
                        },
                        {
                            "type":"STRING",
                            "value":"splitPoint3"
                        },
                        {
                            "type":"STRING",
                            "value":"endValue"
                        }
                    ],
                    "end":[
                        {
                            "type":"STRING",
                            "value":"endValue"
                        },
                        {
                            "type":"INT",
                            "value":"100"
                        },
                        {
                            "type":"INF_MAX"
                        },
                        {
                            "type":"INF_MAX"
                        }
                    ],
                    "begin":[
                        {
                            "type":"STRING",
                            "value":"beginValue"
                        },
                        {
                            "type":"INT",
                            "value":"0"
                        },
                        {
                            "type":"INF_MIN"
                        },
                        {
                            "type":"INF_MIN"
                        }
                    ]
                },
                "table":""//表名。
            },
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"//錯誤記錄數。
        },
        "speed":{
            "throttle":true,//false代表不限流,下面的限流的速度不生效,true代表限流。
            "concurrent":1 //作業並發數。
            "mbps":"12"//限流
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

行模式讀時序表配置

{
    "type":"job",
    "version":"2.0",//版本號碼。
    "steps":[
        {
            "stepType":"ots",//外掛程式名。
            "parameter":{
                "datasource":"",//資料來源。
                "table": "",//表名
                // 讀時序資料mode必須為normal
                "mode": "normal",
                // 讀時序資料newVersion必須為true
                "newVersion": "true",
                // 配置該表為時序表
                "isTimeseriesTable":"true",
                // measurementName:配置需要讀取時序資料的度量名稱,非必需,為空白則讀取全表資料
                "measurementName":"measurement_1",
                "column": [
                  {
                    "name": "_m_name"
                  },
                  {
                    "name": "tagA",
                    "is_timeseries_tag":"true"
                  },
                  {
                    "name": "double_0",
                    "type":"DOUBLE"
                  },
                  {
                    "name": "string_0",
                    "type":"STRING"
                  },
                  {
                    "name": "long_0",
                    "type":"INT"
                  },
                  {
                    "name": "binary_0",
                    "type":"BINARY"
                  },
                  {
                    "name": "bool_0",
                    "type":"BOOL"
                  },
                  {
                    "type":"STRING",
                    "value":"testString"
                  }
                ]
            },
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"//錯誤記錄數。
        },
        "speed":{
            "throttle":true,//false代表不限流,下面的限流的速度不生效,true代表限流。
            "concurrent":1 //作業並發數。
            "mbps":"12"//限流
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

列模式讀取寬表配置

{
    "type":"job",
    "version":"2.0",//版本號碼。
    "steps":[
        {
            "stepType":"ots",//外掛程式名。
            "parameter":{
                "datasource":"",//資料來源。
                "table":"",//表名
                "newVersion":"true",//新版otsreader
                "mode": "multiversion",//多版本模式
                "column":[//配置需要匯出的列名稱(必須是非主鍵列)
                    {"name":"mobile"},
                    {"name":"name"},
                    {"name":"age"},
                    {"name":"salary"},
                    {"name":"marry"}
                ],
                "range":{//匯出的範圍
                    "begin":[
                        {"type":"INF_MIN"},
                        {"type":"INF_MAX"}
                    ],
                    "end":[
                        {"type":"INF_MAX"},
                        {"type":"INF_MIN"}
                    ],
                    "split":[
                    ]
                },

            },
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"//錯誤記錄數。
        },
        "speed":{
            "throttle":true,//false代表不限流,下面的限流的速度不生效,true代表限流。
            "concurrent":1 //作業並發數。
            "mbps":"12"//限流
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

Reader指令碼參數通用配置

參數

描述

是否必選

預設值

endpoint

Tablestore Server的EndPoint(服務地址),詳情請參見服務地址

accessId

Tablestore的AccessKey ID。

accessKey

Tablestore的AccessKey Secret。

instanceName

Tablestore的執行個體名稱,執行個體是您使用和管理Tablestore服務的實體。

您在開通Tablestore服務後,需要通過管理主控台來建立執行個體,然後在執行個體內進行表的建立和管理。

執行個體是Tablestore資源管理的基礎單元,Tablestore對應用程式的存取控制和資源計量都在執行個體層級完成。

table

所選取的需要抽取的表名稱,這裡有且只能填寫一張表。在Tablestore不存在多表同步的需求。

newVersion

定義了使用的Tablestore Reader外掛程式的版本。

  • false:舊版本Tablestore Reader,僅支援行模式讀取寬表

  • true:新版本Tablestore Reader,支援行模式列模式時序表寬表

新版Tablestore Reader除了支援新的功能,對系統資源的開銷也相對較低,因此推薦使用新版Tablestore Reader。

新版本外掛程式配置相容了舊版本外掛程式的配置,即舊任務增加newVersion=true配置之後,可以正常運行。

false

mode

定義了讀取資料的模式,當前支援兩種模式:

  • normal:行模式讀取資料,資料格式為{主鍵列值,普通列值}。

  • multiVersion:列模式讀取資料,資料格式為{主鍵列,普通列名,時間戳記,普通列名對應列值}。

本配置僅在新版Tablestore Reader(newVersion:true)配置下生效。

舊版Tablestore Reader會忽略mode配置,並僅支援行模式讀取。

normal

isTimeseriesTable

定義了操作的資料表是否為時序資料表:

  • false:資料表為普通的寬表。

  • true:資料表為時序資料表。

本配置僅在newVersion:true & mode:normal配置下生效。

舊版Tablestore Reader不支援時序表,且時序表無法按列模式讀取。

false

Reader指令碼參數附加配置

Tablestore Reader支援行模式讀取寬表、行模式讀取時序表、列模式讀取寬表。下面為您介紹各模式的附加配置。

行模式讀寬表參數

參數

描述

是否必選

預設值

column

所配置的表中需要同步的列名集合,使用JSON的數組描述欄位資訊。由於Tablestore本身是NoSQL系統,在Tablestore Reader抽取資料過程中,必須指定相應的欄位名稱。

  • 支援普通的列讀取,例如{"name":"col1"}。

  • 支援部分列讀取,如果您不配置該列,則Tablestore Reader不予讀取。

  • 支援常量列讀取,例如{"type":"STRING", "value":"DataX"}。使用type描述常量類型,目前支援String、Int、Double、Bool、Binary(使用Base64編碼填寫)、INF_MIN(Tablestore的系統限定最小值,如果使用該值,您不能填寫value屬性,否則報錯)、INF_MAX(Tablestore的系統限定最大值,如果使用該值,您不能填寫value屬性,否則報錯)。

  • 不支援函數或者自訂運算式,由於Tablestore本身不提供類似SQL的函數或者運算式功能,Tablestore Reader也不能提供函數或運算式列功能。

beginend

beginend配置項用於表示抽取Tablestore表資料的範圍。

beginend描述的是Tablestore PrimaryKey的區間分布狀態,對於無限大小的區間,您可以使用{"type":"INF_MIN"}{"type":"INF_MAX"}分別指代beginend,其中type表示抽取資料的類型。

說明
  • beginend配置項預設值為[INF_MIN,INF_MAX),即讀取全部資料。

  • 當配置的beginend數量小於主鍵數量時,依據最左主鍵匹配原則,未配置的主鍵範圍預設為[INF_MIN,INF_MAX)

  • 當只配置了beginend中的一項時,匯出的資料範圍為[begin, INF_MAX)[INF_MIN, end)

  • 當僅有一個主鍵列時,beginend設定匯出資料遵循左閉右開規則。

  • 當有多個主鍵列時,最後一個主鍵列匯出遵循左閉右開規則,其他主鍵列為左右閉區間。

例如,對一張主鍵為[Hundreds, Tens,Ones]的三主鍵Tablestore表進行資料幫浦,表中的資料的主鍵分別為:(0,0,0)(0,0,1)(0,0,2)(0,0,3)......(9,9,8)(9,9,9),共1000列。beginend的配置如下所示。

  • 樣本一:抽取Hundreds的範圍為[3,5]Tens的範圍為[4,6]。抽取的資料主鍵包含(3,4,0)(3,4,1)...(4,4,0),(4,0,1)...(5,6,8)(5,6,9)。配置如下:

    "range": {
          "begin": [
            {"type":"INT", "value":"3"},  //指定抽取Hundreds的最小值。
            {"type":"INT", "value":"4"}  //指定抽取Tens的最小值。
          ],
          "end": [
            {"type":"INT", "value":"5"}, //指定抽取Hundreds的最大值。
            {"type":"INT", "value":"6"} //指定抽取Tens的最大值。
          ]
        }
  • 樣本二:抽取Hundreds的範圍為[3,5]Tens的範圍為[4,6]Ones範圍為[5,7)。抽取的資料主鍵包含(3,4,5)(3,4,6)...(4,4,5),(4,4,6)...(5,6,5)(5,6,6)。配置如下:

    "range": {
          "begin": [
            {"type":"INT", "value":"3"},  //指定抽取Hundreds的最小值。
            {"type":"INT", "value":"4"},  //指定抽取Tens的最小值。
            {"type":"INT", "value":"5"}  //指定抽取Ones的最小值。
          ],
          "end": [
            {"type":"INT", "value":"5"}, //指定抽取Hundreds的最大值。
            {"type":"INT", "value":"6"}, //指定抽取Tens的最大值。
            {"type":"INT", "value":"7"}  //指定抽取Ones的最大值。
          ]
        }

(INF_MIN,INF_MAX)

split

該配置項屬於進階配置項,是您自己定義切分配置資訊,普通情況下不建議使用。

通過配置split參數可以自己定義分區的資料範圍,通常在Tablestore 資料存放區發生熱點,可以使用自訂的切分規則,以下任務配置為例:

{
  "range": {
    "begin": [{"type": "INF_MIN"}],
    "end":   [{"type": "INF_MAX"}],
    "split": [
      {"type": "STRING","value": "1"},
      {"type": "STRING","value": "2"},
      {"type": "STRING","value": "3"},
      {"type": "STRING","value": "4"},
      {"type": "STRING","value": "5"}
    ]
  }

任務運行時,資料會切分成6個段,並發讀取。分段數量建議比任務並發數大。

// 第 1 段
[最小值, 1)
// 第 2 段
[1, 2)
// 第 3 段
[2, 3)
// 第 4 段
[3, 4)
// 第 5 段
[4, 5)
// 第 6 段
[5, 最大值)

當split參數未配置時,使用自動切分邏輯。

自動切分邏輯會找出 Partition Key最大、最小值,並進行均勻分段。

Partition Key支援整型和字串,整型使用整除來分段,字串使用第一個字元的unicode碼來分段。

行模式讀時序表參數

參數

描述

是否必選

預設值

column

column是一個數組,每個元素表示一列,可配置常量列與普通列兩種類型。

對於常量列,需要配置以下欄位:

  • type:欄位實值型別,必需配置。目前支援string、int、double、bool、binary。

  • value : 欄位值,必需配置。

對於普通列,需要配置以下欄位:

  • name :列名,必需配置。以下為預定義的欄位。

    • 時間軸的'度量名稱'使用_m_name標識,資料類型為String。

    • 時間軸的'資料來源'使用_data_source標識,資料類型為String。

    • 時間軸的'標籤'使用_tags標識,資料類型為String。

    • 時間軸的'時間戳記'使用_time標識,資料類型為Long。

  • is_timeseries_tag:該列是否為tags欄位內部的索引值,非必需配置,預設為false。

  • type:欄位實值型別,非必需配置,預設為string。目前支援 string、int、double、bool、binary。

讀出四列資料的指令碼樣本:

"column": [
  {
    "name": "_m_name"               // 時間軸的度量名稱欄位
  },
  {
    "name": "tag_key",                // 時間軸中標籤欄位內tag_key對應的value
    "is_timeseries_tag":"true"
  },
  {
    "name": "string_column",        // fields中名稱為string_column
    "type":"string"                    // 且資料類型為string的列
  },
  {
    "value": "constant_value",        // 常量列,值固定為"constant_value"
    "type":"string"
  }
],

measurementName

配置需要讀取時間軸的度量名稱。若不配置則讀取全表資料。

timeRange

請求資料的時間範圍,讀取的範圍是[begin,end),左閉右開的區間,且begin必須小於end,時間戳記單位為毫秒。格式如下:

"timeRange":{
    // begin:非必選,預設為0,取值範圍是0~LONG_MAX
    "begin":1400000000000,
    // end:非必選,預設為Long Max(9223372036854775807L),取值範圍是0~LONG_MAX
    "end"  :1600000000000
},

全部版本

列模式讀寬表參數

參數

描述

是否必選

預設值

column

指定要匯出的列,在列模式下只支援普通列。

格式:

"column": [
    {"name1":"{your column name1}"},
    {"name2":"{your column name2}"}
],
說明
  • 在列模式下,不支援常量列。

  • PK列不能指定,匯出4元組中預設包括完整的PK。

  • 不能重複指定列。

所有列

range

讀取的資料範圍,讀取的範圍為[begin,end),左閉右開的區間,並且:

  • begin小於end,表示正序讀取資料。

  • begin大於end,表示反序讀取資料。

  • begin和end不能相等。

type支援的類型有如下幾類:

  • string

  • int

  • binary:輸入的方式採用二進位的Base64字串形式傳入。

  • INF_MIN:表示無限小。

  • INF_MAX:表示無限大。

格式:

"range":{
    // 可選,預設表示從無限小開始讀取
    // 這個值的輸入可以填寫空數組,或者PK首碼,亦或者完整的PK,在正序讀取資料時,預設填充PK尾碼為INF_MIN,反序為INF_MAX
    // 例子:
    // 如果使用者的表有2個PK,類型分別為string、int,那麼如下3種輸入都是合法,如:
    //1. 從表的開始位置讀取 -> 讀取到表的結束為止:
    //"begin":[],"end":[],
    //2. 從第一列主索引值為"a",第二列主鍵的最小值開始讀取 -> 讀取到第一列主索引值為"b",第二列主鍵的最大值:
    //"begin":[{"type":"string", "value":"a"}],"end":[{"type":"string", "value":"b"}],
    //3. 從第一列主索引值為"a",第二列主鍵的最小值開始讀取 -> 讀取到表的結束位置:
    //"begin":[{"type":"string", "value":"a"},{"type":"INF_MIN"}],"end":[],    
    //
    // binary類型的PK列比較特殊,因為Json不支援直接輸入位元,所以系統定義:使用者如果要傳入
    // 二進位,必須使用(Java)Base64.encodeBase64String方法,將二進位轉換為一個可視化的字串,然後將這個字串填入value中
    // 例子(Java):
    //   byte[] bytes = "hello".getBytes();  # 構造一個位元據,這裡使用字串hello的byte值
    //   String inputValue = Base64.encodeBase64String(bytes) # 調用Base64方法,將二進位轉換為可視化的字串
    //   上面的代碼執行之後,可以獲得inputValue為"aGVsbG8="
    //   最終寫入配置:{"type":"binary","value" : "aGVsbG8="}

    "begin":[{"type":"string", "value":"a"},{"type":"INF_MIN"}],

    // 預設表示讀取到無限大結束
    // 這個值得輸入可以填寫空數組,或者PK首碼,亦或者完整的PK,在正序讀取資料時,預設填充PK尾碼為INF_MAX,反序為INF_MIN
    // 可選
    "end":[{"type":"string", "value":"g"},{"type":"INF_MAX"}],

    // 目前使用者資料較多時,需要開啟並發匯出,Split可以將當前範圍的資料按照切分點切分為多個並發任務
    // 可選
    //   1. split中的輸入值只能PK的第一列(分區建),且值的類型必須和PartitionKey一致
    //   2. 值的範圍必須在begin和end之間
    //   3. split內部的值必鬚根據begin和end的正反序關係而遞增或者遞減
    "split":[{"type":"string", "value":"b"}, {"type":"string", "value":"c"}]
},

全部資料

timeRange

請求資料的時間範圍,讀取的範圍是[begin,end),左閉右開的區間,且begin必須小於end,時間戳記單位為毫秒。

格式:

"timeRange":{
    // begin:非必選,預設為0,取值範圍是0~LONG_MAX
    "begin":1400000000000,
    // end:非必選,預設為Long Max(9223372036854775807L),取值範圍是0~LONG_MAX
    "end"  :1600000000000
},

全部版本

maxVersion

請求的最巨量資料版本數,取值範圍是1~INT32_MAX。

全部版本

附錄:Writer指令碼Demo與參數說明

如果您配置離線任務時使用指令碼模式的方式進行配置,您需要在任務指令碼中按照指令碼的統一格式要求編寫指令碼中的reader參數和writer參數,指令碼模式的統一要求請參見通過指令碼模式配置離線同步任務,以下為您介紹指令碼模式下的資料來源的Reader參數和Writer參數的指導詳情。

Writer指令碼Demo

行模式寫入寬表配置

{
    "type":"job",
    "version":"2.0",//版本號碼。
    "steps":[
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"ots",//外掛程式名。
            "parameter":{
                "datasource":"",//資料來源。
                "table":"",//表名。
                "newVersion":"true",//使用新版otswriter
                "mode": "normal",// 行模式寫入資料
                "isTimeseriesTable":"false",// 配置該表為寬表(非時序表)
                "primaryKey" : [//Tablestore的主鍵資訊。
                    {"name":"gid", "type":"INT"},
                    {"name":"uid", "type":"STRING"}
                 ],
                "column" : [//欄位。
                      {"name":"col1", "type":"INT"},
                      {"name":"col2", "type":"DOUBLE"},
                      {"name":"col3", "type":"STRING"},
                      {"name":"col4", "type":"STRING"},
                      {"name":"col5", "type":"BOOL"}
                  ],
                "writeMode" : "PutRow"    //寫入模式。
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"//錯誤記錄數。
        },
        "speed":{
            "throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
            "concurrent":1, //作業並發數。
            "mbps":"12"//限流,此處1mbps = 1MB/s。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

行模式寫時序表配置

{
    "type":"job",
    "version":"2.0",//版本號碼。
    "steps":[
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"ots",//外掛程式名。
            "parameter":{
                "datasource":"",//資料來源。
                "table": "testTimeseriesTableName01",
                "mode": "normal",
                "newVersion": "true",
                "isTimeseriesTable":"true",
                "timeunit":"microseconds",
                "column": [
                      {
                        "name": "_m_name"
                      },
                      {
                        "name": "_data_source",
                      },
                      {
                        "name": "_tags",
                      },
                      {
                        "name": "_time",
                      },
                      {
                        "name": "string_1",
                        "type":"string"
                      },
                      {
                        "name":"tag3",
                        "is_timeseries_tag":"true",
                      }
                    ]
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"//錯誤記錄數。
        },
        "speed":{
            "throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
            "concurrent":1, //作業並發數。
            "mbps":"12"//限流,此處1mbps = 1MB/s。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

列模式寫入寬表配置

{
    "type":"job",
    "version":"2.0",//版本號碼。
    "steps":[
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"ots",//外掛程式名。
            "parameter":{
                "datasource":"",//資料來源。
                "table":"",
                "newVersion":"true",
                "mode":"multiVersion",
                "primaryKey" : [
                    "gid",
                    "uid"
                    ]
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"//錯誤記錄數。
        },x`
        "speed":{
            "throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
            "concurrent":1, //作業並發數。
            "mbps":"12"//限流,此處1mbps = 1MB/s。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

Writer指令碼參數通用配置

參數

描述

是否必選

預設值

datasource

資料來源名稱,指令碼模式支援添加資料來源,該配置項填寫的內容必須與添加的資料來源名稱保持一致。

endPoint

Tablestore Server的EndPoint(服務地址),詳情請參見服務地址

accessId

Tablestore的AccessKey ID。

accessKey

Tablestore的AccessKey Secret。

instanceName

Tablestore的執行個體名稱,執行個體是您使用和管理Tablestore服務的實體。

您在開通Tablestore服務後,需要通過管理主控台來建立執行個體,然後在執行個體內進行表的建立和管理。執行個體是Tablestore資源管理的基礎單元,Tablestore對應用程式的存取控制和資源計量都在執行個體層級完成。

table

所選取的需要抽取的表名稱,此處能且只能填寫一張表。在Tablestore中不存在多表同步的需求。

newVersion

定義了使用的Tablestore Writer外掛程式版本。

  • false:舊版本Tablestore Writer,僅支援行模式寫入寬表。

  • true:新版本Tablestore Writer,支援行模式列模式時序表寬表,且支援主鍵字增列功能。

新版Tablestore Writer除了支援新的功能,對系統資源的開銷也相對較低,因此推薦使用新版Tablestore Writer。

新版外掛程式相容舊版本外掛程式配置,即舊任務增加newVersion=true配置之後,可以正常運行。

false

mode

定義了寫入資料的模式,當前支援兩種模式

  • normal:寫入普通格式的資料(行模式)。

  • multiVersion:寫入多版本格式的資料(列模式)。

本配置僅在newVersion:true配置下生效

舊版Tablestore Writer會忽略mode配置,並僅支援行模式讀取。

normal

isTimeseriesTable

定義了操作的資料表是否為時序資料表。

  • false:資料表為普通的寬表。

  • true:資料表為時序資料表。

本配置僅在newVersion:true & mode:normal配置下生效(列模式不相容時序表)。

false

Writer指令碼參數附加配置

Tablestore Writer支援行模式寫入寬表、行模式寫入時序表、列模式寫入寬表。下面介紹各模式的附加配置。

行模式寫寬表參數

參數

描述

是否必選

預設值

primaryKey

Tablestore的主鍵資訊,使用JSON的數組描述欄位資訊。Tablestore本身是NoSQL系統,在Tablestore Writer匯入資料過程中,必須指定相應的欄位名稱。

資料同步系統本身支援類型轉換的,因此對於源頭資料非STRING/INT,Tablestore Writer會進行資料類型轉換。配置樣本如下。

"primaryKey" : [
    {"name":"gid", "type":"INT"},
    {"name":"uid", "type":"STRING"}
                 ],
說明

Tablestore的PrimaryKey僅支援STRING和INT類型,因此Tablestore Writer本身也限定填寫STRING和INT兩種類型。

column

所配置的表中需要同步的列名集合,使用JSON的數組描述欄位資訊。

配置樣本:

"column" : [
     {"name":"col1", "type":"INT"},
     {"name":"col2", "type":"DOUBLE"},
     {"name":"col3", "type":"STRING"},
     {"name":"col4", "type":"BINARY"},
     {"name":"col5", "type":"BOOL"}
              ],

其中的name為寫入的Tablestore列名稱,type為寫入的類型。Tablestore支援STRING、INT、DOUBLE、BOOL和BINARY類型。

說明

寫入過程不支援常量、函數或者自訂運算式。

writeMode

資料寫入Table Store的模式,目前支援以下兩種模式:

  • PutRow:對應於Tablestore API PutRow,插入資料到指定的行。如果該行不存在,則新增一行。如果該行存在,則覆蓋原有行。

  • UpdateRow:對應於Tablestore API UpdateRow,更新指定行的資料。如果該行不存在,則新增一行。如果該行存在,則根據請求的內容在這一行中新增、修改或者刪除指定列的值。

enableAutoIncrement

是否允許向包含主鍵自增列的Tablestore表中寫入資料。

  • true:外掛程式會自動掃描目的端表的自增列資訊,並在寫入資料時添加自增列(不需要使用者配置自增列名稱)。

  • false:寫入含主鍵自增列的表時會報錯。

false

requestTotalSizeLimitation

該配置限制寫入Tablestore時單行資料的大小,配置類型為數字。

1MB

attributeColumnSizeLimitation

該配置限制寫入Tablestore時單個屬性列的大小,配置類型為數字。

2MB

primaryKeyColumnSizeLimitation

該配置限制寫入Tablestore時單個主鍵列的大小,配置類型為數字。

1KB

attributeColumnMaxCount

該配置限制寫入Tablestore時屬性列的個數,配置類型為數字。

1,024

行模式寫時序表參數

參數

描述

是否必選

預設值

column

column中的每個元素對應時序資料中的一個欄位,每個元素可配置以下參數。

  • name:列名,必需配置。以下為預定義的欄位:

    • 時間軸的'度量名稱'使用_m_name標識,資料類型為String。

    • 時間軸的'資料來源'使用_data_source標識,資料類型為String。

    • 時間軸的'標籤'使用_tags標識,資料類型為String,字串格式為["tagKey1=value1","tagKey2=value2"]。

    • 時間軸的'時間戳記'使用_time標識,資料類型為Long,預設單位為微秒。

  • is_timeseries_tag : 是否為tags欄位內部的索引值,非必需配置,預設為false。

  • type : 欄位實值型別,非必需配置,預設為string。支援string、int、double、bool、binary類型。

由於時序資料的度量名稱與時間戳記不可為空,因此必須配置_m_name_time欄位。

樣本:一條待寫入資料如下,包含六個欄位:

mName1    source1    ["tag1=A","tag2=B"]    1677763080000000    field_value     C

使用如下配置:

"column": [
      {
        "name": "_m_name"
      },
      {
        "name": "_data_source",
      },
      {
        "name": "_tags",
      },
      {
        "name": "_time",
      },
      {
        "name": "string_1",
        "type":"string"
      },
      {
        "name":"tag3",
        "is_timeseries_tag":"true",
      }
    ],

寫入Tablestore後,控制台查看結果如下結果

timeunit

所配置的時間戳記_time欄位的單位,支援NANOSECONDSMICROSECONDSMILLISECONDSSECONDSMINUTES

MICROSECONDS

列模式寫寬表參數

參數

描述

是否必選

預設值

primaryKey

表的主鍵列。

考慮到配置成本,並不需要配置primaryKey在Record(Line)中的位置,但Record的格式需要固定,要求primaryKey一定在行首,primaryKey之後是columnName。record的格式為:{pk0,pk1...}, {columnName}, {timestamp}, {value}

例如,有以下9條資料:

1,pk1,row1,1677699863871,value_0_0
1,pk1,row2,1677699863871,value_0_1
1,pk1,row3,1677699863871,value_0_2
2,pk2,row1,1677699863871,value_1_0
2,pk2,row2,1677699863871,value_1_1
2,pk2,row3,1677699863871,value_1_2
3,pk3,row1,1677699863871,value_2_0
3,pk3,row2,1677699863871,value_2_1
3,pk3,row3,1677699863871,value_2_2

配置樣本如下:

"primaryKey" : [
    "gid",
    "uid"
    ],

寫入寬行結果:

gid     uid     row1        row2        row3
1        pk1        value_0_0    value_0_1    value_0_2
2        pk2        value_1_0    value_1_1    value_1_2
3        pk3        value_2_0    value_2_1    value_2_2

columnNamePrefixFilter

列名首碼過濾。

Hbase匯入的資料,cfqulifier共同組成了columnName,但Tablestore不支援cf,所以需要將cf過濾掉。

配置樣本:"columnNamePrefixFilter":"cf:"

說明
  • 該參數選填,如果沒有填寫或者值為空白字串,表示不對列名進行過濾。

  • 如果通過Data Integration添加的資料columnName列不是以首碼開始,則將該Record放入髒資料回收器中。

常見問題

問題一

Q:向包含主鍵自增列的目標表寫入資料,需要如何配置Tablestore Writer?

  1. Tablestore Writer的配置中必須包含以下兩條:

    "newVersion": "true",
    "enableAutoIncrement": "true",
  2. Tablestore Writer中不需要配置主鍵自增列的列名。

  3. Tablestore Writer中配置的primaryKey條數+column條數需要等於上遊Tablestore Reader資料的列數。

問題二

Q:在時序模型的配置中,如何理解_tagis_timeseries_tag兩個欄位?

樣本:某條資料共有三個標籤,標籤為:【手機=小米,記憶體=8G,鏡頭=萊卡】。資料

  • 資料匯出樣本(Tablestore Reader)

    • 如果想將上述標籤合并到一起作為一列匯出,則配置為:

      "column": [
            {
              "name": "_tags",
            }
          ],

      DataWorks會將標籤匯出為一列資料,形式如下:

      ["phone=xiaomi","camera=LEICA","RAM=8G"]
    • 如果希望匯出phone標籤和camera標籤,並且每個標籤單獨作為一列匯出,則配置為:

      "column": [
            {
              "name": "phone",
              "is_timeseries_tag":"true",
            },
            {
              "name": "camera",
              "is_timeseries_tag":"true",
            }
          ],

      DataWorks會匯出兩列資料,形式如下:

      xiaomi, LEICA
  • 資料匯入樣本(Tablestore Writer)

    現在上遊資料來源(Reader)有兩列資料:

    • 一列資料為:["phone=xiaomi","camera=LEICA","RAM=8G"]

    • 另一列資料為:6499。

    現希望將這兩列資料都添加到標籤裡面,預期的寫入後標籤欄位格式如下所示:格式則配置為:

    "column": [
          {
            "name": "_tags",
          },
          {
            "name": "price",
            "is_timeseries_tag":"true",
          },
        ],
    • 第一列配置將["phone=xiaomi","camera=LEICA","RAM=8G"]整體匯入標籤欄位。

    • 第二列配置將price=6499單獨匯入標籤欄位。