全部产品
Search
文档中心

大数据开发治理平台 DataWorks:最佳实践:自定义任务发布封网管控

更新时间:Dec 11, 2024

DataWorks的开发平台提供了OpenAPI、OpenEvent、Extensions功能,支持您通过这三个开放性功能,实现对指定流程的自定义管控与响应。本文以一个任务发布封网管控的业务场景,如在数据开发页面提交发布节点事件为例,为您演示如何进行开放平台的相关配置。

背景信息

本实践涉及的开放平台的相关功能介绍与基本概念可参见OpenEvent概述扩展程序概述

景说明

订阅配置说明

配置订阅时,通过事件规则指定订阅的事件类型为文件提交或发布。

  • 标准模式空间环境时:dataworks:FileChange:CommitFile

  • 简单模式空间环境时:dataworks:FileChange:DeployFile

前提条件

操作步骤

步骤一:配置自定义总线

开启并配置消息订阅的详细步骤请参见开启消息订阅,以下为本实践中的核心配置流程与注意事项。

  1. 登录事件总线EventBridge控制台,单击左侧导航栏事件总线,进入事件总线创建页面。

  2. 单击image按钮,创建自定义事件总线

    1. 总线模块内配置完自定义事件总线名称后,单击下一步,进入对事件源配置中。

    2. 单击跳过,跳过事件源规则目标模块的配置。创建自定义总线

  3. 单击左侧导航栏事件总线进入事件总线创建页面,找到已创建的事件总线后,单击名称,进入事件总线概览页面。

    • 单击左侧导航栏事件规则进入事件规则页面后,单击创建规则新建事件规则。

    • 本实践定义该EventBridge自定义总线可接收DataWorks文件提交事件消息和文件发布事件消息,配置demo与核心参数如下。

      1. 配置基本信息自定义规则名称即可。

      2. 实践模式内容

        • 事件源类型:选择自定义事件源。

        • 事件源:不进行配置。

        • 模式内容:以JSON格式编写,配置内容如下。本示例以标准空间为例。

          说明

          如果是简单模式的工作空间,可配置为dataworks:FileChange:DeployFile

          {
              "source": [
                  "acs.dataworks"
              ],
              "type": [
                  "dataworks:FileChange:CommitFile"
              ]
          }
          • source:定义事件的产品名称标识acs.dataworks

          • type:定义产品下事件的类型标识,配置为dataworks:FileChange:CommitFile

        • 事件模式调试:将sourcetype取值进行补充修改,然后进行事件测试,测试成功后单击下一步。测试

      3. 配置事件目标

        • 服务类型:选择HTTPS或HTTP,更多服务类型可参见管理事件规则

        • URL:填写接收自定义总线推送信息的URL,例如https://服务器地址:端口号/extensions/consumer

        • Body:选择完整事件。

        • 网络类型:选择公网。事件目标

步骤二:配置事件分发通道

  1. 进入开放平台页面。

    登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的更多 > 开放平台,进入开放平台的开发者后台页面。

  2. 在开发者后台页面,单击左侧导航栏OpenEvent,进入页面后,单击添加事件分发通道,在弹窗内进行配置。

    • 要分发事件的工作空间:选择已创建空间。

    • 指定分发到EventBridge中自定义总线:选择步骤一创建的事件总线。

  3. 保存事件分发通道后,在事件分发通道的操作列,单击启用按钮,启用新建的事件分发通道。image

步骤三:注册并配置扩展程序

  1. 进入开放平台页面。

    登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的更多 > 开放平台,进入开放平台的开发者后台页面。

  2. 在开发者后台页面,单击左导航栏扩展程序,进入页面后,单击注册扩展程序,在弹窗内进行配置。

    1. 部署方式选择:通过自建服务部署。

    2. 注册扩展程序。

      • 扩展程序名称:自定义配置。

      • 处理的扩展点:选择文件提交前置事件文件发布前置事件

      • 测试用工作空间:配置后,可在扩展性程序创建好,但并未提交前针对该空间内进行扩展程序测试。

      • 扩展程序参数配置文件提交前置事件

      其他参数可根据界面提示进行配置即可。注册扩展程序

  3. 配置完成以上内容后,单击确定保存注册扩展程序。

  4. 在已创建好的扩展程序的操作列,单击提交,进入审核状态。

    说明
    • 扩展程序审核由DataWorks平台审核,审核事件通常在T+3个工作日完成,请耐心等待。

    • 如需要测试创建的扩展程序,需要配置测试用工作空间

  5. 审核通过后,单击操作列的上线,即可上线使用该扩展程序。

  6. 单击扩展程序管理按钮,进入管理中心 > 扩展程序页面,选择创建的扩展程序,在启用列启用该扩展程序。启用扩展程序

