全部产品
Search
文档中心

云工作流:使用轻量消息队列(原 MNS)服务集成及回调编排任意任务类型

更新时间:Sep 19, 2024

Serverless 工作流的服务集成功能可以简化用户与云服务的交互。本文示例中,我们使用轻量消息队列(原 MNS)队列集成的功能,结合回调 (callback)完成更多非函数计算的计算任务的编排。

简介

Serverless 工作流的使用场景不仅限于编排函数计算FC(Function Compute)的FaaS函数,也包括广义上任意的计算任务。在上一篇最佳实践文档异步任务回调中介绍了利用函数计算的函数向轻量消息队列(原 MNS)队列中发送消息,自定义环境中的任务执行者 (worker) 接收到消息,结合回调(callback)通知Serverless 工作流任务执行结果。在下文中,将介绍如何使用Serverless 工作流的新功能轻量消息队列(原 MNS)队列。轻量消息队列(原 MNS)队列进一步简化编排自定义任务类型。Serverless 工作流可以直接向的发送消息,省去了发送消息的函数计算的函数的开发、测试和维护,提高了可用性,降低了延迟。使用轻量消息队列(原 MNS)集成相比通过函数计算的函数发送消息到轻量消息队列(原 MNS)的做法有以下好处:轻量消息队列(原 MNS)队列。轻量消息队列(原 MNS)队列进一步简化编排自定义任务类型。

  • 无需为发送消息做函数计算的函数的开发,降低了开发、测试和维护成本。

  • 降低了消息传递的延时、少了一次远程访问、避免了函数计算的冷启动。

  • 去除了一个服务依赖、提高了容错性。

Serverless 工作流未来会推出更多的云服务集成,让不同类型任务组成的工作流编排变得更加容易。

服务集成功能

下图中3个串行的任务由Serverless 工作流负责依次发送至用户指定的轻量消息队列(原 MNS)队列中。消息发送成功之后Serverless 工作流将会在该步骤暂定等待回调。用户在自定义环境中的worker(例如ECS VM、容器、自建机房内的机器)调用轻量消息队列(原 MNS) ReceiveMessage接口拉取消息。收到消息后,worker根据消息内容执行相应的任务。任务结束后,调用Serverless 工作流ReportTaskSucceeded/Failed接口,Serverless 工作流收到任务结果后继续该步骤执行。worker在汇报任务结果成功后删除轻量消息队列(原 MNS)队列消息。

fnf-docs-service-integration

步骤详解

下文将详细介绍使用该功能的步骤。

  1. 准备工作

  2. 填写流程(Flow)

  3. 编写worker

  4. 执行并查看结果

步骤一:准备工作

  1. 通过轻量消息队列(原 MNS)控制台创建轻量消息队列(原 MNS)队列,详细步骤请参见创建队列

  2. Serverless 工作流需要扮演用户在Flow中指定的执行角色 (RAM role)向用户账号下的轻量消息队列(原 MNS)队列发送消息,因此需要为该RAM role添加轻量消息队列(原 MNS) SendMessage相关的权限策略 (policy),细粒度的策略示例如下。如没有细粒度权限控制的需求,可以通过Serverless 工作流控制台Flow RAM role添加系统策略AliyunMNSFullAccess

{
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "mns:SendMessage"
            ],
            "Resource": [
                "acs:mns:$region:$account_id:/queues/$queue_name/messages"
            ]
        }
    ],
    "Version": "1"
}         

步骤二:编写流程 (Flow)

下面的FDL是一个可以向fnf-demo这个轻量消息队列(原 MNS)队列发送消息并且等待回调的任务(Task)步骤。

