Map states can be iterated in distributed mode to improve processing efficiency and capacity, which makes them suitable for big data processing and parallel computing scenarios.
Concepts
Distributed mode: If you set the ExecutionMode field of ProcessorConfig to Express, Map states are iterated in distributed mode. In this mode, each iteration of Map states is executed as a sub-workflow in Express mode.
Sub-workflow execution: When Map states are iterated in distributed mode, each iteration is executed as a sub-workflow. The definition of the sub-workflow is determined by the states of the Processor field, and the iteration items of the Map state serve as the inputs of the sub-workflow executions.
Fault tolerance policy: If a sub-workflow execution fails during the iteration of a Map state, the iteration fails. For Map states with multiple iterations, you can configure fault tolerance policies so that the iteration can continue even if some sub-workflow executions fail.
The following table describes the fields in distributed mode.
Field | Type | Required | Description | Example |
Name | string | Yes | The name of the state. | my-state-name |
Description | string | No | The description of the state. | describe it here |
Type | string | Yes | The type of the state. | Map |
InputConstructor | map[string]any | No | The input constructor. | See Inputs and outputs. |
ItemsPath | string | No | The expression that is used to extract the input array. You do not need to configure this field if you configure ItemReader. | See the ItemsPath section of this topic. |
ItemBatcher | ItemBatcher | No | By configuring the ItemBatcher field, you can combine multiple items into a batch and use the batch as input for executing sub-workflows. | See the ItemBatcher section of this topic. |
ItemReader | ItemReader | No | Reads data from Object Storage Service (OSS) buckets. | See the ItemReader section of this topic. |
ItemConstructor | ItemConstructor | No | Constructs each item separately. | See the ItemConstructor section of this topic. |
ResultWriter | ResultWriter | No | Writes the execution information of sub-workflows to the specified OSS bucket. | See the ResultWriter section of this topic. |
MaxConcurrency | int | No | Specifies the maximum number of sub-workflows that can be executed in parallel. | 40 |
MaxItems | MaxItems | No | Specifies the maximum number of items that can be executed by the Map state machine. | See the MaxItems section of this topic. |
ToleratedFailurePercentage | ToleratedFailurePercentage | No | Specifies a percentage-based failure tolerance policy. | See the ToleratedFailurePercentage section of this topic. |
ToleratedFailureCount | ToleratedFailureCount | No | Specifies a count-based failure tolerance policy. | See the ToleratedFailureCount section of this topic. |
Processor | Processor | Yes | The iteration processor. | See the Processor section of this topic. |
ProcessorConfig | ProcessorConfig | Yes | The processor configurations. | See the ProcessorConfig section of this topic. |
OutputConstructor | map[string]any | No | The output constructor. | See the State OutputConstructor section of the "Inputs and outputs" topic. |
Next | string | No | The next state after the current state is complete. If the value of the End field is true, you do not need to configure this field. | my-next-state |
End | bool | No | Specifies whether to end the current scope. | true |
Retry | Retry | No | Defines the retry-upon-error policy. | See Error handling. |
Catch | Catch | No | Defines the error-capture policy. | See Error handling. |
Quotas
The following table describes the quota limits in distributed mode. If the default quota cannot meet your business requirements, you can submit a ticket to request a quota increase.
Quota name | Description | Default value |
MaxOpenMapRun | The maximum number of Map states that can be simultaneously iterated in distributed mode within a single account in each region. | 10 |
MaxConcurrency | The maximum number of sub-workflows that can be simultaneously executed within a single MapRun task. | 100 |
MaxItems | The maximum number of items that can be read by a single MapRun task. | 10000 |
Key fields in distributed mode
ItemsPath
The expression that is used to extract the input array. If the ItemsPath expression returns a value in the JSON array format, the value can be iterated and all its elements are passed to the Processor for processing. During this process, you can use the $Context and $Input expression variables, as shown in the following example:
$Input.FieldA
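For example, assuming that the state input contains an array under the FieldA key (the field name is illustrative), the following minimal sketch uses this expression to iterate over that array:
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    # Extracts the array under FieldA from the state input. Each element
    # becomes the input of one sub-workflow execution.
    ItemsPath: $Input.FieldA
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    End: true
With the input {"FieldA": [{"key_1": "value_1"}]}, the sub-workflow execution receives {"key_1": "value_1"} as its input.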
Processor
The iteration processor. The following table describes the fields in Processor.
Field | Type | Required | Description | Example |
States | array | Yes | The array of nested states in the flow. | |
StartAt | string | Yes | The starting point for executing the nested state array in the flow. | my start task |
ProcessorConfig
The processor configurations. The following table describes the fields in ProcessorConfig.
Field | Type | Required | Description | Example |
ExecutionMode | string | Yes | The execution mode. | Express |
ItemReader
Distributed mode supports reading larger inputs from OSS buckets. The following table describes the fields in ItemReader.
Field | Type | Required | Description | Example |
SourceType | string | Yes | The type of the data source. Valid values: OSS_CSV, OSS_JSON_LIST, OSS_OBJECTS, and OSS_INVENTORY_FILES. | OSS_CSV |
SourceParameters | SourceParameters | No | The parameters of the data source. | See the SourceParameters section of this topic. |
ReaderConfig | ReaderConfig | No | The reader configurations. | See the ReaderConfig section of this topic. |
SourceParameters
The parameters of the data source. The following table describes the fields in SourceParameters.
Field | Type | Required | Description | Example |
Bucket | string | No | The name of the bucket in which the object resides. | example-bucket |
ObjectName | string | No | The name of the object. | object_name_1 |
Prefix | string | No | The prefix that the names of the returned objects must contain. If you leave this field empty, prefixes are not used to filter the returned objects. By default, this field is empty. | example-prefix |
ReaderConfig
The reader configurations. The following table describes the fields in ReaderConfig.
Field | Type | Required | Description | Example |
CSVHeaders | []string | No | The column title or field name contained in the first row of the CSV file. | ColA,ColB,ColC |
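The following sketch shows one plausible way to combine ReaderConfig with ItemReader; the bucket name, object name, and column names are assumptions for illustration:
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemReader:
      SourceType: OSS_CSV
      SourceParameters:
        Bucket: example-bucket
        ObjectName: example-object.csv
      # Assumption: CSVHeaders declares the column names in the first row
      # of the CSV file, which are used as the keys of each row item.
      ReaderConfig:
        CSVHeaders:
          - ColA
          - ColB
          - ColC
    End: true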
To read data from OSS buckets, use one of the following methods:
Read data from the CSV files stored in OSS buckets
You can read data from CSV files by using Map states. For example, you have a CSV file named example-object.csv that is stored in an OSS bucket named example-bucket. You can refer to the following sample snippet to read data from the CSV file:
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemReader:
      SourceType: OSS_CSV
      SourceParameters:
        Bucket: example-bucket
        ObjectName: example-object.csv
    End: true
The following snippet shows the content of the example-object.csv file:
ColA,ColB,ColC
col_a_1,col_b_1,col_c_1
The following snippet shows the sample input received by the sub-workflow execution:
{
  "ColA": "col_a_1",
  "ColB": "col_b_1",
  "ColC": "col_c_1"
}
Read data from the JSON files stored in OSS buckets
Important: JSON files stored in OSS buckets must contain JSON arrays.
You can read JSON arrays from a specified OSS bucket. For example, you have a JSON file named example-object.json that is stored in an OSS bucket named example-bucket. You can refer to the following sample snippet to read JSON arrays from the OSS bucket:
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemReader:
      SourceType: OSS_JSON_LIST
      SourceParameters:
        Bucket: example-bucket
        ObjectName: example-object.json
    End: true
The following snippet shows the content of the example-object.json file:
[
  {
    "key_1": "value_1"
  }
]
The following snippet shows the sample input received by the sub-workflow execution:
{
  "key_1": "value_1"
}
List objects stored in OSS buckets
You can read objects from a specified OSS bucket. For example, you have a list of objects whose names are prefixed with example-prefix. The objects are stored in an OSS bucket named example-bucket. You can refer to the following sample snippet to read the objects from the OSS bucket:
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemReader:
      SourceType: OSS_OBJECTS
      SourceParameters:
        Bucket: example-bucket
        Prefix: example-prefix
    End: true
The following snippet shows that the example-prefix/object_1 object is stored in the example-bucket bucket:
example-bucket
├── example-prefix/object_1
The following snippet shows the sample input received by the sub-workflow execution:
{
  "XMLName": {
    "Space": "",
    "Local": "Contents"
  },
  "Key": "example-prefix/object_1",
  "Type": "Normal",
  "Size": 268435,
  "ETag": "\"50B06D6680D86F04138HSN612EF5DEC6\"",
  "Owner": {
    "XMLName": {
      "Space": "",
      "Local": ""
    },
    "ID": "",
    "DisplayName": ""
  },
  "LastModified": "2024-01-01T01:01:01Z",
  "StorageClass": "Standard",
  "RestoreInfo": ""
}
Read inventories from OSS buckets
You can read OSS inventory files by using Map states. For example, you have an inventory file named manifest.json that is stored in an OSS bucket named example-bucket. The path to the inventory file is inventory/2024-01-01T01-01Z/manifest.json. You can refer to the following sample snippet to read the inventory file from the OSS bucket:
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemReader:
      SourceType: OSS_INVENTORY_FILES
      SourceParameters:
        Bucket: example-bucket
        ObjectName: inventory/2024-01-01T01-01Z/manifest.json
    ItemConstructor:
      Key.$: $Item.Key
    End: true
The following snippet shows the content of the inventory file:
"example-bucket","object_name_1"
"example-bucket","object_name_2"
The following snippet shows the sample input received by the first sub-workflow execution:
{ "Bucket": "example-bucket", "Key": "object_name_1" }
ItemBatcher
By configuring the ItemBatcher field, you can combine multiple items into a batch and use the batch as input for executing sub-workflows. The following table describes the fields in ItemBatcher.
Field | Type | Required | Description | Example |
MaxItemsPerBatch | int | No | Aggregates items by quantity. | See the MaxItemsPerBatch section of this topic. |
MaxInputBytesPerBatch | int | No | Aggregates items based on the input size after aggregation. | See the MaxInputBytesPerBatch section of this topic. |
BatchInput | map[string]any | No | Adds extra inputs when you aggregate items. | See the BatchInput section of this topic. |
MaxItemsPerBatch
MaxItemsPerBatch allows you to aggregate items by quantity. The following snippet provides an example on how to use MaxItemsPerBatch:
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemBatcher:
MaxItemsPerBatch: 2
End: true
The following snippet shows the input used by the state machine:
{
"Items": [
{"key_1":"value_1"},
{"key_2":"value_2"},
{"key_3":"value_3"},
{"key_4":"value_4"},
{"key_5":"value_5"}
]
}
The following snippet shows the sample inputs received by the sub-workflow executions:
# execution-1
# Sample input of the first sub-workflow
{
"Items": [
{"key_1":"value_1"},
{"key_2":"value_2"}
]
}
# execution-2
# Sample input of the second sub-workflow
{
"Items": [
{"key_1":"value_3"},
{"key_2":"value_4"}
]
}
# execution-3
# Sample input of the third sub-workflow
{
"Items": [
{"key_1":"value_5"},
]
}
MaxInputBytesPerBatch
MaxInputBytesPerBatch allows you to aggregate items based on the size of the aggregated input and ensures that the aggregated input does not exceed the specified size. The unit of MaxInputBytesPerBatch is bytes. Note that ItemBatcher and BatchInput add extra keys and inputs, respectively, which are included in the overall size calculation of the input. The following snippet provides an example on how to use MaxInputBytesPerBatch:
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemBatcher:
MaxInputBytesPerBatch: 50
End: true
The following snippet shows the input used by the state machine:
{
"Items":[
{"Key":1},
{"key":2},
{"Key":3},
{"Key":4},
{"Key":5}
]
}
The following snippet shows the sample inputs received by the sub-workflow executions:
# execution-1
# Sample input of the first sub-workflow
{
"Items":[
{"Key":1},
{"key":2}
]
}
# execution-2
# Sample input of the second sub-workflow
{
"Items":[
{"Key":3},
{"key":4}
]
}
# execution-3
# Sample input of the third sub-workflow
{
"Items":[
{"Key":5}
]
}
BatchInput
BatchInput allows you to add extra inputs when aggregating items. The following snippet provides an example on how to use BatchInput:
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemBatcher:
MaxInputBytesPerBatch: 70
BatchInput:
InputKey.$: $Input.Key
End: true
The following snippet shows the input used by the state machine:
{
"Key":"value",
"Items":[
{"Key":1},
{"key":2},
{"Key":3},
{"Key":4},
{"Key":5}
]
}
The following snippet shows the sample inputs received by the sub-workflow executions:
# execution-1
# Sample input of the first sub-workflow
{
"BatchInput":{
"InputKey":"value"
},
"Items":[
{"Key":1},
{"key":2}
]
}
# execution-2
# Sample input of the second sub-workflow
{
"BatchInput":{
"InputKey":"value"
},
"Items":[
{"Key":3},
{"key":4}
]
}
# execution-3
# Sample input of the third sub-workflow
{
"BatchInput":{
"InputKey":"value"
},
"Items":[
{"Key":5}
]
}
ItemConstructor
ItemConstructor allows you to construct each item separately and use $Item to reference the current item. The following snippet provides an example on how to use ItemConstructor:
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemBatcher:
MaxInputBytesPerBatch: 200
BatchInput:
InputKey.$: $Input.Key
ItemConstructor:
ConstructedKey.$: $Item.Key
InputKey.$: $Input.Key
End: true
The following snippet shows the input used by the state machine:
{
"Key":"value",
"Items":[
{"Key":1},
{"Key":2},
{"Key":3},
{"Key":4},
{"Key":5}
]
}
The following snippet shows the sample inputs received by the sub-workflow executions:
# execution-1
# Sample input of the first sub-workflow
{
"BatchInput": {
"InputKey": "value"
},
"Items": [
{
"InputKey": "value",
"ConstructedKey": 1
},
{
"InputKey": "value",
"ConstructedKey": 2
},
{
"InputKey": "value",
"ConstructedKey": 3
}
]
}
# execution-2
# Sample input of the second sub-workflow
{
"BatchInput": {
"InputKey": "value"
},
"Items": [
{
"InputKey": "value",
"ConstructedKey": 4
},
{
"InputKey": "value",
"ConstructedKey": 5
}
]
}
ResultWriter
ResultWriter allows you to write the execution information of sub-workflows to the specified OSS bucket. The following table describes the fields in ResultWriter.
Field | Type | Required | Description | Example |
Parameters | Parameters | Yes | The request parameters. | See the Parameters section of this topic. |
Parameters
The request parameters. The following table describes the fields in Parameters.
Field | Type | Required | Description | Example |
Bucket | string | Yes | The name of the bucket in which the object resides. | example-bucket |
Prefix | string | Yes | The prefix used in the names of the objects to which the execution information is written. | example-prefix |
Workflow state inputs and outputs must not exceed their size limits. For Map states with a large number of iterations, the aggregated output may exceed the limit. In this case, we recommend that you configure ResultWriter to store the outputs of the Map state in an OSS bucket. The following snippet provides an example on how to use ResultWriter:
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
ItemConstructor:
Key.$: $Item.Key
FailedValue.$: $Input.FailedValue
ToleratedFailurePercentage: 30
Processor:
StartAt: Choice
States:
- Type: Choice
Name: Choice
Branches:
- Condition: $Input.Key > $Input.FailedValue
Next: Fail
Default: Succeed
- Type: Succeed
Name: Succeed
End: true
- Type: Fail
Name: Fail
Code: MockError
End: true
ResultWriter:
Parameters:
Bucket: example-bucket
Prefix: example-prefix/
End: true
The following snippet shows the input used by the state machine:
{
"FailedValue": 4,
"Items": [
{
"Key": 1
},
{
"Key": 2
},
{
"Key": 3
},
{
"Key": 4
},
{
"Key": 5
}
]
}
The following snippets show the content of the three JSON files that ResultWriter writes to the specified OSS bucket.
# The following snippet provides the content of the manifest.json file whose storage path is example-prefix/map-run-name/manifest.json.
{
"DestinationBucket": "example-bucket",
"MapRunName": "map-run-name",
"ResultFiles": {
"FAILED": [
{
"ObjectName": "example-prefix/map-run-name/FAILED_0.json",
"Size": 262
}
],
"SUCCEED": [
{
"ObjectName": "example-prefix/map-run-name/SUCCEED_0.json",
"Size": 1057
}
]
}
}
# The following snippet provides the content of the FAILED_0.json file whose storage path is example-prefix/map-run-name/FAILED_0.json.
[
{
"ExecutionName": "execution-name-5",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":5}",
"Output": "{\"ErrorCode\":\"MockError\"}",
"Status": "Failed",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
}
]
# The following snippet provides the content of the SUCCEED_0.json file whose storage path is example-prefix/map-run-name/SUCCEED_0.json.
[
{
"ExecutionName": "execution-name-1",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":1}",
"Output": "{\"FailedValue\":4,\"Key\":1}",
"Status": "Succeeded",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
},
{
"ExecutionName": "execution-name-2",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":2}",
"Output": "{\"FailedValue\":4,\"Key\":2}",
"Status": "Succeeded",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
},
{
"ExecutionName": "execution-name-3",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":3}",
"Output": "{\"FailedValue\":4,\"Key\":3}",
"Status": "Succeeded",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
},
{
"ExecutionName": "execution-name-4",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":4}",
"Output": "{\"FailedValue\":4,\"Key\":4}",
"Status": "Succeeded",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
}
]
MaxItems
MaxItems allows you to configure the maximum number of items that can be executed by the Map state machine. For example, you have an OSS bucket that contains 10,000 objects. If you set MaxItems to 1,000, the Map state machine loads only 1,000 objects from the OSS bucket.
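A minimal sketch, assuming that MaxItems is set as an integer field of the Map state as listed in the field table; the bucket name and prefix are illustrative:
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemReader:
      SourceType: OSS_OBJECTS
      SourceParameters:
        Bucket: example-bucket
        Prefix: example-prefix
    # Only the first 1,000 objects are read, even if the bucket contains more.
    MaxItems: 1000
    End: true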
MaxConcurrency
MaxConcurrency allows you to configure the number of sub-workflows that can be executed in parallel. For example, if the Map state machine is configured to execute 10,000 items and you set MaxConcurrency to 100, the Map state machine simultaneously executes 100 sub-workflows.
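A minimal sketch that caps parallelism at 100 sub-workflow executions; the field placement follows the field table in this topic:
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    # At most 100 sub-workflow executions run at the same time.
    MaxConcurrency: 100
    End: true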
ToleratedFailurePercentage
ToleratedFailurePercentage allows you to configure a percentage-based failure tolerance policy. For example, you have 10,000 items and set ToleratedFailurePercentage to 10. In this case, the Map state machine can tolerate up to 1,000 failed item executions.
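The ResultWriter example in this topic uses this field in a full workflow; the following isolated sketch shows only the field itself:
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    # Up to 10% of the item executions can fail without failing the Map state.
    ToleratedFailurePercentage: 10
    End: true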
ToleratedFailureCount
ToleratedFailureCount allows you to configure a count-based failure tolerance policy. For example, you have 10,000 items and set ToleratedFailureCount to 10. In this case, the Map state machine can tolerate up to 10 failed item executions.
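A minimal sketch that tolerates up to 10 failed item executions, following the same structure as the preceding examples:
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    # Up to 10 item executions can fail without failing the Map state.
    ToleratedFailureCount: 10
    End: true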
Example
The following workflow defines a Map state in distributed mode, which reads input from an upstream state and extracts iteration items by using $Input.Items. Each iteration of the Map state is executed as a sub-workflow in Express mode.
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
End: true
The following snippet shows the input used by the state machine:
{
"Items": [
{"key_1":"value_1"},
{"key_2":"value_2"},
{"key_3":"value_3"}
]
}
In this example, the Map state machine produces three sub-workflow executions. The following snippet shows the definition of the sub-workflows:
Type: StateMachine
Name: Map
SpecVersion: v1
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
The following snippet shows the sample inputs received by the sub-workflow executions:
# execution-1
# Sample input received by the first sub-workflow execution corresponding to the Map state
{"key_1":"value_1"}
# execution-2
# Sample input received by the second sub-workflow execution corresponding to the Map state
{"key_2":"value_2"}
# execution-3
# Sample input received by the third sub-workflow execution corresponding to the Map state
{"key_3":"value_3"}
After the preceding workflow is executed, the Map state machine outputs a JSON array. Each element in the array is the output of one iteration of the Map state:
{
"Items": [
{
"key_1": "value_1"
},
{
"key_2": "value_2"
},
{
"key_3": "value_3"
}
]
}