全部产品
Search
文档中心

云工作流:分布式模式

更新时间:Oct 24, 2024

分布式模式在分布式环境中执行Map状态迭代,从而提高处理效率和容量,适用于处理大规模数据或并行计算。

基本概念

分布式模式:当Map ProcessorConfig的ExecutionMode字段被设置为Express时,Map状态的执行会以分布式模式进行。在此模式下,迭代(Map)状态将每次迭代作为Express模式的子工作流执行。

子工作流执行:当迭代(Map)状态被设置为分布式模式时,Map迭代的执行会以子工作流执行的形式进行,此时子工作流的定义为Map Processor的状态转换定义,接收的输入为Map迭代元素。

故障容忍策略:对于Map执行,当子工作流执行失败时,Map状态会以失败状态结束。对于迭代数量较多的Map执行,可以通过配置错误容忍使Map执行在子工作流执行失败时继续执行。

分布式模式包含以下字段:

字段

类型

是否必选

描述

示例值

Name

string

状态名称。

my-state-name

Description

string

状态描述。

describe it here

Type

string

状态类型。

Map

InputConstructor

map[string]any

输入构造器。

请参见输入和输出

ItemsPath

string

用于提取输入数组的表达式。

请参见ItemsPath

ItemBatcher

ItemBatcher

开启ItemBatcher可以将多个Item聚合为Item Batch作为输入传递给子工作流执行。

请参见ItemBatcher

ItemReader

ItemReader

支持从OSS读取数据。

请参见ItemReader

ItemConstructor

ItemConstructor

  • 支持通过$Item引用原始输入的Item。

  • 支持对Item进行单独构造。

请参见ItemConstructor

ResultWriter

ResultWriter

支持将子工作流执行的信息写入到指定的OSS目录。

请参见ResultWriter

MaxConcuccency

int

支持配置子工作流执行并发。

40

MaxItems

MaxItems

支持配置Map最多处理的Item数量。

请参见MaxItems

ToleratedFailurePercentage

ToleratedFailurePercentage

支持按照百分比配置失败容忍策略。

请参见ToleratedFailurePercentage

ToleratedFailureCount

ToleratedFailureCount

支持按照数量配置失败容忍策略。

请参见ToleratedFailureCount

Processor

Processor

迭代处理器。

请参见Processor

ProcessorConfig

ProcessorConfig

处理器配置。

请参见ProcessorConfig

OutputConstructor

map[string]any

输出构造器。

请参见状态输出构造器

Next

string

当前状态的下一状态。当End取值为true时,无需指定。

my-next-state

End

bool

是否为当前作用域的终结节点。

true

Retry

Retry

用于定义错误重试策略。

请参见 错误处理

Catch

Catch

用于定义错误捕获策略。

请参见 错误处理

使用限制

对于分布式模式执行存在以下配额限制,如果默认配额不能满足您的需求,您可以通过工单提交配额提升申请。

配额名称

含义

默认值

MaxOpenMapRun

单个账户在单个地域最多允许同时执行的分布式Map数量。

10

MaxConcurrency

单个MapRun支持的最大并发。

100

MaxItems

单个MapRun最多支持读取的Item数量。

10000

分布式模式的关键字段

ItemsPath

用于提取输入数组的表达式。该表达式执行后返回JSON Array,则可以进行迭代,将其中每个元素传入ItemProcessor进行处理;可使用表达式变量$Context和$Input,示例如下:

$Input.FieldA

Processor

迭代处理器。Processor包含的字段如下表所示。

字段

类型

是否必选

描述

示例值

States

array

内部嵌套的状态数组。

Processor:
   StartAt: Pass1
   States:
     - Type: Pass
       Name: Pass1
       End: true

StartAt

string

内部嵌套状态数组的执行起点。

my start task

ProcessorConfig

处理器配置。ProcessorConfig包含的字段如下表所示。

字段

类型

是否必选

描述

示例值

ExecutionMode

string

执行模式。

Express

ItemReader

Map分布式模式通过支持从OSS读取数据输入的方式,支持更大的数据输入。ItemReader包含的字段如下表所示。

