全部產品
Search
文件中心

CloudFlow:分布式模式

更新時間:Oct 25, 2024

分布式模式在分布式環境中執行Map狀態迭代,從而提高處理效率和容量,適用於處理大規模資料或並行計算。

基本概念

分布式模式:當Map ProcessorConfig的ExecutionMode欄位被設定為Express時,Map狀態的執行會以分布式模式進行。在此模式下,迭代(Map)狀態將每次迭代作為Express模式的子工作流程執行。

子工作流程執行:當迭代(Map)狀態被設定為分布式模式時,Map迭代的執行會以子工作流程執行的形式進行,此時子工作流程的定義為Map Processor的狀態轉換定義,接收的輸入為Map迭代元素。

故障容忍策略:對於Map執行,當子工作流程執行失敗時,Map狀態會以失敗狀態結束。對於迭代數量較多的Map執行,可以通過配置錯誤容忍使Map執行在子工作流程執行失敗時繼續執行。

分布式模式包含以下欄位:

欄位

類型

是否必選

描述

樣本值

Name

string

狀態名稱。

my-state-name

Description

string

狀態原因。

describe it here

Type

string

狀態類型。

Map

InputConstructor

map[string]any

輸入構造器。

請參見輸入和輸出

ItemsPath

string

用於提取輸入數組的運算式。

請參見ItemsPath

ItemBatcher

ItemBatcher

開啟ItemBatcher可以將多個Item彙總為Item Batch作為輸入傳遞給子工作流程執行。

請參見ItemBatcher

ItemReader

ItemReader

支援從OSS讀取資料。

請參見ItemReader

ItemConstructor

ItemConstructor

  • 支援通過$Item引用原始輸入的Item。

  • 支援對Item進行單獨構造。

請參見ItemConstructor

ResultWriter

ResultWriter

支援將子工作流程執行的資訊寫入到指定的OSS目錄。

請參見ResultWriter

MaxConcuccency

int

支援配置子工作流程執行並發。

40

MaxItems

MaxItems

支援配置Map最多處理的Item數量。

請參見MaxItems

ToleratedFailurePercentage

ToleratedFailurePercentage

支援按照百分比配置失敗容忍策略。

請參見ToleratedFailurePercentage

ToleratedFailureCount

ToleratedFailureCount

支援按照數量配置失敗容忍策略。

請參見ToleratedFailureCount

Processor

Processor

迭代處理器。

請參見Processor

ProcessorConfig

ProcessorConfig

處理器配置。

請參見ProcessorConfig

OutputConstructor

map[string]any

輸出構造器。

請參見狀態輸出構造器

Next

string

目前狀態的下一狀態。當End取值為true時,無需指定。

my-next-state

End

bool

是否為當前範圍的終結節點。

true

Retry

Retry

用於定義錯誤重試策略。

請參見 錯誤處理

Catch

Catch

用於定義錯誤捕獲策略。

請參見 錯誤處理

使用限制

對於分布式模式執行存在以下配額限制,如果預設配額不能滿足您的需求,您可以通過工單提交配額提升申請。

配額名稱

含義

預設值

MaxOpenMapRun

單個賬戶在單個地區最多允許同時執行的分布式Map數量。

10

MaxConcurrency

單個MapRun支援的最大並發。

100

MaxItems

單個MapRun最多支援讀取的Item數量。

10000

分布式模式的關鍵字段

ItemsPath

用於提取輸入數組的運算式。該運算式執行後返回JSON Array,則可以進行迭代,將其中每個元素傳入ItemProcessor進行處理;可使用運算式變數$Context和$Input,樣本如下:

$Input.FieldA

Processor

迭代處理器。Processor包含的欄位如下表所示。

欄位

類型

是否必選

描述

樣本值

States

array

內部嵌套的狀態數組。

Processor:
   StartAt: Pass1
   States:
     - Type: Pass
       Name: Pass1
       End: true

StartAt

string

內部嵌套狀態數組的執行起點。

my start task

ProcessorConfig

處理器配置。ProcessorConfig包含的欄位如下表所示。

欄位

類型

是否必選

描述

樣本值

ExecutionMode

string

執行模式。

Express

ItemReader

Map分布式模式通過支援從OSS讀取資料輸入的方式,支援更大的資料輸入。ItemReader包含的欄位如下表所示。

欄位

類型

是否必選

