All Products
Search
Document Center

Tablestore:Synchronize data from one table to another table in Tablestore

最終更新日:Nov 27, 2024

This topic describes how to synchronize data from one table to another table in Tablestore with Tunnel Service, DataWorks, or DataX.

Prerequisites

A destination table is created. The destination table must contain the columns that you want to synchronize from the source table. For more information, see the Step 3: Create a data table section of the "Use the Wide Column model in the Tablestore console" topic.

Note

If you want to migrate data across accounts and regions, use DataX to connect to a virtual private cloud (VPC) over the Internet or Cloud Enterprise Network (CEN). For information about how to use CEN, see Overview.

Use Tunnel Service to synchronize data

After the tunnel of the source table is created, you can use a Tablestore SDK to synchronize data from the source table to the destination table. You can specify custom logic to process data for your business during synchronization.

Prerequisites

  • The endpoint that you want to use is obtained. For more information, see Initialize an OTSClient instance.

  • An AccessKey pair is configured. For more information, see Initialize an OTSClient instance.

  • The AccessKey pair is configured in environment variables. For more information, see Initialize an OTSClient instance.

    The OTS_AK_ENV environment variable indicates the AccessKey ID of an Alibaba Cloud account or a Resource Access Management (RAM) user. The OTS_SK_ENV environment variable indicates the AccessKey secret of an Alibaba Cloud account or a RAM user. Specify the AccessKey pair based on your business requirements.

Procedure

  1. Create a tunnel for the source table in the Tablestore console or by using a Tablestore SDK, and record the tunnel ID. For more information, see Quick start or Use Tunnel Service by using Tablestore SDKs.

  2. Synchronize data by using a Tablestore SDK.

    Sample code:

    public class TunnelTest {
    
        public static void main(String[] args){
           String accessKeyId = System.getenv("OTS_AK_ENV");
           String accessKeySecret = System.getenv("OTS_SK_ENV");
           TunnelClient tunnelClient = new TunnelClient("endpoint",
                   accessKeyId,accessKeySecret,"instanceName");
    
            TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
    
            // You can view the tunnel ID on the Tunnels tab of the Tablestore console or call the describeTunnelRequest operation to query the tunnel ID. 
            TunnelWorker worker = new TunnelWorker("tunnelId", tunnelClient, config);
            try {
                worker.connectAndWorking();
            } catch (Exception e) {
                e.printStackTrace();
                worker.shutdown();
                tunnelClient.shutdown();
            }
        }
    
        public static class SimpleProcessor implements IChannelProcessor{
        
           // Connect the tunnel to the destination table. 
           TunnelClient tunnelClient = new TunnelClient("endpoint",
                   "accessKeyId","accessKeySecret","instanceName");
                   
           @Override
            public void process(ProcessRecordsInput processRecordsInput) {
            
                // Incremental data or full data is returned in ProcessRecordsInput. 
                List<StreamRecord> list = processRecordsInput.getRecords();
                for(StreamRecord streamRecord : list){
                    switch (streamRecord.getRecordType()){
                        case PUT:
                            // Specify the custom logic that you want to use to process data for your business. 
                            //putRow
                            break;
                        case UPDATE:
                            //updateRow
                            break;
                        case DELETE:
                            //deleteRow
                            break;
                    }
    
                    System.out.println(streamRecord.toString());
                }
            }
    
            @Override
            public void shutdown() {
                
            }
        }
    }

Use DataWorks or DataX to synchronize data

You can use DataWorks or DataX to synchronize data from the source table to the destination table. This section describes how to synchronize data by using DataWorks.

Step 1: Add a Tablestore data source

