全部產品
Search
文件中心

Tablestore:快速開始

更新時間:Dec 20, 2024

通過通道服務,您可以消費表中的資料。本文介紹如何使用Go SDK快速體驗通道服務。使用前,您需要瞭解使用通道服務的注意事項。

注意事項

  • TunnelWorkerConfig 中預設會啟動讀資料和處理資料的線程池。如果使用的是單台機器,當需要啟動多個 TunnelWorker 時,建議共用一個 TunnelWorkerConfig。

  • TunnelWorker 的初始化需要預熱時間,該值受 TunnelWorkerConfig 中的 HeartbeatInterval 參數影響,預設為 30 s。

  • 當用戶端(TunnelWorker)沒有被正常 shutdown 時(例如異常退出或者手動結束),TunnelWorker 會自動進行資源的回收,包括釋放線程池,自動調用使用者在 Channel 上註冊的 shutdown 方法,關閉 Tunnel 串連等。

  • Tunnel 的增量日誌保留時間,其數值與資料表中 Stream 的日誌到期時間長度(最長時間長度為 7 天)保持一致,因此 Tunnel 的增量日誌最多保留 7 天。

  • 增量或者全量加增量類型 Tunnel 消費資料時,可能會出現以下情況:

    • 當 Tunnel 處於全量階段時,如果全量資料在增量日誌保留時間內(最多保留 7 天)未能完成消費,將會觸發 OTSTunnelExpired 錯誤,從而導致無法繼續消費後續資料。

      如果您預計全量資料無法在指定時間內完成消費,請及時聯絡Table Store支援人員進行諮詢。

    • 當 Tunnel 處於增量階段時,如果增量資料在增量日誌保留時間內(最多保留 7 天)未能完成消費,Tunnel 將可能從最近可消費的資料處開始消費,因此存在漏消費資料的風險。

  • Tunnel 到期後,Table Store可能會禁用該 Tunnel。如果禁用狀態持續超過 30 天,則該 Tunnel 將被徹底刪除,刪除後將無法恢複。

前提條件

體驗通道服務

  1. 初始化 Tunnel client。

    初始化 Tunnel client時,您可以使用長期訪問憑證或者臨時訪問憑證進行簽名認證。

    • 使用長期訪問憑證初始化

      在運行本程式碼範例之前,請確保已設定環境變數TABLESTORE_ACCESS_KEY_IDTABLESTORE_ACCESS_KEY_SECRET,這兩個變數分別對應阿里雲帳號或 RAM 使用者的 AccessKey ID 和 AccessKey Secret。

      警告

      阿里雲帳號擁有資源的全部許可權,AK 一旦泄露,會給系統帶來巨大風險,不建議使用。推薦使用最小化授權的 RAM 使用者的 AK。

      //endpoint是Table Store執行個體endpoint,例如https://instance.cn-hangzhou.ots.aliyun.com。
      //instance是執行個體名稱。
      //accessKeyId和accessKeySecret分別為阿里雲帳號或者RAM使用者的AccessKey ID和AccessKey Secret。
      endpoint := "yourEndpoint"
      instance := "yourInstance"
      accessKeyId := os.Getenv("TABLESTORE_ACCESS_KEY_ID")
      accessKeySecret := os.Getenv("TABLESTORE_ACCESS_KEY_SECRET")
      tunnelClient := tunnel.NewTunnelClient(endpoint, instance, accessKeyId, accessKeySecret)                    
    • 使用臨時訪問憑證初始化

      1. 當您臨時使用 GO SDK 訪問 Tablestore 服務時,您可以通過 STS 服務頒發一個 STS 臨時訪問憑證。具體操作,請參見臨時訪問憑證

      2. Tunnel Client 內提供了 NewTunnelClientWithToken 介面用於使用臨時訪問憑證初始化 Tunnel Client。為了協助您更好的使用該介面,文檔中提供了一個帶重新整理臨時訪問憑證的範例程式碼。完整代碼請參見附錄:使用臨時訪問憑證初始化 Tunnel Client 的範例程式碼

  2. 建立通道。

    req := &tunnel.CreateTunnelRequest{
       TableName:  "<TABLE_NAME>",
       TunnelName: "<TUNNEL_NAME>",
       Type:       tunnel.TunnelTypeBaseStream, //建立全量加增量類型的Tunnel。
    }
    resp, err := tunnelClient.CreateTunnel(req)
    if err != nil {
       log.Fatal("create test tunnel failed", err)
    }
    log.Println("tunnel id is", resp.TunnelId)
  3. 根據業務自訂資料消費Callback函數,開始自動化的資料消費。

    //根據業務自訂資料消費callback函數。
    func exampleConsumeFunction(ctx *tunnel.ChannelContext, records []*tunnel.Record) error {
        fmt.Println("user-defined information", ctx.CustomValue)
        for _, rec := range records {
            fmt.Println("tunnel record detail:", rec.String())
        }
        fmt.Println("a round of records consumption finished")
        return nil
    }
    
    //配置callback到SimpleProcessFactory,配置消費端TunnelWorkerConfig。
    workConfig := &tunnel.TunnelWorkerConfig{
       ProcessorFactory: &tunnel.SimpleProcessFactory{
          CustomValue: "user custom interface{} value",
          ProcessFunc: exampleConsumeFunction,
       },
    }
    
    //使用TunnelDaemon持續消費指定tunnel。
    tunnelId := "<TUNNEL_ID>"
    daemon := tunnel.NewTunnelDaemon(tunnelClient, tunnelId, workConfig)
    log.Fatal(daemon.Run())

