全部產品
Search
文件中心

Tablestore:建立投遞任務

更新時間:Jun 30, 2024

如果要Table Store資料表中的資料投遞到OSS Bucket中儲存,您可以使用CreateDeliveryTask介面建立一個投遞任務。

重要

請確認已安裝支援資料湖投遞功能的Table StoreGo SDK。

前提條件

  • 已開通OSS服務且在Table Store執行個體所在地區建立Bucket。具體操作,請參見開通OSS服務

  • 已通過控制台建立Table Store服務關聯角色並記錄角色的ARN。具體操作,請參見建立投遞任務

    服務關聯角色的ARN請通過RAM控制台擷取,具體操作如下:

    RAM 角色管理介面,搜尋AliyunServiceRoleForOTSDataDelivery後,單擊角色名稱,在角色詳情介面,可以查看和複製角色的ARN資訊。

    figrole

  • 已初始化TableStoreClient。具體操作,請參見初始化

  • 已建立資料表並寫入資料。

參數

參數

說明

TableName

資料表名稱。

TaskName

投遞任務名稱。

名稱只能包含英文小寫字母(a~z)、數字和虛線(-),開頭和結尾必須為英文小寫字母或數字,且長度為3~16字元。

TaskConfig

投遞任務配置,包括如下選項:

  • OssPrefix:OSS Bucket中的目錄首碼,將Table Store的資料投遞到該OSS Bucket目錄中。投遞路徑中支援引用$yyyy、$MM、$dd、$HH、$mm五種時間變數。

    • 當投遞路徑中引用時間變數時,可以按資料的寫入時間動態產生OSS目錄,實現hive partition naming style的資料時間分區,從而按照時間分區組織OSS中的檔案分布。

    • 當投遞路徑中不引用時間變數時,所有檔案會被投遞到固定的OSS首碼目錄中。

  • OssBucket:OSS Bucket名稱。

  • OssEndpoint:OSS Bucket所在地區的服務地址。

  • OssRoleName:Table Store服務關聯角色的ARN資訊。

  • Format:投遞的資料的儲存以Parquet列存格式儲存,資料湖投遞預設使用PLAIN編碼方式,PLAIN編碼方式支援任意類型資料。

  • EventTimeColumn:事件時間列,用於指定按某一列資料的時間進行分區。如果不設定此參數,則按資料寫入Table Store的時間進行分區。

  • Schema:指定需要投遞的資料列,必須手動設定投遞欄位的源表欄位、目標欄位和目標欄位類型。

    您可以選擇任意欄位以任意順序、名稱寫入列存檔案,OSS的列存資料會按Schema數組中的資料列先後順序分布。

    重要

    投遞資料的欄位類型必須與資料來源的欄位類型匹配,否則會作為髒資料丟棄。欄位類型映射詳情請參見資料格式映射

TaskType

投遞任務的類型,包括如下選項:

  • IncTask:表示增量資料投遞模式,只同步增量資料。

  • BaseTask:表示全量資料投遞模式,一次性全表掃描資料同步。

  • BaseIncTask(預設):表示全量&增量資料投遞模式,全量資料同步完成後,再同步增量資料。

    其中增量資料同步時可以擷取最新投遞時間和瞭解當前投遞狀態。

樣本

以下樣本用於為資料表建立投遞任務。

func CreateTaskSample(client *tablestore.TableStoreClient) {
    createTask := &tablestore.CreateDeliveryTaskRequest{
        TableName: "<TABLE_NAME>",
        TaskName: "<TASK_NAME>",
        TaskType: tablestore.BaseIncTask,
        TaskConfig: &tablestore.OSSTaskConfig{
            OssPrefix:   "sample/year=$yyyy/month=$MM",
            OssBucket:      "datadeliverytest",
            OssEndpoint:    "oss-cn-hangzhou.aliyuncs.com",
            OssRoleName:    "acs:ram::17************45:role/aliyunserviceroleforotsdatadelivery",
            Schema: []*tablestore.TaskSchema{
                {
                    ColumnName: "PK1",
                    OssColumnName: "PK1",
                    Type: tablestore.ParquetInt64,
                },
                {
                    ColumnName: "PK2",
                    OssColumnName: "PK2",
                    Type: tablestore.ParquetUtf8,
                },
                {
                    ColumnName: "Col1",
                    OssColumnName: "Col1",
                    Type: tablestore.ParquetDouble,
                },
            },
        },
    }
    createResp, err := client.CreateDeliveryTask(createTask)
    if err != nil {
        log.Fatal("create delivery task failed ", err)
    }
    fmt.Println("create delivery task success ", createResp.RequestId)
}