通过通道服务,您可以消费表中的数据。本文介绍如何使用Go SDK快速体验通道服务。使用前,您需要了解使用通道服务的注意事项。
注意事项
TunnelWorkerConfig 中默认会启动读数据和处理数据的线程池。如果使用的是单台机器,当需要启动多个 TunnelWorker 时,建议共用一个 TunnelWorkerConfig。
TunnelWorker 的初始化需要预热时间,该值受 TunnelWorkerConfig 中的 heartbeatIntervalInSec 参数影响,可以通过 TunnelWorkerConfig 中的 setHeartbeatIntervalInSec 方法配置,默认为 30 s。
当客户端(TunnelWorker)没有被正常 shutdown 时(例如异常退出或者手动结束),TunnelWorker 会自动进行资源的回收,包括释放线程池,自动调用用户在 Channel 上注册的 shutdown 方法,关闭 Tunnel 连接等。
Tunnel 的增量日志保留时间,其数值与数据表中 Stream 的日志过期时长(最长时长为 7 天)保持一致,因此 Tunnel 的增量日志最多保留 7 天。
增量或者全量加增量类型 Tunnel 消费数据时,可能会出现以下情况:
当 Tunnel 处于全量阶段时,如果全量数据在增量日志保留时间内(最多保留 7 天)未能完成消费,将会触发
OTSTunnelExpired
错误,从而导致无法继续消费后续数据。如果您预计全量数据无法在指定时间内完成消费,请及时联系表格存储技术支持进行咨询。
当 Tunnel 处于增量阶段时,如果增量数据在增量日志保留时间内(最多保留 7 天)未能完成消费,Tunnel 将可能从最近可消费的数据处开始消费,因此存在漏消费数据的风险。
Tunnel 过期后,表格存储可能会禁用该 Tunnel。如果禁用状态持续超过 30 天,则该 Tunnel 将被彻底删除,删除后将无法恢复。
前提条件
已创建数据表。具体操作,请参见使用控制台创建数据表、使用命令行工具创建数据表或使用 SDK 创建数据表。
已获取实例域名地址(Endpoint)。具体操作,请参见获取实例 Endpoint。
已配置访问凭证。具体操作,请参见配置访问凭证。
体验通道服务
初始化 Tunnel client。
初始化 Tunnel client时,您可以使用长期访问凭证或者临时访问凭证进行签名认证。
使用长期访问凭证初始化
在运行本代码示例之前,请确保已设置环境变量
TABLESTORE_ACCESS_KEY_ID
和TABLESTORE_ACCESS_KEY_SECRET
,这两个变量分别对应阿里云账号或 RAM 用户的 AccessKey ID 和 AccessKey Secret。警告阿里云账号拥有资源的全部权限,AK 一旦泄露,会给系统带来巨大风险,不建议使用。推荐使用最小化授权的 RAM 用户的 AK。
//endpoint是表格存储实例endpoint,例如https://instance.cn-hangzhou.ots.aliyun.com。 //instance是实例名称。 //accessKeyId和accessKeySecret分别为阿里云账号或者RAM用户的AccessKey ID和AccessKey Secret。 endpoint := "yourEndpoint" instance := "yourInstance" accessKeyId := os.Getenv("TABLESTORE_ACCESS_KEY_ID") accessKeySecret := os.Getenv("TABLESTORE_ACCESS_KEY_SECRET") tunnelClient := tunnel.NewTunnelClient(endpoint, instance, accessKeyId, accessKeySecret)
使用临时访问凭证初始化
当您临时使用 GO SDK 访问 Tablestore 服务时,您可以通过 STS 服务颁发一个 STS 临时访问凭证。具体操作,请参见临时访问凭证。
Tunnel Client 内提供了 NewTunnelClientWithToken 接口用于使用临时访问凭证初始化 Tunnel Client。为了帮助您更好的使用该接口,文档中提供了一个带刷新临时访问凭证的示例代码。完整代码请参见附录:使用临时访问凭证初始化 Tunnel Client 的示例代码。
创建通道。
req := &tunnel.CreateTunnelRequest{ TableName: "<TABLE_NAME>", TunnelName: "<TUNNEL_NAME>", Type: tunnel.TunnelTypeBaseStream, //创建全量加增量类型的Tunnel。 } resp, err := tunnelClient.CreateTunnel(req) if err != nil { log.Fatal("create test tunnel failed", err) } log.Println("tunnel id is", resp.TunnelId)
根据业务自定义数据消费Callback函数,开始自动化的数据消费。
//根据业务自定义数据消费callback函数。 func exampleConsumeFunction(ctx *tunnel.ChannelContext, records []*tunnel.Record) error { fmt.Println("user-defined information", ctx.CustomValue) for _, rec := range records { fmt.Println("tunnel record detail:", rec.String()) } fmt.Println("a round of records consumption finished") return nil } //配置callback到SimpleProcessFactory,配置消费端TunnelWorkerConfig。 workConfig := &tunnel.TunnelWorkerConfig{ ProcessorFactory: &tunnel.SimpleProcessFactory{ CustomValue: "user custom interface{} value", ProcessFunc: exampleConsumeFunction, }, } //使用TunnelDaemon持续消费指定tunnel。 tunnelId := "<TUNNEL_ID>" daemon := tunnel.NewTunnelDaemon(tunnelClient, tunnelId, workConfig) log.Fatal(daemon.Run())
附录:使用临时访问凭证初始化 Tunnel Client 的示例代码
import (
otscommon "github.com/aliyun/aliyun-tablestore-go-sdk/common"
"github.com/aliyun/aliyun-tablestore-go-sdk/tunnel"
"sync"
"time"
)
type RefreshClient struct {
lastRefresh time.Time
refreshIntervalInMin int
}
func NewRefreshClient(intervalInMin int) *RefreshClient {
return &RefreshClient{
refreshIntervalInMin: intervalInMin,
}
}
func (c *RefreshClient) IsExpired() bool {
now := time.Now()
if c.lastRefresh.IsZero() || now.Sub(c.lastRefresh) > time.Duration(c.refreshIntervalInMin)*time.Minute {
return true
}
return false
}
func (c *RefreshClient) Update() {
c.lastRefresh = time.Now()
}
type clientCredentials struct {
accessKeyID string
accessKeySecret string
securityToken string
}
func newClientCredentials(accessKeyID string, accessKeySecret string, securityToken string) *clientCredentials {
return &clientCredentials{accessKeyID: accessKeyID, accessKeySecret: accessKeySecret, securityToken: securityToken}
}
func (c *clientCredentials) GetAccessKeyID() string {
return c.accessKeyID
}
func (c *clientCredentials) GetAccessKeySecret() string {
return c.accessKeySecret
}
func (c *clientCredentials) GetSecurityToken() string {
return c.securityToken
}
type OTSCredentialsProvider struct {
refresh *RefreshClient
cred *clientCredentials
lock sync.Mutex
}
func NewOTSCredentialsProvider() *OTSCredentialsProvider {
return &OTSCredentialsProvider{
// 按需调整临时访问凭证刷新周期,需要小于StsToken的过期时间。
refresh: NewRefreshClient(30),
}
}
func (p *OTSCredentialsProvider) renewCredentials() error {
if p.cred == nil || p.refresh.IsExpired() {
// 此处需要获取用户的StsToken。调用RAM的AssumeRole接口会返回AccessKeyId、AccessKeySecret、SecurityToken和Expiration信息。
// 获取临时访问凭证后填写以下参数。关于RAM(STS) SDK的更多信息请参见RAM文档。
// resp, err := GetUserOtsStsToken()
accessKeyId := ""
accessKeySecret := ""
stsToken := ""
p.cred = newClientCredentials(accessKeyId, accessKeySecret, stsToken)
p.refresh.Update()
}
return nil
}
func (p *OTSCredentialsProvider) GetCredentials() otscommon.Credentials {
p.lock.Lock()
defer p.lock.Unlock()
if err := p.renewCredentials(); err != nil {
// log error
if p.cred == nil {
return newClientCredentials("", "", "")
}
}
return p.cred
}
// NewTunnelClientWithToken用于初始化一个带StsToken刷新功能的TunnelClient。
func NewTunnelClientWithToken(endpoint, instanceName, accessId, accessKey, token string) tunnel.TunnelClient {
return tunnel.NewTunnelClientWithToken(
endpoint,
instanceName,
"",
"",
"",
nil,
tunnel.SetCredentialsProvider(NewOTSCredentialsProvider()),
)
}