分布式模式在分布式环境中执行Map状态迭代,从而提高处理效率和容量,适用于处理大规模数据或并行计算。
基本概念
分布式模式:当Map ProcessorConfig的ExecutionMode字段被设置为Express时,Map状态的执行会以分布式模式进行。在此模式下,迭代(Map)状态将每次迭代作为Express模式的子工作流执行。
子工作流执行:当迭代(Map)状态被设置为分布式模式时,Map迭代的执行会以子工作流执行的形式进行,此时子工作流的定义为Map Processor的状态转换定义,接收的输入为Map迭代元素。
故障容忍策略:对于Map执行,当子工作流执行失败时,Map状态会以失败状态结束。对于迭代数量较多的Map执行,可以通过配置错误容忍使Map执行在子工作流执行失败时继续执行。
分布式模式包含以下字段:
字段 | 类型 | 是否必选 | 描述 | 示例值 |
Name | string | 是 | 状态名称。 | my-state-name |
Description | string | 否 | 状态描述。 | describe it here |
Type | string | 是 | 状态类型。 | Map |
InputConstructor | map[string]any | 否 | 输入构造器。 | 请参见输入和输出 |
ItemsPath | string | 是 | 用于提取输入数组的表达式。 | 请参见ItemsPath |
ItemBatcher | ItemBatcher | 否 | 开启ItemBatcher可以将多个Item聚合为Item Batch作为输入传递给子工作流执行。 | 请参见ItemBatcher |
ItemReader | ItemReader | 否 | 支持从OSS读取数据。 | 请参见ItemReader |
ItemConstructor | ItemConstructor | 否 |
| |
ResultWriter | ResultWriter | 否 | 支持将子工作流执行的信息写入到指定的OSS目录。 | 请参见ResultWriter |
MaxConcuccency | int | 否 | 支持配置子工作流执行并发。 | 40 |
MaxItems | MaxItems | 否 | 支持配置Map最多处理的Item数量。 | 请参见MaxItems |
ToleratedFailurePercentage | ToleratedFailurePercentage | 否 | 支持按照百分比配置失败容忍策略。 | |
ToleratedFailureCount | ToleratedFailureCount | 否 | 支持按照数量配置失败容忍策略。 | |
Processor | Processor | 是 | 迭代处理器。 | 请参见Processor |
ProcessorConfig | ProcessorConfig | 是 | 处理器配置。 | |
OutputConstructor | map[string]any | 否 | 输出构造器。 | 请参见状态输出构造器 |
Next | string | 否 | 当前状态的下一状态。当End取值为true时,无需指定。 | my-next-state |
End | bool | 否 | 是否为当前作用域的终结节点。 | true |
Retry | Retry | 否 | 用于定义错误重试策略。 | 请参见 错误处理 |
Catch | Catch | 否 | 用于定义错误捕获策略。 | 请参见 错误处理 |
使用限制
对于分布式模式执行存在以下配额限制,如果默认配额不能满足您的需求,您可以通过工单提交配额提升申请。
配额名称 | 含义 | 默认值 |
MaxOpenMapRun | 单个账户在单个地域最多允许同时执行的分布式Map数量。 | 10 |
MaxConcurrency | 单个MapRun支持的最大并发。 | 100 |
MaxItems | 单个MapRun最多支持读取的Item数量。 | 10000 |
分布式模式的关键字段
ItemsPath
用于提取输入数组的表达式。该表达式执行后返回JSON Array,则可以进行迭代,将其中每个元素传入ItemProcessor进行处理;可使用表达式变量$Context和$Input,示例如下:
$Input.FieldA
Processor
迭代处理器。Processor包含的字段如下表所示。
字段 | 类型 | 是否必选 | 描述 | 示例值 |
States | array | 是 | 内部嵌套的状态数组。 |
|
StartAt | string | 是 | 内部嵌套状态数组的执行起点。 | my start task |
ProcessorConfig
处理器配置。ProcessorConfig包含的字段如下表所示。
字段 | 类型 | 是否必选 | 描述 | 示例值 |
ExecutionMode | string | 是 | 执行模式。 | Express |
ItemReader
Map分布式模式通过支持从OSS读取数据输入的方式,支持更大的数据输入。ItemReader包含的字段如下表所示。
字段 | 类型 | 是否必选 | 描述 | 示例值 |
SourceType | string | 是 | 来源类型,可选值:OSS_CSV、OSS_JSON_LIST、OSS_OBJECTS、OSS_INVENTORY_FILES。 | OSS_CSV |
SourceParameters | string | 否 | 来源参数。 | |
ReaderConfig | ReaderConfig | 否 | 阅读器配置。 | 请参见ReaderConfig |
SourceParameters
来源参数。SourceParameters包含的字段如下表所示。
字段 | 类型 | 是否必选 | 描述 | 示例值 |
Bucket | string | 否 | 文件所在的Bucket名称。 | example-bucket |
ObjectName | string | 否 | 对象名称。 | object_name_1 |
Prefix | string | 否 | 限定返回的Bucket名称必须以prefix作为前缀。如果不设定,则不过滤前缀信息。 默认值:无 | example-prefix |
ReaderConfig
阅读器配置。ReaderConfig包含的字段如下表所示。
字段 | 类型 | 是否必选 | 描述 | 示例值 |
CSVHeaders | []string | 否 | CSV文件中,第一行所包含的列标题或字段名称。 | ColA,ColB,ColC |
从OSS读取数据的方式包括以下几个方法:
存储于OSS中的CSV文件
使用Map状态从一个CSV文件中读取数据。假设您有一个
example-object.csv
文件存储在OSS的存储桶example-bucket
中,可参考以下状态机定义示例。Type: StateMachine Name: MyWorkflow SpecVersion: v1 StartAt: Map States: - Type: Map Name: Map ProcessorConfig: ExecutionMode: Express Processor: StartAt: Pass States: - Type: Pass Name: Pass End: true ItemReader: SourceType: OSS_CSV SourceParameters: Bucket: example-bucket ObjectName: example-object.csv End: true
以下示例显示了
example-object.csv
文件的内容。ColA,ColB,ColC col_a_1,col_b_1,col_c_1
以下示例显示了子工作流执行接收的输入。
{ "ColA": "col_a_1", "ColB": "col_b_1", "ColC": "col_c_1", }
存储于OSS中的JSON文件
重要存储于OSS中的JSON文件所包含的JSON内容必须是一个JSON Array。
用于从指定的OSS中读取JSON列表。假设您有一个
example-object.json
文件存储在OSS存储桶example-bucket
中,可参考以下状态机定义示例。Type: StateMachine Name: MyWorkflow SpecVersion: v1 StartAt: Map States: - Type: Map Name: Map ProcessorConfig: ExecutionMode: Express Processor: StartAt: Pass States: - Type: Pass Name: Pass End: true ItemReader: SourceType: OSS_JSON_LIST SourceParameters: Bucket: example-bucket ObjectName: example-object.json End: true
以下示例显示了
example-object.json
文件的内容。[ { "key_1": "value_1" } ]
以下示例显示了子工作流执行接收的输入。
{ "key_1": "value_1", }
列举OSS中的Objects
用于从指定的OSS中读取对象。假设您已将Objects存储在OSS存储桶
example-bucket
中,命名的前缀为example-prefix
,可参考以下状态机定义示例。Type: StateMachine Name: MyWorkflow SpecVersion: v1 StartAt: Map States: - Type: Map Name: Map ProcessorConfig: ExecutionMode: Express Processor: StartAt: Pass States: - Type: Pass Name: Pass End: true ItemReader: SourceType: OSS_OBJECTS SourceParameters: Bucket: example-bucket Prefix: example-prefix End: true
以下目录结构示意图表明
example-prefix/object_1
文件存储在OSS存储桶example-bucket
中。example-bucket ├── example-prefix/object_1
以下示例显示了子工作流执行接收的输入。
{ "XMLName": { "Space": "", "Local": "Contents" }, "Key": "example-prefix/object_1", "Type": "Normal", "Size": 268435, "ETag": "\"50B06D6680D86F04138HSN612EF5DEC6\"", "Owner": { "XMLName": { "Space": "", "Local": "" }, "ID": "", "DisplayName": "" }, "LastModified": "2024-01-01T01:01:01Z", "StorageClass": "Standard", "RestoreInfo": "" }
存储于OSS中的Inventory
使用Map状态从指定的OSS存储桶和文件中读取数据。假设您已将清单文件
manifest.json
存储在example-bucket
的OSS存储桶中,路径是inventory/2024-01-01T01-01Z/manifest.json
,可参考以下状态机定义示例。Type: StateMachine Name: MyWorkflow SpecVersion: v1 StartAt: Map States: - Type: Map Name: Map ProcessorConfig: ExecutionMode: Express Processor: StartAt: Pass States: - Type: Pass Name: Pass End: true ItemReader: SourceType: OSS_INVENTORY_FILES SourceParameters: Bucket: example-bucket ObjectName: inventory/2024-01-01T01-01Z/manifest.json ItemConstructor: Key.$: $Item.Key End: true
以下示例显示了清单文件的内容。
"example-bucket","object_name_1" "example-bucket","object_name_2"
以下示例显示第一个子工作流执行接收的输入。
{ "Bucket": "example-bucket", "Key": "object_name_1" }
ItemBatcher
开启ItemBatcher可以将多个Item聚合为Item Batch作为输入传递给子工作流执行。ItemBatcher包含的字段如下表所示。
字段 | 类型 | 是否必选 | 描述 | 示例值 |
MaxItemsPerBatch | int | 否 | 支持按照数量对Item进行聚合。 | |
MaxInputBytesPerBatch | int | 否 | 支持按照聚合后输入的大小对Item进行聚合。 | |
BatchInput | map[string]any | 否 | 支持在聚合Items时添加额外的输入。 | 请参见BatchInput |
MaxItemsPerBatch
MaxItemsPerBatch支持按照数量对Item进行聚合。以下是MaxItemsPerBatch的使用示例。
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemBatcher:
MaxItemsPerBatch: 2
End: true
以下示例是状态机执行的输入。
{
"Items": [
{"key_1":"value_1"},
{"key_2":"value_2"},
{"key_3":"value_3"},
{"key_4":"value_4"},
{"key_5":"value_5"}
]
}
以下示例显示了子工作流执行接收的输入。
# execution-1
# 第1个子工作流输入示例
{
"Items": [
{"key_1":"value_1"},
{"key_2":"value_2"}
]
}
# execution-2
# 第2个子工作流输入示例
{
"Items": [
{"key_1":"value_3"},
{"key_2":"value_4"}
]
}
# execution-3
# 第3个子工作流输入示例
{
"Items": [
{"key_1":"value_5"},
]
}
MaxInputBytesPerBatch
MaxInputBytesPerBatch支持按照聚合后输入的大小对Item进行聚合,MaxInputBytesPerBatch可以确保聚合后的输入不超过指定的大小。以下是MaxInputBytesPerBatch的使用示例。
由于ItemBatcher会添加额外的Items key,输入的整体大小计算包含了额外附加的Key和附加的BatchInput。
MaxInputBytesPerBatch指定的数值的单位为Byte。
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemBatcher:
MaxInputBytesPerBatch: 50
End: true
以下示例是状态机执行的输入。
{
"Items":[
{"Key":1},
{"key":2},
{"Key":3},
{"Key":4},
{"Key":5}
]
}
以下示例显示了子工作流执行接收的输入。
# execution-1
# 第1个子工作流输入示例
{
"Items":[
{"Key":1},
{"key":2}
]
}
# execution-2
# 第2个子工作流输入示例
{
"Items":[
{"Key":3},
{"key":4}
]
}
# execution-3
# 第3个子工作流输入示例
{
"Items":[
{"Key":5}
]
}
BatchInput
BatchInput支持在聚合Items时添加额外的输入。以下是BatchInput的使用示例。
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemBatcher:
MaxInputBytesPerBatch: 70
BatchInput:
InputKey.$: $Input.Key
End: true
以下示例是状态机执行的输入。
{
"Key":"value",
"Items":[
{"Key":1},
{"key":2},
{"Key":3},
{"Key":4},
{"Key":5}
]
}
以下示例显示了子工作流执行接收的输入。
# execution-1
# 第1个子工作流输入示例
{
"BatchInput":{
"InputKey":"value"
},
"Items":[
{"Key":1},
{"key":2}
]
}
# execution-2
# 第2个子工作流输入示例
{
"BatchInput":{
"InputKey":"value"
},
"Items":[
{"Key":3},
{"key":4}
]
}
# execution-3
# 第3个子工作流输入示例
{
"BatchInput":{
"InputKey":"value"
},
"Items":[
{"Key":5}
]
}
ItemConstructor
ItemConstructor支持对Item进行单独构造,ItemConstructor支持通过$Item引用要被构造的Item。以下是ItemConstructor的使用示例。
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemBatcher:
MaxInputBytesPerBatch: 200
BatchInput:
InputKey.$: $Input.Key
ItemConstructor:
ConstructedKey.$: $Item.Key
InputKey.$: $Input.Key
End: true
以下示例是状态机执行的输入。
{
"Key":"value",
"Items":[
{"Key":1},
{"Key":2},
{"Key":3},
{"Key":4},
{"Key":5}
]
}
以下示例显示了子工作流执行接收的输入。
# execution-1
# 第1个子工作流输入示例
{
"BatchInput": {
"InputKey": "value"
},
"Items": [
{
"InputKey": "value",
"ConstructedKey": 1
},
{
"InputKey": "value",
"ConstructedKey": 2
},
{
"InputKey": "value",
"ConstructedKey": 3
}
]
}
# execution-2
# 第2个子工作流输入示例
{
"BatchInput": {
"InputKey": "value"
},
"Items": [
{
"InputKey": "value",
"ConstructedKey": 4
},
{
"InputKey": "value",
"ConstructedKey": 5
}
]
}
ResultWriter
ResultWriter支持将子工作流执行的信息写入到指定的OSS目录。ResultWriter包含的字段如下表所示。
字段 | 类型 | 是否必选 | 描述 | 示例值 |
Parameters | string | 是 | 请求参数。 | 请参见Parameters |
Parameters
请求参数。Parameters包含的字段如下表所示。
字段 | 类型 | 是否必选 | 描述 | 示例值 |
Bucket | string | 是 | 文件所在的Bucket名称。 | example-bucket |
Prefix | string | 是 | 限定返回的Bucket名称必须以prefix作为前缀。如果不设定,则不过滤前缀信息。 默认值:无 | example-prefix |
以下是ResultWriter的使用示例。
工作流状态的输入输出不能超过大小限制,对于迭代数量较多的Map执行,可能会出现输出内容超过大小限制的问题,建议通过ResultWriter配置将Map的输出写入OSS。
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
ItemConstructor:
Key.$: $Item.Key
FailedValue.$: $Input.FailedValue
ToleratedFailurePercentage: 30
Processor:
StartAt: Choice
States:
- Type: Choice
Name: Choice
Branches:
- Condition: $Input.Key > $Input.FailedValue
Next: Fail
Default: Succeed
- Type: Succeed
Name: Succeed
End: true
- Type: Fail
Name: Fail
Code: MockError
End: true
ResultWriter:
Parameters:
Bucket: example-bucket
Prefix: example-prefix/
End: true
以下示例是状态机执行的输入。
{
"FailedValue": 4,
"Items": [
{
"Key": 1
},
{
"Key": 2
},
{
"Key": 3
},
{
"Key": 4
},
{
"Key": 5
}
]
}
以下示例显示了3个JSON文件内容。
# 以下是manifest.json文件的内容,存储路径为example-prefix/map-run-name/manifest.json
{
"DestinationBucket": "example-bucket",
"MapRunName": "map-run-name",
"ResultFiles": {
"FAILED": [
{
"ObjectName": "example-prefix/map-run-name/FAILED_0.json",
"Size": 262
}
],
"SUCCEED": [
{
"ObjectName": "example-prefix/map-run-name/SUCCEED_0.json",
"Size": 1057
}
]
}
}
# 以下是FAILED_0.json文件的内容,存储路径为example-prefix/map-run-name/FAILED_0.json
[
{
"ExecutionName": "execution-name-5",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":5}",
"Output": "{\"ErrorCode\":\"MockError\"}",
"Status": "Failed",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
}
]
# 以下是SUCCEED_0.json文件的内容,存储路径为example-prefix/map-run-name/SUCCEED_0.json
[
{
"ExecutionName": "execution-name-1",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":1}",
"Output": "{\"FailedValue\":4,\"Key\":1}",
"Status": "Succeeded",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
},
{
"ExecutionName": "execution-name-2",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":2}",
"Output": "{\"FailedValue\":4,\"Key\":2}",
"Status": "Succeeded",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
},
{
"ExecutionName": "execution-name-3",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":3}",
"Output": "{\"FailedValue\":4,\"Key\":3}",
"Status": "Succeeded",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
},
{
"ExecutionName": "execution-name-4",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":4}",
"Output": "{\"FailedValue\":4,\"Key\":4}",
"Status": "Succeeded",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
}
]
MaxItems
MaxItems支持配置Map最多处理的Item数量。例如对于包含了10000个Object的特定OSS Prefix,如果配置MaxItems为1000,则Map只会从OSS加载1000个Object。
MaxConcurrency
MaxConcurrency支持配置子工作流执行并发。例如对于包含了10000个Item的Map执行,当MaxConcurrency配置为100时,Map同时最多执行100个子工作流。
ToleratedFailurePercentage
ToleratedFailurePercentage支持按照百分比配置失败容忍策略,例如当Item总数为10000时,配置ToleratedFailurePercentage为10可以使 Map 容忍最多 1000 个 Item 处理失败。
ToleratedFailureCount
ToleratedFailureCount支持按照数量配置失败容忍策略,例如当Item总数为10000时,配置ToleratedFailureCount为10可以使Map容忍最多10个Item处理失败。
使用示例
如下工作流定义包含了一个分布式模式的Map状态,它从上游状态读取输入,并通过$Input.Items
提取迭代元素。对于每一个迭代元素,Map会创建一个Express模式的子工作流执行。
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
End: true
以下示例是状态机执行的输入。
{
"Items": [
{"key_1":"value_1"},
{"key_2":"value_2"},
{"key_3":"value_3"}
]
}
Map会产生三个子工作流执行,其中子工作流执行的定义为:
Type: StateMachine
Name: Map
SpecVersion: v1
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
以下示例显示了子工作流执行接收的输入。
# execution-1
# 第1个Map迭代对应的子工作流执行接收的输入示例
{"key_1":"value_1"}
# execution-2
# 第2个Map迭代对应的子工作流执行接收的输入示例
{"key_2":"value_2"}
# execution-3
# 第3个Map迭代对应的子工作流执行接收的输入示例
{"key_3":"value_3"}
完成后,Map状态的输出是一个JSON数组,其中每个项目都是迭代的输出。
{
"Items": [
{
"key_1": "value_1"
},
{
"key_2": "value_2"
},
{
"key_3": "value_3"
}
]
}