字段

类型

是否必选

描述

示例值

SourceType

string

来源类型,可选值:OSS_CSV、OSS_JSON_LIST、OSS_OBJECTS、OSS_INVENTORY_FILES。

OSS_CSV

SourceParameters

string

来源参数。

请参见SourceParameters

ReaderConfig

ReaderConfig

阅读器配置。

请参见ReaderConfig

SourceParameters

来源参数。SourceParameters包含的字段如下表所示。

字段

类型

是否必选

描述

示例值

Bucket

string

文件所在的Bucket名称。

example-bucket

ObjectName

string

对象名称。

object_name_1

Prefix

string

限定返回的Bucket名称必须以prefix作为前缀。如果不设定,则不过滤前缀信息。

默认值:无

example-prefix

ReaderConfig

阅读器配置。ReaderConfig包含的字段如下表所示。

字段

类型

是否必选

描述

示例值

CSVHeaders

[]string

CSV文件中,第一行所包含的列标题或字段名称。

ColA,ColB,ColC

从OSS读取数据的方式包括以下几个方法:

  • 存储于OSS中的CSV文件

    使用Map状态从一个CSV文件中读取数据。假设您有一个example-object.csv文件存储在OSS的存储桶example-bucket中,可参考以下状态机定义示例。

    Type: StateMachine
    Name: MyWorkflow
    SpecVersion: v1
    StartAt: Map
    States:
      - Type: Map
        Name: Map
        ProcessorConfig:
          ExecutionMode: Express
        Processor:
          StartAt: Pass
          States:
            - Type: Pass
              Name: Pass
              End: true
        ItemReader:
          SourceType: OSS_CSV
          SourceParameters:
            Bucket: example-bucket
            ObjectName: example-object.csv
        End: true

    以下示例显示了example-object.csv文件的内容。

    ColA,ColB,ColC
    col_a_1,col_b_1,col_c_1

    以下示例显示了子工作流执行接收的输入。

    {
      "ColA": "col_a_1",
      "ColB": "col_b_1",
      "ColC": "col_c_1",
    }
  • 存储于OSS中的JSON文件

    重要

    存储于OSS中的JSON文件所包含的JSON内容必须是一个JSON Array。

    用于从指定的OSS中读取JSON列表。假设您有一个example-object.json文件存储在OSS存储桶example-bucket中,可参考以下状态机定义示例。

    Type: StateMachine
    Name: MyWorkflow
    SpecVersion: v1
    StartAt: Map
    States:
      - Type: Map
        Name: Map
        ProcessorConfig:
          ExecutionMode: Express
        Processor:
          StartAt: Pass
          States:
            - Type: Pass
              Name: Pass
              End: true
        ItemReader:
          SourceType: OSS_JSON_LIST
          SourceParameters:
            Bucket: example-bucket
            ObjectName: example-object.json
        End: true

    以下示例显示了example-object.json文件的内容。

    [
      {
        "key_1": "value_1"
      }
    ]

    以下示例显示了子工作流执行接收的输入。

    {
      "key_1": "value_1",
    }
  • 列举OSS中的Objects

    用于从指定的OSS中读取对象。假设您已将Objects存储在OSS存储桶example-bucket中,命名的前缀为example-prefix,可参考以下状态机定义示例。

    Type: StateMachine
    Name: MyWorkflow
    SpecVersion: v1
    StartAt: Map
    States:
      - Type: Map
        Name: Map
        ProcessorConfig:
          ExecutionMode: Express
        Processor:
          StartAt: Pass
          States:
            - Type: Pass
              Name: Pass
              End: true
        ItemReader:
          SourceType: OSS_OBJECTS
          SourceParameters:
            Bucket: example-bucket
            Prefix: example-prefix
        End: true

    以下目录结构示意图表明example-prefix/object_1文件存储在OSS存储桶example-bucket中。

    example-bucket
       ├── example-prefix/object_1

    以下示例显示了子工作流执行接收的输入。

    {
      "XMLName": {
        "Space": "",
        "Local": "Contents"
      },
      "Key": "example-prefix/object_1",
      "Type": "Normal",
      "Size": 268435,
      "ETag": "\"50B06D6680D86F04138HSN612EF5DEC6\"",
      "Owner": {
        "XMLName": {
          "Space": "",
          "Local": ""
        },
        "ID": "",
        "DisplayName": ""
      },
      "LastModified": "2024-01-01T01:01:01Z",
      "StorageClass": "Standard",
      "RestoreInfo": ""
    }
  • 存储于OSS中的Inventory

    使用Map状态从指定的OSS存储桶和文件中读取数据。假设您已将清单文件manifest.json存储在example-bucket的OSS存储桶中,路径是inventory/2024-01-01T01-01Z/manifest.json,可参考以下状态机定义示例。

    Type: StateMachine
    Name: MyWorkflow
    SpecVersion: v1
    StartAt: Map
    States:
      - Type: Map
        Name: Map
        ProcessorConfig:
          ExecutionMode: Express
        Processor:
          StartAt: Pass
          States:
            - Type: Pass
              Name: Pass
              End: true
        ItemReader:
          SourceType: OSS_INVENTORY_FILES
          SourceParameters:
            Bucket: example-bucket
            ObjectName: inventory/2024-01-01T01-01Z/manifest.json
        ItemConstructor:
          Key.$: $Item.Key
        End: true

    以下示例显示了清单文件的内容。

    "example-bucket","object_name_1"  
    "example-bucket","object_name_2"

    以下示例显示第一个子工作流执行接收的输入。

    {
      "Bucket": "example-bucket",
      "Key": "object_name_1"
    }

