This topic describes key parameters that you may need to configure when you develop an SQL draft and provides examples of how to configure them.
table.exec.sink.keyed-shuffle
When you write data to a table that has a primary key, the data may be written out of order. To resolve this issue, you can configure the table.exec.sink.keyed-shuffle parameter to perform hash shuffling on the primary key. This ensures that records with the same primary key are distributed to the same subtask of the sink operator, which reduces the probability of out-of-order writes.
Precautions
Hash shuffling helps only if the upstream operator guarantees the correct order of update records for each primary key.
If you change the parallelism of an operator in a deployment that runs in expert mode, the parallelism rules described in the Valid values section do not apply.
Valid values
AUTO: If the parallelism of the sink operator is not 1 and differs from the parallelism of the upstream operator, Realtime Compute for Apache Flink automatically performs hash shuffling on the primary key field when data flows to the sink operator. This is the default value.
FORCE: If the parallelism of the sink operator is not 1, Realtime Compute for Apache Flink always performs hash shuffling on the primary key field when data flows to the sink operator, even if the parallelism of the sink operator equals that of the upstream operator.
NONE: Realtime Compute for Apache Flink does not add hash shuffling on the primary key field, regardless of the parallelism of the sink operator and the upstream operator.
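If you prefer to keep this setting next to the SQL code instead of in the Other Configuration field, the following sketch shows the three values as SQL SET statements. This is an assumption: it presumes that your engine version accepts SET statements in SQL drafts, as the open-source Flink SQL client does. If it does not, add table.exec.sink.keyed-shuffle with the value you want to the Other Configuration field, as in the Examples section.

```sql
-- Assumption: SET statements are accepted in SQL drafts on your engine version.
-- Enable exactly one of the following lines. AUTO is the default value.
SET 'table.exec.sink.keyed-shuffle' = 'AUTO';
-- SET 'table.exec.sink.keyed-shuffle' = 'FORCE';
-- SET 'table.exec.sink.keyed-shuffle' = 'NONE';
```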
Examples
AUTO
Create an SQL streaming draft, copy the following test SQL statements to the code editor, and then deploy the draft. In this example, the parallelism of the sink operator is explicitly set to 2.
```sql
CREATE TEMPORARY TABLE s1 (
  a INT,
  b INT,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.ts.kind' = 'random',
  'fields.ts.max-past' = '5s',
  'fields.b.kind' = 'random',
  'fields.b.min' = '0',
  'fields.b.max' = '10'
);

CREATE TEMPORARY TABLE sink (
  a INT,
  b INT,
  ts TIMESTAMP(3),
  PRIMARY KEY (a) NOT ENFORCED
) WITH (
  'connector' = 'print',
  -- You can configure the sink.parallelism parameter to specify the parallelism of the sink operator.
  'sink.parallelism' = '2'
);

INSERT INTO sink SELECT * FROM s1;

-- You can also configure the dynamic table options to specify the parallelism of the sink operator.
-- INSERT INTO sink /*+ OPTIONS('sink.parallelism' = '2') */ SELECT * FROM s1;
```
In the Resources section of the Configuration tab on the Deployments page, set the Parallelism parameter to 1. In the Parameters section, either leave the table.exec.sink.keyed-shuffle parameter unconfigured or add the table.exec.sink.keyed-shuffle: AUTO configuration to the Other Configuration field.
Start the deployment. On the Status tab, the data connection mode between the sink operator and the upstream operator is HASH, because the parallelism of the sink operator (2) is not 1 and differs from the parallelism of the upstream operator (1).
FORCE
Create an SQL streaming draft, copy the following test SQL statements to the code editor, and then deploy the draft. You do not need to specify the parallelism of the sink operator.
```sql
CREATE TEMPORARY TABLE s1 (
  a INT,
  b INT,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.ts.kind' = 'random',
  'fields.ts.max-past' = '5s',
  'fields.b.kind' = 'random',
  'fields.b.min' = '0',
  'fields.b.max' = '10'
);

CREATE TEMPORARY TABLE sink (
  a INT,
  b INT,
  ts TIMESTAMP(3),
  PRIMARY KEY (a) NOT ENFORCED
) WITH (
  'connector' = 'print'
);

INSERT INTO sink SELECT * FROM s1;
```
In the Resources section of the Configuration tab on the Deployments page, set the Parallelism parameter to 2. In the Parameters section, add the table.exec.sink.keyed-shuffle: FORCE configuration to the Other Configuration field.
Start the deployment. On the Status tab, both the sink operator and the upstream operator have a parallelism of 2, and the data connection mode between them is changed to HASH. Because the two parallelisms are equal, AUTO would not add hash shuffling in this case. FORCE adds it because the parallelism of the sink operator is not 1.
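The examples above do not cover NONE. The following is a minimal sketch for comparison that reuses the tables from the AUTO example: the sink parallelism is 2, and you would set the Parallelism parameter in the Resources section to 1, which is the situation in which AUTO adds hash shuffling. Based on the description of NONE, you would expect the connection mode shown on the Status tab not to be HASH; verify the actual mode in your own deployment. The SET statement is an assumption (it presumes that your engine version accepts SET statements in SQL drafts); otherwise, remove it and add table.exec.sink.keyed-shuffle: NONE to the Other Configuration field.

```sql
-- Hypothetical comparison run that reuses the tables from the AUTO example.
-- Assumption: SET statements are accepted in SQL drafts on your engine version;
-- otherwise add "table.exec.sink.keyed-shuffle: NONE" to Other Configuration instead.
SET 'table.exec.sink.keyed-shuffle' = 'NONE';

CREATE TEMPORARY TABLE s1 (
  a INT,
  b INT,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.ts.kind' = 'random',
  'fields.ts.max-past' = '5s',
  'fields.b.kind' = 'random',
  'fields.b.min' = '0',
  'fields.b.max' = '10'
);

CREATE TEMPORARY TABLE sink (
  a INT,
  b INT,
  ts TIMESTAMP(3),
  PRIMARY KEY (a) NOT ENFORCED
) WITH (
  'connector' = 'print',
  -- Sink parallelism (2) differs from the deployment parallelism (1),
  -- which is the case in which AUTO would add hash shuffling.
  'sink.parallelism' = '2'
);

INSERT INTO sink SELECT * FROM s1;
```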
table.exec.mini-batch.size
This parameter specifies the maximum number of input records that can be cached on a compute node for miniBatch processing. When the number of cached records reaches this value, final calculation and data output are triggered. This parameter takes effect only when it is used together with the table.exec.mini-batch.enabled and table.exec.mini-batch.allow-latency parameters. For more information about miniBatch optimization, see MiniBatch Aggregation and MiniBatch Regular Joins.
Precautions
If you do not explicitly configure this parameter in the Parameters section before you start a deployment, managed memory is used to cache data in miniBatch mode. In this case, final calculation and data output are triggered when any of the following conditions is met:
The compute node receives the watermark message sent by the MiniBatchAssigner operator.
The managed memory is full.
The CHECKPOINT command is received and checkpointing has not yet been performed.
The deployment is canceled.
Valid values
-1: Managed memory is used to cache data. This is the default value.
Any other negative value of the LONG type: The behavior is the same as for the default value.
A positive value of the LONG type: Heap memory is used to cache data. When the number of cached input records reaches the specified value, data output is triggered.
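The following sketch shows the three miniBatch parameters configured together with a positive table.exec.mini-batch.size. Based on the list above, this switches caching to heap memory and triggers output when the specified number of input records is cached. The value 5000 is an arbitrary illustration, not a recommendation. The SET form assumes that your engine version accepts SET statements in SQL drafts; otherwise, add the same keys in key: value form to the Other Configuration field.

```sql
-- Assumption: SET statements are accepted in SQL drafts on your engine version.
-- The size takes effect only together with the enabled and allow-latency parameters.
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '2s';
-- Positive LONG value: cache input records in heap memory and trigger output
-- when 5000 records (illustrative value) are cached.
SET 'table.exec.mini-batch.size' = '5000';
```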
Example
Create an SQL streaming draft, copy the following test SQL statements, and then deploy the draft.
```sql
CREATE TEMPORARY TABLE s1 (
  a INT,
  b INT,
  ts TIMESTAMP(3),
  PRIMARY KEY (a) NOT ENFORCED,
  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.ts.kind' = 'random',
  'fields.ts.max-past' = '5s',
  'fields.b.kind' = 'random',
  'fields.b.min' = '0',
  'fields.b.max' = '10'
);

CREATE TEMPORARY TABLE sink (
  a INT,
  b BIGINT,
  PRIMARY KEY (a) NOT ENFORCED
) WITH (
  'connector' = 'print'
);

INSERT INTO sink SELECT a, SUM(b) FROM s1 GROUP BY a;
```
In the Parameters section of the Configuration tab on the Deployments page, add the table.exec.mini-batch.enabled: true and table.exec.mini-batch.allow-latency: 2s configurations to the Other Configuration field. Do not configure the table.exec.mini-batch.size parameter.
Start the deployment. On the Status tab, the topology of the deployment contains the MiniBatchAssigner, LocalGroupAggregate, and GlobalGroupAggregate operators.