All Products
Search
Document Center

CloudFlow:Integrate Simple Message Queue (formerly MNS) topics to publish messages

Last Updated:Oct 22, 2024

This topic describes how to integrate a Simple Message Queue (formerly MNS) (SMQ) topic in a task step that uses the wait-for-callback mode and publish a message to the topic. After the SMQ topic receives the message, the ReportTaskSucceeded or ReportTaskFailed operation is called to call back the task status.

How a task step works

After an application is deployed, the application is executed based on the following steps:

  1. Execute the flow. The task step publishes a message to the SMQ topic. The task token of the task step is placed in the message body and sent to the topic.

  2. The task step of the flow suspends and waits for the task callback.

  3. After the SMQ topic receives the message, the message and the task token are pushed to the HTTP trigger of the function in Function Compute over HTTP to trigger the execution. For more information, see Push messages to an HTTP server.

  4. The function in Function Compute obtains the task token and calls the ReportTaskSucceeded operation to report the task status.

  5. The flow continues.

image

Deploy an application

  1. Log on to the Serverless Workflow console.

  2. On the Flows page, click Create Flow.

  3. On the Create Flow page, select Sample Project and Task SMQ Topics, and then click Next Step.

  4. On the Create Application page, create an application based on the template and click Deploy.

    • Application Name: Specify a name for the application. The name must be unique within the same account.

    • TopicName: Specify a name for the topic. If the specified SMQ topic does not exist, the system automatically creates it.

    After you click Deploy, all resources that you created in the application are displayed.4

  5. Execute the flow.

    Run the following code:

    {
       "messageBody": "hello world"
    }

    After the execution is successful, you can view the status of the execution result.

    5

Application code

  1. Orchestrate a flow of the SMQ topic.

    Encapsulate the task token called back in the task step into the message body of the message for subsequent callback. Read the output specified in ReportTaskSucceeded from outputMappings.

    version: v1
    type: flow
    steps:
     - type: task
     name: mns-topic-task
     resourceArn: acs:mns:::/topics/<topic>/messages
     pattern: waitForCallback
     inputMappings:
     - target: messageBody
     source: $input.messageBody
     - target: taskToken
     source: $context.task.token
     outputMappings:
     - target: status
     source: $local.status
     serviceParams:
     MessageBody: $
  2. Call back the function of the task step that is deployed in Function Compute.

    Read the task token that is encapsulated in the message body and set the output for the callback task status to {"status":"success"}.

    def handler(environ, start_response):
     # Get request body
     try:
     request_body_size = int(environ.get('CONTENT_LENGTH',
    0))
     except ValueError:
     request_body_size = 0
     request_body =
    environ['wsgi.input'].read(request_body_size)
     print('Request body:
    {}'.format(request_body))
    
     body = json.loads(request_body)
     message_body_str =
    body['Message']
    
     # Read MessageBody and TaskToken from
    message body
     message_body =
    json.loads(message_body_str)
     task_token =
    message_body['taskToken']
     ori_message_body =
    message_body['messageBody']
     print('Task token: {}\norigin message
    body: {}'.format(task_token, ori_message_body))
    
     # Init fnf client use sts token
     context = environ['fc.context']
     creds = context.credentials
     sts_creds =
    StsTokenCredential(creds.access_key_id, creds.access_key_secret, creds.security_token)
     fnf_client =
    AcsClient(credential=sts_creds, region_id=context.region)
    
     # Report task succeeded to serverless
    workflow
     req =
    ReportTaskSucceededRequest()
     req.set_TaskToken(task_token)
     req.set_Output('{"status":
    "success"}')
     resp =
    fnf_client.do_action_with_exception(req)
     print('Report task response:
    {}'.format(resp))
    
     # Response to http request
     status = '200 OK'
     response_headers = [('Content-type',
    'text/plain')]
     start_response(status,
    response_headers)
     return [b'OK']

References

For more information about how to use task steps to orchestrate SMQ topics, visit GitHub.