すべてのプロダクト
Search
ドキュメントセンター

Tablestore:はじめに

最終更新日:Dec 28, 2024

Tunnel Service を使用すると、テーブル内のデータを使用できます。このトピックでは、Tablestore SDK for Go を使用して Tunnel Service を使い始める方法について説明します。Tunnel Service を使用する前に、Tunnel Service の使用上の注意をよく理解しておいてください。

使用上の注意

  • デフォルトでは、システムは TunnelWorkerConfig に基づいてデータを読み取り、処理するためのスレッドプールを開始します。単一のサーバーで複数の TunnelWorker を開始する場合は、すべての TunnelWorker を構成するために同じ TunnelWorkerConfig を使用することをお勧めします。

  • TunnelWorker は初期化のためにウォームアップ期間を必要とします。これは、TunnelWorkerConfig の heartbeatIntervalInSec パラメーターで指定されます。TunnelWorkerConfig の setHeartbeatIntervalInSec メソッドを使用して、このパラメーターを構成できます。デフォルト値:30。単位:秒。

  • 予期しない終了または手動による終了により TunnelWorker クライアントがシャットダウンした場合、TunnelWorker は次のいずれかの方法を使用してリソースを自動的にリサイクルします。スレッドプールを解放し、Channel クラスに登録した shutdown メソッドを自動的に呼び出し、トンネルをシャットダウンします。

  • トンネル内の増分ログの保持期間は、Stream ログの保持期間と同じです。Stream ログは最大 7 日間保持できます。したがって、トンネル内の増分ログは最大 7 日間保持できます。

  • 差分データまたは増分データを使用するためにトンネルを作成する場合は、次の点に注意してください。

    • フルデータ使用中、トンネルが増分ログの保持期間(最大 7 日間)内にフルデータの使用を完了できない場合、トンネルが増分ログの使用を開始するときに OTSTunnelExpired エラーが発生します。その結果、トンネルは増分ログを使用できません。

      トンネルが指定された時間枠内にフルデータ使用を完了できないと推定される場合は、Tablestore テクニカルサポートにご連絡ください。

    • 増分データ使用中、トンネルが増分ログの保持期間(最大 7 日間)内に増分ログの使用を完了できない場合、トンネルは使用可能な最新のデータからデータを使用する場合があります。この場合、特定のデータが使用されない可能性があります。

  • トンネルの有効期限が切れると、Tablestore はトンネルを無効にする場合があります。トンネルが無効状態のまま 30 日以上経過すると、トンネルは削除されます。削除されたトンネルは復元できません。

前提条件

Tunnel Service の使用開始

  1. TunnelClient インスタンスを初期化します。

    TunnelClient インスタンスを初期化するときは、長期アクセス認証情報または一時アクセス認証情報を使用して認証できます。

    • 長期アクセス認証情報を使用して初期化

      TABLESTORE_ACCESS_KEY_ID および TABLESTORE_ACCESS_KEY_SECRET 環境変数が構成されていることを確認します。TABLESTORE_ACCESS_KEY_ID 環境変数は、Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID を指定します。TABLESTORE_ACCESS_KEY_SECRET 環境変数は、Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey シークレットを指定します。

      警告

      Alibaba Cloud アカウントは、アカウントのすべてのリソースへのフルアクセス権限を持っています。Alibaba Cloud アカウントの AccessKey ペアの漏洩は、システムにとって重大な脅威となります。したがって、TunnelClient インスタンスを初期化するために必要な最小限の権限が付与された RAM ユーザーの AccessKey ペアを使用することをお勧めします。

      // endpoint パラメーターを Tablestore インスタンスのエンドポイントに設定します。例:https://instance.cn-hangzhou.ots.aliyuncs.com。
      // インスタンスの名前を指定します。
      // Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID と AccessKey シークレットを指定します。
      endpoint := "yourEndpoint"
      instance := "yourInstance"
      accessKeyId := os.Getenv("TABLESTORE_ACCESS_KEY_ID")
      accessKeySecret := os.Getenv("TABLESTORE_ACCESS_KEY_SECRET")
      tunnelClient := tunnel.NewTunnelClient(endpoint, instance, accessKeyId, accessKeySecret)                    
    • 一時アクセス認証情報を使用して初期化

      1. Tablestore SDK for Go を使用して Tablestore に一時的にアクセスする場合は、Security Token Service (STS) を使用して一時アクセス認証情報を生成できます。詳細については、一時アクセス認証情報の構成を参照してください。

      2. トンネルクライアントは、NewTunnelClientWithToken 操作を提供します。この操作を呼び出すことで、一時アクセス認証情報に基づいて TunnelClient インスタンスを初期化できます。このトピックでは、定期的に更新できる一時アクセス認証情報を使用して TunnelClient インスタンスを初期化するためのサンプルコードを提供します。詳細については、付録:一時アクセス認証情報を使用して TunnelClient インスタンスを初期化するためのサンプルコードを参照してください。

  2. トンネルを作成します。

    req := &tunnel.CreateTunnelRequest{
       TableName:  "<TABLE_NAME>",
       TunnelName: "<TUNNEL_NAME>",
       Type:       tunnel.TunnelTypeBaseStream, // BaseAndStream トンネルを作成します。
    }
    resp, err := tunnelClient.CreateTunnel(req)
    if err != nil {
       log.Fatal("テストトンネルの作成に失敗しました", err)
    }
    log.Println("トンネル ID は", resp.TunnelId)
  3. カスタムコールバック関数を指定して、自動データ使用を開始します。

    // カスタムコールバック関数を指定します。
    func exampleConsumeFunction(ctx *tunnel.ChannelContext, records []*tunnel.Record) error {
        fmt.Println("ユーザー定義情報", ctx.CustomValue)
        for _, rec := range records {
            fmt.Println("トンネルレコードの詳細:", rec.String())
        }
        fmt.Println("レコード使用のラウンドが完了しました")
        return nil
    }
    
    // コールバック関数を構成します。コールバック関数に関する情報は SimpleProcessFactory に渡されます。コンシューマーの TunnelWorkerConfig を構成します。
    workConfig := &tunnel.TunnelWorkerConfig{
       ProcessorFactory: &tunnel.SimpleProcessFactory{
          CustomValue: "ユーザーカスタム interface{} 値",
          ProcessFunc: exampleConsumeFunction,
       },
    }
    
    // TunnelDaemon を使用して、指定されたトンネルを継続的に使用します。
    tunnelId := "<TUNNEL_ID>"
    daemon := tunnel.NewTunnelDaemon(tunnelClient, tunnelId, workConfig)
    log.Fatal(daemon.Run())