开发配置扩展程序

示例代码

该代码通过EventBridge推送至服务的消息调动服务,获取服务启动的当前日期与节假日及其封网管控日期进行对比,判断是否为管控日期,判断结果通过UpdateIDEEventResult这个OpenAPI来回调至DataWorks中。

环境构建Java8Maven构建工具。

package com.aliyun.dataworks.demo;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.dataworks.config.Constants;
import com.aliyun.dataworks.config.EventCheckEnum;
import com.aliyun.dataworks.config.ExtensionParamProperties;
import com.aliyun.dataworks.services.DataWorksOpenApiClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.dataworks_public.model.v20200518.UpdateIDEEventResultRequest;
import com.aliyuncs.dataworks_public.model.v20200518.UpdateIDEEventResultResponse;
import com.aliyuncs.exceptions.ClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.List;

/**
 * @author dataworks demo
 */
@RestController
@RequestMapping("/extensions")
public class ExtensionsController {

    @Autowired(required = false)
    private DataWorksOpenApiClient dataWorksOpenApiClient;

    @Autowired
    private ExtensionParamProperties extensionParamProperties;

    /**
     * 接收eventBridge推送过来的消息
     * @param jsonParam
     */
    @PostMapping("/consumer")
    public void consumerEventBridge(@RequestBody String jsonParam){
        JSONObject jsonObj = JSON.parseObject(jsonParam);
        String eventCode = jsonObj.getString(Constants.EVENT_CODE_FILED);
        if(Constants.COMMIT_FILE_EVENT_CODE.equals(eventCode)){
            //初始化client
            IAcsClient client = dataWorksOpenApiClient.createClient();
            //获取当前时间
            SimpleDateFormat sdf = new SimpleDateFormat(Constants.YYYY_MM_DD);
            String now = sdf.format(System.currentTimeMillis());
            //获取2022年节假日及其封网管控日期
            List<String> holidayList = Arrays.asList(extensionParamProperties.getHolidayList().split(","));
            //判断当前时间是否是管控日
            boolean isExists = holidayList.stream().anyMatch(day -> day.equals(now));
            //回调方法
            UpdateIDEEventResultRequest updateIDEEventResultRequest = new UpdateIDEEventResultRequest();
            updateIDEEventResultRequest.setMessageId(jsonObj.getString("id"));
            updateIDEEventResultRequest.setExtensionCode(extensionParamProperties.getExtensionCode());
            //是管控日 检查不通过
            if(isExists){
                updateIDEEventResultRequest.setCheckResult(EventCheckEnum.FAIL.getCode());
                updateIDEEventResultRequest.setCheckResultTip("管控日不可作提交文件操作");
            }else{
                //非管控日 检查通过
                updateIDEEventResultRequest.setCheckResult(EventCheckEnum.OK.getCode());
                updateIDEEventResultRequest.setCheckResultTip(EventCheckEnum.OK.getName());
            }
            try {
                //回调DataWorks
                UpdateIDEEventResultResponse acsResponse = client.getAcsResponse(updateIDEEventResultRequest);
                //请求的唯一标识,用于后续错误排查使用
                System.out.println("acsResponse:" + acsResponse.getRequestId());
            } catch (ClientException e) {
                //请求的唯一标识,用于后续错误排查使用
                System.out.println("RequestId:" + e.getRequestId());
                //错误状态码
                System.out.println("ErrCode:" + e.getErrCode());
                //错误描述信息
                System.out.println("ErrMsg:" + e.getErrMsg());
            }
        }else{
            System.out.println("未能过滤其他事件,请检查配置步骤");
        }
    }
}
                        

示例工程部署

  1. 准备环境与工程

  2. 部署方式

    • 本地部署:将工程文件打成jar包后,在本地环境中已部署Java8和Maven工具的本地服务器或Windows上进行java -jar yourapp.jar运行服务程序。

    • 云平台部署:将工程文件打成jar包后,上传至相应的运行环境例如:Docker容器、云服务器等进行部署。

  3. 下载工程后,进入工程根目录下执行打包命令,将工程项目打成jar包。

    mvn clean package -Dmaven.test.skip=true spring-boot:repackage
  4. 执行jar包:

    java -jar target/extension-demo-deploycontroller-1.0.jar

此时会成功启动工程,如下图所示:启动工程在浏览器输入http://localhost:8080/index会得到"hello world!",表示应用成功部署,打通网络后即可订阅EventBridge的消息了。

结果验证

完成代码部署与打通EventBridge的网络后,您可以在开启扩展程序的空间内进行验证。

文件提交