全部產品
Search
文件中心

:Python SDK 快速開始

更新時間:Jul 06, 2024

本文檔將介紹如何使用 Python 版 SDK 來提交一個作業,目的是統計一個記錄檔中“INFO”,”WARN”,”ERROR”,”DEBUG”出現的次數。

  • 作業準備

    • 上傳資料檔案到 OSS

    • 上傳任務程式到 OSS

  • 使用 SDK 建立(提交)作業

  • 查看結果

1. 作業準備

本作業是統計一個記錄檔中“INFO”,”WARN”,”ERROR”,”DEBUG”出現的次數。

該作業包含3個任務:split, count 和 merge:

  • split 任務會把記錄檔分成 3 份。

  • count 任務會統計每份記錄檔中“INFO”,”WARN”,”ERROR”,”DEBUG”出現的次數(count 任務需要配置 InstanceCount 為 3,表示同時啟動3台機器運行個 count 程式)。

  • merge 任務會把 count 任務的結果統一合并起來。

DAG圖例:

DAG

(1) 上傳資料檔案到OSS

下載本例子所需的資料:log-count-data.txt

將 log-count-data.txt 上傳到:oss://your-bucket/log-count/log-count-data.txt

  • your-bucket 表示您自己建立的 bucket,本例子假設 region 為:cn-shenzhen。

  • 如何上傳到 OSS,請參考 OSS檔案上傳

(2) 上傳任務程式到OSS

本例的作業程式是使用 python 編寫的, 下載本例子所需的程式:log-count.tar.gz

本例不需要改動範例程式碼。直接將 log-count.tar.gz 上傳到 oss,如上傳到:oss://your-bucket/log-count/log-count.tar.gz

如何上傳前面已經講過。

  • BatchCompute 只支援以 tar.gz 為尾碼的壓縮包, 請注意務必用以上方式(gzip)打包, 否則將會無法解析。

  • 如果您要修改代碼,可以解壓後修改,然後要用下面的方法打包:

    命令如下:

    > cd log-count  #進入目錄
    > tar -czf log-count.tar.gz * #打包,將所有這個目錄下的檔案打包到 log-count.tar.gz

    可以運行這條命令查看壓縮包內容:

    $ tar -tvf log-count.tar.gz

    可以看到以下列表:

    conf.py
    count.py
    merge.py
    split.py

2. 使用SDK建立(提交)作業

python SDK 的相關下載與安裝請參閱 相關下載與安裝

v20151111 版本,提交作業需要指定叢集 ID 或者使用匿名叢集參數。本例子使用匿名叢集方式進行,匿名叢集需要配置 2 個參數, 其中:

  • 可用的鏡像 ID, 可以使用系統提供的 Image,也可以自行製作鏡像, 請參見 自訂鏡像

  • 執行個體規格(InstanceType,執行個體類型),請參考 目前支援類型

在 OSS 中建立儲存 StdoutRedirectPath(程式輸出結果)和 StderrRedirectPath(錯誤記錄檔)的檔案路徑,本例中建立的路徑為oss://your-bucket/log-count/logs/

如需運行本例,請按照上文所述的變數擷取以及與上文對應的您的 OSS 路徑對程式中注釋中的變數進行修改。

Python SDK 提交程式模板如下,程式中具體參數含義請參見 參數說明

#encoding=utf-8
import sys

from batchcompute import Client, ClientError
from batchcompute import CN_SHENZHEN as REGION    #這裡的region根據實際情況填寫
from batchcompute.resources import (
    JobDescription, TaskDescription, DAG, AutoCluster, Configs, Networks, VPC,
)

ACCESS_KEY_ID='' # 填寫您的 AK
ACCESS_KEY_SECRET='' # 填寫您的 AK

