全部產品
Search
文件中心

DataWorks:最佳實務:訂閱執行個體狀態變更訊息

更新時間:Jun 19, 2024

DataWorks提供內建的流程檢查,如任務發布前程式碼檢閱、資料治理中心治理項的內建檢查項校正,此外,DataWorks還支援您自訂校正邏輯並接入DataWorks,實現DataWorks流程管控。本文以在營運中心的擷取執行個體狀態變更為例,為您介紹如何基於開放訊息實現訂閱執行個體狀態變更。

背景資訊

本實踐涉及的開放平台的相關功能介紹與基本概念可參見OpenEvent概述

開啟並配置訊息訂閱(OpenEvent)

開啟並配置訊息訂閱的詳細步驟請參見開啟訊息訂閱,以下為本實踐中的核心配置流程與注意事項。

  1. EvenBridge控制台,跳過事件來源等配置,快速建立一個自訂匯流排。建立自訂匯流排
  2. EvenBridge控制台對應的事件匯流排中,建立事件規則。
    本實踐定義該EventBridge自訂匯流排可接收DataWorks執行個體狀態變更訊息,配置demo與核心參數配置如下。
    1. 配置事件模式事件規則3
      {
          "source": [
              "acs.dataworks"
          ],
          "type": [
              "dataworks:InstanceStatusChanges:InstanceStatusChanges"
          ]
      }
      • source:定義事件的產品名稱標識,配置為acs.dataworks
      • type:定義產品下事件的類型標識,配置為dataworks:InstanceStatusChanges:InstanceStatusChanges。您可以在下方的事件模式調試中,將source、type取值進行補充修改,然後進行事件測試,測試成功後單擊下一步測試3
    2. 配置事件目標中,服務類型選擇為HTTPS,並填寫合適的URL,其他參數可保持預設。事件目標
  3. DataWorks控制台的開放平台頁面,啟用上述訊息分發通道。啟用

代碼編寫

package com.aliyun.dataworks.demo;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.dataworks.config.Constants;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


/**
 * @author dataworks demo
 */
@RestController
@RequestMapping("/event")
public class ExtensionsController {

    /**
     * 接收eventBridge推送過來的訊息
     * @param jsonParam
     */
    @PostMapping("/consumer")
    public void consumerEventBridge(@RequestBody String jsonParam){
        JSONObject jsonObj = JSON.parseObject(jsonParam);
        String eventCode = jsonObj.getString(Constants.EVENT_CODE_FILED);
        if(Constants.INSTANCE_STATUS_EVENT_CODE.equals(eventCode)){
            JSONObject dataParam = JSON.parseObject(jsonObj.getString("data"));
            //調度任務執行個體開始等時間的具體時間
            System.out.println("beginWaitTimeTime: "+ dataParam.getString("beginWaitTimeTime"));
            //DagId
            System.out.println("dagId: "+ dataParam.getString("dagId"));
            //Dag的類型,取值如下:
            //0:周期調度任務
            //1:手動任務
            //2:煙霧測試 (Smoke Test)
            //3:補資料
            //4:手動商務程序
            //5:臨時商務程序
            System.out.println("dagType: "+dataParam.getString("dagType"));
            //任務執行個體的調度類型,取值如下:
            //NORMAL(0):正常調度任務。該任務被日常調度。
            //MANUAL(1):手動任務。該任務不會被日常調度。
            //PAUSE(2):凍結任務。該任務被日常調度,但啟動調度時直接被置為失敗狀態。
            //SKIP(3):空跑任務。該任務被日常調度,但啟動調度時直接被置為成功狀態。
            //SKIP_UNCHOOSE(4):臨時工作流程中未選擇的任務,僅存在於臨時工作流程中,啟動調度時直接被置為成功狀態。
            //SKIP_CYCLE(5):未到運行周期的周或月任務。該任務被日常調度,但啟動調度時直接被置為成功狀態。
            //CONDITION_UNCHOOSE(6):上遊執行個體中有分支(IF)節點,但是該下遊節點未被分支節點選中,直接置為空白跑任務。
            //REALTIME_DEPRECATED(7):即時產生的已經到期的周期執行個體,該類型的任務直接被置為成功狀態。
            System.out.println("taskType: "+dataParam.getString("taskType"));
            //任務執行個體的修改時間
            System.out.println("modifyTime: "+dataParam.getString("modifyTime"));
            //任務執行個體的建立時間
            System.out.println("createTime: "+dataParam.getString("createTime"));
            //工作空間的ID。您可以調用ListProjects查看空間ID資訊。
            System.out.println("appId: "+dataParam.getString("appId"));
            //調度任務執行個體所在工作空間的租戶ID
            System.out.println("tenantId: "+dataParam.getString("tenantId"));
            //調度任務執行個體的作業碼:該欄位可忽略
            System.out.println("opCode: "+dataParam.getString("opCode"));
            //商務程序的ID,周期調度任務執行個體的商務程序預設為1,手動商務程序和內部工作流程調度任務執行個體為實際的商務程序ID
            System.out.println("flowId: "+dataParam.getString("flowId"));
            //調度任務執行個體對應的節點ID
            System.out.println("nodeId:"+dataParam.getString("nodeId"));
            //調度任務執行個體開始等資源的具體時間
            System.out.println("beginWaitResTime: "+dataParam.getString("beginWaitResTime"));
            //調度任務執行個體ID
            System.out.println("taskId: "+dataParam.getString("taskId"));
            //任務的狀態,取值如下:
            //0(未運行)
            //2(等待定時時間dueTime或cycleTime到來)
            //3(等待資源)
            //4(運行中)
            //7(下發給資料品質進行資料校檢)
            //8(進行中分支條件校檢)
            //5(執行失敗)
            //6(執行成功)
            System.out.println("status: "+dataParam.getString("status"));
        }else{
            System.out.println("未能過濾其他事件,請檢查配置步驟");
        }
    }
}
            

本地部署運行

下載工程:下載工程後,進入工程根目錄下執行:
mvn clean package -Dmaven.test.skip=true spring-boot:repackage
獲得可直接啟動並執行jar後執行:
java -jar target/event-demo-instance-status-1.0.jar
此時會成功啟動工程,如下圖所示:部署成功在瀏覽器輸入http://localhost:8080/index會得到"hello world!",表示應用成功部署,打通網路後即可訂閱EventBridge的訊息了。