全部产品
Search
文档中心

云工作流:EventBridge事件调度

更新时间:Sep 14, 2024

EventBridge事件调度包括云产品事件调度和自定义事件源事件调度。云产品事件调度支持包括弹性计算、存储服务、数据库、容器、大数据处理、可观测性服务及中间件服务在内的几乎所有阿里云官方事件源,而自定义事件源事件调度支持包括SLS、Kafka、RocketMQ、RabbitMQ等事件源的接入。本文介绍如何创建EventBridge事件调度以及工作流调度的高级功能。

创建EventBridge事件调度

云产品事件调度

云产品事件调度能够实现通过阿里云产品事件调度工作流执行,这些事件主要包括云监控事件、审计事件、云服务器事件、阿里云物联网IoT事件和部分云产品运维事件等云产品事件。本文以云服务器为例,介绍如何在控制台创建阿里云产品事件调度。

功能简介

您在工作流控制台提交创建工作流的请求后,根据工作流调度的配置信息,将自动在阿里云官方事件总线default下创建一个事件规则rule-created-by-fnf-随机串。创建完成后,您可以在工作流查看工作流调度信息,也可以在事件总线EventBridge控制台查看自动创建的事件规则信息。当事件源指定类型的事件投递到事件总线时,将调度该工作流调度关联的工作流执行一次。

注意事项

事件总线EventBridge的云服务专用事件总线default上最多只能创建10个事件规则,超过10个之后将无法再创建云产品事件调度。

前提条件

创建云产品事件调度

  1. 登录云工作流控制台,然后在顶部菜单栏,选择地域。

  2. 在左侧导航栏,选择工作流列表,然后在工作流列表页面,单击目标工作流。

  3. 在工作流详情页面,单击工作流调度页签,然后单击创建工作流调度

  4. 创建工作流调度面板,调度类型选择云服务器ECS,设置相关配置项,然后单击确定。基础配置项说明如下所示。

    配置项

    操作

    本文示例

    名称

    填写自定义的工作流调度名称。

    ecs-schedule

    事件类型

    选择自定义事件类型选中全部事件类型。如果选择自定义事件类型,您可以选择云服务器的一个或多个事件类型。

    自定义事件类型ecs:Disk:ConvertToPostpaidCompleted 保留云盘

    事件模式内容

    不支持手动编辑,您在事件类型参数中选择事件类型后,事件模式内容自动填充。关于事件模式的信息,请参见事件模式

    {
     "source": [
     "acs.ecs"
     ],
     "type": [
     "ecs:Disk:ConvertToPostpaidCompleted"
     ]
    }

    关于推送配置、重试和死信等高级配置项说明,请参见工作流调度高级功能。创建完成后,您可以在工作流详情页面的工作流调度页签,根据界面提示查看、编辑、删除或禁用/启用工作流。

HTTP调度

HTTP请求作为事件源通过事件总线云工作流集成后,通过HTTP调度能够触发关联工作流执行。本文介绍如何在控制台创建HTTP调度。

注意事项

创建的自定义总线以及事件规则数量超过上限后,将无法再创建事件模式的HTTP调度。在单个阿里云账号单个地域维度下,关于创建工作流调度涉及的资源数量的限制,请参见使用限制

前提条件