ItemBatcher

开启ItemBatcher可以将多个Item聚合为Item Batch作为输入传递给子工作流执行。ItemBatcher包含的字段如下表所示。

字段

类型

是否必选

描述

示例值

MaxItemsPerBatch

int

支持按照数量对Item进行聚合。

请参见MaxItemsPerBatch

MaxInputBytesPerBatch

int

支持按照聚合后输入的大小对Item进行聚合。

请参见MaxInputBytesPerBatch

BatchInput

map[string]any

支持在聚合Items时添加额外的输入。

请参见BatchInput

MaxItemsPerBatch

MaxItemsPerBatch支持按照数量对Item进行聚合。以下是MaxItemsPerBatch的使用示例。

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemBatcher:
      MaxItemsPerBatch: 2
    End: true

以下示例是状态机执行的输入。

{
  "Items": [
    {"key_1":"value_1"},
    {"key_2":"value_2"},
    {"key_3":"value_3"},
    {"key_4":"value_4"},
    {"key_5":"value_5"}
  ]
}

以下示例显示了子工作流执行接收的输入。

# execution-1
# 第1个子工作流输入示例
{
  "Items": [
    {"key_1":"value_1"},
    {"key_2":"value_2"}
  ]
}

# execution-2
# 第2个子工作流输入示例
{
  "Items": [
    {"key_1":"value_3"},
    {"key_2":"value_4"}
  ]
}

# execution-3
# 第3个子工作流输入示例
{
  "Items": [
    {"key_1":"value_5"},
  ]
}

MaxInputBytesPerBatch

MaxInputBytesPerBatch支持按照聚合后输入的大小对Item进行聚合,MaxInputBytesPerBatch可以确保聚合后的输入不超过指定的大小。以下是MaxInputBytesPerBatch的使用示例。

重要
  • 由于ItemBatcher会添加额外的Items key,输入的整体大小计算包含了额外附加的Key和附加的BatchInput。

  • MaxInputBytesPerBatch指定的数值的单位为Byte。

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemBatcher:
      MaxInputBytesPerBatch: 50
    End: true

以下示例是状态机执行的输入。

{
  "Items":[
    {"Key":1},
    {"key":2},
    {"Key":3},
    {"Key":4},
    {"Key":5}
  ]
}

以下示例显示了子工作流执行接收的输入。

# execution-1
# 第1个子工作流输入示例
{
  "Items":[
    {"Key":1},
    {"key":2}
  ]
}