附錄:使用臨時訪問憑證初始化 Tunnel Client 的範例程式碼

import (
    otscommon "github.com/aliyun/aliyun-tablestore-go-sdk/common"
    "github.com/aliyun/aliyun-tablestore-go-sdk/tunnel"
    "sync"
    "time"
)

type RefreshClient struct {
    lastRefresh          time.Time
    refreshIntervalInMin int
}

func NewRefreshClient(intervalInMin int) *RefreshClient {
    return &RefreshClient{
        refreshIntervalInMin: intervalInMin,
    }
}

func (c *RefreshClient) IsExpired() bool {
    now := time.Now()
    if c.lastRefresh.IsZero() || now.Sub(c.lastRefresh) > time.Duration(c.refreshIntervalInMin)*time.Minute {
        return true
    }

    return false
}

func (c *RefreshClient) Update() {
    c.lastRefresh = time.Now()
}

type clientCredentials struct {
    accessKeyID     string
    accessKeySecret string
    securityToken   string
}

func newClientCredentials(accessKeyID string, accessKeySecret string, securityToken string) *clientCredentials {
    return &clientCredentials{accessKeyID: accessKeyID, accessKeySecret: accessKeySecret, securityToken: securityToken}
}

func (c *clientCredentials) GetAccessKeyID() string {
    return c.accessKeyID
}

func (c *clientCredentials) GetAccessKeySecret() string {
    return c.accessKeySecret
}

func (c *clientCredentials) GetSecurityToken() string {
    return c.securityToken
}

type OTSCredentialsProvider struct {
    refresh *RefreshClient
    cred    *clientCredentials
    lock    sync.Mutex
}

func NewOTSCredentialsProvider() *OTSCredentialsProvider {
    return &OTSCredentialsProvider{
        // 按需調整臨時訪問憑證重新整理周期,需要小於StsToken的到期時間。
        refresh: NewRefreshClient(30),
    }
}

func (p *OTSCredentialsProvider) renewCredentials() error {
    if p.cred == nil || p.refresh.IsExpired() {
        // 此處需要擷取使用者的StsToken。調用RAM的AssumeRole介面會返回AccessKeyId、AccessKeySecret、SecurityToken和Expiration資訊。
        // 擷取臨時訪問憑證後填寫以下參數。關於RAM(STS) SDK的更多資訊請參見RAM文檔。 
        // resp, err := GetUserOtsStsToken()
        accessKeyId := ""
        accessKeySecret := ""
        stsToken := ""
        p.cred = newClientCredentials(accessKeyId, accessKeySecret, stsToken)
        p.refresh.Update()
    }

    return nil
}

func (p *OTSCredentialsProvider) GetCredentials() otscommon.Credentials {
    p.lock.Lock()
    defer p.lock.Unlock()

    if err := p.renewCredentials(); err != nil {
        // log error
        if p.cred == nil {
            return newClientCredentials("", "", "")
        }
    }

    return p.cred
}

// NewTunnelClientWithToken用於初始化一個帶StsToken重新整理功能的TunnelClient。
func NewTunnelClientWithToken(endpoint, instanceName, accessId, accessKey, token string) tunnel.TunnelClient {
    return tunnel.NewTunnelClientWithToken(
        endpoint,
        instanceName,
        "",
        "",
        "",
        nil,
        tunnel.SetCredentialsProvider(NewOTSCredentialsProvider()),
    )
}