创建HTTP调度

  1. 登录云工作流控制台,然后在顶部菜单栏,选择地域。

  2. 在左侧导航栏,选择工作流列表,然后在工作流列表页面,单击目标工作流。

  3. 在工作流详情页面,单击工作流调度页签,然后单击创建工作流调度

  4. 创建工作流调度面板,调度类型选择HTTP/HTTPS 触发,设置相关配置项,然后单击确定。基础配置项说明如下所示。

    配置项

    操作

    本文示例

    名称

    填写自定义的工作流调度名称。

    https-schedule

    请求类型

    选择HTTPSHTTPHTTP&HTTPS。

    HTTP

    请求方法

    选择支持的HTTP请求方法。取值说明如下。

    • GET

    • POST

    • PUT

    • DELETE

    • HEAD

    • PATCH

    GET

    安全配置

    选择安全配置的类型。取值说明如下。

    • 无需配置:无需进行安全配置,接收到的所有URL请求均可触发工作流执行。

    • IP网段:输入正确的IP地址或者IP网段。只有使用该IP地址或该IP网段内的IP地址访问的URL请求支持触发工作流执行。最多支持添加5个IP地址或者IP网段。

    • 安全域名:输入安全的域名信息。只有使用该域名访问的URL请求支持触发工作流执行。最多支持添加5个安全域名。

    IP网段:10.45.12.0/24

    关于推送配置、重试和死信等高级配置项说明,请参见工作流调度高级功能。创建完成后,您可以在工作流详情页面的工作流调度页签,根据界面提示查看、编辑、删除或禁用/启用工作流。

MNS调度

注意事项

  • 作为事件源的轻量消息队列(原 MNS)必须和创建工作流调度的工作流所在的地域相同。

  • 创建的事件流数量超过上限后,将无法再创建事件模式的MNS调度。在单个阿里云账号单个地域维度下,关于创建工作流调度涉及的资源数量的限制,请参见使用限制

前提条件

创建工作流调度

  1. 登录云工作流控制台,然后在顶部菜单栏,选择地域。

  2. 在左侧导航栏,选择工作流列表,然后在工作流列表页面,单击目标工作流。

  3. 在工作流详情页面,单击工作流调度页签,然后单击创建工作流调度

  4. 创建工作流调度面板,调度类型选择轻量消息队列(原 MNS),设置相关配置项,然后单击确定。基础配置项说明如下所示。

    配置项

    操作

    本文示例

    名称

    填写自定义的工作流调度名称。

    mns-schedule

    队列名称

    选择已创建的轻量消息队列(原 MNS)

    MyQueue

    Base64 解码

    如果需要将MNS的数据解码后进行投递,请勾选开启 Base64 解码复选框。

    开启 Base64 解码

    关于推送配置、重试和死信等高级配置项说明,请参见工作流调度高级功能。创建完成后,您可以在工作流详情页面的工作流调度页签,根据界面提示查看、编辑、删除或禁用/启用工作流。

Kafka调度

注意事项

  • 作为事件源的消息队列Kafka版实例必须和创建工作流调度的工作流在相同的地域。

  • 创建的事件流数量超过上限后,将无法再创建Kafka调度。单个阿里云账号单个地域维度下,关于创建工作流调度涉及的资源数量的限制,请参见使用限制

前提条件

创建Kafka工作流调度

  1. 登录云工作流控制台,然后在顶部菜单栏,选择地域。

  2. 在左侧导航栏,选择工作流列表,然后在工作流列表页面,单击目标工作流。

  3. 在工作流详情页面,单击工作流调度页签,然后单击创建工作流调度

  4. 创建工作流调度面板,调度类型选择消息队列 Kafka 版,设置相关配置项,然后单击确定。基础配置项说明如下所示。

    配置项

    操作

    本文示例

    名称

    填写自定义的工作流调度名称。

    kafka-schedule

    Kafka 实例

    选择已创建的消息队列Kafka版实例。

    alikafka_pre-cn-i7m2t7t1****

    Topic

    选择已创建的消息队列Kafka版实例的Topic。

    topic1

    Group ID

    选择已创建的消息队列Kafka版实例的Group ID。

    重要

    请使用独立的Group ID来创建工作流调度,不要与已有的业务混用Group ID,否则会影响已有的消息收发。

    GID_group1

    消费任务并发数

    消费者的并发数量,取值范围为[1,Topic的分区数]。

    2

    消费位点

    选择消息的消费位点,即消息队列Kafka版从事件总线开始拉取消息的位置。取值说明如下。

    • 最早位点:从最早位点开始消费。

    • 最新位点:从最新位点开始消费。

    最新位点

    网络配置

    选择路由消息的网络类型。取值说明如下。

    • 默认网络:默认使用部署Kafka实例时选择的VPC IDvSwitch ID

    • 自建公网:需选择另外的专有网络VPC交换机安全组

    默认网络

    关于推送配置、重试和死信等高级配置项说明,请参见工作流调度高级功能。创建完成后,您可以在工作流详情页面的工作流调度页签,根据界面提示查看、编辑、删除或禁用/启用工作流。

