分布式模式在分布式環境中執行Map狀態迭代,從而提高處理效率和容量,適用於處理大規模資料或並行計算。
基本概念
分布式模式:當Map ProcessorConfig的ExecutionMode欄位被設定為Express時,Map狀態的執行會以分布式模式進行。在此模式下,迭代(Map)狀態將每次迭代作為Express模式的子工作流程執行。
子工作流程執行:當迭代(Map)狀態被設定為分布式模式時,Map迭代的執行會以子工作流程執行的形式進行,此時子工作流程的定義為Map Processor的狀態轉換定義,接收的輸入為Map迭代元素。
故障容忍策略:對於Map執行,當子工作流程執行失敗時,Map狀態會以失敗狀態結束。對於迭代數量較多的Map執行,可以通過配置錯誤容忍使Map執行在子工作流程執行失敗時繼續執行。
分布式模式包含以下欄位:
欄位 | 類型 | 是否必選 | 描述 | 樣本值 |
Name | string | 是 | 狀態名稱。 | my-state-name |
Description | string | 否 | 狀態原因。 | describe it here |
Type | string | 是 | 狀態類型。 | Map |
InputConstructor | map[string]any | 否 | 輸入構造器。 | 請參見輸入和輸出 |
ItemsPath | string | 是 | 用於提取輸入數組的運算式。 | 請參見ItemsPath |
ItemBatcher | ItemBatcher | 否 | 開啟ItemBatcher可以將多個Item彙總為Item Batch作為輸入傳遞給子工作流程執行。 | 請參見ItemBatcher |
ItemReader | ItemReader | 否 | 支援從OSS讀取資料。 | 請參見ItemReader |
ItemConstructor | ItemConstructor | 否 |
| |
ResultWriter | ResultWriter | 否 | 支援將子工作流程執行的資訊寫入到指定的OSS目錄。 | 請參見ResultWriter |
MaxConcuccency | int | 否 | 支援配置子工作流程執行並發。 | 40 |
MaxItems | MaxItems | 否 | 支援配置Map最多處理的Item數量。 | 請參見MaxItems |
ToleratedFailurePercentage | ToleratedFailurePercentage | 否 | 支援按照百分比配置失敗容忍策略。 | |
ToleratedFailureCount | ToleratedFailureCount | 否 | 支援按照數量配置失敗容忍策略。 | |
Processor | Processor | 是 | 迭代處理器。 | 請參見Processor |
ProcessorConfig | ProcessorConfig | 是 | 處理器配置。 | |
OutputConstructor | map[string]any | 否 | 輸出構造器。 | 請參見狀態輸出構造器 |
Next | string | 否 | 目前狀態的下一狀態。當End取值為true時,無需指定。 | my-next-state |
End | bool | 否 | 是否為當前範圍的終結節點。 | true |
Retry | Retry | 否 | 用於定義錯誤重試策略。 | 請參見 錯誤處理 |
Catch | Catch | 否 | 用於定義錯誤捕獲策略。 | 請參見 錯誤處理 |
使用限制
對於分布式模式執行存在以下配額限制,如果預設配額不能滿足您的需求,您可以通過工單提交配額提升申請。
配額名稱 | 含義 | 預設值 |
MaxOpenMapRun | 單個賬戶在單個地區最多允許同時執行的分布式Map數量。 | 10 |
MaxConcurrency | 單個MapRun支援的最大並發。 | 100 |
MaxItems | 單個MapRun最多支援讀取的Item數量。 | 10000 |
分布式模式的關鍵字段
ItemsPath
用於提取輸入數組的運算式。該運算式執行後返回JSON Array,則可以進行迭代,將其中每個元素傳入ItemProcessor進行處理;可使用運算式變數$Context和$Input,樣本如下:
$Input.FieldA
Processor
迭代處理器。Processor包含的欄位如下表所示。
欄位 | 類型 | 是否必選 | 描述 | 樣本值 |
States | array | 是 | 內部嵌套的狀態數組。 |
|
StartAt | string | 是 | 內部嵌套狀態數組的執行起點。 | my start task |
ProcessorConfig
處理器配置。ProcessorConfig包含的欄位如下表所示。
欄位 | 類型 | 是否必選 | 描述 | 樣本值 |
ExecutionMode | string | 是 | 執行模式。 | Express |
ItemReader
Map分布式模式通過支援從OSS讀取資料輸入的方式,支援更大的資料輸入。ItemReader包含的欄位如下表所示。
欄位 | 類型 | 是否必選 | 描述 | 樣本值 |
SourceType | string | 是 | 來源類型,可選值:OSS_CSV、OSS_JSON_LIST、OSS_OBJECTS、OSS_INVENTORY_FILES。 | OSS_CSV |
SourceParameters | string | 否 | 來源參數。 | |
ReaderConfig | ReaderConfig | 否 | 閱讀器配置。 | 請參見ReaderConfig |
SourceParameters
來源參數。SourceParameters包含的欄位如下表所示。
欄位 | 類型 | 是否必選 | 描述 | 樣本值 |
Bucket | string | 否 | 檔案所在的Bucket名稱。 | example-bucket |
ObjectName | string | 否 | 對象名稱。 | object_name_1 |
Prefix | string | 否 | 限定返回的Bucket名稱必須以prefix作為首碼。如果不設定,則不過濾首碼資訊。 預設值:無 | example-prefix |
ReaderConfig
閱讀器配置。ReaderConfig包含的欄位如下表所示。
欄位 | 類型 | 是否必選 | 描述 | 樣本值 |
CSVHeaders | []string | 否 | CSV檔案中,第一行所包含的欄位標題或欄位名稱。 | ColA,ColB,ColC |
從OSS讀取資料的方式包括以下幾個方法:
儲存於OSS中的CSV檔案
使用Map狀態從一個CSV檔案中讀取資料。假設您有一個
example-object.csv
檔案儲存體在OSS的儲存桶example-bucket
中,可參考以下狀態機器定義樣本。Type: StateMachine Name: MyWorkflow SpecVersion: v1 StartAt: Map States: - Type: Map Name: Map ProcessorConfig: ExecutionMode: Express Processor: StartAt: Pass States: - Type: Pass Name: Pass End: true ItemReader: SourceType: OSS_CSV SourceParameters: Bucket: example-bucket ObjectName: example-object.csv End: true
以下樣本顯示了
example-object.csv
檔案的內容。ColA,ColB,ColC col_a_1,col_b_1,col_c_1
以下樣本顯示了子工作流程執行接收的輸入。
{ "ColA": "col_a_1", "ColB": "col_b_1", "ColC": "col_c_1", }
儲存於OSS中的JSON檔案
重要儲存於OSS中的JSON檔案所包含的JSON內容必須是一個JSON Array。
用於從指定的OSS中讀取JSON列表。假設您有一個
example-object.json
檔案儲存體在OSS儲存桶example-bucket
中,可參考以下狀態機器定義樣本。Type: StateMachine Name: MyWorkflow SpecVersion: v1 StartAt: Map States: - Type: Map Name: Map ProcessorConfig: ExecutionMode: Express Processor: StartAt: Pass States: - Type: Pass Name: Pass End: true ItemReader: SourceType: OSS_JSON_LIST SourceParameters: Bucket: example-bucket ObjectName: example-object.json End: true
以下樣本顯示了
example-object.json
檔案的內容。[ { "key_1": "value_1" } ]
以下樣本顯示了子工作流程執行接收的輸入。
{ "key_1": "value_1", }
列舉OSS中的Objects
用於從指定的OSS中讀取對象。假設您已將Objects儲存在OSS儲存桶
example-bucket
中,命名的首碼為example-prefix
,可參考以下狀態機器定義樣本。Type: StateMachine Name: MyWorkflow SpecVersion: v1 StartAt: Map States: - Type: Map Name: Map ProcessorConfig: ExecutionMode: Express Processor: StartAt: Pass States: - Type: Pass Name: Pass End: true ItemReader: SourceType: OSS_OBJECTS SourceParameters: Bucket: example-bucket Prefix: example-prefix End: true
以下目錄結構示意圖表明
example-prefix/object_1
檔案儲存體在OSS儲存桶example-bucket
中。example-bucket ├── example-prefix/object_1
以下樣本顯示了子工作流程執行接收的輸入。
{ "XMLName": { "Space": "", "Local": "Contents" }, "Key": "example-prefix/object_1", "Type": "Normal", "Size": 268435, "ETag": "\"50B06D6680D86F04138HSN612EF5DEC6\"", "Owner": { "XMLName": { "Space": "", "Local": "" }, "ID": "", "DisplayName": "" }, "LastModified": "2024-01-01T01:01:01Z", "StorageClass": "Standard", "RestoreInfo": "" }
儲存於OSS中的Inventory
使用Map狀態從指定的OSS儲存桶和檔案中讀取資料。假設您已將資訊清單檔
manifest.json
儲存在example-bucket
的OSS儲存桶中,路徑是inventory/2024-01-01T01-01Z/manifest.json
,可參考以下狀態機器定義樣本。Type: StateMachine Name: MyWorkflow SpecVersion: v1 StartAt: Map States: - Type: Map Name: Map ProcessorConfig: ExecutionMode: Express Processor: StartAt: Pass States: - Type: Pass Name: Pass End: true ItemReader: SourceType: OSS_INVENTORY_FILES SourceParameters: Bucket: example-bucket ObjectName: inventory/2024-01-01T01-01Z/manifest.json ItemConstructor: Key.$: $Item.Key End: true
以下樣本顯示了資訊清單檔的內容。
"example-bucket","object_name_1" "example-bucket","object_name_2"
以下樣本顯示第一個子工作流程執行接收的輸入。
{ "Bucket": "example-bucket", "Key": "object_name_1" }
ItemBatcher
開啟ItemBatcher可以將多個Item彙總為Item Batch作為輸入傳遞給子工作流程執行。ItemBatcher包含的欄位如下表所示。
欄位 | 類型 | 是否必選 | 描述 | 樣本值 |
MaxItemsPerBatch | int | 否 | 支援按照數量對Item進行彙總。 | |
MaxInputBytesPerBatch | int | 否 | 支援按照彙總後輸入的大小對Item進行彙總。 | |
BatchInput | map[string]any | 否 | 支援在彙總Items時添加額外的輸入。 | 請參見BatchInput |
MaxItemsPerBatch
MaxItemsPerBatch支援按照數量對Item進行彙總。以下是MaxItemsPerBatch的使用樣本。
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemBatcher:
MaxItemsPerBatch: 2
End: true
以下樣本是狀態機器執行的輸入。
{
"Items": [
{"key_1":"value_1"},
{"key_2":"value_2"},
{"key_3":"value_3"},
{"key_4":"value_4"},
{"key_5":"value_5"}
]
}
以下樣本顯示了子工作流程執行接收的輸入。
# execution-1
# 第1個子工作流程輸入樣本
{
"Items": [
{"key_1":"value_1"},
{"key_2":"value_2"}
]
}
# execution-2
# 第2個子工作流程輸入樣本
{
"Items": [
{"key_1":"value_3"},
{"key_2":"value_4"}
]
}
# execution-3
# 第3個子工作流程輸入樣本
{
"Items": [
{"key_1":"value_5"},
]
}
MaxInputBytesPerBatch
MaxInputBytesPerBatch支援按照彙總後輸入的大小對Item進行彙總,MaxInputBytesPerBatch可以確保彙總後的輸入不超過指定的大小。以下是MaxInputBytesPerBatch的使用樣本。
由於ItemBatcher會添加額外的Items key,輸入的整體大小計算包含了額外附加的Key和附加的BatchInput。
MaxInputBytesPerBatch指定的數值的單位為Byte。
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemBatcher:
MaxInputBytesPerBatch: 50
End: true
以下樣本是狀態機器執行的輸入。
{
"Items":[
{"Key":1},
{"key":2},
{"Key":3},
{"Key":4},
{"Key":5}
]
}
以下樣本顯示了子工作流程執行接收的輸入。
# execution-1
# 第1個子工作流程輸入樣本
{
"Items":[
{"Key":1},
{"key":2}
]
}
# execution-2
# 第2個子工作流程輸入樣本
{
"Items":[
{"Key":3},
{"key":4}
]
}
# execution-3
# 第3個子工作流程輸入樣本
{
"Items":[
{"Key":5}
]
}
BatchInput
BatchInput支援在彙總Items時添加額外的輸入。以下是BatchInput的使用樣本。
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemBatcher:
MaxInputBytesPerBatch: 70
BatchInput:
InputKey.$: $Input.Key
End: true
以下樣本是狀態機器執行的輸入。
{
"Key":"value",
"Items":[
{"Key":1},
{"key":2},
{"Key":3},
{"Key":4},
{"Key":5}
]
}
以下樣本顯示了子工作流程執行接收的輸入。
# execution-1
# 第1個子工作流程輸入樣本
{
"BatchInput":{
"InputKey":"value"
},
"Items":[
{"Key":1},
{"key":2}
]
}
# execution-2
# 第2個子工作流程輸入樣本
{
"BatchInput":{
"InputKey":"value"
},
"Items":[
{"Key":3},
{"key":4}
]
}
# execution-3
# 第3個子工作流程輸入樣本
{
"BatchInput":{
"InputKey":"value"
},
"Items":[
{"Key":5}
]
}
ItemConstructor
ItemConstructor支援對Item進行單獨構造,ItemConstructor支援通過$Item引用要被構造的Item。以下是ItemConstructor的使用樣本。
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemBatcher:
MaxInputBytesPerBatch: 200
BatchInput:
InputKey.$: $Input.Key
ItemConstructor:
ConstructedKey.$: $Item.Key
InputKey.$: $Input.Key
End: true
以下樣本是狀態機器執行的輸入。
{
"Key":"value",
"Items":[
{"Key":1},
{"Key":2},
{"Key":3},
{"Key":4},
{"Key":5}
]
}
以下樣本顯示了子工作流程執行接收的輸入。
# execution-1
# 第1個子工作流程輸入樣本
{
"BatchInput": {
"InputKey": "value"
},
"Items": [
{
"InputKey": "value",
"ConstructedKey": 1
},
{
"InputKey": "value",
"ConstructedKey": 2
},
{
"InputKey": "value",
"ConstructedKey": 3
}
]
}
# execution-2
# 第2個子工作流程輸入樣本
{
"BatchInput": {
"InputKey": "value"
},
"Items": [
{
"InputKey": "value",
"ConstructedKey": 4
},
{
"InputKey": "value",
"ConstructedKey": 5
}
]
}
ResultWriter
ResultWriter支援將子工作流程執行的資訊寫入到指定的OSS目錄。ResultWriter包含的欄位如下表所示。
欄位 | 類型 | 是否必選 | 描述 | 樣本值 |
Parameters | string | 是 | 請求參數。 | 請參見Parameters |
Parameters
請求參數。Parameters包含的欄位如下表所示。
欄位 | 類型 | 是否必選 | 描述 | 樣本值 |
Bucket | string | 是 | 檔案所在的Bucket名稱。 | example-bucket |
Prefix | string | 是 | 限定返回的Bucket名稱必須以prefix作為首碼。如果不設定,則不過濾首碼資訊。 預設值:無 | example-prefix |
以下是ResultWriter的使用樣本。
工作流程狀態的輸入輸出不能超過大小限制,對於迭代數量較多的Map執行,可能會出現輸出內容超過大小限制的問題,建議通過ResultWriter配置將Map的輸出寫入OSS。
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
ItemConstructor:
Key.$: $Item.Key
FailedValue.$: $Input.FailedValue
ToleratedFailurePercentage: 30
Processor:
StartAt: Choice
States:
- Type: Choice
Name: Choice
Branches:
- Condition: $Input.Key > $Input.FailedValue
Next: Fail
Default: Succeed
- Type: Succeed
Name: Succeed
End: true
- Type: Fail
Name: Fail
Code: MockError
End: true
ResultWriter:
Parameters:
Bucket: example-bucket
Prefix: example-prefix/
End: true
以下樣本是狀態機器執行的輸入。
{
"FailedValue": 4,
"Items": [
{
"Key": 1
},
{
"Key": 2
},
{
"Key": 3
},
{
"Key": 4
},
{
"Key": 5
}
]
}
以下樣本顯示了3個JSON檔案內容。
# 以下是manifest.json檔案的內容,儲存路徑為example-prefix/map-run-name/manifest.json
{
"DestinationBucket": "example-bucket",
"MapRunName": "map-run-name",
"ResultFiles": {
"FAILED": [
{
"ObjectName": "example-prefix/map-run-name/FAILED_0.json",
"Size": 262
}
],
"SUCCEED": [
{
"ObjectName": "example-prefix/map-run-name/SUCCEED_0.json",
"Size": 1057
}
]
}
}
# 以下是FAILED_0.json檔案的內容,儲存路徑為example-prefix/map-run-name/FAILED_0.json
[
{
"ExecutionName": "execution-name-5",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":5}",
"Output": "{\"ErrorCode\":\"MockError\"}",
"Status": "Failed",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
}
]
# 以下是SUCCEED_0.json檔案的內容,儲存路徑為example-prefix/map-run-name/SUCCEED_0.json
[
{
"ExecutionName": "execution-name-1",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":1}",
"Output": "{\"FailedValue\":4,\"Key\":1}",
"Status": "Succeeded",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
},
{
"ExecutionName": "execution-name-2",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":2}",
"Output": "{\"FailedValue\":4,\"Key\":2}",
"Status": "Succeeded",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
},
{
"ExecutionName": "execution-name-3",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":3}",
"Output": "{\"FailedValue\":4,\"Key\":3}",
"Status": "Succeeded",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
},
{
"ExecutionName": "execution-name-4",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":4}",
"Output": "{\"FailedValue\":4,\"Key\":4}",
"Status": "Succeeded",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
}
]
MaxItems
MaxItems支援配置Map最多處理的Item數量。例如對於包含了10000個Object的特定OSS Prefix,如果配置MaxItems為1000,則Map只會從OSS載入1000個Object。
MaxConcurrency
MaxConcurrency支援配置子工作流程執行並發。例如對於包含了10000個Item的Map執行,當MaxConcurrency配置為100時,Map同時最多執行100個子工作流程。
ToleratedFailurePercentage
ToleratedFailurePercentage支援按照百分比配置失敗容忍策略,例如當Item總數為10000時,配置ToleratedFailurePercentage為10可以使 Map 容忍最多 1000 個 Item 處理失敗。
ToleratedFailureCount
ToleratedFailureCount支援按照數量配置失敗容忍策略,例如當Item總數為10000時,配置ToleratedFailureCount為10可以使Map容忍最多10個Item處理失敗。
使用樣本
如下工作流程定義包含了一個分布式模式的Map狀態,它從上遊狀態讀取輸入,並通過$Input.Items
提取迭代元素。對於每一個迭代元素,Map會建立一個Express模式的子工作流程執行。
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
End: true
以下樣本是狀態機器執行的輸入。
{
"Items": [
{"key_1":"value_1"},
{"key_2":"value_2"},
{"key_3":"value_3"}
]
}
Map會產生三個子工作流程執行,其中子工作流程執行的定義為:
Type: StateMachine
Name: Map
SpecVersion: v1
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
以下樣本顯示了子工作流程執行接收的輸入。
# execution-1
# 第1個Map迭代對應的子工作流程執行接收的輸入樣本
{"key_1":"value_1"}
# execution-2
# 第2個Map迭代對應的子工作流程執行接收的輸入樣本
{"key_2":"value_2"}
# execution-3
# 第3個Map迭代對應的子工作流程執行接收的輸入樣本
{"key_3":"value_3"}
完成後,Map狀態的輸出是一個JSON數組,其中每個專案都是迭代的輸出。
{
"Items": [
{
"key_1": "value_1"
},
{
"key_2": "value_2"
},
{
"key_3": "value_3"
}
]
}