すべてのプロダクト
Search
ドキュメントセンター

Tablestore:Tablestore SDK を使用したトンネルサービスの使用

最終更新日:Dec 28, 2024

このトピックでは、Tablestore SDK を使用してトンネルサービスを開始する方法について説明します。トンネルサービスを使用する前に、使用上の注意と API 操作をよく理解しておいてください。

使用上の注意

  • デフォルトでは、システムは TunnelWorkerConfig に基づいてデータを読み取り、処理するためのスレッドプールを起動します。単一のサーバーで複数の TunnelWorker を起動する場合は、すべての TunnelWorker を構成するために同じ TunnelWorkerConfig を使用することをお勧めします。

  • TunnelWorker は初期化のためにウォームアップ期間を必要とします。これは、TunnelWorkerConfig の heartbeatIntervalInSec パラメータで指定されます。このパラメータを構成するには、TunnelWorkerConfig の setHeartbeatIntervalInSec メソッドを使用します。デフォルト値:30。単位:秒。

  • 予期しない終了または手動による終了により TunnelWorker クライアントがシャットダウンした場合、TunnelWorker は次のいずれかの方法を使用してリソースを自動的にリサイクルします。スレッドプールを解放し、Channel クラスに登録した shutdown メソッドを自動的に呼び出し、トンネルをシャットダウンします。

  • トンネル内の増分ログの保持期間は、Stream ログの保持期間と同じです。Stream ログは最大 7 日間保持できます。したがって、トンネル内の増分ログは最大 7 日間保持できます。

  • 差分データまたは増分データを消費するためにトンネルを作成する場合は、次の点に注意してください。

    • フルデータ消費中に、トンネルが増分ログの保持期間(最大 7 日間)内にフルデータの消費を完了できない場合、トンネルが増分ログの消費を開始するときに OTSTunnelExpired エラーが発生します。その結果、トンネルは増分ログを消費できません。

      指定された時間枠内にトンネルがフルデータ消費を完了できないと推定される場合は、Tablestore テクニカルサポートにご連絡ください。

    • 増分データ消費中に、トンネルが増分ログの保持期間(最大 7 日間)内に増分ログの消費を完了できない場合、トンネルは利用可能な最新のデータからデータを読み取る可能性があります。この場合、特定のデータが消費されない可能性があります。

  • トンネルの有効期限が切れると、Tablestore はトンネルを無効にする場合があります。トンネルが無効状態のまま 30 日以上経過すると、トンネルは削除されます。削除されたトンネルは復元できません。

API 操作

操作

説明

CreateTunnel

トンネルを作成します。

ListTunnel

データテーブルに作成されたトンネルに関する情報をクエリします。

DescribeTunnel

トンネル内のチャネルに関する情報をクエリします。

DeleteTunnel

トンネルを削除します。

Tablestore SDK の使用

トンネルサービスを実装するには、次のプログラミング言語用の Tablestore SDK を使用できます。

前提条件

  • 以下の操作は、Resource Access Management (RAM) コンソールで実行されます。

    • RAM ユーザーが作成され、AliyunOTSFullAccess ポリシーが RAM ユーザーにアタッチされ、RAM ユーザーに Tablestore を管理する権限が付与されます。詳細については、RAM ユーザーの作成RAM ユーザーへの権限の付与を参照してください。

      説明

      実際のビジネス環境では、最小限の権限の原則に基づいて、必要な権限のみを RAM ユーザーに付与することをお勧めします。これは、過剰なユーザー権限によって引き起こされるセキュリティリスクを防ぐのに役立ちます。

    • RAM ユーザーの AccessKey ペアが作成されます。詳細については、AccessKey ペアの作成を参照してください。

      警告

      Alibaba Cloud アカウントの AccessKey ペアが漏洩した場合、アカウント内のすべてのリソースが潜在的なリスクにさらされます。操作を実行するには、RAM ユーザーの AccessKey ペアを使用することをお勧めします。これは、Alibaba Cloud アカウントの AccessKey ペアの漏洩を防ぎます。

トンネルサービスの使用

