This topic describes how to integrate a Simple Message Queue (SMQ, formerly MNS) topic into a task step that uses the wait-for-callback mode and publishes a message to the topic. After the SMQ topic receives the message, it pushes the message to a function in Function Compute, and the function calls the ReportTaskSucceeded or ReportTaskFailed operation to report the task status back to the flow.
How a task step works
After an application is deployed, the application is executed based on the following steps:
Execute the flow. The task step publishes a message to the SMQ topic. The task token of the task step is placed in the message body and sent to the topic.
The task step of the flow suspends and waits for the task callback.
After the SMQ topic receives the message, the message and the task token are pushed to the HTTP trigger of the function in Function Compute over HTTP to trigger the execution. For more information, see Push messages to an HTTP server.
The function in Function Compute obtains the task token and calls the ReportTaskSucceeded operation to report the task status.
The flow continues.
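The steps above can be modeled with a small in-memory sketch. It is only an illustration of the wait-for-callback pattern, not how Serverless Workflow is implemented: the class `WaitForCallbackStep`, the list standing in for the SMQ topic, and the status strings are all hypothetical names chosen for this example.

```python
import uuid


class WaitForCallbackStep:
    """Toy model of a task step that suspends until a callback arrives."""

    def __init__(self, publish):
        self._publish = publish   # callable that delivers the message
        self._pending = {}        # task token -> step status
        self.outputs = {}         # task token -> reported output

    def start(self, message_body):
        # Issue a token, send it along with the payload, then suspend.
        token = uuid.uuid4().hex
        self._pending[token] = "WAITING"
        self._publish({"messageBody": message_body, "taskToken": token})
        return token

    def report_task_succeeded(self, token, output):
        # Only a token that is still waiting may resume the step.
        if self._pending.get(token) != "WAITING":
            raise KeyError("unknown or already completed task token")
        self._pending[token] = "SUCCEEDED"
        self.outputs[token] = output

    def status(self, token):
        return self._pending[token]


topic = []                                # stands in for the SMQ topic
step = WaitForCallbackStep(topic.append)
token = step.start("hello world")
assert step.status(token) == "WAITING"    # the flow is suspended here

received = topic.pop()                    # stands in for the HTTP push
step.report_task_succeeded(received["taskToken"], {"status": "success"})
print(step.status(token))                 # prints SUCCEEDED
```

The point of the token is that the party that finishes the work does not need to know which flow execution it belongs to; presenting the token is enough to resume the correct step.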
Deploy an application
Log on to the Serverless Workflow console.
On the Flows page, click Create Flow.
On the Create Flow page, select Sample Project and Task SMQ Topics, and then click Next Step.
On the Create Application page, create an application based on the template and click Deploy.
Application Name: Specify a name for the application. The name must be unique within the same account.
TopicName: Specify a name for the topic. If the specified SMQ topic does not exist, the system automatically creates it.
After you click Deploy, all resources that you created in the application are displayed.
Execute the flow.
Use the following JSON object as the input of the execution:
{ "messageBody": "hello world" }
After the execution is successful, you can view the status of the execution result.
Application code
Orchestrate a flow of the SMQ topic.
Encapsulate the task token of the task step in the message body of the message for the subsequent callback. The output specified in ReportTaskSucceeded is read through outputMappings.
version: v1
type: flow
steps:
  - type: task
    name: mns-topic-task
    resourceArn: acs:mns:::/topics/<topic>/messages
    pattern: waitForCallback
    inputMappings:
      - target: messageBody
        source: $input.messageBody
      - target: taskToken
        source: $context.task.token
    outputMappings:
      - target: status
        source: $local.status
    serviceParams:
      MessageBody: $
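To make the data flow of the mappings in the flow definition easier to follow, here is a simplified sketch of how they could be evaluated. The helpers `resolve` and `apply_mappings` are hypothetical and only handle plain dotted references; the real expression syntax of the flow definition language is richer. The token value is a placeholder.

```python
import json


def resolve(path, scopes):
    """Resolve a reference such as '$input.messageBody' against named scopes."""
    root, *keys = path.lstrip("$").split(".")
    value = scopes[root]
    for key in keys:
        value = value[key]
    return value


def apply_mappings(mappings, scopes):
    # Each mapping copies the value found at `source` into the key `target`.
    return {m["target"]: resolve(m["source"], scopes) for m in mappings}


input_mappings = [
    {"target": "messageBody", "source": "$input.messageBody"},
    {"target": "taskToken", "source": "$context.task.token"},
]
output_mappings = [{"target": "status", "source": "$local.status"}]

# Before the callback: collect the values that travel in the message.
scopes = {
    "input": {"messageBody": "hello world"},
    "context": {"task": {"token": "example-token"}},  # placeholder token
}
message = apply_mappings(input_mappings, scopes)
print(json.dumps(message))

# After the callback: the output reported by the function is read as $local.
reported_output = json.loads('{"status": "success"}')
step_output = apply_mappings(output_mappings, {"local": reported_output})
print(step_output)
```

In this model the step output ends up with a `status` key because the callback reported `{"status": "success"}` and the output mapping reads `$local.status`.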
Call back the function of the task step that is deployed in Function Compute.
Read the task token that is encapsulated in the message body and set the output for the callback task status to {"status":"success"}.
# The imports below are assumed to come from the aliyun-python-sdk-core
# and aliyun-python-sdk-fnf packages.
import json

from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.auth.credentials import StsTokenCredential
from aliyunsdkfnf.request.v20190315.ReportTaskSucceededRequest import ReportTaskSucceededRequest


def handler(environ, start_response):
    # Get the request body
    try:
        request_body_size = int(environ.get('CONTENT_LENGTH', 0))
    except ValueError:
        request_body_size = 0
    request_body = environ['wsgi.input'].read(request_body_size)
    print('Request body: {}'.format(request_body))

    body = json.loads(request_body)
    message_body_str = body['Message']

    # Read MessageBody and TaskToken from the message body
    message_body = json.loads(message_body_str)
    task_token = message_body['taskToken']
    ori_message_body = message_body['messageBody']
    print('Task token: {}\norigin message body: {}'.format(task_token, ori_message_body))

    # Initialize the fnf client with the STS token
    context = environ['fc.context']
    creds = context.credentials
    sts_creds = StsTokenCredential(creds.access_key_id, creds.access_key_secret, creds.security_token)
    fnf_client = AcsClient(credential=sts_creds, region_id=context.region)

    # Report task succeeded to Serverless Workflow
    req = ReportTaskSucceededRequest()
    req.set_TaskToken(task_token)
    req.set_Output('{"status": "success"}')
    resp = fnf_client.do_action_with_exception(req)
    print('Report task response: {}'.format(resp))

    # Respond to the HTTP request
    status = '200 OK'
    response_headers = [('Content-type', 'text/plain')]
    start_response(status, response_headers)
    return [b'OK']
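The handler above assumes that the pushed HTTP body is a JSON object whose `Message` field is itself a JSON string containing `taskToken` and `messageBody`. A minimal sketch of how that parsing step could be isolated and made to fail predictably on malformed input follows. The function name `parse_pushed_message` and the sample payload values are hypothetical; only the field names come from the handler.

```python
import json


def parse_pushed_message(request_body):
    """Return (task_token, original_message_body) from a pushed HTTP body.

    Raises ValueError when the payload does not have the expected shape.
    """
    try:
        envelope = json.loads(request_body)
        inner = json.loads(envelope["Message"])
        return inner["taskToken"], inner["messageBody"]
    except (KeyError, TypeError, json.JSONDecodeError) as exc:
        raise ValueError("unexpected push payload: {!r}".format(exc)) from exc


# Sample payload with the same shape the handler reads; values are made up.
sample = json.dumps({
    "Message": json.dumps({"taskToken": "example-token",
                           "messageBody": "hello world"})
}).encode("utf-8")

print(parse_pushed_message(sample))
```

Keeping the parsing separate from the SDK call makes it possible to test the payload handling locally without credentials, and gives the handler a single exception type to catch before it decides how to respond.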
References
For more information about how to use task steps to orchestrate SMQ topics, visit GitHub.