
Tablestore: Getting started

Last Updated: Dec 04, 2024

Tunnel Service allows you to consume the data in a table. This topic describes how to get started with Tunnel Service by using Tablestore SDK for Java.

Usage notes

  • By default, the system starts a thread pool to read and process data based on TunnelWorkerConfig. If you want to start multiple TunnelWorkers on a single server, we recommend that you use the same TunnelWorkerConfig to configure all TunnelWorkers.

  • The incremental logs for a tunnel are retained for a maximum of seven days. The specific storage duration of incremental logs is consistent with that of the stream logs for the data table. If you create a tunnel to consume full and incremental data, take note of the following items:

    • During full data consumption, if the tunnel does not complete the consumption of full data within seven days, an OTSTunnelExpired error occurs when the tunnel starts to consume incremental data. As a result, the tunnel cannot consume incremental data. If you estimate that full data cannot be consumed within seven days, contact Tablestore technical support.

    • During incremental data consumption, if the tunnel has not consumed incremental data for more than seven days, the consumption starts from the latest data that can be consumed. In this case, some data may not be consumed.

      Note

      If incremental data is not consumed for longer than the retention period of the stream logs for the data table, which is seven days by default, the incremental data expires and Tablestore disables the tunnel. In this case, the tunnel can no longer be used to consume data.

  • TunnelWorker requires time to warm up for initialization, which is determined by the heartbeatIntervalInSec parameter in TunnelWorkerConfig. You can use the setHeartbeatIntervalInSec method in TunnelWorkerConfig to configure this parameter. Default value: 30. Minimum value: 5. Unit: seconds.

  • When a tunnel switches from consuming full data to consuming incremental data, the full data channels are closed and incremental data channels are created. This switch requires another warm-up period, which is also determined by the heartbeatIntervalInSec parameter.

  • When the TunnelWorker client exits unexpectedly or is terminated manually, TunnelWorker automatically recycles resources in one of the following ways: releasing the thread pool, calling the shutdown method that you registered for the Channel class, or shutting down the tunnel.
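The recommendation above to share one TunnelWorkerConfig can be sketched as follows. This is a configuration fragment only, assuming the TunnelClient and SimpleProcessor shown later in this topic; tunnelId1 and tunnelId2 are placeholders for the IDs of two tunnels that you consume on the same server.

```java
// Sketch: share one TunnelWorkerConfig between multiple TunnelWorkers on the
// same server so that they reuse one thread pool instead of starting several.
TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
// Warm-up interval in seconds. Default value: 30. Minimum value: 5.
config.setHeartbeatIntervalInSec(30);
// Both workers reference the same config object.
TunnelWorker worker1 = new TunnelWorker(tunnelId1, tunnelClient, config);
TunnelWorker worker2 = new TunnelWorker(tunnelId2, tunnelClient, config);
worker1.connectAndWorking();
worker2.connectAndWorking();
```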

Prerequisites

  • The following operations are performed in the Resource Access Management (RAM) console:

    • A RAM user is created and the AliyunOTSFullAccess policy is attached to the RAM user to grant the RAM user the permissions to manage Tablestore. For more information, see Create a RAM user and Grant permissions to a RAM user.

      Note

      In an actual business environment, we recommend that you create a custom policy to grant only the required permissions to the RAM user based on the principle of least privilege. This prevents security risks caused by excessive user permissions.

    • An AccessKey pair is created for the RAM user. For more information, see Create an AccessKey pair.

      Warning

      If the AccessKey pair of your Alibaba Cloud account is leaked, all resources within the account are exposed to potential risks. We recommend that you use the AccessKey pair of a RAM user to perform operations. This prevents leakage of the AccessKey pair of your Alibaba Cloud account.

Use Tunnel Service

Use Tablestore SDK for Java to get started with Tunnel Service.

  1. Initialize a TunnelClient instance.

    Note

    Make sure that the TABLESTORE_ACCESS_KEY_ID and TABLESTORE_ACCESS_KEY_SECRET environment variables are configured. The TABLESTORE_ACCESS_KEY_ID environment variable specifies the AccessKey ID of your Alibaba Cloud account or RAM user. The TABLESTORE_ACCESS_KEY_SECRET environment variable specifies the AccessKey secret of your Alibaba Cloud account or RAM user.

    // Set the endPoint parameter to the endpoint of the Tablestore instance. Example: https://instance.cn-hangzhou.ots.aliyuncs.com. 
    // Set the accessKeyId parameter to the AccessKey ID and the accessKeySecret parameter to the AccessKey secret that you want to use to access Tablestore. 
    // Set the instanceName parameter to the name of the instance. 
    final String endPoint = "";
    final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
    final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
    final String instanceName = "";
    TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
  2. Create a tunnel.

    Before you create a tunnel, create a data table for testing or prepare an existing table. You can create a table in the Tablestore console or by using the createTable method of a SyncClient.

    // You can create three types of tunnels: TunnelType.BaseData, TunnelType.Stream, and TunnelType.BaseAndStream. 
    // The following code provides an example of how to create a BaseAndStream tunnel. To create a tunnel of a different type, set TunnelType in CreateTunnelRequest to the desired type. 
    final String tableName = "testTable";
    final String tunnelName = "testTunnel";
    CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
    CreateTunnelResponse resp = tunnelClient.createTunnel(request);
    // Use the tunnelId parameter to initialize a TunnelWorker. You can call the ListTunnel or DescribeTunnel operation to obtain the tunnel ID. 
    String tunnelId = resp.getTunnelId(); 
    System.out.println("Create Tunnel, Id: " + tunnelId);
  3. Specify a custom data consumption callback to start automatic data consumption. For information about the advanced parameters of TunnelWorkerConfig, see the Configure TunnelWorkerConfig section of this topic.

    // Implement the IChannelProcessor interface to specify the data consumption callback, which includes the process and shutdown methods. 
    private static class SimpleProcessor implements IChannelProcessor {
        @Override
        public void process(ProcessRecordsInput input) {
            // The ProcessRecordsInput parameter contains the data that you obtain. 
            System.out.println("Default record processor, would print records count");
            System.out.println(
                // The NextToken parameter is used by the tunnel client to paginate data. 
                String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
            try {
                // Simulate data consumption and processing. 
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void shutdown() {
            System.out.println("Mock shutdown");
        }
    }
    
    // By default, the system starts a thread pool to read and process data based on TunnelWorkerConfig. 
    // If you want to start multiple TunnelWorkers on a single server, we recommend that you use the same TunnelWorkerConfig to configure all of them. TunnelWorkerConfig provides more advanced parameters. 
    TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
    // Configure TunnelWorkers and start automatic data processing. 
    TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
    try {
        worker.connectAndWorking();
    } catch (Exception e) {
        e.printStackTrace();
        config.shutdown();
        worker.shutdown();
        tunnelClient.shutdown();
    }

Configure TunnelWorkerConfig

TunnelWorkerConfig allows you to customize the parameters of a TunnelWorker based on your requirements. The following parameters are available.

Interval and timeout period of heartbeats

heartbeatTimeoutInSec

The timeout period to receive heartbeats.

Default value: 300. Unit: seconds.

When a heartbeat timeout occurs, the tunnel server considers the current tunnel client unavailable. In this case, the tunnel client must reconnect to the tunnel server.

heartbeatIntervalInSec

The interval between heartbeats.

Heartbeats are used to detect active channels, update the status of channels, and automatically initialize data processing tasks.

Default value: 30. Minimum value: 5. Unit: seconds.

Interval between checkpoints

checkpointIntervalInMillis

The interval between checkpoints when data is consumed. Checkpoints are recorded on the tunnel server.

Default value: 5000. Unit: milliseconds.

Note
  • Because the data that you want to read is stored on different servers, various errors may occur when you run processes. For example, a server may restart due to environmental factors. Therefore, the tunnel server records checkpoints at regular intervals after data is processed, and a restarted task resumes from the most recent checkpoint. As a result, Tunnel Service synchronizes each piece of data at least once; in exceptional conditions, some data may be synchronized more than once. If some data is reprocessed, make sure that your business processing logic can handle duplicates.

  • To reduce the amount of data that is reprocessed when errors occur, record checkpoints more frequently. However, checkpoints that are too frequent can compromise system throughput. Record checkpoints at a frequency that suits your business requirements.
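The checkpoint-and-resume behavior described above can be illustrated with plain Java, independent of the Tablestore SDK. In this sketch, a simulated worker persists a checkpoint (the next offset to read) every few records; after a simulated crash, it resumes from the last checkpoint, so the records processed after that checkpoint are delivered again, which demonstrates the at-least-once behavior. All names here are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

public class CheckpointSketch {
    // Plays the role of the checkpoint persisted on the tunnel server.
    static int checkpoint = 0;

    static void reset() {
        checkpoint = 0;
    }

    // Processes records [checkpoint, total), persisting a checkpoint every
    // `interval` records. A crash is simulated when index `crashAt` is reached
    // (pass a negative value to run to completion).
    static List<Integer> consume(int total, int interval, int crashAt) {
        List<Integer> processed = new ArrayList<>();
        for (int i = checkpoint; i < total; i++) {
            if (i == crashAt) {
                return processed; // worker dies before checkpointing i
            }
            processed.add(i);
            if ((i + 1) % interval == 0) {
                checkpoint = i + 1; // record a checkpoint
            }
        }
        checkpoint = total;
        return processed;
    }

    public static void main(String[] args) {
        List<Integer> first = consume(10, 4, 6);   // crashes after processing 0..5
        List<Integer> second = consume(10, 4, -1); // resumes from checkpoint 4
        System.out.println("first run:  " + first);  // [0, 1, 2, 3, 4, 5]
        System.out.println("second run: " + second); // [4, 5, ...]: 4 and 5 are reprocessed
    }
}
```

A shorter checkpoint interval would shrink the reprocessed window (here, records 4 and 5) at the cost of more frequent writes to the server, which is the trade-off described above.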

Custom client tag

clientTag

The custom client tag that is used to generate a tunnel client ID. You can customize this parameter to distinguish between TunnelWorkers.

Custom callback to process data

channelProcessor

The callback that you register to process data, including the process and shutdown methods.

Configuration of the thread pools to read and process data

readRecordsExecutor

The thread pool to read data. If you do not have special requirements, use the default configuration.

processRecordsExecutor

The thread pool to process data. If you do not have special requirements, use the default configuration.

Note
  • When you customize the thread pools, we recommend that you set the number of threads to the number of channels in the tunnel. This way, computing resources such as CPU can be quickly allocated to each channel.

  • In the default configurations of the pools, Tablestore performs the following operations to ensure throughput:

    • Allocate 32 core threads in advance to guarantee real-time throughput when the amount of data or the number of channels is small.

    • Keep the queue short so that when a large amount of data must be processed or a large number of channels exist, the pool creates additional threads and allocates more computing resources.

    • Set the default thread keep-alive time to 60 seconds so that thread resources are recycled promptly when the amount of data decreases.

Memory control

maxChannelParallel

The maximum number of channels that concurrently read and process data. You can configure this parameter to limit memory usage.

The default value is -1, which indicates that the concurrency is unlimited.

Note

This configuration applies only to Tablestore SDK for Java V5.10.0 and later.

Maximum backoff time

maxRetryIntervalInMillis

The base value used to calculate the maximum backoff time for tunnel retries. The actual maximum backoff time is a random value that ranges from 0.75 × maxRetryIntervalInMillis to 1.25 × maxRetryIntervalInMillis.

Default value: 2000. Minimum value: 200. Unit: milliseconds.

Note
  • This configuration applies only to Tablestore SDK for Java V5.4.0 and later.

  • If the amount of data exported in a request is smaller than 900 KB or 500 records, the tunnel client uses exponential backoff until the maximum backoff time is reached.
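The jittered range described above can be computed with a few lines of plain Java. The SDK's exact jitter formula is not documented here, so the uniform distribution in this sketch is an assumption; only the [0.75 × base, 1.25 × base] range comes from the parameter description.

```java
import java.util.concurrent.ThreadLocalRandom;

public class BackoffSketch {
    // Returns a jittered maximum backoff time in the documented range
    // [0.75 * baseMillis, 1.25 * baseMillis). Uniform jitter is assumed.
    public static long maxBackoffMillis(long baseMillis) {
        double factor = ThreadLocalRandom.current().nextDouble(0.75, 1.25);
        return (long) (baseMillis * factor);
    }

    public static void main(String[] args) {
        long base = 2000; // default maxRetryIntervalInMillis
        for (int i = 0; i < 3; i++) {
            // With base = 2000, each value falls between 1500 and 2500 ms.
            System.out.println("max backoff: " + maxBackoffMillis(base) + " ms");
        }
    }
}
```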

CLOSING channel detection

enableClosingChannelDetect

Specifies whether to enable real-time detection of channels in the CLOSING state. Type: Boolean. Default value: false, which indicates that real-time detection of CLOSING channels is disabled.

Note
  • This configuration applies only to Tablestore SDK for Java V5.13.13 and later.

  • If you do not enable this feature, channels may be suspended and consumption may be interrupted in rare scenarios, such as when a large number of channels exist but client resources are insufficient.

  • CLOSING channel: a channel that is being switched from one tunnel client to another.

Appendix: Sample code

import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelResponse;
import com.alicloud.openservices.tablestore.model.tunnel.TunnelType;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
public class TunnelQuickStart {
    private static class SimpleProcessor implements IChannelProcessor {
        @Override
        public void process(ProcessRecordsInput input) {
            System.out.println("Default record processor, would print records count");
            System.out.println(
                // The NextToken parameter is used by the tunnel client to paginate data. 
                String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
            try {
                // Simulate data consumption and processing. 
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void shutdown() {
            System.out.println("Mock shutdown");
        }
    }
    public static void main(String[] args) throws Exception {
        //1. Initialize the tunnel client. 
        final String endPoint = "";
        final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
        final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
        final String instanceName = "";
        TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
        //2. Create a tunnel. Before you perform this step, you must create a table for testing. You can create a table in the Tablestore console or by using the createTable method of a SyncClient. 
        final String tableName = "testTable";
        final String tunnelName = "testTunnel";
        CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
        CreateTunnelResponse resp = tunnelClient.createTunnel(request);
        // Use the tunnelId parameter to initialize a TunnelWorker. You can call the ListTunnel or DescribeTunnel operation to obtain the tunnel ID. 
        String tunnelId = resp.getTunnelId();
        System.out.println("Create Tunnel, Id: " + tunnelId);
        //3. Define a custom callback to start automatic data consumption. 
        // TunnelWorkerConfig provides more advanced parameters. 
        TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
        TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
        try {
            worker.connectAndWorking();
        } catch (Exception e) {
            e.printStackTrace();
            config.shutdown();
            worker.shutdown();
            tunnelClient.shutdown();
        }
    }
}