如果需要即時消費表中資料,您需要調用CreateTunnel介面為資料表建立一個通道,一張資料表上可以建立多個通道。在建立通道時需要指定資料表名稱、通道名稱和通道類型。
前提條件
已初始化TunnelClient。
已建立資料表。具體操作,請參見建立資料表。
參數
請求參數
參數 | 說明 |
TableName | 資料表名稱。 |
TunnelName | 通道的名稱。 |
TunnelType | 通道的類型。取值範圍如下:
建立增量或者全量加增量類型的通道時,系統預設建立通道後寫入的資料為增量資料。如果要消費指定時間點後的增量資料,請配置增量資料的起始時間戳記(startTime)。
|
響應參數
參數 | 說明 |
TunnelId | 通道的ID。 |
ResponseInfo | 返回的一些其它欄位。 |
RequestId | 當次請求的Request ID。 |
樣本
建立全量類型的通道
以下樣本用於為資料表建立一個全量類型的通道。
//支援建立三種類型的通道TunnelType.BaseData(全量)、TunnelType.Stream(增量)和TunnelType.BaseAndStream(全量加增量)。
//本樣本為建立全量類型的通道,如果需要建立其它類型的通道,則將CreateTunnelRequest中的TunnelType設定為相應的類型。
private static void createTunnel(TunnelClient client, String tableName, String tunnelName) {
CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseData);
CreateTunnelResponse resp = client.createTunnel(request);
System.out.println("RequestId: " + resp.getRequestId());
System.out.println("TunnelId: " + resp.getTunnelId());
}
建立增量或者全量加增量類型的通道
以下樣本用於為資料表建立增量或全量加增量類型的通道,並指定讀取的增量資料時間範圍。
//建立增量或者全量加增量類型的通道,指定起始時間戳記或結束時間戳記,表示讀取的增量資料時間範圍。對於全量類型的通道,StreamTunnelConfig的配置不生效。
private static void createStreamTunnelByOffset(TunnelClient client,String tableName,String tunnelName, long startTime, long endTime){
CreateTunnelRequest createTunnelRequest = new CreateTunnelRequest(tableName,tunnelName, TunnelType.Stream);//建立增量類型通道。
//CreateTunnelRequest createTunnelRequest = new CreateTunnelRequest(tableName,tunnelName, TunnelType.BaseAndStream);//建立全量加增量類型通道。
StreamTunnelConfig streamTunnelConfig = new StreamTunnelConfig();
/*
指定增量資料的起始時間戳記(startTime)和結束時間戳記(endTime)。單位為毫秒,取值範圍為[CurrentSystemTime - StreamExpiration + 5 minute, CurrentSystemTime)。
其中CurrentSystemTime為當前系統時間的毫秒單位時間戳記;StreamExpiration為增量日誌到期時間的毫秒單位時間戳記,最大值為7天。您可以在為資料表開啟Stream功能時設定。
結束時間戳記的取值必須大於起始時間戳記。
*/
streamTunnelConfig.setStartOffset(startTime);
streamTunnelConfig.setEndOffset(endTime);
createTunnelRequest.setStreamTunnelConfig(streamTunnelConfig);
CreateTunnelResponse resp = client.createTunnel(createTunnelRequest);
System.out.println("RequestId: " + resp.getRequestId());
System.out.println("TunnelId: " + resp.getTunnelId());
}
常見問題
使用通道服務消費資料時報錯OTSTrimmedDataAccess Requested stream data is already trimmed or does not exist
相關文檔
關於API說明的更多資訊,請參見CreateTunnel。
如果要快速使用通道服務消費資料,請參見快速使用通道服務文檔進行操作。
如果要查看指定表的所有通道資訊,您可以通過擷取表內的通道資訊實現。更多資訊,請參見擷取表內的通道資訊。
如果要查看指定通道的詳細資料,您可以通過擷取通道的具體資訊實現。更多資訊,請參見擷取通道的具體資訊。
如果不再使用某個通道,您可以刪除相應通道。更多資訊,請參見刪除通道。
使用通道服務可以實現資料移轉。更多資訊,請參見將Table Store資料表中資料同步到另一個資料表。
Realtime ComputeFlink能將通道服務的資料通道作為流式資料的輸入,可實現通過Flink計算與分析Table Store資料,更多資訊,請參見使用教程(寬表模型)和使用教程(時序模型)。
當前通道服務本身沒有額外的費用開銷。在消費通道服務資料時,Table Store會根據實際拉取的資料產生讀輸送量計量計費。更多資訊,請參見計費概述。