All Products
Search
Document Center

Tablestore:Use Tunnel Service by using Tablestore SDKs

Last Updated:Dec 04, 2024

This topic describes how to get started with Tunnel Service by using Tablestore SDKs. Before you use Tunnel Service, make sure that you are familiar with the usage notes and API operations.

Usage notes

  • By default, the system starts a thread pool to read and process data based on TunnelWorkerConfig. If you want to start multiple TunnelWorkers on a single server, we recommend that you use the same TunnelWorkerConfig to configure all TunnelWorkers.

  • TunnelWorker requires a warm-up period for initialization, which is specified by the heartbeatIntervalInSec parameter in TunnelWorkerConfig. You can use the setHeartbeatIntervalInSec method in TunnelWorkerConfig to configure this parameter. Default value: 30. Unit: seconds.

  • When the TunnelWorker client shuts down due to an unexpected exit or manual termination, TunnelWorker automatically recycles resources by using one of the following methods: release the thread pool, automatically call the shutdown method that you registered for the Channel class, and shut down the tunnel.

  • The retention period of incremental logs in tunnels is the same as the retention period of Stream logs. Stream logs can be retained for up to seven days. Therefore, incremental logs in tunnels can be retained for up to seven days.

  • If you create a tunnel to consume differential or incremental data, take note of the following items:

    • During full data consumption, if the tunnel fails to complete consumption of full data within the retention period of incremental logs (seven days at most), an OTSTunnelExpired error occurs when the tunnel starts to consume incremental logs. As a result, the tunnel cannot consume incremental logs.

      If you estimate that the tunnel cannot complete full data consumption within the specified time window, contact Tablestore technical support.

    • During incremental data consumption, if the tunnel fails to complete consumption of incremental logs within the retention period of incremental logs (seven days at most), the tunnel may consume data from the latest available data. In this case, specific data may not be consumed.

  • After a tunnel expires, Tablestore may disable the tunnel. If a tunnel remains in the disabled state for more than 30 days, the tunnel is deleted. You cannot restore a deleted tunnel.

API operations

Operation

Description

CreateTunnel

Creates a tunnel.

ListTunnel

Queries information about the tunnels that are created for a data table.

DescribeTunnel

Queries information about the channels in a tunnel.

DeleteTunnel

Deletes a tunnel.

Use Tablestore SDKs

You can use Tablestore SDKs for the following programming languages to implement Tunnel Service:

Prerequisites

  • The following operations are performed in the Resource Access Management (RAM) console:

    • A RAM user is created and the AliyunOTSFullAccess policy is attached to the RAM user to grant the RAM user the permissions to manage Tablestore. For more information, see Create a RAM user and Grant permissions to a RAM user.

      Note

      In actual business environments, we recommend that you grant only the required permissions to RAM users based on the principle of least privilege. This helps prevent security risks caused by excessive user permissions.

    • An AccessKey pair is created for the RAM user. For more information, see Create an AccessKey pair.

      Warning

      If the AccessKey pair of your Alibaba Cloud account is leaked, all resources within the account are exposed to potential risks. We recommend that you use the AccessKey pair of a RAM user to perform operations. This prevents leakage of the AccessKey pair of your Alibaba Cloud account.

Use Tunnel Service