Add the Tablestore instances of the source table and the destination table as data sources.

  1. Go to the Data Integration page.

    Log on to the DataWorks console, select a region in the upper-left corner, choose Data Development and Governance > Data Integration, select a workspace from the drop-down list, and then click Go to Data Integration.

  2. In the left-side navigation pane, click Data Source.

  3. On the Data Source page, click Add Data Source.

  4. In the Add Data Source dialog box, click the Tablestore block.

  5. In the Add OTS data source dialog box, configure the parameters that are described in the following table.

    Parameter

    Description

    Data Source Name

    The name of the data source. The name can contain letters, digits, and underscores (_), and must start with a letter.

    Data Source Description

    The description of the data source. The description cannot exceed 80 characters in length.

    Endpoint

    The endpoint of the Tablestore instance. For more information, see Endpoints.

    If the Tablestore instance and the resources of the destination data source are in the same region, enter a virtual private cloud (VPC) endpoint. Otherwise, enter a public endpoint.

    Table Store instance name

    The name of the Tablestore instance. For more information, see Instance.

    AccessKey ID

    The AccessKey ID and AccessKey secret of your Alibaba Cloud account or RAM user. For more information about how to create an AccessKey pair, see Create an AccessKey pair.

    AccessKey Secret

  6. Test the network connectivity between the data source and the resource group that you select.

    To ensure that your synchronization nodes run as expected, you need to test the connectivity between the data source and all types of resource groups on which your synchronization nodes will run.

    Important

    A synchronization task can use only one type of resource group. By default, only shared resource groups for Data Integration are displayed in the resource group list. To ensure the stability and performance of data synchronization, we recommend that you use an exclusive resource group for Data Integration.

    1. Click Purchase to create a new resource group or click Associate Purchased Resource Group to associate an existing resource group. For more information, see Create and use an exclusive resource group for Data Integration.

    2. After the resource group is started, click Test Network Connectivity in the Connection Status (Production Environment) column of the resource group.

      If Connected is displayed, the connectivity test is passed.

  7. If the data source passes the network connectivity test, click Complete.

    The newly created data source is displayed in the data source list.

Step 2: Create a synchronization node

  1. Go to the DataStudio console.

    Log on to the DataWorks console, select a region in the upper-left corner, choose Data Development and Governance > Data Development, select a workspace from the drop-down list, and then click Go to DataStudio.

  2. On the Scheduled Workflow page of the DataStudio console, click Business Flow and select a business flow.

    For information about how to create a workflow, see Create a workflow.

  3. Right-click the Data Integration node and choose Create Node > Offline synchronization.

  4. In the Create Node dialog box, select a path and enter a node name.

  5. Click Confirm.

    The newly created offline synchronization node will be displayed under the Data Integration node.

Step 3: Configure and run an offline synchronization task

  1. Double-click the new synchronization node under Data Integration.

  2. Establish network connections between the resource group and data sources.

    Select the source and destination data sources for the data synchronization task and the resource group that is used to run the data synchronization task. Establish network connections between the resource group and data sources and test the connectivity.

    Important

    Data synchronization tasks are run by using resource groups. Select a resource group and make sure that network connections between the resource group and data sources are established.

    1. In the Configure Network Connections and Resource Group step, select Tablestore from the Source drop-down list and set the Data Source Name parameter to the source data source that you created.

    2. Select a resource group from the Resource Group drop-down list.

      After you select a resource group, the system displays the region and specifications of the resource group. The system automatically tests the connectivity between the resource group and the source data source.

      Important

      Make sure that the resource group is the same as that you selected when you created the data source.

    3. Select Tablestore from the Destination drop-down list and set the Data Source Name parameter to the new destination data source.

      The system automatically tests the connectivity between the resource group and the destination data source.

    4. Click Next.

    5. In the message that appears, click Use Script Mode.

      Important
      • Tablestore supports only the script mode. If a data source cannot be configured by using the wizard mode, the script mode is used to configure the batch synchronization task.

      • After a task is switched to the script mode, you cannot switch back to the wizard mode.

  3. Configure and save the task.

    To synchronize full data, you need to use Tablestore Reader and Tablestore Writer. For more information about how to configure the script, see Tablestore data source.

    1. Modify the script in the Configure tasks step.

      • Configure Tablestore Reader

        Tablestore Reader reads data from Tablestore. You can specify a data range to extract incremental data from Tablestore. For more information, see Appendix: Code and parameters for Tablestore Reader.

      • Configure Tablestore Writer

        By using Tablestore SDK for Java, Tablestore Writer connects to the Tablestore server and writes data to the Tablestore server. Tablestore Writer provides features to allow users to optimize the write process, such as retries upon write timeouts, retries upon write exceptions, and batch submission. For more information, see Appendix: Code and parameters for Tablestore Writer.

    2. Press Ctrl+S to save the script.

      Note

      If you do not save the script, a message appears when you perform subsequent operations. In this case, click OK in the message to save.

  4. Run the synchronization task.

    Note

    In most cases, you need to synchronize full data only once and do not need to configure scheduling properties.

    1. Click the 1680170333627-a1e19a43-4e2a-4340-9564-f53f2fa6806e icon.

    2. In the Parameters dialog box, select the name of the resource group from the drop-down list.

    3. Click Run.

      After the script is run, click the link next to Detail log url on the Runtime Log tab. On the Detailed Runtime Logs page, check the value of Current task status.

      If the value of Current task status is FINISH, the task is complete.