通過通道服務,您可以消費表中的資料。本文介紹如何使用Go SDK快速體驗通道服務。使用前,您需要瞭解使用通道服務的注意事項。
注意事項
TunnelWorkerConfig 中預設會啟動讀資料和處理資料的線程池。如果使用的是單台機器,當需要啟動多個 TunnelWorker 時,建議共用一個 TunnelWorkerConfig。
TunnelWorker 的初始化需要預熱時間,該值受 TunnelWorkerConfig 中的 heartbeatIntervalInSec 參數影響,可以通過 TunnelWorkerConfig 中的 setHeartbeatIntervalInSec 方法配置,預設為 30 s。
當用戶端(TunnelWorker)沒有被正常 shutdown 時(例如異常退出或者手動結束),TunnelWorker 會自動進行資源的回收,包括釋放線程池,自動調用使用者在 Channel 上註冊的 shutdown 方法,關閉 Tunnel 串連等。
Tunnel 的增量日誌保留時間,其數值與資料表中 Stream 的日誌到期時間長度(最長時間長度為 7 天)保持一致,因此 Tunnel 的增量日誌最多保留 7 天。
增量或者全量加增量類型 Tunnel 消費資料時,可能會出現以下情況:
當 Tunnel 處於全量階段時,如果全量資料在增量日誌保留時間內(最多保留 7 天)未能完成消費,將會觸發
OTSTunnelExpired
錯誤,從而導致無法繼續消費後續資料。如果您預計全量資料無法在指定時間內完成消費,請及時聯絡Table Store支援人員進行諮詢。
當 Tunnel 處於增量階段時,如果增量資料在增量日誌保留時間內(最多保留 7 天)未能完成消費,Tunnel 將可能從最近可消費的資料處開始消費,因此存在漏消費資料的風險。
Tunnel 到期後,Table Store可能會禁用該 Tunnel。如果禁用狀態持續超過 30 天,則該 Tunnel 將被徹底刪除,刪除後將無法恢複。
前提條件
已建立資料表。具體操作,請參見使用控制台建立資料表、使用命令列工具建立資料表或使用 SDK 建立資料表。
已擷取執行個體網域名稱地址(Endpoint)。具體操作,請參見擷取執行個體 Endpoint。
已配置訪問憑證。具體操作,請參見配置訪問憑證。
體驗通道服務
初始化 Tunnel client。
初始化 Tunnel client時,您可以使用長期訪問憑證或者臨時訪問憑證進行簽名認證。
使用長期訪問憑證初始化
在運行本程式碼範例之前,請確保已設定環境變數
TABLESTORE_ACCESS_KEY_ID
和TABLESTORE_ACCESS_KEY_SECRET
,這兩個變數分別對應阿里雲帳號或 RAM 使用者的 AccessKey ID 和 AccessKey Secret。警告阿里雲帳號擁有資源的全部許可權,AK 一旦泄露,會給系統帶來巨大風險,不建議使用。推薦使用最小化授權的 RAM 使用者的 AK。
//endpoint是Table Store執行個體endpoint,例如https://instance.cn-hangzhou.ots.aliyun.com。 //instance是執行個體名稱。 //accessKeyId和accessKeySecret分別為阿里雲帳號或者RAM使用者的AccessKey ID和AccessKey Secret。 endpoint := "yourEndpoint" instance := "yourInstance" accessKeyId := os.Getenv("TABLESTORE_ACCESS_KEY_ID") accessKeySecret := os.Getenv("TABLESTORE_ACCESS_KEY_SECRET") tunnelClient := tunnel.NewTunnelClient(endpoint, instance, accessKeyId, accessKeySecret)
使用臨時訪問憑證初始化
當您臨時使用 GO SDK 訪問 Tablestore 服務時,您可以通過 STS 服務頒發一個 STS 臨時訪問憑證。具體操作,請參見臨時訪問憑證。
Tunnel Client 內提供了 NewTunnelClientWithToken 介面用於使用臨時訪問憑證初始化 Tunnel Client。為了協助您更好的使用該介面,文檔中提供了一個帶重新整理臨時訪問憑證的範例程式碼。完整代碼請參見附錄:使用臨時訪問憑證初始化 Tunnel Client 的範例程式碼。
建立通道。
req := &tunnel.CreateTunnelRequest{ TableName: "<TABLE_NAME>", TunnelName: "<TUNNEL_NAME>", Type: tunnel.TunnelTypeBaseStream, //建立全量加增量類型的Tunnel。 } resp, err := tunnelClient.CreateTunnel(req) if err != nil { log.Fatal("create test tunnel failed", err) } log.Println("tunnel id is", resp.TunnelId)
根據業務自訂資料消費Callback函數,開始自動化的資料消費。
//根據業務自訂資料消費callback函數。 func exampleConsumeFunction(ctx *tunnel.ChannelContext, records []*tunnel.Record) error { fmt.Println("user-defined information", ctx.CustomValue) for _, rec := range records { fmt.Println("tunnel record detail:", rec.String()) } fmt.Println("a round of records consumption finished") return nil } //配置callback到SimpleProcessFactory,配置消費端TunnelWorkerConfig。 workConfig := &tunnel.TunnelWorkerConfig{ ProcessorFactory: &tunnel.SimpleProcessFactory{ CustomValue: "user custom interface{} value", ProcessFunc: exampleConsumeFunction, }, } //使用TunnelDaemon持續消費指定tunnel。 tunnelId := "<TUNNEL_ID>" daemon := tunnel.NewTunnelDaemon(tunnelClient, tunnelId, workConfig) log.Fatal(daemon.Run())
附錄:使用臨時訪問憑證初始化 Tunnel Client 的範例程式碼
import (
otscommon "github.com/aliyun/aliyun-tablestore-go-sdk/common"
"github.com/aliyun/aliyun-tablestore-go-sdk/tunnel"
"sync"
"time"
)
type RefreshClient struct {
lastRefresh time.Time
refreshIntervalInMin int
}
func NewRefreshClient(intervalInMin int) *RefreshClient {
return &RefreshClient{
refreshIntervalInMin: intervalInMin,
}
}
func (c *RefreshClient) IsExpired() bool {
now := time.Now()
if c.lastRefresh.IsZero() || now.Sub(c.lastRefresh) > time.Duration(c.refreshIntervalInMin)*time.Minute {
return true
}
return false
}
func (c *RefreshClient) Update() {
c.lastRefresh = time.Now()
}
type clientCredentials struct {
accessKeyID string
accessKeySecret string
securityToken string
}
func newClientCredentials(accessKeyID string, accessKeySecret string, securityToken string) *clientCredentials {
return &clientCredentials{accessKeyID: accessKeyID, accessKeySecret: accessKeySecret, securityToken: securityToken}
}
func (c *clientCredentials) GetAccessKeyID() string {
return c.accessKeyID
}
func (c *clientCredentials) GetAccessKeySecret() string {
return c.accessKeySecret
}
func (c *clientCredentials) GetSecurityToken() string {
return c.securityToken
}
type OTSCredentialsProvider struct {
refresh *RefreshClient
cred *clientCredentials
lock sync.Mutex
}
func NewOTSCredentialsProvider() *OTSCredentialsProvider {
return &OTSCredentialsProvider{
// 按需調整臨時訪問憑證重新整理周期,需要小於StsToken的到期時間。
refresh: NewRefreshClient(30),
}
}
func (p *OTSCredentialsProvider) renewCredentials() error {
if p.cred == nil || p.refresh.IsExpired() {
// 此處需要擷取使用者的StsToken。調用RAM的AssumeRole介面會返回AccessKeyId、AccessKeySecret、SecurityToken和Expiration資訊。
// 擷取臨時訪問憑證後填寫以下參數。關於RAM(STS) SDK的更多資訊請參見RAM文檔。
// resp, err := GetUserOtsStsToken()
accessKeyId := ""
accessKeySecret := ""
stsToken := ""
p.cred = newClientCredentials(accessKeyId, accessKeySecret, stsToken)
p.refresh.Update()
}
return nil
}
func (p *OTSCredentialsProvider) GetCredentials() otscommon.Credentials {
p.lock.Lock()
defer p.lock.Unlock()
if err := p.renewCredentials(); err != nil {
// log error
if p.cred == nil {
return newClientCredentials("", "", "")
}
}
return p.cred
}
// NewTunnelClientWithToken用於初始化一個帶StsToken重新整理功能的TunnelClient。
func NewTunnelClientWithToken(endpoint, instanceName, accessId, accessKey, token string) tunnel.TunnelClient {
return tunnel.NewTunnelClientWithToken(
endpoint,
instanceName,
"",
"",
"",
nil,
tunnel.SetCredentialsProvider(NewOTSCredentialsProvider()),
)
}