version: v1
type: flow
steps:
 - type: task
   name: Task_1
   resourceArn: acs:mns:::/queues/fnf-demo/messages  # 表示该任务(Task)步骤会向同区域, 同账号下的轻量消息队列(原 MNS)队列fnf-demo发送消息。pattern: waitForCallback  # 表示该任务步骤在发送轻量消息队列(原 MNS)消息成功后会暂停,直到收到回调。inputMappings:
      - target: task_token
        source: $context.task.token  # 从context对象中获取标识该任务的令牌 (task token)。
      - target: key
        source: value
   serviceParams:  # 服务集成参数。
   MessageBody: $  # 用映射后的input作为要发送消息的内容。
   Priority: 1  # 消息队列的优先级。

步骤三:编写worker

下面的Python 2.7代码模拟一个执行任务的worker,它可以运行在任何可以访问到Serverless 工作流轻量消息队列(原 MNS)服务的环境中。该worker长轮询调用轻量消息队列(原 MNS) ReceiveMessage,当一个带有轻量消息队列(原 MNS)配置的任务步骤进入时,Serverless 工作流会向fnf-demo这个队列中发送一个消息。该worker执行相应任务结束后回调 (callback)Serverless 工作流ReportTaskSucceeded/Failed接口,在任务结果汇报完成后Serverless 工作流会继续当前任务步骤执行,worker可以删除消息。

  1. 在虚拟环境中,安装fnf、mns、Python SDK。

    cd /tmp; mkdir -p fnf-demo-callback; cd fnf-demo-callback
    virtualenv env; source env/bin/activate
    pip install -t . aliyun-python-sdk-core -t . aliyun-python-sdk-fnf -t . aliyun-mns                              
  2. 编写本地任务执行者worker.py代码 。

    import json
    import os
    
    from aliyunsdkcore.client import AcsClient
    from aliyunsdkcore.acs_exception.exceptions import ServerException
    from aliyunsdkcore.client import AcsClient
    from aliyunsdkfnf.request.v20190315 import ReportTaskSucceededRequest
    from mns.account import Account # pip install aliyun-mns
    from mns.queue import *
    
    def main():
      region = os.environ['REGION']
      account_id = os.environ['ACCOUNT_ID']
      ak_id = os.environ['AK_ID']
      ak_secret = os.environ['AK_SECRET']
    
      queue_name = "fnf-demo"
      fnf_client = AcsClient(
        ak_id,
        ak_secret,
        region
      )
    
      mns_endpoint = "https://%s.mns.%s.aliyuncs.com" % (account_id, region)
      my_account = Account(mns_endpoint, ak_id, ak_secret)
      my_queue = my_account.get_queue("fnf-demo")
      my_queue.set_encoding(False)
      wait_seconds = 10
    
      try:
        while True:
          try:
            print "Receiving messages"
            recv_msg = my_queue.receive_message(wait_seconds)
    
            print "Received message %s, body %s" % (recv_msg.message_id, recv_msg.message_body)
            body = json.loads(recv_msg.message_body)
            task_token = body["task_token"]
            output = "{\"key\": \"value\"}"
            request = ReportTaskSucceededRequest.ReportTaskSucceededRequest()
            request.set_Output(output)
            request.set_TaskToken(task_token)
            resp = fnf_client.do_action_with_exception(request)
            print "Report task succeeded finished"
            my_queue.delete_message(recv_msg.receipt_handle)
            print "Deleted message " + recv_msg.message_id
          except MNSExceptionBase as e:
            print(e)
          except ServerException as e:
            print(e)
            if e.error_code == 'TaskAlreadyCompleted':
              my_queue.delete_message(recv_msg.receipt_handle)
              print "Task already completed, deleted message " + recv_msg.message_id
      except ServerException as e:
        print(e)
    
    if __name__ == '__main__':
        main()                               
  3. 开启worker,长轮询fnf-demo队列,收到消息后回调Serverless 工作流汇报结果。

    # 执行worker进程。export REGION={your-region}
    export ACCOUNT_ID={your-account-id}
    export AK_ID={your-ak-id}
    export AK_SECRET={your-ak-secret}
    
    python worker.py           

步骤四:执行并查看结果

Serverless 工作流控制台开始一个流程的执行,配合worker可以看到流程成功执行。

Screen Shot 2019-09-17 at 7.51.12 PM