すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:Apache Airflowを使用したMaxComputeジョブのスケジュール

最終更新日:Dec 06, 2024

MaxComputeでは、Apache Airflowを使用して、Pythonインターフェイスを使用してジョブをスケジュールできます。 このトピックでは、Apache AirflowのPython演算子を使用してMaxComputeジョブをスケジュールする方法について説明します。

背景情報

Apache Airflowは、Airbnbによって開発されたオープンソースツールです。 Apache AirflowはPythonで記述され、ジョブのスケジュールに使用されます。 Apache Airflowは、有向非巡回グラフ (DAG) を使用して、依存関係を持つジョブのグループを定義し、それらの関係に基づいてこれらのジョブをスケジュールします。 Apache Airflowでは、Pythonインターフェイスを使用してサブジョブを定義することもできます。 Apache Airflowは、ビジネス要件を満たすためにさまざまなオペレーターをサポートします。 Apache Airflowの詳細については、「Apache Airflow」をご参照ください。

前提条件

Apache Airflowを使用してMaxComputeジョブをスケジュールする前に、次の条件が満たされていることを確認してください。

  • Python on MaxCompute (PyODPS) がインストールされています。

    詳細については、「インストールガイドと制限」をご参照ください。

  • Apache Airflowがインストールされ、開始されます。

    詳しくは、「クイックスタート」をご参照ください。

    このトピックでは、Apache Airflow 1.10.7を使用します。

ステップ1: ジョブスケジューリング用のPythonスクリプトを作成し、ファイルをApache Airflowのホームディレクトリに保存します

ジョブスケジューリング用のPythonスクリプトを記述し、として保存します。pyファイル。 スクリプトファイルには、完全なスケジューリングロジックと、スケジュールするジョブの名前が含まれています。 このステップでは、Airiflow_MC.pyという名前のPythonスクリプトファイルが作成されます。 このファイルには、次の内容が含まれます。

# -*- coding: UTF-8 -*-
import sys
import os
from odps import ODPS
from odps import options
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from configparser import ConfigParser
import time
reload(sys)
sys.setdefaultencoding('utf8')
# Change the default encoding format.
# MaxCompute parameter settings
options.sql.settings = {'options.tunnel.limit_instance_tunnel': False, 'odps.sql.allow.fullscan': True}
cfg = ConfigParser()
cfg.read("odps.ini")
print(cfg.items())
# Replace the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable with the AccessKey ID of the user account. 
# Replace the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable with the AccessKey secret of the user account. 
# We recommend that you do not directly use the strings of your AccessKey ID and AccessKey secret.
odps = ODPS(cfg.get("odps",os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID')),cfg.get("odps",os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')),cfg.get("odps","project"),cfg.get("odps","endpoint"))
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retry_delay': timedelta(minutes=5),
    'start_date':datetime(2020,1,15)
    # 'email': ['airflow@example.com'],
    # 'email_on_failure': False,
    # 'email_on_retry': False,
    # 'retries': 1,
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}
# Scheduling workflow
dag = DAG(
    'Airiflow_MC', default_args=default_args, schedule_interval=timedelta(seconds=30))
def read_sql(sqlfile):
    with io.open(sqlfile, encoding='utf-8', mode='r') as f:
        sql=f.read()
    f.closed
    return sql
# Job scheduling
def get_time():
    print 'Current time {}'.format(time.time())
    return time.time()
# Job scheduling
def mc_job ():

    project = odps.get_project()  # Obtain information of the default project. 
    instance=odps.run_sql("select * from long_chinese;")
    print(instance.get_logview_address())
    instance.wait_for_success()
    with instance.open_reader() as reader:
        count = reader.count
    print("Number of data records in the table: {}".format(count))
    for record in reader:
        print record
    return count
t1 = PythonOperator (
    task_id = 'get_time' ,
    provide_context = False ,
    python_callable = get_time,
    dag = dag )

t2 = PythonOperator (
    task_id = 'mc_job' ,
    provide_context = False ,
    python_callable = mc_job ,
    dag = dag )
t2.set_upstream(t1)

ステップ2: ジョブスケジューリング用のスクリプトを送信する

  1. システムのコマンドラインウィンドウで、ステップ1 で作成されたスクリプトをサブミットして、次のコマンドを実行します。

    python Airiflow_MC.py
  2. システムのコマンドラインウィンドウで、次のコマンドを実行してスケジューリングワークフローを生成し、テストジョブを実行します。

    # print the list of active DAGs
    airflow list_dags
    
    # prints the list of tasks the "tutorial" dag_id
    airflow list_tasks Airiflow_MC
    
    # prints the hierarchy of tasks in the tutorial DAG
    airflow list_tasks Airiflow_MC --tree
    # Run a test job.
    airflow test Airiflow_MC get_time 2010-01-16
    airflow test Airiflow_MC mc_job 2010-01-16

ステップ3: ジョブを実行する

Apache Airflowのweb UIにログインできます。 DAGページで、送信したワークフローを見つけ、[リンク] 列の运行アイコンをクリックしてジョブを実行します。

运行调度作业

ステップ4: ジョブの実行結果を表示する

ジョブの名前をクリックして、[グラフ表示] タブでワークフローを表示できます。 次に、ワークフロー内のジョブ (mc_jobなど) をクリックします。 表示されるダイアログボックスで、[ログの表示] をクリックしてジョブの実行結果を表示します。

调度流程