このトピックでは、Tablestore SDK を使用してトンネルサービスを開始する方法について説明します。トンネルサービスを使用する前に、使用上の注意と API 操作をよく理解しておいてください。
使用上の注意
デフォルトでは、システムは TunnelWorkerConfig に基づいてデータを読み取り、処理するためのスレッドプールを起動します。単一のサーバーで複数の TunnelWorker を起動する場合は、すべての TunnelWorker を構成するために同じ TunnelWorkerConfig を使用することをお勧めします。
TunnelWorker は初期化のためにウォームアップ期間を必要とします。これは、TunnelWorkerConfig の heartbeatIntervalInSec パラメータで指定されます。このパラメータを構成するには、TunnelWorkerConfig の setHeartbeatIntervalInSec メソッドを使用します。デフォルト値:30。単位:秒。
予期しない終了または手動による終了により TunnelWorker クライアントがシャットダウンした場合、TunnelWorker は次のいずれかの方法を使用してリソースを自動的にリサイクルします。スレッドプールを解放し、Channel クラスに登録した shutdown メソッドを自動的に呼び出し、トンネルをシャットダウンします。
トンネル内の増分ログの保持期間は、Stream ログの保持期間と同じです。Stream ログは最大 7 日間保持できます。したがって、トンネル内の増分ログは最大 7 日間保持できます。
差分データまたは増分データを消費するためにトンネルを作成する場合は、次の点に注意してください。
フルデータ消費中に、トンネルが増分ログの保持期間(最大 7 日間)内にフルデータの消費を完了できない場合、トンネルが増分ログの消費を開始するときに
OTSTunnelExpired
エラーが発生します。その結果、トンネルは増分ログを消費できません。指定された時間枠内にトンネルがフルデータ消費を完了できないと推定される場合は、Tablestore テクニカルサポートにご連絡ください。
増分データ消費中に、トンネルが増分ログの保持期間(最大 7 日間)内に増分ログの消費を完了できない場合、トンネルは利用可能な最新のデータからデータを読み取る可能性があります。この場合、特定のデータが消費されない可能性があります。
トンネルの有効期限が切れると、Tablestore はトンネルを無効にする場合があります。トンネルが無効状態のまま 30 日以上経過すると、トンネルは削除されます。削除されたトンネルは復元できません。
API 操作
操作 | 説明 |
CreateTunnel | トンネルを作成します。 |
ListTunnel | データテーブルに作成されたトンネルに関する情報をクエリします。 |
DescribeTunnel | トンネル内のチャネルに関する情報をクエリします。 |
DeleteTunnel | トンネルを削除します。 |
Tablestore SDK の使用
トンネルサービスを実装するには、次のプログラミング言語用の Tablestore SDK を使用できます。
前提条件
以下の操作は、Resource Access Management (RAM) コンソールで実行されます。
RAM ユーザーが作成され、
AliyunOTSFullAccess
ポリシーが RAM ユーザーにアタッチされ、RAM ユーザーに Tablestore を管理する権限が付与されます。詳細については、RAM ユーザーの作成とRAM ユーザーへの権限の付与を参照してください。説明実際のビジネス環境では、最小限の権限の原則に基づいて、必要な権限のみを RAM ユーザーに付与することをお勧めします。これは、過剰なユーザー権限によって引き起こされるセキュリティリスクを防ぐのに役立ちます。
RAM ユーザーの AccessKey ペアが作成されます。詳細については、AccessKey ペアの作成を参照してください。
警告Alibaba Cloud アカウントの AccessKey ペアが漏洩した場合、アカウント内のすべてのリソースが潜在的なリスクにさらされます。操作を実行するには、RAM ユーザーの AccessKey ペアを使用することをお勧めします。これは、Alibaba Cloud アカウントの AccessKey ペアの漏洩を防ぎます。
以下の操作は、Tablestore コンソールで実行されます。
データテーブルが作成されます。詳細については、データテーブルの操作、データテーブルの操作、およびTablestore SDK を使用してデータテーブルを作成するを参照してください。
Tablestore インスタンスのエンドポイントが取得されます。詳細については、Tablestore インスタンスのエンドポイントを取得するを参照してください。
アクセス認証情報が構成されます。詳細については、アクセス認証情報を構成するを参照してください。
トンネルサービスの使用
この例では、Tablestore SDK for Java を使用してトンネルサービスを開始します。
TunnelClient インスタンスを初期化します。
説明TABLESTORE_ACCESS_KEY_ID
およびTABLESTORE_ACCESS_KEY_SECRET
環境変数が構成されていることを確認してください。TABLESTORE_ACCESS_KEY_ID 環境変数は、Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID を指定します。TABLESTORE_ACCESS_KEY_SECRET 環境変数は、Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey シークレットを指定します。// endPoint パラメータを Tablestore インスタンスのエンドポイントに設定します。例:https://instance.cn-hangzhou.ots.aliyuncs.com。 // accessKeyId パラメータを AccessKey ID に、accessKeySecret パラメータを Tablestore にアクセスするために使用する AccessKey シークレットに設定します。 // instanceName パラメータをインスタンスの名前に設定します。 final String endPoint = ""; final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID"); final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET"); final String instanceName = ""; TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
トンネルを作成します。
トンネルを作成する前に、テスト用のデータテーブルを作成するか、既存のテーブルを準備します。Tablestore コンソールで、または SyncClient の createTable メソッドを使用してテーブルを作成できます。
重要増分トンネルまたは差分トンネルを作成する場合は、タイムスタンプを指定するために次のルールに従う必要があります。
増分データの開始タイムスタンプを指定しない場合、トンネルが作成された時刻が開始タイムスタンプとして使用されます。
増分データの開始タイムスタンプと終了タイムスタンプを指定する場合、有効な値は
[現在のシステム時刻 - Stream の有効期間 + 5 分、現在のシステム時刻]
の範囲内である必要があります。単位:ミリ秒。Stream の有効期間とは、増分ログの有効期間(ミリ秒単位)を指します。Stream の有効期間の最大値は 7 日間です。データテーブルの Stream を有効にするときに、Stream の有効期間を指定できます。Stream の有効期間を指定した後、期間を変更することはできません。
終了タイムスタンプは開始タイムスタンプよりも後である必要があります。
// 次のタイプのトンネルがサポートされています:TunnelType.BaseData、TunnelType.Stream、および TunnelType.BaseAndStream。 // 次のサンプルコードは、BaseAndStream トンネルを作成する方法の例を示しています。別のタイプのトンネルを作成するには、ビジネス要件に基づいて CreateTunnelRequest の TunnelType パラメータを構成します。 final String tableName = "testTable"; final String tunnelName = "testTunnel"; CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream); CreateTunnelResponse resp = tunnelClient.createTunnel(request); // tunnelId パラメータを使用して TunnelWorker を初期化します。ListTunnel または DescribeTunnel 操作を呼び出して、トンネル ID を取得できます。 String tunnelId = resp.getTunnelId(); System.out.println("Create Tunnel, Id: " + tunnelId);
カスタムデータ消費コールバックを指定して、自動データ消費を開始します。
// データ消費のコールバックを指定して、process メソッドと shutdown メソッドを指定する IChannelProcessor 操作を呼び出します。 private static class SimpleProcessor implements IChannelProcessor { @Override public void process(ProcessRecordsInput input) { // ProcessRecordsInput パラメータには、取得したデータが含まれています。 System.out.println("Default record processor, would print records count"); System.out.println( // NextToken パラメータは、トンネルクライアントがデータをページ分割するために使用されます。 String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken())); try { // データの消費と処理をシミュレートします。 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void shutdown() { System.out.println("Mock shutdown"); } } // デフォルトでは、システムは TunnelWorkerConfig に基づいてデータを読み取り、処理するためのスレッドプールを起動します。 // 単一のサーバーで複数の TunnelWorker を起動する場合は、すべての TunnelWorker を構成するために同じ TunnelWorkerConfig を使用することをお勧めします。TunnelWorkerConfig は、より高度なパラメータを提供します。 TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor()); // TunnelWorker を構成し、自動データ処理を開始します。 TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config); try { worker.connectAndWorking(); } catch (Exception e) { e.printStackTrace(); config.shutdown(); worker.shutdown(); tunnelClient.shutdown(); }
TunnelWorkerConfig の構成
TunnelWorkerConfig を使用すると、ビジネス要件に基づいてトンネルクライアントのカスタムパラメータを構成できます。次の表に、Tablestore SDK for Java のパラメータを示します。
構成 | パラメータ | 説明 |
ハートビートの間隔とタイムアウト期間 | heartbeatTimeoutInSec | ハートビートのタイムアウト期間。デフォルト値:300。単位:秒。 ハートビートタイムアウトが発生した場合、トンネルサーバーは現在の TunnelClient インスタンスが使用不可であると見なします。この場合、トンネルクライアントはトンネルサーバーに再接続する必要があります。 |
heartbeatIntervalInSec | ハートビートの間隔。デフォルト値:30。最小値:5。単位:秒。 ハートビートを検出して、アクティブなチャネルを監視し、チャネルのステータスを更新し、データ処理タスクを自動的に初期化できます。 | |
チェックポイント間の間隔 | checkpointIntervalInMillis | データが消費されるときのチェックポイント間の間隔。間隔はトンネルサーバーに記録されます。 デフォルト値:5000。単位:ミリ秒。 説明
|
カスタムクライアントタグ | clientTag | トンネルクライアント ID を生成するために使用されるカスタムクライアントタグ。このパラメータを構成して、TunnelWorker を区別できます。 |
データ処理のカスタムコールバックを指定する | channelProcessor | ユーザーがデータを処理するために登録するコールバック。process メソッドと shutdown メソッドが含まれます。 |
データを読み取り、処理するためのスレッドプールの構成 | readRecordsExecutor | データを読み取るために使用するスレッドプール。特別な要件がない場合は、デフォルト構成を使用します。 |
processRecordsExecutor | データを処理するために使用するスレッドプール。特別な要件がない場合は、デフォルト構成を使用します。 説明
| |
メモリ制御 | maxChannelParallel | メモリ制御のためにデータを読み取り、処理するためのチャネルの最大同時実行レベル。 デフォルト値は -1 で、同時実行レベルが無制限であることを指定します。 説明 Tablestore SDK for Java V5.10.0 以降でこの機能がサポートされています。 |
最大バックオフ時間 | maxRetryIntervalInMillis | トンネルの最大バックオフ時間を計算するための基準値。最大バックオフ時間は、0.75 × maxRetryIntervalInMillis から 1.25 × maxRetryIntervalInMillis の範囲の乱数です。 デフォルト値:2000。最小値:200。単位:ミリ秒。 説明
|
CLOSING チャネル検出 | enableClosingChannelDetect | CLOSING チャネルのリアルタイム検出を有効にするかどうかを指定します。デフォルト値:false。CLOSING チャネルのリアルタイム検出が無効になっていることを指定します。 説明
|
付録:サンプルコード
import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelResponse;
import com.alicloud.openservices.tablestore.model.tunnel.TunnelType;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
public class TunnelQuickStart {
private static class SimpleProcessor implements IChannelProcessor {
@Override
public void process(ProcessRecordsInput input) {
// レコードプロセッサのデフォルト実装。レコード数を表示します。
System.out.println("Default record processor, would print records count");
System.out.println(
// NextToken パラメータは、トンネルクライアントがデータをページ分割するために使用されます。
String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
try {
// データの消費と処理をシミュレートします。
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void shutdown() {
System.out.println("Mock shutdown");
}
}
public static void main(String[] args) throws Exception {
//1. トンネルクライアントを初期化します。
final String endPoint = "";
final String accessKeyId = System.getenv("OTS_AK_ENV");
final String accessKeySecret = System.getenv("OTS_SK_ENV");
final String instanceName = "";
TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
//2. トンネルを作成します。この手順を実行する前に、テスト用のテーブルを作成する必要があります。Tablestore コンソールで、または SyncClient の createTable メソッドを使用してテーブルを作成できます。
final String tableName = "testTable";
final String tunnelName = "testTunnel";
CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
CreateTunnelResponse resp = tunnelClient.createTunnel(request);
// tunnelId パラメータを使用して TunnelWorker を初期化します。ListTunnel または DescribeTunnel 操作を呼び出して、トンネル ID を取得できます。
String tunnelId = resp.getTunnelId();
System.out.println("Create Tunnel, Id: " + tunnelId);
//3. カスタムコールバックを定義して、自動データ消費を開始します。
// TunnelWorkerConfig は、より高度なパラメータを提供します。
TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
try {
worker.connectAndWorking();
} catch (Exception e) {
e.printStackTrace();
config.shutdown();
worker.shutdown();
tunnelClient.shutdown();
}
}
}