使用SDK進行資料投遞前,您需要瞭解使用資料湖投遞功能的注意事項、介面等資訊。建立投遞任務後,Table Store資料表中的資料會自動投遞到OSS Bucket中儲存。
注意事項
目前支援使用資料湖投遞功能的地區有華東1(杭州)、華東2(上海)、華北2(北京)和華北3(張家口)。
資料湖投遞不支援同步刪除操作,Table Store中的刪除操作在資料投遞時會被忽略,已投遞到OSS中的資料不會被刪除。
建立資料投遞任務時存在最多1分鐘的初始化時間。
資料同步存在延遲,寫入速率穩定時,延遲在3分鐘內。資料同步的P99延遲在10分鐘內。
說明P99延遲表示過去10秒內最慢的1%的請求的平均延遲。
前提條件
在Object Storage Service服務側已完成如下操作:
已開通OSS服務且在Table Store執行個體所在地區建立Bucket,詳情請參見開通OSS服務。
說明資料湖投遞支援投遞到和Table Store相同地區的任意OSS Bucket中。如需投遞到其他數倉儲存(例如MaxCompute),請提交工單申請。
在Table Store服務側已完成如下操作:
在存取控制RAM服務側完成如下操作:
已建立RAM使用者並為RAM使用者授予管理Table Store許可權(AliyunOTSFullAccess)。具體操作,請參見建立RAM使用者和為RAM使用者授權。
警告阿里雲帳號AccessKey泄露會威脅您所有資源的安全。建議您使用RAM使用者AccessKey進行操作,可以有效降低AccessKey泄露的風險。
已為RAM使用者建立AccessKey。具體操作,請參見建立AccessKey。
已配置訪問憑證。具體操作,請參見配置訪問憑證。
介面
介面 | 說明 |
CreateDeliveryTask | 建立一個投遞任務。 |
ListDeliveryTask | 列出一個資料表所有的投遞任務資訊。 |
DescribeDeliveryTask | 查詢投遞任務描述資訊。 |
DeleteDeliveryTask | 刪除一個投遞任務。 |
參數
參數 | 說明 |
tableName | 資料表名稱。 |
taskName | 投遞任務名稱。 名稱只能包含英文小寫字母(a~z)、數字和虛線(-),開頭和結尾必須為英文小寫字母或數字,且長度為3~16字元。 |
taskConfig | 投遞任務配置,包括如下選項:
|
taskType | 投遞任務的類型,包括如下選項:
|
使用
您可以通過Java SDK、Go SDK實現資料湖投遞功能。此處以Java SDK為例介紹資料投遞湖的操作。
以下樣本用於為資料表建立投遞任務。
import com.alicloud.openservices.tablestore.ClientException;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.TableStoreException;
import com.alicloud.openservices.tablestore.model.delivery.*;
public class DeliveryTask {
public static void main(String[] args) {
final String endPoint = "https://yourinstancename.cn-hangzhou.ots.aliyuncs.com";
final String accessKeyId = System.getenv("OTS_AK_ENV");
final String accessKeySecret = System.getenv("OTS_SK_ENV");
final String instanceName = "yourinstancename";
SyncClient client = new SyncClient(endPoint, accessKeyId, accessKeySecret, instanceName);
try {
createDeliveryTask(client);
System.out.println("end");
} catch (TableStoreException e) {
System.err.println("操作失敗,詳情:" + e.getMessage() + e.getErrorCode() + e.toString());
System.err.println("Request ID:" + e.getRequestId());
} catch (ClientException e) {
System.err.println("請求失敗,詳情:" + e.getMessage());
} finally {
client.shutdown();
}
}
private static void createDeliveryTask(SyncClient client){
String tableName = "sampleTable";
String taskName = "sampledeliverytask";
OSSTaskConfig taskConfig = new OSSTaskConfig();
taskConfig.setOssPrefix("sampledeliverytask/year=$yyyy/month=$MM");
taskConfig.setOssBucket("datadeliverytest");
taskConfig.setOssEndpoint("oss-cn-hangzhou.aliyuncs.com");
taskConfig.setOssStsRole("acs:ram::17************45:role/aliyunserviceroleforotsdatadelivery");
//eventColumn為可選配置,指定按某一列資料的時間進行分區。如果不設定此參數,則按資料寫入Table Store的時間進行分區。
EventColumn eventColumn = new EventColumn("Col1", EventTimeFormat.RFC1123);
taskConfig.setEventTimeColumn(eventColumn);
taskConfig.addParquetSchema(new ParquetSchema("PK1", "PK1", DataType.UTF8));
taskConfig.addParquetSchema(new ParquetSchema("PK2", "PK2", DataType.BOOL));
taskConfig.addParquetSchema(new ParquetSchema("Col1", "Col1", DataType.UTF8));
CreateDeliveryTaskRequest request = new CreateDeliveryTaskRequest();
request.setTableName(tableName);
request.setTaskName(taskName);
request.setTaskConfig(taskConfig);
request.setTaskType(DeliveryTaskType.BASE_INC);
CreateDeliveryTaskResponse response = client.createDeliveryTask(request);
System.out.println("resquestID: "+ response.getRequestId());
System.out.println("traceID: " + response.getTraceId());
System.out.println("create delivery task success");
}
}