全部產品
Search
文件中心

DataWorks:Tablestore Stream資料來源

更新時間:Oct 24, 2024

DataWorksData Integration支援使用Tablestore Stream Reader讀取Tablestore的增量資料,本文為您介紹DataWorks的Tablestore Stream資料讀取能力。

資料同步前準備:Tablestore Stream環境準備

使用Tablestore Stream外掛程式前,您必須確保Tablestore表上已經開啟Stream功能。時序表已預設開啟stream功能。您可以在建表時指定開啟,也可以使用SDK的UpdateTable介面開啟。開啟Stream的方法,如下所示。

SyncClient client = new SyncClient("", "", "", "");
#方法1:建表的時候開啟:
CreateTableRequest createTableRequest = new CreateTableRequest(tableMeta);
createTableRequest.setStreamSpecification(new StreamSpecification(true, 24)); // 24代表增量資料保留24小時。
client.createTable(createTableRequest);
#方法2:如果建表時未開啟,您可以通過UpdateTable開啟:
UpdateTableRequest updateTableRequest = new UpdateTableRequest("tableName");
updateTableRequest.setStreamSpecification(new StreamSpecification(true, 24));
client.updateTable(updateTableRequest);

使用SDK的UpdateTable介面開啟時:

  • 指定開啟Stream並設定到期時間,即開啟了Tablestore增量資料匯出功能。開啟stream功能後,Tablestore服務端就會將您的動作記錄額外儲存起來,每個分區有一個有序的動作記錄隊列,每條動作記錄會在一定時間後被記憶體回收,該時間即為您指定的到期時間。

  • Tablestore的SDK提供了幾個Stream相關的API用於讀取這部分的動作記錄,增量外掛程式也是通過Tablestore SDK的介面擷取到增量資料。列模式下會將增量資料轉化為多個6元組的形式(pk、colName、version、colValue、opType和sequenceInfo),行模式則會以普通行的形式匯出增量資料。

支援的同步模式與欄位類型

Tablestore Stream Reader外掛程式支援使用列模式或行模式同步Tablestore的增量資料。兩種模式下的同步過程和欄位類型要求如下。

列模式

在Tablestore多版本模式下,表中的資料群組織為行>列>版本三級的模式, 一行可以有任意列,列名並不是固定的,每一列可以含有多個版本,每個版本都有一個特定的時間戳記(版本號碼)。

您可以通過Tablestore的API進行一系列讀寫操作,Tablestore通過記錄您最近對錶的一系列寫操作(或資料更改操作)來實現記錄增量資料的目的,所以您也可以把增量資料看作一批操作記錄。

Tablestore支援PutRowUpdateRowDeleteRow操作:

  • PutRow:寫入一行,如果該行已存在即覆蓋該行。

  • UpdateRow:更新一行,不更改原行的其它資料。更新包括新增或覆蓋(如果對應列的對應版本已存在)一些列值、刪除某一列的全部版本、刪除某一列的某個版本。

  • DeleteRow:刪除一行。

Tablestore會根據每種操作產生對應的增量資料記錄,Reader外掛程式會讀出這些記錄,並匯出為Data Integration的資料格式。

同時,由於Tablestore具有動態列、多版本的特性,所以Reader外掛程式匯出的一行不對應Tablestore中的一行,而是對應Tablestore中的一列的一個版本。即Tablestore中的一行可能會匯出很多行,每行包含主索引值、該列的列名、該列下該版本的時間戳記(版本號碼)、該版本的值、操作類型。如果設定isExportSequenceInfo為true,還會包括時序資訊。

轉換為Data Integration的資料格式後,定義了以下四種操作類型:

  • U(UPDATE):寫入一列的一個版本。

  • DO(DELETE_ONE_VERSION):刪除某一列的某個版本。

  • DA(DELETE_ALL_VERSION):刪除某一列的全部版本,此時需要根據主鍵和列名,刪除對應列的全部版本。

  • DR(DELETE_ROW):刪除某一行,此時需要根據主鍵,刪除該行資料。

假設該表有兩個主鍵列,主鍵列名分別為pkName1, pkName2,樣本如下。

pkName1

pkName2

columnName

timestamp

columnValue

opType

pk1_V1

pk2_V1

col_a

1441803688001

col_val1

U

pk1_V1

pk2_V1

col_a

1441803688002

col_val2

U

pk1_V1

pk2_V1

col_b

1441803688003

col_val3

U

pk1_V2

pk2_V2

col_a

1441803688000

DO

pk1_V2

pk2_V2

col_b

DA

pk1_V3

pk2_V3

DR

pk1_V3

