通過通道服務,您可以消費表中的資料。本文介紹如何使用Go SDK快速體驗通道服務。使用前,您需要瞭解使用通道服務的注意事項。
注意事項
TunnelWorkerConfig 中預設會啟動讀資料和處理資料的線程池。如果使用的是單台機器,當需要啟動多個 TunnelWorker 時,建議共用一個 TunnelWorkerConfig。
TunnelWorker 的初始化需要預熱時間,該值受 TunnelWorkerConfig 中的 HeartbeatInterval 參數影響,預設為 30 s。
當用戶端(TunnelWorker)沒有被正常 shutdown 時(例如異常退出或者手動結束),TunnelWorker 會自動進行資源的回收,包括釋放線程池,自動調用使用者在 Channel 上註冊的 shutdown 方法,關閉 Tunnel 串連等。
Tunnel 的增量日誌保留時間,其數值與資料表中 Stream 的日誌到期時間長度(最長時間長度為 7 天)保持一致,因此 Tunnel 的增量日誌最多保留 7 天。
增量或者全量加增量類型 Tunnel 消費資料時,可能會出現以下情況:
當 Tunnel 處於全量階段時,如果全量資料在增量日誌保留時間內(最多保留 7 天)未能完成消費,將會觸發
OTSTunnelExpired
錯誤,從而導致無法繼續消費後續資料。如果您預計全量資料無法在指定時間內完成消費,請及時聯絡Table Store支援人員進行諮詢。
當 Tunnel 處於增量階段時,如果增量資料在增量日誌保留時間內(最多保留 7 天)未能完成消費,Tunnel 將可能從最近可消費的資料處開始消費,因此存在漏消費資料的風險。
Tunnel 到期後,Table Store可能會禁用該 Tunnel。如果禁用狀態持續超過 30 天,則該 Tunnel 將被徹底刪除,刪除後將無法恢複。
前提條件
已建立資料表。具體操作,請參見使用控制台建立資料表、使用命令列工具建立資料表或使用 SDK 建立資料表。
已擷取執行個體網域名稱地址(Endpoint)。具體操作,請參見擷取執行個體 Endpoint。
已配置訪問憑證。具體操作,請參見配置訪問憑證。
體驗通道服務
初始化 Tunnel client。
初始化 Tunnel client時,您可以使用長期訪問憑證或者臨時訪問憑證進行簽名認證。
使用長期訪問憑證初始化
在運行本程式碼範例之前,請確保已設定環境變數
TABLESTORE_ACCESS_KEY_ID
和TABLESTORE_ACCESS_KEY_SECRET
,這兩個變數分別對應阿里雲帳號或 RAM 使用者的 AccessKey ID 和 AccessKey Secret。警告阿里雲帳號擁有資源的全部許可權,AK 一旦泄露,會給系統帶來巨大風險,不建議使用。推薦使用最小化授權的 RAM 使用者的 AK。
//endpoint是Table Store執行個體endpoint,例如https://instance.cn-hangzhou.ots.aliyun.com。 //instance是執行個體名稱。 //accessKeyId和accessKeySecret分別為阿里雲帳號或者RAM使用者的AccessKey ID和AccessKey Secret。 endpoint := "yourEndpoint" instance := "yourInstance" accessKeyId := os.Getenv("TABLESTORE_ACCESS_KEY_ID") accessKeySecret := os.Getenv("TABLESTORE_ACCESS_KEY_SECRET") tunnelClient := tunnel.NewTunnelClient(endpoint, instance, accessKeyId, accessKeySecret)
使用臨時訪問憑證初始化
當您臨時使用 GO SDK 訪問 Tablestore 服務時,您可以通過 STS 服務頒發一個 STS 臨時訪問憑證。具體操作,請參見臨時訪問憑證。
Tunnel Client 內提供了 NewTunnelClientWithToken 介面用於使用臨時訪問憑證初始化 Tunnel Client。為了協助您更好的使用該介面,文檔中提供了一個帶重新整理臨時訪問憑證的範例程式碼。完整代碼請參見附錄:使用臨時訪問憑證初始化 Tunnel Client 的範例程式碼。
建立通道。
req := &tunnel.CreateTunnelRequest{ TableName: "<TABLE_NAME>", TunnelName: "<TUNNEL_NAME>", Type: tunnel.TunnelTypeBaseStream, //建立全量加增量類型的Tunnel。 } resp, err := tunnelClient.CreateTunnel(req) if err != nil { log.Fatal("create test tunnel failed", err) } log.Println("tunnel id is", resp.TunnelId)
根據業務自訂資料消費Callback函數,開始自動化的資料消費。
//根據業務自訂資料消費callback函數。 func exampleConsumeFunction(ctx *tunnel.ChannelContext, records []*tunnel.Record) error { fmt.Println("user-defined information", ctx.CustomValue) for _, rec := range records { fmt.Println("tunnel record detail:", rec.String()) } fmt.Println("a round of records consumption finished") return nil } //配置callback到SimpleProcessFactory,配置消費端TunnelWorkerConfig。 workConfig := &tunnel.TunnelWorkerConfig{ ProcessorFactory: &tunnel.SimpleProcessFactory{ CustomValue: "user custom interface{} value", ProcessFunc: exampleConsumeFunction, }, } //使用TunnelDaemon持續消費指定tunnel。 tunnelId := "<TUNNEL_ID>" daemon := tunnel.NewTunnelDaemon(tunnelClient, tunnelId, workConfig) log.Fatal(daemon.Run())
附錄:使用臨時訪問憑證初始化 Tunnel Client 的範例程式碼
import (
otscommon "github.com/aliyun/aliyun-tablestore-go-sdk/common"
"github.com/aliyun/aliyun-tablestore-go-sdk/tunnel"
"sync"
"time"
)
type RefreshClient struct {
lastRefresh time.Time
refreshIntervalInMin int
}
func NewRefreshClient(intervalInMin int) *RefreshClient {
return &RefreshClient{
refreshIntervalInMin: intervalInMin,
}
}
func (c *RefreshClient) IsExpired() bool {
now := time.Now()
if c.lastRefresh.IsZero() || now.Sub(c.lastRefresh) > time.Duration(c.refreshIntervalInMin)*time.Minute {
return true
}
return false
}
func (c *RefreshClient) Update() {
c.lastRefresh = time.Now()
}
type clientCredentials struct {
accessKeyID string
accessKeySecret string
securityToken string
}
func newClientCredentials(accessKeyID string, accessKeySecret string, securityToken string) *clientCredentials {
return &clientCredentials{accessKeyID: accessKeyID, accessKeySecret: accessKeySecret, securityToken: securityToken}
}
func (c *clientCredentials) GetAccessKeyID() string {
return c.accessKeyID
}
func (c *clientCredentials) GetAccessKeySecret() string {
return c.accessKeySecret
}
func (c *clientCredentials) GetSecurityToken() string {
return c.securityToken
}
type OTSCredentialsProvider struct {
refresh *RefreshClient
cred *clientCredentials
lock sync.Mutex
}
func NewOTSCredentialsProvider() *OTSCredentialsProvider {
return &OTSCredentialsProvider{
// 按需調整臨時訪問憑證重新整理周期,需要小於StsToken的到期時間。
refresh: NewRefreshClient(30),
}
}
func (p *OTSCredentialsProvider) renewCredentials() error {
if p.cred == nil || p.refresh.IsExpired() {
// 此處需要擷取使用者的StsToken。調用RAM的AssumeRole介面會返回AccessKeyId、AccessKeySecret、SecurityToken和Expiration資訊。
// 擷取臨時訪問憑證後填寫以下參數。關於RAM(STS) SDK的更多資訊請參見RAM文檔。
// resp, err := GetUserOtsStsToken()
accessKeyId := ""
accessKeySecret := ""
stsToken := ""
p.cred = newClientCredentials(accessKeyId, accessKeySecret, stsToken)
p.refresh.Update()
}
return nil
}
func (p *OTSCredentialsProvider) GetCredentials() otscommon.Credentials {
p.lock.Lock()
defer p.lock.Unlock()
if err := p.renewCredentials(); err != nil {
// log error
if p.cred == nil {
return newClientCredentials("", "", "")
}
}
return p.cred
}
// NewTunnelClientWithToken用於初始化一個帶StsToken重新整理功能的TunnelClient。
func NewTunnelClientWithToken(endpoint, instanceName, accessId, accessKey, token string) tunnel.TunnelClient {
return tunnel.NewTunnelClientWithToken(
endpoint,
instanceName,
"",
"",
"",
nil,
tunnel.SetCredentialsProvider(NewOTSCredentialsProvider()),
)
}