全部产品
Search
文档中心

大数据开发治理平台 DataWorks:风险识别规则响应案例

更新时间:Nov 14, 2024

DataWorks通过OpenEvent能力为您提供消息订阅能力,您可以将服务程序注册为DataWorks的扩展程序,通过扩展程序来捕捉并响应订阅的事件消息,以此实现对特定事件进行消息通知与流程管控。本文以“实时阻断或审批超过1000条数据的下载行为”事件为例,为您介绍风险识别规则从开发到验证的全流程。

背景信息

数据下载在企业风险管控中举足轻重。通常情况下,只有企业数据开发人员和分析人员可以在数据平台上浏览和使用数据,而不允许将详细数据下载到本地进行分析。一旦数据导出到本地,就无法对其使用行为进行审计。同时,如果数据被不当使用或者受到别有用心者的攻击,就可能导致数据滥用和泄露,甚至可能引发数据安全事件和风险舆情。本场景将向您展示如何实现实时阻断数据导出行为。

场景目标

当用户一次性导出数据超过1000行时,系统将自动阻止或审批该操作。

前提条件

步骤一:开启并配置消息订阅

  1. 开启并配置消息订阅

    由于查询结果下载非工作空间操作,因此本案例使用default总线承接操作事件消息。

  2. 查询事件类型为dataworks:ResourcesDownload:DownloadResources的事件。image

  3. 单击操作列的事件详情,查看事件消息体,示例如下:

    重要
    • 消息体中的内容可作为您进行风险判断的上下文信息,例如您可以将以下表格中关键字段作为其他同类场景风险判断的上下文信息。

    • 如果您需要通过RAM子账号、RAM角色读取default总线中的事件,需进行RAM授权

    {
      "datacontenttype": "application/json;charset=utf-8",
      "aliyunaccountid": "110755000425****",
      "aliyunpublishtime": "2023-12-05T07:25:31.708Z",
      "data": {
        "eventCode": "download-resources",
        "extensionBizId": "audit_4d7ebb42b805428483148295a97a****",
        "extensionBizName": "DataWorks_IDE_Query_20231205152530.csv",
        "requestId": "77cac0c2fc12cecbf1d289128897****@@ac15054317017611303051804e****",
        "appId": ****,
        "tenantId": 52425742456****,
        "blockBusiness": true,
        "eventBody": {
          "sqlText": "SELECT * FROM table_1",
          "queryDwProjectId": "****",
          "moduleType": "develop_query",
          "operatorBaseId": "110755000425****",
          "datasourceId": "1****",
          "queryDwProjectName": "yongxunQA_emr_chen****",
          "dataRowSize": 4577,
          "datasourceName": "odps_source",
          "operatorUid": "110755000425****"
        },
        "operator": "110755000425****"
      },
      "aliyunoriginalaccountid": "110755000425****",
      "specversion": "1.0",
      "aliyuneventbusname": "default",
      "id": "169d171c-d523-4370-a874-bb0fa083****",
      "source": "acs.dataworks",
      "time": "2023-12-05T15:25:31.588Z",
      "aliyunregionid": "cn-chengdu",
      "type": "dataworks:ResourcesDownload:DownloadResources"
    }

    关键参数字段说明:

    参数

    说明

    sqlText

    查询SQL。

    queryDwProjectId

    查询数据源所在的工作空间ID。

    moduleType

    下载来源,取值:

    • develop_query:数据开发查询。

    • sqlx_query:数据分析查询。

    • dw_excel:数据分析电子表格。

    operatorBaseId

    操作者的UID。

    datasourceId

    查询的数据源ID。

    queryDwProjectName

    查询数据源所在的工作空间名称。

    dataRowSize

    下载的数据量。

    datasourceName

    查询的数据源名称。

