すべてのプロダクト
Search
ドキュメントセンター

:Argo Workflows SDK for Pythonを使用して大規模なワークフローを作成する

最終更新日:Dec 10, 2024

Argoワークフローは、スケジュールされたタスク、機械学習タスク、抽出、変換、読み込み (ETL) タスクの構成に広く使用されている強力なワークフロー管理ツールです。 YAMLファイルを使用してワークフローを調整すると、課題が発生する可能性があります。 Heraは、Python用のArgoワークフローSDKです。 HeraはYAMLの代替手段であり、Pythonで複雑なワークフローを簡単に調整およびテストできます。 さらに、HeraはPythonエコシステムとシームレスに統合され、ワークフロー設計を大幅に簡素化します。 このトピックでは、Argo Workflows SDK for PythonのHeraを使用して大規模なワークフローを作成する方法について説明します。

背景情報

Argo Workflowsは、Kubernetes上の複雑なワークフローオーケストレーションを自動化するためのオープンソースのワークフローエンジンです。 Argoワークフローを使用して、タスクのコレクションを作成し、タスクの実行シーケンスと依存関係を設定できます。 これにより、カスタム自動ワークフローを効率的に作成および管理できます。

Argoワークフローは、スケジュールタスク、機械学習、シミュレーション、科学計算、抽出、変換、ロード (ETL) タスク、モデルトレーニング、継続的統合 /継続的配信 (CI/CD) パイプラインなどのシナリオで広く使用されています。 ArgoワークフローはYAMLファイルを使用して、わかりやすく簡単にワークフローを構成します。 これは、階層コード構造を構築するために厳密なインデントを必要とするYAML構文に慣れていない、または慣れていないユーザーに課題をもたらす可能性があります。 これは、これらのユーザにとって、長い学習曲線及び複雑な構成ステップにつながる可能性がある。

image

Heraは、Argoワークフローに基づくワークフローの作成と送信を目的としたArgo Workflows SDKです。 Heraは、ワークフローの作成と送信の手順を簡素化することを目的としており、YAMLではなくPythonに精通しているデータサイエンティストに適しています。 ヘラは次の利点を提供します。

  • シンプルさ: Heraは直感的で使いやすいコードを提供し、開発効率を大幅に向上させます。

  • 複雑なワークフローのサポート: Heraは、複雑なワークフローオーケストレーションでのYAML構文エラーの解消に役立ちます。

  • Pythonエコシステムとの統合: 各関数はテンプレートで定義できます。 HeraはPythonフレームワークと統合されています。

  • Observability: Heraは、コードの品質と保守性の向上に役立つPythonテストフレームワークをサポートしています。

Distributed Cloud Container Platform for Kubernetes (ACK One) のワークフロークラスタは、サーバーレスモードで実行されます。 Argoワークフローは、ワークフロークラスターの管理コンポーネントです。 次の図は、ワークフロークラスターでのArgoワークフローのアーキテクチャを示しています。

image

手順1: ワークフロークラスターを作成し、アクセストークンを取得する

  1. 分散Argoワークフロー用のKubernetesクラスターの作成

  2. 次のいずれかの方法を使用して、ワークフロークラスターのArgo Serverを有効にします。

  3. 次のコマンドを実行して、クラスターのアクセストークンを生成して取得します。

    kubectl create token default -n default

