Tablestore: Tutorial (TimeSeries model)

Last Updated: Sep 02, 2024

This topic describes how to use Realtime Compute for Apache Flink to write data from Tablestore tables in the Wide Column model to Tablestore time series tables.

Background information

The TimeSeries model is designed based on the characteristics of time series data. This model is suitable for scenarios such as IoT device monitoring, and can be used to store data that is collected by devices and the monitoring data of machines. For more information, see Overview.

In the TimeSeries model of Tablestore, a two-dimensional time series table is used to store time series data.

Each row represents the data at a point in time in a time series. The time series identifiers and timestamp are the primary key columns of the row, and the data points of the time series under the timestamp are the data columns of the row. A row can contain multiple data columns. Time series identifiers consist of the measurement, data source, and tags. The timestamp identifies a specific point in time.
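As a rough illustration of this layout, the following Flink SQL sketch lists three made-up rows. The column aliases (m_name, data_source, tags, ts, cpu_usage) and all values, including the placeholder timestamps, are hypothetical and are not taken from a real table. The first two rows share the same identifiers and differ only in the timestamp, so they would be two points of one time series. The third row has a different data source and would belong to another time series.

```sql
-- Hypothetical sample rows; every name and value is an assumption for illustration.
-- m_name, data_source, tags: time series identifiers (primary key columns)
-- ts: timestamp (primary key column); cpu_usage: a data column
SELECT *
FROM (
    VALUES
        ('cpu', 'host-1', MAP['region', 'hangzhou'], 1000, 0.42),
        ('cpu', 'host-1', MAP['region', 'hangzhou'], 2000, 0.57),
        ('cpu', 'host-2', MAP['region', 'hangzhou'], 1000, 0.13)
) AS t (m_name, data_source, tags, ts, cpu_usage);
```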

Usage notes

  • We recommend that you configure two compute units (CUs) and 4 GB of memory for each TaskManager in Realtime Compute for Apache Flink to maximize the computing capabilities of each TaskManager. A TaskManager can write 10,000 rows per second.

  • If the number of partitions in the source table is large, we recommend that you set the concurrency to less than 16 in Realtime Compute for Apache Flink. The write rate increases linearly with the concurrency.

  • The Realtime Compute for Apache Flink and Tablestore instances must be in the same virtual private cloud (VPC). You must use a VPC endpoint for the Tablestore instance.

  • You can use Realtime Compute for Apache Flink to write data from Tablestore tables in the Wide Column model to Tablestore time series tables in the following regions: China (Hangzhou), China (Shanghai), China (Beijing), China (Zhangjiakou), China (Ulanqab), China (Shenzhen), China (Hong Kong), Germany (Frankfurt), US (Virginia), and Singapore.

Tablestore result tables

Realtime Compute for Apache Flink allows you to use Tablestore time series tables to store the results. For more information, see Tablestore connector.

A time series table in the TimeSeries model has the following primary key columns: _m_name, _data_source, _tags, and _time. When you use a time series table as a result table, you must specify all four primary key columns. Other configurations are the same as those for a data table that is used as a result table. You can specify the primary key columns of a time series table in three ways: by using the parameters in the WITH clause, by using the primary key of the SINK table, or by using the primary key in the Map format. If you use these methods at the same time, the primary key columns that are specified by using the parameters in the WITH clause have the highest priority.

Parameters in the WITH clause

The following sample code shows how to use the parameters in the WITH clause to define the DDL statements:

-- Create a temporary table for the source table.
CREATE TEMPORARY TABLE timeseries_source (
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'test_widecolume_source_table',
    'tunnelName' = 'test_widecolume_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    -- Specify whether to ignore the data on which Delete operations are performed.
    'ignoreDelete' = 'true'
);

-- Create a temporary table for the result table.
CREATE TEMPORARY TABLE timeseries_sink (
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING,
    PRIMARY KEY(measurement, datasource, tag_a, `time`) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'test_timeseries_sink_table',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES',
    'timeseriesSchema' = '{"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}'
);

-- Insert data from the source table into the result table.
INSERT INTO timeseries_sink
SELECT
    measurement,
    datasource,
    tag_a,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value,
    tag_b,
    tag_c,
    tag_d,
    tag_e,
    tag_f
FROM timeseries_source;

The following table describes the parameters in the WITH clause.

Parameter | Applicable model | Required | Description
--- | --- | --- | ---
connector | Both models | Yes | The type of the connector. The value is ots and cannot be changed.
endPoint | Both models | Yes | The endpoint of the Tablestore instance. The value must be the VPC endpoint of the instance. For more information, see Endpoints.
instanceName | Both models | Yes | The name of the Tablestore instance.
tableName | Both models | Yes | The name of the Tablestore data table or time series table. Enter the name of the data table when the data table is used as the source table, and enter the name of the time series table when the time series table is used as the result table.
tunnelName | Wide Column model | Yes | The name of the tunnel for the Tablestore data table. For more information about how to create a tunnel, see Create a tunnel.
accessId | Both models | Yes | The AccessKey ID of an Alibaba Cloud account or a RAM user. For more information about how to obtain an AccessKey pair, see Create an AccessKey pair.
accessKey | Both models | Yes | The AccessKey secret of an Alibaba Cloud account or a RAM user. For more information about how to obtain an AccessKey pair, see Create an AccessKey pair.
ignoreDelete | Wide Column model | No | Specifies whether to ignore the real-time data on which Delete operations are performed. Default value: false. If you use a data table as a source table, you can configure this parameter based on your business requirements.
storageType | Both models | Yes | The type of the table. Valid values: WIDE_COLUMN (default) for a data table, and TIMESERIES for a time series table. If you use a data table as a source table, set this parameter to WIDE_COLUMN or leave this parameter empty. If you use a time series table as a result table, set this parameter to TIMESERIES.
timeseriesSchema | TimeSeries model | Yes | The columns that you want to specify as the primary key columns of the temporary table for the time series table. Specify the value of this parameter by using key-value pairs in the JSON format. Example: {"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}. The types of the primary key columns that you specify must be the same as the types of the primary key columns in the time series table. Multiple columns can be mapped to the _tags primary key column.
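To make the timeseriesSchema mapping more concrete, the following sketch declares a smaller, hypothetical result table with only two tag columns. The table name demo_timeseries_sink, the column names (metric, host, region, rack, ts, cpu_usage), and the value of tableName are assumptions made for this example. The PRIMARY KEY clause mirrors the pattern of the sample code above and may need to be adapted. Each key in the JSON value is a column of the temporary table, and each value is one of the four primary key columns of the time series table. The cpu_usage column is not listed, so it is expected to be written as a data column, in the same way as the value columns in the sample code above.

```sql
-- Hypothetical sink table; all names are assumptions for illustration only.
CREATE TEMPORARY TABLE demo_timeseries_sink (
    metric    STRING,   -- mapped to _m_name
    host      STRING,   -- mapped to _data_source
    region    STRING,   -- mapped to _tags
    rack      STRING,   -- mapped to _tags
    ts        BIGINT,   -- mapped to _time
    cpu_usage DOUBLE,   -- not listed in timeseriesSchema
    PRIMARY KEY(metric, host, region, ts) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'demo_timeseries_sink_table',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES',
    'timeseriesSchema' = '{"metric":"_m_name","host":"_data_source","region":"_tags","rack":"_tags","ts":"_time"}'
);
```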

Primary key of the SINK table

This section provides an example of the DDL syntax of a time series table that is used as the result table. In the PRIMARY KEY clause, the first column maps to the _m_name column, which specifies the measurement name. The second column maps to the _data_source column, which specifies the data source. The last column maps to the _time column, which specifies the timestamp. All columns between the second and the last column map to the _tags column, which specifies the tags of the time series.

The following sample code shows how to use the primary key of the SINK table to define the DDL statement:

CREATE TEMPORARY TABLE timeseries_sink (
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING,
    PRIMARY KEY(measurement, datasource, tag_a, tag_b, tag_c, tag_d, tag_e, tag_f, `time`) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'test_timeseries_sink_table',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES'
);

Primary key in Map format

To make the _tags primary key column of a time series table easier to generate, Tablestore supports the Map data type of Flink for the result table. With the Map data type, you can apply mapping operations such as column renaming and simple functions. When you use Map, make sure that the _tags primary key column is in the third position of the primary key.

-- Create a temporary table for the source table.
CREATE TEMPORARY TABLE timeseries_source (
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'test_widecolume_source_table',
    'tunnelName' = 'test_widecolume_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    -- Specify whether to ignore the data on which Delete operations are performed.
    'ignoreDelete' = 'true'
);

-- Create a temporary table for the result table.
CREATE TEMPORARY TABLE timeseries_sink (
    measurement STRING,
    datasource STRING,
    tags MAP<STRING, STRING>,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING,
    PRIMARY KEY(measurement, datasource, tags, `time`) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'test_timeseries_sink_table',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES'
);

-- Insert data from the source table into the result table.
-- The selected columns follow the column order declared in timeseries_sink.
INSERT INTO timeseries_sink
SELECT
    measurement,
    datasource,
    MAP['tag_a', tag_a, 'tag_b', tag_b, 'tag_c', tag_c, 'tag_d', tag_d, 'tag_e', tag_e, 'tag_f', tag_f] AS tags,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value,
    tag_b,
    tag_c,
    tag_d,
    tag_e,
    tag_f
FROM timeseries_source;
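The column renaming and simple functions mentioned for the Map format can be sketched as follows. This is a minimal, hedged example that only builds the tags value from the timeseries_source table defined above. The tag keys region, host, and rack are hypothetical names chosen for illustration, and UPPER and CONCAT are standard Flink SQL string functions used here as stand-ins for "simple functions".

```sql
-- Build a MAP<STRING, STRING> value for the _tags column.
-- The tag keys ('region', 'host', 'rack') are assumptions for illustration.
SELECT
    MAP[
        'region', tag_a,                      -- rename: tag_a is stored under the key "region"
        'host',   UPPER(tag_b),               -- simple function applied to the tag value
        'rack',   CONCAT(tag_c, '-', tag_d)   -- two source columns combined into one tag
    ] AS tags
FROM timeseries_source;
```

Running such a SELECT on its own during debugging is one way to check the generated tags before the expression is placed in the INSERT statement.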

Develop a real-time computing draft

Prerequisites

  • An AccessKey pair is created. For more information, see Create an AccessKey pair.

  • A tunnel is created for the Tablestore data table, which is used as the source table. For more information about how to create a tunnel, see Create a tunnel.

Step 1: Create an SQL draft

  1. Log on to the Realtime Compute for Apache Flink console.

  2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.

  3. In the left-side navigation pane, click SQL Editor.

  4. In the upper-left corner of the SQL Editor page, click New.

  5. In the New Draft dialog box, click Blank Stream Draft.

    Fully managed Flink provides various code templates and data synchronization templates. Each code template provides specific scenarios, code samples, and instructions. You can click a template to learn about the features and the related syntax of Realtime Compute for Apache Flink and implement your business logic. For more information, see Code templates and Data synchronization templates.

  6. Click Next.

  7. Configure parameters for the draft. The following table describes the parameters.

    Parameter | Description | Example
    --- | --- | ---
    Name | The name of the draft that you want to create. Note: The draft name must be unique in the current project. | flink-test
    Location | The folder in which the code file of the draft is stored. You can also click the New Folder icon to the right of an existing folder to create a subfolder. | Development
    Engine Version | The engine version of Flink that is used by the draft. For more information about engine versions, version mappings, and important time points in the lifecycle of each version, see Engine version. | vvr-6.0.4-flink-1.15

  8. Click Create.

Step 2: Write code for the draft

  1. Create a temporary table for the Tablestore source table and the result table.

    Note

    When you create a draft, we recommend that you minimize the number of times that you use temporary tables. We also recommend that you use tables that are registered in catalogs.

    The following sample code shows how to create a temporary table named timeseries_source for the source table and a temporary table named timeseries_sink for the result table:

    CREATE TEMPORARY TABLE timeseries_source (
        measurement STRING,
        datasource STRING,
        tag_a STRING,
        `time` BIGINT,
        binary_value BINARY,
        bool_value BOOLEAN,
        double_value DOUBLE,
        long_value BIGINT,
        string_value STRING,
        tag_b STRING,
        tag_c STRING,
        tag_d STRING,
        tag_e STRING,
        tag_f STRING
    ) WITH (
        'connector' = 'ots',
        'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
        'instanceName' = 'xxx',
        'tableName' = 'test_widecolume_source_table',
        'tunnelName' = 'test_widecolume_source_tunnel',
        'accessId' = 'xxxxxxxxxxx',
        'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
        -- Specify whether to ignore the data on which Delete operations are performed.
        'ignoreDelete' = 'true'
    );

    CREATE TEMPORARY TABLE timeseries_sink (
        measurement STRING,
        datasource STRING,
        tag_a STRING,
        `time` BIGINT,
        binary_value BINARY,
        bool_value BOOLEAN,
        double_value DOUBLE,
        long_value BIGINT,
        string_value STRING,
        tag_b STRING,
        tag_c STRING,
        tag_d STRING,
        tag_e STRING,
        tag_f STRING,
        PRIMARY KEY(measurement, datasource, tag_a, `time`) NOT ENFORCED
    ) WITH (
        'connector' = 'ots',
        'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
        'instanceName' = 'xxx',
        'tableName' = 'test_timeseries_sink_table',
        'accessId' = 'xxxxxxxxxxx',
        'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
        'storageType' = 'TIMESERIES',
        'timeseriesSchema' = '{"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}'
    );

  2. Write the draft logic.

    The following sample code shows how to insert data from the source table into the result table:

    INSERT INTO timeseries_sink
    SELECT
        measurement,
        datasource,
        tag_a,
        `time`,
        binary_value,
        bool_value,
        double_value,
        long_value,
        string_value,
        tag_b,
        tag_c,
        tag_d,
        tag_e,
        tag_f
    FROM timeseries_source;

Step 3: Configure parameters on the Configurations tab

On the right side of the SQL Editor page, click the Configurations tab and configure the following parameters:

  • Engine Version: the version of the Flink engine. You can change the version that you selected when you created the draft.

    Important

    In VVR 3.0.3 and later versions, Ververica Platform (VVP) allows you to run SQL jobs that use different engine versions at the same time. VVR 3.0.3 uses Flink 1.12. If the engine version of your job is Flink 1.12 or earlier, perform one of the following operations to update the engine version, depending on the engine version that your job uses:

    • Flink 1.12: Stop and then restart your job. Then, the system automatically updates the engine version of your job to vvr-3.0.3-flink-1.12.

    • Flink 1.11 or Flink 1.10: Manually update the engine version of your job to vvr-3.0.3-flink-1.12 or vvr-4.0.8-flink-1.13, and then restart the job. Otherwise, a timeout error occurs when you start the job.

  • Additional Dependencies: the additional dependencies that are used in the draft, such as temporary functions.

    Note

    If you do not have permissions on Ververica Runtime (VVR), you can download VVR dependencies, upload the VVR dependencies on the Upload Artifact page, and then select the uploaded VVR dependencies for Additional Dependencies. For more information, see Appendix: Configure VVR dependencies.

Step 4: Perform a syntax check

In the upper-right corner of the SQL Editor page, click Validate to perform a syntax check.

(Optional) Step 5: Debug the draft

In the upper-right corner of the SQL Editor page, click Debug.

You can enable the debugging feature to simulate deployment running, check outputs, and verify the business logic of SELECT and INSERT statements. This feature improves the development efficiency and reduces the risks of poor data quality. For more information, see Debug a deployment.

Step 6: Deploy the draft

In the upper-right corner of the SQL Editor page, click Deploy. In the Deploy draft dialog box, configure the parameters and click Confirm.

Note

Session clusters are suitable for non-production environments, such as development and test environments. You can deploy or debug drafts in a session cluster to improve the resource utilization of a JobManager and accelerate the deployment startup. However, we recommend that you do not deploy production drafts in session clusters, because stability issues may occur. For more information, see the "Step 1: Create a session cluster" step of the Debug a deployment topic.

Step 7: Start the deployment of the draft and view the computing result

Note

If you modify the SQL code of a deployment, add parameters to or remove parameters from the WITH clause, or change the version of a deployment, you must deploy the draft again, cancel the deployment, and then restart the deployment for the changes to take effect. If the deployment fails and cannot reuse the state data to recover, you must cancel and then restart the deployment. For more information, see Cancel a deployment.

  1. In the left-side navigation pane, click Deployments.

  2. Find the deployment that you want to start and click Start in the Actions column.

    For more information about how to configure deployment startup parameters, see Start a deployment. After you click Start, the deployment status changes to RUNNING. This indicates that the deployment is running as expected.

  3. On the Deployments page, view the computing result.

    1. In the left-side navigation pane, click Deployments. On the Deployments page, click the name of the deployment that you want to manage.

    2. Click the Diagnostics tab.

    3. On the Logs tab, click Running Task Managers, and then click the value in the Path, ID column.

    4. Click Logs. On the Logs tab, search for logs related to the sink.

Appendix: Configure VVR dependencies

  1. Download VVR dependencies.

  2. Upload the VVR dependencies.

    1. Log on to the Realtime Compute for Apache Flink console.

    2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.

    3. In the left-side navigation pane, click Artifacts.

    4. Click Upload Artifact and select the JAR package in which the VVR dependencies are stored.

  3. In the Additional Dependencies section of the Draft Editor page, select the JAR package in which the VVR dependencies are stored.