This topic describes the callback feature of Serverless workflow. Compared with polling, a callback effectively reduces the delay and unnecessary pressure on the server caused by polling. In addition, callback can be used with queues to orchestrate non-Function Compute tasks. In this way, Serverless workflow allows you to orchestrate any type of computing resources.
Overview
Long-running tasks are asynchronously submitted and a task ID is returned. You can use either polling or callback to check whether an asynchronous task ends. The Poll for task status topic describes how to use polling to check whether a task ends. The callback feature of Serverless workflow has the following benefits:
- Eliminate unnecessary delay caused by long polling.
- Eliminate unnecessary pressure on and waste of server resources caused by highly concurrent polling in large-traffic scenarios.
- Orchestrate tasks that do not involve functions in Function Compute, such as processes running in an on-premises data center or an Elastic Compute Service (ECS) instance.
- Automate steps that require manual intervention, such as notifying that a task has been approved.
The following figure shows how to use Message Service (MNS) queues with the callback API to orchestrate user-created resources in Serverless workflow.
Callback usage
In the task step, specify pattern: waitForCallback
. As shown in the following figure, after the task, such as Function Compute call, specified in resourceArn
is submitted, this step stores taskToken
to the context
object of the step and is suspended until Serverless workflow receives that the callback or the specified task times out. When taskToken
is passed to the ReportTaskSucceed
or ReportTaskFailed
operation for callback, this step continues.
- type: task
name: mytask
resourceArn: acs:fc:::services/{fc-service}/functions/{fc-function}
pattern: waitForCallback # Enables the task step to wait for callback after the task is submitted.
inputMappings:
- target: taskToken
source: $context.task.token # Uses taskToken in the context object as an input for the function that is specified in resourceArn.
outputMappings:
- target: k
source: $local.key # Maps output {"key": "value"} in ReportTaskSucceeded to {"k": "value"} and uses the mapped data as the output of this step.
Example
This example consists of the following three steps:
Step 1: Prepare a task function
- Service: fnf-demo.
- Function: echo.
- Runtime environment: Python 2.7.
- Entry point: index.handler.
#! /usr/bin/env python
import json
def handler(event, context):
return event
Step 2: Start a flow
- Flow name: fnf-demo-callback.
- Flow role: a role with the Function Compute Invocation permission.
version: v1
type: flow
steps:
- type: task
name: mytask
resourceArn: acs:fc:::services/fnf-demo/functions/echo
pattern: waitForCallback
inputMappings:
- target: taskToken
source: $context.task.token
outputMappings:
- target: s
source: $local.status
After the flow starts, the mytask
step is suspended in the TaskSubmitted
event and waits for the callback. The event output
contains taskToken
that identifies the callback task.
Step 3: Perform the callback
callback.py
script locally or in any environment where Python can run. Replace {task-token} with the value of the TaskSubmitted
event.cd /tmp
mkdir fnf-demo-callback
cd fnf-demo-callback
# Install Serverless Workflow Python SDK in a virtual environment.
virtualenv env
source env/bin/activate
pip install -t . aliyun-python-sdk-core
pip install -t . aliyun-python-sdk-fnf
# Run the worker process.
export ACCOUNT_ID={your-account-id}; export AK_ID={your-ak-id}; export AK_SECRET={your-ak-secret}
python worker.py {task-token-from-TaskSubmitted}
# The worker.py code:
import os
import sys
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkcore.client import AcsClient
from aliyunsdkfnf.request.v20190315 import ReportTaskSucceededRequest
def main():
account_id = os.environ['ACCOUNT_ID']
akid = os.environ['AK_ID']
ak_secret = os.environ['AK_SECRET']
fnf_client = AcsClient(akid, ak_secret, "cn-hangzhou")
task_token = sys.argv[1]
print "task token " + task_token
try:
request = ReportTaskSucceededRequest.ReportTaskSucceededRequest()
request.set_Output("{\"status\": \"ok\"}")
request.set_TaskToken(task_token)
resp = fnf_client.do_action_with_exception(request)
print "Report task succeeded finished"
except ServerException as e:
print(e)
if __name__ == '__main__':
main()
mytask
step continues, and the "{"status": "ok"}" output specified in ReportTaskSucceeded
is mapped by outputMappings to "{"s": "ok"}".