MaxCompute支援您使用Apache Airflow通過Python介面實現作業調度。本文為您介紹如何使用Apache Airflow的Python Operator調度MaxCompute作業。
背景資訊
Apache Airflow是Airbnb開源的、基於Python編寫的調度工具,基於有向非循環圖(DAG),可以定義一組有依賴的作業,並按照依賴順序依次執行作業。還支援通過Python定義子作業,並支援各種Operators操作器,靈活性大,能滿足使用者的各種需求。更多Apache Airflow資訊,請參見Apache Airflow。
前提條件
在執行操作前,請確認您已滿足如下條件:
已安裝PyODPS。
更多安裝PyODPS操作,請參見安裝PyODPS。
已安裝並啟動Apache Airflow。
更多安裝及啟動Apache Airflow操作,請參見Apache Airflow快速入門。
本文中的Apache Airflow樣本版本為1.10.7。
步驟一:在Apache Airflow家目錄編寫調度Python指令碼
編寫作業調度Python指令碼並儲存為.py檔案,指令檔中會呈現完整的調度邏輯及對應的調度作業名稱。假設Python指令碼名稱為Airiflow_MC.py,指令碼內容樣本如下:
# -*- coding: UTF-8 -*-
import sys
import os
from odps import ODPS
from odps import options
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from configparser import ConfigParser
import time
reload(sys)
sys.setdefaultencoding('utf8')
#修改系統預設編碼
#MaxCompute參數設定
options.sql.settings = {'options.tunnel.limit_instance_tunnel': False, 'odps.sql.allow.fullscan': True}
cfg = ConfigParser()
cfg.read("odps.ini")
print(cfg.items())
# 確保 ALIBABA_CLOUD_ACCESS_KEY_ID 環境變數設定為使用者 Access Key ID,
# ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境變數設定為使用者 Access Key Secret,
# 不建議直接使用 Access Key ID / Access Key Secret 字串
odps = ODPS(cfg.get("odps",os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID')),cfg.get("odps",os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')),cfg.get("odps","project"),cfg.get("odps","endpoint"))
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retry_delay': timedelta(minutes=5),
'start_date':datetime(2020,1,15)
# 'email': ['airflow@example.com'],
# 'email_on_failure': False,
# 'email_on_retry': False,
# 'retries': 1,
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
#調度流程
dag = DAG(
'Airiflow_MC', default_args=default_args, schedule_interval=timedelta(seconds=30))
def read_sql(sqlfile):
with io.open(sqlfile, encoding='utf-8', mode='r') as f:
sql=f.read()
f.closed
return sql
#調度作業
def get_time():
print '目前時間是{}'.format(time.time())
return time.time()
#調度作業
def mc_job ():
project = odps.get_project() # 取到預設專案。
instance=odps.run_sql("select * from long_chinese;")
print(instance.get_logview_address())
instance.wait_for_success()
with instance.open_reader() as reader:
count = reader.count
print("查詢表資料條數:{}".format(count))
for record in reader:
print record
return count
t1 = PythonOperator (
task_id = 'get_time' ,
provide_context = False ,
python_callable = get_time,
dag = dag )
t2 = PythonOperator (
task_id = 'mc_job' ,
provide_context = False ,
python_callable = mc_job ,
dag = dag )
t2.set_upstream(t1)
步驟二:提交調度指令碼
在系統的命令列視窗執行如下命令提交步驟一中編寫的調度作業Python指令碼。
python Airiflow_MC.py
在系統的命令列視窗執行如下命令產生調度流程並測試調度作業。
# print the list of active DAGs airflow list_dags # prints the list of tasks the "tutorial" dag_id airflow list_tasks Airiflow_MC # prints the hierarchy of tasks in the tutorial DAG airflow list_tasks Airiflow_MC --tree #測試task airflow test Airiflow_MC get_time 2010-01-16 airflow test Airiflow_MC mc_job 2010-01-16
步驟三:運行調度作業
您可以登入Apache Airflow的Web介面,在DAGs頁簽,尋找到提交的調度流程,單擊Links列的表徵圖即可運行調度作業。
步驟四:查看調度作業運行結果
您也可以單擊調度作業名稱,在Graph View頁簽查看到調度作業流程。單擊調度流程中的某個作業,例如mc_job,您可以在mc_job對話方塊,單擊View Log,即可查看運行結果。