RocketMQ调度

注意事项

  • 作为事件源的消息队列RocketMQ版的实例必须和创建调度的工作流在相同的地域。

  • 创建的事件流数量超过上限后,将无法再创建事件模式的RocketMQ调度。单个阿里云账号单个地域维度下,关于创建工作流调度涉及的资源数量的限制,请参见使用限制

前提条件

创建RocketMQ调度

  1. 登录云工作流控制台,然后在顶部菜单栏,选择地域。

  2. 在左侧导航栏,选择工作流列表,然后在工作流列表页面,单击目标工作流。

  3. 在工作流详情页面,单击工作流调度页签,然后单击创建工作流调度

  4. 创建工作流调度面板,调度类型选择消息队列 RocketMQ 版,设置相关配置项,然后单击确定。基础配置项说明如下所示。

    配置项

    操作

    本文示例

    名称

    填写自定义的工作流调度名称。

    rocketmq-schedule

    RocketMQ 实例

    选择已创建的消息队列RocketMQ版的实例。

    MQ_INST_164901546557****_BX7****

    Topic

    选择已创建的消息队列RocketMQ版实例的Topic。

    topic1

    Tag

    填写消息过滤标签。只有收到包含此处设置的过滤标签字符串的消息时,才会触发工作流执行。

    tag

    Group ID

    选择已创建的消息队列RocketMQ版实例的Group ID。推荐您选择快速创建,自动创建以GID_FNF_TRIGGER_{uuid}_{timestamp} 命名的Group ID。

    重要

    请使用独立的Group ID来创建工作流调度,不要与已有的业务混用Group ID,否则会影响已有的消息收发。

    GID_group1

    消费位点

    选择消息的消费位点,即消息队列RocketMQ版从事件总线开始拉取消息的位置。取值说明如下。

    • 最新位点:从最新位点开始消费。

    • 最早位点:从最早位点开始消费。

    • 指定时间戳:从指定时间戳开始消费。

    最新位点

    关于推送配置、重试和死信等高级配置项说明,请参见工作流调度高级功能。创建完成后,您可以在工作流详情页面的工作流调度页签,根据界面提示查看、编辑、删除或禁用/启用工作流。

RabbitMQ调度

注意事项

  • 作为触发源的消息队列RabbitMQ版实例必须和创建调度的工作流在相同的地域。

  • 创建的事件流数量超过上限后,将无法再创建事件模式的RabbitMQ调度。单个阿里云账号单个地域维度下,关于创建工作流调度涉及的资源数量的限制,请参见使用限制

前提条件

创建RabbitMQ调度

  1. 登录云工作流控制台,然后在顶部菜单栏,选择地域。

  2. 在左侧导航栏,选择工作流列表,然后在工作流列表页面,单击目标工作流。

  3. 在工作流详情页面,单击工作流调度页签,然后单击创建工作流调度

  4. 创建工作流调度面板,调度类型选择消息队列 RabbitMQ 版,设置相关配置项,然后单击确定。基础配置项说明如下所示。

    配置项

    操作

    本文示例

    名称

    填写自定义的工作流调度名称。

    rabbitmq-schedule

    RabbitMQ 实例

    选择已创建的消息队列RabbitMQ版的实例。

    amqp-cn-i7m2l6m2****

    Vhost

    选择已创建的消息队列RabbitMQ版实例的Vhost。

    myhost-1

    Queue

    选择已创建的消息队列RabbitMQ版实例的Queue。

    myqueue-1

    关于推送配置、重试和死信等高级配置项说明,请参见工作流调度高级功能。创建完成后,您可以在工作流详情页面的工作流调度页签,根据界面提示查看、编辑、删除或禁用/启用工作流。

