すべてのプロダクト
Search
ドキュメントセンター

Tablestore:はじめに

最終更新日:Jun 19, 2025

トンネルサービスを使用すると、テーブル内のデータを使用できます。このトピックでは、Tablestore SDK for Java を使用してトンネルサービスを開始する方法について説明します。

使用上の注意

  • デフォルトでは、システムは TunnelWorkerConfig に基づいてデータを読み取り、処理するためのスレッドプールを開始します。単一サーバーで複数の TunnelWorker を開始する場合は、すべての TunnelWorker を構成するために同じ TunnelWorkerConfig を使用することをお勧めします。

  • TunnelWorker は初期化のためにウォームアップ期間が必要です。これは、TunnelWorkerConfig の heartbeatIntervalInSec パラメーターで指定されます。このパラメーターを構成するには、TunnelWorkerConfig の setHeartbeatIntervalInSec メソッドを使用します。デフォルト値:30。単位:秒。

  • 予期しない終了または手動終了により TunnelWorker クライアントがシャットダウンすると、TunnelWorker は次のいずれかの方法を使用してリソースを自動的にリサイクルします。スレッドプールを解放し、Channel クラスに登録した shutdown メソッドを自動的に呼び出し、トンネルをシャットダウンします。

  • トンネル内の増分ログの保持期間は、Stream ログの保持期間と同じです。 Stream ログは最大 7 日間保持できます。したがって、トンネル内の増分ログは最大 7 日間保持できます。

  • 差分データまたは増分データを使用するためにトンネルを作成する場合は、次の点に注意してください。

    • 完全データ消費中に、トンネルが増分ログの保持期間(最大 7 日間)内に完全データの消費を完了できない場合、トンネルが増分ログの消費を開始するときに OTSTunnelExpired エラーが発生します。その結果、トンネルは増分ログを使用できません。

      指定されたタイムウィンドウ内でトンネルが完全データ消費を完了できないと推定される場合は、Tablestore テクニカルサポートに連絡してください。

    • 増分データ消費中に、トンネルが増分ログの保持期間(最大 7 日間)内に増分ログの消費を完了できない場合、トンネルは使用可能な最新のデータからデータを使用する場合があります。この場合、特定のデータは使用されない可能性があります。

  • トンネルの有効期限が切れると、Tablestore はトンネルを無効にする場合があります。トンネルが無効状態のままで 30 日以上経過すると、トンネルは削除されます。削除されたトンネルは復元できません。

前提条件

  • 以下の操作は、Resource Access Management(RAM)コンソールで実行されます。

    • RAM ユーザーが作成され、AliyunOTSFullAccess ポリシーが RAM ユーザーにアタッチされ、RAM ユーザーに Tablestore を管理する権限が付与されます。詳細については、「RAM ユーザーの作成」および「RAM ユーザーへの権限付与」をご参照ください。

      説明

      実際のビジネス環境では、最小権限の原則に基づいて、必要な権限のみを RAM ユーザーに付与することをお勧めします。これは、過剰なユーザー権限によって引き起こされるセキュリティリスクを防ぐのに役立ちます。

    • RAM ユーザーの AccessKey ペアが作成されます。詳細については、「AccessKey ペアを作成する」をご参照ください。

      警告

      Alibaba Cloud アカウントの AccessKey ペアが漏洩した場合、アカウント内のすべてのリソースが潜在的なリスクにさらされます。 RAM ユーザーの AccessKey ペアを使用して操作を実行することをお勧めします。これにより、Alibaba Cloud アカウントの AccessKey ペアの漏洩を防ぎます。

トンネルサービスを開始する

