全部產品
Search
文件中心

Tablestore:通過SDK使用通道服務

更新時間:Nov 09, 2024

本文介紹如何通過 SDK 快速體驗通道服務。在使用通道服務前,您需要瞭解使用通道服務的注意事項、介面等資訊。

注意事項

  • TunnelWorkerConfig 中預設會啟動讀資料和處理資料的線程池。如果使用的是單台機器,當需要啟動多個 TunnelWorker 時,建議共用一個 TunnelWorkerConfig。

  • TunnelWorker 的初始化需要預熱時間,該值受 TunnelWorkerConfig 中的 heartbeatIntervalInSec 參數影響,可以通過 TunnelWorkerConfig 中的 setHeartbeatIntervalInSec 方法配置,預設為 30 s。

  • 當用戶端(TunnelWorker)沒有被正常 shutdown 時(例如異常退出或者手動結束),TunnelWorker 會自動進行資源的回收,包括釋放線程池,自動調用使用者在 Channel 上註冊的 shutdown 方法,關閉 Tunnel 串連等。

  • Tunnel 的增量日誌保留時間,其數值與資料表中 Stream 的日誌到期時間長度(最長時間長度為 7 天)保持一致,因此 Tunnel 的增量日誌最多保留 7 天。

  • 增量或者全量加增量類型 Tunnel 消費資料時,可能會出現以下情況:

    • 當 Tunnel 處於全量階段時,如果全量資料在增量日誌保留時間內(最多保留 7 天)未能完成消費,將會觸發 OTSTunnelExpired 錯誤,從而導致無法繼續消費後續資料。

      如果您預計全量資料無法在指定時間內完成消費,請及時聯絡Table Store支援人員進行諮詢。

    • 當 Tunnel 處於增量階段時,如果增量資料在增量日誌保留時間內(最多保留 7 天)未能完成消費,Tunnel 將可能從最近可消費的資料處開始消費,因此存在漏消費資料的風險。

  • Tunnel 到期後,Table Store可能會禁用該 Tunnel。如果禁用狀態持續超過 30 天,則該 Tunnel 將被徹底刪除,刪除後將無法恢複。

介面

介面

說明

CreateTunnel

建立一個通道。

ListTunnel

列舉某個資料表內通道的具體資訊。

DescribeTunnel

描述某個通道裡的具體 Channel 資訊。

DeleteTunnel

刪除一個通道。

使用

您可以使用如下語言的SDK實現通道服務。

前提條件

  • 在存取控制 RAM 服務側完成如下操作:

    • 已建立 RAM 使用者並為 RAM 使用者授予管理Table Store許可權 AliyunOTSFullAccess。具體操作,請參見建立 RAM 使用者為 RAM 使用者授權

      說明

      在實際業務環境中,建議您遵循最小化授權原則,避免許可權過大帶來的安全風險。

    • 已為 RAM 使用者建立 AccessKey。具體操作,請參見建立 AccessKey

      警告

      阿里雲帳號 AccessKey 泄露會威脅您所有資源的安全。建議您使用 RAM 使用者 AccessKey 進行操作,這可以有效降低 AccessKey 泄露的風險。

體驗通道服務

