全部产品
Search
文档中心

大数据开发治理平台 DataWorks:最佳实践:(高级特性应用)禁止使用MAX_PT函数

更新时间:Dec 12, 2024

DataWorks提供内置的流程检查,如任务发布前代码评审、数据治理中心治理项的内置检查项校验,此外,DataWorks还支持您自定义校验逻辑并接入DataWorks,实现DataWorks流程管控。本文以在提交与发布时校验代码中是否存在MAX_PT函数为例,为您介绍如何基于扩展程序实现工作空间中不允许使用特定函数。

场景说明

本实践示例的场景为校验提交发布的代码中是否有MAX_PT函数,示例的校验流程为:

序号

核心流程

核心功能点

1

将工作空间中的文件提交与发布消息(例如提交、发布节点)通过OpenEvent发布至EventBridge,后续通过EventBridge过滤事件消息,并将消息发送至您的服务。

开启消息订阅的时候,通过事件规则指定订阅的事件类型为文件提交与发布(dataworks:FileChange:CommitFiledataworks:FileChange:DeployFile

配置详情请参见最佳实践:(高级特性应用)禁止使用MAX_PT函数

2

本地或在线服务接收消息,通过对扩展程序的设置,实现:

  • 对于某个指定工作空间的提交文件的消息:不触发校验,流程直接通过。

  • 对于某个指定工作空间的发布文件的消息:触发校验,判断代码中是否使用MAX_PT函数并给出响应。

    若存在MAX_PT函数,则通过回调API阻塞DataWorks上的提交或发布流程,或者发送报警信息。

  • 注册扩展程序时,通过扩展程序的参数配置,指定不生效的工作空间和事件。本示例指定提交文件事件不生效(不触发扩展程序校验)。

  • 注册扩展程序时,通过扩展程序选项配置,指定扩展程序对于不满足校验条件的事件的响应选项,后续在启用扩展程序时选择具体的响应方式。本示例可选的响应方式有告警和禁用。

配置详情请参见最佳实践:(高级特性应用)禁止使用MAX_PT函数

前提条件

操作步骤

步骤一:配置自定义总线

  1. 登录事件总线EventBridge控制台,单击左侧导航栏事件总线,进入事件总线创建页面。

  2. 单击image按钮,创建自定义事件总线

    1. 总线模块内配置完自定义事件总线名称后,单击下一步,进入对事件源配置中。

    2. 单击跳过,跳过事件源规则目标模块的配置。image

  3. 单击左侧导航栏事件总线进入事件总线创建页面,找到已创建的事件总线后,单击名称,进入事件总线概览页面。

    • 单击左侧导航栏事件规则进入事件规则页面后,单击创建规则新建事件规则。

    • 本实践定义该EventBridge自定义总线可接收DataWorks文件提交事件消息和文件发布事件消息,配置demo与核心参数如下。

      1. 配置基本信息自定义规则名称即可。

      2. 实践模式内容

        • 事件源类型:选择自定义事件源。

        • 事件源:不进行配置。

        • 模式内容:以JSON格式编写,配置内容如下。

          {
              "source": [
                  "acs.dataworks"
              ],
              "type": [
                  "dataworks:FileChange:CommitFile",
                  "dataworks:FileChange:DeployFile"
              ]
          }
          • source:定义事件的产品名称标识acs.dataworks

          • type:定义产品下事件的类型标识,配置为dataworks:FileChange:CommitFiledataworks:FileChange:DeployFile

        • 事件模式调试:将sourcetype取值进行补充修改,然后进行事件测试,测试成功后单击下一步。测试

      3. 配置事件目标

        • 服务类型:选择HTTPS或HTTP,更多服务类型可参见管理事件规则

        • URL:填写接收自定义总线推送信息的URL,例如https://服务器地址:端口号/extensions/consumer

        • Body:选择完整事件。

        • 网络类型:选择公网。事件目标

步骤二:配置事件分发通道

  1. 进入开放平台页面。

    登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的更多 > 开放平台,进入开放平台的开发者后台页面。

  2. 在开发者后台页面,单击左侧导航栏OpenEvent,进入页面后,单击添加事件分发通道,在弹窗内进行配置。

    • 要分发事件的工作空间:选择已创建空间。

    • 指定分发到EventBridge中自定义总线:选择步骤一创建的事件总线。

  3. 保存事件分发通道后,在事件分发通道的操作列,单击启用按钮,启用新建的事件分发通道。image

步骤三:注册并配置扩展程序(Extensions)

  1. 进入开放平台页面。

    登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的更多 > 开放平台,进入开放平台的开发者后台页面。

  2. 在开发者后台页面,单击左导航栏扩展程序,进入页面后,单击注册扩展程序,在弹窗内进行配置。

    1. 部署方式选择:选择通过自建服务部署。

    2. 注册扩展程序

      • 扩展程序名称:自定义配置。

      • 处理的扩展点:选择文件提交前置事件文件发布前置事件

      • 测试用工作空间:配置后,可在扩展性程序创建好,但并未提交前针对该空间内进行扩展程序测试。

      • 扩展程序参数配置

        您可通过本参数的配置来控制扩展程序的生效范围,在指定工作空间下:

        • 提交节点(文件提交前置事件)时不触发扩展程序校验、不阻塞提交节点的流程。

        • 发布节点(文件发布前置事件)时触发扩展程序校验,如果不满足校验通过条件会阻塞发布节点的流程。

        需配置为extension.project.commit-file.disabled=YourProjectId。其中YourProjectId需替换为扩展程序不生效的扩展点事件所在的工作空间ID。

      • 扩展程序选项配置:用于设置不满足校验条件后的响应方式,以下示例中指定的相应方式包括告警和禁用。

        {
          "type":"object",
          "properties":{
            "checkStatus":{
              "type":"number",
              "title":"MAX-PT函数检查方式",
              "x-decorator":"FormItem",
              "x-component":"Radio.Group",
              "x-decorator-props":{
                "tooltip":"描述文件"
              },
              "x-component-props":{
                "dataSource":[
                  {
                    "value":"WARN",
                    "label":"告警"
                  },
                  {
                    "value":"FAIL",
                    "label":"禁用"
                  }
                ],
                "mode":"multiple"
              }
            }
          }
        }
  3. 配置完成以上内容后,单击确定保存注册扩展程序。

  4. 在已创建好的扩展程序的操作列,单击提交,进入审核状态。

    说明
    • 扩展程序审核由DataWorks平台审核,审核事件通常在T+3个工作日完成,请耐心等待。

    • 如需要测试创建的扩展程序,需要配置测试用工作空间

  5. 审核通过后,单击操作列的上线,即可上线使用该扩展程序。

  6. 单击扩展程序管理按钮,进入管理中心 > 扩展程序页面,选择创建的扩展程序,在启用列启用该扩展程序,并单击设置工作空间对MAX_PT函数使用的管控力度。image

开发配置扩展程序

事件总线EventBridge通过HTTP请求获取DataWorks发送的JSON格式事件,解析消息,并推送至目标服务程序上,对事进行处理后,返回结果至DataWorks中。

示例代码

该代码示例是通过调用UpdateIDEEventResultAPI,指定API中的messageId参数来获取详细的事件详情后进行逻辑判断是否包含限制函数,判断结果使用UpdateIDEEventResult这个API将判断结果返回至DataWorks中。详情请参见开发扩展程序

环境构建Java8Maven构建工具。

package com.aliyun.dataworks.demo;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.dataworks.config.Constants;
import com.aliyun.dataworks.config.EventCheckEnum;
import com.aliyun.dataworks.config.ExtensionParamProperties;
import com.aliyun.dataworks.services.DataWorksOpenApiClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import com.aliyuncs.exceptions.ClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
 * @author dataworks demo
 */
@RestController
@RequestMapping("/extensions")
public class ExtensionsController {

    @Autowired(required = false)
    private DataWorksOpenApiClient dataWorksOpenApiClient;

    @Autowired
    private ExtensionParamProperties extensionParamProperties;

    /**
     * 接收eventBridge推送过来的消息
     * @param jsonParam
     */
    @PostMapping("/consumer")
    public void consumerEventBridge(@RequestBody String jsonParam){
        JSONObject jsonObj = JSON.parseObject(jsonParam);
        String eventCode = jsonObj.getString(Constants.EVENT_CODE_FILED);
        if(Constants.COMMIT_FILE_EVENT_CODE.equals(eventCode) || Constants.DEPLOY_FILE_EVENT_CODE.equals(eventCode)){
            //初始化client
            IAcsClient client = dataWorksOpenApiClient.createClient();
            try {
                //当前事件参数信息
                String messageId = jsonObj.getString("id");
                JSONObject data = jsonObj.getObject("data", JSONObject.class);
                Long projectId = data.getLong("projectId");

                //初始化事件回调
                UpdateIDEEventResultRequest updateIDEEventResultRequest = new UpdateIDEEventResultRequest();
                updateIDEEventResultRequest.setMessageId(messageId);
                updateIDEEventResultRequest.setExtensionCode(extensionParamProperties.getExtensionCode());

                //查询触发扩展点事件时的扩展点数据快照
                GetIDEEventDetailRequest getIDEEventDetailRequest = new GetIDEEventDetailRequest();
                getIDEEventDetailRequest.setMessageId(messageId);
                getIDEEventDetailRequest.setProjectId(projectId);
                GetIDEEventDetailResponse getIDEEventDetailResponse = client.getAcsResponse(getIDEEventDetailRequest);
                String content = getIDEEventDetailResponse.getEventDetail().getCommittedFile().getContent();

                //判断代码是否包含限制函数
                if(content.contains(Constants.CHECK_CODE)){
                    //获取扩展程序选项配置在项目空间下的配置
                    GetOptionValueForProjectRequest getOptionValueForProjectRequest = new GetOptionValueForProjectRequest();
                    getOptionValueForProjectRequest.setProjectId(String.valueOf(projectId));
                    getOptionValueForProjectRequest.setExtensionCode(extensionParamProperties.getExtensionCode());
                    GetOptionValueForProjectResponse getOptionValueForProjectResponse = client.getAcsResponse(getOptionValueForProjectRequest);
                    JSONObject jsonObject = JSON.parseObject(getOptionValueForProjectResponse.getOptionValue());
                    //注意:这里需根据在DataWorks上实际设置格式来填写
                    String checkStatus = jsonObject.getString("checkStatus");
                    updateIDEEventResultRequest.setCheckResult(checkStatus);
                    updateIDEEventResultRequest.setCheckResultTip("代码中存在限制函数");
                }else{//成功回调
                    updateIDEEventResultRequest.setCheckResult(EventCheckEnum.OK.getCode());
                    updateIDEEventResultRequest.setCheckResultTip(EventCheckEnum.OK.getName());
                }
                //回调DataWorks
                UpdateIDEEventResultResponse acsResponse = client.getAcsResponse(updateIDEEventResultRequest);
                //请求的唯一标识,用于后续错误排查使用
                System.out.println("acsResponse:" + acsResponse.getRequestId());
            } catch (ClientException e) {
                //请求的唯一标识,用于后续错误排查使用
                System.out.println("RequestId:" + e.getRequestId());
                //错误状态码
                System.out.println("ErrCode:" + e.getErrCode());
                //错误描述信息
                System.out.println("ErrMsg:" + e.getErrMsg());
            }
        }else{
            System.out.println("未能过滤其他事件,请检查配置步骤");
        }
    }
}

示例工程部署

  1. 准备环境与工程

  2. 部署方式

    • 本地部署:将工程文件打成jar包后,在本地环境中已部署Java8和Maven工具的本地服务器或Windows上进行java -jar yourapp.jar运行服务程序。

    • 云平台部署:将工程文件打成jar包后,上传至相应的运行环境例如:Docker容器、云服务器等进行部署。

    说明

    部署完成的服务,需要保证EventBridge可通过公网访问到该服务。

  3. 下载工程后,进入工程根目录下执行打包命令,将工程项目打成jar包。

    mvn clean package -Dmaven.test.skip=true spring-boot:repackage
  4. 执行jar包:

    java -jar target/extensions-demo-maxpt-1.0.jar

    此时会成功启动工程,如下图所示:maxpt在浏览器输入http://localhost:8080/index会得到"hello world!",表示应用成功部署,打通网络后即可订阅EventBridge的消息了。

结果验证

完成代码部署与打通EventBridge的网络后,您可以在开启扩展程序的空间内进行验证。

验证步骤

  1. 在数据开发页面创建节点,在节点内编辑被禁止的MAX_PT函数保存并提交。

  2. 单击发布按钮,进入创建发布包页面对该节点直接进行发布,则会触发扩展程序校验。

    说明

    根据扩展程序配置,并不会对指定工作空间的代码文件提交事件生效,所以在提交包含MAX_PT函数的节点时不会触发扩展校验程序,而是在对包含MAX_PT函数节点进行发布时,触发扩展程序校验。