pk2_V3

col_a

1441803688005

col_val1

U

假設匯出的資料如上,共7行,對應Tablestore表內的3行,主鍵分別是(pk1_V1,pk2_V1),(pk1_V2, pk2_V2),(pk1_V3, pk2_V3):

  • 對於主鍵為(pk1_V1,pk2_V1)的一行,包括寫入col_a列的兩個版本和col_b列的一個版本等操作。

  • 對於主鍵為(pk1_V2,pk2_V2)的一行,包括刪除col_a列的一個版本和刪除col_b列的全部版本等操作。

  • 對於主鍵為(pk1_V3,pk2_V3)的一行,包括刪除整行和寫入col_a列的一個版本等操作。

行模式

  • 寬行表

    您可以通過行模式匯出資料,該模式將使用者每次更新的記錄,抽取成行的形式匯出,需要設定mode屬性並配置列名。

    "parameter": {
      #parameter中配置下面三項配置(例如datasource、table等其它配置項照常配置)。
      "mode": "single_version_and_update_only", # 設定匯出模式。
      "column":[  #按照需求添加需要匯出TableStore中的列,您可以自訂設定配置個數。
              {
                 "name": "uid"  #列名樣本,可以是主鍵或屬性列。
              },
              {
                 "name": "name"  #列名樣本,可以是主鍵或屬性列。
              },
      ],
      "isExportSequenceInfo": false, #single_version_and_update_only模式下只能是false。
    }
  • 時序表

    時序表在建立時會自動開啟Stream,因此不需要手動開啟Stream功能。

    Tablestore Stream Reader支援匯出時序表中的增量資料,當表為時序表時,需要配置的資訊如下:

    "parameter": {
      #parameter中配置下面四項配置(例如datasource、table等其它配置項照常配置)。
      "mode": "single_version_and_update_only", # 設定匯出模式。
      "isTimeseriesTable":"true",  # 設定匯出為時序表。
      "column":[  #按照需求添加需要匯出TableStore中的列,您可以自訂設定配置個數。
              {
                "name": "_m_name"       #度量名稱欄位。
              },
              {
                "name": "_data_source"  #資料來源欄位。
              },
              {
                "name": "_tags"         #標籤欄位,將tags轉換為string類型。
              },
              {
                "name": "tag1",       #標籤內部欄位鍵名稱。
                "is_timeseries_tag":"true"  #表明該欄位為tags內部欄位。
              },
              {
                "name": "time"          #時間戳記欄位。
              },
              {
                 "name": "name"         #屬性列名稱。
              },
      ],
      "isExportSequenceInfo": false, #single_version_and_update_only模式下只能是false。
    }

    行模式匯出的資料更接近於原始的行,易於後續處理,但需要注意以下問題:

    • 每次匯出的行是從使用者每次更新的記錄中抽取,每一行資料與使用者的寫入或更新操作一一對應。如果使用者存在單獨更新某些列的行為,則會出現有一些記錄只有被更新的部分列,其它列為空白的情況。

    • 行模式不會匯出資料的版本號碼(即每列的時間戳記),也無法進行刪除操作。

資料類型轉換列表

目前Tablestore Stream Reader支援所有的Tablestore類型,其針對Tablestore類型的轉換列表,如下所示。

類型分類

Tablestore Stream資料類型

整數類

INTEGER

浮點類

DOUBLE

字串類

STRING

布爾類

BOOLEAN

二進位類

BINARY

資料同步任務開發

資料同步任務的配置入口和通用配置流程可參見下文的配置指導。

附錄:指令碼Demo與參數說明

離線任務指令碼配置方式

如果您配置離線任務時使用指令碼模式的方式進行配置,您需要按照統一的指令碼格式要求,在任務指令碼中編寫相應的參數,詳情請參見通過指令碼模式配置離線同步任務,以下為您介紹指令碼模式下資料來源的參數配置詳情。

