全部產品
Search
文件中心

Tablestore:通過SDK投遞資料到OSS

更新時間:Sep 15, 2024

使用SDK進行資料投遞前,您需要瞭解使用資料湖投遞功能的注意事項、介面等資訊。建立投遞任務後,Table Store資料表中的資料會自動投遞到OSS Bucket中儲存。

注意事項

  • 目前支援使用資料湖投遞功能的地區有華東1(杭州)、華東2(上海)、華北2(北京)和華北3(張家口)。

  • 資料湖投遞不支援同步刪除操作,Table Store中的刪除操作在資料投遞時會被忽略,已投遞到OSS中的資料不會被刪除。

  • 建立資料投遞任務時存在最多1分鐘的初始化時間。

  • 資料同步存在延遲,寫入速率穩定時,延遲在3分鐘內。資料同步的P99延遲在10分鐘內。

    說明

    P99延遲表示過去10秒內最慢的1%的請求的平均延遲。

前提條件

  • 在Object Storage Service服務側已完成如下操作:

    已開通OSS服務且在Table Store執行個體所在地區建立Bucket,詳情請參見開通OSS服務

    說明

    資料湖投遞支援投遞到和Table Store相同地區的任意OSS Bucket中。如需投遞到其他數倉儲存(例如MaxCompute),請提交工單申請。

  • 在Table Store服務側已完成如下操作:

    • 已在執行個體詳情頁面擷取執行個體的服務地址(Endpoint)。更多資訊,請參考服務地址

    • 已建立Table Store資料表。具體操作,請參見建立資料表

  • 在存取控制RAM服務側完成如下操作:

    • 已建立RAM使用者並為RAM使用者授予管理Table Store許可權(AliyunOTSFullAccess)。具體操作,請參見建立RAM使用者為RAM使用者授權

      警告

      阿里雲帳號AccessKey泄露會威脅您所有資源的安全。建議您使用RAM使用者AccessKey進行操作,可以有效降低AccessKey泄露的風險。

      已為RAM使用者建立AccessKey。具體操作,請參見建立AccessKey

  • 已配置訪問憑證。具體操作,請參見配置訪問憑證

介面

介面

說明

CreateDeliveryTask

建立一個投遞任務。

ListDeliveryTask

列出一個資料表所有的投遞任務資訊。

DescribeDeliveryTask

查詢投遞任務描述資訊。

DeleteDeliveryTask

刪除一個投遞任務。

參數

參數

說明

tableName

資料表名稱。

taskName

投遞任務名稱。

名稱只能包含英文小寫字母(a~z)、數字和虛線(-),開頭和結尾必須為英文小寫字母或數字,且長度為3~16字元。

taskConfig

投遞任務配置,包括如下選項:

  • ossPrefix:OSS Bucket中的目錄首碼,將Table Store的資料投遞到該OSS Bucket目錄中。投遞路徑中支援引用$yyyy、$MM、$dd、$HH、$mm五種時間變數。

    • 當投遞路徑中引用時間變數時,可以按資料的寫入時間動態產生OSS目錄,實現hive partition naming style的資料時間分區,從而按照時間分區組織OSS中的檔案分布。

    • 當投遞路徑中不引用時間變數時,所有檔案會被投遞到固定的OSS首碼目錄中。

  • ossBucket:OSS Bucket名稱。

  • ossEndpoint:OSS Bucket所在地區的服務地址。

  • ossStsRole:Table Store服務關聯角色的ARN資訊。

  • format:投遞的資料的儲存以Parquet列存格式儲存。預設值為Parquet。

    資料湖投遞預設使用PLAIN編碼方式,PLAIN編碼方式支援任意類型資料。

    目前僅支援Parquet,一般無需配置,保持預設即可。

  • eventTimeColumn:事件時間列,用於指定按某一列資料的時間進行分區,配置項包括時間列名和時間格式。其中時間格式(EventTimeFormat)的取值範圍為RFC822、RFC850、RFC1123、RFC3339和Unix,請根據實際時間格式進行配置。

    如果不設定此參數,則按資料寫入Table Store的時間進行分區。

  • parquetSchema:指定需要投遞的資料列,必須手動設定投遞欄位的源表欄位、目標欄位和目標欄位類型。

    您可以選擇任意欄位以任意順序、名稱寫入列存檔案,OSS的列存資料會按Schema數組中的資料列先後順序分布。

    重要

    投遞資料的欄位類型必須與資料來源的欄位類型匹配,否則會作為髒資料丟棄。欄位類型映射詳情請參見資料格式映射