SLS调度

通过创建日志服务SLS调度(以下简称SLS调度)将日志服务SLS与云工作流连接起来,当有新日志产生时触发工作流执行,对日志进行处理。本文介绍如何在控制台创建SLS调度。

注意事项

  • 作为触发源的日志服务SLS项目必须和创建调度的工作流在相同的地域。

  • 创建的事件流数量超过上限后,将无法再创建事件模式的SLS调度。在单个阿里云账号单个地域维度下,关于创建工作流调度涉及的资源数量的限制,请参见使用限制

前提条件

创建SLS调度

  1. 登录云工作流控制台,然后在顶部菜单栏,选择地域。

  2. 在左侧导航栏,选择工作流列表,然后在工作流列表页面,单击目标工作流。

  3. 在工作流详情页面,单击工作流调度页签,然后单击创建工作流调度

  4. 创建工作流调度面板,调度类型选择日志服务 SLS,设置相关配置项,然后单击确定。基础配置项说明如下所示。

    配置项

    操作

    本文示例

    名称

    填写自定义的工作流调度名称。

    sls-schedule

    日志项目

    选择创建的日志服务SLS的Project。

    test-Project

    日志库

    选择创建的日志服务SLS的Logstore

    test-LogStore

    起始消费位点

    起始消费位点,可以选择最早或最新位点,也可以从指定时间开始消费。

    最新位点

    角色配置

    授权事件总线EventBridge使用此角色读取SLS日志内容。

    testRole

    关于推送配置、重试和死信等高级配置项说明,请参见工作流调度高级功能。创建完成后,您可以在工作流详情页面的工作流调度页签,根据界面提示查看、编辑、删除或禁用/启用工作流。

MQTT调度

云消息队列 MQTT 版作为事件源通过事件总线云工作流调度集成后,通过云消息队列 MQTT 版调度能够触发关联工作流执行,通过云工作流可以对发布到云消息队列 MQTT 版的消息进行自定义处理。本文介绍如何在控制台创建MQTT调度。

注意事项

  • 作为触发源的云消息队列MQTT版实例必须和创建调度的工作流在相同的地域。

  • 创建的事件流数量超过上限后,将无法再创建事件模式的MQTT调度。在单个阿里云账号单个地域维度下,关于创建工作流调度涉及的资源数量的限制,请参见使用限制

前提条件

创建MQTT调度

  1. 登录云工作流控制台,然后在顶部菜单栏,选择地域。

  2. 在左侧导航栏,选择工作流列表,然后在工作流列表页面,单击目标工作流。

  3. 在工作流详情页面,单击工作流调度页签,然后单击创建工作流调度

  4. 创建工作流调度面板,调度类型选择消息队列 MQTT版,设置相关配置项,然后单击确定。基础配置项说明如下所示。

    配置项

    操作

    本文示例

    名称

    填写自定义的工作流调度名称。

    mqtt-schedule

    MQTT 实例

    选择已创建的MQTT实例。

    mqtt-xxx

    MQTT Topic

    选择已创建的MQTT实例的Topic。

    testTopic

    关于推送配置、重试和死信等高级配置项说明,请参见工作流调度高级功能。创建完成后,您可以在工作流详情页面的工作流调度页签,根据界面提示查看、编辑、删除或禁用/启用工作流。

DTS调度

数据传输服务 DTS(Data Transmission Service)作为事件源通过事件总线云工作流集成后,通过DTS调度能够触发关联工作流的执行。本文介绍如何在流控制台创建DTS调度。