ステップ2: ヘラを使い始める

  1. 次のコマンドを実行して、Heraをインストールします。

    pip install hera-workflow
  2. ワークフローを調整して送信します。

    シナリオ1: シンプルなDAGダイヤモンド

    Argoワークフローは、有向非巡回グラフ (DAG) を使用して、ワークフロー内のタスクの複雑な依存関係を定義します。 ダイヤモンド構造は、ワークフローで一般的に採用されています。 ダイヤモンドワークフローでは、複数の並列タスクの実行結果が、後続のタスクの入力に集約されます。 Diamond構造は、データフローと実行結果を効率的に集約できます。 次のサンプルコードは、タスクaとタスクBが並行して実行され、タスクAとタスクBの実行結果がタスクCの入力に集約されるDiamondワークフローを調整するために、Heraを使用する方法の例を示しています。

    1. simpleDAG.pyという名前のファイルを作成し、次の内容をファイルにコピーします。

      # 必要なパッケージをインポートします。 
      hera. WorkflowからDAG、ワークフロー、スクリプトをインポートする
      hera.shared import global_configから
      urllib3をインポート
      
      urllib3.disable_warnings()
      
      # ワークフロークラスターのエンドポイントとトークンを指定します。 
      global_config.host = "https:// argo.{{clusterid}}.{{region-id}}.alicontainer.com:2746"
      global_config.token = "abcdefgxxxxxx"# 取得したトークンを入力します。 
      global_config.verify_ssl = ""
      
      # スクリプトデコレータは、Heraを使用してPythonのような関数オーケストレーションを有効にするための鍵です。 
      # ワークフローやStepsコンテキストなどのHeraコンテキストマネージャの下で関数を呼び出すことができます。 
      # 関数はまだ通常の外のHeraコンテキストとして実行されます。つまり、指定された関数で単体テストを書くことができます。 
      # 次のコードは、サンプル入力を提供します。 
      @ script()
      def echo (メッセージ: str):
          print (メッセージ)
      
      # ワークフローを調整します。 ワークフローは、Argoのメインリソースであり、Heraの主要クラスです。 ワークフローは、テンプレートの保存、エントリポイントの設定、およびテンプレートの実行を担当します。 
      ワークフローを使用 (
          generate_name="dag-diamond-" 、
          entrypoint="diamond" 、) としてw:
          DAG(name="diamond"):
              A = echo(name="A", arguments={"message": "A"}) # テンプレートを作成します。 
              B = echo(name="B" 、引数={"message": "B"})
              C = echo(name="C" 、引数={"message": "C"})
              D = echo(name="D" 、引数={"message": "D"})
              A >> [B, C] >> D# 依存関係を定義します。 この例では、タスクAはタスクBとタスクCの依存関係です。タスクBとタスクCはタスクDの依存関係です。# ワークフローを作成します。 
      w.create() 
    2. 次のコマンドを実行して、ワークフローを送信します。

      python simpleDAG.py
    3. ワークフローの実行が開始されたら、[ワークフローコンソール (Argo)] に移動して、DAGプロセスと結果を表示できます。

      image

    シナリオ2: MapReduce

    Argoワークフローでは、MapReduceスタイルでデータを処理するための鍵は、DAGテンプレートを使用して複数のタスクを整理および調整し、MapフェーズとReduceフェーズをシミュレートすることです。 次のサンプルコードは、Heraを使用して、テキストファイル内の単語をカウントするために使用されるサンプルMapReduceワークフローを調整する方法の詳細な例を示しています。 各ステップは、Pythonエコシステムと統合するためにPython関数で定義されます。

    1. map-reduce.pyという名前のファイルを作成し、次の内容をファイルにコピーします。

      コードの表示

      from hera. Workflow import DAG、Artifact、NoneArchiveStrategy、パラメータ、OSSArtifact、ワークフロー、スクリプト
      hera.shared import global_configから
      urllib3をインポート
      
      urllib3.disable_warnings()
      # ワークフロークラスターのエンドポイントを指定します。 
      global_config.host = "https:// argo.{{clusterid}}.{{region-id}}.alicontainer.com:2746"
      global_config.token = "abcdefgxxxxxx"# 取得したトークンを入力します。 
      global_config.verify_ssl = ""
      
      # スクリプトデコレータを使用する場合は、スクリプトパラメータをスクリプトデコレータに渡す必要があります。 パラメータは、画像、入力、出力、およびリソースを含む。 
      @ script (
          image="python:alpine3.6" 、
          inputs=パラメータ (name="num_parts") 、
          outputs=OSSArtifact(name="parts", path="/mnt/out", archive=NoneArchiveStrategy(), key="{{workflow.name}}/parts"),
      )
      def split(num_parts: int) -> なし:# num_parts入力パラメーターに基づいて複数のファイルを作成します。 各ファイルには、値としてfooキーと部品番号が含まれています。
          輸入json
          import os
          import sys
      
          os.mkdir("/mnt/out")
      
          part_ids = list(map (ラムダx: str(x), range(num_parts)))
          iの場合、列挙型のpart_id (part_ids, start=1):
              fとしてopen("/mnt/out/" + part_id + ".json", "w") を持つ:
                  json.dump({"foo": i}, f)
          json.dump(part_ids、sys.stdout)
      
      # スクリプトデコレータでイメージ、入力、出力のパラメータを定義します。
      @ script (
          image="python:alpine3.6" 、
          inputs=[Parameter(name="part_id", value="0"), Artifact(name="part", path="/mnt/in/part.json"),],],
          outputs=OSSArtifact (
              name="part" 、
              path="/mnt/out/part.json" 、
              archive=NoneArchiveStrategy() 、
              key="{{workflow.name}}/results/{{inputs.parameters.part_id}}.json" 、
          ),
      )
      def map_() -> None: # fooを含むファイルの数に基づいて新しいファイルを生成します。 新しい各ファイルには、バーキーと、対応する部品番号に2を掛けた結果に等しい値が含まれています。
          輸入json
          import os
      
          os.mkdir("/mnt/out")
          fとしてopen("/mnt/in/part.json") を持つ:
              part = json.load(f)
          fとしてopen("/mnt/out/part.json" 、"w") を持つ:
              json.dump({"bar": part["foo"] * 2}, f)
      
      # スクリプトデコレータでイメージ、入力、出力のパラメータを定義します。
      @ script (
          image="python:alpine3.6" 、
          inputs=OSSArtifact(name="results" 、path="/mnt/in" 、key="{{workflow.name}}/results") 、
          outputs=OSSArtifact (
              name="total", path="/mnt/out/total.json", archive=NoneArchiveStrategy(), key="{{workflow.name}}/total.json"
          ),
      )
      def reduce() -> None: # 各部品番号のバーキーの値を集計します。 
          輸入json
          import os
      
          os.mkdir("/mnt/out")
      
          合計=0
          リストのfの場合 (map (ラムダx: open("/mnt/in/" + x) 、os.listdir("/mnt/in"))):
              result = json.load(f)
              total = total + result["bar"]
          fとしてopen("/mnt/out/total.json" 、"w") を持つ:
              json.dump({"total": total}, f)
      
      # ワークフローを調整します。 ワークフロー名、エントリポイント、名前空間、およびグローバルパラメーターを指定します。 
      ワークフロー (generate_name="map-reduce-" 、entrypoint="main" 、namespace="default" 、arguments=Parameter(name="num_parts" 、value="4")) をwとして使用します。
          DAG(name="main") 付き:
              s = split(arguments=Parameter(name="num_parts", value="{{workflow.parameters.num_parts}}")) # テンプレートを編成します。 
              m = map_(
                  with_param=s.result、
                  arguments=[パラメータ (name="part_id", value="{{item}}"), OSSArtifact(name="part", key="{{workflow.name}}/parts/{{item}}.json")],],
              )# 入力パラメーターを指定し、テンプレートを調整します。 
              s >> m >> reduce() # タスクの依存関係を定義します。 
      # ワークフローを作成します。 
      w.create()
      
    2. 次のコマンドを実行して、ワークフローを送信します。

      python map-reduce.py
    3. ワークフローの実行が開始されたら、[ワークフローコンソール (Argo)] に移動して、DAGプロセスと結果を表示できます。

      image

