本文介绍了如何使用Serverless 工作流提供长流程分布式事务保证,帮助用户聚焦于自身业务逻辑。
简介
复杂的业务场景例如电商网站、酒店、航班预定这类涉及订单管理的应用通常要访问多个远程服务,并且对操作事务性语义(即所有步骤全部成功或全部失败,不存在中间状态)有较高要求。在流量较小、数据存储集中的应用中,事务性可以通过关系型数据库提供的ACID特性满足。然而在大流量场景下,为了高可用和可扩展性,业务通常选择向微服务的分布式架构方向演进。在这样的架构中提供多步骤事务性的保证通常需要引入队列和数据库来持久化消息以及展现流程状态,这类系统的开发和运维会给业务方带来额外的成本和负担。而使用Serverless 工作流提供长流程分布式事务保证会帮您解决这些问题。
场景描述
假设某应用为其用户提供预定火车票、航班和酒店的功能,要求三个步骤保证事务性。该功能需要三个远程调用实现(例如预定火车票需要调用12306接口),如果三个调用都成功则该订单成功。然而实际上任何一个远程调用都有可能会失败,因此该应用需要对不同的失败场景做出相应的补偿逻辑,回退已完成操作。如下图所示:
- 如果预定火车票(BuyTrainTicket)成功,而预定航班(ReserveFlight)失败,则需要取消已经购买的火车票(CancelTrainTicket),并告知您订单失败。
- 如果预定火车票(BuyTrainTicket)和预定航班(ReserveFlight)均成功,但是预订酒店(ReserveHotel)失败,则需要取消已经预定的航班(CancelFlight)和火车票(CancelTrainTicket),并告知您订单失败。
Serverless 工作流实现
下文的示例将FC函数编排成一个Serverless 工作流流程从而实现了一个可靠的多步骤长流程,该示例分为3步:
步骤1:创建FC函数
本步骤是模拟场景案例中提示的三个操作即预订火车票、预订航班及预定酒店。
- Service: fnf-demo
- Function: Operation
Operation函数模拟各操作(例如预定航班、预定酒店)的实现,根据输入决定该操作执行结果(成功或失败)。
import json
import logging
import uuid
def handler(event, context):
evt = json.loads(event)
logger = logging.getLogger()
id = uuid.uuid4()
op = "operation"
if 'operation' in evt:
op = evt['operation']
if op in evt:
result = evt[op]
if result == False:
logger.info("%s failed" % op)
exit()
logger.info("%s succeeded, id %s" % (op, id))
return '{"%s":"success", "%s_txnID": "%s"}' % (op, op, id)
步骤2:创建流程
使用Serverless工作流控制台创建下面的流程。
- 配置流程RAM角色
{ "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "fnf.aliyuncs.com" ] } } ], "Version": "1" }
- 流程定义
version: v1 type: flow steps: - type: task resourceArn: acs:fc:{region}:{accountID}:services/fnf-demo/functions/Operation name: BuyTrainTicket inputMappings: - target: operation source: buy_train_ticket - target: buy_train_ticket source: $input.buy_train_ticket_result catch: - errors: - FC.Unknown goto: OrderFailed - type: task resourceArn: acs:fc:{region}:{accountID}:services/fnf-demo/functions/Operation name: ReserveFlight inputMappings: - target: operation source: reserve_flight - target: reserve_flight source: $input.reserve_flight_result catch: # 捕获ReserveFlight task抛出的FC.Unknown错误,跳转到CancelTrainTicket。 - errors: - FC.Unknown goto: CancelTrainTicket - type: task resourceArn: acs:fc:{region}:{accountID}:services/fnf-demo/functions/Operation name: ReserveHotel inputMappings: - target: operation source: reserve_hotel - target: reserve_hotel source: $input.reserve_hotel_result retry: # 对FC.Unknown类型的错误最多指数退避重试3次,初始间隔1s,后续间隔=上次间隔*2。 - errors: - FC.Unknown intervalSeconds: 1 maxAttempts: 3 multiplier: 2 catch: # 捕获ReserveHotel task抛出的FC.Unknown错误,跳转到CancelFlight。 - errors: - FC.Unknown goto: CancelFlight - type: succeed name: OrderSucceeded - type: task resourceArn: acs:fc:{region}:{accountID}:services/fnf-demo/functions/Operation name: CancelFlight inputMappings: - target: operation source: cancel_flight - target: reserve_flight_txnID source: $local.reserve_flight_txnID - type: task resourceArn: acs:fc:{region}:{accountID}:services/fnf-demo/functions/Operation name: CancelTrainTicket inputMappings: - target: operation source: cancel_train_ticket - target: reserve_flight_txnID source: $local.reserve_flight_txnID - type: fail name: OrderFailed
步骤3:执行并查看结果
在控制台上对创建好的流程(Flow)开始一个新的执行(Execution)。StartExecution API要求传入JSON格式的输入。下面的JSON对象可以模拟每个步骤的成功或失败(例如"reserve_hotel_result":"fail"代表模拟预定酒店这步失败)。StartExecution是一个异步API,调用结束后,Serverless 工作流会返回一个执行名字用来查询流程执行状态。
{
"buy_train_ticket_result":"success",
"reserve_flight_result":"success",
"reserve_hotel_result":"fail"
}
流程执行开始后,在Serverless 工作流控制台可以查看流程的执行过程和结果。从步骤信息页签您可以看到,由于"reserve_hotel_result":"fail"
和ReserveHotel
函数调用失败,Serverless 工作流按照流程定义,依次取消航班(CancelFlight)、取消火车票(CancelTrainTicket)。Serverless 工作流每个步骤转换有持久化的保证,因此网络中断或进程崩溃等失败场景不会影响流程事务性的保证。
流程执行会产生执行历史事件(event),这些事件可以通过控制台、SDK或CLI调用GetExecutionHistory
API查询。
错误处理和重试
- 上面示例中的预定航班、预定酒店等远程调用都有可能受到网络或服务错误等原因导致调用失败,而增加对瞬时错误的重试可以提高订单流程成功率。Serverless 工作流在任务(Task)类型的步骤(Step)自带重试功能,如预定酒店这个步骤用下面的写法可以实现对FC.Unknown类型的错误指数退避。假设重试到达最大次数后
ReserveHotel
都无法成功,按照该步骤中catch
的定义,ReserveHotel函数抛出的FC.Unknown错误会被捕获并将跳转到CancelFlight
执行定义好的补偿逻辑。- type: task resourceArn: acs:fc:{region}:{accountID}:services/fnf-demo/functions/Operation name: ReserveHotel inputMappings: - target: operation source: reserve_hotel retry: # 对FC.Unknown类型的错误最多指数退避重试3次,初始间隔1s,后续间隔=上次间隔*2。 - errors: - FC.Unknown intervalSeconds: 1 maxAttempts: 3 multiplier: 2 catch: # 捕获ReserveHotel task抛出的FC.Unknown错误,跳转到CancelFlight。 - errors: - FC.Unknown goto: CancelFlight
- 下图可以看到加入重试之后预订酒店(ReserveHotel)任务执行了多次直到最大重试数。
步骤间的数据传递
- 预定酒店失败后需要取消航班和火车票,这两部分别需要用到预定航班和预定火车票返回的交易ID(txnID),下面的
inputMapping
对象描述了如何将之前步骤产生的输出传入CancelFlight
这个步骤中。- type: task resourceArn: acs:fc:{region}:{accountID}:services/fnf-demo/functions/Operation name: CancelFlight inputMappings: - target: operation source: cancel_flight - target: reserve_flight_txnID source: $local.reserve_flight_txnID
- 流程执行各步骤结束的输出都会被放在
StepExited
事件详情(EventDetail)的local对象中。{ "input":{ "operation":"reserve_hotel", "reserve_hotel_result":"fail" }, "local":{ "buy_train_ticket":"success", "buy_train_ticket_txnID":"d37412b3-bb68-4d04-9d90-c8c15643d45e", "reserve_flight_result":"success", "reserve_flight_txnID":"024caecf-cfa3-43a6-b561-9b6fe0571b55" }, "resourceArn":"acs:fc:{region}:{accountID}:services/fnf-demo/functions/Operation", "cause":"{\"errorMessage\":\"Process exited unexpectedly before completing request (duration: 12ms, maxMemoryUsage: 9.18MB)\"}", "error":"FC.Unknown", "retryCount":3, "goto":"CancelFlight" }
- 结合上面的
EventDetail
和inputMappings
的映射之后,传入到CancelFlight
步骤的输入变成如下JSON对象,这样CancelFlight
函数的输入会包含reserve_flight_txnID
字段。"input":{ "operation":"cancel_flight", "reserve_flight_txnID":"024caecf-cfa3-43a6-b561-9b6fe0571b55" }