注意事项

  • 作为触发源的DTS数据订阅任务必须和创建调度的工作流在相同的地域。

  • 创建的事件流数量超过上限后,将无法再创建事件模式的DTS调度。在单个阿里云账号单个地域维度下,关于创建工作流调度涉及的资源数量的限制,请参见使用限制

前提条件

创建DTS调度

  1. 登录云工作流控制台,然后在顶部菜单栏,选择地域。

  2. 在左侧导航栏,选择工作流列表,然后在工作流列表页面,单击目标工作流。

  3. 在工作流详情页面,单击工作流调度页签,然后单击创建工作流调度

  4. 创建工作流调度面板,调度类型选择数据传输服务 DTS,设置相关配置项,然后单击确定。基础配置项说明如下所示。

    配置项

    操作

    本文示例

    名称

    填写自定义的工作流调度名称。

    dts-schedule

    数据订阅任务

    已创建的数据订阅任务名称。

    dtsqntc2***

    消费组

    已创建的用于消费订阅任务的消费组名称。

    test

    账号

    创建消费组时设置的账号。

    test

    密码

    创建消费组时设置的密码。

    *******

    消费位点

    期望消费第一条数据的时间戳,消费位点必须在订阅实例的数据范围之内。

    2022-06-21 00:00:00

    关于推送配置、重试和死信等高级配置项说明,请参见工作流调度高级功能。创建完成后,您可以在工作流详情页面的工作流调度页签,根据界面提示查看、编辑、删除或禁用/启用工作流。

工作流调度高级功能

推送格式

用于指定Event参数中每个数据元素的格式。

  • CloudEvents:以通用格式描述事件数据的规范,旨在简化不同服务和平台间的事件声明和传输。

  • RawData:只投递CloudEvents中数据字段的内容,不包含CloudEvents格式中的其它元数据信息。

批量推送

开启批量推送后,需设置批量推送条数和推量推送间隔。

  • 批量推送条数:一次调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求。取值范围为[1,10000]。

  • 批量推送间隔:调用函数的时间间隔,系统每到间隔时间点会将消息聚合后发给云工作流。取值范围为[0,15],单位为秒。0秒表示无需等待,实时投递。

批量推送案例:

  • 案例一:

    您设置的批量推送条数为10条,每条消息大小为1 KB,推量推送间隔为15s。在10s内消息条数累积10条,则立即触发推送,无需等到15s再推送。

  • 案例二:

    您设置的批量推送条数为32条,每条消息大小为1 KB,推量推送间隔为15s。在15s内消息积累10条,则立即触发推送,无需等待消息积累到32条后再推送。

  • 案例三:

    您设置的批量推送条数为20条,每条消息大小为2 KB,推量推送间隔为15s。在10s内消息积累40条(消息累积大小40*2 KB=80 KB),达到工作流Input大小限制数,则立即触发推送,第一批聚合32条消息推送到云工作流,第二批聚合8条消息推送到云工作流

重试策略

事件推送失败时,可按配置的重试策略进行重试。重试策略选项如下。

  • 退避重试:重试3次,每次重试的时间间隔为介于10s~20s的随机值。

  • 指数衰减重试:默认重试策略。重试176次,每次重试的时间间隔按照指数递增至512s,总计重试时间为24小时,即重试时间间隔为1s、2s、4s、8s、16s、32s、64s、128s、256s、512s……512s(共计167次间隔512s)。

容错策略

当发生错误时是否选择容错。

  • 允许容错

    请求失败且重试失败后,跳过此请求,继续处理下一条请求。

  • 禁止容错

    请求失败且重试失败后,消费任务阻塞。

死信队列

仅当开启允许容错时,可配置死信队列。

  • 如果启用死信队列,未被处理或超过重试次数的消息会被投递到目标服务中。云工作流支持的目标服务包括轻量消息队列(原 MNS)、消息队列RocketMQ版、消息队列Kafka版和事件总线EventBridge,您可以根据需求选择不同的队列类型。

  • 如果未启用死信队列,超过重试次数的消息会被丢弃。