In this example, Tablestore SDK for Java is used to get started with Tunnel Service.

  1. Initialize a TunnelClient instance.

    Note

    Make sure that the TABLESTORE_ACCESS_KEY_ID and TABLESTORE_ACCESS_KEY_SECRET environment variables are configured. The TABLESTORE_ACCESS_KEY_ID environment variable specifies the AccessKey ID of your Alibaba Cloud account or RAM user. The TABLESTORE_ACCESS_KEY_SECRET environment variable specifies the AccessKey secret of your Alibaba Cloud account or RAM user.

    // Set the endPoint parameter to the endpoint of the Tablestore instance. Example: https://instance.cn-hangzhou.ots.aliyuncs.com. 
    // Set the accessKeyId parameter to the AccessKey ID and the accessKeySecret parameter to the AccessKey secret that you want to use to access Tablestore. 
    // Set the instanceName parameter to the name of the instance. 
    final String endPoint = "";
    final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
    final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
    final String instanceName = "";
    TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
  2. Create a tunnel.

    Before you create a tunnel, create a data table for testing or prepare an existing table. You can create a table in the Tablestore console or by using the createTable method of a SyncClient.

    Important

    When you create an incremental or differential tunnel, you must comply with the following rules to specify timestamps:

    • If you do not specify a start timestamp for incremental data, the time when the tunnel is created is used as the start timestamp.

    • If you specify the start and end timestamps for incremental data, the valid values must fall within the range of [Current system time - Stream validity period + 5 minutes, Current system time]. Unit: milliseconds.

      • The Stream validity period refers to the validity period of incremental logs in milliseconds. The maximum Stream validity period is seven days. You can specify the Stream validity period when you enable Stream for the data table. After you specify the Stream validity period, you cannot modify the period.

      • The end timestamp must be later than the start timestamp.

    // The following types of tunnels are supported: TunnelType.BaseData, TunnelType.Stream, and TunnelType.BaseAndStream. 
    // The following sample code provides an example on how to create a BaseAndStream tunnel. To create a different type of tunnel, configure the TunnelType parameter in CreateTunnelRequest based on your business requirements. 
    final String tableName = "testTable";
    final String tunnelName = "testTunnel";
    CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
    CreateTunnelResponse resp = tunnelClient.createTunnel(request);
    // Use the tunnelId parameter to initialize a TunnelWorker. You can call the ListTunnel or DescribeTunnel operation to obtain the tunnel ID. 
    String tunnelId = resp.getTunnelId(); 
    System.out.println("Create Tunnel, Id: " + tunnelId);
  3. Specify a custom data consumption callback to start automatic data consumption.

    // Specify the callback for data consumption to call the IChannelProcessor operation, which specifies the process and shutdown methods. 
    private static class SimpleProcessor implements IChannelProcessor {
        @Override
        public void process(ProcessRecordsInput input) {
            // The ProcessRecordsInput parameter contains the data that you obtain. 
            System.out.println("Default record processor, would print records count");
            System.out.println(
                // The NextToken parameter is used by the tunnel client to paginate data. 
                String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
            try {
                // Simulate data consumption and processing. 
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void shutdown() {
            System.out.println("Mock shutdown");
        }
    }
    
    // By default, the system starts a thread pool to read and process data based on TunnelWorkerConfig. 
    // If you want to start multiple TunnelWorkers on a single server, we recommend that you use the same TunnelWorkerConfig to configure all TunnelWorkers. TunnelWorkerConfig provides more advanced parameters. 
    TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
    // Configure TunnelWorkers and start automatic data processing. 
    TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
    try {
        worker.connectAndWorking();
    } catch (Exception e) {
        e.printStackTrace();
        config.shutdown();
        worker.shutdown();
        tunnelClient.shutdown();
    }

Configure TunnelWorkerConfig

TunnelWorkerConfig allows you to configure custom parameters for a tunnel client based on your business requirements. The following table describes the parameters in Tablestore SDK for Java.

Configuration

Parameter

Description

Interval and timeout period for heartbeats

heartbeatTimeoutInSec

The timeout period for heartbeats. Default value: 300. Unit: seconds.

When a heartbeat timeout occurs, the tunnel server considers the current TunnelClient instance unavailable. In this case, the tunnel client must reconnect to the tunnel server.

heartbeatIntervalInSec

The interval for heartbeats. Default value: 30. Minimum value: 5. Unit: seconds.

You can detect heartbeats to monitor active channels, update the status of channels, and automatically initialize data processing tasks.

Interval between checkpoints

checkpointIntervalInMillis

The interval between checkpoints when data is consumed. The interval is recorded on the tunnel server.

Default value: 5000. Unit: milliseconds.

Note
  • If the data that you want to read is stored on different servers, various errors may occur when you run processes. For example, the server may restart due to environmental factors. Therefore, the tunnel server regularly records checkpoints after data is processed. A task processes data from the last checkpoint after the task is restarted. In specific cases, Tunnel Service may sequentially synchronize data once or multiple times. If specific data is reprocessed, check the business processing logic.

  • To prevent data from being reprocessed when errors occur, record additional checkpoints. Take note that an excessive number of checkpoints may compromise the system throughput. We recommend that you record the checkpoints based on your business requirements.

Custom client tag

clientTag

The custom client tag that is used to generate a tunnel client ID. You can configure this parameter to distinguish TunnelWorkers.

Specify custom callbacks for data processing

channelProcessor

The callback that is registered by the user to process data, including the process and shutdown methods.

Configuration of the thread pools to read and process data

readRecordsExecutor

The thread pool that you want to use to read data. If you do not have special requirements, use the default configuration.

processRecordsExecutor

The thread pool that you want to use to process data. If you do not have special requirements, use the default configuration.

Note
  • When you specify configurations for the thread pools, we recommend that you set the number of threads to the number of channels in the tunnel. This way, computing resources, such as CPU, can be quickly allocated to each channel.

  • Tablestore performs the following operations on the default configurations of the pools to ensure throughput:

    • Allocate 32 core threads in advance to ensure real-time throughput when a small amount of data or a small number of channels exist.

    • Reduce the queue length when a large amount of data must be processed or when a large number of channels exist. This way, the policy is triggered to create a thread in the pool and allocate more computing resources.

    • We recommend that you set the thread keep-alive time to 60s. If the amount of data that you want to process is reduced, you can recycle thread resources.

Memory control

maxChannelParallel

The maximum concurrency level of channels to read and process data for memory control.

The default value is -1, which specifies that the concurrency level is unlimited.

Note

Tablestore SDK for Java V5.10.0 and later support this feature.

Maximum backoff time

maxRetryIntervalInMillis

The base value to calculate the maximum backoff time for the tunnel. The maximum backoff time is a random number that ranges from 0.75 × maxRetryIntervalInMillis to 1.25 × maxRetryIntervalInMillis.

Default value: 2000. Minimum value: 200. Unit: milliseconds.

Note
  • Tablestore SDK for Java V5.4.0 and later support this feature.

  • If the amount of data that you want to process is less than 900 KB or 500 pieces per export, the tunnel client uses exponential backoff until the maximum backoff time is reached.

CLOSING channel detection

enableClosingChannelDetect

Specifies whether to enable real-time detection for CLOSING channels. Default value: false, which specifies that real-time detection for CLOSING channels is disabled.

Note
  • Tablestore SDK for Java V5.13.13 and later support this feature.

  • If you do not enable this feature, channels may be suspended and consumption can be interrupted in specific scenarios, such as when a large number of channels exist but client resources are insufficient.

  • CLOSING channel: a channel that is being switched from one tunnel client to another.

Appendix: Sample code

import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelResponse;
import com.alicloud.openservices.tablestore.model.tunnel.TunnelType;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
public class TunnelQuickStart {
    private static class SimpleProcessor implements IChannelProcessor {
        @Override
        public void process(ProcessRecordsInput input) {
            System.out.println("Default record processor, would print records count");
            System.out.println(
                // The NextToken parameter is used by the tunnel client to paginate data. 
                String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
            try {
                // Simulate data consumption and processing. 
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void shutdown() {
            System.out.println("Mock shutdown");
        }
    }
    public static void main(String[] args) throws Exception {
        //1. Initialize a tunnel client. 
        final String endPoint = "";
        final String accessKeyId = System.getenv("OTS_AK_ENV");
        final String accessKeySecret = System.getenv("OTS_SK_ENV");
        final String instanceName = "";
        TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
        //2. Create a tunnel. Before you perform this step, you must create a table for testing. You can create a table in the Tablestore console or by using the createTable method of a SyncClient. 
        final String tableName = "testTable";
        final String tunnelName = "testTunnel";
        CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
        CreateTunnelResponse resp = tunnelClient.createTunnel(request);
        // Use the tunnelId parameter to initialize a TunnelWorker. You can call the ListTunnel or DescribeTunnel operation to obtain the tunnel ID. 
        String tunnelId = resp.getTunnelId();
        System.out.println("Create Tunnel, Id: " + tunnelId);
        //3. Define a custom callback to start automatic data consumption. 
        // TunnelWorkerConfig provides more advanced parameters. 
        TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
        TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
        try {
            worker.connectAndWorking();
        } catch (Exception e) {
            e.printStackTrace();
            config.shutdown();
            worker.shutdown();
            tunnelClient.shutdown();
        }
    }
}