この例では、Tablestore SDK for Java を使用してトンネルサービスを開始します。

  1. TunnelClient インスタンスを初期化します。

    説明

    TABLESTORE_ACCESS_KEY_ID および TABLESTORE_ACCESS_KEY_SECRET 環境変数が構成されていることを確認してください。TABLESTORE_ACCESS_KEY_ID 環境変数は、Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID を指定します。TABLESTORE_ACCESS_KEY_SECRET 環境変数は、Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey シークレットを指定します。

    // endPoint パラメータを Tablestore インスタンスのエンドポイントに設定します。例:https://instance.cn-hangzhou.ots.aliyuncs.com。
    // accessKeyId パラメータを AccessKey ID に、accessKeySecret パラメータを Tablestore にアクセスするために使用する AccessKey シークレットに設定します。
    // instanceName パラメータをインスタンスの名前に設定します。
    final String endPoint = "";
    final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
    final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
    final String instanceName = "";
    TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
  2. トンネルを作成します。

    トンネルを作成する前に、テスト用のデータテーブルを作成するか、既存のテーブルを準備します。Tablestore コンソールで、または SyncClient の createTable メソッドを使用してテーブルを作成できます。

    重要

    増分トンネルまたは差分トンネルを作成する場合は、タイムスタンプを指定するために次のルールに従う必要があります。

    • 増分データの開始タイムスタンプを指定しない場合、トンネルが作成された時刻が開始タイムスタンプとして使用されます。

    • 増分データの開始タイムスタンプと終了タイムスタンプを指定する場合、有効な値は [現在のシステム時刻 - Stream の有効期間 + 5 分、現在のシステム時刻] の範囲内である必要があります。単位:ミリ秒。

      • Stream の有効期間とは、増分ログの有効期間(ミリ秒単位)を指します。Stream の有効期間の最大値は 7 日間です。データテーブルの Stream を有効にするときに、Stream の有効期間を指定できます。Stream の有効期間を指定した後、期間を変更することはできません。

      • 終了タイムスタンプは開始タイムスタンプよりも後である必要があります。

    // 次のタイプのトンネルがサポートされています:TunnelType.BaseData、TunnelType.Stream、および TunnelType.BaseAndStream。
    // 次のサンプルコードは、BaseAndStream トンネルを作成する方法の例を示しています。別のタイプのトンネルを作成するには、ビジネス要件に基づいて CreateTunnelRequest の TunnelType パラメータを構成します。
    final String tableName = "testTable";
    final String tunnelName = "testTunnel";
    CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
    CreateTunnelResponse resp = tunnelClient.createTunnel(request);
    // tunnelId パラメータを使用して TunnelWorker を初期化します。ListTunnel または DescribeTunnel 操作を呼び出して、トンネル ID を取得できます。
    String tunnelId = resp.getTunnelId();
    System.out.println("Create Tunnel, Id: " + tunnelId);
  3. カスタムデータ消費コールバックを指定して、自動データ消費を開始します。

    // データ消費のコールバックを指定して、process メソッドと shutdown メソッドを指定する IChannelProcessor 操作を呼び出します。
    private static class SimpleProcessor implements IChannelProcessor {
        @Override
        public void process(ProcessRecordsInput input) {
            // ProcessRecordsInput パラメータには、取得したデータが含まれています。
            System.out.println("Default record processor, would print records count");
            System.out.println(
                // NextToken パラメータは、トンネルクライアントがデータをページ分割するために使用されます。
                String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
            try {
                // データの消費と処理をシミュレートします。
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void shutdown() {
            System.out.println("Mock shutdown");
        }
    }
    
    // デフォルトでは、システムは TunnelWorkerConfig に基づいてデータを読み取り、処理するためのスレッドプールを起動します。
    // 単一のサーバーで複数の TunnelWorker を起動する場合は、すべての TunnelWorker を構成するために同じ TunnelWorkerConfig を使用することをお勧めします。TunnelWorkerConfig は、より高度なパラメータを提供します。
    TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
    // TunnelWorker を構成し、自動データ処理を開始します。
    TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
    try {
        worker.connectAndWorking();
    } catch (Exception e) {
        e.printStackTrace();
        config.shutdown();
        worker.shutdown();
        tunnelClient.shutdown();
    }

TunnelWorkerConfig の構成

TunnelWorkerConfig を使用すると、ビジネス要件に基づいてトンネルクライアントのカスタムパラメータを構成できます。次の表に、Tablestore SDK for Java のパラメータを示します。

構成

パラメータ

説明

ハートビートの間隔とタイムアウト期間

heartbeatTimeoutInSec

ハートビートのタイムアウト期間。デフォルト値:300。単位:秒。

ハートビートタイムアウトが発生した場合、トンネルサーバーは現在の TunnelClient インスタンスが使用不可であると見なします。この場合、トンネルクライアントはトンネルサーバーに再接続する必要があります。

heartbeatIntervalInSec

ハートビートの間隔。デフォルト値:30。最小値:5。単位:秒。

ハートビートを検出して、アクティブなチャネルを監視し、チャネルのステータスを更新し、データ処理タスクを自動的に初期化できます。

チェックポイント間の間隔

checkpointIntervalInMillis

データが消費されるときのチェックポイント間の間隔。間隔はトンネルサーバーに記録されます。

デフォルト値:5000。単位:ミリ秒。

説明
  • 読み取るデータが異なるサーバーに保存されている場合、プロセスを実行するときにさまざまなエラーが発生する可能性があります。たとえば、環境要因によりサーバーが再起動する場合があります。したがって、トンネルサーバーはデータが処理された後に定期的にチェックポイントを記録します。タスクは再起動後に最後のチェックポイントからデータを処理します。特定の場合、トンネルサービスはデータを 1 回または複数回順次同期する場合があります。特定のデータが再処理される場合は、ビジネス処理ロジックを確認してください。

  • エラー発生時にデータが再処理されないようにするには、追加のチェックポイントを記録します。チェックポイントが多すぎると、システムスループットが低下する可能性があることに注意してください。ビジネス要件に基づいてチェックポイントを記録することをお勧めします。

カスタムクライアントタグ

clientTag

トンネルクライアント ID を生成するために使用されるカスタムクライアントタグ。このパラメータを構成して、TunnelWorker を区別できます。

データ処理のカスタムコールバックを指定する

channelProcessor

ユーザーがデータを処理するために登録するコールバック。process メソッドと shutdown メソッドが含まれます。

データを読み取り、処理するためのスレッドプールの構成

readRecordsExecutor

データを読み取るために使用するスレッドプール。特別な要件がない場合は、デフォルト構成を使用します。

processRecordsExecutor

データを処理するために使用するスレッドプール。特別な要件がない場合は、デフォルト構成を使用します。

説明
  • スレッドプールの構成を指定する場合は、スレッド数をトンネル内のチャネル数に設定することをお勧めします。こうすることで、CPU などの計算リソースを各チャネルに迅速に割り当てることができます。

  • Tablestore は、スループットを確保するために、プールのデフォルト構成で次の操作を実行します。

    • 少量のデータまたは少数のチャネルが存在する場合にリアルタイムスループットを確保するために、事前に 32 個のコアスレッドを割り当てます。

    • 大量のデータを処理する必要がある場合、または多数のチャネルが存在する場合にキューの長さを短縮します。こうすることで、プールにスレッドを作成し、より多くの計算リソースを割り当てるポリシーがトリガーされます。

    • スレッドのキープアライブ時間を 60 秒に設定することをお勧めします。処理するデータ量が少なくなると、スレッドリソースをリサイクルできます。

メモリ制御

maxChannelParallel

メモリ制御のためにデータを読み取り、処理するためのチャネルの最大同時実行レベル。

デフォルト値は -1 で、同時実行レベルが無制限であることを指定します。

説明

Tablestore SDK for Java V5.10.0 以降でこの機能がサポートされています。

最大バックオフ時間

maxRetryIntervalInMillis

トンネルの最大バックオフ時間を計算するための基準値。最大バックオフ時間は、0.75 × maxRetryIntervalInMillis から 1.25 × maxRetryIntervalInMillis の範囲の乱数です。

デフォルト値:2000。最小値:200。単位:ミリ秒。

説明
  • Tablestore SDK for Java V5.4.0 以降でこの機能がサポートされています。

  • 処理するデータ量がエクスポートあたり 900 KB または 500 個未満の場合、トンネルクライアントは最大バックオフ時間に達するまで指数バックオフを使用します。

CLOSING チャネル検出

enableClosingChannelDetect

CLOSING チャネルのリアルタイム検出を有効にするかどうかを指定します。デフォルト値:false。CLOSING チャネルのリアルタイム検出が無効になっていることを指定します。

説明
  • Tablestore SDK for Java V5.13.13 以降でこの機能がサポートされています。

  • この機能を有効にしないと、多数のチャネルが存在するがクライアントリソースが不足している場合など、特定のシナリオでチャネルが中断され、消費が中断される可能性があります。

  • CLOSING チャネル:あるトンネルクライアントから別のトンネルクライアントに切り替えられているチャネル。

付録:サンプルコード

import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelResponse;
import com.alicloud.openservices.tablestore.model.tunnel.TunnelType;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
public class TunnelQuickStart {
    private static class SimpleProcessor implements IChannelProcessor {
        @Override
        public void process(ProcessRecordsInput input) {
            // レコードプロセッサのデフォルト実装。レコード数を表示します。
            System.out.println("Default record processor, would print records count");
            System.out.println(
                // NextToken パラメータは、トンネルクライアントがデータをページ分割するために使用されます。
                String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
            try {
                // データの消費と処理をシミュレートします。
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void shutdown() {
            System.out.println("Mock shutdown");
        }
    }
    public static void main(String[] args) throws Exception {
        //1. トンネルクライアントを初期化します。
        final String endPoint = "";
        final String accessKeyId = System.getenv("OTS_AK_ENV");
        final String accessKeySecret = System.getenv("OTS_SK_ENV");
        final String instanceName = "";
        TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
        //2. トンネルを作成します。この手順を実行する前に、テスト用のテーブルを作成する必要があります。Tablestore コンソールで、または SyncClient の createTable メソッドを使用してテーブルを作成できます。
        final String tableName = "testTable";
        final String tunnelName = "testTunnel";
        CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
        CreateTunnelResponse resp = tunnelClient.createTunnel(request);
        // tunnelId パラメータを使用して TunnelWorker を初期化します。ListTunnel または DescribeTunnel 操作を呼び出して、トンネル ID を取得できます。
        String tunnelId = resp.getTunnelId();
        System.out.println("Create Tunnel, Id: " + tunnelId);
        //3. カスタムコールバックを定義して、自動データ消費を開始します。
        // TunnelWorkerConfig は、より高度なパラメータを提供します。
        TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
        TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
        try {
            worker.connectAndWorking();
        } catch (Exception e) {
            e.printStackTrace();
            config.shutdown();
            worker.shutdown();
            tunnelClient.shutdown();
        }
    }
}