This topic describes how to use DataWorks Data Integration to synchronize LogHub data to MaxCompute.
Background information
Data Integration supports the following synchronization scenarios:
- Synchronize data from a LogHub data source to data sources, such as a MaxCompute data source, across regions.
- Synchronize data from a LogHub data source to data sources, such as a MaxCompute data source, across Alibaba Cloud accounts.
- Synchronize data from a LogHub data source to data sources, such as a MaxCompute data source, within the same Alibaba Cloud account.
- Synchronize data from a LogHub data source to data sources, such as a MaxCompute data source, across the Alibaba Cloud public cloud and Alibaba Finance Cloud.
If you want to use Account B to synchronize LogHub data that belongs to Account A, you can use one of the following methods to add the LogHub data source:
- Use the AccessKey ID and AccessKey secret of Account A to add the LogHub data source. In this case, Account B can be used to synchronize data in all Log Service projects created by using Account A.
- Use the AccessKey ID and AccessKey secret of RAM user A1 within Account A to add the LogHub data source. Before you add the data source, use Account A to grant RAM user A1 permissions on Log Service in one of the following ways:
  - Use Account A to attach the AliyunLogFullAccess and AliyunLogReadOnlyAccess system policies on Log Service to RAM user A1. For more information, see Create a RAM user and authorize the RAM user to access Log Service.
  - Use Account A to grant custom permissions on Log Service to RAM user A1. To do so, use Account A to log on to the RAM console, choose Permissions > Policies in the left-side navigation pane, and then click Create Policy on the Policies page. For more information about how to grant custom permissions on Log Service to a RAM user, see Overview and the Authorize a RAM user to access Log Service section.
    If the following policy is attached to RAM user A1, Account B can be used to synchronize only the data of project_name1 and project_name2 that are created by using RAM user A1 in Log Service.
    {
      "Version": "1",
      "Statement": [
        {
          "Action": [
            "log:Get*",
            "log:List*",
            "log:CreateConsumerGroup",
            "log:UpdateConsumerGroup",
            "log:DeleteConsumerGroup",
            "log:ListConsumerGroup",
            "log:ConsumerGroupUpdateCheckPoint",
            "log:ConsumerGroupHeartBeat",
            "log:GetConsumerGroupCheckPoint"
          ],
          "Resource": [
            "acs:log:*:*:project/project_name1",
            "acs:log:*:*:project/project_name1/*",
            "acs:log:*:*:project/project_name2",
            "acs:log:*:*:project/project_name2/*"
          ],
          "Effect": "Allow"
        }
      ]
    }
Add a LogHub data source
- Log on to the DataWorks console. In the left-side navigation pane, click Workspaces. On the Workspaces page, find the desired workspace and click Data Integration in the Actions column.
- On the Data Integration page, click Data Source in the left-side navigation pane. The Data Source page appears.
- On the Data Source page, click Add data source in the upper-right corner.
- In the Add data source dialog box, click LogHub in the Message Queue section.
- In the Add LogHub data source dialog box, configure the parameters.
  - Data Source Name: The name of the data source. The name can contain only letters, digits, and underscores (_) and must start with a letter.
  - Data Source Description: The description of the data source. The description cannot exceed 80 characters in length.
  - LogHub Endpoint: The URL that is used to access the Log Service project, in the format of http://example.com, where example.com indicates the endpoint of the Log Service project. For more information about how to obtain the endpoint of a Log Service project, see Endpoints. An illustrative set of values is shown after this procedure.
  - Project: The name of the Log Service project.
  - AccessKey ID: The AccessKey ID of the Alibaba Cloud account that is used to connect to the Log Service project. You can copy the AccessKey ID on the AccessKey Pair page.
  - AccessKey Secret: The AccessKey secret of the Alibaba Cloud account that is used to connect to the Log Service project.
- Click Test connectivity in the Actions column that corresponds to the resource group that you want to use to test the connectivity between the LogHub data source and the resource group.
- If the connectivity test is successful, click Complete.
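For reference, the following is a minimal sketch of the values that you might enter. The data source name and project name are hypothetical, and cn-hangzhou is used only as an example region; the public endpoint of Log Service in a region typically takes the form http://<region-id>.log.aliyuncs.com.
  Data Source Name: loghub_source_demo (hypothetical)
  LogHub Endpoint: http://cn-hangzhou.log.aliyuncs.com (example public endpoint for the China (Hangzhou) region)
  Project: my_log_project (hypothetical)
  AccessKey ID: the AccessKey ID of the account that owns the project
  AccessKey Secret: the AccessKey secret of the account that owns the project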
Create a batch synchronization node
- On the Data Source page, click the icon in the upper-left corner and go to DataStudio. The DataStudio page appears.
- On the DataStudio page, move the pointer over the icon and select Create Workflow.
- In the Create Workflow dialog box, configure the Workflow Name and Description parameters and click Create.
- Click the name of the created workflow in the Scheduled Workflow pane, right-click Data Integration, and then choose the option for creating a batch synchronization node.
- In the Create Node dialog box, configure the Name and Path parameters.
- Click Commit. The configuration tab of the node appears.
Configure the batch synchronization node on the codeless UI
- In the Connections step, configure the parameters in the Data source section.
  - Data source: The name of the LogHub data source. Select LogHub from the drop-down list on the left and select the LogHub data source that you added from the drop-down list on the right.
  - Logstore: The name of the Logstore from which you want to read data.
  - Log start time: The start time of data consumption. This parameter defines the left boundary of a left-closed, right-open time range, in the format of yyyyMMddHHmmss. Example: 20180111013000. The parameter can work with the scheduling parameters in DataWorks.
  - Log end time: The end time of data consumption. This parameter defines the right boundary of a left-closed, right-open time range, in the format of yyyyMMddHHmmss. Example: 20180111013010. The parameter can work with the scheduling parameters in DataWorks.
  - Number of batches: The number of data entries to read at a time. Default value: 256.
  Note: You can click Data preview to preview data. Only a small number of LogHub data entries are displayed. The data entries that are displayed may be different from the actual data to be synchronized due to the start time and end time that you specified.
- In the Connections step, select the MaxCompute data source that you added and configure the remaining parameters in the Data Destination section.
- In the Mappings step, configure field mappings between the source and destination.
- In the Channel step, configure the parameters.
- Verify that the preceding configuration is correct and click the Save icon in the upper-left corner of the configuration tab.
- Run the batch synchronization node. You can use one of the following methods to run the batch synchronization node:
- Run the node only once. On the node configuration tab, click the Run icon in the top toolbar to run the node. Note: Before you run the node, you must configure custom parameters for the node.
- Run the node based on the scheduling configurations of the node.
Click the Properties tab in the right-side navigation pane of the configuration tab of the node. In the Parameters section of the Properties tab, enter startTime=$[yyyymmddhh24miss-10/24/60] and endTime=$[yyyymmddhh24miss-5/24/60]. The values indicate that the start time of the node is 10 minutes earlier than the system time and the end time is 5 minutes earlier than the system time.
In the Schedule section, set Scheduling Cycle to Minute, Start From to 00:00, Interval to 05, and End At to 23:59. Then, the node is scheduled to run every 5 minutes from 00:00 to 23:59. A worked example of how these parameters resolve is provided after this list.
Then, click the Submit icon in the top toolbar to commit the node to the scheduling system. The scheduling system automatically runs the node from the next day based on the scheduling properties that you configured.
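For example, assume that an instance of the node is scheduled to run at 2018-01-11 01:30:00 under the preceding 5-minute schedule. The two scheduling parameters then resolve as follows, and the instance reads the log data that was generated in the left-closed, right-open interval [20180111012000, 20180111012500):
  startTime = $[yyyymmddhh24miss-10/24/60] -> 20180111012000 (10 minutes earlier than that time)
  endTime   = $[yyyymmddhh24miss-5/24/60]  -> 20180111012500 (5 minutes earlier than that time)
The next instance, scheduled at 01:35:00, reads [20180111012500, 20180111013000). In this way, consecutive instances cover adjacent, non-overlapping time ranges.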
Configure the batch synchronization node in the code editor
- On the configuration tab of the batch synchronization node, click the Conversion script icon in the top toolbar.
- In the Tips message, click OK to switch to the code editor.
- Click the Import Template icon in the top toolbar.
- In the Import Template dialog box, configure the Source type, Data source, Target type, and Data source parameters and click OK to apply the template.
- Edit the code based on your business requirements in the code editor. Sample code:
{ "type": "job", "version": "1.0", "configuration": { "reader": { "plugin": "loghub", "parameter": { "datasource": "loghub_lzz",// The name of the LogHub data source from which you want to read data. The name must be the same as the name of the data source that you added. "logstore": "logstore-ut2",// The name of the Logstore from which you want to read data. A Logstore is a Log Service unit for collecting, storing, and querying log data. "beginDateTime": "${startTime}",// The start time of data consumption. This parameter defines the left boundary of a time range (left-closed and right-open). "endDateTime": "${endTime}",// The end time of data consumption. This parameter defines the right boundary of a time range (left-closed and right-open). "batchSize": 256,// The number of data entries to read at a time. Default value: 256. "splitPk": "", "column": [ "key1", "key2", "key3" ] } }, "writer": { "plugin": "odps", "parameter": { "datasource": "odps_first",// The name of the data source to which you want to write data. The name must be the same as the name of the data source that you added. "table": "ok",// The name of the table to which you want to write data. "truncate": true, "partition": "",// The partition information in the destination table. "column": [// The names of the columns to which you want to write data. "key1", "key2", "key3" ] } }, "setting": { "speed": { "mbps": 8,// The maximum transmission rate. "concurrent": 7// The maximum number of parallel threads. } } } }