This topic describes how to use Realtime Compute for Apache Flink to write data from Tablestore tables in the Wide Column model to Tablestore time series tables.
Background information
The TimeSeries model is designed based on the characteristics of time series data. This model is suitable for scenarios, such as IoT device monitoring, and can be used to store the data that is collected by devices and the monitoring data of machines. For more information, see Overview.
In the TimeSeries model of Tablestore, a two-dimensional time series table is used to store time series data.
Each row represents the data at a point in time in a time series. The time series identifiers and timestamp are the primary key columns of the row, and the data points of the time series under the timestamp are the data columns of the row. A row can contain multiple data columns. Time series identifiers consist of the measurement, data source, and tags. The timestamp identifies a specific point in time.
Usage notes
We recommend that you configure two compute units (CUs) and 4 GB memory for each TaskManager in Realtime Compute for Apache Flink to maximize the computing capabilities of each TaskManager. A TaskManager can write 10,000 rows per second.
If the number of partitions in the source table is large, we recommend that you set the concurrency to less than 16 in Realtime Compute for Apache Flink. The write rate linearly increases with the concurrency.
The Realtime Compute for Apache Flink and Tablestore instances must be in the same virtual private cloud (VPC). You must use a VPC endpoint for the Tablestore instance.
You can use Realtime Compute for Apache Flink to write data from Tablestore tables in the Wide Column model to Tablestore time series tables in the following regions: China (Hangzhou), China (Shanghai), China (Beijing), China (Zhangjiakou), China (Ulanqab), China (Shenzhen), China (Hong Kong), Germany (Frankfurt), US (Virginia), and Singapore.
Tablestore result tables
Realtime Compute for Apache Flink allows you to use Tablestore time series tables to store the results. For more information, see Tablestore connector.
A time series table in the TimeSeries model has the following primary key columns: _m_name
, _data_source
, _tags
, and _time
. When you use a time series table as a result table, you must specify four primary key columns. Other configurations are the same as those when you use a data table as a result table. You can specify the primary key columns of a time series table by using the parameters in the WITH clause, the primary key of the SINK table, and the primary key in the Map format. If you use the preceding three methods at the same time to specify the primary key columns of a time series table, the primary key columns that are specified by using the parameters in the WITH clause have the highest priority.
Parameters in the WITH clause
The following sample code provides an example on how to use the parameters in the WITH clause to define the DDL syntax:
-- Create a temporary table for the source table. CREATE TEMPORARY TABLE timeseries_source ( measurement STRING, datasource STRING, tag_a STRING, `time` BIGINT, binary_value BINARY, bool_value BOOLEAN, double_value DOUBLE, long_value BIGINT, string_value STRING, tag_b STRING, tag_c STRING, tag_d STRING, tag_e STRING, tag_f STRING) WITH ( 'connector' = 'ots', 'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', 'instanceName' = 'xxx', 'tableName' = 'test_widecolume_source_table', 'tunnelName' = 'test_widecolume_source_tunnel', 'accessId' ='xxxxxxxxxxx', 'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx', 'ignoreDelete' = 'true', // Specify whether to ignore the data on which Delete operations are performed.);-- Create a temporary table for the result table. CREATE TEMPORARY TABLE timeseries_sink ( measurement STRING, datasource STRING, tag_a STRING, `time` BIGINT, binary_value BINARY, bool_value BOOLEAN, double_value DOUBLE, long_value BIGINT, string_value STRING, tag_b STRING, tag_c STRING, tag_d STRING, tag_e STRING, tag_f STRING, PRIMARY KEY(measurement, datasource, tag_a, `time`) NOT ENFORCED) WITH ( 'connector' = 'ots', 'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', 'instanceName' = 'xxx', 'tableName' = 'test_timeseries_sink_table', 'accessId' ='xxxxxxxxxxx', 'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx', 'storageType' = 'TIMESERIES', 'timeseriesSchema' = '{"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}');-- Insert data from the source table into the result table. INSERT INTO timeseries_sink select measurement, datasource, tag_a, `time`, binary_value, bool_value, double_value, long_value, string_value, tag_b, tag_c, tag_d, tag_e, tag_f from timeseries_source;
The following table describes the parameters in the WITH clause.
Parameter | Applicable model | Required | Description |
connector | Both models | Yes | The type of the connector. The value is ots and cannot be changed. |
endPoint | Both models | Yes | The endpoint of the Tablestore instance. The value must be the VPC endpoint of the instance. For more information, see Endpoints. |
instanceName | Both models | Yes | The name of the Tablestore instance. |
tableName | Both models | Yes | The name of the Tablestore data table or time series table. Enter the name of the data table when the data table is used as the source table, and enter the name of the time series table when the time series table is used as the result table. |
tunnelName | Wide Column model | Yes | The name of the tunnel for the Tablestore data table. For more information about how to create a tunnel, see Create a tunnel. |
accessId | Both models | Yes | The AccessKey ID and AccessKey secret of an Alibaba Cloud account or a RAM user. For more information about how to obtain an AccessKey pair, see Create an AccessKey pair. |
accessKey | Both models | Yes | |
ignoreDelete | Wide Column model | No | Specifies whether to ignore the real time data on which Delete operations are performed. This parameter is optional. Default value: false. If you use a data table as a source table, you can configure this parameter based on your business requirements. |
storageType | Both models | Yes | The type of the table. Valid values:
|
timeseriesSchema | TimeSeries model | Yes | The columns that you want to specify as the primary key columns of the temporary table for the time series table. Specify the value of this parameter by using key-value pairs in the JSON format. Example: The types of the primary key columns that you specify must be the same as the types of the primary key columns in the time series table. The tags primary key column can consist of multiple columns. |
Primary key of the SINK table
This section provides an example of the DDL syntax of a time series table that is used as the result table. The first primary key column is the _m_name column, which specifies the measurement name. The second primary key column is the _data_source column, which specifies the data source. The last primary key column is the _time column, which specifies the timestamp. The primary key column in the middle is the _tags column, which specifies the tags of the time series.
The following sample code provides an example on how to use the primary key of the SINK table to define the DDL syntax:
CREATE TEMPORARY TABLE timeseries_sink ( measurement STRING, datasource STRING, tag_a STRING, `time` BIGINT, binary_value BINARY, bool_value BOOLEAN, double_value DOUBLE, long_value BIGINT, string_value STRING, tag_b STRING, tag_c STRING, tag_d STRING, tag_e STRING, tag_f STRING PRIMARY KEY(measurement, datasource, tag_a,tag_b,tag_c,tag_d,tag_e,tag_f `time`) NOT ENFORCED) WITH ( 'connector' = 'ots', 'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', 'instanceName' = 'xxx', 'tableName' = 'test_timeseries_sink_table', 'accessId' ='xxxxxxxxxxx', 'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx', 'storageType' = 'TIMESERIES',);
Primary key in Map format
For the primary key columns of a time series table that is used as the result table, Tablestore provides the Map data type of Flink to facilitate the generation of the _tags column of the time series table in the TimeSeries model. The Map data type supports mapping operations, such as column renaming and simple functions. When you use Map, make sure that the _tags primary key column is located in the third position.
-- Create a temporary table for the source table. CREATE TEMPORARY TABLE timeseries_source ( measurement STRING, datasource STRING, tag_a STRING, `time` BIGINT, binary_value BINARY, bool_value BOOLEAN, double_value DOUBLE, long_value BIGINT, string_value STRING, tag_b STRING, tag_c STRING, tag_d STRING, tag_e STRING, tag_f STRING) WITH ( 'connector' = 'ots', 'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', 'instanceName' = 'xxx', 'tableName' = 'test_widecolume_source_table', 'tunnelName' = 'test_widecolume_source_tunnel', 'accessId' ='xxxxxxxxxxx', 'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx', 'ignoreDelete' = 'true', // Specify whether to ignore the data on which Delete operations are performed.);-- Create a temporary table for the result table. CREATE TEMPORARY TABLE timeseries_sink ( measurement STRING, datasource STRING, tags Map<String, String>, `time` BIGINT, binary_value BINARY, bool_value BOOLEAN, double_value DOUBLE, long_value BIGINT, string_value STRING, tag_b STRING, tag_c STRING, tag_d STRING, tag_e STRING, tag_f STRING, PRIMARY KEY(measurement, datasource, tags, `time`) NOT ENFORCED) WITH ( 'connector' = 'ots', 'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', 'instanceName' = 'xxx', 'tableName' = 'test_timeseries_sink_table', 'accessId' ='xxxxxxxxxxx', 'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx', 'storageType' = 'TIMESERIES',);-- Insert data from the source table into the result table. INSERT INTO timeseries_sink select m_name, data_source, MAP["tag_a":tag_a,"tag_b":tag_b,"tag_c":tag_c,"tag_d":tag_d,"tag_e":tag_e,"tag_f":tag_f] AS tags, `time`, cpu_sys, cpu_user, disk_0, disk_1, disk_2, memory_used, net_in, net_out from timeseries_source;
Develop a real-time computing draft
Prerequisites
An AccessKey pair is created. For more information, see Create an AccessKey pair.
A tunnel is created for the Tablestore data table, which is used as the source table. For more information about how to create a tunnel, see Create a tunnel.
Step 1: Create an SQL draft
Log on to the Realtime Compute for Apache Flink console.
On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
In the left-side navigation pane, click SQL Editor.
In the upper-left corner of the SQL Editor page, click New.
In the New Draft dialog box, click Blank Stream Draft.
Fully managed Flink provides various code templates and data synchronization templates. Each code template provides specific scenarios, code samples, and instructions. You can click a template to learn about the features and the related syntax of Realtime Compute for Apache Flink and implement your business logic. For more information, see Code templates and Data synchronization templates.
Click Next.
Configure parameters for the draft. The following table describes the parameters.
Parameter
Description
Example
Name
The name of the draft that you want to create.
NoteThe draft name must be unique in the current project.
flink-test
Location
The folder in which the code file of the draft is stored.
You can also click the icon to the right of an existing folder to create a subfolder.
Development
Engine Version
The engine version of Flink that is used by the draft. For more information about engine versions, version mappings, and important time points in the lifecycle of each version, see Engine version.
vvr-6.0.4-flink-1.15
Click Create.
Step 2: Write code for the draft
Create a temporary table for the Tablestore source table and the result table.
NoteWhen you create a draft, we recommend that you minimize the number of times that you use temporary tables. We also recommend that you use tables that are registered in catalogs.
The following sample code provides an example on how to create a temporary table named timeseries_source for the source table and a temporary table named timeseries_sink for the result table:
CREATE TEMPORARY TABLE timeseries_source ( measurement STRING, datasource STRING, tag_a STRING, `time` BIGINT, binary_value BINARY, bool_value BOOLEAN, double_value DOUBLE, long_value BIGINT, string_value STRING, tag_b STRING, tag_c STRING, tag_d STRING, tag_e STRING, tag_f STRING) WITH ( 'connector' = 'ots', 'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', 'instanceName' = 'xxx', 'tableName' = 'test_widecolume_source_table', 'tunnelName' = 'test_widecolume_source_tunnel', 'accessId' = 'xxxxxxxxxxx', 'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', 'ignoreDelete' = 'true', // Specify whether to ignore the data on which Delete operations are performed.);CREATE TEMPORARY TABLE timeseries_sink ( measurement STRING, datasource STRING, tag_a STRING, `time` BIGINT, binary_value BINARY, bool_value BOOLEAN, double_value DOUBLE, long_value BIGINT, string_value STRING, tag_b STRING, tag_c STRING, tag_d STRING, tag_e STRING, tag_f STRING, PRIMARY KEY(measurement, datasource, tag_a, `time`) NOT ENFORCED) WITH ( 'connector' = 'ots', 'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', 'instanceName' = 'xxx', 'tableName' = 'test_timeseries_sink_table', 'accessId' = 'xxxxxxxxxxx', 'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', 'storageType' = 'TIMESERIES', 'timeseriesSchema' = '{"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}');
Write the draft logic.
The following sample code provides an example on how to insert data from the source table into the result table:
INSERT INTO timeseries_sink select measurement, datasource, tag_a, `time`, binary_value, bool_value, double_value, long_value, string_value, tag_b, tag_c, tag_d, tag_e, tag_f from timeseries_source;
Step 3: Configure parameters on the Configurations tab
On the right side of the SQL Editor page, click the Configurations tab and configure the following parameters:
Engine Version: the version of the Flink engine. You can change the version that you select when you create the draft.
ImportantIn VVR 3.0.3 and later versions, Ververica Platform (VVP) allows you to run SQL jobs that use different engine versions at the same time. The version of the Flink engine that uses VVR 3.0.3 is Flink 1.12. If the engine version of your job is Flink 1.12 or earlier, you can perform the following operations to update the engine version based on the engine version that your job uses:
Flink 1.12: Stop and then restart your job. Then, the system automatically updates the engine version of your job to vvr-3.0.3-flink-1.12.
Flink 1.11 or Flink 1.10: Manually update the engine version of your job to vvr-3.0.3-flink-1.12 or vvr-4.0.8-flink-1.13, and then restart the job. Otherwise, a timeout error occurs when you start the job.
Additional Dependencies: the additional dependencies that are used in the draft, such as temporary functions.
NoteIf you do not have permissions on Ververica Runtime (VVR), you can download VVR dependencies, upload the VVR dependencies on the Upload Artifact page, and then select the uploaded VVR dependencies for Additional Dependencies. For more information, see Appendix: Configure VVR dependencies.
Step 4: Perform a syntax check
In the upper-right corner of the SQL Editor page, click Validate to perform a syntax check.
(Optional) Step 5: Debug the draft
In the upper-right corner of the SQL Editor page, click Debug.
You can enable the debugging feature to simulate deployment running, check outputs, and verify the business logic of SELECT and INSERT statements. This feature improves the development efficiency and reduces the risks of poor data quality. For more information, see Debug a deployment.
Step 6: Deploy the draft
In the upper-right corner of the SQL Editor page, click Deploy. In the Deploy draft dialog box, configure the parameters and click Confirm.
Session clusters are suitable for non-production environments, such as development and test environments. You can deploy or debug drafts in a session cluster to improve the resource utilization of a JobManager and accelerate the deployment startup. However, we recommend that you do not deploy a draft in session clusters. If you deploy drafts in session clusters, stability issues may occur. For more information, see the "Step 1: Create a session cluster" step of the Debug a deployment topic.
Step 7: Start the deployment of the draft and view the computing result
If you modify the SQL code of a deployment, add parameters to or remove parameters from the WITH clause, or change the version of a deployment, you must re-publish and cancel the deployment, and then restart the deployment for the changes to take effect. If the deployment fails and cannot reuse the state data to recover, you must cancel and then restart the deployment. For more information, see Cancel a deployment.
In the left-side navigation pane, click Deployments.
Find the deployment that you want to start and click Start in the Actions column.
For more information about how to configure deployment startup parameters, see Start a deployment. After you click Start, the deployment status changes to RUNNING. This indicates that the deployment is running as expected.
On the Deployments page, view the computing result.
In the left-side navigation pane, click Deployments. On the Deployments page, click the name of the deployment that you want to manage.
Click the Diagnostics tab.
On the Logs tab, click Running Task Managers, and then click the value in the Path, ID column.
Click Logs. On the Logs tab, search for logs related to the sink.
Appendix: Configure VVR dependencies
Download VVR dependencies.
Upload the VVR dependencies.
Log on to the Realtime Compute for Apache Flink console.
On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
In the left-side navigation pane, click Artifacts.
Click Upload Artifact and select the JAR package in which the VVR dependencies are stored.
In the Additional Dependencies section of the Draft Editor page, select the JAR package in which the VVR dependencies are stored.