本文介绍SQL开发中涉及的重要参数说明和使用示例。
table.exec.sink.keyed-shuffle
为解决向带有主键的表中写入数据时出现的分布式乱序问题,您可以通过table.exec.sink.keyed-shuffle参数来进行Hash Shuffle操作,这将确保相同主键的数据被发送到算子的同一个并发,减少分布式乱序问题。
注意事项
仅在上游算子能够确保更新记录在主键字段上的顺序性时,Hash Shuffle操作才起作用;否则,Hash Shuffle操作不能解决问题。
在作业专家模式时,修改算子并发度,不适用下面的并发度判定规则。
取值说明
AUTO(默认值):表示在Sink的并发度不为1,且Sink的并发度与上游算子不同时,当数据流向Sink时,Flink会自动对主键字段进行Hash Shuffle操作。
FORCE:表示在Sink并发度不为1时,当数据流向Sink时,Flink会强制对主键字段进行Hash Shuffle操作。
NONE:表示Flink不会根据Sink和上游算子的并发度信息进行Hash Shuffle操作。
使用示例
参数值为AUTO
新建SQL流作业,复制如下测试SQL(显式指定Sink并发度为2),部署作业。
CREATE TEMPORARY TABLE s1 ( a INT, b INT, ts TIMESTAMP(3) ) WITH ( 'connector'='datagen', 'rows-per-second'='1', 'fields.ts.kind'='random','fields.ts.max-past'='5s', 'fields.b.kind'='random','fields.b.min'='0','fields.b.max'='10' ); CREATE TEMPORARY TABLE sink ( a INT, b INT, ts TIMESTAMP(3), PRIMARY KEY (a) NOT ENFORCED ) WITH ( 'connector'='print', --您可以通过sink.parallelism参数直接指定Sink并发度。 'sink.parallelism'='2' ); INSERT INTO sink SELECT * FROM s1; --您也可以通过动态表选项的方式指定Sink并发度。 --INSERT INTO sink /*+ OPTIONS('sink.parallelism' = '2') */ SELECT * FROM s1;
在作业运维页面的部署详情页签资源配置区域,将并发度设置为1,在运行参数配置区域其他配置中,不设置
table.exec.sink.keyed-shuffle
参数或显式添加table.exec.sink.keyed-shuffle: AUTO
(两者效果一致)。启动作业。在状态总览页签下,您可以看到Sink节点和上游的数据连接方式为HASH。
参数值为FORCE
新建SQL流作业,复制如下测试SQL(不再显式指定Sink并发度),部署作业。
CREATE TEMPORARY TABLE s1 ( a INT, b INT, ts TIMESTAMP(3) ) WITH ( 'connector'='datagen', 'rows-per-second'='1', 'fields.ts.kind'='random','fields.ts.max-past'='5s', 'fields.b.kind'='random','fields.b.min'='0','fields.b.max'='10' ); CREATE TEMPORARY TABLE sink ( a INT, b INT, ts TIMESTAMP(3), PRIMARY KEY (a) NOT ENFORCED ) WITH ( 'connector'='print' ); INSERT INTO sink SELECT * FROM s1;
在作业运维页面的部署详情页签资源配置区域,将并发度设置为2。在运行参数配置区域其他配置中添加
table.exec.sink.keyed-shuffle: FORCE
。启动作业后,在状态总览页签下,您可以看到Sink节点和上游节点的并发度都为2,并且数据连接方式变成了HASH。
table.exec.mini-batch.size
该参数控制了相关的计算节点进行微批操作所缓存的最大数据条数,达到该值后触发最终的计算和数据输出。该参数只有与table.exec.mini-batch.enabled、table.exec.mini-batch.allow-latency同时使用时才会生效。有关MiniBatch相关的优化请参见MiniBatch Aggregation和MiniBatch 双流Join。
注意事项
在作业启动前,如果未在运行参数配置区域显式设置该参数,在MiniBatch处理模式下,将使用Managed Memory缓存数据,在以下几种条件下都会触发最终计算和数据输出:
收到MiniBatchAssigner节点发送的watermark消息
Managed Memory已满
进行Checkpoint前
作业停止时
取值说明
-1(默认值):表示使用Managed Memory缓存数据。
其他Long类型的负值:同默认设置。
其他Long类型的正值:表示使用Heap Memory来缓存数据。当缓存的数据量达到N条时,会自动触发输出操作。
使用示例
新建SQL流作业,复制如下测试SQL,部署作业。
CREATE TEMPORARY TABLE s1 ( a INT, b INT, ts TIMESTAMP(3), PRIMARY KEY (a) NOT ENFORCED, WATERMARK FOR ts AS ts - INTERVAL '1' SECOND ) WITH ( 'connector'='datagen', 'rows-per-second'='1', 'fields.ts.kind'='random', 'fields.ts.max-past'='5s', 'fields.b.kind'='random', 'fields.b.min'='0', 'fields.b.max'='10' ); CREATE TEMPORARY TABLE sink ( a INT, b BIGINT, PRIMARY KEY (a) NOT ENFORCED ) WITH ( 'connector'='print' ); INSERT INTO sink SELECT a, sum(b) FROM s1 GROUP BY a;
在作业运维页面的部署详情页运行参数配置区域其他配置中,设置
table.exec.mini-batch.enabled: true
和table.exec.mini-batch.allow-latency: 2s
参数,不设置table.exec.mini-batch.size
(取默认值-1)。启动作业。在状态总览页签下,您可以看到作业包含了MiniBatchAssigner节点、LocalGroupAggregate节点和GlobalGroupAggregate节点。
table.exec.agg.mini-batch.output-identical-enabled
在开启State TTL的情况下,MinibatchGlobalAgg节点和MinibatchAgg节点在消费数据后,如果聚合结果未发生变化,默认将不会向下游发送重复的数据,这可能导致下游的有状态节点由于长时间未收到上游发送的数据,自身State过期的问题。该参数控制了开启StateTTL且聚合结果未发生变化的情况下,是否仍然向下游发送重复数据。您可以设置该参数为true
,使得MinibatchGlobalAgg和MinibatchAgg两个节点在这种情况下,仍然下发数据。如果您的作业聚合结果变化周期小于State TTL设置时间,则无需手动设置此参数。具体社区Issues详情请参考FLINK-33936。
注意事项
该参数仅在VVR-8.0.8及以上版本生效,在VVR-8.0.8前的版本,其行为等同于取值为false的行为。
当取值从false修改为true时,可能会导致MinibatchGlobalAgg节点和MinibatchAgg节点向下游发送的数据量增加,对下游算子造成压力。
取值说明
false(默认值):表示在开启State TTL的情况下,MinibatchGlobalAgg节点和MinibatchAgg节点在消费数据后,如果聚合结果未发生变化,就不向下游下发数据。
true:表示在开启State TTL的情况下,MinibatchGlobalAgg节点和MinibatchAgg节点在消费数据后,如果聚合结果未发生变化,仍然向下游发送更新的(重复的)数据。
使用示例
新建SQL流作业,复制如下测试SQL,部署作业。
create temporary table src( a int, b string ) with ( 'connector' = 'datagen', 'rows-per-second' = '10', 'fields.a.min' = '1', 'fields.a.max' = '1', 'fields.b.length' = '3' ); create temporary table snk( a int, max_length_b bigint ) with ( 'connector' = 'blackhole' ); insert into snk select a, max(CHAR_LENGTH(b)) from src group by a;
在作业运维页面的部署详情页运行参数配置区域其他配置中,设置
table.exec.mini-batch.enabled: true
和table.exec.mini-batch.allow-latency: 2s
参数,启用Minibatch Aggregate优化。启动作业。在状态总览页签下,您可以看到作业包含了MinibatchGlobalAggregate节点,点击该节点上的“+”号,可以观察到GlobalGroupAggregate节点在聚合结果不变的情况下,不向下游发送数据。
停止该作业,在作业运维页面的部署详情页运行参数配置区域其他配置中,添加参数
table.exec.agg.mini-batch.output-identical-enabled: true
启动作业。在状态总览页签下,您可以看到作业包含了MinibatchGlobalAggregate节点,点击该节点上的“+”号,可以观察到GlobalGroupAggregate节点在聚合结果不变的情况下,仍然向下游发送数据。