This topic describes some key parameters that you may need to configure when you develop an SQL draft. This topic also provides examples on how to configure the parameters.
table.exec.sink.keyed-shuffle
When you write data to a table that has a primary key, the data may be out of order. To resolve this issue, you can configure the table.exec.sink.keyed-shuffle parameter to perform hash shuffling. This ensures that data with the same primary key is distributed to the same subtask of an operator and reduces the probability of the disorder issue.
Precautions
The hash shuffling operation is helpful only when the upstream operator can ensure the valid order of the update records in the primary key field.
If you change the parallelism of an operator for a deployment that runs in expert mode, the following parallelism rules do not apply.
Valid values
AUTO: If the parallelism of the sink operator is not 1 and the parallelism of the sink operator is different from the parallelism of the upstream operator, Realtime Compute for Apache Flink automatically performs hash shuffling on the primary key field when data flows to the sink operator. This is the default value.
FORCE: If the parallelism of the sink operator is not 1, Realtime Compute for Apache Flink forcefully performs hash shuffling on the primary key field when data flows to the sink operator.
NONE: Realtime Compute for Apache Flink does not perform hash shuffling based on the parallelism of the sink operator and the parallelism of the upstream operator.
Examples
AUTO
Create an SQL streaming draft, copy the following test SQL statements to the code editor, and then deploy the draft. In this example, the parallelism of the sink operator is explicitly set to 2.
CREATE TEMPORARY TABLE s1 ( a INT, b INT, ts TIMESTAMP(3) ) WITH ( 'connector'='datagen', 'rows-per-second'='1', 'fields.ts.kind'='random','fields.ts.max-past'='5s', 'fields.b.kind'='random','fields.b.min'='0','fields.b.max'='10' ); CREATE TEMPORARY TABLE sink ( a INT, b INT, ts TIMESTAMP(3), PRIMARY KEY (a) NOT ENFORCED ) WITH ( 'connector'='print', -- You can configure the sink.parallelism parameter to specify the parallelism of the sink operator. 'sink.parallelism'='2' ); INSERT INTO sink SELECT * FROM s1; -- You can also configure the dynamic table options to specify the parallelism of the sink operator. --INSERT INTO sink /*+ OPTIONS('sink.parallelism' = '2') */ SELECT * FROM s1;
In the Resources section of the Configuration tab on the Deployments page, set the Parallelism parameter to 1. In the Parameters section, do not configure the
table.exec.sink.keyed-shuffle
parameter. Alternatively, add thetable.exec.sink.keyed-shuffle: AUTO
configuration to the Other Configuration field.Start the deployment. On the Status tab, the data connection mode between the sink operator and the upstream operator is HASH.
FORCE
Create an SQL streaming draft, copy the following test SQL statements to the code editor, and then deploy the draft. You do not need to specify the parallelism of the sink operator.
CREATE TEMPORARY TABLE s1 ( a INT, b INT, ts TIMESTAMP(3) ) WITH ( 'connector'='datagen', 'rows-per-second'='1', 'fields.ts.kind'='random','fields.ts.max-past'='5s', 'fields.b.kind'='random','fields.b.min'='0','fields.b.max'='10' ); CREATE TEMPORARY TABLE sink ( a INT, b INT, ts TIMESTAMP(3), PRIMARY KEY (a) NOT ENFORCED ) WITH ( 'connector'='print' ); INSERT INTO sink SELECT * FROM s1;
In the Resources section of the Configuration tab on the Deployments page, set the Parallelism parameter to 2. In the Parameters section, add the
table.exec.sink.keyed-shuffle: FORCE
configuration to the Other Configuration field.Start the deployment. On the Status tab, both the parallelism of the sink operator and the parallelism of the upstream operator are 2 and the data connection mode between the sink operator and the upstream operator is changed to HASH.
table.exec.mini-batch.size
This parameter specifies the maximum number of input records that can be cached on a compute node for micro-batch operations. If the number of the cached data records reaches the value of this parameter, final calculation and data output are triggered. This parameter takes effect only when it is used together with the table.exec.mini-batch.enabled and table.exec.mini-batch.allow-latency parameters. For more information about miniBatch-related optimization, see MiniBatch Aggregation and MiniBatch Regular Joins.
Precautions
If you do not explicitly configure this parameter in the Parameters section before a deployment is started, the managed memory is used to cache data in miniBatch processing mode. If one of the following conditions is met, final calculation and data output are triggered:
The compute node receives the watermark message sent by the MiniBatchAssigner operator.
The managed memory is full.
The CHECKPOINT command is received and checkpointing has not been performed.
The deployment is canceled.
Valid values
-1: The managed memory is used to cache data. This is the default value.
A negative value of the LONG type: The processing mechanism is the same as the processing mechanism for the default value.
A positive value of the LONG type: The heap memory is used to cache data. When the number of cached input records reaches the specified value, data output is triggered.
Example
Create an SQL streaming draft, copy the following test SQL statements, and then deploy the draft.
CREATE TEMPORARY TABLE s1 ( a INT, b INT, ts TIMESTAMP(3), PRIMARY KEY (a) NOT ENFORCED, WATERMARK FOR ts AS ts - INTERVAL '1' SECOND ) WITH ( 'connector'='datagen', 'rows-per-second'='1', 'fields.ts.kind'='random', 'fields.ts.max-past'='5s', 'fields.b.kind'='random', 'fields.b.min'='0', 'fields.b.max'='10' ); CREATE TEMPORARY TABLE sink ( a INT, b BIGINT, PRIMARY KEY (a) NOT ENFORCED ) WITH ( 'connector'='print' ); INSERT INTO sink SELECT a, sum(b) FROM s1 GROUP BY a;
In the Parameters section on the Configuration tab of the Deployments page, add the
table.exec.mini-batch.enabled: true
andtable.exec.mini-batch.allow-latency: 2s
configurations to the Other Configuration field. Do not configure thetable.exec.mini-batch.size
parameter. Retain the default value (-1) of the table.exec.mini-batch.size parameter.Start the deployment. On the Status tab, the topology of the deployment contains the MiniBatchAssigner, LocalGroupAggregate, and GlobalGroupAggregate operators.
table.exec.agg.mini-batch.output-identical-enabled
If the time to live (TTL) of state data is enabled and the aggregation result remains unchanged after data is consumed by the MinibatchGlobalAgg and MinibatchAgg operators, duplicate data is not sent to downstream operators by default. This may cause the state data of downstream stateful operators to expire because the operators do not receive data sent from the upstream operators for an extended period. This parameter specifies whether to send duplicate data to downstream operators if the TTL of state data is enabled and the aggregation result remains unchanged. You can set this parameter to true
for the MinibatchGlobalAgg and MinibatchAgg operators. If the change period of the aggregation result of your deployment is less than the specified TTL of state data, you do not need to manually configure this parameter. For more information about Apache Flink issues, see FLINK-33936.
Usage notes
This parameter takes effect only in Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.8 or later. The configurations when the table.exec.agg.mini-batch.output-identical-enabled parameter is set to false are used in versions earlier than VVR 8.0.8.
If you change the value from false to true, the amount of data that is sent by the MinibatchGlobalAgg and MinibatchAgg operators to downstream operators may increase. This may cause heavy workloads on the downstream operators.
Valid values
false: If the TTL of state data is enabled and the aggregation result remains unchanged after data is consumed by the MinibatchGlobalAgg and MinibatchAgg operators, duplicate data is not sent to downstream operators. This is the default value.
true: If the TTL of state data is enabled and the aggregation result remains unchanged after data is consumed by the MinibatchGlobalAgg and MinibatchAgg operators, duplicate data is sent to downstream operators.
Example
Create an SQL streaming draft, copy the following test SQL statements, and then deploy the draft.
create temporary table src( a int, b string ) with ( 'connector' = 'datagen', 'rows-per-second' = '10', 'fields.a.min' = '1', 'fields.a.max' = '1', 'fields.b.length' = '3' ); create temporary table snk( a int, max_length_b bigint ) with ( 'connector' = 'blackhole' ); insert into snk select a, max(CHAR_LENGTH(b)) from src group by a;
In the Parameters section on the Configuration tab of the Deployments page, add the
table.exec.mini-batch.enabled: true
andtable.exec.mini-batch.allow-latency: 2s
configurations to the Other Configuration field to enable the miniBatch optimization feature on Aggregate operators.Start the deployment. On the Status tab, you can view the MinibatchGlobalAggregate operator. Click + on the operator to check that the GlobalGroupAggregate operator does not send data to downstream operators if the aggregation result remains unchanged.
Stop the deployment. In the Parameters section on the Configuration tab of the Deployments page, add the
table.exec.agg.mini-batch.output-identical-enabled: true
configuration to the Other Configuration field.Start the deployment. On the Status tab, you can view the MinibatchGlobalAggregate operator. Click + on the operator to check that the GlobalGroupAggregate operator sends data to downstream operators if the aggregation result remains unchanged.
table.exec.async-lookup.key-ordered-enabled
In business scenarios where JOIN operations are performed on dimension tables to generate a wide table, you can enable the asynchronous mode to achieve higher throughput. The following table describes the data ordering of asynchronous I/O operations based on the setting of the table.exec.async-lookup.output-mode parameter in the JOIN operation of dimension tables and whether an input stream is an update stream.
output-mode\Input data stream | Update stream | Non-update stream |
ORDERED | Ordered mode | Ordered mode |
ALLOW_UNORDERED | Ordered mode | Unordered mode |
If table.exec.async-lookup.output-mode is set to ALLOW_UNORDERED for an update stream, the ordered mode is used to ensure data correctness. However, the throughput performance deteriorates to some extent. To optimize this scenario, the table.exec.async-lookup.key-ordered-enabled parameter is introduced to ensure both the data correctness of update streams and the throughput performance of asynchronous I/O operations. Messages that have the same update key (regarded as the primary key of the change log) in a stream are processed based on the order in which the messages enter an operator.
Ordered mode: In this mode, the order of streams remains unchanged. The order in which result messages are sent is the same as the order in which asynchronous requests are triggered (the order in which messages enter an operator).
Unordered mode: In this mode, result messages are sent as soon as the asynchronous requests are completed. The order of the messages in streams is changed after the messages are processed by the asynchronous I/O operator. For more information, see Async I/O in the Apache Flink document.
Scenarios
The number of messages with the same update key in a stream is small over a period of time. For example, the update key is regarded as the primary key and the data with the same primary key is not frequently updated. In addition, the processing order based on the update key is required when dimension tables are joined. In this case, you can perform optimization by specifying the table.exec.async-lookup.key-ordered-enabled parameter. This can ensure the data processing order based on the update key.
In a Change Data Capture (CDC) stream that contains a primary key, dimension tables are joined to generate a wide table for data writing to the sink. The primary key of the sink is consistent with the primary key of the source. In addition, the join key of the JOIN operation on the dimension tables is inconsistent with the primary key. The join key is considered as the primary key. In this case, you can perform optimization by specifying the table.exec.async-lookup.key-ordered-enabled parameter. This allows the system to shuffle data based on the CDC primary key, which is regarded as the update key. You can also enable the SHUFFLE_HASH join policy to optimize this scenario. In high-concurrency scenarios, compared with this method, optimization by specifying the table.exec.async-lookup.key-ordered-enabled parameter can prevent the generation of the SinkMaterializer operator before data is written to the sink. This prevents potential performance issues caused by the operator, especially the issue that a large amount of state data is generated during the long-term running of the operator. For more information about the SinkUpsertMaterializer operator, see Usage notes.
The join key of the JOIN operation on dimension tables is inconsistent with the primary key. The join key of the dimension tables is regarded as the primary key and a rank operator is available after the JOIN operation is performed. In this case, optimization by specifying the table.exec.async-lookup.key-ordered-enabled parameter allows the system to shuffle data based on the CDC primary key, which is regarded as the update key. You can also enable the SHUFFLE_HASH join policy to optimize this scenario. Compared with this method, optimization by specifying the table.exec.async-lookup.key-ordered-enabled parameter can prevent UpdateFastRank from degrading into RetractRank. For more information about how to change RetractRank to UpdateFastRank, see TopN practices.
Usage notes
If no update key is available in a stream, the entire row of data is used as the key.
The throughput decreases when the same update key is frequently updated in a short period of time. This is because the data with the same update key is processed in a strict order.
Compared with JOIN operations on dimension tables in asynchronous mode before optimization, the Key-Ordered mode provides Keyed State. If you enable or disable the Key-Ordered mode, the state data compatibility is affected.
The optimization takes effect only when you add the
table.exec.async-lookup.output-mode='ALLOW_UNORDERED'
andtable.exec.async-lookup.key-ordered-enabled='true'
configurations to the JOIN operation on a dimension table in Realtime Compute for Apache Flink that uses VVR 8.0.10 or later and the input stream is a non-update stream.
Valid values
false: disables the Key-Ordered mode. This is the default value.
true: enables the Key-Ordered mode.
Example
In this example, a JOIN operation is performed on a Hologres dimension table in asynchronous mode. You must create an SQL streaming draft, copy the following SQL statements, and then deploy the draft.
For more information about the Hologres connector, see Hologres connector.
create TEMPORARY table bid_source( auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR, proc_time as proctime(), WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND ) with ( 'connector' = 'kafka', -- A non-insert-only stream connector. 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ); CREATE TEMPORARY TABLE users ( user_id STRING PRIMARY KEY NOT ENFORCED, -- Define the primary key. user_name VARCHAR(255) NOT NULL, age INT NOT NULL ) WITH ( 'connector' = 'hologres', -- The connector that supports the asynchronous lookup feature. 'async' = 'true', 'dbname' = 'holo db name', -- The name of the Hologres database. 'tablename' = 'schema_name.table_name', -- The name of the Hologres table that is used to receive data. 'username' = 'access id', -- The AccessKey ID of your Alibaba Cloud account. 'password' = 'access key', -- The AccessKey secret of your Alibaba Cloud account. 'endpoint' = 'holo vpc endpoint', -- The virtual private cloud (VPC) endpoint of your Hologres instance. ); CREATE TEMPORARY TABLE bh ( auction BIGINT, age int ) WITH ( 'connector' = 'blackhole' ); insert into bh SELECT bid_source.auction, u.age FROM bid_source JOIN users FOR SYSTEM_TIME AS OF bid_source.proc_time AS u ON bid_source.channel = u.user_id;
In the Parameters section on the Configuration tab of the Deployments page, add the
table.exec.async-lookup.output-mode='ALLOW_UNORDERED'
andtable.exec.async-lookup.key-ordered-enabled='true'
configurations to the Other Configuration field.Start the deployment. On the Status tab, you can view KEY_ORDERED:true in the async attribute of the deployment.