トンネルサービスを使用すると、テーブル内のデータを使用できます。このトピックでは、Tablestore SDK for Java を使用してトンネルサービスを開始する方法について説明します。
使用上の注意
デフォルトでは、システムは TunnelWorkerConfig に基づいてデータを読み取り、処理するためのスレッドプールを開始します。単一サーバーで複数の TunnelWorker を開始する場合は、すべての TunnelWorker を構成するために同じ TunnelWorkerConfig を使用することをお勧めします。
TunnelWorker は初期化のためにウォームアップ期間が必要です。これは、TunnelWorkerConfig の heartbeatIntervalInSec パラメーターで指定されます。このパラメーターを構成するには、TunnelWorkerConfig の setHeartbeatIntervalInSec メソッドを使用します。デフォルト値:30。単位:秒。
予期しない終了または手動終了により TunnelWorker クライアントがシャットダウンすると、TunnelWorker は次のいずれかの方法を使用してリソースを自動的にリサイクルします。スレッドプールを解放し、Channel クラスに登録した shutdown メソッドを自動的に呼び出し、トンネルをシャットダウンします。
トンネル内の増分ログの保持期間は、Stream ログの保持期間と同じです。 Stream ログは最大 7 日間保持できます。したがって、トンネル内の増分ログは最大 7 日間保持できます。
差分データまたは増分データを使用するためにトンネルを作成する場合は、次の点に注意してください。
完全データ消費中に、トンネルが増分ログの保持期間(最大 7 日間)内に完全データの消費を完了できない場合、トンネルが増分ログの消費を開始するときに
OTSTunnelExpiredエラーが発生します。その結果、トンネルは増分ログを使用できません。指定されたタイムウィンドウ内でトンネルが完全データ消費を完了できないと推定される場合は、Tablestore テクニカルサポートに連絡してください。
増分データ消費中に、トンネルが増分ログの保持期間(最大 7 日間)内に増分ログの消費を完了できない場合、トンネルは使用可能な最新のデータからデータを使用する場合があります。この場合、特定のデータは使用されない可能性があります。
トンネルの有効期限が切れると、Tablestore はトンネルを無効にする場合があります。トンネルが無効状態のままで 30 日以上経過すると、トンネルは削除されます。削除されたトンネルは復元できません。
前提条件
以下の操作は、Resource Access Management(RAM)コンソールで実行されます。
RAM ユーザーが作成され、
AliyunOTSFullAccessポリシーが RAM ユーザーにアタッチされ、RAM ユーザーに Tablestore を管理する権限が付与されます。詳細については、「RAM ユーザーの作成」および「RAM ユーザーへの権限付与」をご参照ください。説明実際のビジネス環境では、最小権限の原則に基づいて、必要な権限のみを RAM ユーザーに付与することをお勧めします。これは、過剰なユーザー権限によって引き起こされるセキュリティリスクを防ぐのに役立ちます。
RAM ユーザーの AccessKey ペアが作成されます。詳細については、「AccessKey ペアを作成する」をご参照ください。
警告Alibaba Cloud アカウントの AccessKey ペアが漏洩した場合、アカウント内のすべてのリソースが潜在的なリスクにさらされます。 RAM ユーザーの AccessKey ペアを使用して操作を実行することをお勧めします。これにより、Alibaba Cloud アカウントの AccessKey ペアの漏洩を防ぎます。
以下の操作は、Tablestore コンソールで実行されます。
データテーブルが作成されます。詳細については、「Tablestore コンソールを使用する」、「Tablestore CLI を使用する」、および「Tablestore SDK を使用する」をご参照ください。
データテーブルが存在するインスタンスのエンドポイントが取得されます。詳細については、「Tablestore クライアントを初期化する」をご参照ください。
アクセス認証情報が構成されます。詳細については、「アクセス認証情報を構成する」をご参照ください。
トンネルサービスを開始する
Tablestore SDK for Java を使用して、トンネルサービスを開始します。
トンネルクライアントを初期化します。
説明TABLESTORE_ACCESS_KEY_IDおよびTABLESTORE_ACCESS_KEY_SECRET環境変数が構成されていることを確認します。 TABLESTORE_ACCESS_KEY_ID 環境変数は、Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID を指定します。 TABLESTORE_ACCESS_KEY_SECRET 環境変数は、Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey シークレットを指定します。// Tablestore インスタンスの名前を指定します。 // Tablestore インスタンスのエンドポイントを指定します。例:https://instance.cn-hangzhou.ots.aliyuncs.com。 // Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID と AccessKey シークレットを指定します。 final String instanceName = "yourInstanceName"; final String endPoint = "yourEndpoint"; final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID"); final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET"); TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);トンネルを作成します。
トンネルを作成する前に、テスト用のデータテーブルを作成するか、既存のテーブルを準備します。 Tablestore コンソールで、または SyncClient の createTable メソッドを使用して、テーブルを作成できます。
重要Stream または BaseAndStream トンネルを作成する場合は、次のルールに従ってタイムスタンプを指定します。
増分データの開始タイムスタンプを指定しない場合、開始タイムスタンプはトンネルが作成された時刻です。
増分データの開始タイムスタンプと終了タイムスタンプを指定する場合、開始タイムスタンプまたは終了タイムスタンプは、ミリ秒単位で
[現在のシステム時刻 - Stream の有効期間 + 5 分、現在のシステム時刻]の範囲内である必要があります。Stream の有効期間とは、ミリ秒単位の増分ログの有効期間を指します。 Stream の有効期間の最大値は 7 日間です。データテーブルの Stream を有効にするときに、Stream の有効期間を指定できます。 Stream の有効期間を指定した後、期間を変更することはできません。
終了タイムスタンプは開始タイムスタンプよりも大きくする必要があります。
// 3 つのタイプのトンネルを作成できます:TunnelType.BaseData、TunnelType.Stream、および TunnelType.BaseAndStream。 // 次のコードは、BaseAndStream トンネルを作成する方法の例を示しています。別のタイプのトンネルを作成するには、CreateTunnelRequest の TunnelType を目的のタイプに設定します。 final String tableName = "testTable"; final String tunnelName = "testTunnel"; CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream); CreateTunnelResponse resp = tunnelClient.createTunnel(request); // tunnelId を使用して TunnelWorker を初期化します。 ListTunnel または DescribeTunnel 操作を呼び出して、トンネル ID を取得します。 String tunnelId = resp.getTunnelId(); System.out.println("Create Tunnel, Id: " + tunnelId);カスタムデータ消費コールバックを指定して、自動データ消費を開始します。次の表に、TunnelClient の構成を示します。
// データ消費のコールバックを指定して、process メソッドと shutdown メソッドを指定する IChannelProcessor 操作を呼び出します。 private static class SimpleProcessor implements IChannelProcessor { @Override public void process(ProcessRecordsInput input) { // ProcessRecordsInput パラメーターには、取得したデータが含まれています。 System.out.println("Default record processor, would print records count"); System.out.println( // NextToken パラメーターは、トンネルクライアントがデータをページ分割するために使用されます。 String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken())); try { // データの消費と処理をシミュレートします。 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void shutdown() { System.out.println("Mock shutdown"); } } // デフォルトでは、システムは TunnelWorkerConfig に基づいてデータを読み取り、処理するためのスレッドプールを開始します。単一サーバーを使用する場合、複数の TunnelWorker が開始されます。 // 同じ TunnelWorkerConfig を使用して TunnelWorker を構成することをお勧めします。 TunnelWorkerConfig は、より高度なパラメーターを提供します。 TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor()); // TunnelWorker を構成し、自動データ処理を開始します。 TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config); try { worker.connectAndWorking(); } catch (Exception e) { e.printStackTrace(); config.shutdown(); worker.shutdown(); tunnelClient.shutdown(); }
TunnelWorkerConfig を構成する
TunnelWorkerConfig を使用すると、ビジネス要件に基づいてトンネルクライアントのカスタムパラメーターを構成できます。次の表に、Tablestore SDK for Java のパラメーターを示します。
構成 | パラメーター | 説明 |
ハートビートの間隔とタイムアウト期間 | heartbeatTimeoutInSec | ハートビートのタイムアウト期間。デフォルト値:300。単位:秒。 ハートビートタイムアウトが発生すると、トンネルサーバーは現在の TunnelClient インスタンスが使用できないと見なします。この場合、トンネルクライアントはトンネルサーバーに再接続する必要があります。 |
heartbeatIntervalInSec | ハートビートの間隔。デフォルト値:30。最小値:5。単位:秒。 ハートビートを検出して、アクティブなチャネルを監視し、チャネルのステータスを更新し、データ処理タスクを自動的に初期化できます。 | |
チェックポイント間の間隔 | checkpointIntervalInMillis | データが消費されるときのチェックポイント間の間隔。間隔はトンネルサーバーに記録されます。 デフォルト値:5000。単位:ミリ秒。 説明
|
カスタムクライアントタグ | clientTag | トンネルクライアント ID を生成するために使用されるカスタムクライアントタグ。このパラメーターを構成して、TunnelWorker を区別できます。 |
データ処理のカスタムコールバックを指定する | channelProcessor | process メソッドと shutdown メソッドを含む、データを処理するためにユーザーが登録するコールバック。 |
データを読み取り、処理するためのスレッドプールの構成 | readRecordsExecutor | データの読み取りに使用するスレッドプール。特別な要件がない場合は、デフォルト構成を使用します。 |
processRecordsExecutor | データの処理に使用するスレッドプール。特別な要件がない場合は、デフォルト構成を使用します。 説明
| |
メモリ制御 | maxChannelParallel | メモリ制御のためにデータを読み取り、処理するためのチャネルの最大同時実行レベル。 デフォルト値は -1 で、同時実行レベルが無制限であることを指定します。 説明 Tablestore SDK for Java V5.10.0 以降では、この機能がサポートされています。 |
最大バックオフ時間 | maxRetryIntervalInMillis | トンネルの最大バックオフ時間を計算するための基準値。最大バックオフ時間は、0.75 × maxRetryIntervalInMillis から 1.25 × maxRetryIntervalInMillis の範囲の乱数です。 デフォルト値:2000。最小値:200。単位:ミリ秒。 説明
|
CLOSING チャネル検出 | enableClosingChannelDetect | CLOSING チャネルのリアルタイム検出を有効にするかどうかを指定します。デフォルト値:false。CLOSING チャネルのリアルタイム検出が無効になっていることを指定します。 説明
|
付録:サンプルコード
import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelResponse;
import com.alicloud.openservices.tablestore.model.tunnel.TunnelType;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
public class TunnelQuickStart {
private static class SimpleProcessor implements IChannelProcessor {
@Override
public void process(ProcessRecordsInput input) {
System.out.println("Default record processor, would print records count");
System.out.println(
// NextToken パラメーターは、トンネルクライアントがデータをページ分割するために使用されます。
String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
try {
// データの消費と処理をシミュレートします。
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void shutdown() {
System.out.println("Mock shutdown");
}
}
public static void main(String[] args) throws Exception {
//1. トンネルクライアントを初期化します。
// インスタンスの名前を指定します。
final String instanceName = "yourInstanceName";
// インスタンスのエンドポイントを指定します。
final String endPoint = "yourEndpoint";
// 環境変数から AccessKey ID と AccessKey シークレットを取得します。
final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
//2. トンネルを作成します。この手順を実行する前に、テスト用のテーブルを作成する必要があります。 Tablestore コンソールで、または SyncClient の createTable メソッドを使用して、テーブルを作成できます。
final String tableName = "testTable";
final String tunnelName = "testTunnel";
CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
CreateTunnelResponse resp = tunnelClient.createTunnel(request);
// tunnelId パラメーターを使用して TunnelWorker を初期化します。 ListTunnel または DescribeTunnel 操作を呼び出して、トンネル ID を取得できます。
String tunnelId = resp.getTunnelId();
System.out.println("Create Tunnel, Id: " + tunnelId);
//3. カスタムコールバック関数を指定して、自動データ消費を開始します。
// TunnelWorkerConfig は、より高度なパラメーターを提供します。
TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
try {
worker.connectAndWorking();
} catch (Exception e) {
e.printStackTrace();
config.shutdown();
worker.shutdown();
tunnelClient.shutdown();
}
}
}