設定方法の比較

Argoワークフローは、YAMLとHera Frameworkの2つの設定方法をサポートしています。 次の表は、2つの方法を比較する。

機能

YAML

Heraフレームワーク

シンプリシティ

比較的高い

高い。 このメソッドは低コードです。

ワークフローオーケストレーションの複雑さ

高い

低い

Pythonエコシステムとの統合

低い

高い。 このメソッドは豊富なPythonライブラリと統合されています)

テスト可能性

低い。 この方法は構文エラーが発生しやすいです。

高い。 このメソッドは、テストフレームワークをサポートします。

Hera Frameworkは、PythonエコシステムをArgoワークフローと適切に統合して、ワークフローオーケストレーションの複雑さを軽減します。 YAMLと比較して、Hera Frameworkは、大規模なワークフローオーケストレーションの単純化された代替手段を提供します。 さらに、Hera Frameworkを使用すると、データエンジニアはPythonを使用できます。 Hera Frameworkは、機械学習シナリオのシームレスで効率的なワークフローオーケストレーションと最適化も可能にします。 これにより、イテレーションを通じて創造的なアイデアを実際の展開に変換し、インテリジェントアプリケーションの効率的な実装と持続可能な開発を促進できます。

ACK Oneについてご質問がある場合は、DingTalkグループ35688562に参加してください。

関連ドキュメント

  • ヘラのドキュメント:

    • ヘラの詳細については、「ヘラの概要」をご参照ください。

    • Heraを使用してラージランゲージモデル (LLM) をトレーニングする方法の詳細については、「HeraでLLMをトレーニングする」をご参照ください。

  • YAMLデプロイ設定のサンプル:

    • YAMLファイルを使用してsimple-diamondをデプロイする方法の詳細については、「dag-diamond.yaml」をご参照ください。

    • YAMLファイルを使用してmap-reduceをデプロイする方法の詳細については、「map-reduce.yaml」をご参照ください。