IMAGE_ID = 'img-ubuntu' #這裡填寫您的鏡像 ID
INSTANCE_TYPE = 'ecs.sn1.medium' # 根據實際 region 支援的 InstanceType 填寫
WORKER_PATH = '' # 'oss://your-bucket/log-count/log-count.tar.gz'  這裡填寫您上傳的 log-count.tar.gz 的 OSS 儲存路徑
LOG_PATH = '' # 'oss://your-bucket/log-count/logs/' 這裡填寫您建立的錯誤反饋和 task 輸出的 OSS 儲存路徑
OSS_MOUNT= '' # 'oss://your-bucket/log-count/' 同時掛載到/home/inputs 和 /home/outputs

client = Client(REGION, ACCESS_KEY_ID, ACCESS_KEY_SECRET)

def main():
    try:
        job_desc = JobDescription()

        # Create auto cluster.
        cluster = AutoCluster()
        cluster.InstanceType = INSTANCE_TYPE
        cluster.ResourceType = "OnDemand"
        cluster.ImageId = IMAGE_ID
        configs = Configs()
        networks  = Networks()
        vpc = VPC()
        vpc.CidrBlock = '192.168.0.0/16'
        # vpc.VpcId = "vpc-8vbfxdyhxxxx"
        networks.VPC = vpc
        configs.Networks = networks
        # 設定系統硬碟type(cloud_efficiency/cloud_ssd)以及size(單位GB)
        configs.add_system_disk(size=40, type_='cloud_efficiency')
        configs.InstanceCount = 1
        cluster.Configs = configs

        # Create split task.
        split_task = TaskDescription()
        split_task.Parameters.Command.CommandLine = "python split.py"
        split_task.Parameters.Command.PackagePath = WORKER_PATH
        split_task.Parameters.StdoutRedirectPath = LOG_PATH
        split_task.Parameters.StderrRedirectPath = LOG_PATH
        split_task.InstanceCount = 1
        split_task.AutoCluster = cluster
        split_task.InputMapping[OSS_MOUNT]='/home/input'
        split_task.OutputMapping['/home/output'] = OSS_MOUNT


        # Create map task.
        count_task = TaskDescription(split_task)
        count_task.Parameters.Command.CommandLine = "python count.py"
        count_task.InstanceCount = 3
        count_task.InputMapping[OSS_MOUNT] = '/home/input'
        count_task.OutputMapping['/home/output'] = OSS_MOUNT

        # Create merge task
        merge_task = TaskDescription(split_task)
        merge_task.Parameters.Command.CommandLine = "python merge.py"
        merge_task.InstanceCount = 1
        merge_task.InputMapping[OSS_MOUNT] = '/home/input'
        merge_task.OutputMapping['/home/output'] = OSS_MOUNT

        # Create task dag.
        task_dag = DAG()
        task_dag.add_task(task_name="split", task=split_task)
        task_dag.add_task(task_name="count", task=count_task)
        task_dag.add_task(task_name="merge", task=merge_task)
        task_dag.Dependencies = {
            'split': ['count'],
            'count': ['merge']
        }

        # Create job description.
        job_desc.DAG = task_dag
        job_desc.Priority = 99 # 0-1000
        job_desc.Name = "log-count"
        job_desc.Description = "PythonSDKDemo"
        job_desc.JobFailOnInstanceFail = True

        job_id = client.create_job(job_desc).Id
        print('job created: %s' % job_id)

    except ClientError, e:
        print (e.get_status_code(), e.get_code(), e.get_requestid(), e.get_msg())

if __name__ == '__main__':
    sys.exit(main())

3. 查看作業狀態

您可以用 SDK 中的 擷取作業資訊方法擷取作業狀態:

jobInfo = client.get_job(job_id)
print (jobInfo.State)

State 狀態可能為:Waiting, Running, Finished, Failed, Stopped。

4. 查看結果

您可以登入 OSS 控制台 查看 your-bucket 下面的這個檔案:/log-count/merge_result.json。

內容應該如下:

{"INFO": 2460, "WARN": 2448, "DEBUG": 2509, "ERROR": 2583}

您也可以使用 OSS SDK來擷取結果。