付録:一時アクセス認証情報を使用して TunnelClient インスタンスを初期化するためのサンプルコード

import (
    otscommon "github.com/aliyun/aliyun-tablestore-go-sdk/common"
    "github.com/aliyun/aliyun-tablestore-go-sdk/tunnel"
    "sync"
    "time"
)

type RefreshClient struct {
    lastRefresh          time.Time
    refreshIntervalInMin int
}

func NewRefreshClient(intervalInMin int) *RefreshClient {
    return &RefreshClient{
        refreshIntervalInMin: intervalInMin,
    }
}

func (c *RefreshClient) IsExpired() bool {
    now := time.Now()
    if c.lastRefresh.IsZero() || now.Sub(c.lastRefresh) > time.Duration(c.refreshIntervalInMin)*time.Minute {
        return true
    }

    return false
}

func (c *RefreshClient) Update() {
    c.lastRefresh = time.Now()
}

type clientCredentials struct {
    accessKeyID     string
    accessKeySecret string
    securityToken   string
}

func newClientCredentials(accessKeyID string, accessKeySecret string, securityToken string) *clientCredentials {
    return &clientCredentials{accessKeyID: accessKeyID, accessKeySecret: accessKeySecret, securityToken: securityToken}
}

func (c *clientCredentials) GetAccessKeyID() string {
    return c.accessKeyID
}

func (c *clientCredentials) GetAccessKeySecret() string {
    return c.accessKeySecret
}

func (c *clientCredentials) GetSecurityToken() string {
    return c.securityToken
}

type OTSCredentialsProvider struct {
    refresh *RefreshClient
    cred    *clientCredentials
    lock    sync.Mutex
}

func NewOTSCredentialsProvider() *OTSCredentialsProvider {
    return &OTSCredentialsProvider{
        // 業務要件に基づいて一時アクセス認証情報の更新サイクルを変更します。更新サイクルは、一時アクセス認証情報の有効期間よりも短くする必要があります。
        refresh: NewRefreshClient(30),
    }
}

func (p *OTSCredentialsProvider) renewCredentials() error {
    if p.cred == nil || p.refresh.IsExpired() {
        // 一時アクセス認証情報を取得します。RAM の AssumeRole 操作を呼び出して、一時アクセス認証情報の AccessKey ID、AccessKey シークレット、セキュリティトークン、および有効期間を取得できます。
        // 次のパラメーターを構成します。RAM SDK の詳細については、RAM のドキュメントを参照してください。
        // resp, err := GetUserOtsStsToken()
        accessKeyId := ""
        accessKeySecret := ""
        stsToken := ""
        p.cred = newClientCredentials(accessKeyId, accessKeySecret, stsToken)
        p.refresh.Update()
    }

    return nil
}

func (p *OTSCredentialsProvider) GetCredentials() otscommon.Credentials {
    p.lock.Lock()
    defer p.lock.Unlock()

    if err := p.renewCredentials(); err != nil {
        // エラーをログに記録します
        if p.cred == nil {
            return newClientCredentials("", "", "")
        }
    }

    return p.cred
}

// NewTunnelClientWithToken は、一時アクセス認証情報の更新機能を使用して TunnelClient インスタンスを初期化するために使用されます。
func NewTunnelClientWithToken(endpoint, instanceName, accessId, accessKey, token string) tunnel.TunnelClient {
    return tunnel.NewTunnelClientWithToken(
        endpoint,
        instanceName,
        "",
        "",
        "",
        nil,
        tunnel.SetCredentialsProvider(NewOTSCredentialsProvider()),
    )
}