全部产品
Search
文档中心

实时计算Flink版:Task快速重启配置

更新时间:Sep 12, 2024

本文为您介绍,如何配置Task快速重启,从而降低Failover对作业的影响。

背景信息

重要

此功能为实验性功能,请在生产环境下谨慎使用,如遇问题请及时提交工单和技术支持部门取得联系。

通常,当Flink流作业中的某个Task发生异常时,为了保证数据一致性,同一个PipelineRegion的所有Task都会进行Failover。作业Failover后,Source节点需要从上一个Checkpoint位点开始消费数据。然而,在一些作业中,Task Failover后还需要下载大资源文件或者State数据。如果作业并发很高,所有Task进行一轮Failover的调度时间可能也会比较长。这些都会导致作业出现延迟或阻塞,一段时间内无法正常消费数据等问题,恢复正常运行所需的时间会更长。

Task快速重启配置可以有效缓解上述问题。配置Task快速重启后,当某个Task发生异常时,可以只重启失败的Task。从而避免由于非Source Task异常导致的Source Task回退到上一个Checkpoint位点重新消费数据的情况,减少由于Task取消、重启、追数据导致作业无法正常消费数据的时长。此外,还可以缓解高并发的情况下,所有Task进行一轮Failover的调度时间和初始化给集群造成的压力,从而降低Failover对作业的影响。

目前Task快速重启支持两种一致性语义:APPROXIMATE(不保证数据不丢失和重复)和AT_LEAST_ONCE(保证数据不丢失,不保证数据不重复)。其中APPROXIMATE语义无性能开销,而AT_LEAST_ONCE语义存在性能开销且需要额外使用对象存储OSS。

使用限制

  • 不能在有限数据源的流作业中使用。

  • 无法与Unaligned Checkpoint共同使用。

  • 无法在批作业中使用。

  • 同一个Session集群内无法同时运行未配置和配置了Task快速重启的作业。

  • 若作业中有算子实现了prepareSnapshotPreBarrier方法,或在运行中会发送与Checkpoint相关的信息,则不可使用AT_LEAST_ONCE语义。

注意事项

语义

注意事项

APPROXIMATE

  • 当某个Task进行Failover时,其上游Task将无法继续向该Task发送数据,数据将会造成反压。因此Task Failover期间和之后一段时间出现的反压和numRecordsInPerSecond指标归零都是正常情况,在Task Failover完成后会恢复。

    如果作业仅有Rebalance或Rescale边,且每个边的下游均有多个并发,可以配合Dynamic Rebalance功能,将数据发送给未出错的任务处理,从而实现Failover期间数据处理不受影响。

  • 如果Task异常是由TaskManager异常退出或网络异常导致的,则配置Task快速重启后,作业Failover的耗时可能不会有明显的缩短,极端情况下可能会更长。

  • 由于单个Task Failover期间作业Checkpoint会失败,建议调高容忍Checkpoint失败的次数。

  • 目前,配置Task快速重启后,如果发生作业Failover,将会出现数据丢失或重复。因此请您一定要先确保您的业务可以允许出现数据的丢失或重复,再配置Task快速重启。

  • 启用Task快速重启后,您可以忽略Flink UI页面上显示的Checkpoint一致性语义。在Flink UI页面上,Checkpoint一致性语义固定显示为AT_LEAST_ONCE,但实际为APPROXIMATE,该语义不保证数据不丢失和重复。

AT_LEAST_ONCE

  • 目前的实现下,开启后作业性能会有一定下降。请关注性能和延迟情况,适当扩充资源。

  • 当某个Task进行Failover时,或AT_LEAST_ONCE语义下Failover后回追数据过程中,其上游Task将无法继续向该Task发送数据,数据将会造成反压。因此Task Failover期间和之后一段时间出现的反压和numRecordsInPerSecond指标归零都是正常情况,在Task Failover完成后会恢复。

    如果作业仅有Rebalance或Rescale边,且每个边的下游均有多个并发,可以配合Dynamic Rebalance功能,将数据发送给未出错的任务处理,从而实现Failover期间数据处理不受影响。

  • 如果Task异常是由TaskManager异常退出或网络异常导致的,则配置Task快速重启后,作业Failover的耗时可能不会有明显的缩短,极端情况下可能会更长。

  • 由于单个Task Failover期间作业Checkpoint会失败,建议调高容忍Checkpoint失败的次数。

  • 开启后各个Task将以较高频率保存自身的状态,同时会记录输出的数据,这都会增加作业读写OSS的IO量。不建议在State较大或数据流量较大的作业上开启。

    说明
    • 您可以在EMR控制台查看OSS使用的带宽和QPS。

    • 作业当前总IO量可用各节点 Bytes Received的总和 / 运行时间估算。

  • 某个作业开启该功能导致OSS的IO量增加,可能导致使用同一账号下的OSS的作业受到影响,反之亦然。因此,请同时关注其他作业情况和OSS使用情况。

  • 部分情况下,包括首次Checkpoint完成前,作业异常会回退为正常的Failover,属于正常情况。

  • 启用AT_LEAST_ONCE语义后作业的Checkpoint ID不再为连续数值。

  • 由于作业Failover次数为独立计数,如果出现全局性异常导致大量Task同时Failover,Failover计数会按照发生异常的Task数量相应增加。

操作步骤

  1. 进入Task快速重启配置入口。

    1. 登录实时计算控制台

    2. 单击目标工作空间操作列下的控制台

    3. 运维中心 > 作业运维页面,单击目标作业名称。

    4. 部署详情页签,单击运行参数配置区域右侧的编辑

  2. 其他配置中,增加如下代码信息。

    • 若使用APPROXIMATE语义:

      individual-task-failover.enabled: enabled_approximate
      shuffle-service-factory.class: org.apache.flink.runtime.io.network.IndividualRecoverableNettyShuffleServiceFactory
    • 若使用AT_LEAST_ONCE语义:

      individual-task-failover.enabled: enabled
      shuffle-service-factory.class: org.apache.flink.runtime.io.network.IndividualRecoverableNettyShuffleServiceFactory
      individual-task-failover.intermediate-checkpointing.interval: 适当的间隔,一般建议为cp周期的1/5到1/10,单位为ms。
      classloader.check-leaked-classloader: false

    如果您的作业使用的是Session模式的集群,则也需要在Session集群的配置中增加上述代码。Session集群配置详情请参见步骤一:创建Session集群

  3. 单击保存

  4. 在页面顶部,单击停止

  5. 启动作业,详情请参见作业启动