全部产品
Search
文档中心

云工作流:轻量消息队列(原 MNS)主题集成和消息发布

更新时间:Sep 19, 2024

本文介绍了如何使用任务步骤的等待回调(waitForCallback)模式集成轻量消息队列(原 MNS)主题,并发布消息到主题。轻量消息队列(原 MNS)主题接收到消息后,调用工作流ReportTaskSucceeded或ReportTaskFailed API回调任务状态。

框架原理

应用部署后执行流程如下:

  1. 执行工作流,任务步骤发布消息到MNS主题。任务步骤的TaskToken会被放入消息体一起发送到主题。

  2. 工作流任务步骤暂停执行,等待任务回调。

  3. 轻量消息队列(原 MNS)主题接收到消息后,将消息和TaskToken通过HTTP推送发送到函数计算FC的函数HTTP触发器,触发函数执行。

  4. 函数计算函数最终获取到TaskToken,并调用ReportTaskSucceeded - 汇报指定的任务执行成功来报告任务状态。

  5. 流程继续执行。

image

部署应用

  1. 登录Serverless工作流控制台

  2. 流程页面,单击创建流程

  3. 创建流程页面,选择示例项目 > 任务步骤编排轻量消息队列(原 MNS)主题模板,单击下一步

    2

  4. 创建应用页面,配置相关信息,创建模板对应的应用,并单击部署

    3

    • 应用名称:自定义参数,同一账号下必须唯一。

    • TopicName:自定义参数,如果对应轻量消息队列(原 MNS)主题不存在会自动创建。

    单击部署后,会显示应用下创建的所有资源。4

  5. 执行工作流。

    执行以下命令。

    {
       "messageBody": "hello world"
    }

    执行成功后,您可以看到执行结果的状态。

    5

应用代码

  1. 编排轻量消息队列(原 MNS)主题的工作流。

    将任务步骤回调的TaskToken封装在消息的MessageBody中,用于后续的回调。outputMappings中读取ReportTaskSucceeded设置的output

    version: v1
    type: flow
    steps:
     - type: task
     name: mns-topic-task
     resourceArn: acs:mns:::/topics/<topic>/messages
     pattern: waitForCallback
     inputMappings:
     - target: messageBody
     source: $input.messageBody
     - target: taskToken
     source: $context.task.token
     outputMappings:
     - target: status
     source: $local.status
     serviceParams:
     MessageBody: $
  2. 回调任务步骤的FC函数。

    读取MessageBody中封装的TaskToken,回调任务状态设置output{"status":"success"}

    def handler(environ, start_response):
     # Get request body
     try:
     request_body_size = int(environ.get('CONTENT_LENGTH',
    0))
     except ValueError:
     request_body_size = 0
     request_body =
    environ['wsgi.input'].read(request_body_size)
     print('Request body:
    {}'.format(request_body))
    
     body = json.loads(request_body)
     message_body_str =
    body['Message']
    
     # Read MessageBody and TaskToken from
    message body
     message_body =
    json.loads(message_body_str)
     task_token =
    message_body['taskToken']
     ori_message_body =
    message_body['messageBody']
     print('Task token: {}\norigin message
    body: {}'.format(task_token, ori_message_body))
    
     # Init fnf client use sts token
     context = environ['fc.context']
     creds = context.credentials
     sts_creds =
    StsTokenCredential(creds.access_key_id, creds.access_key_secret, creds.security_token)
     fnf_client =
    AcsClient(credential=sts_creds, region_id=context.region)
    
     # Report task succeeded to serverless
    workflow
     req =
    ReportTaskSucceededRequest()
     req.set_TaskToken(task_token)
     req.set_Output('{"status":
    "success"}')
     resp =
    fnf_client.do_action_with_exception(req)
     print('Report task response:
    {}'.format(resp))
    
     # Response to http request
     status = '200 OK'
     response_headers = [('Content-type',
    'text/plain')]
     start_response(status,
    response_headers)
     return [b'OK']

更多信息

任务步骤编排轻量消息队列(原 MNS)主题应用的更多信息,请参见task-mns-topics应用代码