Tunnel Service allows you to consume the data of a table. This topic describes how to get started with Tunnel Service by using Tablestore SDK for Java.
Usage notes
By default, the system starts a thread pool to read and process data based on TunnelWorkerConfig. If you want to start multiple TunnelWorkers on a single server, we recommend that you configure the TunnelWorkers to share the same TunnelWorkerConfig.
If you create a differential tunnel to consume full and incremental data, the incremental logs of the tunnel are retained for up to seven days. The specific expiration time of incremental logs is the same as that of the logs in the streams for a data table. If the tunnel does not finish consuming full data within seven days, an OTSTunnelExpired error occurs when the tunnel starts to consume incremental data, and the tunnel cannot consume incremental data. If you estimate that the tunnel cannot finish consuming full data within seven days, contact Tablestore technical support.
TunnelWorker requires time to warm up for initialization, which is determined by the heartbeatIntervalInSec parameter in TunnelWorkerConfig. You can use the setHeartbeatIntervalInSec method in TunnelWorkerConfig to set this parameter. Default value: 30. Minimum value: 5. Unit: seconds.
When the mode switches from the full channel to the incremental channel, the full channel is closed and the incremental channel is started. This process requires another period of time for initialization, which is also specified by the heartbeatIntervalInSec parameter.
When the TunnelWorker client is shut down, whether because of an unexpected exit or manual termination, TunnelWorker automatically recycles resources in the following ways: releasing the thread pool, calling the shutdown method that you registered for the Channel class, and shutting down the tunnel.
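The cleanup order described above (thread pool, registered shutdown callback, tunnel) can also be enforced explicitly in your own code. The following sketch is plain Java with a hypothetical Shutdownable stand-in for the SDK objects, not the SDK's own implementation; it shows one way to run shutdowns in a fixed order and to cover abnormal termination with a JVM shutdown hook:

```java
import java.util.ArrayList;
import java.util.List;

public class ShutdownOrderDemo {
    // Hypothetical stand-in for TunnelWorker, TunnelWorkerConfig, and TunnelClient.
    // In real code, these would be the SDK objects with their own shutdown() methods.
    interface Shutdownable {
        void shutdown();
    }

    // Shuts the resources down in the order they are passed. Individual failures are
    // logged and ignored so that one failing shutdown does not block the others.
    static void shutdownAll(List<String> log, Shutdownable... resources) {
        for (Shutdownable r : resources) {
            try {
                r.shutdown();
            } catch (RuntimeException e) {
                log.add("shutdown failed: " + e.getMessage());
            }
        }
    }

    public static void main(String[] args) {
        final List<String> log = new ArrayList<>();
        Shutdownable worker = () -> log.add("worker");
        Shutdownable config = () -> log.add("config");
        Shutdownable client = () -> log.add("client");

        // Registering the cleanup as a JVM shutdown hook also covers Ctrl+C termination.
        Runtime.getRuntime().addShutdownHook(
                new Thread(() -> shutdownAll(log, worker, config, client)));

        // Explicit cleanup on the normal exit path.
        shutdownAll(log, worker, config, client);
        System.out.println(log);
    }
}
```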
Prerequisites
The endpoint of the instance that you want to use is obtained. For more information, see the "Obtain the endpoint of an instance" section of the Initialize a client topic.
An AccessKey pair is configured. For more information, see the "Configure an AccessKey pair" section of the Initialize a client topic.
The AccessKey pair is configured in environment variables. For more information, see the "Configure environment variables" section of the Initialize a client topic.
Tablestore uses the OTS_AK_ENV and OTS_SK_ENV environment variables to store an AccessKey pair. The OTS_AK_ENV environment variable stores the AccessKey ID of an Alibaba Cloud account or a Resource Access Management (RAM) user, and the OTS_SK_ENV environment variable stores the corresponding AccessKey secret. Configure the environment variables based on the AccessKey pair that you want to use.
A data table is created. For more information, see Create a data table.
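Because the sample code reads the AccessKey pair from the OTS_AK_ENV and OTS_SK_ENV environment variables, a missing variable would otherwise surface only as a confusing authentication failure. The following sketch uses a hypothetical requireEnv helper (not part of the SDK) to fail fast with a clear message instead of passing null credentials to the client:

```java
public class EnvCredentials {
    // Hypothetical helper: reads an environment variable and fails fast if it is missing.
    static String requireEnv(String name) {
        String value = System.getenv(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Environment variable " + name + " is not set");
        }
        return value;
    }

    public static void main(String[] args) {
        try {
            String accessKeyId = requireEnv("OTS_AK_ENV");
            String accessKeySecret = requireEnv("OTS_SK_ENV");
            System.out.println("Credentials loaded from environment");
        } catch (IllegalStateException e) {
            // Fails here with an explanatory message when the variables are not configured.
            System.err.println(e.getMessage());
        }
    }
}
```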
Use Tunnel Service
This section describes how to use Tablestore SDK for Java to get started with Tunnel Service.
Initialize a TunnelClient.
// Set the endPoint parameter to the endpoint of the Tablestore instance. Example: https://instance.cn-hangzhou.ots.aliyuncs.com.
// Set the accessKeyId and accessKeySecret parameters to the AccessKey ID and AccessKey secret that you use to access Tablestore.
// Set the instanceName parameter to the name of the instance.
final String endPoint = "";
final String accessKeyId = System.getenv("OTS_AK_ENV");
final String accessKeySecret = System.getenv("OTS_SK_ENV");
final String instanceName = "";
TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
Create a tunnel.
Before you create a tunnel, create a data table for testing or prepare an existing table. You can create a table by using the createTable method of a SyncClient or in the Tablestore console.
// The following types of tunnels are supported: TunnelType.BaseData, TunnelType.Stream, and TunnelType.BaseAndStream.
// The following sample code provides an example on how to create a BaseAndStream tunnel. To create a tunnel of a different type,
// configure the TunnelType parameter in CreateTunnelRequest based on your requirements.
final String tableName = "testTable";
final String tunnelName = "testTunnel";
CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
CreateTunnelResponse resp = tunnelClient.createTunnel(request);
// Use the tunnelId parameter to initialize a TunnelWorker. You can call the ListTunnel or DescribeTunnel operation to obtain the tunnel ID.
String tunnelId = resp.getTunnelId();
System.out.println("Create Tunnel, Id: " + tunnelId);
Define a custom callback function to start automatic data consumption. For more information about the configuration parameters, see the Configure TunnelWorkerConfig section.
// Define a custom callback function for data consumption by implementing the process and shutdown methods of the IChannelProcessor interface.
private static class SimpleProcessor implements IChannelProcessor {
    @Override
    public void process(ProcessRecordsInput input) {
        // The ProcessRecordsInput parameter contains the data that you obtain.
        System.out.println("Default record processor, would print records count");
        System.out.println(
            // The NextToken parameter is used by the Tunnel client to paginate data.
            String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
        try {
            // Simulate the processing of data consumption.
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void shutdown() {
        System.out.println("Mock shutdown");
    }
}

// By default, the system starts a thread pool to read and process data based on TunnelWorkerConfig. If you want to start multiple
// TunnelWorkers on a single server, we recommend that you configure the TunnelWorkers to share the same TunnelWorkerConfig.
// TunnelWorkerConfig provides more advanced parameters.
TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
// Configure the TunnelWorker and start automatic data processing.
TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
try {
    worker.connectAndWorking();
} catch (Exception e) {
    e.printStackTrace();
    config.shutdown();
    worker.shutdown();
    tunnelClient.shutdown();
}
Configure TunnelWorkerConfig
TunnelWorkerConfig allows you to customize parameters for a TunnelWorker based on your requirements. The following table describes the parameters.
| Item | Parameter | Description |
| --- | --- | --- |
| Heartbeat interval and timeout | heartbeatTimeoutInSec | The timeout period for heartbeats. Default value: 300. Unit: seconds. When a heartbeat timeout occurs, the tunnel server considers the current TunnelClient instance unavailable, and the tunnel client needs to reconnect to the tunnel server. |
| | heartbeatIntervalInSec | The interval at which heartbeats are detected. Heartbeats are used to detect active channels, update the status of channels, and automatically initialize data processing tasks. Default value: 30. Minimum value: 5. Unit: seconds. |
| Checkpoint interval | checkpointIntervalInMillis | The interval between checkpoints when data is consumed. The interval is recorded on the tunnel server. Default value: 5000. Unit: milliseconds. |
| Custom client tag | clientTag | The custom client tag that is used to generate a tunnel client ID. You can customize this parameter to distinguish between TunnelWorkers. |
| Custom callback to process data | channelProcessor | The callback that you register to process data, including the process and shutdown methods. |
| Thread pools to read and process data | readRecordsExecutor | The thread pool to read data. If you do not have special requirements, use the default configuration. |
| | processRecordsExecutor | The thread pool to process data. If you do not have special requirements, use the default configuration. |
| Memory control | maxChannelParallel | The concurrency level of channels to read and process data. Configure this parameter to control memory usage. The default value is -1, which indicates that the concurrency level is unlimited. Note: This configuration applies only to Tablestore SDK for Java V5.10.0 and later. |
| Maximum backoff time | maxRetryIntervalInMillis | The base value used to calculate the maximum backoff time for the tunnel. The maximum backoff time is a random number that ranges from 0.75 × maxRetryIntervalInMillis to 1.25 × maxRetryIntervalInMillis. Default value: 2000. Minimum value: 200. Unit: milliseconds. |
| CLOSING channel detection | enableClosingChannelDetect | Specifies whether to enable real-time detection of CLOSING channels. Type: Boolean. Default value: false, which indicates that real-time detection of CLOSING channels is disabled. |
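The maxRetryIntervalInMillis behavior described above can be illustrated in plain Java. The formula (a uniform random value between 0.75 × and 1.25 × the configured base) is taken from the parameter description; the method name below is illustrative, not an SDK API:

```java
import java.util.concurrent.ThreadLocalRandom;

public class BackoffDemo {
    // Illustrative computation of the maximum backoff time: a uniform random value
    // in [0.75 * base, 1.25 * base), per the maxRetryIntervalInMillis description.
    static long maxBackoffMillis(long maxRetryIntervalInMillis) {
        double lower = 0.75 * maxRetryIntervalInMillis;
        double upper = 1.25 * maxRetryIntervalInMillis;
        return (long) ThreadLocalRandom.current().nextDouble(lower, upper);
    }

    public static void main(String[] args) {
        long base = 2000; // the default value of maxRetryIntervalInMillis
        for (int i = 0; i < 3; i++) {
            // Each sample falls between 1500 and 2500 milliseconds for the default base.
            System.out.println("max backoff: " + maxBackoffMillis(base) + " ms");
        }
    }
}
```

Randomizing the backoff around the base value spreads out reconnection attempts so that many TunnelWorkers do not retry in lockstep.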
Appendix: Complete sample code
import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelResponse;
import com.alicloud.openservices.tablestore.model.tunnel.TunnelType;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
public class TunnelQuickStart {
private static class SimpleProcessor implements IChannelProcessor {
@Override
public void process(ProcessRecordsInput input) {
System.out.println("Default record processor, would print records count");
System.out.println(
// The NextToken parameter is used by the Tunnel client to paginate data.
String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
try {
// Simulate the processing of data consumption.
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void shutdown() {
System.out.println("Mock shutdown");
}
}
public static void main(String[] args) throws Exception {
//1. Initialize a Tunnel client.
final String endPoint = "";
final String accessKeyId = System.getenv("OTS_AK_ENV");
final String accessKeySecret = System.getenv("OTS_SK_ENV");
final String instanceName = "";
TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
//2. Create a tunnel. You must create a table for testing before you perform this step. You can create a table by using the createTable method of a SyncClient or in the Tablestore console.
final String tableName = "testTable";
final String tunnelName = "testTunnel";
CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
CreateTunnelResponse resp = tunnelClient.createTunnel(request);
// Use the tunnelId parameter to initialize a TunnelWorker. You can call the ListTunnel or DescribeTunnel operation to obtain the tunnel ID.
String tunnelId = resp.getTunnelId();
System.out.println("Create Tunnel, Id: " + tunnelId);
//3. Define a custom callback function to start automatic data consumption.
// TunnelWorkerConfig provides more advanced parameters.
TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
try {
worker.connectAndWorking();
} catch (Exception e) {
e.printStackTrace();
config.shutdown();
worker.shutdown();
tunnelClient.shutdown();
}
}
}