描述

樣本值

SourceType

string

來源類型,可選值:OSS_CSV、OSS_JSON_LIST、OSS_OBJECTS、OSS_INVENTORY_FILES。

OSS_CSV

SourceParameters

string

來源參數。

請參見SourceParameters

ReaderConfig

ReaderConfig

閱讀器配置。

請參見ReaderConfig

SourceParameters

來源參數。SourceParameters包含的欄位如下表所示。

欄位

類型

是否必選

描述

樣本值

Bucket

string

檔案所在的Bucket名稱。

example-bucket

ObjectName

string

對象名稱。

object_name_1

Prefix

string

限定返回的Bucket名稱必須以prefix作為首碼。如果不設定,則不過濾首碼資訊。

預設值:無

example-prefix

ReaderConfig

閱讀器配置。ReaderConfig包含的欄位如下表所示。

欄位

類型

是否必選

描述

樣本值

CSVHeaders

[]string

CSV檔案中,第一行所包含的欄位標題或欄位名稱。

ColA,ColB,ColC

從OSS讀取資料的方式包括以下幾個方法:

  • 儲存於OSS中的CSV檔案

    使用Map狀態從一個CSV檔案中讀取資料。假設您有一個example-object.csv檔案儲存體在OSS的儲存桶example-bucket中,可參考以下狀態機器定義樣本。

    Type: StateMachine
    Name: MyWorkflow
    SpecVersion: v1
    StartAt: Map
    States:
      - Type: Map
        Name: Map
        ProcessorConfig:
          ExecutionMode: Express
        Processor:
          StartAt: Pass
          States:
            - Type: Pass
              Name: Pass
              End: true
        ItemReader:
          SourceType: OSS_CSV
          SourceParameters:
            Bucket: example-bucket
            ObjectName: example-object.csv
        End: true

    以下樣本顯示了example-object.csv檔案的內容。

    ColA,ColB,ColC
    col_a_1,col_b_1,col_c_1

    以下樣本顯示了子工作流程執行接收的輸入。

    {
      "ColA": "col_a_1",
      "ColB": "col_b_1",
      "ColC": "col_c_1",
    }
  • 儲存於OSS中的JSON檔案

    重要

    儲存於OSS中的JSON檔案所包含的JSON內容必須是一個JSON Array。

    用於從指定的OSS中讀取JSON列表。假設您有一個example-object.json檔案儲存體在OSS儲存桶example-bucket中,可參考以下狀態機器定義樣本。

    Type: StateMachine
    Name: MyWorkflow
    SpecVersion: v1
    StartAt: Map
    States:
      - Type: Map
        Name: Map
        ProcessorConfig:
          ExecutionMode: Express
        Processor:
          StartAt: Pass
          States:
            - Type: Pass
              Name: Pass
              End: true
        ItemReader:
          SourceType: OSS_JSON_LIST
          SourceParameters:
            Bucket: example-bucket
            ObjectName: example-object.json
        End: true

    以下樣本顯示了example-object.json檔案的內容。

    [
      {
        "key_1": "value_1"
      }
    ]

    以下樣本顯示了子工作流程執行接收的輸入。

    {
      "key_1": "value_1",
    }
  • 列舉OSS中的Objects

    用於從指定的OSS中讀取對象。假設您已將Objects儲存在OSS儲存桶example-bucket中,命名的首碼為example-prefix,可參考以下狀態機器定義樣本。

    Type: StateMachine
    Name: MyWorkflow
    SpecVersion: v1
    StartAt: Map
    States:
      - Type: Map
        Name: Map
        ProcessorConfig:
          ExecutionMode: Express
        Processor:
          StartAt: Pass
          States:
            - Type: Pass
              Name: Pass
              End: true
        ItemReader:
          SourceType: OSS_OBJECTS
          SourceParameters:
            Bucket: example-bucket
            Prefix: example-prefix
        End: true

    以下目錄結構示意圖表明example-prefix/object_1檔案儲存體在OSS儲存桶example-bucket中。

    example-bucket
       ├── example-prefix/object_1

    以下樣本顯示了子工作流程執行接收的輸入。

    {
      "XMLName": {
        "Space": "",
        "Local": "Contents"
      },
      "Key": "example-prefix/object_1",
      "Type": "Normal",
      "Size": 268435,
      "ETag": "\"50B06D6680D86F04138HSN612EF5DEC6\"",
      "Owner": {
        "XMLName": {
          "Space": "",
          "Local": ""
        },
        "ID": "",
        "DisplayName": ""
      },
      "LastModified": "2024-01-01T01:01:01Z",
      "StorageClass": "Standard",
      "RestoreInfo": ""
    }
  • 儲存於OSS中的Inventory

    使用Map狀態從指定的OSS儲存桶和檔案中讀取資料。假設您已將資訊清單檔manifest.json儲存在example-bucket的OSS儲存桶中,路徑是inventory/2024-01-01T01-01Z/manifest.json,可參考以下狀態機器定義樣本。

    Type: StateMachine
    Name: MyWorkflow
    SpecVersion: v1
    StartAt: Map
    States:
      - Type: Map
        Name: Map
        ProcessorConfig:
          ExecutionMode: Express
        Processor:
          StartAt: Pass
          States:
            - Type: Pass
              Name: Pass
              End: true
        ItemReader:
          SourceType: OSS_INVENTORY_FILES
          SourceParameters:
            Bucket: example-bucket
            ObjectName: inventory/2024-01-01T01-01Z/manifest.json
        ItemConstructor:
          Key.$: $Item.Key
        End: true

    以下樣本顯示了資訊清單檔的內容。

    "example-bucket","object_name_1"  
    "example-bucket","object_name_2"

    以下樣本顯示第一個子工作流程執行接收的輸入。

    {
      "Bucket": "example-bucket",
      "Key": "object_name_1"
    }