Reader指令碼Demo

  • 列模式

    {
        "type":"job",
        "version":"2.0",//版本號碼。
        "steps":[
            {
                "stepType":"otsstream",//外掛程式名。
                "parameter":{
                    "datasource":"$srcDatasource",//資料來源。
                    "dataTable":"",//表名。
                    "statusTable":"TableStoreStreamReaderStatusTable",//用於選項組的表的名稱。
                    "maxRetries":30,//從 TableStore 中讀增量資料時,每次請求的最大重試次數,預設為30。
                    "isExportSequenceInfo":false,//是否匯出時序資訊。
                    "startTimeString":"${startTime}${hh}",//增量資料的時間範圍(左閉右開)的左邊界。參數配置中配置startTime=${yyyymmdd} hh=$[hh24miss],表示ots讀取開始時間為業務日期的定時時間
                    "endTimeString":"${endTime}${hh}"//增量資料的時間範圍(左閉右開)的右邊界。參數配置中配置endTime=${yyyymmdd} hh=$[hh24miss],表示ots讀取結束時間為業務日期的定時時間
                },
                "name":"Reader",
                "category":"reader"
            },
            {
                "stepType":"stream",
                "parameter":{},
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting":{
            "errorLimit":{
                "record":"0"//錯誤記錄數。
            },
            "speed":{
                "throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
                "concurrent":1,//作業並發數。
                "mbps":"12"//限流,此處1mbps = 1MB/s。
    
            }
        },
        "order":{
            "hops":[
                {
                    "from":"Reader",
                    "to":"Writer"
                }
            ]
        }
    }
  • 行模式讀取寬表

    {
        "type":"job",
        "version":"2.0",//版本號碼。
        "steps":[
            {
                "stepType":"otsstream",//外掛程式名。
                "parameter":{
                    "datasource":"$srcDatasource",//資料來源。
                    "dataTable":"",//表名。
                    "statusTable":"TableStoreStreamReaderStatusTable",//用於選項組的表的名稱。
                    "maxRetries":30,//從 TableStore 中讀增量資料時,每次請求的最大重試次數,預設為30。
                    "isExportSequenceInfo":false,//是否匯出時序資訊。
                    "startTimeString":"${startTime}${hh}",//增量資料的時間範圍(左閉右開)的左邊界。參數配置中配置startTime=${yyyymmdd} hh=$[hh24miss],表示ots讀取開始時間為業務日期的定時時間
                    "endTimeString":"${endTime}${hh}",//增量資料的時間範圍(左閉右開)的右邊界。參數配置中配置endTime=${yyyymmdd} hh=$[hh24miss],表示ots讀取結束時間為業務日期的定時時間
                    "mode": "single_version_and_update_only",
                    "column":[
                            {
                                "name":"pId"
                            },
                            {
                                "name": "uId"
                            },
                            {
                                "name":"col0"
                            },
                            {
                                "name": "col1"
                            }
                        ],
                        },
                "name":"Reader",
                "category":"reader"
            },
            {
                "stepType":"stream",
                "parameter":{},
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting":{
            "errorLimit":{
                "record":"0"//錯誤記錄數。
            },
            "speed":{
                "throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
                "concurrent":1,//作業並發數。
                "mbps":"12"//限流
    
            }
        },
        "order":{
            "hops":[
                {
                    "from":"Reader",
                    "to":"Writer"
                }
            ]
        }
    }
  • 行模式讀取時序表

    {
        "type":"job",
        "version":"2.0",//版本號碼。
        "steps":[
            {
                "stepType":"otsstream",//外掛程式名。
                "parameter":{
                    "datasource":"$srcDatasource",//資料來源。
                    "dataTable":"",//表名。
                    "statusTable":"TableStoreStreamReaderStatusTable",//用於選項組的表的名稱。
                    "maxRetries":30,//從 TableStore 中讀增量資料時,每次請求的最大重試次數,預設為30。
                    "isExportSequenceInfo":false,//是否匯出時序資訊。
                    "startTimeString":"${startTime}${hh}",//增量資料的時間範圍(左閉右開)的左邊界。參數配置中配置startTime=${yyyymmdd} hh=$[hh24miss],表示ots讀取開始時間為業務日期的定時時間
                    "endTimeString":"${endTime}${hh}",//增量資料的時間範圍(左閉右開)的右邊界。參數配置中配置endTime=${yyyymmdd} hh=$[hh24miss],表示ots讀取結束時間為業務日期的定時時間
                    "mode": "single_version_and_update_only",
                    "isTimeseriesTable":"true",
                    "column": [
                              {
                                "name": "_m_name"
                              },
                              {
                                "name": "_data_source",
                              },
                              {
                                "name": "_tags",
                              },
                              {
                                "name": "string_column",
                              }
                        ]
                        },
                "name":"Reader",
                "category":"reader"
            },
            {
                "stepType":"stream",
                "parameter":{},
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting":{
            "errorLimit":{
                "record":"0"//錯誤記錄數。
            },
            "speed":{
                "throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
                "concurrent":1,//作業並發數。
                "mbps":"12"//限流,此處1mbps = 1MB/s。
    
            }
        },
        "order":{
            "hops":[
                {
                    "from":"Reader",
                    "to":"Writer"
                }
            ]
        }
    }

Reader指令碼參數

參數

描述

是否必選

預設值

datasource

資料來源名稱,指令碼模式支援添加資料來源,該配置項填寫的內容必須與添加的資料來源名稱保持一致。

dataTable

匯出增量資料的表的名稱。該表需要開啟Stream,可以在建表時開啟,或者使用UpdateTable介面開啟。

statusTable

Reader外掛程式用於選項組的表的名稱,這些狀態可用於減少對非目標範圍內的資料的掃描,從而加快匯出速度。statusTable是Reader用於儲存狀態的表,如果該表不存在,Reader會自動建立該表。一次離線匯出任務完成後,您無需刪除該表,該表中記錄的狀態可用於下次匯出任務中:

  • 您無需建立該表,只需要給出一個表名。Reader外掛程式會嘗試在您的instance下建立該表,如果該表不存在即建立新表。如果該表已存在,會判斷該表的Meta是否與期望一致,如果不一致會拋出異常。

  • 在一次匯出完成之後,您無需刪除該表,該表的狀態可以用於下次的匯出任務。

  • 該表會開啟TTL,資料自動到期,會認為其資料量很小。

  • 針對同一個instance下的多個不同的dataTable的Reader配置,可以使用同一個statusTable,記錄的狀態資訊互不影響。

您配置一個類似TableStoreStreamReaderStatusTable的名稱即可,請注意不要與業務相關的表重名。

startTimestampMillis

增量資料的時間範圍(左閉右開)的左邊界,單位為毫秒:

  • Reader外掛程式會從statusTable中找對應startTimestampMillis的位點,從該點開始讀取開始匯出資料。

  • 如果statusTable中找不到對應的位點,則從系統保留的增量資料的第一條開始讀取,並跳過寫入時間小於startTimestampMillis的資料。

endTimestampMillis

增量資料的時間範圍(左閉右開)的右邊界,單位為毫秒:

  • Reader外掛程式從startTimestampMillis位置開始匯出資料後,當遇到第一條時間戳記大於等於endTimestampMillis的資料時,結束匯出資料,匯出完成。

  • 當讀取完當前全部的增量資料時,即使未達到endTimestampMillis,也會結束讀取。

date

日期格式為yyyyMMdd,例如20151111,表示匯出該日的資料。如果沒有指定date,則需要指定startTimestampMillisendTimestampMillisstartTimeStringendTimeString,反之也成立。例如,采雲間調度僅支援天層級,所以提供該配置,作用與startTimestampMillisendTimestampMillisstartTimeStringendTimeString類似。

isExportSequenceInfo

是否匯出時序資訊,時序資訊包含了資料的寫入時間等。預設該值為false,即不匯出。

false

maxRetries

從TableStore中讀增量資料時,每次請求的最大重試次數,預設為30次。重試之間有間隔,重試30次的總時間約為5分鐘,通常無需更改。

30

startTimeString

任務的開始時間,即增量資料的時間範圍(左閉右開)的左邊界,格式為yyyymmddhh24miss,單位為秒。

endTimeString

任務的結束時間,即增量資料的時間範圍(左閉右開)的右邊界,格式為yyyymmddhh24miss,單位為秒。

enableSeekIterator

Reader外掛程式需要先確定增量位點,然後再拉取資料,如果是經常啟動並執行任務,外掛程式會根據之前掃描的位點來確定位置。如果之前沒運行過這個外掛程式,將會從增量開始位置(預設增量保留7天,即7天前)開始掃描,因此當還沒有掃描到設定的開始時間之後的資料時,會存在開始一段時間沒有資料匯出的情況,您可以在reader的配置參數裡增加 "enableSeekIterator": true的配置,協助您加快位點定位。

false

mode

匯出模式,設定為single_version_and_update_only時為行模式,預設不設定為列模式。

isTimeseriesTable

是否為時序表,只有在行模式,即modesingle_version_and_update_only時配置生效。

false

column

column配置single_version_and_update_only模式下,所匯出的資料列,配置範例:

"column":[
    {"name":"pk1"},
	{"name":"col1"},
	{"name":"col2","dataType":"new"},
	{"name":"col2","dataType":"old"},
	{"name":"col2","dataType":"latest"}
],
  • name欄位表示要匯出的資料列的名稱,必須配置。

  • dataType欄位表示要匯出的資料類型,預設為new類型,非必須配置。dataType支援三種枚舉類型

    • new:表示本列更新後的值

    • old:表示本列更新前的值

    • latest:表中本列的當前最新值

說明

在行模式下必須配置,否則不會匯出資料。

  • 行模式下:是

  • 列模式下:否