すべてのプロダクト
Search
ドキュメントセンター

Tablestore:はじめに

最終更新日:Dec 28, 2024

Tunnel Serviceを使用すると、テーブル内のデータを消費できます。このトピックでは、Java向けTablestore SDKを使用してTunnel Serviceを始める方法について説明します。

使用上の注意

  • デフォルトでは、システムはTunnelWorkerConfigに基づいてデータを読み取りおよび処理するためのスレッドプールを開始します。単一のサーバーで複数のTunnelWorkerを開始する場合は、すべてのTunnelWorkerを構成するために同じTunnelWorkerConfigを使用することをお勧めします。

  • TunnelWorkerは、初期化のためにウォームアップ期間が必要です。これは、TunnelWorkerConfigのheartbeatIntervalInSecパラメータで指定されます。このパラメータを構成するには、TunnelWorkerConfigのsetHeartbeatIntervalInSecメソッドを使用します。デフォルト値:30。単位:秒。

  • 予期しない終了または手動による終了によりTunnelWorkerクライアントがシャットダウンした場合、TunnelWorkerは次のいずれかの方法を使用してリソースを自動的にリサイクルします:スレッドプールの解放、Channelクラスに登録したshutdownメソッドの自動呼び出し、およびトンネルのシャットダウン。

  • トンネル内の増分ログの保持期間は、Streamログの保持期間と同じです。Streamログは最大7日間保持できます。したがって、トンネル内の増分ログは最大7日間保持できます。

  • 差分データまたは増分データを消費するためにトンネルを作成する場合は、次の項目に注意してください。

    • フルデータ消費中に、トンネルが増分ログの保持期間(最大7日間)内にフルデータの消費を完了できない場合、トンネルが増分ログの消費を開始するときにOTSTunnelExpiredエラーが発生します。その結果、トンネルは増分ログを消費できません。

      トンネルが指定された時間枠内にフルデータ消費を完了できないと推定される場合は、Tablestoreテクニカルサポートにご連絡ください。

    • 増分データ消費中に、トンネルが増分ログの保持期間(最大7日間)内に増分ログの消費を完了できない場合、トンネルは利用可能な最新のデータからデータを読み取る可能性があります。この場合、特定のデータが消費されない可能性があります。

  • トンネルが期限切れになると、Tablestoreはトンネルを無効にする場合があります。トンネルが無効状態のまま30日以上経過すると、トンネルは削除されます。削除されたトンネルは復元できません。

前提条件

  • 以下の操作は、Resource Access Management(RAM)コンソールで実行されます。

    • RAMユーザーが作成され、AliyunOTSFullAccessポリシーがRAMユーザーにアタッチされ、RAMユーザーにTablestoreを管理する権限が付与されます。詳細については、RAMユーザーの作成およびRAMユーザーへの権限の付与を参照してください。

      説明

      実際のビジネス環境では、最小限の権限の原則に基づいて、RAMユーザーに必要な権限のみを付与するカスタムポリシーを作成することをお勧めします。これにより、過剰なユーザー権限によって引き起こされるセキュリティリスクを防ぎます。

    • RAMユーザーのAccessKeyペアが作成されます。詳細については、AccessKeyペアの作成を参照してください。

      警告

      Alibaba CloudアカウントのAccessKeyペアが漏洩した場合、アカウント内のすべてのリソースが潜在的なリスクにさらされます。RAMユーザーのAccessKeyペアを使用して操作を実行することをお勧めします。これにより、Alibaba CloudアカウントのAccessKeyペアの漏洩を防ぎます。

Tunnel Serviceの使用