ItemBatcher

開啟ItemBatcher可以將多個Item彙總為Item Batch作為輸入傳遞給子工作流程執行。ItemBatcher包含的欄位如下表所示。

欄位

類型

是否必選

描述

樣本值

MaxItemsPerBatch

int

支援按照數量對Item進行彙總。

請參見MaxItemsPerBatch

MaxInputBytesPerBatch

int

支援按照彙總後輸入的大小對Item進行彙總。

請參見MaxInputBytesPerBatch

BatchInput

map[string]any

支援在彙總Items時添加額外的輸入。

請參見BatchInput

MaxItemsPerBatch

MaxItemsPerBatch支援按照數量對Item進行彙總。以下是MaxItemsPerBatch的使用樣本。

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemBatcher:
      MaxItemsPerBatch: 2
    End: true

以下樣本是狀態機器執行的輸入。

{
  "Items": [
    {"key_1":"value_1"},
    {"key_2":"value_2"},
    {"key_3":"value_3"},
    {"key_4":"value_4"},
    {"key_5":"value_5"}
  ]
}

以下樣本顯示了子工作流程執行接收的輸入。

# execution-1
# 第1個子工作流程輸入樣本
{
  "Items": [
    {"key_1":"value_1"},
    {"key_2":"value_2"}
  ]
}

# execution-2
# 第2個子工作流程輸入樣本
{
  "Items": [
    {"key_1":"value_3"},
    {"key_2":"value_4"}
  ]
}

# execution-3
# 第3個子工作流程輸入樣本
{
  "Items": [
    {"key_1":"value_5"},
  ]
}

MaxInputBytesPerBatch

MaxInputBytesPerBatch支援按照彙總後輸入的大小對Item進行彙總,MaxInputBytesPerBatch可以確保彙總後的輸入不超過指定的大小。以下是MaxInputBytesPerBatch的使用樣本。

重要
  • 由於ItemBatcher會添加額外的Items key,輸入的整體大小計算包含了額外附加的Key和附加的BatchInput。

  • MaxInputBytesPerBatch指定的數值的單位為Byte。

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemBatcher:
      MaxInputBytesPerBatch: 50
    End: true

以下樣本是狀態機器執行的輸入。

{
  "Items":[
    {"Key":1},
    {"key":2},
    {"Key":3},
    {"Key":4},
    {"Key":5}
  ]
}

以下樣本顯示了子工作流程執行接收的輸入。

# execution-1
# 第1個子工作流程輸入樣本
{
  "Items":[
    {"Key":1},
    {"key":2}
  ]
}

# execution-2
# 第2個子工作流程輸入樣本
{
  "Items":[
    {"Key":3},
    {"key":4}
  ]
}

# execution-3
# 第3個子工作流程輸入樣本
{
  "Items":[
    {"Key":5}
  ]
}

BatchInput

