DataWorks allows you to subscribe to messages by using OpenEvent. You can register a service program as an extension of DataWorks. Then, you can use the extension to capture and respond to subscribed event messages. This way, you can receive notifications and manage the processes of handling specific events. This topic describes the process from development to verification of a risk identification rule. In this topic, the event related to the download of more than 1,000 data records is used as an example.
Background information
Data downloading is an important operation in risk management of an enterprise. In most cases, only enterprise data developers and analysts can access and use data on the data platform, and they are not allowed to download detailed data for analysis. After data is exported to an on-premises machine, you cannot audit the operations performed on the data. Improper data usage or malicious attacks can also result in data misuse and breaches. This may cause data security incidents and negative public sentiment. The example in this topic shows how to block the export of data in real time.
Objective
When a user attempts to export more than 1,000 rows of data at a time, the system automatically blocks the operation or triggers an approval process.
Prerequisites
DataWorks Enterprise Edition is activated. This example is implemented based on the capabilities of Open Platform of DataWorks Enterprise Edition.
EventBridge is activated to receive the message bodies of user operation events. You can use extensions to consume the received messages. The extensions are used to monitor risky behavior.
An Elastic Compute Service (ECS) instance or a data center is created to deploy extensions that are used to monitor risky behavior.
Step 1: Enable and configure message subscription
Enable and configure message subscription.
Downloading query results is not a workspace-level operation. In this example, the default bus in EventBridge is used to receive messages related to the operation event.
Query the event whose type is dataworks:ResourcesDownload:DownloadResources. Find the event and click Event Detail in the Operations column to view the message body of the event. The following code is a sample message body.
Important: The content in the message body can be used as the context information for risk identification. For example, you can use the key fields in the following table as the context information for risk determination in other similar scenarios.
If you want to use a RAM user or a RAM role to read events from the default bus, you must grant the required permissions to the RAM user or RAM role. For more information, see RAM authorization.
{
  "datacontenttype": "application/json;charset=utf-8",
  "aliyunaccountid": "110755000425****",
  "aliyunpublishtime": "2023-12-05T07:25:31.708Z",
  "data": {
    "eventCode": "download-resources",
    "extensionBizId": "audit_4d7ebb42b805428483148295a97a****",
    "extensionBizName": "DataWorks_IDE_Query_20231205152530.csv",
    "requestId": "77cac0c2fc12cecbf1d289128897****@@ac15054317017611303051804e****",
    "appId": ****,
    "tenantId": 52425742456****,
    "blockBusiness": true,
    "eventBody": {
      "sqlText": "SELECT * FROM table_1",
      "queryDwProjectId": "****",
      "moduleType": "develop_query",
      "operatorBaseId": "110755000425****",
      "datasourceId": "1****",
      "queryDwProjectName": "yongxunQA_emr_chen****",
      "dataRowSize": 4577,
      "datasourceName": "odps_source",
      "operatorUid": "110755000425****"
    },
    "operator": "110755000425****"
  },
  "aliyunoriginalaccountid": "110755000425****",
  "specversion": "1.0",
  "aliyuneventbusname": "default",
  "id": "169d171c-d523-4370-a874-bb0fa083****",
  "source": "acs.dataworks",
  "time": "2023-12-05T15:25:31.588Z",
  "aliyunregionid": "cn-chengdu",
  "type": "dataworks:ResourcesDownload:DownloadResources"
}
The following table describes the key fields.
| Field | Description |
| --- | --- |
| sqlText | The SQL statement used for query. |
| queryDwProjectId | The ID of the workspace to which the queried data source belongs. |
| moduleType | The source of the data to be downloaded. Valid values: develop_query (query results in DataStudio), sqlx_query (SQL query results in DataAnalysis), and dw_excel (query results in workbooks in DataAnalysis). |
| operatorBaseId | The UID of the operator. |
| datasourceId | The ID of the queried data source. |
| queryDwProjectName | The name of the workspace to which the queried data source belongs. |
| dataRowSize | The number of rows to be downloaded. |
| datasourceName | The name of the queried data source. |
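As a rough illustration of how an extension might collect these fields as context for risk identification, the following Java sketch copies them from an already-parsed eventBody into a small holder class. The DownloadContext class and its method names are hypothetical and are not part of any DataWorks SDK. The sketch assumes that eventBody is available as a Map<String, Object>, which is the case for a fastjson JSONObject.

```java
import java.util.Map;

/** Hypothetical holder for the eventBody fields that serve as risk-identification context. */
final class DownloadContext {
    final String sqlText;
    final String moduleType;
    final String operatorBaseId;
    final String workspaceName;
    final String datasourceName;
    final long dataRowSize;

    private DownloadContext(String sqlText, String moduleType, String operatorBaseId,
                            String workspaceName, String datasourceName, long dataRowSize) {
        this.sqlText = sqlText;
        this.moduleType = moduleType;
        this.operatorBaseId = operatorBaseId;
        this.workspaceName = workspaceName;
        this.datasourceName = datasourceName;
        this.dataRowSize = dataRowSize;
    }

    /** Builds the context from an already-parsed eventBody map. */
    static DownloadContext fromEventBody(Map<String, Object> eventBody) {
        Object rows = eventBody.get("dataRowSize");
        if (!(rows instanceof Number)) {
            // Without a row count, the extension cannot evaluate a row-based rule.
            throw new IllegalArgumentException("dataRowSize is missing or not numeric");
        }
        return new DownloadContext(
                text(eventBody, "sqlText"),
                text(eventBody, "moduleType"),
                text(eventBody, "operatorBaseId"),
                text(eventBody, "queryDwProjectName"),
                text(eventBody, "datasourceName"),
                ((Number) rows).longValue());
    }

    /** Returns the value of the key as a string, or an empty string if the key is absent. */
    private static String text(Map<String, Object> map, String key) {
        Object value = map.get(key);
        return value == null ? "" : value.toString();
    }

    /** True if the download was started from DataAnalysis (sqlx_query or dw_excel). */
    boolean isFromDataAnalysis() {
        return "sqlx_query".equals(moduleType) || "dw_excel".equals(moduleType);
    }
}
```

With the sample message body above, fromEventBody would yield a dataRowSize of 4577 and a moduleType of develop_query, so isFromDataAnalysis() would return false.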
Step 2: Develop and deploy an extension
Make preparations.
Enable the message subscription feature, register an extension, and obtain the required information for developing an extension. For more information, see Develop and deploy an extension based on a self-managed service.
Develop and deploy an extension.
Develop an extension and deploy the extension as an application service based on the obtained information. For more information, see Develop and deploy an extension based on Function Compute.
When you register an extension, you need to set the Extension point of processing parameter to Pre-event for Data Download.
The following sample code provides an example on how to develop an extension.
Important: The sample extension identifies the risk of downloading a specific number of rows of data based on the dataRowSize field in the message body of the event in Step 1.
When you configure a risk response policy for the extension, take note of the following points:
If you want the response to be Approval, make sure that the extension passes WARN to callbackExtensionRequest.setCheckResult() when it identifies risky behavior of a user.
If you want the response to be Blocking, make sure that the extension passes FAIL to callbackExtensionRequest.setCheckResult() when it identifies risky behavior of a user.
The event related to the download of more than 1,000 data records is used in the sample code of the extension. If you want to trigger different approval processes for different ranges of data records, you can configure multiple extensions. For more information, see Step 3: Configure a risk identification rule. Example:
The first extension is used when the number of data records that a user wants to download is between 0 and 2,000, and Approval Process 1 is triggered.
The second extension is used when the number of data records that a user wants to download is 2,001 or more, and Approval Process 2 is triggered.
package com.aliyun.dataworks.demo;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.dataworks.config.Constants;
import com.aliyun.dataworks.config.EventCheckEnum;
import com.aliyun.dataworks.config.ExtensionParamProperties;
import com.aliyun.dataworks.services.DataWorksOpenApiClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import com.aliyuncs.exceptions.ClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
 * @author dataworks demo
 */
@RestController
@RequestMapping("/extensions")
public class ExtensionsController {

    @Autowired(required = false)
    private DataWorksOpenApiClient dataWorksOpenApiClient;

    @Autowired
    private ExtensionParamProperties extensionParamProperties;

    /**
     * Receive event messages that are sent from EventBridge.
     * @param jsonParam the message body of the event, in JSON format
     */
    @PostMapping("/consumer")
    public void consumerEventBridge(@RequestBody String jsonParam){
        JSONObject jsonObj = JSON.parseObject(jsonParam);
        String eventCode = jsonObj.getString(Constants.EVENT_CODE_FILED);
        // Process only the event codes that this extension handles. Make sure that the constants
        // match the event code of the download event (download-resources in the sample message body in Step 1).
        if(Constants.COMMIT_FILE_EVENT_CODE.equals(eventCode) || Constants.DEPLOY_FILE_EVENT_CODE.equals(eventCode)){
            // Initialize the client.
            IAcsClient client = dataWorksOpenApiClient.createClient();
            try {
                // The information about the parameters for the current event.
                String messageId = jsonObj.getString("id");
                JSONObject data = jsonObj.getObject("data", JSONObject.class);
                // Long projectId = data.getLong("appId");
                // Initialize the event callback.
                CallbackExtensionRequest callbackExtensionRequest = new CallbackExtensionRequest();
                callbackExtensionRequest.setMessageId(messageId);
                callbackExtensionRequest.setExtensionCode(extensionParamProperties.getExtensionCode());
                JSONObject eventBody = data.getJSONObject("eventBody");
                Long dataRowSize = eventBody.getLong("dataRowSize");
                // Query the configuration item that is specified for the Options for Extension parameter in a workspace.
                GetOptionValueForProjectRequest getOptionValueForProjectRequest = new GetOptionValueForProjectRequest();
                // The default value of the projectId parameter for the global extension point event is -1.
                getOptionValueForProjectRequest.setProjectId("-1");
                getOptionValueForProjectRequest.setExtensionCode(extensionParamProperties.getExtensionCode());
                GetOptionValueForProjectResponse getOptionValueForProjectResponse = client.getAcsResponse(getOptionValueForProjectRequest);
                JSONObject jsonObject = JSON.parseObject(getOptionValueForProjectResponse.getOptionValue());
                // Note: You must configure this parameter based on the format of dataRowSize in DataWorks.
                Long maxDataRowSize = jsonObject == null ? null : jsonObject.getLong("dataRowSize");
                // Use the configured upper limit if it exists. Otherwise, use the default upper limit of 1,000 rows.
                long rowLimit = maxDataRowSize != null ? maxDataRowSize : 1000L;
                // Check whether the number of rows to be downloaded exceeds the upper limit.
                if(dataRowSize != null && dataRowSize > rowLimit){
                    callbackExtensionRequest.setCheckResult(EventCheckEnum.FAIL.getCode());
                    callbackExtensionRequest.setCheckMessage("The number of rows to be downloaded exceeds the upper limit.");
                }else{
                    // The check is passed.
                    callbackExtensionRequest.setCheckResult(EventCheckEnum.OK.getCode());
                }
                // The extension processes the event messages and calls an API operation of DataWorks to send the processing result to DataWorks.
                CallbackExtensionResponse acsResponse = client.getAcsResponse(callbackExtensionRequest);
                // The ID of the request. You can troubleshoot errors based on the ID.
                System.out.println("acsResponse:" + acsResponse.getRequestId());
            } catch (ClientException e) {
                // The ID of the request. You can troubleshoot errors based on the ID.
                System.out.println("RequestId:" + e.getRequestId());
                // The status code of an error.
                System.out.println("ErrCode:" + e.getErrCode());
                // The description of an error.
                System.out.println("ErrMsg:" + e.getErrMsg());
            }
        }else{
            System.out.println("Failed to filter out other types of events. Check the parameter configurations.");
        }
    }
}
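The multi-extension example in the preceding note, in which each extension covers one range of data records, could be sketched as follows. This is a minimal illustration under stated assumptions: RowRangeCheck and its Result enum are hypothetical names, and Result only stands in for the OK, WARN, and FAIL check results mentioned in this topic. The approval process itself is bound to each extension in Security Center, not in code.

```java
/** Hypothetical range check that one extension could apply to dataRowSize. */
final class RowRangeCheck {
    /** Local stand-in for the check results (OK, WARN, FAIL) that an extension reports. */
    enum Result { OK, WARN, FAIL }

    private final long minRows;   // inclusive lower bound
    private final long maxRows;   // inclusive upper bound
    private final Result onMatch; // result reported when the row count falls in the range

    RowRangeCheck(long minRows, long maxRows, Result onMatch) {
        if (minRows < 0 || maxRows < minRows) {
            throw new IllegalArgumentException("invalid row range");
        }
        this.minRows = minRows;
        this.maxRows = maxRows;
        this.onMatch = onMatch;
    }

    /** Reports onMatch if the row count is inside the range, and OK otherwise. */
    Result evaluate(long dataRowSize) {
        boolean inRange = dataRowSize >= minRows && dataRowSize <= maxRows;
        return inRange ? onMatch : Result.OK;
    }
}
```

Under these assumptions, the first extension would use new RowRangeCheck(0, 2000, Result.WARN) and the second would use new RowRangeCheck(2001, Long.MAX_VALUE, Result.WARN). Because the ranges do not overlap, at most one extension reports WARN for a given download, so only the approval process that is bound to that extension is triggered.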
Step 3: Configure a risk identification rule
Go to the Security Center page.
Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose . On the page that appears, click Go to Security Center.
In the left-side navigation pane, select .
Configure an approval process for a specific published extension. For more information, see Configure a risk response.
Step 4: Enable a risk identification rule
Find the desired risk identification rule and turn on the switch in the Enable column to enable the risk identification rule.
Step 5: Verify results
Go to the Download Data page.
Find the desired file and click Download in the Actions column.
If the operation passes the check, you can continue to download the file.
If the operation fails the check, the download is blocked or a message that prompts you to request the permissions to download the file is displayed.
Descriptions for other similar scenarios
You can use other fields in the download event to implement real-time risk management in other scenarios. The fields include the workspace name, SQL details, data source name, and user ID. Examples:
Determine whether to allow data download based on the department (workspace) to which the related personnel belongs.
Block data download when an SQL statement contains sensitive fields.
Perform different risk management operations based on the number of data records that a user wants to download. If the number of data records that a user wants to download exceeds 20,000, initiate an approval process. If the number of data records that a user wants to download exceeds 50,000, block the data download.
Specify the number of downloadable data records for each workspace-level role. For example, the Development role can download N data records. If the number of data records that a user assigned the Development role wants to download exceeds N, the data download is blocked. The Data Analyst role can download M data records. If the number of data records that a user assigned the Data Analyst role wants to download exceeds M, the data download is blocked. You must call the ListProjectMembers operation in this scenario.
Configure different policies of downloadable data records for data development and data analysis scenarios.
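Two of the scenarios above, blocking downloads whose SQL statement contains sensitive fields and applying tiered limits of 20,000 and 50,000 data records, could be combined in one decision function along the following lines. This is only a sketch: DownloadPolicy, its Result enum, and the sensitive field names are hypothetical, and the plain substring match is an assumption that does not detect sensitive columns selected through SELECT * or aliases.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

/** Hypothetical policy that maps a download event to a check result. */
final class DownloadPolicy {
    /** Local stand-in for the check results (OK, WARN, FAIL) that an extension reports. */
    enum Result { OK, WARN, FAIL }

    static final long APPROVAL_THRESHOLD = 20_000L; // more rows than this: approval
    static final long BLOCK_THRESHOLD = 50_000L;    // more rows than this: blocking

    // Example field names only. Replace them with the sensitive fields of your enterprise.
    static final List<String> SENSITIVE_FIELDS = Arrays.asList("id_card", "phone_number");

    static Result evaluate(String sqlText, long dataRowSize) {
        String sql = sqlText == null ? "" : sqlText.toLowerCase(Locale.ROOT);
        // Rule 1: block the download if the SQL statement mentions a sensitive field.
        for (String field : SENSITIVE_FIELDS) {
            if (sql.contains(field)) {
                return Result.FAIL;
            }
        }
        // Rule 2: tiered handling based on the number of rows to be downloaded.
        if (dataRowSize > BLOCK_THRESHOLD) {
            return Result.FAIL;
        }
        if (dataRowSize > APPROVAL_THRESHOLD) {
            return Result.WARN;
        }
        return Result.OK;
    }
}
```

The sensitive-field rule is checked first so that a small download of sensitive data is still blocked. The block threshold is checked before the approval threshold because any row count above 50,000 also exceeds 20,000.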