taskType

投遞任務的類型,包括如下選項:

  • INC:表示增量資料投遞模式,只同步增量資料。

  • BASE:表示全量資料投遞模式,一次性全表掃描資料同步。

  • BASE_INC(預設):表示全量&增量資料投遞模式,全量資料同步完成後,再同步增量資料。

    其中增量資料同步時可以擷取最新投遞時間和瞭解當前投遞狀態。

使用

您可以通過Java SDKGo SDK實現資料湖投遞功能。此處以Java SDK為例介紹資料投遞湖的操作。

以下樣本用於為資料表建立投遞任務。

import com.alicloud.openservices.tablestore.ClientException;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.TableStoreException;
import com.alicloud.openservices.tablestore.model.delivery.*;
public class DeliveryTask {

        public static void main(String[] args) {
            final String endPoint = "https://yourinstancename.cn-hangzhou.ots.aliyuncs.com";

            final String accessKeyId = System.getenv("OTS_AK_ENV");
            
            final String accessKeySecret = System.getenv("OTS_SK_ENV");

            final String instanceName = "yourinstancename";

            SyncClient client = new SyncClient(endPoint, accessKeyId, accessKeySecret, instanceName);
            try {
                createDeliveryTask(client);
                System.out.println("end");
            } catch (TableStoreException e) {
                System.err.println("操作失敗,詳情:" + e.getMessage() + e.getErrorCode() + e.toString());
                System.err.println("Request ID:" + e.getRequestId());
            } catch (ClientException e) {
                System.err.println("請求失敗,詳情:" + e.getMessage());
            } finally {
                client.shutdown();
            }
        }

        private static void createDeliveryTask(SyncClient client){
            String tableName = "sampleTable";
            String taskName = "sampledeliverytask";
            OSSTaskConfig taskConfig = new OSSTaskConfig();
            taskConfig.setOssPrefix("sampledeliverytask/year=$yyyy/month=$MM");
            taskConfig.setOssBucket("datadeliverytest");
            taskConfig.setOssEndpoint("oss-cn-hangzhou.aliyuncs.com");
            taskConfig.setOssStsRole("acs:ram::17************45:role/aliyunserviceroleforotsdatadelivery");
            //eventColumn為可選配置,指定按某一列資料的時間進行分區。如果不設定此參數,則按資料寫入Table Store的時間進行分區。
            EventColumn eventColumn = new EventColumn("Col1", EventTimeFormat.RFC1123);
            taskConfig.setEventTimeColumn(eventColumn);
            taskConfig.addParquetSchema(new ParquetSchema("PK1", "PK1", DataType.UTF8));
            taskConfig.addParquetSchema(new ParquetSchema("PK2", "PK2", DataType.BOOL));
            taskConfig.addParquetSchema(new ParquetSchema("Col1", "Col1", DataType.UTF8));
            CreateDeliveryTaskRequest request = new CreateDeliveryTaskRequest();
            request.setTableName(tableName);
            request.setTaskName(taskName);
            request.setTaskConfig(taskConfig);
            request.setTaskType(DeliveryTaskType.BASE_INC);
            CreateDeliveryTaskResponse response = client.createDeliveryTask(request);
            System.out.println("resquestID: "+ response.getRequestId());
            System.out.println("traceID: " + response.getTraceId());
            System.out.println("create delivery task success");
        }
}