Java向けTablestore SDKを使用して、Tunnel Serviceを始めましょう。

  1. TunnelClientインスタンスを初期化します。

    説明

    TABLESTORE_ACCESS_KEY_IDおよびTABLESTORE_ACCESS_KEY_SECRET環境変数が構成されていることを確認してください。TABLESTORE_ACCESS_KEY_ID環境変数は、Alibaba CloudアカウントまたはRAMユーザーのAccessKey IDを指定します。TABLESTORE_ACCESS_KEY_SECRET環境変数は、Alibaba CloudアカウントまたはRAMユーザーのAccessKeyシークレットを指定します。

    // endPointパラメータをTablestoreインスタンスのエンドポイントに設定します。例:https://instance.cn-hangzhou.ots.aliyuncs.com。
    // accessKeyIdパラメータをAccessKey IDに設定し、accessKeySecretパラメータをTablestoreにアクセスするために使用するAccessKeyシークレットに設定します。
    // instanceNameパラメータをインスタンスの名前に設定します。
    final String endPoint = "";
    final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
    final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
    final String instanceName = "";
    TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
  2. トンネルを作成します。

    トンネルを作成する前に、テスト用のデータテーブルを作成するか、既存のテーブルを準備します。Tablestoreコンソールで、またはSyncClientのcreateTableメソッドを使用してテーブルを作成できます。

    // TunnelType.BaseData、TunnelType.Stream、およびTunnelType.BaseAndStreamの3種類のトンネルを作成できます。
    // 次のコードは、BaseAndStreamトンネルを作成する方法の例を示しています。別のタイプのトンネルを作成するには、CreateTunnelRequestのTunnelTypeを目的のタイプに設定します。
    final String tableName = "testTable";
    final String tunnelName = "testTunnel";
    CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
    CreateTunnelResponse resp = tunnelClient.createTunnel(request);
    // tunnelIdパラメータを使用してTunnelWorkerを初期化します。ListTunnelまたはDescribeTunnel操作を呼び出して、トンネルIDを取得できます。
    String tunnelId = resp.getTunnelId();
    System.out.println("Create Tunnel, Id: " + tunnelId);
  3. カスタムデータ消費コールバック関数を指定して、自動データ消費を開始します。次の表に、TunnelClientインスタンスの構成を示します。

    // データ消費のコールバック関数を指定して、processメソッドとshutdownメソッドを指定するIChannelProcessor操作を呼び出します。
    private static class SimpleProcessor implements IChannelProcessor {
        @Override
        public void process(ProcessRecordsInput input) {
            // ProcessRecordsInputパラメータには、取得したデータが含まれています。
            System.out.println("Default record processor, would print records count");
            System.out.println(
                // NextTokenパラメータは、トンネルクライアントがデータをページ分割するために使用されます。
                String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
            try {
                // データの消費と処理をシミュレートします。
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void shutdown() {
            System.out.println("Mock shutdown");
        }
    }
    
    // デフォルトでは、システムはTunnelWorkerConfigに基づいてデータを読み取りおよび処理するためのスレッドプールを開始します。単一のサーバーを使用する場合、複数のTunnelWorkerが開始されます。
    // 同じTunnelWorkerConfigを使用してTunnelWorkerを構成することをお勧めします。TunnelWorkerConfigは、より高度なパラメータを提供します。
    TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
    // TunnelWorkerを構成し、自動データ処理を開始します。
    TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
    try {
        worker.connectAndWorking();
    } catch (Exception e) {
        e.printStackTrace();
        config.shutdown();
        worker.shutdown();
        tunnelClient.shutdown();
    }

TunnelWorkerConfigの構成

TunnelWorkerConfigを使用すると、ビジネス要件に基づいてトンネルクライアントのカスタムパラメータを構成できます。次の表に、Java向けTablestore SDKのパラメータを示します。

構成

パラメータ

説明

ハートビートの間隔とタイムアウト期間

heartbeatTimeoutInSec

ハートビートのタイムアウト期間。デフォルト値: 300。単位: 秒。

ハートビートのタイムアウトが発生すると、トンネルサーバーは現在の TunnelClient インスタンスが使用不可であると見なします。この場合、トンネルクライアントはトンネルサーバーに再接続する必要があります。

heartbeatIntervalInSec

ハートビートの間隔です。 既定値: 30。最小値: 5。単位: 秒。

ハートビートを検出して、アクティブなチャネルを監視し、チャネルのステータスを更新し、データ処理タスクを自動的に初期化できます。

チェックポイントの間隔

checkpointIntervalInMillis

データ消費時のチェックポイントの間隔。間隔はトンネルサーバーに記録されます。

デフォルト値: 5000。単位: ミリ秒。

説明
  • 読み取るデータが異なるサーバーに保存されている場合、プロセスを実行するとさまざまなエラーが発生する可能性があります。たとえば、環境要因によりサーバーが再起動する場合があります。そのため、トンネルサーバーはデータ処理後に定期的にチェックポイントを記録します。タスクは、タスクの再起動後、最後のチェックポイントからデータ処理を開始します。特定のケースでは、Tunnel Serviceは1回または複数回、データを順次同期する場合があります。特定のデータが再処理される場合は、ビジネス処理ロジックを確認してください。

  • エラー発生時にデータが再処理されないようにするには、追加のチェックポイントを記録します。チェックポイントの数が多すぎると、システムスループットが低下する可能性があることに注意してください。ビジネス要件に基づいてチェックポイントを記録することをお勧めします。

カスタムクライアントタグ

clientTag

トンネルクライアントIDの生成に使用されるカスタムクライアントタグ。このパラメータを設定して、TunnelWorker を区別できます。

データ処理用のカスタムコールバックを指定します

channelProcessor

ユーザーが登録した、処理メソッドとシャットダウンメソッドを含むデータ処理用のコールバックです。

データの読み取りと処理を行うスレッドプールの構成

レコード読み取りエグゼキューター

ハートビートの間隔とタイムアウト期間

processRecordsExecutor

データ処理に使用するスレッドプール。特別な要件がない場合は、デフォルト設定を使用してください。

説明
  • スレッドプールの設定を指定する場合、スレッド数をトンネルのチャネル数に設定することをお勧めします。これにより、CPU などの計算リソースを各チャネルに迅速に割り当てることができます。

  • Tablestore は、スループットを確保するために、プールのデフォルト設定で次の操作を実行します。

    • 少量のデータまたは少数のチャネルが存在する場合にリアルタイムのスループットを確保するために、32 個のコアスレッドを事前に割り当てます。

    • 大量のデータを処理する必要がある場合、または多数のチャネルが存在する場合に、キューの長さを短縮します。これにより、プールにスレッドを作成し、より多くの計算リソースを割り当てるポリシーがトリガーされます。

    • スレッドのキープアライブ時間を 60 秒に設定することをお勧めします。処理するデータ量が減少した場合、スレッドリソースをリサイクルできます。

メモリ制御

heartbeatIntervalInSec

メモリ制御のためにデータを読み取って処理するチャネルの最大同時実行レベル。

ハートビートの間隔。デフォルト値:30。最小値:5。単位:秒。

説明

Java V5.10.0以降のTablestore SDKでは、この機能がサポートされています。

最大バックオフ時間

最大再試行間隔(ミリ秒)

トンネルの最大バックオフ時間を計算するための基準値。最大バックオフ時間は、0.75 × maxRetryIntervalInMillis から 1.25 × maxRetryIntervalInMillis までの範囲の乱数です。

デフォルト値: 2000。最小値: 200。単位: ミリ秒。

説明
  • Java V5.4.0以降向けのTablestore SDKでは、この機能がサポートされています。

  • 処理するデータ量がエクスポートあたり900 KB未満または500個未満の場合、トンネルクライアントは最大バックオフ時間に達するまで指数バックオフを使用します。

CLOSING チャネルの検出

enableClosingChannelDetect

CLOSING チャネルのリアルタイム検出を有効にするかどうかを指定します。デフォルト値: false。CLOSING チャネルのリアルタイム検出が無効になっていることを指定します。

説明
  • Tablestore SDK for Java V5.13.13 以降はこの機能をサポートしています。

  • この機能を有効にしない場合、多数のチャネルが存在する一方でクライアントリソースが不足している場合など、特定のシナリオでチャネルが一時停止され、消費が中断される可能性があります。

  • CLOSING チャネル: あるトンネルクライアントから別のトンネルクライアントに切り替えられているチャネル。

付録: サンプルコード

import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelResponse;
import com.alicloud.openservices.tablestore.model.tunnel.TunnelType;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
public class TunnelQuickStart {
    private static class SimpleProcessor implements IChannelProcessor {
        @Override
        public void process(ProcessRecordsInput input) {
            System.out.println("Default record processor, would print records count");
            System.out.println(
                // NextToken パラメーターは、トンネルクライアントがデータのページ分割に使用するものです。
                String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
            try {
                // データの消費と処理をシミュレートします。
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void shutdown() {
            System.out.println("Mock shutdown");
        }
    }
    public static void main(String[] args) throws Exception {
        //1. トンネルクライアントを初期化します。
        final String endPoint = "";
        final String accessKeyId = System.getenv("OTS_AK_ENV");
        final String accessKeySecret = System.getenv("OTS_SK_ENV");
        final String instanceName = "";
        TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
        //2. トンネルを作成します。この手順を実行する前に、テスト用のテーブルを作成する必要があります。Tablestore コンソールまたは SyncClient の createTable メソッドを使用してテーブルを作成できます。
        final String tableName = "testTable";
        final String tunnelName = "testTunnel";
        CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
        CreateTunnelResponse resp = tunnelClient.createTunnel(request);
        // tunnelId パラメーターを使用して TunnelWorker を初期化します。ListTunnel または DescribeTunnel 操作を呼び出して、トンネル ID を取得できます。
        String tunnelId = resp.getTunnelId();
        System.out.println("Create Tunnel, Id: " + tunnelId);
        //3. 自動データ消費を開始するためのカスタムコールバックを定義します。
        // TunnelWorkerConfig は、より高度なパラメーターを提供します。
        TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
        TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
        try {
            worker.connectAndWorking();
        } catch (Exception e) {
            e.printStackTrace();
            config.shutdown();
            worker.shutdown();
            tunnelClient.shutdown();
        }
    }
}