本文介紹如何通過 SDK 快速體驗通道服務。在使用通道服務前,您需要瞭解使用通道服務的注意事項、介面等資訊。
注意事項
TunnelWorkerConfig 中預設會啟動讀資料和處理資料的線程池。如果使用的是單台機器,當需要啟動多個 TunnelWorker 時,建議共用一個 TunnelWorkerConfig。
TunnelWorker 的初始化需要預熱時間,該值受 TunnelWorkerConfig 中的 heartbeatIntervalInSec 參數影響,可以通過 TunnelWorkerConfig 中的 setHeartbeatIntervalInSec 方法配置,預設為 30 s。
當用戶端(TunnelWorker)沒有被正常 shutdown 時(例如異常退出或者手動結束),TunnelWorker 會自動進行資源的回收,包括釋放線程池,自動調用使用者在 Channel 上註冊的 shutdown 方法,關閉 Tunnel 串連等。
Tunnel 的增量日誌保留時間,其數值與資料表中 Stream 的日誌到期時間長度(最長時間長度為 7 天)保持一致,因此 Tunnel 的增量日誌最多保留 7 天。
增量或者全量加增量類型 Tunnel 消費資料時,可能會出現以下情況:
當 Tunnel 處於全量階段時,如果全量資料在增量日誌保留時間內(最多保留 7 天)未能完成消費,將會觸發
OTSTunnelExpired
錯誤,從而導致無法繼續消費後續資料。如果您預計全量資料無法在指定時間內完成消費,請及時聯絡Table Store支援人員進行諮詢。
當 Tunnel 處於增量階段時,如果增量資料在增量日誌保留時間內(最多保留 7 天)未能完成消費,Tunnel 將可能從最近可消費的資料處開始消費,因此存在漏消費資料的風險。
Tunnel 到期後,Table Store可能會禁用該 Tunnel。如果禁用狀態持續超過 30 天,則該 Tunnel 將被徹底刪除,刪除後將無法恢複。
介面
介面 | 說明 |
CreateTunnel | 建立一個通道。 |
ListTunnel | 列舉某個資料表內通道的具體資訊。 |
DescribeTunnel | 描述某個通道裡的具體 Channel 資訊。 |
DeleteTunnel | 刪除一個通道。 |
使用
您可以使用如下語言的SDK實現通道服務。
前提條件
在存取控制 RAM 服務側完成如下操作:
已建立 RAM 使用者並為 RAM 使用者授予管理Table Store許可權
AliyunOTSFullAccess
。具體操作,請參見建立 RAM 使用者和為 RAM 使用者授權。說明在實際業務環境中,建議您遵循最小化授權原則,避免許可權過大帶來的安全風險。
已為 RAM 使用者建立 AccessKey。具體操作,請參見建立 AccessKey。
警告阿里雲帳號 AccessKey 泄露會威脅您所有資源的安全。建議您使用 RAM 使用者 AccessKey 進行操作,這可以有效降低 AccessKey 泄露的風險。
在Table Store服務側完成如下操作:
已建立資料表。具體操作,請參見使用控制台建立資料表、使用命令列工具建立資料表或使用SDK建立資料表。
已擷取執行個體網域名稱地址(Endpoint)。具體操作,請參見擷取執行個體Endpoint。
已配置訪問憑證。具體操作,請參見配置訪問憑證。
體驗通道服務
使用 Java SDK 最小化的體驗通道服務。
初始化 Tunnel Client。
說明在運行本程式碼範例之前,請確保已設定環境變數
TABLESTORE_ACCESS_KEY_ID
和TABLESTORE_ACCESS_KEY_SECRET
,這兩個變數分別對應阿里雲帳號或 RAM 使用者的 AccessKey ID 和 AccessKey Secret。//endPoint為Table Store執行個體的endPoint,例如https://instance.cn-hangzhou.ots.aliyuncs.com。 //accessKeyId和accessKeySecret分別為訪問Table Store服務的AccessKey的Id和Secret。 //instanceName為執行個體名稱。 final String endPoint = ""; final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID"); final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET"); final String instanceName = ""; TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
建立新通道。
請提前建立一張測試表或者使用已有的一張資料表。如果需要建立測試表,可以使用 SyncClient 中的 createTable 方法或者使用官網控制台等方式建立。
重要建立增量或者全量加增量類型的通道時,時間戳記的配置規則如下:
如果不指定增量資料的起始時間戳記,則起始時間戳記為建立通道的時間。
如果指定增量資料的起始時間戳記(startTime)和結束時間戳記(endTime),其取值範圍為
[當前系統時間-Stream到期時間+5分鐘 , 當前系統時間]
,單位為毫秒。Stream 到期時間為增量日誌到期時間長度的毫秒單位時間戳記,最大值為 7 天。您可以在為資料表開啟 Stream 功能時設定,到期時間長度一經設定不能修改。
結束時間戳記的取值必須大於起始時間戳記。
//支援建立TunnelType.BaseData(全量)、TunnelType.Stream(增量)、TunnelType.BaseAndStream(全量加增量)三種類型的Tunnel。 //如下樣本為建立全量加增量類型的Tunnel,如果需建立其它類型的Tunnel,則將CreateTunnelRequest中的TunnelType設定為相應的類型。 final String tableName = "testTable"; final String tunnelName = "testTunnel"; CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream); CreateTunnelResponse resp = tunnelClient.createTunnel(request); //tunnelId用於後續TunnelWorker的初始化,該值也可以通過ListTunnel或者DescribeTunnel擷取。 String tunnelId = resp.getTunnelId(); System.out.println("Create Tunnel, Id: " + tunnelId);
使用者自訂資料消費 Callback,開始自動化的資料消費。
//使用者自訂資料消費Callback,即實現IChannelProcessor介面(process和shutdown)。 private static class SimpleProcessor implements IChannelProcessor { @Override public void process(ProcessRecordsInput input) { //ProcessRecordsInput中包含有拉取到的資料。 System.out.println("Default record processor, would print records count"); System.out.println( //NextToken用於Tunnel Client的翻頁。 String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken())); try { //類比消費處理。 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void shutdown() { System.out.println("Mock shutdown"); } } //TunnelWorkerConfig預設會啟動讀資料和處理資料的線程池。 //如果使用的是單台機器,當需要啟動多個TunnelWorker時,建議共用一個TunnelWorkerConfig。TunnelWorkerConfig中包括更多的進階參數。 TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor()); //配置TunnelWorker,並啟動自動化的資料處理任務。 TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config); try { worker.connectAndWorking(); } catch (Exception e) { e.printStackTrace(); config.shutdown(); worker.shutdown(); tunnelClient.shutdown(); }
配置 TunnelWorkerConfig
TunnelWorkerConfig 提供 Tunnel Client 的自訂配置,可根據實際需要配置參數,Java SDK 中的參數說明請參見下表。
配置 | 參數 | 說明 |
Heartbeat 的間隔和逾時時間 | heartbeatTimeoutInSec | Heartbeat 的逾時間隔。預設值為 300 s。 當 Heartbeat 發生逾時,Tunnel 服務端會認為當前 TunnelClient 不可用(失活),用戶端需要重新進行 ConnectTunnel。 |
heartbeatIntervalInSec | 進行 Heartbeat 的間隔。 預設值為 30 s,最小支援配置到 5 s,單位為 s。 Heartbeat 用於活躍 Channel 的探測、Channel 狀態的更新、(自動化)資料拉取任務的初始化等。 | |
記錄消費位點的時間間隔 | checkpointIntervalInMillis | 使用者消費完資料後,向 Tunnel 服務端進行記錄消費位點操作(checkpoint)的時間間隔。 預設值為 5000 ms,單位為 ms。 說明
|
用戶端的自訂標識 | clientTag | 用戶端的自訂標識,可以產生 Tunnel Client ID,用於區分 TunnelWorker。 |
資料處理的自訂 Callback | channelProcessor | 使用者註冊的處理資料的 Callback,包括 process 和 shutdown 方法。 |
資料讀取和資料處理的線程池資源配置 | readRecordsExecutor | 用於資料讀取的線程池資源。無特殊需求,建議使用預設的配置。 |
processRecordsExecutor | 用於處理資料的線程池資源。無特殊需求,建議使用預設的配置。 說明
| |
記憶體控制 | maxChannelParallel | 讀取和處理資料的最大 Channel 並行度,可用於記憶體控制。 預設值為 -1,表示不限制最大並行度。 說明 僅 Java SDK 5.10.0 及以上版本支援此功能。 |
最大退避時間配置 | maxRetryIntervalInMillis | Tunnel 的最大退避時間基準值,最大退避時間在此基準值附近隨機變化,具體範圍為 預設值為 2000 ms,最小值為 200 ms。 說明
|
CLOSING 分區狀態檢測 | enableClosingChannelDetect | 是否開啟 CLOSING 分區即時檢測。預設值為 false,表示不開啟 CLOSING 分區即時檢測。 說明
|
附錄:完整代碼
import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelResponse;
import com.alicloud.openservices.tablestore.model.tunnel.TunnelType;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
public class TunnelQuickStart {
private static class SimpleProcessor implements IChannelProcessor {
@Override
public void process(ProcessRecordsInput input) {
System.out.println("Default record processor, would print records count");
System.out.println(
//NextToken用於Tunnel Client的翻頁。
String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
try {
//類比消費處理。
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void shutdown() {
System.out.println("Mock shutdown");
}
}
public static void main(String[] args) throws Exception {
//1.初始化Tunnel Client。
final String endPoint = "";
final String accessKeyId = System.getenv("OTS_AK_ENV");
final String accessKeySecret = System.getenv("OTS_SK_ENV");
final String instanceName = "";
TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
//2.建立新通道(此步驟需要提前建立一張測試表,可以使用SyncClient的createTable或者使用官網控制台等方式建立)。
final String tableName = "testTable";
final String tunnelName = "testTunnel";
CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
CreateTunnelResponse resp = tunnelClient.createTunnel(request);
//tunnelId用於後續TunnelWorker的初始化,該值也可以通過ListTunnel或者DescribeTunnel擷取。
String tunnelId = resp.getTunnelId();
System.out.println("Create Tunnel, Id: " + tunnelId);
//3.使用者自訂資料消費Callback,開始自動化的資料消費。
//TunnelWorkerConfig中有更多的進階參數。
TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
try {
worker.connectAndWorking();
} catch (Exception e) {
e.printStackTrace();
config.shutdown();
worker.shutdown();
tunnelClient.shutdown();
}
}
}