BatchInput支援在彙總Items時添加額外的輸入。以下是BatchInput的使用樣本。

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemBatcher:
      MaxInputBytesPerBatch: 70
      BatchInput:
        InputKey.$: $Input.Key
    End: true

以下樣本是狀態機器執行的輸入。

{
  "Key":"value",
  "Items":[
    {"Key":1},
    {"key":2},
    {"Key":3},
    {"Key":4},
    {"Key":5}
  ]
}

以下樣本顯示了子工作流程執行接收的輸入。

# execution-1
# 第1個子工作流程輸入樣本
{
  "BatchInput":{
    "InputKey":"value"
  },
  "Items":[
    {"Key":1},
    {"key":2}
  ]
}

# execution-2
# 第2個子工作流程輸入樣本
{
  "BatchInput":{
    "InputKey":"value"
  },
  "Items":[
    {"Key":3},
    {"key":4}
  ]
}

# execution-3
# 第3個子工作流程輸入樣本
{
  "BatchInput":{
    "InputKey":"value"
  },
  "Items":[
    {"Key":5}
  ]
}

ItemConstructor

ItemConstructor支援對Item進行單獨構造,ItemConstructor支援通過$Item引用要被構造的Item。以下是ItemConstructor的使用樣本。

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    ItemBatcher:
      MaxInputBytesPerBatch: 200
      BatchInput:
        InputKey.$: $Input.Key
    ItemConstructor:
      ConstructedKey.$: $Item.Key
      InputKey.$: $Input.Key
    End: true

以下樣本是狀態機器執行的輸入。

{
  "Key":"value",
  "Items":[
    {"Key":1},
    {"Key":2},
    {"Key":3},
    {"Key":4},
    {"Key":5}
  ]
}

以下樣本顯示了子工作流程執行接收的輸入。

# execution-1
# 第1個子工作流程輸入樣本
{
  "BatchInput": {
    "InputKey": "value"
  },
  "Items": [
    {
      "InputKey": "value",
      "ConstructedKey": 1
    },
    {
      "InputKey": "value",
      "ConstructedKey": 2
    },
    {
      "InputKey": "value",
      "ConstructedKey": 3
    }
  ]
}

# execution-2
# 第2個子工作流程輸入樣本
{
  "BatchInput": {
    "InputKey": "value"
  },
  "Items": [
    {
      "InputKey": "value",
      "ConstructedKey": 4
    },
    {
      "InputKey": "value",
      "ConstructedKey": 5
    }
  ]
}

ResultWriter

ResultWriter支援將子工作流程執行的資訊寫入到指定的OSS目錄。ResultWriter包含的欄位如下表所示。

欄位

類型

是否必選

描述

樣本值

Parameters

string

請求參數。

請參見Parameters

Parameters

請求參數。Parameters包含的欄位如下表所示。

欄位

類型

是否必選

描述

樣本值

Bucket

string

檔案所在的Bucket名稱。

example-bucket

Prefix

string

限定返回的Bucket名稱必須以prefix作為首碼。如果不設定,則不過濾首碼資訊。

預設值:無

example-prefix

以下是ResultWriter的使用樣本。

說明

工作流程狀態的輸入輸出不能超過大小限制,對於迭代數量較多的Map執行,可能會出現輸出內容超過大小限制的問題,建議通過ResultWriter配置將Map的輸出寫入OSS。

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    ItemConstructor:
      Key.$: $Item.Key
      FailedValue.$: $Input.FailedValue
    ToleratedFailurePercentage: 30
    Processor:
      StartAt: Choice
      States:
        - Type: Choice
          Name: Choice
          Branches:
            - Condition: $Input.Key > $Input.FailedValue
              Next: Fail
          Default: Succeed
        - Type: Succeed
          Name: Succeed
          End: true
        - Type: Fail
          Name: Fail
          Code: MockError
          End: true
    ResultWriter:
      Parameters:
        Bucket: example-bucket
        Prefix: example-prefix/
    End: true

以下樣本是狀態機器執行的輸入。

{
  "FailedValue": 4,
  "Items": [
    {
      "Key": 1
    },
    {
      "Key": 2
    },
    {
      "Key": 3
    },
    {
      "Key": 4
    },
    {
      "Key": 5
    }
  ]
}

以下樣本顯示了3個JSON檔案內容。