Tablestore SDK for Java を使用して、トンネルサービスを開始します。

  1. トンネルクライアントを初期化します。

    説明

    TABLESTORE_ACCESS_KEY_ID および TABLESTORE_ACCESS_KEY_SECRET 環境変数が構成されていることを確認します。 TABLESTORE_ACCESS_KEY_ID 環境変数は、Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID を指定します。 TABLESTORE_ACCESS_KEY_SECRET 環境変数は、Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey シークレットを指定します。

    // Tablestore インスタンスの名前を指定します。
    // Tablestore インスタンスのエンドポイントを指定します。例:https://instance.cn-hangzhou.ots.aliyuncs.com。
    // Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID と AccessKey シークレットを指定します。
    final String instanceName = "yourInstanceName";
    final String endPoint = "yourEndpoint";
    final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
    final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
    TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
  2. トンネルを作成します。

    トンネルを作成する前に、テスト用のデータテーブルを作成するか、既存のテーブルを準備します。 Tablestore コンソールで、または SyncClient の createTable メソッドを使用して、テーブルを作成できます。

    重要

    Stream または BaseAndStream トンネルを作成する場合は、次のルールに従ってタイムスタンプを指定します。

    • 増分データの開始タイムスタンプを指定しない場合、開始タイムスタンプはトンネルが作成された時刻です。

    • 増分データの開始タイムスタンプと終了タイムスタンプを指定する場合、開始タイムスタンプまたは終了タイムスタンプは、ミリ秒単位で [現在のシステム時刻 - Stream の有効期間 + 5 分、現在のシステム時刻] の範囲内である必要があります。

      • Stream の有効期間とは、ミリ秒単位の増分ログの有効期間を指します。 Stream の有効期間の最大値は 7 日間です。データテーブルの Stream を有効にするときに、Stream の有効期間を指定できます。 Stream の有効期間を指定した後、期間を変更することはできません。

      • 終了タイムスタンプは開始タイムスタンプよりも大きくする必要があります。

    // 3 つのタイプのトンネルを作成できます:TunnelType.BaseData、TunnelType.Stream、および TunnelType.BaseAndStream。
    // 次のコードは、BaseAndStream トンネルを作成する方法の例を示しています。別のタイプのトンネルを作成するには、CreateTunnelRequest の TunnelType を目的のタイプに設定します。
    final String tableName = "testTable";
    final String tunnelName = "testTunnel";
    CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
    CreateTunnelResponse resp = tunnelClient.createTunnel(request);
    // tunnelId を使用して TunnelWorker を初期化します。 ListTunnel または DescribeTunnel 操作を呼び出して、トンネル ID を取得します。
    String tunnelId = resp.getTunnelId();
    System.out.println("Create Tunnel, Id: " + tunnelId);
  3. カスタムデータ消費コールバックを指定して、自動データ消費を開始します。次の表に、TunnelClient の構成を示します。

    // データ消費のコールバックを指定して、process メソッドと shutdown メソッドを指定する IChannelProcessor 操作を呼び出します。
    private static class SimpleProcessor implements IChannelProcessor {
        @Override
        public void process(ProcessRecordsInput input) {
            // ProcessRecordsInput パラメーターには、取得したデータが含まれています。
            System.out.println("Default record processor, would print records count");
            System.out.println(
                // NextToken パラメーターは、トンネルクライアントがデータをページ分割するために使用されます。
                String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
            try {
                // データの消費と処理をシミュレートします。
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void shutdown() {
            System.out.println("Mock shutdown");
        }
    }
    
    // デフォルトでは、システムは TunnelWorkerConfig に基づいてデータを読み取り、処理するためのスレッドプールを開始します。単一サーバーを使用する場合、複数の TunnelWorker が開始されます。
    // 同じ TunnelWorkerConfig を使用して TunnelWorker を構成することをお勧めします。 TunnelWorkerConfig は、より高度なパラメーターを提供します。
    TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
    // TunnelWorker を構成し、自動データ処理を開始します。
    TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
    try {
        worker.connectAndWorking();
    } catch (Exception e) {
        e.printStackTrace();
        config.shutdown();
        worker.shutdown();
        tunnelClient.shutdown();
    }

TunnelWorkerConfig を構成する

TunnelWorkerConfig を使用すると、ビジネス要件に基づいてトンネルクライアントのカスタムパラメーターを構成できます。次の表に、Tablestore SDK for Java のパラメーターを示します。

構成

パラメーター

説明

ハートビートの間隔とタイムアウト期間

heartbeatTimeoutInSec

ハートビートのタイムアウト期間。デフォルト値:300。単位:秒。

ハートビートタイムアウトが発生すると、トンネルサーバーは現在の TunnelClient インスタンスが使用できないと見なします。この場合、トンネルクライアントはトンネルサーバーに再接続する必要があります。

heartbeatIntervalInSec

ハートビートの間隔。デフォルト値:30。最小値:5。単位:秒。

ハートビートを検出して、アクティブなチャネルを監視し、チャネルのステータスを更新し、データ処理タスクを自動的に初期化できます。

チェックポイント間の間隔

checkpointIntervalInMillis

データが消費されるときのチェックポイント間の間隔。間隔はトンネルサーバーに記録されます。

デフォルト値:5000。単位:ミリ秒。

説明
  • 読み取るデータが異なるサーバーに保存されている場合、プロセスを実行するときにさまざまなエラーが発生する可能性があります。たとえば、環境要因によりサーバーが再起動する場合があります。したがって、トンネルサーバーはデータが処理された後にチェックポイントを定期的に記録します。タスクは再起動後、最後のチェックポイントからデータを処理します。特定の場合、トンネルサービスはデータを 1 回または複数回順次同期する場合があります。特定のデータが再処理される場合は、ビジネス処理ロジックを確認してください。

  • エラー発生時にデータが再処理されないようにするには、追加のチェックポイントを記録します。チェックポイントが多すぎると、システムスループットが低下する可能性があることに注意してください。ビジネス要件に基づいてチェックポイントを記録することをお勧めします。

カスタムクライアントタグ

clientTag

トンネルクライアント ID を生成するために使用されるカスタムクライアントタグ。このパラメーターを構成して、TunnelWorker を区別できます。

データ処理のカスタムコールバックを指定する

channelProcessor

process メソッドと shutdown メソッドを含む、データを処理するためにユーザーが登録するコールバック。

データを読み取り、処理するためのスレッドプールの構成

readRecordsExecutor

データの読み取りに使用するスレッドプール。特別な要件がない場合は、デフォルト構成を使用します。

processRecordsExecutor

データの処理に使用するスレッドプール。特別な要件がない場合は、デフォルト構成を使用します。

説明
  • スレッドプールの構成を指定する場合は、スレッド数をトンネル内のチャネル数に設定することをお勧めします。こうすることで、CPU などの計算リソースを各チャネルにすばやく割り当てることができます。

  • Tablestore は、スループットを確保するために、プールのデフォルト構成で次の操作を実行します。

    • 少量のデータまたは少数のチャネルが存在する場合にリアルタイムスループットを確保するために、事前に 32 個のコアスレッドを割り当てます。

    • 大量のデータを処理する必要がある場合、または多数のチャネルが存在する場合に、キューの長さを短縮します。こうすることで、プールにスレッドを作成し、より多くの計算リソースを割り当てるポリシーがトリガーされます。

    • スレッドのキープアライブ時間を 60 秒に設定することをお勧めします。処理するデータ量が削減された場合は、スレッドリソースをリサイクルできます。

メモリ制御

maxChannelParallel

メモリ制御のためにデータを読み取り、処理するためのチャネルの最大同時実行レベル。

デフォルト値は -1 で、同時実行レベルが無制限であることを指定します。

説明

Tablestore SDK for Java V5.10.0 以降では、この機能がサポートされています。

最大バックオフ時間

maxRetryIntervalInMillis

トンネルの最大バックオフ時間を計算するための基準値。最大バックオフ時間は、0.75 × maxRetryIntervalInMillis から 1.25 × maxRetryIntervalInMillis の範囲の乱数です。

デフォルト値:2000。最小値:200。単位:ミリ秒。

説明
  • Tablestore SDK for Java V5.4.0 以降では、この機能がサポートされています。

  • 処理するデータ量がエクスポートあたり 900 KB または 500 個未満の場合、トンネルクライアントは最大バックオフ時間に達するまで指数バックオフを使用します。

CLOSING チャネル検出

enableClosingChannelDetect

CLOSING チャネルのリアルタイム検出を有効にするかどうかを指定します。デフォルト値:false。CLOSING チャネルのリアルタイム検出が無効になっていることを指定します。

説明
  • Tablestore SDK for Java V5.13.13 以降では、この機能がサポートされています。

  • この機能を有効にしないと、多数のチャネルが存在するがクライアントリソースが不足している場合など、特定のシナリオでチャネルが一時停止し、消費が中断される可能性があります。

  • CLOSING チャネル:あるトンネルクライアントから別のトンネルクライアントに切り替えられているチャネル。

付録:サンプルコード

import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelResponse;
import com.alicloud.openservices.tablestore.model.tunnel.TunnelType;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
public class TunnelQuickStart {
    private static class SimpleProcessor implements IChannelProcessor {
        @Override
        public void process(ProcessRecordsInput input) {
            System.out.println("Default record processor, would print records count");
            System.out.println(
                // NextToken パラメーターは、トンネルクライアントがデータをページ分割するために使用されます。
                String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
            try {
                // データの消費と処理をシミュレートします。
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void shutdown() {
            System.out.println("Mock shutdown");
        }
    }
    public static void main(String[] args) throws Exception {
        //1. トンネルクライアントを初期化します。
        // インスタンスの名前を指定します。
        final String instanceName = "yourInstanceName";
        // インスタンスのエンドポイントを指定します。
        final String endPoint = "yourEndpoint";
        // 環境変数から AccessKey ID と AccessKey シークレットを取得します。
        final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
        final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
        TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
        //2. トンネルを作成します。この手順を実行する前に、テスト用のテーブルを作成する必要があります。 Tablestore コンソールで、または SyncClient の createTable メソッドを使用して、テーブルを作成できます。
        final String tableName = "testTable";
        final String tunnelName = "testTunnel";
        CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
        CreateTunnelResponse resp = tunnelClient.createTunnel(request);
        // tunnelId パラメーターを使用して TunnelWorker を初期化します。 ListTunnel または DescribeTunnel 操作を呼び出して、トンネル ID を取得できます。
        String tunnelId = resp.getTunnelId();
        System.out.println("Create Tunnel, Id: " + tunnelId);
        //3. カスタムコールバック関数を指定して、自動データ消費を開始します。
        // TunnelWorkerConfig は、より高度なパラメーターを提供します。
        TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
        TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
        try {
            worker.connectAndWorking();
        } catch (Exception e) {
            e.printStackTrace();
            config.shutdown();
            worker.shutdown();
            tunnelClient.shutdown();
        }
    }
}