# execution-2
# 第2个子工作流输入示例
{
  "Items":[
    {"Key":3},
    {"key":4}
  ]
}

# execution-3
# 第3个子工作流输入示例
{
  "Items":[
    {"Key":5}
  ]
}

BatchInput

BatchInput支持在聚合Items时添加额外的输入。以下是BatchInput的使用示例。

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemBatcher:
      MaxInputBytesPerBatch: 70
      BatchInput:
        InputKey.$: $Input.Key
    End: true

以下示例是状态机执行的输入。

{
  "Key":"value",
  "Items":[
    {"Key":1},
    {"key":2},
    {"Key":3},
    {"Key":4},
    {"Key":5}
  ]
}

以下示例显示了子工作流执行接收的输入。

# execution-1
# 第1个子工作流输入示例
{
  "BatchInput":{
    "InputKey":"value"
  },
  "Items":[
    {"Key":1},
    {"key":2}
  ]
}

# execution-2
# 第2个子工作流输入示例
{
  "BatchInput":{
    "InputKey":"value"
  },
  "Items":[
    {"Key":3},
    {"key":4}
  ]
}

# execution-3
# 第3个子工作流输入示例
{
  "BatchInput":{
    "InputKey":"value"
  },
  "Items":[
    {"Key":5}
  ]
}

ItemConstructor

ItemConstructor支持对Item进行单独构造,ItemConstructor支持通过$Item引用要被构造的Item。以下是ItemConstructor的使用示例。

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemBatcher:
      MaxInputBytesPerBatch: 200
      BatchInput:
        InputKey.$: $Input.Key
    ItemConstructor:
      ConstructedKey.$: $Item.Key
      InputKey.$: $Input.Key
    End: true

以下示例是状态机执行的输入。

{
  "Key":"value",
  "Items":[
    {"Key":1},
    {"Key":2},
    {"Key":3},
    {"Key":4},
    {"Key":5}
  ]
}

以下示例显示了子工作流执行接收的输入。

# execution-1
# 第1个子工作流输入示例
{
  "BatchInput": {
    "InputKey": "value"
  },
  "Items": [
    {
      "InputKey": "value",
      "ConstructedKey": 1
    },
    {
      "InputKey": "value",
      "ConstructedKey": 2
    },
    {
      "InputKey": "value",
      "ConstructedKey": 3
    }
  ]
}

# execution-2
# 第2个子工作流输入示例
{
  "BatchInput": {
    "InputKey": "value"
  },
  "Items": [
    {
      "InputKey": "value",
      "ConstructedKey": 4
    },
    {
      "InputKey": "value",
      "ConstructedKey": 5
    }
  ]
}

ResultWriter

ResultWriter支持将子工作流执行的信息写入到指定的OSS目录。ResultWriter包含的字段如下表所示。

字段

类型

是否必选

描述

示例值

Parameters

string

请求参数。

请参见Parameters

Parameters

请求参数。Parameters包含的字段如下表所示。

字段

类型

是否必选

描述

示例值

Bucket

string

文件所在的Bucket名称。

example-bucket

Prefix

string

限定返回的Bucket名称必须以prefix作为前缀。如果不设定,则不过滤前缀信息。

默认值:无

example-prefix

以下是ResultWriter的使用示例。

说明

工作流状态的输入输出不能超过大小限制,对于迭代数量较多的Map执行,可能会出现输出内容超过大小限制的问题,建议通过ResultWriter配置将Map的输出写入OSS。

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    ItemConstructor:
      Key.$: $Item.Key
      FailedValue.$: $Input.FailedValue
    ToleratedFailurePercentage: 30
    Processor:
      StartAt: Choice
      States:
        - Type: Choice
          Name: Choice
          Branches:
            - Condition: $Input.Key > $Input.FailedValue
              Next: Fail
          Default: Succeed
        - Type: Succeed
          Name: Succeed
          End: true
        - Type: Fail
          Name: Fail
          Code: MockError
          End: true
    ResultWriter:
      Parameters:
        Bucket: example-bucket
        Prefix: example-prefix/
    End: true

以下示例是状态机执行的输入。