# 以下是manifest.json檔案的內容,儲存路徑為example-prefix/map-run-name/manifest.json
{
    "DestinationBucket": "example-bucket",
    "MapRunName": "map-run-name",
    "ResultFiles": {
        "FAILED": [
            {
                "ObjectName": "example-prefix/map-run-name/FAILED_0.json",
                "Size": 262
            }
        ],
        "SUCCEED": [
            {
                "ObjectName": "example-prefix/map-run-name/SUCCEED_0.json",
                "Size": 1057
            }
        ]
    }
}

# 以下是FAILED_0.json檔案的內容,儲存路徑為example-prefix/map-run-name/FAILED_0.json
[
    {
        "ExecutionName": "execution-name-5",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":5}",
        "Output": "{\"ErrorCode\":\"MockError\"}",
        "Status": "Failed",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    }
]

# 以下是SUCCEED_0.json檔案的內容,儲存路徑為example-prefix/map-run-name/SUCCEED_0.json
[
    {
        "ExecutionName": "execution-name-1",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":1}",
        "Output": "{\"FailedValue\":4,\"Key\":1}",
        "Status": "Succeeded",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    },
    {
        "ExecutionName": "execution-name-2",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":2}",
        "Output": "{\"FailedValue\":4,\"Key\":2}",
        "Status": "Succeeded",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    },
    {
        "ExecutionName": "execution-name-3",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":3}",
        "Output": "{\"FailedValue\":4,\"Key\":3}",
        "Status": "Succeeded",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    },
    {
        "ExecutionName": "execution-name-4",
        "FlowName": "example",
        "Input": "{\"FailedValue\":4,\"Key\":4}",
        "Output": "{\"FailedValue\":4,\"Key\":4}",
        "Status": "Succeeded",
        "StartedTime": "rfc3339-format-time-string",
        "StoppedTime": "rfc3339-format-time-string"
    }
]

MaxItems

MaxItems支援配置Map最多處理的Item數量。例如對於包含了10000個Object的特定OSS Prefix,如果配置MaxItems為1000,則Map只會從OSS載入1000個Object。

MaxConcurrency

MaxConcurrency支援配置子工作流程執行並發。例如對於包含了10000個Item的Map執行,當MaxConcurrency配置為100時,Map同時最多執行100個子工作流程。

ToleratedFailurePercentage

ToleratedFailurePercentage支援按照百分比配置失敗容忍策略,例如當Item總數為10000時,配置ToleratedFailurePercentage為10可以使 Map 容忍最多 1000 個 Item 處理失敗。

ToleratedFailureCount

ToleratedFailureCount支援按照數量配置失敗容忍策略,例如當Item總數為10000時,配置ToleratedFailureCount為10可以使Map容忍最多10個Item處理失敗。

使用樣本

如下工作流程定義包含了一個分布式模式的Map狀態,它從上遊狀態讀取輸入,並通過$Input.Items提取迭代元素。對於每一個迭代元素,Map會建立一個Express模式的子工作流程執行。

Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
  - Type: Map
    Name: Map
    ProcessorConfig:
      ExecutionMode: Express
    ItemsPath: $Input.Items
    Processor:
      StartAt: Pass
      States:
        - Type: Pass
          Name: Pass
          End: true
    End: true

以下樣本是狀態機器執行的輸入。

{
  "Items": [
    {"key_1":"value_1"},
    {"key_2":"value_2"},
    {"key_3":"value_3"}
  ]
}

Map會產生三個子工作流程執行,其中子工作流程執行的定義為:

Type: StateMachine
Name: Map
SpecVersion: v1
StartAt: Pass
States:
  - Type: Pass
    Name: Pass
    End: true

以下樣本顯示了子工作流程執行接收的輸入。

# execution-1
# 第1個Map迭代對應的子工作流程執行接收的輸入樣本
{"key_1":"value_1"}

# execution-2
# 第2個Map迭代對應的子工作流程執行接收的輸入樣本
{"key_2":"value_2"}

# execution-3
# 第3個Map迭代對應的子工作流程執行接收的輸入樣本
{"key_3":"value_3"}

完成後,Map狀態的輸出是一個JSON數組,其中每個專案都是迭代的輸出。

{
    "Items": [
        {
            "key_1": "value_1"
        },
        {
            "key_2": "value_2"
        },
        {
            "key_3": "value_3"
        }
    ]
}