使用 Java SDK 最小化的體驗通道服務。

  1. 初始化 Tunnel Client。

    說明

    在運行本程式碼範例之前,請確保已設定環境變數TABLESTORE_ACCESS_KEY_IDTABLESTORE_ACCESS_KEY_SECRET,這兩個變數分別對應阿里雲帳號或 RAM 使用者的 AccessKey ID 和 AccessKey Secret。

    //endPoint為Table Store執行個體的endPoint,例如https://instance.cn-hangzhou.ots.aliyuncs.com。
    //accessKeyId和accessKeySecret分別為訪問Table Store服務的AccessKey的Id和Secret。
    //instanceName為執行個體名稱。
    final String endPoint = "";
    final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
    final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
    final String instanceName = "";
    TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
  2. 建立新通道。

    請提前建立一張測試表或者使用已有的一張資料表。如果需要建立測試表,可以使用 SyncClient 中的 createTable 方法或者使用官網控制台等方式建立。

    重要

    建立增量或者全量加增量類型的通道時,時間戳記的配置規則如下:

    • 如果不指定增量資料的起始時間戳記,則起始時間戳記為建立通道的時間。

    • 如果指定增量資料的起始時間戳記(startTime)和結束時間戳記(endTime),其取值範圍為[當前系統時間-Stream到期時間+5分鐘 , 當前系統時間],單位為毫秒。

      • Stream 到期時間為增量日誌到期時間長度的毫秒單位時間戳記,最大值為 7 天。您可以在為資料表開啟 Stream 功能時設定,到期時間長度一經設定不能修改。

      • 結束時間戳記的取值必須大於起始時間戳記。

    //支援建立TunnelType.BaseData(全量)、TunnelType.Stream(增量)、TunnelType.BaseAndStream(全量加增量)三種類型的Tunnel。
    //如下樣本為建立全量加增量類型的Tunnel,如果需建立其它類型的Tunnel,則將CreateTunnelRequest中的TunnelType設定為相應的類型。
    final String tableName = "testTable";
    final String tunnelName = "testTunnel";
    CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
    CreateTunnelResponse resp = tunnelClient.createTunnel(request);
    //tunnelId用於後續TunnelWorker的初始化,該值也可以通過ListTunnel或者DescribeTunnel擷取。
    String tunnelId = resp.getTunnelId(); 
    System.out.println("Create Tunnel, Id: " + tunnelId);
  3. 使用者自訂資料消費 Callback,開始自動化的資料消費。

    //使用者自訂資料消費Callback,即實現IChannelProcessor介面(process和shutdown)。
    private static class SimpleProcessor implements IChannelProcessor {
        @Override
        public void process(ProcessRecordsInput input) {
            //ProcessRecordsInput中包含有拉取到的資料。
            System.out.println("Default record processor, would print records count");
            System.out.println(
                //NextToken用於Tunnel Client的翻頁。
                String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
            try {
                //類比消費處理。
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void shutdown() {
            System.out.println("Mock shutdown");
        }
    }
    
    //TunnelWorkerConfig預設會啟動讀資料和處理資料的線程池。
    //如果使用的是單台機器,當需要啟動多個TunnelWorker時,建議共用一個TunnelWorkerConfig。TunnelWorkerConfig中包括更多的進階參數。
    TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
    //配置TunnelWorker,並啟動自動化的資料處理任務。
    TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
    try {
        worker.connectAndWorking();
    } catch (Exception e) {
        e.printStackTrace();
        config.shutdown();
        worker.shutdown();
        tunnelClient.shutdown();
    }

配置 TunnelWorkerConfig

TunnelWorkerConfig 提供 Tunnel Client 的自訂配置,可根據實際需要配置參數,Java SDK 中的參數說明請參見下表。

配置

參數

說明

Heartbeat 的間隔和逾時時間

heartbeatTimeoutInSec

Heartbeat 的逾時間隔。預設值為 300 s。

當 Heartbeat 發生逾時,Tunnel 服務端會認為當前 TunnelClient 不可用(失活),用戶端需要重新進行 ConnectTunnel。

heartbeatIntervalInSec

進行 Heartbeat 的間隔。 預設值為 30 s,最小支援配置到 5 s,單位為 s。

Heartbeat 用於活躍 Channel 的探測、Channel 狀態的更新、(自動化)資料拉取任務的初始化等。

記錄消費位點的時間間隔

checkpointIntervalInMillis

使用者消費完資料後,向 Tunnel 服務端進行記錄消費位點操作(checkpoint)的時間間隔。

預設值為 5000 ms,單位為 ms。

說明
  • 因為讀取任務所在機器不同,進程可能會遇到各種類型的錯誤。例如因為環境因素重啟,需要定期對處理完的資料做記錄(checkpoint)。當任務重啟後,會接著上次的 checkpoint 繼續往後做。在極端情況下,通道服務不保證傳給您的記錄只有一次,只保證資料至少傳一次,且記錄的順序不變。如果出現局部資料重複發送的情況,需要您注意業務的處理邏輯。

  • 如果希望減少在出錯情況下資料的重複處理,可以增加做 checkpoint 的頻率。但是過於頻繁的 checkpoint 會降低系統的輸送量,請根據自身業務特點決定 checkpoint 的操作頻率。

用戶端的自訂標識

clientTag

用戶端的自訂標識,可以產生 Tunnel Client ID,用於區分 TunnelWorker。

資料處理的自訂 Callback

channelProcessor

使用者註冊的處理資料的 Callback,包括 process 和 shutdown 方法。

資料讀取和資料處理的線程池資源配置

readRecordsExecutor

用於資料讀取的線程池資源。無特殊需求,建議使用預設的配置。

processRecordsExecutor

用於處理資料的線程池資源。無特殊需求,建議使用預設的配置。

說明
  • 自訂上述線程池時,線程池中的線程數要和 Tunnel 中的 Channel 數儘可能一致,此時可以保障每個 Channel 都能很快地分配到計算資源(CPU)。

  • 在預設線程池配置中,為了保證輸送量,Table Store進行了如下操作:

    • 預設預先分配 32 個核心線程,以保障資料較小時(Channel數較少時)的即時輸送量。

    • 工作隊列的大小適當調小,當使用者資料量比較大(Channel數較多)時,可以更快觸發線程池建立線程的策略,及時彈出更多的計算資源。

    • 設定了預設的線程保活時間(預設為 60 s),當資料量下降後,可以及時回收線程資源。

記憶體控制

maxChannelParallel

讀取和處理資料的最大 Channel 並行度,可用於記憶體控制。

預設值為 -1,表示不限制最大並行度。

說明

僅 Java SDK 5.10.0 及以上版本支援此功能。

最大退避時間配置

maxRetryIntervalInMillis

Tunnel 的最大退避時間基準值,最大退避時間在此基準值附近隨機變化,具體範圍為 0.75*maxRetryIntervalInMillis~1.25*maxRetryIntervalInMillis

預設值為 2000 ms,最小值為 200 ms。

說明
  • 僅 Java SDK 5.4.0 及以上版本支援此功能。

  • Tunnel 對於資料量較小的情況(單次拉取小於 900 KB 或 500 條)會進行一定時間的指數退避,直至達到最大退避時間。

CLOSING 分區狀態檢測

enableClosingChannelDetect

是否開啟 CLOSING 分區即時檢測。預設值為 false,表示不開啟 CLOSING 分區即時檢測。

說明
  • 僅 Java SDK 5.13.13 及以上版本支援此功能。

  • 未開啟此功能時,在某些極端情境(包括但不限於通道分區數較多但用戶端資源較低等)下,會出現分區卡住不消費的情況。

  • CLOSING 分區指調度中的分區,表示該分區正在切換 Tunnel Client,會調度到其他 Tunnel Client。

附錄:完整代碼

import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelResponse;
import com.alicloud.openservices.tablestore.model.tunnel.TunnelType;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
public class TunnelQuickStart {
    private static class SimpleProcessor implements IChannelProcessor {
        @Override
        public void process(ProcessRecordsInput input) {
            System.out.println("Default record processor, would print records count");
            System.out.println(
                //NextToken用於Tunnel Client的翻頁。
                String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
            try {
                //類比消費處理。
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void shutdown() {
            System.out.println("Mock shutdown");
        }
    }
    public static void main(String[] args) throws Exception {
        //1.初始化Tunnel Client。
        final String endPoint = "";
        final String accessKeyId = System.getenv("OTS_AK_ENV");
        final String accessKeySecret = System.getenv("OTS_SK_ENV");
        final String instanceName = "";
        TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
        //2.建立新通道(此步驟需要提前建立一張測試表,可以使用SyncClient的createTable或者使用官網控制台等方式建立)。
        final String tableName = "testTable";
        final String tunnelName = "testTunnel";
        CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
        CreateTunnelResponse resp = tunnelClient.createTunnel(request);
        //tunnelId用於後續TunnelWorker的初始化,該值也可以通過ListTunnel或者DescribeTunnel擷取。
        String tunnelId = resp.getTunnelId();
        System.out.println("Create Tunnel, Id: " + tunnelId);
        //3.使用者自訂資料消費Callback,開始自動化的資料消費。
        //TunnelWorkerConfig中有更多的進階參數。
        TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
        TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
        try {
            worker.connectAndWorking();
        } catch (Exception e) {
            e.printStackTrace();
            config.shutdown();
            worker.shutdown();
            tunnelClient.shutdown();
        }
    }
}