Tunnel Serviceを使用すると、テーブル内のデータを消費できます。このトピックでは、Java向けTablestore SDKを使用してTunnel Serviceを始める方法について説明します。
使用上の注意
デフォルトでは、システムはTunnelWorkerConfigに基づいてデータを読み取りおよび処理するためのスレッドプールを開始します。単一のサーバーで複数のTunnelWorkerを開始する場合は、すべてのTunnelWorkerを構成するために同じTunnelWorkerConfigを使用することをお勧めします。
TunnelWorkerは、初期化のためにウォームアップ期間が必要です。これは、TunnelWorkerConfigのheartbeatIntervalInSecパラメータで指定されます。このパラメータを構成するには、TunnelWorkerConfigのsetHeartbeatIntervalInSecメソッドを使用します。デフォルト値:30。単位:秒。
予期しない終了または手動による終了によりTunnelWorkerクライアントがシャットダウンした場合、TunnelWorkerは次のいずれかの方法を使用してリソースを自動的にリサイクルします:スレッドプールの解放、Channelクラスに登録したshutdownメソッドの自動呼び出し、およびトンネルのシャットダウン。
トンネル内の増分ログの保持期間は、Streamログの保持期間と同じです。Streamログは最大7日間保持できます。したがって、トンネル内の増分ログは最大7日間保持できます。
差分データまたは増分データを消費するためにトンネルを作成する場合は、次の項目に注意してください。
フルデータ消費中に、トンネルが増分ログの保持期間(最大7日間)内にフルデータの消費を完了できない場合、トンネルが増分ログの消費を開始するときに
OTSTunnelExpired
エラーが発生します。その結果、トンネルは増分ログを消費できません。トンネルが指定された時間枠内にフルデータ消費を完了できないと推定される場合は、Tablestoreテクニカルサポートにご連絡ください。
増分データ消費中に、トンネルが増分ログの保持期間(最大7日間)内に増分ログの消費を完了できない場合、トンネルは利用可能な最新のデータからデータを読み取る可能性があります。この場合、特定のデータが消費されない可能性があります。
トンネルが期限切れになると、Tablestoreはトンネルを無効にする場合があります。トンネルが無効状態のまま30日以上経過すると、トンネルは削除されます。削除されたトンネルは復元できません。
前提条件
以下の操作は、Resource Access Management(RAM)コンソールで実行されます。
RAMユーザーが作成され、
AliyunOTSFullAccess
ポリシーがRAMユーザーにアタッチされ、RAMユーザーにTablestoreを管理する権限が付与されます。詳細については、RAMユーザーの作成およびRAMユーザーへの権限の付与を参照してください。説明実際のビジネス環境では、最小限の権限の原則に基づいて、RAMユーザーに必要な権限のみを付与するカスタムポリシーを作成することをお勧めします。これにより、過剰なユーザー権限によって引き起こされるセキュリティリスクを防ぎます。
RAMユーザーのAccessKeyペアが作成されます。詳細については、AccessKeyペアの作成を参照してください。
警告Alibaba CloudアカウントのAccessKeyペアが漏洩した場合、アカウント内のすべてのリソースが潜在的なリスクにさらされます。RAMユーザーのAccessKeyペアを使用して操作を実行することをお勧めします。これにより、Alibaba CloudアカウントのAccessKeyペアの漏洩を防ぎます。
以下の操作は、Tablestoreコンソールで実行されます。
データテーブルが作成されます。詳細については、Tablestoreコンソールの使用、Tablestore CLIの使用、およびTablestore SDKの使用を参照してください。
データテーブルが存在するインスタンスのエンドポイントが取得されます。詳細については、Tablestoreインスタンスのエンドポイントの取得を参照してください。
アクセス認証情報が構成されます。詳細については、アクセス認証情報の構成を参照してください。
Tunnel Serviceの使用
Java向けTablestore SDKを使用して、Tunnel Serviceを始めましょう。
TunnelClientインスタンスを初期化します。
説明TABLESTORE_ACCESS_KEY_ID
およびTABLESTORE_ACCESS_KEY_SECRET
環境変数が構成されていることを確認してください。TABLESTORE_ACCESS_KEY_ID環境変数は、Alibaba CloudアカウントまたはRAMユーザーのAccessKey IDを指定します。TABLESTORE_ACCESS_KEY_SECRET環境変数は、Alibaba CloudアカウントまたはRAMユーザーのAccessKeyシークレットを指定します。// endPointパラメータをTablestoreインスタンスのエンドポイントに設定します。例:https://instance.cn-hangzhou.ots.aliyuncs.com。 // accessKeyIdパラメータをAccessKey IDに設定し、accessKeySecretパラメータをTablestoreにアクセスするために使用するAccessKeyシークレットに設定します。 // instanceNameパラメータをインスタンスの名前に設定します。 final String endPoint = ""; final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID"); final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET"); final String instanceName = ""; TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
トンネルを作成します。
トンネルを作成する前に、テスト用のデータテーブルを作成するか、既存のテーブルを準備します。Tablestoreコンソールで、またはSyncClientのcreateTableメソッドを使用してテーブルを作成できます。
// TunnelType.BaseData、TunnelType.Stream、およびTunnelType.BaseAndStreamの3種類のトンネルを作成できます。 // 次のコードは、BaseAndStreamトンネルを作成する方法の例を示しています。別のタイプのトンネルを作成するには、CreateTunnelRequestのTunnelTypeを目的のタイプに設定します。 final String tableName = "testTable"; final String tunnelName = "testTunnel"; CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream); CreateTunnelResponse resp = tunnelClient.createTunnel(request); // tunnelIdパラメータを使用してTunnelWorkerを初期化します。ListTunnelまたはDescribeTunnel操作を呼び出して、トンネルIDを取得できます。 String tunnelId = resp.getTunnelId(); System.out.println("Create Tunnel, Id: " + tunnelId);
カスタムデータ消費コールバック関数を指定して、自動データ消費を開始します。次の表に、TunnelClientインスタンスの構成を示します。
// データ消費のコールバック関数を指定して、processメソッドとshutdownメソッドを指定するIChannelProcessor操作を呼び出します。 private static class SimpleProcessor implements IChannelProcessor { @Override public void process(ProcessRecordsInput input) { // ProcessRecordsInputパラメータには、取得したデータが含まれています。 System.out.println("Default record processor, would print records count"); System.out.println( // NextTokenパラメータは、トンネルクライアントがデータをページ分割するために使用されます。 String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken())); try { // データの消費と処理をシミュレートします。 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void shutdown() { System.out.println("Mock shutdown"); } } // デフォルトでは、システムはTunnelWorkerConfigに基づいてデータを読み取りおよび処理するためのスレッドプールを開始します。単一のサーバーを使用する場合、複数のTunnelWorkerが開始されます。 // 同じTunnelWorkerConfigを使用してTunnelWorkerを構成することをお勧めします。TunnelWorkerConfigは、より高度なパラメータを提供します。 TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor()); // TunnelWorkerを構成し、自動データ処理を開始します。 TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config); try { worker.connectAndWorking(); } catch (Exception e) { e.printStackTrace(); config.shutdown(); worker.shutdown(); tunnelClient.shutdown(); }
TunnelWorkerConfigの構成
TunnelWorkerConfigを使用すると、ビジネス要件に基づいてトンネルクライアントのカスタムパラメータを構成できます。次の表に、Java向けTablestore SDKのパラメータを示します。
構成 | パラメータ | 説明 |
ハートビートの間隔とタイムアウト期間 | heartbeatTimeoutInSec | ハートビートのタイムアウト期間。デフォルト値: 300。単位: 秒。 ハートビートのタイムアウトが発生すると、トンネルサーバーは現在の TunnelClient インスタンスが使用不可であると見なします。この場合、トンネルクライアントはトンネルサーバーに再接続する必要があります。 |
heartbeatIntervalInSec | ハートビートの間隔です。 既定値: 30。最小値: 5。単位: 秒。 ハートビートを検出して、アクティブなチャネルを監視し、チャネルのステータスを更新し、データ処理タスクを自動的に初期化できます。 | |
チェックポイントの間隔 | checkpointIntervalInMillis | データ消費時のチェックポイントの間隔。間隔はトンネルサーバーに記録されます。 デフォルト値: 5000。単位: ミリ秒。 説明
|
カスタムクライアントタグ | clientTag | トンネルクライアントIDの生成に使用されるカスタムクライアントタグ。このパラメータを設定して、TunnelWorker を区別できます。 |
データ処理用のカスタムコールバックを指定します | channelProcessor | ユーザーが登録した、処理メソッドとシャットダウンメソッドを含むデータ処理用のコールバックです。 |
データの読み取りと処理を行うスレッドプールの構成 | レコード読み取りエグゼキューター | ハートビートの間隔とタイムアウト期間 |
processRecordsExecutor | データ処理に使用するスレッドプール。特別な要件がない場合は、デフォルト設定を使用してください。 説明
| |
メモリ制御 | heartbeatIntervalInSec | メモリ制御のためにデータを読み取って処理するチャネルの最大同時実行レベル。 ハートビートの間隔。デフォルト値:30。最小値:5。単位:秒。 説明 Java V5.10.0以降のTablestore SDKでは、この機能がサポートされています。 |
最大バックオフ時間 | 最大再試行間隔(ミリ秒) | トンネルの最大バックオフ時間を計算するための基準値。最大バックオフ時間は、0.75 × maxRetryIntervalInMillis から 1.25 × maxRetryIntervalInMillis までの範囲の乱数です。 デフォルト値: 2000。最小値: 200。単位: ミリ秒。 説明
|
CLOSING チャネルの検出 | enableClosingChannelDetect | CLOSING チャネルのリアルタイム検出を有効にするかどうかを指定します。デフォルト値: false。CLOSING チャネルのリアルタイム検出が無効になっていることを指定します。 説明
|
付録: サンプルコード
import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelResponse;
import com.alicloud.openservices.tablestore.model.tunnel.TunnelType;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
public class TunnelQuickStart {
private static class SimpleProcessor implements IChannelProcessor {
@Override
public void process(ProcessRecordsInput input) {
System.out.println("Default record processor, would print records count");
System.out.println(
// NextToken パラメーターは、トンネルクライアントがデータのページ分割に使用するものです。
String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
try {
// データの消費と処理をシミュレートします。
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void shutdown() {
System.out.println("Mock shutdown");
}
}
public static void main(String[] args) throws Exception {
//1. トンネルクライアントを初期化します。
final String endPoint = "";
final String accessKeyId = System.getenv("OTS_AK_ENV");
final String accessKeySecret = System.getenv("OTS_SK_ENV");
final String instanceName = "";
TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
//2. トンネルを作成します。この手順を実行する前に、テスト用のテーブルを作成する必要があります。Tablestore コンソールまたは SyncClient の createTable メソッドを使用してテーブルを作成できます。
final String tableName = "testTable";
final String tunnelName = "testTunnel";
CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
CreateTunnelResponse resp = tunnelClient.createTunnel(request);
// tunnelId パラメーターを使用して TunnelWorker を初期化します。ListTunnel または DescribeTunnel 操作を呼び出して、トンネル ID を取得できます。
String tunnelId = resp.getTunnelId();
System.out.println("Create Tunnel, Id: " + tunnelId);
//3. 自動データ消費を開始するためのカスタムコールバックを定義します。
// TunnelWorkerConfig は、より高度なパラメーターを提供します。
TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
try {
worker.connectAndWorking();
} catch (Exception e) {
e.printStackTrace();
config.shutdown();
worker.shutdown();
tunnelClient.shutdown();
}
}
}