CloudFlow: Distributed mode

Last Updated: Nov 26, 2024

Map states can be iterated in distributed mode, in which each iteration is executed as a separate sub-workflow. This improves processing efficiency and capacity, and makes Map states suitable for big data processing and parallel computing.

Concepts

Distributed mode: If you set the ExecutionMode field of ProcessorConfig to Express, the Map state is iterated in distributed mode. In this mode, each iteration of the Map state is executed as a sub-workflow in Express mode.

Sub-workflow execution: When a Map state is iterated in distributed mode, each iteration runs in the same way as a sub-workflow execution. The definition of the sub-workflow is derived from the states of the Processor, and the inputs of the executions are the iteration elements of the Map state.

Fault tolerance policy: If a sub-workflow execution fails during the iteration of a Map state, the iteration fails. For Map states with multiple iterations, you can configure fault tolerance policies so that the iteration can continue even if some sub-workflow executions fail. A minimal configuration sketch follows.
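For example, the following Map state fragment enables distributed mode. This is a minimal sketch assembled from the full example at the end of this topic:

- Type: Map
  Name: Map
  ProcessorConfig:
    ExecutionMode: Express
  Processor:
    StartAt: Pass
    States:
      - Type: Pass
        Name: Pass
        End: true
  End: true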

The following table describes the fields in distributed mode.

| Field | Type | Required | Description | Example |
| --- | --- | --- | --- | --- |
| Name | string | Yes | The name of the state. | my-state-name |
| Description | string | No | The description of the state. | describe it here |
| Type | string | Yes | The type of the state. | Map |
| InputConstructor | map[string]any | No | The input constructor. | See Inputs and outputs. |
| ItemsPath | string | Yes | The expression that is used to extract the input array. | See the ItemsPath section of this topic. |
| ItemBatcher | ItemBatcher | No | Combines multiple items into a batch and uses the batch as the input of a sub-workflow execution. | See the ItemBatcher section of this topic. |
| ItemReader | ItemReader | No | Reads data from Object Storage Service (OSS) buckets. | See the ItemReader section of this topic. |
| ItemConstructor | ItemConstructor | No | Constructs each item separately. You can use $Item to reference the items of the original input. | See the ItemConstructor section of this topic. |
| ResultWriter | ResultWriter | No | Writes the execution information of sub-workflows to the specified OSS bucket. | See the ResultWriter section of this topic. |
| MaxConcurrency | int | No | Specifies the number of sub-workflows that can be executed in parallel. | 40 |
| MaxItems | MaxItems | No | Specifies the maximum number of items that can be processed by the Map state machine. | See the MaxItems section of this topic. |
| ToleratedFailurePercentage | ToleratedFailurePercentage | No | Specifies a percentage-based failure tolerance policy. | See the ToleratedFailurePercentage section of this topic. |
| ToleratedFailureCount | ToleratedFailureCount | No | Specifies a count-based failure tolerance policy. | See the ToleratedFailureCount section of this topic. |
| Processor | Processor | Yes | The iteration processor. | See the Processor section of this topic. |
| ProcessorConfig | ProcessorConfig | Yes | The processor configurations. | See the ProcessorConfig section of this topic. |
| OutputConstructor | map[string]any | No | The output constructor. | See the State OutputConstructor section of the "Inputs and outputs" topic. |
| Next | string | No | The next state after the current state is complete. You do not need to configure this field if the End field is set to true. | my-next-state |
| End | bool | No | Specifies whether to end the current scope. | true |
| Retry | Retry | No | Defines the retry-upon-error policy. | See Error handling. |
| Catch | Catch | No | Defines the error-capture policy. | See Error handling. |

Quotas

The following table describes the quota limits in distributed mode. If the default quota cannot meet your business requirements, you can submit a ticket to request a quota increase.

| Quota name | Description | Default value |
| --- | --- | --- |
| MaxOpenMapRun | The maximum number of Map states that can be simultaneously iterated in distributed mode within a single account in each region. | 10 |
| MaxConcurrency | The maximum number of sub-workflows that can be simultaneously executed within a single MapRun task. | 100 |
| MaxItems | The maximum number of items that can be read by a single MapRun task. | 10000 |

Key fields in distributed mode

ItemsPath

The expression that is used to extract the input array. If evaluating the ItemsPath expression returns a value in the JSON array format, the value can be iterated and all its elements are passed to the processor. During this process, you can use the $Context and $Input expression variables, as shown in the following example:

$Input.FieldA
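For instance, assume the state machine receives the following input. Setting ItemsPath to $Input.FieldA extracts the two-element array, and each element becomes the input of one sub-workflow execution. The field names here are illustrative:

{
  "FieldA": [
    {"key_1": "value_1"},
    {"key_2": "value_2"}
  ]
}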

Processor

The iteration processor. The following table describes the fields in Processor.

| Field | Type | Required | Description | Example |
| --- | --- | --- | --- | --- |
| States | array | Yes | The array of nested states in the flow. | See the following sample. |
| StartAt | string | Yes | The starting point for executing the nested state array in the flow. | my start task |

The following sample shows a Processor definition:

Processor:
  StartAt: Pass1
  States:
    - Type: Pass
      Name: Pass1
      End: true

ProcessorConfig

The processor configurations. The following table describes the fields in ProcessorConfig.

| Field | Type | Required | Description | Example |
| --- | --- | --- | --- | --- |
| ExecutionMode | string | Yes | The execution mode. Set this field to Express to iterate the Map state in distributed mode. | Express |

ItemReader

In distributed mode, Map states can read large inputs from OSS buckets. The following table describes the fields in ItemReader.

| Field | Type | Required | Description | Example |
| --- | --- | --- | --- | --- |
| SourceType | string | Yes | The type of the data source. Valid values: OSS_CSV, OSS_JSON_LIST, OSS_OBJECTS, and OSS_INVENTORY_FILES. | OSS_CSV |
| SourceParameters | SourceParameters | No | The parameters of the data source. | See the SourceParameters section of this topic. |
| ReaderConfig | ReaderConfig | No | The reader configurations. | See the ReaderConfig section of this topic. |

SourceParameters

The parameters of the data source. The following table describes the fields in SourceParameters.

| Field | Type | Required | Description | Example |
| --- | --- | --- | --- | --- |
| Bucket | string | No | The name of the bucket in which the object resides. | example-bucket |
| ObjectName | string | No | The name of the object. | object_name_1 |
| Prefix | string | No | The prefix that the names of the objects to be read must contain. If you leave this field empty, prefixes are not used to filter the objects. By default, this field is empty. | example-prefix |

ReaderConfig

The reader configurations. The following table describes the fields in ReaderConfig.

| Field | Type | Required | Description | Example |
| --- | --- | --- | --- | --- |
| CSVHeaders | []string | No | The column titles or field names contained in the first row of the CSV file. | ColA,ColB,ColC |
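The following sketch shows how ReaderConfig might be combined with an OSS_CSV source. This is a minimal illustration; the bucket and object names are placeholders, and the headers are assumed to match the first row of the file:

ItemReader:
  SourceType: OSS_CSV
  SourceParameters:
    Bucket: example-bucket
    ObjectName: example-object.csv
  ReaderConfig:
    CSVHeaders:
      - ColA
      - ColB
      - ColC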

To read data from OSS buckets, use one of the following methods:

  • Read data from the CSV files stored in OSS buckets

    You can read data from CSV files by using Map states. For example, you have a CSV file named example-object.csv that is stored in an OSS bucket named example-bucket. You can refer to the following sample snippet to read data from the CSV file:

    Type: StateMachine
    Name: MyWorkflow
    SpecVersion: v1
    StartAt: Map
    States:
      - Type: Map
        Name: Map
        ProcessorConfig:
          ExecutionMode: Express
        Processor:
          StartAt: Pass
          States:
            - Type: Pass
              Name: Pass
              End: true
        ItemReader:
          SourceType: OSS_CSV
          SourceParameters:
            Bucket: example-bucket
            ObjectName: example-object.csv
        End: true

    The following snippet shows the content of the example-object.csv file:

    ColA,ColB,ColC
    col_a_1,col_b_1,col_c_1

    The following snippet shows the sample input received after executing the sub-workflow:

    {
      "ColA": "col_a_1",
      "ColB": "col_b_1",
      "ColC": "col_c_1"
    }
  • Read data from the JSON files stored in OSS buckets

    Important

    The JSON file stored in the OSS bucket must contain a JSON array.

    You can read JSON arrays from a specified OSS bucket. For example, you have a JSON file named example-object.json that is stored in an OSS bucket named example-bucket. You can refer to the following sample snippet to read JSON arrays from the OSS bucket:

    Type: StateMachine
    Name: MyWorkflow
    SpecVersion: v1
    StartAt: Map
    States:
      - Type: Map
        Name: Map
        ProcessorConfig:
          ExecutionMode: Express
        Processor:
          StartAt: Pass
          States:
            - Type: Pass
              Name: Pass
              End: true
        ItemReader:
          SourceType: OSS_JSON_LIST
          SourceParameters:
            Bucket: example-bucket
            ObjectName: example-object.json
        End: true

    The following snippet shows the content of the example-object.json file:

    [
      {
        "key_1": "value_1"
      }
    ]

    The following snippet shows the sample input received after executing the sub-workflow:

    {
      "key_1": "value_1"
    }
  • List objects stored in OSS buckets

    You can read objects from a specified OSS bucket. For example, you have a list of objects whose names are prefixed with example-prefix. The objects are stored in an OSS bucket named example-bucket. You can refer to the following sample snippet to read the objects from the OSS bucket:

    Type: StateMachine
    Name: MyWorkflow
    SpecVersion: v1
    StartAt: Map
    States:
      - Type: Map
        Name: Map
        ProcessorConfig:
          ExecutionMode: Express
        Processor:
          StartAt: Pass
          States:
            - Type: Pass
              Name: Pass
              End: true
        ItemReader:
          SourceType: OSS_OBJECTS
          SourceParameters:
            Bucket: example-bucket
            Prefix: example-prefix
        End: true

    The following snippet shows that the example-prefix/object_1 object is stored in the example-bucket bucket:

    example-bucket
       ├── example-prefix/object_1

    The following snippet shows the sample input received after executing the sub-workflow:

    {
      "XMLName": {
        "Space": "",
        "Local": "Contents"
      },
      "Key": "example-prefix/object_1",
      "Type": "Normal",
      "Size": 268435,
      "ETag": "\"50B06D6680D86F04138HSN612EF5DEC6\"",
      "Owner": {
        "XMLName": {
          "Space": "",
          "Local": ""
        },
        "ID": "",
        "DisplayName": ""
      },
      "LastModified": "2024-01-01T01:01:01Z",
      "StorageClass": "Standard",
      "RestoreInfo": ""
    }
  • Read inventories from OSS buckets

    You can read the object inventory files of OSS buckets by using Map states. For example, you have an inventory file named manifest.json that is stored in an OSS bucket named example-bucket. The path to the inventory file is inventory/2024-01-01T01-01Z/manifest.json. You can refer to the following sample snippet to read the inventory file from the OSS bucket:

    Type: StateMachine
    Name: MyWorkflow
    SpecVersion: v1
    StartAt: Map
    States:
      - Type: Map
        Name: Map
        ProcessorConfig:
          ExecutionMode: Express
        Processor:
          StartAt: Pass
          States:
            - Type: Pass
              Name: Pass
              End: true
        ItemReader:
          SourceType: OSS_INVENTORY_FILES
          SourceParameters:
            Bucket: example-bucket
            ObjectName: inventory/2024-01-01T01-01Z/manifest.json
        ItemConstructor:
          Key.$: $Item.Key
        End: true

    The following snippet shows the content of the inventory file:

    "example-bucket","object_name_1"  
    "example-bucket","object_name_2"

    The following snippet shows the sample input received after executing the first sub-workflow:

    {
      "Bucket": "example-bucket",
      "Key": "object_name_1"
    }

ItemBatcher

By configuring the ItemBatcher field, you can combine multiple items into a batch and use the batch as input for executing sub-workflows. The following table describes the fields in ItemBatcher.

| Field | Type | Required | Description | Example |
| --- | --- | --- | --- | --- |
| MaxItemsPerBatch | int | No | Aggregates items by quantity. | See the MaxItemsPerBatch section of this topic. |
| MaxInputBytesPerBatch | int | No | Aggregates items based on the size of the aggregated input. | See the MaxInputBytesPerBatch section of this topic. |
| BatchInput | map[string]any | No | Adds extra inputs when you aggregate items. | See the BatchInput section of this topic. |

MaxItemsPerBatch

MaxItemsPerBatch allows you to aggregate items by quantity. The following snippet provides an example on how to use MaxItemsPerBatch:

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemBatcher:
      MaxItemsPerBatch: 2
    End: true

The following snippet shows the input used by the state machine:

{
  "Items": [
    {"key_1":"value_1"},
    {"key_2":"value_2"},
    {"key_3":"value_3"},
    {"key_4":"value_4"},
    {"key_5":"value_5"}
  ]
}

The following snippet shows the sample input received after executing the sub-workflow:

# execution-1
# Sample input of the first sub-workflow
{
  "Items": [
    {"key_1":"value_1"},
    {"key_2":"value_2"}
  ]
}

# execution-2
# Sample input of the second sub-workflow
{
  "Items": [
    {"key_3":"value_3"},
    {"key_4":"value_4"}
  ]
}

# execution-3
# Sample input of the third sub-workflow
{
  "Items": [
    {"key_5":"value_5"}
  ]
}

MaxInputBytesPerBatch

MaxInputBytesPerBatch allows you to aggregate items based on the input size after aggregation and ensures that the input after aggregation does not exceed the specified size. The following snippet provides an example on how to use MaxInputBytesPerBatch:

Important
  • ItemBatcher and BatchInput add extra keys and inputs, respectively, which are included in the overall size calculation of the input.

  • The unit of MaxInputBytesPerBatch is bytes.

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemBatcher:
      MaxInputBytesPerBatch: 50
    End: true

The following snippet shows the input used by the state machine:

{
  "Items":[
    {"Key":1},
    {"key":2},
    {"Key":3},
    {"Key":4},
    {"Key":5}
  ]
}

The following snippet shows the sample input received after executing the sub-workflow:

# execution-1
# Sample input of the first sub-workflow
{
  "Items":[
    {"Key":1},
    {"key":2}
  ]
}

# execution-2
# Sample input of the second sub-workflow
{
  "Items":[
    {"Key":3},
    {"key":4}
  ]
}

# execution-3
# Sample input of the third sub-workflow
{
  "Items":[
    {"Key":5}
  ]
}

BatchInput

BatchInput allows you to add extra inputs when aggregating items. The following snippet provides an example on how to use BatchInput:

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemBatcher:
      MaxInputBytesPerBatch: 70
      BatchInput:
        InputKey.$: $Input.Key
    End: true

The following snippet shows the input used by the state machine:

{
  "Key":"value",
  "Items":[
    {"Key":1},
    {"key":2},
    {"Key":3},
    {"Key":4},
    {"Key":5}
  ]
}

The following snippet shows the sample input received after executing the sub-workflow:

# execution-1
# Sample input of the first sub-workflow
{
  "BatchInput":{
    "InputKey":"value"
  },
  "Items":[
    {"Key":1},
    {"key":2}
  ]
}

# execution-2
# Sample input of the second sub-workflow
{
  "BatchInput":{
    "InputKey":"value"
  },
  "Items":[
    {"Key":3},
    {"key":4}
  ]
}

# execution-3
# Sample input of the third sub-workflow
{
  "BatchInput":{
    "InputKey":"value"
  },
  "Items":[
    {"Key":5}
  ]
}

ItemConstructor

ItemConstructor allows you to construct each item separately. You can use $Item to reference the items of the original input. The following snippet provides an example on how to use ItemConstructor:

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemBatcher:
      MaxInputBytesPerBatch: 200
      BatchInput:
        InputKey.$: $Input.Key
    ItemConstructor:
      ConstructedKey.$: $Item.Key
      InputKey.$: $Input.Key
    End: true

The following snippet shows the input used by the state machine:

{
  "Key":"value",
  "Items":[
    {"Key":1},
    {"Key":2},
    {"Key":3},
    {"Key":4},
    {"Key":5}
  ]
}

The following snippet shows the sample input received after executing the sub-workflow:

# execution-1
# Sample input of the first sub-workflow
{
  "BatchInput": {
    "InputKey": "value"
  },
  "Items": [
    {
      "InputKey": "value",
      "ConstructedKey": 1
    },
    {
      "InputKey": "value",
      "ConstructedKey": 2
    },
    {
      "InputKey": "value",
      "ConstructedKey": 3
    }
  ]
}

# execution-2
# Sample input of the second sub-workflow
{
  "BatchInput": {
    "InputKey": "value"
  },
  "Items": [
    {
      "InputKey": "value",
      "ConstructedKey": 4
    },
    {
      "InputKey": "value",
      "ConstructedKey": 5
    }
  ]
}

ResultWriter

ResultWriter allows you to write the execution information of sub-workflows to the specified OSS bucket. The following table describes the fields in ResultWriter.

| Field | Type | Required | Description | Example |
| --- | --- | --- | --- | --- |
| Parameters | Parameters | Yes | The request parameters. | See the Parameters section of this topic. |

Parameters

The request parameters. The following table describes the fields in Parameters.

| Field | Type | Required | Description | Example |
| --- | --- | --- | --- | --- |
| Bucket | string | Yes | The name of the bucket to which the execution information is written. | example-bucket |
| Prefix | string | Yes | The prefix of the names of the objects that are written to the bucket. | example-prefix |

The following snippet provides an example on how to use ResultWriter.

Note

Workflow state inputs and outputs must not exceed the specified size limits. For Map states with a large number of iterations, the aggregated output may exceed the limit. In this case, we recommend that you configure ResultWriter to store the outputs of the Map state in an OSS bucket.

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    ItemConstructor:
      Key.$: $Item.Key
      FailedValue.$: $Input.FailedValue
    ToleratedFailurePercentage: 30
    Processor:
      StartAt: Choice
      States:
        - Type: Choice
          Name: Choice
          Branches:
            - Condition: $Input.Key > $Input.FailedValue
              Next: Fail
          Default: Succeed
        - Type: Succeed
          Name: Succeed
          End: true
        - Type: Fail
          Name: Fail
          Code: MockError
          End: true
    ResultWriter:
      Parameters:
        Bucket: example-bucket
        Prefix: example-prefix/
    End: true

The following snippet shows the input used by the state machine:

{
  "FailedValue": 4,
  "Items": [
    {
      "Key": 1
    },
    {
      "Key": 2
    },
    {
      "Key": 3
    },
    {
      "Key": 4
    },
    {
      "Key": 5
    }
  ]
}

After the execution is complete, ResultWriter writes three JSON files to the specified OSS path. The following snippets show the content of these files.

# The following snippet provides the content of the manifest.json file whose storage path is example-prefix/map-run-name/manifest.json.
{
    "DestinationBucket": "example-bucket",
    "MapRunName": "map-run-name",
    "ResultFiles": {
        "FAILED": [
            {
                "ObjectName": "example-prefix/map-run-name/FAILED_0.json",
                "Size": 262
            }
        ],
        "SUCCEED": [
            {
                "ObjectName": "example-prefix/map-run-name/SUCCEED_0.json",
                "Size": 1057
            }
        ]
    }
}

# The following snippet provides the content of the FAILED_0.json file whose storage path is example-prefix/map-run-name/FAILED_0.json.
[
    {
        "ExecutionName": "execution-name-5",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":5}",
        "Output": "{\"ErrorCode\":\"MockError\"}",
        "Status": "Failed",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    }
]

# The following snippet provides the content of the SUCCEED_0.json file whose storage path is example-prefix/map-run-name/SUCCEED_0.json.
[
    {
        "ExecutionName": "execution-name-1",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":1}",
        "Output": "{\"FailedValue\":4,\"Key\":1}",
        "Status": "Succeeded",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    },
    {
        "ExecutionName": "execution-name-2",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":2}",
        "Output": "{\"FailedValue\":4,\"Key\":2}",
        "Status": "Succeeded",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    },
    {
        "ExecutionName": "execution-name-3",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":3}",
        "Output": "{\"FailedValue\":4,\"Key\":3}",
        "Status": "Succeeded",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    },
    {
        "ExecutionName": "execution-name-4",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":4}",
        "Output": "{\"FailedValue\":4,\"Key\":4}",
        "Status": "Succeeded",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    }
]

MaxItems

MaxItems allows you to configure the maximum number of items that can be processed by the Map state machine. For example, you have an OSS bucket that contains 10,000 objects. If you set MaxItems to 1,000, the Map state machine loads only 1,000 objects from the OSS bucket.
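The following Map state fragment sketches this configuration. It assumes that MaxItems accepts an integer value directly on the Map state and is combined with an ItemReader; the bucket name is a placeholder:

- Type: Map
  Name: Map
  ProcessorConfig:
    ExecutionMode: Express
  ItemReader:
    SourceType: OSS_OBJECTS
    SourceParameters:
      Bucket: example-bucket
  MaxItems: 1000
  Processor:
    StartAt: Pass
    States:
      - Type: Pass
        Name: Pass
        End: true
  End: true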

MaxConcurrency

MaxConcurrency allows you to configure the number of sub-workflows that can be executed in parallel. For example, if the Map state machine is configured to execute 10,000 items and you set MaxConcurrency to 100, the Map state machine simultaneously executes 100 sub-workflows.
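The following fragment sketches this configuration, assuming MaxConcurrency is set directly on the Map state as an integer, as the field table above suggests:

- Type: Map
  Name: Map
  ProcessorConfig:
    ExecutionMode: Express
  ItemsPath: $Input.Items
  MaxConcurrency: 100
  Processor:
    StartAt: Pass
    States:
      - Type: Pass
        Name: Pass
        End: true
  End: true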

ToleratedFailurePercentage

ToleratedFailurePercentage allows you to configure a percentage-based failure tolerance policy. For example, you have 10,000 items and set ToleratedFailurePercentage to 10. In this case, the Map state machine can tolerate up to 1,000 failed item executions.

ToleratedFailureCount

ToleratedFailureCount allows you to configure a count-based failure tolerance policy. For example, you have 10,000 items and set ToleratedFailureCount to 10. In this case, the Map state machine can tolerate up to 10 failed item executions.
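The ResultWriter example earlier in this topic shows the percentage-based policy in use (ToleratedFailurePercentage: 30). The count-based variant can be sketched the same way, assuming ToleratedFailureCount is likewise set directly on the Map state:

- Type: Map
  Name: Map
  ProcessorConfig:
    ExecutionMode: Express
  ItemsPath: $Input.Items
  ToleratedFailureCount: 10
  Processor:
    StartAt: Pass
    States:
      - Type: Pass
        Name: Pass
        End: true
  End: true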

Example

The following workflow defines a Map state in distributed mode, which reads input from an upstream state and extracts iteration items by using $Input.Items. Each iteration of the Map state is executed as a sub-workflow in Express mode.

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    End: true

The following snippet shows the input used by the state machine:

{
  "Items": [
    {"key_1":"value_1"},
    {"key_2":"value_2"},
    {"key_3":"value_3"}
  ]
}

In this example, the Map state machine produces three sub-workflow executions. The following snippet shows the definition of the sub-workflow that each execution runs:

Type: StateMachine
Name: Map
SpecVersion: v1
StartAt: Pass
States:
  - Type: Pass
    Name: Pass
    End: true

The following snippet shows the sample input received after executing the sub-workflow:

# execution-1
# Sample input received by the first sub-workflow execution corresponding to the Map state
{"key_1":"value_1"}

# execution-2
# Sample input received by the second sub-workflow execution corresponding to the Map state
{"key_2":"value_2"}

# execution-3
# Sample input received by the third sub-workflow execution corresponding to the Map state
{"key_3":"value_3"}

After you run the preceding snippets, the Map state machine outputs a JSON array. Each element in the array is the output of one iteration of the Map state.

{
    "Items": [
        {
            "key_1": "value_1"
        },
        {
            "key_2": "value_2"
        },
        {
            "key_3": "value_3"
        }
    ]
}