{
  "FailedValue": 4,
  "Items": [
    {
      "Key": 1
    },
    {
      "Key": 2
    },
    {
      "Key": 3
    },
    {
      "Key": 4
    },
    {
      "Key": 5
    }
  ]
}

以下示例显示了3个JSON文件内容。

# 以下是manifest.json文件的内容,存储路径为example-prefix/map-run-name/manifest.json
{
    "DestinationBucket": "example-bucket",
    "MapRunName": "map-run-name",
    "ResultFiles": {
        "FAILED": [
            {
                "ObjectName": "example-prefix/map-run-name/FAILED_0.json",
                "Size": 262
            }
        ],
        "SUCCEED": [
            {
                "ObjectName": "example-prefix/map-run-name/SUCCEED_0.json",
                "Size": 1057
            }
        ]
    }
}

# 以下是FAILED_0.json文件的内容,存储路径为example-prefix/map-run-name/FAILED_0.json
[
    {
        "ExecutionName": "execution-name-5",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":5}",
        "Output": "{\"ErrorCode\":\"MockError\"}",
        "Status": "Failed",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    }
]

# 以下是SUCCEED_0.json文件的内容,存储路径为example-prefix/map-run-name/SUCCEED_0.json
[
    {
        "ExecutionName": "execution-name-1",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":1}",
        "Output": "{\"FailedValue\":4,\"Key\":1}",
        "Status": "Succeeded",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    },
    {
        "ExecutionName": "execution-name-2",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":2}",
        "Output": "{\"FailedValue\":4,\"Key\":2}",
        "Status": "Succeeded",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    },
    {
        "ExecutionName": "execution-name-3",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":3}",
        "Output": "{\"FailedValue\":4,\"Key\":3}",
        "Status": "Succeeded",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    },
    {
        "ExecutionName": "execution-name-4",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":4}",
        "Output": "{\"FailedValue\":4,\"Key\":4}",
        "Status": "Succeeded",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    }
]

MaxItems

MaxItems支持配置Map最多处理的Item数量。例如对于包含了10000个Object的特定OSS Prefix,如果配置MaxItems为1000,则Map只会从OSS加载1000个Object。

MaxConcurrency

MaxConcurrency支持配置子工作流执行并发。例如对于包含了10000个Item的Map执行,当MaxConcurrency配置为100时,Map同时最多执行100个子工作流。

ToleratedFailurePercentage

ToleratedFailurePercentage支持按照百分比配置失败容忍策略,例如当Item总数为10000时,配置ToleratedFailurePercentage为10可以使 Map 容忍最多 1000 个 Item 处理失败。

ToleratedFailureCount

ToleratedFailureCount支持按照数量配置失败容忍策略,例如当Item总数为10000时,配置ToleratedFailureCount为10可以使Map容忍最多10个Item处理失败。

使用示例

如下工作流定义包含了一个分布式模式的Map状态,它从上游状态读取输入,并通过$Input.Items提取迭代元素。对于每一个迭代元素,Map会创建一个Express模式的子工作流执行。

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    End: true

以下示例是状态机执行的输入。

{
  "Items": [
    {"key_1":"value_1"},
    {"key_2":"value_2"},
    {"key_3":"value_3"}
  ]
}

Map会产生三个子工作流执行,其中子工作流执行的定义为:

Type: StateMachine
Name: Map
SpecVersion: v1
StartAt: Pass
States:
  - Type: Pass
    Name: Pass
    End: true

以下示例显示了子工作流执行接收的输入。

# execution-1
# 第1个Map迭代对应的子工作流执行接收的输入示例
{"key_1":"value_1"}

# execution-2
# 第2个Map迭代对应的子工作流执行接收的输入示例
{"key_2":"value_2"}

# execution-3
# 第3个Map迭代对应的子工作流执行接收的输入示例
{"key_3":"value_3"}

完成后,Map状态的输出是一个JSON数组,其中每个项目都是迭代的输出。

{
    "Items": [
        {
            "key_1": "value_1"
        },
        {
            "key_2": "value_2"
        },
        {
            "key_3": "value_3"
        }
    ]
}