Tables in Tablestore can be used as source tables and result tables of Realtime Compute for Apache Flink. You can use Realtime Compute for Apache Flink to process data in a Tablestore table and save the result to another Tablestore table.
Background information
In Realtime Compute for Apache Flink, the tunnels of Tunnel Service can serve as sources of streaming data. Each data record is in a format similar to JSON. Example:
{
    "OtsRecordType": "PUT",
    "OtsRecordTimestamp": 1506416585740836,
    "PrimaryKey": [
        {
            "ColumnName": "pk_1",
            "Value": 1506416585881590900
        },
        {
            "ColumnName": "pk_2",
            "Value": "string_pk_value"
        }
    ],
    "Columns": [
        {
            "OtsColumnType": "Put",
            "ColumnName": "attr_0",
            "Value": "hello_table_store"
        },
        {
            "OtsColumnType": "DELETE_ONE_VERSION",
            "ColumnName": "attr_1"
        }
    ]
}
Field | Description |
OtsRecordType | The operation type. Valid values: PUT, UPDATE, and DELETE. |
OtsRecordTimestamp | The data operation time. Unit: microseconds. For full data, the value of this field is 0. |
PrimaryKey | The settings of the primary key. The value of this field is a JSON array. You can specify one to four primary key columns based on the primary key schema of your table. For each primary key column, you must specify the ColumnName and Value fields. |
Columns | The settings of the attribute columns. The value of this field is a JSON array. For each attribute column, you can specify the OtsColumnType, ColumnName, and Value fields. |
Tablestore source tables
In Realtime Compute for Apache Flink, the DDL statement of a source table declares the primary key and attribute columns that you want to read from the Tablestore table, based on the data type mapping of fields between Tablestore and Realtime Compute for Apache Flink. For more information, see Tablestore connector.
DDL syntax
The following sample code provides an example of the DDL syntax of a source table:
CREATE TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector'='ots',
'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
  'ignoreDelete' = 'false' -- Specify whether to ignore delete operations.
);
In addition to the user data that has not been consumed, Tunnel Service returns the OtsRecordType and OtsRecordTimestamp fields, which Realtime Compute for Apache Flink can read as attribute columns. The following table describes the fields.
Field | Mapped field in Realtime Compute for Apache Flink | Description |
OtsRecordType | type | The data operation type. |
OtsRecordTimestamp | timestamp | The data operation time. Unit: microseconds. For full data, the value of this field is 0. |
If you want Realtime Compute for Apache Flink to read the OtsRecordType and OtsRecordTimestamp fields, you can use the METADATA keyword that is provided by Realtime Compute for Apache Flink to obtain the attribute fields from the Tablestore source table. The following example shows the DDL statement:
CREATE TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
record_type STRING METADATA FROM 'type',
record_timestamp BIGINT METADATA FROM 'timestamp'
) WITH (
...
);
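After the metadata columns are declared, you can query them in the same way as regular columns. The following query is a minimal sketch that assumes the tablestore_stream table defined above and assumes that the operation type values include DELETE; it filters out delete records and converts the operation time from microseconds to milliseconds:
SELECT
    `order`,
    orderid,
    record_type,                           -- The data operation type, for example, PUT.
    record_timestamp / 1000 AS op_time_ms  -- Microseconds converted to milliseconds.
FROM tablestore_stream
WHERE record_type <> 'DELETE';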
Parameters in the WITH clause
Parameter | Required | Description |
connector | Yes | The type of the source table. The value is ots and cannot be changed. |
endPoint | Yes | The endpoint of the Tablestore instance. For more information, see Endpoints. |
instanceName | Yes | The name of the Tablestore instance. |
tableName | Yes | The name of the Tablestore source table. |
tunnelName | Yes | The tunnel name of the Tablestore source table. For information about how to create a tunnel, see Create a tunnel. |
accessId | Yes | The AccessKey ID and AccessKey secret of the Alibaba Cloud account or the RAM user. For information about how to obtain an AccessKey pair, see Obtain an AccessKey pair. |
accessKey | Yes | |
ignoreDelete | No | Specifies whether to ignore delete operations. Default value: false, which specifies that delete operations are not ignored. |
skipInvalidData | No | Specifies whether to ignore dirty data. Default value: false, which specifies that dirty data is not ignored. If dirty data is not ignored, an error is reported when the system processes dirty data. To ignore dirty data, set this parameter to true. |
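The following DDL statement is a minimal sketch that shows how the optional ignoreDelete and skipInvalidData parameters can be combined with the required parameters. The endpoint, instance name, table name, tunnel name, and AccessKey pair are placeholders that you must replace with your own values:
CREATE TABLE tablestore_stream_with_options (
    `order` VARCHAR,
    orderid VARCHAR
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://your-instance.cn-hangzhou.ots.aliyuncs.com',
    'instanceName' = 'your-instance',
    'tableName' = 'your_source_table',
    'tunnelName' = 'your_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'true',     -- Ignore delete operations.
    'skipInvalidData' = 'true'   -- Skip dirty data instead of reporting an error.
);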
Data type mapping of fields between Tablestore and Realtime Compute for Apache Flink
Field data type in Tablestore | Field data type in Realtime Compute for Apache Flink |
INTEGER | BIGINT |
STRING | STRING |
BOOLEAN | BOOLEAN |
DOUBLE | DOUBLE |
BINARY | BINARY |
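For example, a hypothetical Tablestore table that contains columns of the INTEGER, STRING, BOOLEAN, DOUBLE, and BINARY types can be declared in the source table DDL as follows. The column names are assumptions for illustration only:
CREATE TABLE tablestore_typed_source (
    user_id BIGINT,      -- Tablestore INTEGER
    user_name STRING,    -- Tablestore STRING
    is_active BOOLEAN,   -- Tablestore BOOLEAN
    score DOUBLE,        -- Tablestore DOUBLE
    avatar BINARY        -- Tablestore BINARY
) WITH (
    ...
);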
Tablestore result tables
Realtime Compute for Apache Flink allows you to store results in a Tablestore table. For more information, see Tablestore connector.
DDL syntax
The following sample code provides an example of the DDL syntax of a result table:
You must specify the primary key and one or more attribute columns for a Tablestore result table.
CREATE TABLE ots_sink (
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
...
);
Parameters in the WITH clause
Parameter | Required | Description |
connector | Yes | The type of the result table. The value is ots and cannot be changed. |
endPoint | Yes | The endpoint of the Tablestore instance. For more information, see Endpoints. |
instanceName | Yes | The name of the Tablestore instance. |
tableName | Yes | The name of the Tablestore result table. |
tunnelName | Yes | The tunnel name of the Tablestore result table. For information about how to create a tunnel, see Create a tunnel. |
accessId | Yes | The AccessKey ID and AccessKey secret of the Alibaba Cloud account or the RAM user. For information about how to obtain an AccessKey pair, see Obtain an AccessKey pair. |
accessKey | Yes | |
valueColumns | Yes | The names of the columns that you want to insert. Separate multiple column names with commas (,). Example: customerid,customername. |
bufferSize | No | The maximum number of data records that can be stored in the buffer before data is written to the result table. Default value: 5000, which specifies that data is written to the result table if the number of data records in the buffer reaches 5,000. |
batchWriteTimeoutMs | No | The write timeout period. Unit: milliseconds. Default value: 5000, which specifies that all data in the buffer is written to the result table if the number of data records in the buffer does not reach the value that is specified by the bufferSize parameter within 5,000 milliseconds. |
batchSize | No | The number of data records that can be written to the result table at a time. Default value: 100. |
retryIntervalMs | No | The interval between two consecutive retries. Unit: milliseconds. Default value: 1000. |
maxRetryTimes | No | The maximum number of retries. Default value: 100. |
ignoreDelete | No | Specifies whether to ignore delete operations. Default value: false, which specifies that delete operations are not ignored. |
autoIncrementKey | No | The name of the auto-increment primary key column. If the result table contains an auto-increment primary key column, you can configure this parameter to specify the name of the auto-increment primary key column. |
defaultTimestampInMillisecond | No | The version number of data that is written to the result table. Unit: milliseconds. If you do not configure this parameter, the system uses the time when the data is written as the version number. |
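The following DDL statement is a minimal sketch that shows how some of the optional write parameters can be added to the WITH clause of a result table. The endpoint, instance name, table name, and AccessKey pair are placeholders, and the parameter values are examples only:
CREATE TABLE ots_sink_with_options (
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    PRIMARY KEY (`order`, orderid) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://your-instance.cn-hangzhou.ots.aliyuncs.com',
    'instanceName' = 'your-instance',
    'tableName' = 'your_sink_table',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'valueColumns' = 'customerid,customername',  -- Columns to insert.
    'bufferSize' = '5000',                       -- Flush after 5,000 buffered records.
    'batchWriteTimeoutMs' = '3000',              -- Or flush after 3 seconds.
    'batchSize' = '100'                          -- Records per write request.
);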
Data type mapping of fields between Tablestore and Realtime Compute for Apache Flink
Field data type in Realtime Compute for Apache Flink | Field data type in Tablestore |
BINARY | BINARY |
VARBINARY | BINARY |
CHAR | STRING |
VARCHAR | STRING |
TINYINT | INTEGER |
SMALLINT | INTEGER |
INTEGER | INTEGER |
BIGINT | INTEGER |
FLOAT | DOUBLE |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
Sample SQL statements
Read data from a source table
The following sample SQL statement provides an example on how to read data from a source table in a batch:
CREATE TEMPORARY TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector'='ots',
'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
  'ignoreDelete' = 'false' -- Specify whether to ignore delete operations.
);
SELECT * FROM tablestore_stream LIMIT 100;
Synchronize data to a result table
The following sample SQL statement provides an example on how to write result data to a result table by calling the updateRow operation:
CREATE TEMPORARY TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector'='ots',
'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
  'ignoreDelete' = 'false' -- Specify whether to ignore delete operations.
);
CREATE TEMPORARY TABLE ots_sink (
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
'connector'='ots',
'endPoint'='https://flink-sink.cn-hangzhou.ots.aliyuncs.com',
'instanceName'='flink-sink',
'tableName'='flink_sink_table',
'accessId'='xxxxxxxxxxx',
'accessKey'='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'valueColumns'='customerid,customername'
);
INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;
Develop a real-time computing job
Prerequisites
An AccessKey pair is created. For more information, see Create an AccessKey pair.
A tunnel is created for a Tablestore source table. For information about how to create a tunnel, see Create a tunnel.
Step 1: Create an SQL draft
Log on to the Realtime Compute for Apache Flink console.
On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
In the left-side navigation pane, click SQL Editor.
In the upper-left corner of the SQL Editor page, click New.
In the New Draft dialog box, click Blank Stream Draft.
Fully managed Flink provides various code templates and data synchronization templates. Each code template provides sample code and instructions for a specific scenario. You can click a template to learn about the features and related syntax of Realtime Compute for Apache Flink and implement your business logic. For more information, see Code templates and Data synchronization templates.
Click Next.
Configure parameters for the draft. The following table describes the parameters.
Parameter | Description | Example |
Name | The name of the draft that you want to create. Note: The draft name must be unique in the current project. | flink-test |
Location | The folder in which the code file of the draft is stored. You can also click the icon to the right of an existing folder to create a subfolder. | Development |
Engine Version | The engine version of Flink that is used by the draft. For more information about engine versions, version mappings, and important time points in the lifecycle of each version, see Engine version. | vvr-6.0.4-flink-1.15 |
Click Create.
Step 2: Write code for the draft
Create a temporary source table and a temporary result table.
Note: When you create a draft, we recommend that you minimize the use of temporary tables and use tables that are registered in catalogs instead.
The following sample code provides an example on how to create a temporary source table whose name is tablestore_stream and a temporary result table whose name is ots_sink:
CREATE TEMPORARY TABLE tablestore_stream(
  `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR
) WITH (
  'connector'='ots',
  'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
  'instanceName' = 'flink-source',
  'tableName' ='flink_source_table',
  'tunnelName' = 'flinksourcestream',
  'accessId' ='xxxxxxxxxxx',
  'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
  'ignoreDelete' = 'false' -- Specify whether to ignore delete operations.
);

CREATE TEMPORARY TABLE ots_sink (
  `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR,
  PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
  'connector'='ots',
  'endPoint'='https://flink-sink.cn-hangzhou.ots.aliyuncs.com',
  'instanceName'='flink-sink',
  'tableName'='flink_sink_table',
  'accessId'='xxxxxxxxxxx',
  'accessKey'='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
  'valueColumns'='customerid,customername'
);
Write the draft logic.
The following sample code provides an example on how to insert data from the source table into the result table:
INSERT INTO ots_sink SELECT `order`, orderid, customerid, customername FROM tablestore_stream;
Step 3: Configure parameters on the Configurations tab
On the right side of the SQL Editor page, click the Configurations tab and configure the following parameters:
Engine Version: the version of the Flink engine. You can change the version that you select when you create the draft.
Important: In VVR 3.0.3 and later versions, Ververica Platform (VVP) allows you to run SQL jobs that use different engine versions at the same time. VVR 3.0.3 uses Flink 1.12. If the engine version of your job is Flink 1.12 or earlier, update the engine version based on the version that your job uses:
Flink 1.12: Stop and then restart your job. The system automatically updates the engine version of your job to vvr-3.0.3-flink-1.12.
Flink 1.11 or Flink 1.10: Manually update the engine version of your job to vvr-3.0.3-flink-1.12 or vvr-4.0.8-flink-1.13, and then restart the job. Otherwise, a timeout error occurs when you start the job.
Additional Dependencies: the additional dependencies that are used in the draft, such as temporary functions.
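For example, if the draft calls a user-defined temporary function, upload the JAR file that contains the function class as an additional dependency and register the function in the SQL code. The following statement is a sketch in which the function name and class name are hypothetical:
CREATE TEMPORARY FUNCTION parse_order AS 'com.example.udf.ParseOrder';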
Step 4: Perform a syntax check
In the upper-right corner of the SQL Editor page, click Validate to perform a syntax check.
(Optional) Step 5: Debug the draft
In the upper-right corner of the SQL Editor page, click Debug.
You can enable the debugging feature to simulate deployment running, check outputs, and verify the business logic of SELECT and INSERT statements. This feature improves the development efficiency and reduces the risks of poor data quality. For more information, see Debug a deployment.
Step 6: Deploy the draft
In the upper-right corner of the SQL Editor page, click Deploy. In the Deploy draft dialog box, configure the parameters and click Confirm.
Session clusters are suitable for non-production environments, such as development and test environments. You can deploy or debug drafts in a session cluster to improve the resource utilization of the JobManager and accelerate deployment startup. However, we recommend that you do not deploy drafts to session clusters in production environments because stability issues may occur. For more information, see the "Step 1: Create a session cluster" step of the Debug a deployment topic.
Step 7: Start the deployment for the draft and view the computing result
If you modify the SQL code of a deployment, add parameters to or remove parameters from the WITH clause, or change the version of a deployment, you must re-publish the draft, cancel the deployment, and then restart the deployment for the changes to take effect. If the deployment fails and cannot reuse the state data to recover, you must cancel and then restart the deployment. For more information, see Cancel a deployment.
In the left-side navigation pane, click Deployments.
Find the deployment that you want to start and click Start in the Actions column.
For more information about how to configure deployment startup parameters, see Start a deployment. After you click Start, the deployment status changes to RUNNING. This indicates that the deployment is running as expected.
On the Deployments page, view the computing result.
In the left-side navigation pane, click Deployments. On the Deployments page, click the name of the deployment that you want to manage.
Click the Diagnostics tab.
On the Logs tab, click Running Task Managers, and then click the value in the Path, ID column.
Click Logs. On the Logs tab, search for logs related to the sink.