通過通道服務功能,您可以消費處理表中資料。本文介紹如何使用 Java SDK 快速體驗通道服務。
注意事項
TunnelWorkerConfig 中預設會啟動讀資料和處理資料的線程池。如果使用的是單台機器,當需要啟動多個 TunnelWorker 時,建議共用一個 TunnelWorkerConfig。
TunnelWorker 的初始化需要預熱時間,該值受 TunnelWorkerConfig 中的 heartbeatIntervalInSec 參數影響,可以通過 TunnelWorkerConfig 中的 setHeartbeatIntervalInSec 方法配置,預設為 30 s。
當用戶端(TunnelWorker)沒有被正常 shutdown 時(例如異常退出或者手動結束),TunnelWorker 會自動進行資源的回收,包括釋放線程池,自動調用使用者在 Channel 上註冊的 shutdown 方法,關閉 Tunnel 串連等。
Tunnel 的增量日誌保留時間,其數值與資料表中 Stream 的日誌到期時間長度(最長時間長度為 7 天)保持一致,因此 Tunnel 的增量日誌最多保留 7 天。
增量或者全量加增量類型 Tunnel 消費資料時,可能會出現以下情況:
當 Tunnel 處於全量階段時,如果全量資料在增量日誌保留時間內(最多保留 7 天)未能完成消費,將會觸發
OTSTunnelExpired
錯誤,從而導致無法繼續消費後續資料。如果您預計全量資料無法在指定時間內完成消費,請及時聯絡Table Store支援人員進行諮詢。
當 Tunnel 處於增量階段時,如果增量資料在增量日誌保留時間內(最多保留 7 天)未能完成消費,Tunnel 將可能從最近可消費的資料處開始消費,因此存在漏消費資料的風險。
Tunnel 到期後,Table Store可能會禁用該 Tunnel。如果禁用狀態持續超過 30 天,則該 Tunnel 將被徹底刪除,刪除後將無法恢複。
前提條件
在存取控制 RAM 服務側完成如下操作:
已建立 RAM 使用者並為 RAM 使用者授予管理Table Store許可權
AliyunOTSFullAccess
。具體操作,請參見建立 RAM 使用者和為 RAM 使用者授權。說明在實際業務環境中,建議您遵循最小化授權原則,避免許可權過大帶來的安全風險。
已為 RAM 使用者建立 AccessKey。具體操作,請參見建立 AccessKey。
警告阿里雲帳號 AccessKey 泄露會威脅您所有資源的安全。建議您使用 RAM 使用者 AccessKey 進行操作,這可以有效降低 AccessKey 泄露的風險。
在Table Store服務側完成如下操作:
已建立資料表。具體操作,請參見使用控制台建立資料表、使用命令列工具建立資料表或使用SDK建立資料表。
已擷取執行個體網域名稱地址(Endpoint)。具體操作,請參見擷取執行個體Endpoint。
已配置訪問憑證。具體操作,請參見配置訪問憑證。
體驗通道服務
使用 Java SDK 最小化地體驗通道服務。
初始化 TunnelClient。
說明在運行本程式碼範例之前,請確保已設定環境變數
TABLESTORE_ACCESS_KEY_ID
和TABLESTORE_ACCESS_KEY_SECRET
,這兩個變數分別對應阿里雲帳號或 RAM 使用者的 AccessKey ID 和 AccessKey Secret。//endPoint為Table Store執行個體的endPoint,例如https://instance.cn-hangzhou.ots.aliyuncs.com。 //accessKeyId和accessKeySecret分別為訪問Table Store服務的AccessKey的Id和Secret。 //instanceName為執行個體名稱。 final String endPoint = ""; final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID"); final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET"); final String instanceName = ""; TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
建立通道。
請提前建立一張測試表或者使用已有的一張資料表。如果需要建立測試表,可以使用 SyncClient 中的 createTable 方法或者使用官網控制台等方式建立。
//支援建立TunnelType.BaseData(全量)、TunnelType.Stream(增量)、TunnelType.BaseAndStream(全量加增量)三種類型的Tunnel。 //如下樣本為建立全量加增量類型的Tunnel,如果需建立其它類型的Tunnel,則將CreateTunnelRequest中的TunnelType設定為相應的類型。 final String tableName = "testTable"; final String tunnelName = "testTunnel"; CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream); CreateTunnelResponse resp = tunnelClient.createTunnel(request); //tunnelId用於後續TunnelWorker的初始化,該值也可以通過ListTunnel或者DescribeTunnel擷取。 String tunnelId = resp.getTunnelId(); System.out.println("Create Tunnel, Id: " + tunnelId);
根據業務自訂資料消費 Callback 函數,開始自動化的資料消費。 TunnelClient 的自訂配置請參見下面的表格。
//根據業務自訂資料消費Callback函數,即實現IChannelProcessor介面(process和shutdown)。 private static class SimpleProcessor implements IChannelProcessor { @Override public void process(ProcessRecordsInput input) { //ProcessRecordsInput中包含有拉取到的資料。 System.out.println("Default record processor, would print records count"); System.out.println( //NextToken用於Tunnel Client的翻頁。 String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken())); try { //類比消費處理。 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void shutdown() { System.out.println("Mock shutdown"); } } //TunnelWorkerConfig預設會啟動讀資料和處理資料的線程池。如果使用的是單台機器,則會啟動多個TunnelWorker。 //建議共用一個TunnelWorkerConfig,TunnelWorkerConfig中包括更多的進階參數。 TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor()); //配置TunnelWorker,並啟動自動化的資料處理任務。 TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config); try { worker.connectAndWorking(); } catch (Exception e) { e.printStackTrace(); config.shutdown(); worker.shutdown(); tunnelClient.shutdown(); }
配置 TunnelWorkerConfig
TunnelWorkerConfig 提供 Tunnel Client 的自訂配置,可根據實際需要配置參數,Java SDK 中的參數說明請參見下表。
配置 | 參數 | 說明 |
Heartbeat 的間隔和逾時時間 | heartbeatTimeoutInSec | Heartbeat 的逾時間隔。預設值為 300 s。 當 Heartbeat 發生逾時,Tunnel 服務端會認為當前 TunnelClient 不可用(失活),用戶端需要重新進行 ConnectTunnel。 |
heartbeatIntervalInSec | 進行 Heartbeat 的間隔。 預設值為 30 s,最小支援配置到 5 s,單位為 s。 Heartbeat 用於活躍 Channel 的探測、Channel 狀態的更新、(自動化)資料拉取任務的初始化等。 | |
記錄消費位點的時間間隔 | checkpointIntervalInMillis | 使用者消費完資料後,向 Tunnel 服務端進行記錄消費位點操作(checkpoint)的時間間隔。 預設值為 5000 ms,單位為 ms。 說明
|
用戶端的自訂標識 | clientTag | 用戶端的自訂標識,可以產生 Tunnel Client ID,用於區分 TunnelWorker。 |
資料處理的自訂 Callback | channelProcessor | 使用者註冊的處理資料的 Callback,包括 process 和 shutdown 方法。 |
資料讀取和資料處理的線程池資源配置 | readRecordsExecutor | 用於資料讀取的線程池資源。無特殊需求,建議使用預設的配置。 |
processRecordsExecutor | 用於處理資料的線程池資源。無特殊需求,建議使用預設的配置。 說明
| |
記憶體控制 | maxChannelParallel | 讀取和處理資料的最大 Channel 並行度,可用於記憶體控制。 預設值為 -1,表示不限制最大並行度。 說明 僅 Java SDK 5.10.0 及以上版本支援此功能。 |
最大退避時間配置 | maxRetryIntervalInMillis | Tunnel 的最大退避時間基準值,最大退避時間在此基準值附近隨機變化,具體範圍為 預設值為 2000 ms,最小值為 200 ms。 說明
|
CLOSING 分區狀態檢測 | enableClosingChannelDetect | 是否開啟 CLOSING 分區即時檢測。預設值為 false,表示不開啟 CLOSING 分區即時檢測。 說明
|
附錄:完整代碼
import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelResponse;
import com.alicloud.openservices.tablestore.model.tunnel.TunnelType;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
public class TunnelQuickStart {
private static class SimpleProcessor implements IChannelProcessor {
@Override
public void process(ProcessRecordsInput input) {
System.out.println("Default record processor, would print records count");
System.out.println(
//NextToken用於Tunnel Client的翻頁。
String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
try {
//類比消費處理。
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void shutdown() {
System.out.println("Mock shutdown");
}
}
public static void main(String[] args) throws Exception {
//1.初始化Tunnel Client。
final String endPoint = "";
final String accessKeyId = System.getenv("OTS_AK_ENV");
final String accessKeySecret = System.getenv("OTS_SK_ENV");
final String instanceName = "";
TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
//2.建立新通道(此步驟需要提前建立一張測試表,可以使用SyncClient的createTable或者使用官網控制台等方式建立)。
final String tableName = "testTable";
final String tunnelName = "testTunnel";
CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
CreateTunnelResponse resp = tunnelClient.createTunnel(request);
//tunnelId用於後續TunnelWorker的初始化,該值也可以通過ListTunnel或者DescribeTunnel擷取。
String tunnelId = resp.getTunnelId();
System.out.println("Create Tunnel, Id: " + tunnelId);
//3.使用者自訂資料消費Callback,開始自動化的資料消費。
//TunnelWorkerConfig中有更多的進階參數。
TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
try {
worker.connectAndWorking();
} catch (Exception e) {
e.printStackTrace();
config.shutdown();
worker.shutdown();
tunnelClient.shutdown();
}
}
}