步骤二:开发并部署扩展程序

  1. 准备工作:

    开启消息订阅并注册扩展程序,获取开发扩展程序所需的必要信息,详情请参见开发部署扩展程序:自建服务方式

  2. 开发并部署扩展程序。

    根据已获取的信息,开发扩展程序并将其部署为一个应用服务,详情请参见开发部署扩展程序:函数计算方式。关键参数配置和示例如下:

    • 注册扩展程序时,处理的扩展点选择数据下载前置事件

    • 开发扩展程序示例代码如下:

      重要
      • 此扩展程序示例基于步骤一中事件消息体中的dataRowSize字段实现下载行数风险判断。

      • 在配置响应时,如需实现“审批”,则需保证扩展程序在识别到用户风险行为时callbackExtensionRequest.setCheckResult()返回WARN;如需实现“阻断”,则callbackExtensionRequest.setCheckResult()应返回FAIL

      • 本文扩展程序代码以下载1000条数据进行判断举例说明,如果希望不同的下载条数触发不同的审批流,您可以配置多个扩展程序,详情请参见步骤三:配置风险识别规则。例如:

        • 第一个扩展程序仅在下载条数为0~2000时命中,对应审批流程1。

        • 第二个扩展程序仅在下载条数为2001及以上时命中,对应审批流程2。

      package com.aliyun.dataworks.demo;
      
      import com.alibaba.fastjson.JSON;
      import com.alibaba.fastjson.JSONObject;
      import com.aliyun.dataworks.config.Constants;
      import com.aliyun.dataworks.config.EventCheckEnum;
      import com.aliyun.dataworks.config.ExtensionParamProperties;
      import com.aliyun.dataworks.services.DataWorksOpenApiClient;
      import com.aliyuncs.IAcsClient;
      import com.aliyuncs.dataworks_public.model.v20200518.*;
      import com.aliyuncs.exceptions.ClientException;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.web.bind.annotation.*;
      
      /**
       * @author dataworks demo
       */
      @RestController
      @RequestMapping("/extensions")
      public class ExtensionsController {
      
          @Autowired(required = false)
          private DataWorksOpenApiClient dataWorksOpenApiClient;
      
          @Autowired
          private ExtensionParamProperties extensionParamProperties;
      
          /**
           * 接收eventBridge推送过来的消息
           * @param jsonParam
           */
          @PostMapping("/consumer")
          public void consumerEventBridge(@RequestBody String jsonParam){
              JSONObject jsonObj = JSON.parseObject(jsonParam);
              String eventCode = jsonObj.getString(Constants.EVENT_CODE_FILED);
              if(Constants.COMMIT_FILE_EVENT_CODE.equals(eventCode) || Constants.DEPLOY_FILE_EVENT_CODE.equals(eventCode)){
                  //初始化client
                  IAcsClient client = dataWorksOpenApiClient.createClient();
                  try {
                      //当前事件参数信息
                      String messageId = jsonObj.getString("id");
                      JSONObject data = jsonObj.getObject("data", JSONObject.class);
                     // Long projectId = data.getLong("appId");
      
                      //初始化事件回调
                      CallbackExtensionRequest callbackExtensionRequest = new CallbackExtensionRequest();
                      callbackExtensionRequest.setMessageId(messageId);
                      callbackExtensionRequest.setExtensionCode(extensionParamProperties.getExtensionCode());
                      JSONObject eventBody = data.getJSONObject("eventBody");
                      Long dataRowSize = eventBody.getLong("dataRowSize");
                      //获取扩展程序选项配置在项目空间下的配置
                      GetOptionValueForProjectRequest getOptionValueForProjectRequest = new GetOptionValueForProjectRequest();
                      //全局扩展点事件的配置信息所属projectId默认为-1
                      getOptionValueForProjectRequest.setProjectId("-1");
                      getOptionValueForProjectRequest.setExtensionCode(extensionParamProperties.getExtensionCode());
                      GetOptionValueForProjectResponse getOptionValueForProjectResponse = client.getAcsResponse(getOptionValueForProjectRequest);
                      JSONObject jsonObject = JSON.parseObject(getOptionValueForProjectResponse.getOptionValue());
                      //注意:这里需根据在DataWorks上实际设置格式来填写
                      Long maxDataRowSize = jsonObject.getLong("dataRowSize");
                      //判断代码是否包含限制函数
                      if(dataRowSize > 1000){
      
                          callbackExtensionRequest.setCheckResult(EventCheckEnum.FAIL.getCode());
                          callbackExtensionRequest.setCheckMessage("下载的行数超过限制数");
                      }else{//成功回调
                          callbackExtensionRequest.setCheckResult(EventCheckEnum.OK.getCode());
                      }
                      //回调DataWorks
                      CallbackExtensionResponse acsResponse = client.getAcsResponse(callbackExtensionRequest);
                      //请求的唯一标识,用于后续错误排查使用
                      System.out.println("acsResponse:" + acsResponse.getRequestId());
                  } catch (ClientException e) {
                      //请求的唯一标识,用于后续错误排查使用
                      System.out.println("RequestId:" + e.getRequestId());
                      //错误状态码
                      System.out.println("ErrCode:" + e.getErrCode());
                      //错误描述信息
                      System.out.println("ErrMsg:" + e.getErrMsg());
                  }
              }else{
                  System.out.println("未能过滤其他事件,请检查配置步骤");
              }
          }
      }

步骤三:配置风险识别规则

  1. 进入安全中心页面。

    登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据开发与治理 > 安全中心,在右侧页面中单击进入安全中心

  2. 在左侧导航栏单击安全策略 > 风险识别规则

  3. 为已上线的扩展程序设置审批流程。具体操作,请参见配置风险响应image

步骤四:启用风险识别规则

单击启用开关,根据提示开启扩展程序。image

步骤五:结果验证

  1. 进入数据下载页面。

  2. 单击指定文件操作列的下载

    • 如果检测通过,则可继续下载。

    • 如果检测不通过,则下载被阻断或提示发起申请操作。

其他同类场景说明

你可以基于下载事件中的其他字段,例如工作空间名称、SQL明细、数据源名称、人员UID等,实现其他符合实际业务的实时风控场景,例如:

  • 按人员所属部门(工作空间)判断是否允许下载数据。

  • 当查询SQL中包含敏感字段时阻断下载。

  • 分段风控,当下载条数超过2万条时需要审批,超过5万条时则阻断。

  • 针对空间角色定义下载条数,例如开发角色允许下载N条,超过则阻断;分析师角色允许下载M条,超过则阻断(需结合ListProjectMembers - 查询工作空间成员接口实现)。

  • 需针对数据开发、数据分析场景分别设置不同的下载数量策略。